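I have the following code. It consumes data from Kafka and then keys the records by the second (according to the server clock) in which they arrive. Here is the code that computes the record's minute: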
// One ThreadLocal for the whole class, so each thread really reuses its own SimpleDateFormat
// (declaring it inside the method created a new ThreadLocal, and a new formatter, on every call).
private static final ThreadLocal<SimpleDateFormat> FORMATTER =
        ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

public static String timeStampToDate(Long timestamp) {
    String format = FORMATTER.get().format(new Date(timestamp));
    // The pattern is already 19 characters long, so this returns the whole second-level string.
    return format.substring(0, 19);
}

After keying by the record's minute, I use a 1-minute tumbling window that fires every 500 ms, as follows:

. . .
// keyBy the minute (processing time) in which the record arrives
KeyedStream<ShareRealTimeData, String> keyByStream = signoutTimeAndWM.keyBy(new KeySelector<ShareRealTimeData, String>() {
    @Override
    public String getKey(ShareRealTimeData value) throws Exception {
        return DateUtilMinutes.timeStampToDate(new Date().getTime());
    }
});

SingleOutputStreamOperator<TreeMap<Double, Tuple2<String, String>>> topNforEveWindow = keyByStream
        .window(TumblingProcessingTimeWindows.of(Time.milliseconds(1000)))
        .trigger(ContinuousProcessingTimeTrigger.of(Time.milliseconds(500)))
        // .evictor(TimeEvictor.of(Time.seconds(0), true))
        .process(new MyProcessWindowFuncation());

// sink
topNforEveWindow.printToErr("topNforEveWindow====");
. . .

While the job is running, it randomly throws the following NullPointerException warning at some whole-minute marks:

19:49:22,001 WARN  org.apache.flink.runtime.taskmanager.Task [] - Window(TumblingProcessingTimeWindows(1000), ContinuousProcessingTimeTrigger, TimeEvictor, ProcessWindowFunction$4) -> Sink: Print to Std. Err (3/8) (222821e43f98390a2f5e3baeb5b542a8) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
    at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:99) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.processElement(EvictingWindowOperator.java:203) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-runtime_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-runtime_2.11-1.11.2.jar:1.11.2]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]

Could someone help me figure out what is causing this?

-- 
Sent from: http://apache-flink.147419.n8.nabble.com/
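For reference, the helper's bucketing can also be written with java.time: DateTimeFormatter is immutable and thread-safe, so a single static instance can replace the ThreadLocal<SimpleDateFormat>. This is a minimal sketch, not the poster's class: the name SecondBucket is hypothetical, and UTC is assumed only so the output is reproducible (SimpleDateFormat in the original uses the JVM's default time zone). It also shows that two timestamps 1 ms apart, on either side of a second boundary, produce different key strings.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class SecondBucket {
    // Same pattern as the original helper. DateTimeFormatter is immutable and thread-safe.
    // UTC is an assumption for reproducible output; the original uses the JVM default zone.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    // Maps an epoch-millisecond timestamp to its second-level bucket string.
    public static String timeStampToDate(long timestamp) {
        return FORMAT.format(Instant.ofEpochMilli(timestamp));
    }

    public static void main(String[] args) {
        // Two timestamps only 1 ms apart, on either side of a second boundary.
        System.out.println(timeStampToDate(999L));   // 1970-01-01 00:00:00
        System.out.println(timeStampToDate(1_000L)); // 1970-01-01 00:00:01
    }
}
```

Since the KeySelector in the question passes new Date().getTime() rather than a field of the record, the key it returns for a given record depends on the moment the selector happens to run.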
Here is the code of the operator, the ProcessWindowFunction:
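import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

class MyProcessWindowFuncation extends ProcessWindowFunction<ShareRealTimeData, TreeMap<Double, Tuple2<String, String>>, String, TimeWindow> {

    // shareNo -> (shareName, highest price seen in this firing)
    private transient MapState<String, Tuple2<String, Double>> eveShareNoMaxPrice;
    // price -> (shareNo, shareName), sorted by price in descending order
    private transient ValueState<TreeMap<Double, Tuple2<String, String>>> shareAndMaxPrice;

    @Override
    public void process(String s, Context context, Iterable<ShareRealTimeData> elements,
                        Collector<TreeMap<Double, Tuple2<String, String>>> out) throws Exception {
        Iterator<ShareRealTimeData> iterator = elements.iterator();

        // Get the maximum price of each shareNo within each trigger period
        while (iterator.hasNext()) {
            ShareRealTimeData next = iterator.next();
            Tuple2<String, Double> t2 = eveShareNoMaxPrice.get(next.getShareNo());
            if (t2 == null || t2.f1 < next.getCurrentPrice()) {
                eveShareNoMaxPrice.put(next.getShareNo(), Tuple2.of(next.getShareName(), next.getCurrentPrice()));
            }
        }

        TreeMap<Double, Tuple2<String, String>> shareAndMaxPriceV = shareAndMaxPrice.value();
        if (shareAndMaxPriceV == null) {
            shareAndMaxPriceV = new TreeMap<>(new Comparator<Double>() {
                @Override
                public int compare(Double o1, Double o2) {
                    return Double.compare(o2, o1); // descending by price
                }
            });
        }
        Iterator<Map.Entry<String, Tuple2<String, Double>>> keysAndMaxPrice = eveShareNoMaxPrice.entries().iterator();
        while (keysAndMaxPrice.hasNext()) {
            Map.Entry<String, Tuple2<String, Double>> next = keysAndMaxPrice.next();
            // Keyed by price: two shares with the same maximum price overwrite each other.
            shareAndMaxPriceV.put(next.getValue().f1, Tuple2.of(next.getKey(), next.getValue().f0));
            if (shareAndMaxPriceV.size() > 20) {
                // Descending order, so the last entry is the lowest price.
                shareAndMaxPriceV.pollLastEntry();
            }
        }

        // shareAndMaxPrice is never update()d, so value() above always returns null
        // and every firing builds a fresh TreeMap.
        eveShareNoMaxPrice.clear();
        shareAndMaxPrice.clear();

        out.collect(shareAndMaxPriceV);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        eveShareNoMaxPrice = getRuntimeContext().getMapState(new MapStateDescriptor<String, Tuple2<String, Double>>(
                "eveShareNoMaxPrice",
                TypeInformation.of(new TypeHint<String>() { }),
                TypeInformation.of(new TypeHint<Tuple2<String, Double>>() { })));
        shareAndMaxPrice = getRuntimeContext().getState(new ValueStateDescriptor<TreeMap<Double, Tuple2<String, String>>>(
                "shareAndMaxPrice",
                TypeInformation.of(new TypeHint<TreeMap<Double, Tuple2<String, String>>>() { })));
    }
}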
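The TreeMap step in process() can be tried in isolation. In this JDK-only sketch (the class TopN, the limit parameter and the sample prices are hypothetical), a reversed comparator keeps prices in descending order, so pollLastEntry() removes the lowest price whenever the map grows past the limit. Because prices are the map keys, two shares with the same price end up as a single entry, which may or may not be what the ranking needs.

```java
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class TopN {
    // Keeps at most `limit` (price -> shareNo) entries with the highest prices, highest first.
    // Keyed by price as in MyProcessWindowFuncation, so equal prices overwrite each other.
    public static TreeMap<Double, String> topByPrice(Map<String, Double> maxPriceByShare, int limit) {
        TreeMap<Double, String> top = new TreeMap<>(Comparator.<Double>reverseOrder());
        for (Map.Entry<String, Double> e : maxPriceByShare.entrySet()) {
            top.put(e.getValue(), e.getKey());
            if (top.size() > limit) {
                top.pollLastEntry(); // descending order: the last entry is the lowest price
            }
        }
        return top;
    }

    public static void main(String[] args) {
        Map<String, Double> prices = new LinkedHashMap<>(); // insertion order keeps the demo deterministic
        prices.put("A", 10.0);
        prices.put("B", 30.0);
        prices.put("C", 20.0);
        prices.put("D", 30.0); // same price as B, so it replaces B's entry
        System.out.println(topByPrice(prices, 2)); // {30.0=D, 20.0=C}
    }
}
```

If ties matter, one option is to key by shareNo and sort the prices separately; the sketch only mirrors the behavior of the code above.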
Hmm... the problem is right here:
KeyedStream<ShareRealTimeData, String> keyByStream = signoutTimeAndWM.keyBy(new KeySelector<ShareRealTimeData, String>() {
    @Override
    public String getKey(ShareRealTimeData value) throws Exception {
        return DateUtilMinutes.timeStampToDate(new Date().getTime()); // You must not use the current time (new Date) here.
    }
});

To fix it: if you really need this behavior, first use a flatMap to call new Date for each element and store that date in a field of the ShareRealTimeData class (called key, for example). Then keyBy on that field with keyBy(e -> e.getKey()). The effect is the same, but you won't run into this problem.

The underlying mechanism is fairly involved and has to do with how Flink distributes keys: written this way, an element's key is unstable, because it is effectively a <random> key.

lp <[hidden email]> wrote on Tuesday, January 5, 2021 at 8:11 PM:
> Here is the code of the operator, the ProcessWindowFunction:
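One way to picture the key-distribution issue: as I understand Flink's keyed streams, the KeySelector is evaluated twice per record, once in the upstream partitioner (key to key group to downstream subtask) and again in the window operator, which sets the current key before writing window state. If the two calls straddle a second boundary, the operator can receive a key whose key group belongs to another subtask; with the heap state backend a subtask only holds state maps for its own key-group range, which is consistent with the NPE in StateTable.put. The sketch below is a simplified JDK-only simulation, not Flink code: a plain modulo replaces Flink's murmur hash, the time bucket is epoch seconds rather than the formatted string, parallelism 8 is taken from "(3/8)" in the log, 128 is Flink's default lower bound for max parallelism, and KeyRoutingSketch and ShareData are hypothetical names.

```java
import java.util.function.Function;

public class KeyRoutingSketch {
    // Assumed values: 8 subtasks as in "(3/8)"; 128 is Flink's default lower bound for max parallelism.
    static final int MAX_PARALLELISM = 128;
    static final int PARALLELISM = 8;

    // Simulated processing-time clock, standing in for new Date().getTime().
    public static long now;

    // Simplified time bucket: epoch seconds as a string (the original formats "yyyy-MM-dd HH:mm:ss").
    public static String secondBucket(long epochMillis) {
        return String.valueOf(epochMillis / 1000);
    }

    // Simplified key -> key group: Flink murmur-hashes hashCode() first; a plain modulo is used here.
    public static int keyGroupFor(String key) {
        return Math.floorMod(key.hashCode(), MAX_PARALLELISM);
    }

    // Key group -> owning subtask: each subtask owns a contiguous range of key groups.
    public static int subtaskFor(int keyGroup) {
        return keyGroup * PARALLELISM / MAX_PARALLELISM;
    }

    // Hypothetical stand-in for ShareRealTimeData with the extra `key` field the reply suggests.
    public static final class ShareData {
        public String key;
    }

    // Evaluates the selector "in the partitioner" at 10.999 s and again "in the window operator"
    // at 11.000 s; returns true if both evaluations map to the same subtask.
    public static boolean routedConsistently(Function<ShareData, String> selector, ShareData record) {
        now = 10_999L;
        int routedTo = subtaskFor(keyGroupFor(selector.apply(record)));
        now = 11_000L;
        int keyOwner = subtaskFor(keyGroupFor(selector.apply(record)));
        return routedTo == keyOwner;
    }

    public static void main(String[] args) {
        // Like the original KeySelector: ignores the record and reads the clock.
        Function<ShareData, String> wallClock = rec -> secondBucket(now);
        // Like keyBy(e -> e.getKey()): reads a field that was stamped once upstream.
        Function<ShareData, String> stamped = rec -> rec.key;

        ShareData record = new ShareData();
        record.key = secondBucket(10_999L); // stamped once, e.g. in the flatMap the reply suggests

        System.out.println("wall clock consistent: " + routedConsistently(wallClock, record)); // false
        System.out.println("stamped consistent:    " + routedConsistently(stamped, record));   // true
    }
}
```

With this stand-in hash, key "10" lands on subtask 1 and key "11" on subtask 2. Flink's real hash moves which boundaries cross subtasks, but any clock-derived key can change between the two evaluations, while a stamped field cannot.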