The code is as follows:

// Convert the JSON into a LogBean
SingleOutputStreamOperator<LogBean> data = filter.map(new Json2LogBean());

KeyedStream<Tuple3<String, String, Integer>, String> tuple3StringKeyedStream = data
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<LogBean>() {
        @Override
        public long extractAscendingTimestamp(LogBean element) {
            LocalDateTime parse = LocalDateTime.parse(element.getOperTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            long eventTime = parse.toEpochSecond(ZoneOffset.of("+8"));
            System.out.println(eventTime);
            return eventTime;
        }
    })
    .map(new MapFunction<LogBean, Tuple3<String, String, Integer>>() {
        @Override
        public Tuple3<String, String, Integer> map(LogBean value) throws Exception {
            // Use the user id for grouping
            return new Tuple3<>(value.getNickname(), value.toString(), 1);
        }
    })
    .keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
        @Override
        public String getKey(Tuple3<String, String, Integer> value) throws Exception {
            return value.f0;
        }
    });

WindowedStream<Tuple3<String, String, Integer>, String, TimeWindow> window =
    tuple3StringKeyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));

window.sum(2).print();

Before switching to sum I was using reduce. Inside the reduce function I can print the data, but no result ever comes out after the reduce: both printing and writing to a file produce nothing.

Many thanks!
I suggest checking the Watermark: print it out and see whether it is valid. By the way, the indentation of this code is a bit awkward.
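One thing that printing the timestamps may reveal, offered here as a possible cause rather than a confirmed diagnosis: Flink event-time timestamps and watermarks are expressed in milliseconds since the epoch, while `LocalDateTime.toEpochSecond` returns seconds. If the extractor returns seconds, two events that are 10 real seconds apart look only 10 ms apart to Flink, so a 5-second (5000 ms) session gap would almost never be exceeded and the window would not fire. The sketch below uses only `java.time` to show the difference; the class name `EventTimeCheck`, the two helper methods, and the sample timestamps are hypothetical and not part of the original job.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class EventTimeCheck {
    // Same pattern and offset as in the posted extractor.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final ZoneOffset OFFSET = ZoneOffset.of("+8");

    // What the posted code returns: seconds since the epoch.
    static long toEpochSeconds(String operTime) {
        return LocalDateTime.parse(operTime, FORMAT).toEpochSecond(OFFSET);
    }

    // Milliseconds since the epoch, the unit Flink timestamps use.
    static long toEpochMillis(String operTime) {
        return LocalDateTime.parse(operTime, FORMAT).toInstant(OFFSET).toEpochMilli();
    }

    public static void main(String[] args) {
        // Hypothetical sample values, 10 seconds apart.
        String first = "2020-02-27 14:34:00";
        String second = "2020-02-27 14:34:10";

        long gapIfSeconds = toEpochSeconds(second) - toEpochSeconds(first);
        long gapIfMillis = toEpochMillis(second) - toEpochMillis(first);

        // The session gap Time.seconds(5) corresponds to 5000 in timestamp units.
        System.out.println("gap seen when returning seconds: " + gapIfSeconds);
        System.out.println("gap seen when returning millis:  " + gapIfMillis);
    }
}
```

If this turns out to be the issue, returning the millisecond value from `extractAscendingTimestamp` would be the corresponding change. Note also that a session window only fires once the watermark passes the end of the session plus the gap, so the last session of a finite test input may stay open until later events arrive.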
Jimmy Wong | [hidden email]

On 2020-02-27 at 14:34, 方如 <[hidden email]> wrote:
> [quoted text: the original post above]