flink 1.11 cdc: How do I convert a DataStream&lt;RowData&gt; into a Flink SQL CDC table?

flink 1.11 cdc: How do I convert a DataStream&lt;RowData&gt; into a Flink SQL CDC table?

jindy_liu
I currently have two DataStream&lt;String&gt; streams that are converted into a DataStream&lt;RowData&gt; via a map function. How do I turn that DataStream&lt;RowData&gt; into a Table so it can be queried with Flink SQL?
*(Note: because the map function imposes ordering constraints on the streams, I cannot define the table directly with the Flink SQL CDC connectors!)*

*My current approach fails with an error:*

StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv,
        fsSettings);

DataStreamSource<String> caliber_snapshot_json; // canal-json format
DataStreamSource<String> caliber_cdc_json;      // canal-json format

// connect the two streams
ConnectedStreams<String, String> connect =
        caliber_cdc_json.connect(caliber_snapshot_json);
DataStream<RowData> snapshot_cdc_stream = connect.flatMap(
        new SnapshotCdcCoRichFlatMapFunction());

// 3. register the stream as a table and write its rows straight to a sink
Table snapshot_cdc_table = fsTableEnv.fromDataStream(snapshot_cdc_stream);
fsTableEnv.createTemporaryView("test", snapshot_cdc_table);

String output = "CREATE TABLE test_mirror (\n" +
                "`id` INT,\n" +
                "`name` VARCHAR(255),\n" +
                "`time` TIMESTAMP(3),\n" +
                "PRIMARY KEY(id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'print'\n" +
                ")";

// 4. app logic
String op = "INSERT INTO test_mirror SELECT * FROM test";
fsTableEnv.executeSql(output);
fsTableEnv.executeSql(op);


*But the job submission fails with this error:*
serializationSchema:root
 |-- id: INT NOT NULL
 |-- name: VARCHAR(255)
 |-- time: TIMESTAMP(3)
 |-- status: INT
 |-- CONSTRAINT PK_3386 PRIMARY KEY (id)

snapshot_cdc_table:UnnamedTable$0
+----------------+
|     table name |
+----------------+
| UnnamedTable$0 |
|           test |
|    test_mirror |
+----------------+
3 rows in set

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
        at org.apache.flink.client.cli.CliFrontend$$Lambda$58/1706292388.call(Unknown Source)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
*Caused by: org.apache.flink.table.api.TableException: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type.*
        at org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
        at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
        at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:92)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
        at scala.Option.map(Option.scala:146)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
        at com.qqmusic.quku.demo_app.StreamTableSql.main(StreamTableSql.java:126)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
        ... 9 more

What is causing this, and what should I do?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11 cdc: How do I convert a DataStream&lt;RowData&gt; into a Flink SQL CDC table?

Jark
Administrator
1. Registering a DataStream of RowData is not supported yet, because RowData is recognized as an unstructured (raw) type.
2. Registering a CDC stream is not supported yet either: DataStream -> Table conversion only accepts insert-only streams and cannot interpret a changelog. This feature is planned for the 1.13 release.

For your scenario there are a couple of options:
1. If your stream contains only inserts and updates, no deletes: first register a DataStream&lt;Row&gt; as an insert-only Table, then use Flink SQL's deduplication syntax [1] to keep only the latest record per primary key.
2. If your stream contains deletes, then... you will have to develop your own SQL connector, implementing both the CDC capture and the "map function imposes ordering constraints" logic inside your source.
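For reference, the deduplication pattern in option 1 is a ROW_NUMBER() query partitioned by the primary key. A minimal sketch against the poster's `test` view, assuming it exposes a processing-time attribute `proctime` (the column list and the `proctime` attribute are assumptions here, taken from the schema shown in the error output):

```sql
-- Keep only the latest record per primary key `id`;
-- rows are ranked by processing time, newest first.
SELECT id, name, `time`, status
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime DESC) AS row_num
    FROM test
)
WHERE row_num = 1
```

The planner recognizes this shape as a deduplication and maintains an updating result, emitting a retraction and a new row whenever a key receives a newer record.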

Best,
Jark



On Thu, 5 Nov 2020 at 10:07, jindy_liu <[hidden email]> wrote:


Re: flink 1.11 cdc: How do I convert a DataStream&lt;RowData&gt; into a Flink SQL CDC table?

Jark
Administrator
Link to the deduplication documentation:
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html#%E5%8E%BB%E9%87%8D

On Thu, 5 Nov 2020 at 12:01, Jark Wu <[hidden email]> wrote:


Re: flink 1.11 cdc: How do I convert a DataStream&lt;RowData&gt; into a Flink SQL CDC table?

jindy_liu
Thanks, Jark!
The data does contain deletes, so it looks like I need to pursue the custom source approach. I had hoped to just merge the streams upstream with a map function and then convert, but there is no way around implementing a SQL connector. Since the source is Kafka, I will look into adapting KafkaDynamicSource.
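Once such a connector exists, the table could be declared directly in DDL with the ordering logic hidden inside the source. A sketch, where the connector identifier `ordered-canal-cdc`, its options, and the placeholder values are all hypothetical:

```sql
CREATE TABLE test (
  `id` INT,
  `name` VARCHAR(255),
  `time` TIMESTAMP(3),
  `status` INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'ordered-canal-cdc',        -- hypothetical custom connector
  'topic' = '...',                          -- Kafka topic carrying the canal-json changelog
  'properties.bootstrap.servers' = '...'    -- placeholder
);
```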


