Question from jindy_liu, Thu, 5 Nov 2020 at 10:07:

I have two DataStream<String> streams that a map function converts into a single DataStream<RowData>. How do I turn that DataStream<RowData> into a Table so I can operate on it with Flink SQL?

(Note: because the map function imposes some ordering constraints on the streams, I currently cannot define the table directly with Flink SQL CDC!)

What I am doing now fails:

StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

DataStreamSource<String> json1; // canal-json format
DataStreamSource<String> json2; // canal-json format
ConnectedStreams<String, String> connect =
        caliber_cdc_json.connect(caliber_snapshot_json); // connect the two streams
DataStream<RowData> snapshot_cdc_stream = connect.flatMap(
        new SnapshotCdcCoRichFlatMapFunction()
); // merge logic

// 3. Register the table and write its contents straight out
Table snapshot_cdc_table = fsTableEnv.fromDataStream(snapshot_cdc_stream);
fsTableEnv.createTemporaryView("test", snapshot_cdc_table);

String output = "CREATE TABLE test_mirror (\n" +
        "`id` INT,\n" +
        "`name` VARCHAR(255),\n" +
        "`time` TIMESTAMP(3),\n" +
        "PRIMARY KEY(id) NOT ENFORCED\n" +
        ") WITH (\n" +
        " 'connector' = 'print'\n" +
        ")";

// 4. Application logic
String op = "INSERT INTO test_mirror SELECT * FROM test";
fsTableEnv.executeSql(output);
fsTableEnv.executeSql(op);

The job fails at submission with:

serializationSchema:root
 |-- id: INT NOT NULL
 |-- name: VARCHAR(255)
 |-- time: TIMESTAMP(3)
 |-- status: INT
 |-- CONSTRAINT PK_3386 PRIMARY KEY (id)

snapshot_cdc_table:UnnamedTable$0
+----------------+
|   table name   |
+----------------+
| UnnamedTable$0 |
| test           |
| test_mirror    |
+----------------+
3 rows in set

------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
        at org.apache.flink.client.cli.CliFrontend$$Lambda$58/1706292388.call(Unknown Source)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.table.api.TableException: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type.
        at org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
        at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
        at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:92)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
        at scala.Option.map(Option.scala:146)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
        at com.qqmusic.quku.demo_app.StreamTableSql.main(StreamTableSql.java:126)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
        ... 9 more

What is the cause, and what do I need to do?
Reply from Jark Wu, Thu, 5 Nov 2020 at 12:01:
1. Registering a DataStream of type RowData is currently not supported, because RowData is recognized as an unstructured (raw) type.
2. Registering a CDC stream is not supported yet either; that is, DataStream -> Table conversion only accepts insert-only streams and cannot interpret a changelog stream. This feature is planned for the 1.13 release.

For your scenario there are a couple of options:

1. If your stream contains only inserts and updates, no deletes: register the DataStream<Row> as an insert-only Table first, then use Flink SQL's deduplication syntax [1] to keep the last record per primary key (see the sketch below).

2. If your stream does contain deletes, then... you will have to develop your own SQL connector and implement the CDC capture plus the "map function imposes some ordering constraints" logic inside your source.

Best,
Jark
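A minimal sketch of option 1, assuming SnapshotCdcCoRichFlatMapFunction is adapted to emit org.apache.flink.types.Row instead of RowData. The field names and types mirror the schema in the original post; the proctime attribute and the view name test_raw are made up for illustration:

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// Attach explicit row type information so the planner sees a structured
// row type instead of a raw type it cannot serialize.
DataStream<Row> rows = connect
        .flatMap(new SnapshotCdcCoRichFlatMapFunction()) // adapted to emit Row
        .returns(Types.ROW_NAMED(
                new String[] {"id", "name", "time", "status"},
                Types.INT, Types.STRING, Types.SQL_TIMESTAMP, Types.INT));

// Register the insert-only stream as a view, adding a processing-time
// attribute to order the deduplication on.
Table t = fsTableEnv.fromDataStream(rows,
        $("id"), $("name"), $("time"), $("status"), $("proctime").proctime());
fsTableEnv.createTemporaryView("test_raw", t);

With the row type resolved this way, fromDataStream should no longer produce the TypeInformationRawType that triggered the exception above.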
Follow-up from Jark Wu:
Here is the deduplication documentation link referenced as [1] above:
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html#%E5%8E%BB%E9%87%8D
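Tying the pieces together, the deduplication query from the linked docs could look like this against the hypothetical test_raw view sketched above, keeping only the latest row per primary key:

// ROW_NUMBER over the primary key, ordered by processing time descending,
// keeps exactly the last record seen for each id.
fsTableEnv.executeSql(
        "INSERT INTO test_mirror " +
        "SELECT id, name, `time` FROM (" +
        "  SELECT *, ROW_NUMBER() OVER (" +
        "    PARTITION BY id ORDER BY proctime DESC) AS row_num " +
        "  FROM test_raw" +
        ") WHERE row_num = 1");

Note that this query produces an updating result rather than an append-only one, so the sink has to accept updates; the print connector used in the original post can.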
Reply from jindy_liu:

OK, thanks Jark!

The data does contain deletes, so I will look into implementing the source approach. Originally I just wanted to merge the streams with a map function on top and then convert, but it seems there is no way around implementing a SQL connector. The source is Kafka, so I will find a way to adapt KafkaDynamicSource!
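For anyone following up on this thread, a rough sketch of what the table source side of such a connector could look like on Flink 1.11. The class and the SnapshotCdcSourceFunction it wraps are hypothetical; the latter would read both Kafka topics, apply the ordering/merge logic of SnapshotCdcCoRichFlatMapFunction, and emit RowData with the appropriate RowKind on each record:

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

public class SnapshotCdcTableSource implements ScanTableSource {

    @Override
    public ChangelogMode getChangelogMode() {
        // Declare a full changelog so the planner accepts updates and
        // deletes coming out of this source.
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_BEFORE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .build();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        // Hypothetical SourceFunction carrying the snapshot/CDC merge logic.
        SourceFunction<RowData> sourceFunction = new SnapshotCdcSourceFunction();
        return SourceFunctionProvider.of(sourceFunction, false); // unbounded
    }

    @Override
    public DynamicTableSource copy() {
        return new SnapshotCdcTableSource();
    }

    @Override
    public String asSummaryString() {
        return "snapshot-cdc";
    }
}

A matching DynamicTableSourceFactory registered through the Java SPI would then let the table be declared with CREATE TABLE ... WITH ('connector' = '...') in plain SQL.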