Hi,
I've run into a problem. My map function returns null for some records (its return type is Tuple2), and the next step is a filter that is supposed to drop those nulls. Instead, the job fails with this exception:

java.lang.NullPointerException
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

While debugging, I found that the exception is thrown in the pushToOperator method of the OperatorChain class when the incoming record carries a null value: serializer.copy(castRecord.getValue()) is called on null. Is map really not allowed to return null?

    @Override
    protected <X> void pushToOperator(StreamRecord<X> record) {
        try {
            // we know that the given outputTag matches our OutputTag so the record
            // must be of the type that our operator (and Serializer) expects.
            @SuppressWarnings("unchecked")
            StreamRecord<T> castRecord = (StreamRecord<T>) record;

            numRecordsIn.inc();
            StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
            operator.setKeyContextElement1(copy);
            operator.processElement(copy);
        } catch (ClassCastException e) {
            if (outputTag != null) {
                // Enrich error message
                ClassCastException replace = new ClassCastException(
                    String.format(
                        "%s. Failed to push OutputTag with id '%s' to operator. " +
                            "This can occur when multiple OutputTags with different types " +
                            "but identical names are being used.",
                        e.getMessage(),
                        outputTag.getId()));

                throw new ExceptionInChainedOperatorException(replace);
            } else {
                throw new ExceptionInChainedOperatorException(e);
            }
        } catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    }
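For readers tracing the same failure, a simplified, hypothetical model of what the stack trace shows may help. With object reuse disabled (the default), chained operators are connected through CopyingChainingOutput, which copies each value with the type's serializer before handing it to the next operator. A tuple serializer's copy reads the fields of the value, so a null returned by map fails inside the copy, before the downstream filter ever runs. The names `CopyBeforeForwardModel`, `Pair`, `copy`, and `pushToFilter` are illustrative only; this is not Flink source code.

```java
import java.util.function.Predicate;

// Simplified, hypothetical model of a copying chained output; NOT Flink source code.
public class CopyBeforeForwardModel {

    // Stand-in for a two-field tuple (mirrors Tuple2 only in shape).
    static final class Pair {
        final Object f0;
        final Object f1;

        Pair(Object f0, Object f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    // Like a tuple serializer's copy: it reads the fields of the value,
    // so a null value fails with NullPointerException.
    static Pair copy(Pair from) {
        return new Pair(from.f0, from.f1);
    }

    // Like CopyingChainingOutput.pushToOperator: copy first, then hand the
    // copy to the next operator (modeled here as a filter predicate).
    static boolean pushToFilter(Pair mapOutput, Predicate<Pair> filter) {
        Pair copied = copy(mapOutput); // throws here if mapOutput is null
        return filter.test(copied);    // the filter only runs on a successful copy
    }

    public static void main(String[] args) {
        Predicate<Pair> notNull = p -> p != null;
        System.out.println(pushToFilter(new Pair("a", 1), notNull)); // prints "true"
        try {
            pushToFilter(null, notNull);
        } catch (NullPointerException e) {
            System.out.println("NPE thrown before the filter was reached");
        }
    }
}
```

The point of the model is the ordering: the null check in the filter cannot help, because the copy that dereferences the value happens first.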
Hi Allan,
map can only return non-null values; consider using flatMap instead.

Qi

On Sun, Sep 29, 2019 at 4:31 PM allan <[hidden email]> wrote:
> Is map really not allowed to return null?
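A minimal sketch of the flatMap approach, assuming the Flink 1.9-era DataStream API with `flink-streaming-java` on the classpath: instead of returning null for records that should be dropped, a `FlatMapFunction` simply emits nothing for them through its `Collector`, so no null value ever reaches a serializer and the separate null filter becomes unnecessary. The input format ("key,count" strings), the class names `SkipInvalidRecords` and `ParseOrSkip`, and the sample data are hypothetical.

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SkipInvalidRecords {

    // Hypothetical parser: turns "key,count" into a Tuple2 and emits nothing
    // for malformed lines, instead of returning null the way a map would have to.
    public static class ParseOrSkip implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            String[] parts = line.split(",");
            if (parts.length != 2) {
                return; // emit zero records: this replaces the null return
            }
            try {
                out.collect(Tuple2.of(parts[0].trim(), Integer.parseInt(parts[1].trim())));
            } catch (NumberFormatException e) {
                // malformed count: also emit nothing
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> lines = env.fromElements("a,1", "garbage", "b,2", "c,x");

        // No filter(x -> x != null) step is needed afterwards.
        lines.flatMap(new ParseOrSkip()).print();

        env.execute("skip-invalid-records");
    }
}
```

Emitting zero records is the flatMap counterpart of the null return; a function that always produces exactly one output per input can stay a map.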
OK, got it. I just wanted to confirm: I hadn't noticed this before and had traced through the code, so I thought I'd ask. Thanks a lot!

On 2019-09-29 16:44:53, "Qi Luo" <[hidden email]> wrote:
>map can only return non-null values; consider using flatMap instead.