Hi ,请教一个奇怪的问题:
streamSource.flatMap(new ComeIntoMaxFlatMapFunction()) .assignTimestampsAndWatermarks(new CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds())) .connect(ruleConfigSource) .process(new MetricDataFilterProcessFunction()) .keyBy((KeySelector<Metric, MetricDataKey>) metric -> { MetricDataKey metricDataKey = new MetricDataKey(); metricDataKey.setDomain(metric.getDomain()); metricDataKey.setStationAliasCode(metric.getStaId()); metricDataKey.setEquipMK(metric.getEquipMK()); metricDataKey.setEquipID(metric.getEquipID()); metricDataKey.setMetric(metric.getMetric()); return metricDataKey; }) .window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1))) .apply(new RichWindowFunction<Metric, MetricDataList, MetricDataKey, TimeWindow>() { @Override public void apply(MetricDataKey tuple, TimeWindow window, Iterable<Metric> input, Collector<MetricDataList> out) throws Exception { input.forEach(x->{ System.out.println("--->>>"+x); }); } }) 我定义这个Topology中能正常执行keyBy,但是无法执行apply中的 System.out.println("--->>>"+x); 数据一直在消费着,没有任何报错信息 |
应该是 window 还没达到触发的条件,可以看下 watermark 是否在推进
在 2020-11-18 15:29:54,"huang botao" <[hidden email]> 写道: >Hi ,请教一个奇怪的问题: > >streamSource.flatMap(new ComeIntoMaxFlatMapFunction()) > > .assignTimestampsAndWatermarks(new >CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds())) > > .connect(ruleConfigSource) > .process(new MetricDataFilterProcessFunction()) > .keyBy((KeySelector<Metric, MetricDataKey>) metric -> { > MetricDataKey metricDataKey = new MetricDataKey(); > metricDataKey.setDomain(metric.getDomain()); > metricDataKey.setStationAliasCode(metric.getStaId()); > metricDataKey.setEquipMK(metric.getEquipMK()); > metricDataKey.setEquipID(metric.getEquipID()); > metricDataKey.setMetric(metric.getMetric()); > return metricDataKey; > }) > > .window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1))) > .apply(new RichWindowFunction<Metric, MetricDataList, >MetricDataKey, TimeWindow>() { > @Override > public void apply(MetricDataKey tuple, TimeWindow window, >Iterable<Metric> input, Collector<MetricDataList> out) throws >Exception { > input.forEach(x->{ > System.out.println("--->>>"+x); > }); > } > }) > >我定义这个Topology中能正常执行keyBy,但是无法执行apply中的 System.out.println("--->>>"+x); > > >数据一直在消费着,没有任何报错信息 |
感谢您的回复,是这样的,我这边的环境设置用的是eventTime
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); window设置的是 slid(2,1), 但是等了足够长的时间还是不能触发 apply 方法 On Wed, Nov 18, 2020 at 5:50 PM hailongwang <[hidden email]> wrote: > 应该是 window 还没达到触发的条件,可以看下 watermark 是否在推进 > > 在 2020-11-18 15:29:54,"huang botao" <[hidden email]> 写道: > >Hi ,请教一个奇怪的问题: > > > >streamSource.flatMap(new ComeIntoMaxFlatMapFunction()) > > > > .assignTimestampsAndWatermarks(new > >CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds())) > > > > .connect(ruleConfigSource) > > .process(new MetricDataFilterProcessFunction()) > > .keyBy((KeySelector<Metric, MetricDataKey>) metric -> { > > MetricDataKey metricDataKey = new MetricDataKey(); > > metricDataKey.setDomain(metric.getDomain()); > > metricDataKey.setStationAliasCode(metric.getStaId()); > > metricDataKey.setEquipMK(metric.getEquipMK()); > > metricDataKey.setEquipID(metric.getEquipID()); > > metricDataKey.setMetric(metric.getMetric()); > > return metricDataKey; > > }) > > > > .window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1))) > > .apply(new RichWindowFunction<Metric, MetricDataList, > >MetricDataKey, TimeWindow>() { > > @Override > > public void apply(MetricDataKey tuple, TimeWindow window, > >Iterable<Metric> input, Collector<MetricDataList> out) throws > >Exception { > > input.forEach(x->{ > > System.out.println("--->>>"+x); > > }); > > } > > }) > > > >我定义这个Topology中能正常执行keyBy,但是无法执行apply中的 System.out.println("--->>>"+x); > > > > > >数据一直在消费着,没有任何报错信息 > |
可以检查一下作业消费的 kafka 分区是否都有数据,如果有的分区无数据的话,那么可能会导致水印不会更新,从而窗口触发不了。
Best zhisheng huang botao <[hidden email]> 于2020年11月18日周三 下午10:34写道: > 感谢您的回复,是这样的,我这边的环境设置用的是eventTime > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > > window设置的是 slid(2,1), 但是等了足够长的时间还是不能触发 apply 方法 > > > On Wed, Nov 18, 2020 at 5:50 PM hailongwang <[hidden email]> wrote: > > > 应该是 window 还没达到触发的条件,可以看下 watermark 是否在推进 > > > > 在 2020-11-18 15:29:54,"huang botao" <[hidden email]> 写道: > > >Hi ,请教一个奇怪的问题: > > > > > >streamSource.flatMap(new ComeIntoMaxFlatMapFunction()) > > > > > > .assignTimestampsAndWatermarks(new > > >CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds())) > > > > > > .connect(ruleConfigSource) > > > .process(new MetricDataFilterProcessFunction()) > > > .keyBy((KeySelector<Metric, MetricDataKey>) metric -> { > > > MetricDataKey metricDataKey = new MetricDataKey(); > > > metricDataKey.setDomain(metric.getDomain()); > > > metricDataKey.setStationAliasCode(metric.getStaId()); > > > metricDataKey.setEquipMK(metric.getEquipMK()); > > > metricDataKey.setEquipID(metric.getEquipID()); > > > metricDataKey.setMetric(metric.getMetric()); > > > return metricDataKey; > > > }) > > > > > > .window(SlidingEventTimeWindows.of(Time.seconds(2), > Time.seconds(1))) > > > .apply(new RichWindowFunction<Metric, MetricDataList, > > >MetricDataKey, TimeWindow>() { > > > @Override > > > public void apply(MetricDataKey tuple, TimeWindow window, > > >Iterable<Metric> input, Collector<MetricDataList> out) throws > > >Exception { > > > input.forEach(x->{ > > > System.out.println("--->>>"+x); > > > }); > > > } > > > }) > > > > > >我定义这个Topology中能正常执行keyBy,但是无法执行apply中的 System.out.println("--->>>"+x); > > > > > > > > >数据一直在消费着,没有任何报错信息 > > > |
hi, zhisheng, hailongwang:
感谢对这个问题的解答,这个问题确实出在了window无法触发的地方,原因是 在connect() 后面没有定义watermar导致,在connect后指定watermark就可以触发window了。 On Wed, Nov 18, 2020 at 10:46 PM zhisheng <[hidden email]> wrote: > 可以检查一下作业消费的 kafka 分区是否都有数据,如果有的分区无数据的话,那么可能会导致水印不会更新,从而窗口触发不了。 > > Best > zhisheng > > huang botao <[hidden email]> 于2020年11月18日周三 下午10:34写道: > > > 感谢您的回复,是这样的,我这边的环境设置用的是eventTime > > > > StreamExecutionEnvironment env = > > StreamExecutionEnvironment.getExecutionEnvironment(); > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > > > > window设置的是 slid(2,1), 但是等了足够长的时间还是不能触发 apply 方法 > > > > > > On Wed, Nov 18, 2020 at 5:50 PM hailongwang <[hidden email]> wrote: > > > > > 应该是 window 还没达到触发的条件,可以看下 watermark 是否在推进 > > > > > > 在 2020-11-18 15:29:54,"huang botao" <[hidden email]> 写道: > > > >Hi ,请教一个奇怪的问题: > > > > > > > >streamSource.flatMap(new ComeIntoMaxFlatMapFunction()) > > > > > > > > .assignTimestampsAndWatermarks(new > > > >CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds())) > > > > > > > > .connect(ruleConfigSource) > > > > .process(new MetricDataFilterProcessFunction()) > > > > .keyBy((KeySelector<Metric, MetricDataKey>) metric -> { > > > > MetricDataKey metricDataKey = new MetricDataKey(); > > > > metricDataKey.setDomain(metric.getDomain()); > > > > metricDataKey.setStationAliasCode(metric.getStaId()); > > > > metricDataKey.setEquipMK(metric.getEquipMK()); > > > > metricDataKey.setEquipID(metric.getEquipID()); > > > > metricDataKey.setMetric(metric.getMetric()); > > > > return metricDataKey; > > > > }) > > > > > > > > .window(SlidingEventTimeWindows.of(Time.seconds(2), > > Time.seconds(1))) > > > > .apply(new RichWindowFunction<Metric, MetricDataList, > > > >MetricDataKey, TimeWindow>() { > > > > @Override > > > > public void apply(MetricDataKey tuple, TimeWindow window, > > > >Iterable<Metric> input, Collector<MetricDataList> out) throws > > > >Exception { > > > > input.forEach(x->{ > > > > System.out.println("--->>>"+x); > > > > }); > > > > } > > > > }) > > > > > > > >我定义这个Topology中能正常执行keyBy,但是无法执行apply中的 System.out.println("--->>>"+x); > > > > > > > > > > > >数据一直在消费着,没有任何报错信息 > > > > > > |
connect前生成watermark也是可以的应该,但是你需要把ruleConfigSource流也赋watermark。我猜是这个地方出问题了。
huang botao <[hidden email]> 于2020年11月19日周四 下午12:58写道: > hi, zhisheng, hailongwang: > > 感谢对这个问题的解答,这个问题确实出在了window无法触发的地方,原因是 在connect() > 后面没有定义watermar导致,在connect后指定watermark就可以触发window了。 > > > > On Wed, Nov 18, 2020 at 10:46 PM zhisheng <[hidden email]> wrote: > > > 可以检查一下作业消费的 kafka 分区是否都有数据,如果有的分区无数据的话,那么可能会导致水印不会更新,从而窗口触发不了。 > > > > Best > > zhisheng > > > > huang botao <[hidden email]> 于2020年11月18日周三 下午10:34写道: > > > > > 感谢您的回复,是这样的,我这边的环境设置用的是eventTime > > > > > > StreamExecutionEnvironment env = > > > StreamExecutionEnvironment.getExecutionEnvironment(); > > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > > > > > > window设置的是 slid(2,1), 但是等了足够长的时间还是不能触发 apply 方法 > > > > > > > > > On Wed, Nov 18, 2020 at 5:50 PM hailongwang <[hidden email]> > wrote: > > > > > > > 应该是 window 还没达到触发的条件,可以看下 watermark 是否在推进 > > > > > > > > 在 2020-11-18 15:29:54,"huang botao" <[hidden email]> 写道: > > > > >Hi ,请教一个奇怪的问题: > > > > > > > > > >streamSource.flatMap(new ComeIntoMaxFlatMapFunction()) > > > > > > > > > > .assignTimestampsAndWatermarks(new > > > > >CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds())) > > > > > > > > > > .connect(ruleConfigSource) > > > > > .process(new MetricDataFilterProcessFunction()) > > > > > .keyBy((KeySelector<Metric, MetricDataKey>) metric -> { > > > > > MetricDataKey metricDataKey = new MetricDataKey(); > > > > > metricDataKey.setDomain(metric.getDomain()); > > > > > metricDataKey.setStationAliasCode(metric.getStaId()); > > > > > metricDataKey.setEquipMK(metric.getEquipMK()); > > > > > metricDataKey.setEquipID(metric.getEquipID()); > > > > > metricDataKey.setMetric(metric.getMetric()); > > > > > return metricDataKey; > > > > > }) > > > > > > > > > > .window(SlidingEventTimeWindows.of(Time.seconds(2), > > > Time.seconds(1))) > > > > > .apply(new RichWindowFunction<Metric, MetricDataList, > > > > >MetricDataKey, TimeWindow>() { > > > > > @Override > > > > > public void apply(MetricDataKey tuple, TimeWindow window, > > > > >Iterable<Metric> input, Collector<MetricDataList> out) throws > > > > >Exception { > > > > > input.forEach(x->{ > > > > > System.out.println("--->>>"+x); > > > > > }); > > > > > } > > > > > }) > > > > > > > > > >我定义这个Topology中能正常执行keyBy,但是无法执行apply中的 > System.out.println("--->>>"+x); > > > > > > > > > > > > > > >数据一直在消费着,没有任何报错信息 > > > > > > > > > > |
Free forum by Nabble | Edit this page |