回复:在json schema中如何定义数组字段

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

回复:在json schema中如何定义数组字段

Jun Zhang-2
你可以这样用:


Types.LIST(Types.STRING)






------------------ 原始邮件 ------------------
发件人: 苏 欣 <[hidden email]&gt;
发送时间: 2019年9月19日 16:39
收件人: [hidden email] <[hidden email]&gt;
主题: 回复:在json schema中如何定义数组字段



我是用的是flink-1.9版本,现在想消费带数组字段的json数据,类似{“field1:” “aaa”, “field2”: [“bbb, “ccc”]} 这种数据格式。


注册schema的时候我使用了field("field2", Types.OBJECT_ARRAY(Types.STRING))这种方式。EnvironmentSetting指定为flink-planner和blink-planner都有问题。


在flink-planner下,会报下面的错误,我跟了下代码,发现是StringArraySerializer.copy方法接收的是String[],传过去的却是Object[]。
Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Ljava.lang.String;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.copy(StringArraySerializer.java:35)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ... 13 more

在blink-planner下,程序一启动就会报下面的错误:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type LEGACY(BasicArrayTypeInfo<String&gt;) of table field 'field2' does not match with type BasicArrayTypeInfo<String&gt; of the field ' field2' of the TableSource return type.
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)

请问这个问题有没有人知道怎么解决,定义数组字段有没有其他的方法呢?

发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986&gt;应用
Reply | Threaded
Open this post in threaded view
|

回复: 回复:在json schema中如何定义数组字段

苏 欣
感谢回复。Types.LIST(Types.STRING)我试了一下,在flink-planner下可以,在blink-planner下还是会报跟Types.OBJECT_ARRAY(Types.STRING)一样的错:
Type LEGACY(List<String>) of table field 'ARRAY_FIELD' does not match with type List<String> of the field 'ARRAY_FIELD' of the TableSource return type.
请问在blink-planner下面有没有方法可以处理这种字段结构?

另外还有个问题,怎么在flink sql中表达这种字段,比如说我想取数组字段中指定索引的值,使用select array_field[1] from test 这种方式会报错 Type is not supported: ANY。

________________________________
[hidden email]

发件人: Jun Zhang<mailto:[hidden email]>
发送时间: 2019-09-19 18:55
收件人: user-zh<mailto:[hidden email]>; [hidden email]<mailto:[hidden email]>
主题: 回复:在json schema中如何定义数组字段
你可以这样用:


Types.LIST(Types.STRING)






------------------ 原始邮件 ------------------
发件人: 苏 欣 <[hidden email]&gt;
发送时间: 2019年9月19日 16:39
收件人: [hidden email] <[hidden email]&gt;
主题: 回复:在json schema中如何定义数组字段



我是用的是flink-1.9版本,现在想消费带数组字段的json数据,类似{“field1:” “aaa”, “field2”: [“bbb, “ccc”]} 这种数据格式。


注册schema的时候我使用了field("field2", Types.OBJECT_ARRAY(Types.STRING))这种方式。EnvironmentSetting指定为flink-planner和blink-planner都有问题。


在flink-planner下,会报下面的错误,我跟了下代码,发现是StringArraySerializer.copy方法接收的是String[],传过去的却是Object[]。
Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Ljava.lang.String;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.copy(StringArraySerializer.java:35)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ... 13 more

在blink-planner下,程序一启动就会报下面的错误:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type LEGACY(BasicArrayTypeInfo<String&gt;) of table field 'field2' does not match with type BasicArrayTypeInfo<String&gt; of the field ' field2' of the TableSource return type.
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)

请问这个问题有没有人知道怎么解决,定义数组字段有没有其他的方法呢?

发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986&gt;应用