Hi all:
I'm using Flink 1.9 to process nested JSON that contains an array of strings. The table schema I construct is:

Row(parsedResponse: BasicArrayTypeInfo<String>, timestamp: Long)

When the job runs, it fails with the following error, a cast failure between the Object and String array types:

Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Ljava.lang.String;
    at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.copy(StringArraySerializer.java:35)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)

Does anyone know how to fix this? My JSON is structured as follows:

{"parsedResponse":["apple", "banana", "orange"], "timestamp": "1522253345"}

P.S. If I change the string array to a long array or a double array, the same operations run correctly. So far only the string array shows the problem.
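The ClassCastException in the trace can be reproduced in plain Java, without Flink. An array keeps the runtime type it was created with, so an Object[] that happens to hold only strings still cannot be cast to String[]. The following is a minimal sketch of that behaviour; the class and method names are hypothetical and are not part of Flink or of the original job.

```java
public class ArrayCastDemo {
    // Returns true when casting the given array to String[] throws
    // ClassCastException, which depends on the array's runtime type,
    // not on the type of the elements it contains.
    public static boolean castFails(Object[] values) {
        try {
            String[] ignored = (String[]) values;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Created as Object[]: the cast fails even though every element is a String.
        Object[] asObjects = new Object[] {"apple", "banana", "orange"};
        // Created as String[]: the cast succeeds.
        Object[] asStrings = new String[] {"apple", "banana", "orange"};

        System.out.println("Object[] cast fails: " + castFails(asObjects));
        System.out.println("String[] cast fails: " + castFails(asStrings));
    }
}
```

This matches the message `[Ljava.lang.Object; cannot be cast to [Ljava.lang.String;`: the value reaching StringArraySerializer.copy is an Object[] at runtime.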
Hi,
Could you paste the SQL as well? This looks like a bug.

Best,
Leonard Xu
Hi, Leonard Xu:
The SQL I'm using is as follows:

SELECT TUMBLE_START(rowtime, INTERVAL '30' SECOND) AS ts, fruit,
       COUNT(`fruit`) AS `cnt`
FROM mysource, UNNEST(mysource.parsedResponse) AS A(fruit)
GROUP BY TUMBLE(rowtime, INTERVAL '30' SECOND), fruit

Judging from the debug log, the job seems to fail right at the start. Here are the relevant log entries:

INFO - Initializing heap keyed state backend with stream factory.
INFO - Source: Custom Source -> Timestamps/Watermarks -> from: (parsedResponse, rowtime) -> correlate: table(explode($cor0.parsedResponse)), select: parsedResponse, rowtime, f0 -> select: (rowtime, fruit) -> time attribute: (rowtime) (1/1) (d8c5f92b850811595dbdc130c04f9e58) switched from RUNNING to FAILED.
java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.CommonConsumer.run(CommonConsumer.java:49)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Ljava.lang.String;
    at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.copy(StringArraySerializer.java:35)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
    ... 10 more

In addition, if I change the type of the string array from BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO to ObjectArrayTypeInfo.getInfoFor(Types.STRING), that is, if the schema changes from

root
 |-- parsedResponse: LEGACY(BasicArrayTypeInfo<String>)
 |-- rowtime: TIMESTAMP(3) *ROWTIME*

to

root
 |-- parsedResponse: ARRAY<STRING>
 |-- rowtime: TIMESTAMP(3) *ROWTIME*

the same error still occurs, although the log looks somewhat different:

INFO - Source: Custom Source -> Timestamps/Watermarks -> from: (parsedResponse, rowtime) -> correlate: table(explode($cor0.parsedResponse)), select: parsedResponse, rowtime, f0 -> select: (rowtime, fruit) -> time attribute: (rowtime) (1/1) (36b79032354b9e9ab70a30d98b1de903) switched from RUNNING to FAILED.
java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.CommonConsumer.run(CommonConsumer.java:49)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:67)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
    ... 10 more
Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Ljava.lang.String;
    at DataStreamSourceConversion$5.processElement(Unknown Source)
    at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
    ... 16 more

I also tried converting the schema with JsonRowSchemaConverter, and the resulting schema matches the one in my previous email, namely:

Row(parsedResponse: BasicArrayTypeInfo<String>, timestamp: Timestamp)

So where did I go wrong?

Thank you for your reply!

Best regards!

On Tue, Jul 7, 2020 at 5:48 PM, Leonard Xu <[hidden email]> wrote:
> Hi,
>
> Could you paste the SQL as well? This looks like a bug.
>
> Best,
> Leonard Xu
Hi,
I looked at the code, and this is indeed a bug in Flink 1.9 [1]. The cause is that the source does not correctly handle the legacy type versus the new type. The issue was not fixed on the 1.9 branch, so you could try upgrading to 1.10.1.

Best,
Leonard Xu

[1] https://issues.apache.org/jira/browse/FLINK-16622 <https://issues.apache.org/jira/browse/FLINK-16622?focusedCommentId=17061790&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17061790>
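If upgrading is not immediately possible, one thing that might be worth trying on 1.9 is making sure the custom source puts a real String[] into the row rather than an Object[]. This is only a sketch under assumptions: it assumes the source builds the parsedResponse field itself from a parsed list of JSON values, the helper `toStringArray` is hypothetical, and it has not been verified against Flink 1.9, so it may not sidestep the bug.

```java
import java.util.Arrays;
import java.util.List;

public class StringArrayFields {
    // Hypothetical helper: copies parsed JSON values into an array whose
    // runtime type is String[], keeping null elements as null.
    public static String[] toStringArray(List<?> values) {
        String[] out = new String[values.size()];
        int i = 0;
        for (Object v : values) {
            out[i++] = (v == null) ? null : v.toString();
        }
        return out;
    }

    public static void main(String[] args) {
        // Stand-in for the values parsed from the "parsedResponse" JSON array.
        List<Object> parsed = Arrays.asList("apple", "banana", "orange");
        String[] fruits = toStringArray(parsed);
        System.out.println(fruits.getClass().getSimpleName() + " " + Arrays.toString(fruits));
    }
}
```

In the source, the result of such a helper would be set as the row field before the record is emitted, so that the value handed to the serializer already has the array type it expects.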
Hi,
Thank you for your guidance!

Best regards!

On Tue, Jul 7, 2020 at 9:49 PM, Leonard Xu <[hidden email]> wrote:
> Hi,
>
> I looked at the code, and this is indeed a bug in Flink 1.9 [1]. The cause is that the source does not correctly handle the legacy type versus the new type. The issue was not fixed on the 1.9 branch, so you could try upgrading to 1.10.1.
>
> Best,
> Leonard Xu
> [1] https://issues.apache.org/jira/browse/FLINK-16622 <https://issues.apache.org/jira/browse/FLINK-16622?focusedCommentId=17061790&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17061790>