Data in the window cannot be sent downstream

方如
The code is as follows:

    // Convert the JSON into a LogBean
    SingleOutputStreamOperator<LogBean> data = filter.map(new Json2LogBean());

    KeyedStream<Tuple3<String, String, Integer>, String> tuple3StringKeyedStream = data
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<LogBean>() {
            @Override
            public long extractAscendingTimestamp(LogBean element) {
                LocalDateTime parse = LocalDateTime.parse(element.getOperTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                long eventTime = parse.toEpochSecond(ZoneOffset.of("+8"));
                System.out.println(eventTime);
                return eventTime;
            }
        })
        .map(new MapFunction<LogBean, Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> map(LogBean value) throws Exception {
                // Get the user ID to group by
                return new Tuple3<>(value.getNickname(), value.toString(), 1);
            }
        })
        .keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f0;
            }
        });

    WindowedStream<Tuple3<String, String, Integer>, String, TimeWindow> window = tuple3StringKeyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));

    window.sum(2).print();

Before switching to sum I was using reduce. Inside the reduce function I can print the data, but no result ever comes out after the reduce: both printing and writing to a file produce nothing.

Many thanks!
Re: Data in the window cannot be sent downstream

Jimmy Wong
I'd suggest checking the watermark: print it out and see whether it is valid. By the way, the indentation of this code is a bit awkward.
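One concrete thing to look at while checking the watermark, offered as a sketch rather than a confirmed diagnosis: Flink's DataStream API treats event timestamps as milliseconds since the epoch, while the `toEpochSecond` call in the posted extractor returns seconds. If that mismatch is what is happening here, two events 3 seconds apart would appear only 3 ms apart to Flink, so the 5-second session gap would almost never be exceeded and the window would not fire. The class `EventTimeCheck` and its helper names below are hypothetical; it uses only `java.time` to compare the two conversions on the same `operTime` strings.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper class, not part of the original job.
public class EventTimeCheck {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final ZoneOffset OFFSET = ZoneOffset.of("+8");

    // Same conversion as in the posted extractor: epoch SECONDS.
    public static long toEpochSeconds(String operTime) {
        return LocalDateTime.parse(operTime, FORMAT).toEpochSecond(OFFSET);
    }

    // Alternative conversion: epoch MILLISECONDS.
    public static long toEpochMillis(String operTime) {
        return LocalDateTime.parse(operTime, FORMAT).toInstant(OFFSET).toEpochMilli();
    }

    public static void main(String[] args) {
        // Two sample events three seconds apart (made-up values).
        String first = "2020-02-27 14:34:00";
        String second = "2020-02-27 14:34:03";

        long diffFromSeconds = toEpochSeconds(second) - toEpochSeconds(first);
        long diffFromMillis = toEpochMillis(second) - toEpochMillis(first);

        System.out.println("difference via toEpochSecond: " + diffFromSeconds);
        System.out.println("difference via toEpochMilli:  " + diffFromMillis);
    }
}
```

If the printed timestamps from the job look like ten-digit numbers rather than thirteen-digit ones, returning the millisecond value from `extractAscendingTimestamp` would be the first thing to try.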


On 2020-02-27 14:34, 方如 <[hidden email]> wrote:
> [full quote of the original post above]