Skip to content

Commit e13e664

Browse files
committed
add test watermark
1 parent c6f162a commit e13e664

File tree

1 file changed

+120
-0
lines changed

1 file changed

+120
-0
lines changed
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package com.zhisheng.examples.streaming.broadcast;
2+
3+
import com.zhisheng.common.utils.ExecutionEnvUtil;
4+
import lombok.extern.slf4j.Slf4j;
5+
import org.apache.flink.api.common.functions.MapFunction;
6+
import org.apache.flink.api.common.state.MapStateDescriptor;
7+
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
8+
import org.apache.flink.api.java.tuple.Tuple2;
9+
import org.apache.flink.api.java.utils.ParameterTool;
10+
import org.apache.flink.streaming.api.TimeCharacteristic;
11+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
12+
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
13+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
14+
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
15+
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
16+
import org.apache.flink.streaming.api.functions.source.SourceFunction;
17+
import org.apache.flink.streaming.api.watermark.Watermark;
18+
import org.apache.flink.streaming.api.windowing.time.Time;
19+
import org.apache.flink.util.Collector;
20+
21+
import javax.annotation.Nullable;
22+
23+
/**
24+
* Desc:
25+
* Created by zhisheng on 2020-02-26 18:38
26+
* blog:http://www.54tianzhisheng.cn/
27+
* 微信公众号:zhisheng
28+
*/
29+
@Slf4j
30+
public class Main2 {
31+
final static MapStateDescriptor<String, String> ALARM_RULES = new MapStateDescriptor<>(
32+
"alarm_rules",
33+
BasicTypeInfo.STRING_TYPE_INFO,
34+
BasicTypeInfo.STRING_TYPE_INFO);
35+
36+
public static void main(String[] args) throws Exception {
37+
final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
38+
StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
39+
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
40+
env.setParallelism(1);
41+
42+
DataStreamSource<String> rule = env.addSource(new SourceFunction<String>() {
43+
@Override
44+
public void run(SourceContext<String> sourceContext) throws Exception {
45+
while (true) {
46+
sourceContext.collect("AAA");
47+
Thread.sleep(60000);
48+
}
49+
}
50+
51+
@Override
52+
public void cancel() {
53+
54+
}
55+
});
56+
57+
SingleOutputStreamOperator<Tuple2<String, Long>> flatMap = env.socketTextStream("127.0.0.1", 9001)
58+
.map(new MapFunction<String, Tuple2<String, Long>>() {
59+
@Override
60+
public Tuple2<String, Long> map(String s) throws Exception {
61+
String[] split = s.split(",");
62+
return new Tuple2<>(split[0], Long.valueOf(split[1]));
63+
}
64+
}).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
65+
66+
private long currentTimestamp = Long.MIN_VALUE;
67+
68+
@Nullable
69+
@Override
70+
public Watermark getCurrentWatermark() {
71+
Watermark watermark = new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 5000);
72+
log.info("watermark is {}", watermark.getTimestamp());
73+
return watermark;
74+
}
75+
76+
@Override
77+
public long extractTimestamp(Tuple2<String, Long> tuple2, long l) {
78+
long timestamp = tuple2.f1;
79+
currentTimestamp = Math.max(timestamp, currentTimestamp);
80+
return timestamp;
81+
}
82+
}).connect(rule.broadcast())
83+
.flatMap(new CoFlatMapFunction<Tuple2<String, Long>, String, Tuple2<String, Long>>() {
84+
@Override
85+
public void flatMap1(Tuple2<String, Long> tuple2, Collector<Tuple2<String, Long>> collector) throws Exception {
86+
System.out.println("flatMap1 " + tuple2.f0 + " " + tuple2.f1);
87+
collector.collect(tuple2);
88+
}
89+
90+
@Override
91+
public void flatMap2(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
92+
System.out.println("flatMap2 " + s);
93+
collector.collect(new Tuple2<>(s, System.currentTimeMillis()));
94+
}
95+
})/*.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
96+
97+
private long currentTimestamp = Long.MIN_VALUE;
98+
99+
@Nullable
100+
@Override
101+
public Watermark getCurrentWatermark() {
102+
Watermark watermark = new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 5000);
103+
log.info("watermark is {}", watermark.getTimestamp());
104+
return watermark;
105+
}
106+
107+
@Override
108+
public long extractTimestamp(Tuple2<String, Long> tuple2, long l) {
109+
long timestamp = tuple2.f1;
110+
currentTimestamp = Math.max(timestamp, currentTimestamp);
111+
return timestamp;
112+
}
113+
})*/;
114+
115+
flatMap.keyBy(0).timeWindow(Time.minutes(2)).sum(1).print();
116+
117+
118+
env.execute();
119+
}
120+
}

0 commit comments

Comments
 (0)