Flink null pointer warning

lp
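I have the following code. It consumes data from Kafka and then does a keyBy on the second (by the server clock) in which each record arrives. The code that gets the minute a record falls in is: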

public static String timeStampToDate(Long timestamp) {
    ThreadLocal<SimpleDateFormat> threadLocal = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
    String format = threadLocal.get().format(new Date(timestamp));
    return format.substring(0, 19);
}
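
The helper above builds a new ThreadLocal on every call, and its pattern keeps the seconds, so the string it returns changes every second rather than every minute. A minimal sketch of a minute-granularity alternative, assuming minute buckets are what is wanted; `MinuteKeys`, `toMinuteKey`, and the explicit `ZoneId` parameter are illustrative names that do not appear in the original post:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

final class MinuteKeys {
    // DateTimeFormatter is immutable and thread-safe, so one shared instance is enough.
    private static final DateTimeFormatter MINUTE_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");

    // Formats an epoch-millisecond timestamp as "yyyy-MM-dd HH:mm" in the given zone.
    static String toMinuteKey(long epochMillis, ZoneId zone) {
        return MINUTE_FORMAT.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
    }

    public static void main(String[] args) {
        System.out.println(toMinuteKey(System.currentTimeMillis(), ZoneId.systemDefault()));
    }
}
```

Passing the zone explicitly keeps the result independent of the default time zone of the machine the code runs on.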



After the keyBy on the minute a record falls in, I use a 1-minute tumbling window that triggers every 500 ms, as follows:

.
.
.
        // keyBy the minute (processing time) in which the record arrives
        KeyedStream<ShareRealTimeData, String> keyByStream = signoutTimeAndWM.keyBy(new KeySelector<ShareRealTimeData, String>() {
            @Override
            public String getKey(ShareRealTimeData value) throws Exception {
                return DateUtilMinutes.timeStampToDate(new Date().getTime());
            }
        });


        SingleOutputStreamOperator<TreeMap<Double, Tuple2<String, String>>> topNforEveWindow = keyByStream
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(1000)))
                .trigger(ContinuousProcessingTimeTrigger.of(Time.milliseconds(500)))
//                .evictor(TimeEvictor.of(Time.seconds(0), true))
                .process(new MyProcessWindowFuncation());


        // sink
        topNforEveWindow.printToErr("topNforEveWindow====");

.
.
.
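
To make the window and trigger settings above concrete, here is a small sketch of the alignment arithmetic. It assumes that windows and trigger firings are aligned to multiples of their size counted from the epoch, with no offset. `WindowMath`, `windowStart`, and `nextFireTime` are hypothetical helpers written for this illustration, not Flink APIs:

```java
final class WindowMath {
    // Start of the tumbling window of the given size that contains the timestamp
    // (assumes a non-negative timestamp and no window offset).
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // First aligned firing time strictly after the timestamp,
    // for a trigger that fires every intervalMillis.
    static long nextFireTime(long timestampMillis, long intervalMillis) {
        return timestampMillis - (timestampMillis % intervalMillis) + intervalMillis;
    }

    public static void main(String[] args) {
        long now = 1_609_848_562_001L;
        System.out.println(windowStart(now, 1000L));
        System.out.println(nextFireTime(now, 500L));
    }
}
```

Note that the posted code passes `Time.milliseconds(1000)`, which is a one-second window. A one-minute window, as described in the text, would be `Time.minutes(1)`.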

While the program is running, it throws the following null pointer warning at random, on some whole-minute boundaries:
19:49:22,001 WARN  org.apache.flink.runtime.taskmanager.Task [] - Window(TumblingProcessingTimeWindows(1000), ContinuousProcessingTimeTrigger, TimeEvictor, ProcessWindowFunction$4) -> Sink: Print to Std. Err (3/8) (222821e43f98390a2f5e3baeb5b542a8) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
        at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:99) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.processElement(EvictingWindowOperator.java:203) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-runtime_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-runtime_2.11-1.11.2.jar:1.11.2]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]


Could you please help me find out what is causing this?



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink null pointer warning

lp
The operator, a ProcessWindowFunction, is implemented as follows:

class MyProcessWindowFuncation extends ProcessWindowFunction<ShareRealTimeData, TreeMap<Double, Tuple2<String, String>>, String, TimeWindow> {
    private transient MapState<String, Tuple2<String, Double>> eveShareNoMaxPrice;
    private transient ValueState<TreeMap<Double, Tuple2<String, String>>> shareAndMaxPrice;

    @Override
    public void process(String s, Context context, Iterable<ShareRealTimeData> elements,
                        Collector<TreeMap<Double, Tuple2<String, String>>> out) throws Exception {
        Iterator<ShareRealTimeData> iterator = elements.iterator();

        // find the maximum price of each shareNo within each trigger period
        while (iterator.hasNext()) {
            ShareRealTimeData next = iterator.next();
            Tuple2<String, Double> t2 = eveShareNoMaxPrice.get(next.getShareNo());
            if (t2 == null || t2.f1 < next.getCurrentPrice()) {
                eveShareNoMaxPrice.put(next.getShareNo(), Tuple2.of(next.getShareName(), next.getCurrentPrice()));
            }
        }

        TreeMap<Double, Tuple2<String, String>> shareAndMaxPriceV = shareAndMaxPrice.value();
        if (shareAndMaxPriceV == null) {
            // order prices from highest to lowest
            shareAndMaxPriceV = new TreeMap<>(new Comparator<Double>() {
                @Override
                public int compare(Double o1, Double o2) {
                    return Double.compare(o2, o1);
                }
            });
        }
        Iterator<Map.Entry<String, Tuple2<String, Double>>> keysAndMaxPrice = eveShareNoMaxPrice.entries().iterator();
        while (keysAndMaxPrice.hasNext()) {
            Map.Entry<String, Tuple2<String, Double>> next = keysAndMaxPrice.next();

            shareAndMaxPriceV.put(next.getValue().f1, Tuple2.of(next.getKey(), next.getValue().f0));
            // keep only the 20 highest prices
            if (shareAndMaxPriceV.size() > 20) {
                shareAndMaxPriceV.pollLastEntry();
            }
        }

        eveShareNoMaxPrice.clear();
        shareAndMaxPrice.clear();

        out.collect(shareAndMaxPriceV);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        eveShareNoMaxPrice = getRuntimeContext().getMapState(new MapStateDescriptor<String, Tuple2<String, Double>>("eveShareNoMaxPrice",
                TypeInformation.of(new TypeHint<String>() {
                }), TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {
                })));
        shareAndMaxPrice = getRuntimeContext().getState(new ValueStateDescriptor<TreeMap<Double, Tuple2<String, String>>>("shareAndMaxPrice",
                TypeInformation.of(new TypeHint<TreeMap<Double, Tuple2<String, String>>>() {
                })));
    }
}
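
The ranking step in the function above relies on a TreeMap ordered from highest to lowest price and trimmed with pollLastEntry(). A stripped-down sketch of that pattern in plain Java, without any Flink state, might look like this; `TopN`, `offer`, and `newDescendingMap` are illustrative names, and the value type is simplified to a String:

```java
import java.util.Comparator;
import java.util.TreeMap;

final class TopN {
    // A map whose iteration order runs from the highest key to the lowest.
    static TreeMap<Double, String> newDescendingMap() {
        return new TreeMap<>(Comparator.reverseOrder());
    }

    // Inserts (price, name) and keeps at most n entries, dropping the lowest price.
    static void offer(TreeMap<Double, String> top, double price, String name, int n) {
        top.put(price, name);
        if (top.size() > n) {
            top.pollLastEntry(); // in descending order the last entry is the lowest price
        }
    }

    public static void main(String[] args) {
        TreeMap<Double, String> top = newDescendingMap();
        offer(top, 1.0, "a", 2);
        offer(top, 3.0, "b", 2);
        offer(top, 2.0, "c", 2);
        System.out.println(top);
    }
}
```

One consequence of using the price as the map key is that two entries with the same price overwrite each other, so the result can hold fewer distinct shares than expected.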




Re: Flink null pointer warning

nobleyd
The problem is in this part:

        KeyedStream<ShareRealTimeData, String> keyByStream = signoutTimeAndWM.keyBy(new KeySelector<ShareRealTimeData, String>() {
            @Override
            public String getKey(ShareRealTimeData value) throws Exception {
                return DateUtilMinutes.timeStampToDate(new Date().getTime());  // The current time (new Date) must not be used here.
            }
        });

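Fix: if you really need this behavior, first use a flatMap that calls new Date for each element
and stores that date in a field of the ShareRealTimeData class (call it key, for example).
Then do keyBy(e->e.getKey()) on that key field. The effect is the same, but you will not run into this problem.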
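
The idea in the suggestion above can be sketched in plain Java without Flink: read the clock once per element, store the result on the element, and let the key selector only read that stored field. `StampedKeyDemo`, `Event`, `stampKey`, and `KEY_SELECTOR` are hypothetical names for this illustration, with `Event` standing in for ShareRealTimeData:

```java
import java.util.function.Function;
import java.util.function.LongSupplier;

final class StampedKeyDemo {
    // Hypothetical stand-in for ShareRealTimeData with an extra "key" field.
    static final class Event {
        final String shareNo;
        String key;

        Event(String shareNo) {
            this.shareNo = shareNo;
        }
    }

    // Step 1 (the map-style step): read the clock once and store the result on the element.
    static Event stampKey(Event e, LongSupplier clockMillis) {
        e.key = String.valueOf(clockMillis.getAsLong() / 1000); // one key per second
        return e;
    }

    // Step 2: the key selector only reads the stored field, so repeated calls agree.
    static final Function<Event, String> KEY_SELECTOR = e -> e.key;

    public static void main(String[] args) {
        Event e = stampKey(new Event("600000"), System::currentTimeMillis);
        System.out.println(KEY_SELECTOR.apply(e).equals(KEY_SELECTOR.apply(e)));
    }
}
```

However often the selector is called afterwards, and however far the clock has moved on, it returns the value stored in step 1.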

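The underlying mechanism is fairly involved and has to do with how Flink distributes records by key. Written your way, an element's key is unstable, because it is effectively a <random> key.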
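
One way to picture the instability is the simplified model below, which is not Flink's actual implementation. It assumes the key selector is evaluated once when a record is routed to a subtask and again when the receiving operator looks up state for the record, and that a key maps to a subtask through a hash. If the clock ticks between the two evaluations, the two results can point at different subtasks. `RoutingModel`, `subtaskFor`, `routesConsistently`, and the plain `hashCode`-based mapping are all assumptions made for this illustration:

```java
import java.util.function.LongFunction;

final class RoutingModel {
    // Simplified stand-in for the key -> subtask assignment (the real system uses its own hashing).
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // True if evaluating the selector at two clock readings leads to the same subtask.
    static boolean routesConsistently(LongFunction<String> selector,
                                      long sendTimeMillis, long receiveTimeMillis, int parallelism) {
        String keyAtSender = selector.apply(sendTimeMillis);
        String keyAtReceiver = selector.apply(receiveTimeMillis);
        return subtaskFor(keyAtSender, parallelism) == subtaskFor(keyAtReceiver, parallelism);
    }

    public static void main(String[] args) {
        LongFunction<String> clockBased = t -> String.valueOf(t / 1000); // key depends on "now"
        LongFunction<String> stable = t -> "fixed";                      // key ignores the clock
        System.out.println(routesConsistently(clockBased, 999L, 1000L, 8));
        System.out.println(routesConsistently(stable, 999L, 1000L, 8));
    }
}
```

In this model a clock-based selector is only consistent while both evaluations fall into the same second, which fits the observation that the failure shows up intermittently at time boundaries.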

On Tue, Jan 5, 2021 at 8:11 PM, lp <[hidden email]> wrote:


Re: Flink null pointer warning

lp
OK, thanks.


