获取流数据计算报错,socket输入能计算成功

classic Classic list List threaded Threaded
2 messages Options
zdj
Reply | Threaded
Open this post in threaded view
|

获取流数据计算报错,socket输入能计算成功

zdj
背景:阿里云rocketMq实例,开源flink1.11.2

自定义rocketmq source,可以正常消费到数据,但是在流式计算的时候会报错


org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at cn.shltkj.utils.RocketMqSourceFunction.lambda$run$0(RocketMqSourceFunction.java:49) ~[classes/:?]
        at com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl$MessageListenerImpl.consumeMessage(ConsumerImpl.java:110) [ons-client-1.8.8.Final.jar:1.8.8.Final]
        at com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:710) [ons-client-1.8.8.Final.jar:1.8.8.Final]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_271]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_271]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_271]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_271]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_271]
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        ... 14 more
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:97) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        ... 14 more
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:339) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:309) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilder(LocalBufferPool.java:256) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.partition.ResultPartition.tryGetBufferBuilder(ResultPartition.java:218) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:291) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.getBufferBuilder(ChannelSelectorRecordWriter.java:95) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:135) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:97) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        ... 14 more


查了一下说是数据有空字段或者null,检查了一下没发现空的问题


计算逻辑不变,source改成由socket输入,是可以正常解析并且能按照watermark输出结果


请问可能是什么原因引起的
Reply | Threaded
Open this post in threaded view
|

Re: 获取流数据计算报错,socket输入能计算成功

Lin Li
你好, 异常栈不是作业 fail 的根本原因,可以在 flink ui 的 exception history 或 jobmanager log
中尝试查找第一次 fail 的原因

zdj <[hidden email]> 于2021年6月8日周二 下午7:08写道:

> 背景:阿里云rocketMq实例,开源flink1.11.2
>
> 自定义rocketmq source,可以正常消费到数据,但是在流式计算的时候会报错
>
>
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> cn.shltkj.utils.RocketMqSourceFunction.lambda$run$0(RocketMqSourceFunction.java:49)
> ~[classes/:?]
>         at
> com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl$MessageListenerImpl.consumeMessage(ConsumerImpl.java:110)
> [ons-client-1.8.8.Final.jar:1.8.8.Final]
>         at
> com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:710)
> [ons-client-1.8.8.Final.jar:1.8.8.Final]
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [?:1.8.0_271]
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [?:1.8.0_271]
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_271]
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_271]
>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_271]
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         ... 14 more
> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>         at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:97)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         ... 14 more
> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>         at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:339)
> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>         at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:309)
> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>         at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilder(LocalBufferPool.java:256)
> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>         at org.apache.flink.runtime.io.network.partition.ResultPartition.tryGetBufferBuilder(ResultPartition.java:218)
> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:291)
> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>         at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>         at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.getBufferBuilder(ChannelSelectorRecordWriter.java:95)
> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:135)
> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>         at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
> ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:97)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
>         ... 14 more
>
>
> 查了一下说是数据有空字段或者null,检查了一下没发现空的问题
>
>
> 计算逻辑不变,source改成由socket输入,是可以正常解析并且能按照watermark输出结果
>
>
> 请问可能是什么原因引起的