Skip to content

Commit f0c1c7a

Browse files
authored
[FLINK-26801] Support duplicate checkpoint aborted messages
This closes apache#107.
1 parent 18ec1b3 commit f0c1c7a

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/HeadOperatorCheckpointAligner.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.util.function.RunnableWithException;
2424

2525
import java.util.ArrayList;
26+
import java.util.Collections;
2627
import java.util.List;
2728
import java.util.Map;
2829
import java.util.Optional;
@@ -109,10 +110,13 @@ List<GloballyAlignedEvent> onStateSnapshot(long checkpointId) {
109110
}
110111

111112
List<GloballyAlignedEvent> onCheckpointAborted(long checkpointId) {
112-
// Here we need to abort all the checkpoints <= notified checkpoint id.
113-
checkState(checkpointId > latestAbortedCheckpoint);
113+
if (checkpointId <= latestAbortedCheckpoint) {
114+
return Collections.emptyList();
115+
}
116+
114117
latestAbortedCheckpoint = checkpointId;
115118

119+
// Here we need to abort all the checkpoints <= notified checkpoint id.
116120
Map<Long, CheckpointAlignment> abortedAlignments =
117121
checkpointAlignmments.headMap(latestAbortedCheckpoint, true);
118122
List<GloballyAlignedEvent> events = new ArrayList<>();

0 commit comments

Comments
 (0)