Parsing exception for a string array in nested JSON


Parsing exception for a string array in nested JSON

Jun Zou
Hi all,
I am using Flink 1.9 to process nested JSON that contains a string array. The table schema I construct is:
Row(parsedResponse: BasicArrayTypeInfo<String>, timestamp: Long)
When the job runs, it fails with the following error, a cast from Object[] to String[]:
Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Ljava.lang.String;
    at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.copy(StringArraySerializer.java:35)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)

Does anyone know how to fix this?

The structure of my JSON is as follows:
{"parsedResponse":["apple", "banana", "orange"], "timestamp": "1522253345"}
P.S.:
If I change the string array to a long array or a double array and run the corresponding operations, the job works correctly; so far only the string array fails.
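
For reference, a minimal sketch of how a Row type like the one above can be declared with the 1.9 TypeInformation API; the field names mirror the JSON sample, while the wrapper class and method names are purely illustrative:

import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

public class RowTypeSketch {
    // Builds the Row(parsedResponse: BasicArrayTypeInfo<String>, timestamp: Long)
    // type information described above.
    public static TypeInformation<Row> rowType() {
        return Types.ROW_NAMED(
                new String[] {"parsedResponse", "timestamp"},
                BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, // string array field
                Types.LONG);                               // epoch timestamp field
    }
}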

Re: Parsing exception for a string array in nested JSON

Leonard Xu
Hi,

Could you also paste the SQL? This looks like a bug.

Best,
Leonard Xu


Re: Parsing exception for a string array in nested JSON

Jun Zou
Hi Leonard Xu,

The SQL I use is as follows:

> SELECT TUMBLE_START(rowtime, INTERVAL '30' SECOND) AS ts, fruit,
> COUNT(`fruit`) AS `cnt`
> FROM mysource, UNNEST(mysource.parsedResponse) AS A(fruit)
> GROUP BY TUMBLE(rowtime, INTERVAL '30' SECOND), fruit
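
For completeness, a minimal sketch of how such a query can be submitted through the 1.9 Java Table API; only the SQL text is taken from above, while the surrounding class, the event-time setting, and the retract-stream sink are illustrative assumptions:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class QuerySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Assumes a table "mysource" with columns (parsedResponse ARRAY<STRING>,
        // rowtime TIMESTAMP(3) as the event-time attribute) was registered beforehand,
        // e.g. from the custom source that appears in the logs below.
        Table result = tEnv.sqlQuery(
                "SELECT TUMBLE_START(rowtime, INTERVAL '30' SECOND) AS ts, fruit, COUNT(fruit) AS cnt "
              + "FROM mysource, UNNEST(mysource.parsedResponse) AS A(fruit) "
              + "GROUP BY TUMBLE(rowtime, INTERVAL '30' SECOND), fruit");

        // The windowed aggregate can be emitted as a retract stream for inspection.
        tEnv.toRetractStream(result, Row.class).print();
        env.execute("unnest-string-array-sketch");
    }
}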


Judging from the debug logs, the job seems to fail right at the start. Here are the relevant logs:

INFO - Initializing heap keyed state backend with stream factory.

> INFO - Source: Custom Source -> Timestamps/Watermarks -> from: (parsedResponse, rowtime) -> correlate: table(explode($cor0.parsedResponse)), select: parsedResponse, rowtime, f0 -> select: (rowtime, fruit) -> time attribute: (rowtime) (1/1) (d8c5f92b850811595dbdc130c04f9e58) switched from RUNNING to FAILED.
> java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>     at org.apache.flink.streaming.connectors.CommonConsumer.run(CommonConsumer.java:49)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
> Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Ljava.lang.String;
>     at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.copy(StringArraySerializer.java:35)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
>     ... 10 more
>

In addition, if I change the type of the string array from BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO to ObjectArrayTypeInfo.getInfoFor(Types.STRING) (both declarations are sketched in the code below), i.e. the schema changes from

> root
>  |-- parsedResponse: LEGACY(BasicArrayTypeInfo<String>)
>  |-- rowtime: TIMESTAMP(3) *ROWTIME*
>
to

> root
>  |-- parsedResponse: ARRAY<STRING>
>  |-- rowtime: TIMESTAMP(3) *ROWTIME*
>
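
For clarity, here are the two array declarations being compared, as they would look in Java; only the two type-info constants come from the description above, the wrapper class is illustrative:

import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;

public class ArrayTypeSketch {
    // Original declaration: appears in the table schema as LEGACY(BasicArrayTypeInfo<String>).
    static final TypeInformation<String[]> LEGACY_STRING_ARRAY =
            BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO;

    // Alternative declaration: appears in the table schema as ARRAY<STRING>.
    static final TypeInformation<String[]> OBJECT_STRING_ARRAY =
            ObjectArrayTypeInfo.getInfoFor(Types.STRING);
}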

Even with this change, the same error still occurs, although the log shows a slightly different execution path:

> INFO - Source: Custom Source -> Timestamps/Watermarks -> from: (parsedResponse, rowtime) -> correlate: table(explode($cor0.parsedResponse)), select: parsedResponse, rowtime, f0 -> select: (rowtime, fruit) -> time attribute: (rowtime) (1/1) (36b79032354b9e9ab70a30d98b1de903) switched from RUNNING to FAILED.
> java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>     at org.apache.flink.streaming.connectors.CommonConsumer.run(CommonConsumer.java:49)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>     at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:67)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>     ... 10 more
> Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Ljava.lang.String;
>     at DataStreamSourceConversion$5.processElement(Unknown Source)
>     at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>     ... 16 more
>

I also tried using JsonRowSchemaConverter to convert the schema, and the resulting schema is the same as the one in my previous mail, i.e.:

> Row(parsedResponse: BasicArrayTypeInfo<String>, timestamp: Timestamp)
>
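
For context, a minimal sketch of that kind of conversion; the JSON schema string below is a hypothetical reconstruction of the message structure, not the one actually used:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowSchemaConverter;
import org.apache.flink.types.Row;

public class SchemaConversionSketch {
    public static void main(String[] args) {
        // Hypothetical JSON schema describing the sample message from the first mail.
        String jsonSchema =
                "{"
              + "  \"type\": \"object\","
              + "  \"properties\": {"
              + "    \"parsedResponse\": { \"type\": \"array\", \"items\": { \"type\": \"string\" } },"
              + "    \"timestamp\": { \"type\": \"string\", \"format\": \"date-time\" }"
              + "  }"
              + "}";

        TypeInformation<Row> rowType = JsonRowSchemaConverter.convert(jsonSchema);
        // Prints something like Row(parsedResponse: ..., timestamp: ...); the exact array
        // type chosen for parsedResponse is what the discussion above is about.
        System.out.println(rowType);
    }
}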
So where in my setup did things go wrong?

Thank you for your reply!

Best regards!


Leonard Xu <[hidden email]> wrote on Tue, Jul 7, 2020, at 5:48 PM:

> Hi,
>
> Could you also paste the SQL? This looks like a bug.
>
> Best,
> Leonard Xu
>
>

Re: Parsing exception for a string array in nested JSON

Leonard Xu
Hi,

I took a look at the code, and this is indeed a bug in Flink 1.9 [1]: the source does not correctly handle the legacy type versus the new type. The fix for this issue was not applied to the 1.9 branch, so you could try upgrading to 1.10.1.

Best,
Leonard Xu
[1]https://issues.apache.org/jira/browse/FLINK-16622 <https://issues.apache.org/jira/browse/FLINK-16622?focusedCommentId=17061790&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17061790>


Re: Parsing exception for a string array in nested JSON

Jun Zou
Hi,
Thanks for your guidance!

Best regards!

Leonard Xu <[hidden email]> wrote on Tue, Jul 7, 2020, at 9:49 PM:

> Hi,
>
> I took a look at the code, and this is indeed a bug in Flink 1.9 [1]: the source does not correctly handle the legacy type versus the new type. The fix for this issue was not applied to the 1.9 branch, so you could try upgrading to 1.10.1.
>
> Best,
> Leonard Xu
> [1] https://issues.apache.org/jira/browse/FLINK-16622 <https://issues.apache.org/jira/browse/FLINK-16622?focusedCommentId=17061790&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17061790>