File tree Expand file tree Collapse file tree 1 file changed +6
-2
lines changed
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator Expand file tree Collapse file tree 1 file changed +6
-2
lines changed Original file line number Diff line number Diff line change 23
23
import org .apache .flink .util .function .RunnableWithException ;
24
24
25
25
import java .util .ArrayList ;
26
+ import java .util .Collections ;
26
27
import java .util .List ;
27
28
import java .util .Map ;
28
29
import java .util .Optional ;
@@ -109,10 +110,13 @@ List<GloballyAlignedEvent> onStateSnapshot(long checkpointId) {
109
110
}
110
111
111
112
List <GloballyAlignedEvent > onCheckpointAborted (long checkpointId ) {
112
- // Here we need to abort all the checkpoints <= notified checkpoint id.
113
- checkState (checkpointId > latestAbortedCheckpoint );
113
+ if (checkpointId <= latestAbortedCheckpoint ) {
114
+ return Collections .emptyList ();
115
+ }
116
+
114
117
latestAbortedCheckpoint = checkpointId ;
115
118
119
+ // Here we need to abort all the checkpoints <= notified checkpoint id.
116
120
Map <Long , CheckpointAlignment > abortedAlignments =
117
121
checkpointAlignmments .headMap (latestAbortedCheckpoint , true );
118
122
List <GloballyAlignedEvent > events = new ArrayList <>();
You can’t perform that action at this time.
0 commit comments