File tree Expand file tree Collapse file tree 1 file changed +12
-0
lines changed
flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast Expand file tree Collapse file tree 1 file changed +12
-0
lines changed Original file line number Diff line number Diff line change 18
18
package org .apache .flink .iteration .broadcast ;
19
19
20
20
import org .apache .flink .api .common .typeutils .TypeSerializer ;
21
+ import org .apache .flink .iteration .IterationRecord ;
21
22
import org .apache .flink .runtime .io .network .api .writer .RecordWriter ;
22
23
import org .apache .flink .runtime .plugable .SerializationDelegate ;
23
24
import org .apache .flink .streaming .runtime .streamrecord .StreamElement ;
@@ -42,5 +43,16 @@ public RecordWriterBroadcastOutput(
42
43
public void broadcastEmit (StreamRecord <OUT > record ) throws IOException {
43
44
serializationDelegate .setInstance (record );
44
45
recordWriter .broadcastEmit (serializationDelegate );
46
+ if (isIterationEpochWatermark (record )) {
47
+ recordWriter .flushAll ();
48
+ }
49
+ }
50
+
51
+ private static <T > boolean isIterationEpochWatermark (StreamRecord <T > record ) {
52
+ if (!(record .getValue () instanceof IterationRecord )) {
53
+ return false ;
54
+ }
55
+ IterationRecord <?> iterationRecord = (IterationRecord <?>) record .getValue ();
56
+ return iterationRecord .getType ().equals (IterationRecord .Type .EPOCH_WATERMARK );
45
57
}
46
58
}
You can’t perform that action at this time.
0 commit comments