Skip to content

Commit 454f7d1

Browse files
[FLINK-27096] Flush buffer at epoch watermark
This closes apache#112.
1 parent 966cedd commit 454f7d1

File tree

1 file changed

+12
-0
lines changed

1 file changed

+12
-0
lines changed

flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/RecordWriterBroadcastOutput.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.iteration.broadcast;
1919

2020
import org.apache.flink.api.common.typeutils.TypeSerializer;
21+
import org.apache.flink.iteration.IterationRecord;
2122
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
2223
import org.apache.flink.runtime.plugable.SerializationDelegate;
2324
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -42,5 +43,16 @@ public RecordWriterBroadcastOutput(
4243
public void broadcastEmit(StreamRecord<OUT> record) throws IOException {
4344
serializationDelegate.setInstance(record);
4445
recordWriter.broadcastEmit(serializationDelegate);
46+
if (isIterationEpochWatermark(record)) {
47+
recordWriter.flushAll();
48+
}
49+
}
50+
51+
private static <T> boolean isIterationEpochWatermark(StreamRecord<T> record) {
52+
if (!(record.getValue() instanceof IterationRecord)) {
53+
return false;
54+
}
55+
IterationRecord<?> iterationRecord = (IterationRecord<?>) record.getValue();
56+
return iterationRecord.getType().equals(IterationRecord.Type.EPOCH_WATERMARK);
4557
}
4658
}

0 commit comments

Comments
 (0)