|
Administrator
|
Hi,
你的附件好像没有上传。 On Mon, 27 Jul 2020 at 18:17, air23 <[hidden email]> wrote: > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?* > > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + > " `data` VARCHAR , " + > " `table` VARCHAR " + > ") WITH (" + > " 'connector' = 'kafka'," + > " 'topic' = 'order_source'," + > " 'properties.bootstrap.servers' = '***'," + > " 'properties.group.id' = 'real1'," + > " 'format' = 'json'," + > " 'scan.startup.mode' = 'earliest-offset'" + > ")"; > > > 具体见附件 有打印 > > > > > |
我再上传一次
在2020年07月27日 18:55,[hidden email] 写道: Hi, |
Administrator
|
抱歉,还是没有看到附件。
如果是文本的话,你可以直接贴到邮件里。 On Mon, 27 Jul 2020 at 19:22, air23 <[hidden email]> wrote: > 我再上传一次 > > 在2020年07月27日 18:55,Jark Wu <[hidden email]> 写道: > > Hi, > 你的附件好像没有上传。 > > On Mon, 27 Jul 2020 at 18:17, air23 <[hidden email]> wrote: > > > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?* > > > > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + > > " `data` VARCHAR , " + > > " `table` VARCHAR " + > > ") WITH (" + > > " 'connector' = 'kafka'," + > > " 'topic' = 'order_source'," + > > " 'properties.bootstrap.servers' = '***'," + > > " 'properties.group.id' = 'real1'," + > > " 'format' = 'json'," + > > " 'scan.startup.mode' = 'earliest-offset'" + > > ")"; > > > > > > 具体见附件 有打印 > > > > > > > > > > > > |
In reply to this post by air23
Hi,
附近应该是收不到的,包括图片啥的 只能回复纯文本,贴代码,如果真的需要图片的话,可以上传到其他的网站上,然后给个连接跳转过去 在 2020-07-27 19:21:51,"air23" <[hidden email]> 写道: 我再上传一次 在2020年07月27日 18:55,Jark Wu 写道: Hi, 你的附件好像没有上传。 On Mon, 27 Jul 2020 at 18:17, air23 <[hidden email]> wrote: > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?* > > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + > " `data` VARCHAR , " + > " `table` VARCHAR " + > ") WITH (" + > " 'connector' = 'kafka'," + > " 'topic' = 'order_source'," + > " 'properties.bootstrap.servers' = '***'," + > " 'properties.group.id' = 'real1'," + > " 'format' = 'json'," + > " 'scan.startup.mode' = 'earliest-offset'" + > ")"; > > > 具体见附件 有打印 > > > > > |
In reply to this post by Jark
你好 测试代码如下
private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + " `data` VARCHAR , " + " `table` VARCHAR " + ") WITH (" + " 'connector' = 'kafka'," + " 'topic' = 'source_databases'," + " 'properties.bootstrap.servers' = '***'," + " 'properties.group.id' = 'real1'," + " 'format' = 'json'," + " 'scan.startup.mode' = 'earliest-offset'" + ")"; public static void main(String[] args) throws Exception { //bink table StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL); tableResult.print(); Table table = bsTableEnv.sqlQuery("select * from kafkaTable"); bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1); bsEnv.execute("aa"); } 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog ,order_operation_time ,inventory_batch_log ,order_log ,order_address_book ,product_inventory ,order_physical_relation ,bil_business_attach ,picking_detail ,picking_detail ,orders 另外再问个问题。1.11版本 blink 不能datastream转table吗? 看到例子都是useOldPlanner 来转table的。 致谢 在 2020-07-27 19:44:10,"Jark Wu" <[hidden email]> 写道: >抱歉,还是没有看到附件。 >如果是文本的话,你可以直接贴到邮件里。 > >On Mon, 27 Jul 2020 at 19:22, air23 <[hidden email]> wrote: > >> 我再上传一次 >> >> 在2020年07月27日 18:55,Jark Wu <[hidden email]> 写道: >> >> Hi, >> 你的附件好像没有上传。 >> >> On Mon, 27 Jul 2020 at 18:17, air23 <[hidden email]> wrote: >> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?* >> > >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + >> > " `data` VARCHAR , " + >> > " `table` VARCHAR " + >> > ") WITH (" + >> > " 'connector' = 'kafka'," + >> > " 'topic' = 'order_source'," + >> > " 'properties.bootstrap.servers' = '***'," + >> > " 'properties.group.id' = 'real1'," + >> > " 'format' = 'json'," + >> > " 'scan.startup.mode' = 'earliest-offset'" + >> > ")"; >> > >> > >> > 具体见附件 有打印 >> > >> > >> > >> > >> > >> >> |
Administrator
|
有kafka 中json 数据的样例不?
有没有看过 TaskManager 中有没有异常 log 信息? On Tue, 28 Jul 2020 at 09:40, air23 <[hidden email]> wrote: > 你好 测试代码如下 > > > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + > " `data` VARCHAR , " + > " `table` VARCHAR " + > ") WITH (" + > " 'connector' = 'kafka'," + > " 'topic' = 'source_databases'," + > " 'properties.bootstrap.servers' = '***'," + > " 'properties.group.id' = 'real1'," + > " 'format' = 'json'," + > " 'scan.startup.mode' = 'earliest-offset'" + > ")"; > public static void main(String[] args) throws Exception { > > > //bink table > StreamExecutionEnvironment bsEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings bsSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamTableEnvironment bsTableEnv = > StreamTableEnvironment.create(bsEnv, bsSettings); > > TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL); > > > tableResult.print(); > > Table table = bsTableEnv.sqlQuery("select * from kafkaTable"); > > bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1); > > bsEnv.execute("aa"); > > } > > > > > 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog > ,order_operation_time > ,inventory_batch_log > ,order_log > ,order_address_book > ,product_inventory > ,order_physical_relation > ,bil_business_attach > ,picking_detail > ,picking_detail > ,orders > > > > > 另外再问个问题。1.11版本 blink 不能datastream转table吗? > 看到例子都是useOldPlanner 来转table的。 > 致谢 > > > > > > > > > > > > > > > 在 2020-07-27 19:44:10,"Jark Wu" <[hidden email]> 写道: > >抱歉,还是没有看到附件。 > >如果是文本的话,你可以直接贴到邮件里。 > > > >On Mon, 27 Jul 2020 at 19:22, air23 <[hidden email]> wrote: > > > >> 我再上传一次 > >> > >> 在2020年07月27日 18:55,Jark Wu <[hidden email]> 写道: > >> > >> Hi, > >> 你的附件好像没有上传。 > >> > >> On Mon, 27 Jul 2020 at 18:17, air23 <[hidden email]> wrote: > >> > >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?* > >> > > >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" > + > >> > " `data` VARCHAR , " + > >> > " `table` VARCHAR " + > >> > ") WITH (" + > >> > " 'connector' = 'kafka'," + > >> > " 'topic' = 'order_source'," + > >> > " 'properties.bootstrap.servers' = '***'," + > >> > " 'properties.group.id' = 'real1'," + > >> > " 'format' = 'json'," + > >> > " 'scan.startup.mode' = 'earliest-offset'" + > >> > ")"; > >> > > >> > > >> > 具体见附件 有打印 > >> > > >> > > >> > > >> > > >> > > >> > >> > |
格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
{ "data":[ { "op_id":"97037138", "order_id":"84172164" } ], "database":"order_11", "es":1595720375000, "id":17469027, "isDdl":false, "mysqlType":{ "op_id":"int(11)", "order_id":"int(11)" }, "old":null, "pkNames":[ "op_id" ], "sql":"", "sqlType":{ "op_id":4, "order_id":4 }, "table":"order_product", "ts":1595720375837, "type":"INSERT" } 在 2020-07-28 14:44:35,"Jark Wu" <[hidden email]> 写道: >有kafka 中json 数据的样例不? >有没有看过 TaskManager 中有没有异常 log 信息? > > > >On Tue, 28 Jul 2020 at 09:40, air23 <[hidden email]> wrote: > >> 你好 测试代码如下 >> >> >> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + >> " `data` VARCHAR , " + >> " `table` VARCHAR " + >> ") WITH (" + >> " 'connector' = 'kafka'," + >> " 'topic' = 'source_databases'," + >> " 'properties.bootstrap.servers' = '***'," + >> " 'properties.group.id' = 'real1'," + >> " 'format' = 'json'," + >> " 'scan.startup.mode' = 'earliest-offset'" + >> ")"; >> public static void main(String[] args) throws Exception { >> >> >> //bink table >> StreamExecutionEnvironment bsEnv = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> EnvironmentSettings bsSettings = >> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >> StreamTableEnvironment bsTableEnv = >> StreamTableEnvironment.create(bsEnv, bsSettings); >> >> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL); >> >> >> tableResult.print(); >> >> Table table = bsTableEnv.sqlQuery("select * from kafkaTable"); >> >> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1); >> >> bsEnv.execute("aa"); >> >> } >> >> >> >> >> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog >> ,order_operation_time >> ,inventory_batch_log >> ,order_log >> ,order_address_book >> ,product_inventory >> ,order_physical_relation >> ,bil_business_attach >> ,picking_detail >> ,picking_detail >> ,orders >> >> >> >> >> 另外再问个问题。1.11版本 blink 不能datastream转table吗? >> 看到例子都是useOldPlanner 来转table的。 >> 致谢 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-07-27 19:44:10,"Jark Wu" <[hidden email]> 写道: >> >抱歉,还是没有看到附件。 >> >如果是文本的话,你可以直接贴到邮件里。 >> > >> >On Mon, 27 Jul 2020 at 19:22, air23 <[hidden email]> wrote: >> > >> >> 我再上传一次 >> >> >> >> 在2020年07月27日 18:55,Jark Wu <[hidden email]> 写道: >> >> >> >> Hi, >> >> 你的附件好像没有上传。 >> >> >> >> On Mon, 27 Jul 2020 at 18:17, air23 <[hidden email]> wrote: >> >> >> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?* >> >> > >> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" >> + >> >> > " `data` VARCHAR , " + >> >> > " `table` VARCHAR " + >> >> > ") WITH (" + >> >> > " 'connector' = 'kafka'," + >> >> > " 'topic' = 'order_source'," + >> >> > " 'properties.bootstrap.servers' = '***'," + >> >> > " 'properties.group.id' = 'real1'," + >> >> > " 'format' = 'json'," + >> >> > " 'scan.startup.mode' = 'earliest-offset'" + >> >> > ")"; >> >> > >> >> > >> >> > 具体见附件 有打印 >> >> > >> >> > >> >> > >> >> > >> >> > >> >> >> >> >> |
Administrator
|
因为 "data" 是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。
1.12 中已经支持读取复杂结构为 string 类型了。 Best, Jark On Tue, 28 Jul 2020 at 15:36, air23 <[hidden email]> wrote: > 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来 > > > { > "data":[ > { > "op_id":"97037138", > "order_id":"84172164" > } > ], > "database":"order_11", > "es":1595720375000, > "id":17469027, > "isDdl":false, > "mysqlType":{ > "op_id":"int(11)", > "order_id":"int(11)" > }, > "old":null, > "pkNames":[ > "op_id" > ], > "sql":"", > "sqlType":{ > "op_id":4, > "order_id":4 > }, > "table":"order_product", > "ts":1595720375837, > "type":"INSERT" > } > > > > > > > > > > > > > > > > > > 在 2020-07-28 14:44:35,"Jark Wu" <[hidden email]> 写道: > >有kafka 中json 数据的样例不? > >有没有看过 TaskManager 中有没有异常 log 信息? > > > > > > > >On Tue, 28 Jul 2020 at 09:40, air23 <[hidden email]> wrote: > > > >> 你好 测试代码如下 > >> > >> > >> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + > >> " `data` VARCHAR , " + > >> " `table` VARCHAR " + > >> ") WITH (" + > >> " 'connector' = 'kafka'," + > >> " 'topic' = 'source_databases'," + > >> " 'properties.bootstrap.servers' = '***'," + > >> " 'properties.group.id' = 'real1'," + > >> " 'format' = 'json'," + > >> " 'scan.startup.mode' = 'earliest-offset'" + > >> ")"; > >> public static void main(String[] args) throws Exception { > >> > >> > >> //bink table > >> StreamExecutionEnvironment bsEnv = > >> StreamExecutionEnvironment.getExecutionEnvironment(); > >> EnvironmentSettings bsSettings = > >> > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > >> StreamTableEnvironment bsTableEnv = > >> StreamTableEnvironment.create(bsEnv, bsSettings); > >> > >> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL); > >> > >> > >> tableResult.print(); > >> > >> Table table = bsTableEnv.sqlQuery("select * from kafkaTable"); > >> > >> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1); > >> > >> bsEnv.execute("aa"); > >> > >> } > >> > >> > >> > >> > >> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog > >> ,order_operation_time > >> ,inventory_batch_log > >> ,order_log > >> ,order_address_book > >> ,product_inventory > >> ,order_physical_relation > >> ,bil_business_attach > >> ,picking_detail > >> ,picking_detail > >> ,orders > >> > >> > >> > >> > >> 另外再问个问题。1.11版本 blink 不能datastream转table吗? > >> 看到例子都是useOldPlanner 来转table的。 > >> 致谢 > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 在 2020-07-27 19:44:10,"Jark Wu" <[hidden email]> 写道: > >> >抱歉,还是没有看到附件。 > >> >如果是文本的话,你可以直接贴到邮件里。 > >> > > >> >On Mon, 27 Jul 2020 at 19:22, air23 <[hidden email]> wrote: > >> > > >> >> 我再上传一次 > >> >> > >> >> 在2020年07月27日 18:55,Jark Wu <[hidden email]> 写道: > >> >> > >> >> Hi, > >> >> 你的附件好像没有上传。 > >> >> > >> >> On Mon, 27 Jul 2020 at 18:17, air23 <[hidden email]> wrote: > >> >> > >> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table > 不能取到data呢?* > >> >> > > >> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable > (\n" > >> + > >> >> > " `data` VARCHAR , " + > >> >> > " `table` VARCHAR " + > >> >> > ") WITH (" + > >> >> > " 'connector' = 'kafka'," + > >> >> > " 'topic' = 'order_source'," + > >> >> > " 'properties.bootstrap.servers' = '***'," + > >> >> > " 'properties.group.id' = 'real1'," + > >> >> > " 'format' = 'json'," + > >> >> > " 'scan.startup.mode' = 'earliest-offset'" + > >> >> > ")"; > >> >> > > >> >> > > >> >> > 具体见附件 有打印 > >> >> > > >> >> > > >> >> > > >> >> > > >> >> > > >> >> > >> >> > >> > |
In reply to this post by air23
你的flink什么版本
发送自 Windows 10 版邮件应用 发件人: air23 发送时间: 2020年7月28日 15:36 收件人: [hidden email] 主题: Re:Re: Re: 解析kafka的mysql binlog问题 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来 { "data":[ { "op_id":"97037138", "order_id":"84172164" } ], "database":"order_11", "es":1595720375000, "id":17469027, "isDdl":false, "mysqlType":{ "op_id":"int(11)", "order_id":"int(11)" }, "old":null, "pkNames":[ "op_id" ], "sql":"", "sqlType":{ "op_id":4, "order_id":4 }, "table":"order_product", "ts":1595720375837, "type":"INSERT" } 在 2020-07-28 14:44:35,"Jark Wu" <[hidden email]> 写道: >有kafka 中json 数据的样例不? >有没有看过 TaskManager 中有没有异常 log 信息? > > > >On Tue, 28 Jul 2020 at 09:40, air23 <[hidden email]> wrote: > >> 你好 测试代码如下 >> >> >> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + >> " `data` VARCHAR , " + >> " `table` VARCHAR " + >> ") WITH (" + >> " 'connector' = 'kafka'," + >> " 'topic' = 'source_databases'," + >> " 'properties.bootstrap.servers' = '***'," + >> " 'properties.group.id' = 'real1'," + >> " 'format' = 'json'," + >> " 'scan.startup.mode' = 'earliest-offset'" + >> ")"; >> public static void main(String[] args) throws Exception { >> >> >> //bink table >> StreamExecutionEnvironment bsEnv = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> EnvironmentSettings bsSettings = >> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >> StreamTableEnvironment bsTableEnv = >> StreamTableEnvironment.create(bsEnv, bsSettings); >> >> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL); >> >> >> tableResult.print(); >> >> Table table = bsTableEnv.sqlQuery("select * from kafkaTable"); >> >> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1); >> >> bsEnv.execute("aa"); >> >> } >> >> >> >> >> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog >> ,order_operation_time >> ,inventory_batch_log >> ,order_log >> ,order_address_book >> ,product_inventory >> ,order_physical_relation >> ,bil_business_attach >> ,picking_detail >> ,picking_detail >> ,orders >> >> >> >> >> 另外再问个问题。1.11版本 blink 不能datastream转table吗? >> 看到例子都是useOldPlanner 来转table的。 >> 致谢 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-07-27 19:44:10,"Jark Wu" <[hidden email]> 写道: >> >抱歉,还是没有看到附件。 >> >如果是文本的话,你可以直接贴到邮件里。 >> > >> >On Mon, 27 Jul 2020 at 19:22, air23 <[hidden email]> wrote: >> > >> >> 我再上传一次 >> >> >> >> 在2020年07月27日 18:55,Jark Wu <[hidden email]> 写道: >> >> >> >> Hi, >> >> 你的附件好像没有上传。 >> >> >> >> On Mon, 27 Jul 2020 at 18:17, air23 <[hidden email]> wrote: >> >> >> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?* >> >> > >> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" >> + >> >> > " `data` VARCHAR , " + >> >> > " `table` VARCHAR " + >> >> > ") WITH (" + >> >> > " 'connector' = 'kafka'," + >> >> > " 'topic' = 'order_source'," + >> >> > " 'properties.bootstrap.servers' = '***'," + >> >> > " 'properties.group.id' = 'real1'," + >> >> > " 'format' = 'json'," + >> >> > " 'scan.startup.mode' = 'earliest-offset'" + >> >> > ")"; >> >> > >> >> > >> >> > 具体见附件 有打印 >> >> > >> >> > >> >> > >> >> > >> >> > >> >> >> >> >> |
In reply to this post by air23
data格式不是string,可以定义为ARRAY<ROW< op_id STRING, order_id STRING>>
> 2020年7月28日 下午3:35,air23 <[hidden email]> 写道: > > 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来 > > > { > "data":[ > { > "op_id":"97037138", > "order_id":"84172164" > } > ], > "database":"order_11", > "es":1595720375000, > "id":17469027, > "isDdl":false, > "mysqlType":{ > "op_id":"int(11)", > "order_id":"int(11)" > }, > "old":null, > "pkNames":[ > "op_id" > ], > "sql":"", > "sqlType":{ > "op_id":4, > "order_id":4 > }, > "table":"order_product", > "ts":1595720375837, > "type":"INSERT" > } > > > > > > > > > > > > > > > > > > 在 2020-07-28 14:44:35,"Jark Wu" <[hidden email]> 写道: >> 有kafka 中json 数据的样例不? >> 有没有看过 TaskManager 中有没有异常 log 信息? >> >> >> >> On Tue, 28 Jul 2020 at 09:40, air23 <[hidden email]> wrote: >> >>> 你好 测试代码如下 >>> >>> >>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + >>> " `data` VARCHAR , " + >>> " `table` VARCHAR " + >>> ") WITH (" + >>> " 'connector' = 'kafka'," + >>> " 'topic' = 'source_databases'," + >>> " 'properties.bootstrap.servers' = '***'," + >>> " 'properties.group.id' = 'real1'," + >>> " 'format' = 'json'," + >>> " 'scan.startup.mode' = 'earliest-offset'" + >>> ")"; >>> public static void main(String[] args) throws Exception { >>> >>> >>> //bink table >>> StreamExecutionEnvironment bsEnv = >>> StreamExecutionEnvironment.getExecutionEnvironment(); >>> EnvironmentSettings bsSettings = >>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >>> StreamTableEnvironment bsTableEnv = >>> StreamTableEnvironment.create(bsEnv, bsSettings); >>> >>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL); >>> >>> >>> tableResult.print(); >>> >>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable"); >>> >>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1); >>> >>> bsEnv.execute("aa"); >>> >>> } >>> >>> >>> >>> >>> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog >>> ,order_operation_time >>> ,inventory_batch_log >>> ,order_log >>> ,order_address_book >>> ,product_inventory >>> ,order_physical_relation >>> ,bil_business_attach >>> ,picking_detail >>> ,picking_detail >>> ,orders >>> >>> >>> >>> >>> 另外再问个问题。1.11版本 blink 不能datastream转table吗? >>> 看到例子都是useOldPlanner 来转table的。 >>> 致谢 >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> 在 2020-07-27 19:44:10,"Jark Wu" <[hidden email]> 写道: >>>> 抱歉,还是没有看到附件。 >>>> 如果是文本的话,你可以直接贴到邮件里。 >>>> >>>> On Mon, 27 Jul 2020 at 19:22, air23 <[hidden email]> wrote: >>>> >>>>> 我再上传一次 >>>>> >>>>> 在2020年07月27日 18:55,Jark Wu <[hidden email]> 写道: >>>>> >>>>> Hi, >>>>> 你的附件好像没有上传。 >>>>> >>>>> On Mon, 27 Jul 2020 at 18:17, air23 <[hidden email]> wrote: >>>>> >>>>>> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?* >>>>>> >>>>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" >>> + >>>>>> " `data` VARCHAR , " + >>>>>> " `table` VARCHAR " + >>>>>> ") WITH (" + >>>>>> " 'connector' = 'kafka'," + >>>>>> " 'topic' = 'order_source'," + >>>>>> " 'properties.bootstrap.servers' = '***'," + >>>>>> " 'properties.group.id' = 'real1'," + >>>>>> " 'format' = 'json'," + >>>>>> " 'scan.startup.mode' = 'earliest-offset'" + >>>>>> ")"; >>>>>> >>>>>> >>>>>> 具体见附件 有打印 >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>> >>>>> >>> |
你好。
我猜测 是有可能是这个问题。但是我这个topic是 读取的一个库的binlog。有很多表 所以ARRAY<ROW< op_id STRING, order_id STRING>> 这种 里面 不是固定的 所以我想用datastream 解析 然后在根据表不同 解析成不同的table。但是发现blinkplaner 好像不可以datastream 转换为table。或者是我没有发现这个例子 谢谢 在 2020-07-28 16:05:55,"admin" <[hidden email]> 写道: >data格式不是string,可以定义为ARRAY<ROW< op_id STRING, order_id STRING>> > >> 2020年7月28日 下午3:35,air23 <[hidden email]> 写道: >> >> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来 >> >> >> { >> "data":[ >> { >> "op_id":"97037138", >> "order_id":"84172164" >> } >> ], >> "database":"order_11", >> "es":1595720375000, >> "id":17469027, >> "isDdl":false, >> "mysqlType":{ >> "op_id":"int(11)", >> "order_id":"int(11)" >> }, >> "old":null, >> "pkNames":[ >> "op_id" >> ], >> "sql":"", >> "sqlType":{ >> "op_id":4, >> "order_id":4 >> }, >> "table":"order_product", >> "ts":1595720375837, >> "type":"INSERT" >> } >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-07-28 14:44:35,"Jark Wu" <[hidden email]> 写道: >>> 有kafka 中json 数据的样例不? >>> 有没有看过 TaskManager 中有没有异常 log 信息? >>> >>> >>> >>> On Tue, 28 Jul 2020 at 09:40, air23 <[hidden email]> wrote: >>> >>>> 你好 测试代码如下 >>>> >>>> >>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + >>>> " `data` VARCHAR , " + >>>> " `table` VARCHAR " + >>>> ") WITH (" + >>>> " 'connector' = 'kafka'," + >>>> " 'topic' = 'source_databases'," + >>>> " 'properties.bootstrap.servers' = '***'," + >>>> " 'properties.group.id' = 'real1'," + >>>> " 'format' = 'json'," + >>>> " 'scan.startup.mode' = 'earliest-offset'" + >>>> ")"; >>>> public static void main(String[] args) throws Exception { >>>> >>>> >>>> //bink table >>>> StreamExecutionEnvironment bsEnv = >>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>> EnvironmentSettings bsSettings = >>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >>>> StreamTableEnvironment bsTableEnv = >>>> StreamTableEnvironment.create(bsEnv, bsSettings); >>>> >>>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL); >>>> >>>> >>>> tableResult.print(); >>>> >>>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable"); >>>> >>>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1); >>>> >>>> bsEnv.execute("aa"); >>>> >>>> } >>>> >>>> >>>> >>>> >>>> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog >>>> ,order_operation_time >>>> ,inventory_batch_log >>>> ,order_log >>>> ,order_address_book >>>> ,product_inventory >>>> ,order_physical_relation >>>> ,bil_business_attach >>>> ,picking_detail >>>> ,picking_detail >>>> ,orders >>>> >>>> >>>> >>>> >>>> 另外再问个问题。1.11版本 blink 不能datastream转table吗? >>>> 看到例子都是useOldPlanner 来转table的。 >>>> 致谢 >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> 在 2020-07-27 19:44:10,"Jark Wu" <[hidden email]> 写道: >>>>> 抱歉,还是没有看到附件。 >>>>> 如果是文本的话,你可以直接贴到邮件里。 >>>>> >>>>> On Mon, 27 Jul 2020 at 19:22, air23 <[hidden email]> wrote: >>>>> >>>>>> 我再上传一次 >>>>>> >>>>>> 在2020年07月27日 18:55,Jark Wu <[hidden email]> 写道: >>>>>> >>>>>> Hi, >>>>>> 你的附件好像没有上传。 >>>>>> >>>>>> On Mon, 27 Jul 2020 at 18:17, air23 <[hidden email]> wrote: >>>>>> >>>>>>> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?* >>>>>>> >>>>>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" >>>> + >>>>>>> " `data` VARCHAR , " + >>>>>>> " `table` VARCHAR " + >>>>>>> ") WITH (" + >>>>>>> " 'connector' = 'kafka'," + >>>>>>> " 'topic' = 'order_source'," + >>>>>>> " 'properties.bootstrap.servers' = '***'," + >>>>>>> " 'properties.group.id' = 'real1'," + >>>>>>> " 'format' = 'json'," + >>>>>>> " 'scan.startup.mode' = 'earliest-offset'" + >>>>>>> ")"; >>>>>>> >>>>>>> >>>>>>> 具体见附件 有打印 >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>>> >>>> |
In reply to this post by smq
你好 使用的是<flink.version>1.11.1</flink.version>版本的
在 2020-07-28 16:02:30,"明启 孙" <[hidden email]> 写道: >你的flink什么版本 > >发送自 Windows 10 版邮件应用 > >发件人: air23 >发送时间: 2020年7月28日 15:36 >收件人: [hidden email] >主题: Re:Re: Re: 解析kafka的mysql binlog问题 > >格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来 > > >{ > "data":[ > { > "op_id":"97037138", > "order_id":"84172164" > } > ], > "database":"order_11", > "es":1595720375000, > "id":17469027, > "isDdl":false, > "mysqlType":{ > "op_id":"int(11)", > "order_id":"int(11)" > }, > "old":null, > "pkNames":[ > "op_id" > ], > "sql":"", > "sqlType":{ > "op_id":4, > "order_id":4 > }, > "table":"order_product", > "ts":1595720375837, > "type":"INSERT" >} > > > > > > > > > > > > > > > > > >在 2020-07-28 14:44:35,"Jark Wu" <[hidden email]> 写道: >>有kafka 中json 数据的样例不? >>有没有看过 TaskManager 中有没有异常 log 信息? >> >> >> >>On Tue, 28 Jul 2020 at 09:40, air23 <[hidden email]> wrote: >> >>> 你好 测试代码如下 >>> >>> >>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + >>> " `data` VARCHAR , " + >>> " `table` VARCHAR " + >>> ") WITH (" + >>> " 'connector' = 'kafka'," + >>> " 'topic' = 'source_databases'," + >>> " 'properties.bootstrap.servers' = '***'," + >>> " 'properties.group.id' = 'real1'," + >>> " 'format' = 'json'," + >>> " 'scan.startup.mode' = 'earliest-offset'" + >>> ")"; >>> public static void main(String[] args) throws Exception { >>> >>> >>> //bink table >>> StreamExecutionEnvironment bsEnv = >>> StreamExecutionEnvironment.getExecutionEnvironment(); >>> EnvironmentSettings bsSettings = >>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >>> StreamTableEnvironment bsTableEnv = >>> StreamTableEnvironment.create(bsEnv, bsSettings); >>> >>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL); >>> >>> >>> tableResult.print(); >>> >>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable"); >>> >>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1); >>> >>> bsEnv.execute("aa"); >>> >>> } >>> >>> >>> >>> >>> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog >>> ,order_operation_time >>> ,inventory_batch_log >>> ,order_log >>> ,order_address_book >>> ,product_inventory >>> ,order_physical_relation >>> ,bil_business_attach >>> ,picking_detail >>> ,picking_detail >>> ,orders >>> >>> >>> >>> >>> 另外再问个问题。1.11版本 blink 不能datastream转table吗? >>> 看到例子都是useOldPlanner 来转table的。 >>> 致谢 >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> 在 2020-07-27 19:44:10,"Jark Wu" <[hidden email]> 写道: >>> >抱歉,还是没有看到附件。 >>> >如果是文本的话,你可以直接贴到邮件里。 >>> > >>> >On Mon, 27 Jul 2020 at 19:22, air23 <[hidden email]> wrote: >>> > >>> >> 我再上传一次 >>> >> >>> >> 在2020年07月27日 18:55,Jark Wu <[hidden email]> 写道: >>> >> >>> >> Hi, >>> >> 你的附件好像没有上传。 >>> >> >>> >> On Mon, 27 Jul 2020 at 18:17, air23 <[hidden email]> wrote: >>> >> >>> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?* >>> >> > >>> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" >>> + >>> >> > " `data` VARCHAR , " + >>> >> > " `table` VARCHAR " + >>> >> > ") WITH (" + >>> >> > " 'connector' = 'kafka'," + >>> >> > " 'topic' = 'order_source'," + >>> >> > " 'properties.bootstrap.servers' = '***'," + >>> >> > " 'properties.group.id' = 'real1'," + >>> >> > " 'format' = 'json'," + >>> >> > " 'scan.startup.mode' = 'earliest-offset'" + >>> >> > ")"; >>> >> > >>> >> > >>> >> > 具体见附件 有打印 >>> >> > >>> >> > >>> >> > >>> >> > >>> >> > >>> >> >>> >> >>> > |
In reply to this post by Jark
你好 收到。谢谢。 因为这个topic 是有很多表的binlog的。所以解析成array 也是不行的。长度和字段都不一致
另外想请教下 1.11 版本 datastream 可以转换为 blink的table吗。 看到例子 好像都是 useOldPlanner 来转的, 但是我想用blink 写入到hive。 我这边需求 就是通过binlog - flink - hive。 不同的mysql表写入到不同hive表。 在 2020-07-28 16:02:18,"Jark Wu" <[hidden email]> 写道: >因为 "data" 是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。 >1.12 中已经支持读取复杂结构为 string 类型了。 > >Best, >Jark > >On Tue, 28 Jul 2020 at 15:36, air23 <[hidden email]> wrote: > >> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来 >> >> >> { >> "data":[ >> { >> "op_id":"97037138", >> "order_id":"84172164" >> } >> ], >> "database":"order_11", >> "es":1595720375000, >> "id":17469027, >> "isDdl":false, >> "mysqlType":{ >> "op_id":"int(11)", >> "order_id":"int(11)" >> }, >> "old":null, >> "pkNames":[ >> "op_id" >> ], >> "sql":"", >> "sqlType":{ >> "op_id":4, >> "order_id":4 >> }, >> "table":"order_product", >> "ts":1595720375837, >> "type":"INSERT" >> } >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-07-28 14:44:35,"Jark Wu" <[hidden email]> 写道: >> >有kafka 中json 数据的样例不? >> >有没有看过 TaskManager 中有没有异常 log 信息? >> > >> > >> > >> >On Tue, 28 Jul 2020 at 09:40, air23 <[hidden email]> wrote: >> > >> >> 你好 测试代码如下 >> >> >> >> >> >> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + >> >> " `data` VARCHAR , " + >> >> " `table` VARCHAR " + >> >> ") WITH (" + >> >> " 'connector' = 'kafka'," + >> >> " 'topic' = 'source_databases'," + >> >> " 'properties.bootstrap.servers' = '***'," + >> >> " 'properties.group.id' = 'real1'," + >> >> " 'format' = 'json'," + >> >> " 'scan.startup.mode' = 'earliest-offset'" + >> >> ")"; >> >> public static void main(String[] args) throws Exception { >> >> >> >> >> >> //bink table >> >> StreamExecutionEnvironment bsEnv = >> >> StreamExecutionEnvironment.getExecutionEnvironment(); >> >> EnvironmentSettings bsSettings = >> >> >> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >> >> StreamTableEnvironment bsTableEnv = >> >> StreamTableEnvironment.create(bsEnv, bsSettings); >> >> >> >> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL); >> >> >> >> >> >> tableResult.print(); >> >> >> >> Table table = bsTableEnv.sqlQuery("select * from kafkaTable"); >> >> >> >> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1); >> >> >> >> bsEnv.execute("aa"); >> >> >> >> } >> >> >> >> >> >> >> >> >> >> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog >> >> ,order_operation_time >> >> ,inventory_batch_log >> >> ,order_log >> >> ,order_address_book >> >> ,product_inventory >> >> ,order_physical_relation >> >> ,bil_business_attach >> >> ,picking_detail >> >> ,picking_detail >> >> ,orders >> >> >> >> >> >> >> >> >> >> 另外再问个问题。1.11版本 blink 不能datastream转table吗? >> >> 看到例子都是useOldPlanner 来转table的。 >> >> 致谢 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-07-27 19:44:10,"Jark Wu" <[hidden email]> 写道: >> >> >抱歉,还是没有看到附件。 >> >> >如果是文本的话,你可以直接贴到邮件里。 >> >> > >> >> >On Mon, 27 Jul 2020 at 19:22, air23 <[hidden email]> wrote: >> >> > >> >> >> 我再上传一次 >> >> >> >> >> >> 在2020年07月27日 18:55,Jark Wu <[hidden email]> 写道: >> >> >> >> >> >> Hi, >> >> >> 你的附件好像没有上传。 >> >> >> >> >> >> On Mon, 27 Jul 2020 at 18:17, air23 <[hidden email]> wrote: >> >> >> >> >> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table >> 不能取到data呢?* >> >> >> > >> >> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable >> (\n" >> >> + >> >> >> > " `data` VARCHAR , " + >> >> >> > " `table` VARCHAR " + >> >> >> > ") WITH (" + >> >> >> > " 'connector' = 'kafka'," + >> >> >> > " 'topic' = 'order_source'," + >> >> >> > " 'properties.bootstrap.servers' = '***'," + >> >> >> > " 'properties.group.id' = 'real1'," + >> >> >> > " 'format' = 'json'," + >> >> >> > " 'scan.startup.mode' = 'earliest-offset'" + >> >> >> > ")"; >> >> >> > >> >> >> > >> >> >> > 具体见附件 有打印 >> >> >> > >> >> >> > >> >> >> > >> >> >> > >> >> >> > >> >> >> >> >> >> >> >> >> |
直接转成string1.11版本还不支持,会在1.12修复,参考jira[1]
[1]https://issues.apache.org/jira/browse/FLINK-18002 <https://issues.apache.org/jira/browse/FLINK-18002> > 2020年7月28日 下午5:20,air23 <[hidden email]> 写道: > > 你好 收到。谢谢。 因为这个topic 是有很多表的binlog的。所以解析成array 也是不行的。长度和字段都不一致 > 另外想请教下 1.11 版本 datastream 可以转换为 blink的table吗。 看到例子 好像都是 useOldPlanner 来转的, > 但是我想用blink 写入到hive。 我这边需求 就是通过binlog - flink - hive。 不同的mysql表写入到不同hive表。 > > > > > > > > > > > > > > > > > > 在 2020-07-28 16:02:18,"Jark Wu" <[hidden email]> 写道: >> 因为 "data" 是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。 >> 1.12 中已经支持读取复杂结构为 string 类型了。 >> >> Best, >> Jark >> >> On Tue, 28 Jul 2020 at 15:36, air23 <[hidden email]> wrote: >> >>> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来 >>> >>> >>> { >>> "data":[ >>> { >>> "op_id":"97037138", >>> "order_id":"84172164" >>> } >>> ], >>> "database":"order_11", >>> "es":1595720375000, >>> "id":17469027, >>> "isDdl":false, >>> "mysqlType":{ >>> "op_id":"int(11)", >>> "order_id":"int(11)" >>> }, >>> "old":null, >>> "pkNames":[ >>> "op_id" >>> ], >>> "sql":"", >>> "sqlType":{ >>> "op_id":4, >>> "order_id":4 >>> }, >>> "table":"order_product", >>> "ts":1595720375837, >>> "type":"INSERT" >>> } >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> 在 2020-07-28 14:44:35,"Jark Wu" <[hidden email]> 写道: >>>> 有kafka 中json 数据的样例不? >>>> 有没有看过 TaskManager 中有没有异常 log 信息? >>>> >>>> >>>> >>>> On Tue, 28 Jul 2020 at 09:40, air23 <[hidden email]> wrote: >>>> >>>>> 你好 测试代码如下 >>>>> >>>>> >>>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + >>>>> " `data` VARCHAR , " + >>>>> " `table` VARCHAR " + >>>>> ") WITH (" + >>>>> " 'connector' = 'kafka'," + >>>>> " 'topic' = 'source_databases'," + >>>>> " 'properties.bootstrap.servers' = '***'," + >>>>> " 'properties.group.id' = 'real1'," + >>>>> " 'format' = 'json'," + >>>>> " 'scan.startup.mode' = 'earliest-offset'" + >>>>> ")"; >>>>> public static void main(String[] args) throws Exception { >>>>> >>>>> >>>>> //bink table >>>>> StreamExecutionEnvironment bsEnv = >>>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>>> EnvironmentSettings bsSettings = >>>>> >>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >>>>> StreamTableEnvironment bsTableEnv = >>>>> StreamTableEnvironment.create(bsEnv, bsSettings); >>>>> >>>>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL); >>>>> >>>>> >>>>> tableResult.print(); >>>>> >>>>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable"); >>>>> >>>>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1); >>>>> >>>>> bsEnv.execute("aa"); >>>>> >>>>> } >>>>> >>>>> >>>>> >>>>> >>>>> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog >>>>> ,order_operation_time >>>>> ,inventory_batch_log >>>>> ,order_log >>>>> ,order_address_book >>>>> ,product_inventory >>>>> ,order_physical_relation >>>>> ,bil_business_attach >>>>> ,picking_detail >>>>> ,picking_detail >>>>> ,orders >>>>> >>>>> >>>>> >>>>> >>>>> 另外再问个问题。1.11版本 blink 不能datastream转table吗? >>>>> 看到例子都是useOldPlanner 来转table的。 >>>>> 致谢 >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> 在 2020-07-27 19:44:10,"Jark Wu" <[hidden email]> 写道: >>>>>> 抱歉,还是没有看到附件。 >>>>>> 如果是文本的话,你可以直接贴到邮件里。 >>>>>> >>>>>> On Mon, 27 Jul 2020 at 19:22, air23 <[hidden email]> wrote: >>>>>> >>>>>>> 我再上传一次 >>>>>>> >>>>>>> 在2020年07月27日 18:55,Jark Wu <[hidden email]> 写道: >>>>>>> >>>>>>> Hi, >>>>>>> 你的附件好像没有上传。 >>>>>>> >>>>>>> On Mon, 27 Jul 2020 at 18:17, air23 <[hidden email]> wrote: >>>>>>> >>>>>>>> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table >>> 不能取到data呢?* >>>>>>>> >>>>>>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable >>> (\n" >>>>> + >>>>>>>> " `data` VARCHAR , " + >>>>>>>> " `table` VARCHAR " + >>>>>>>> ") WITH (" + >>>>>>>> " 'connector' = 'kafka'," + >>>>>>>> " 'topic' = 'order_source'," + >>>>>>>> " 'properties.bootstrap.servers' = '***'," + >>>>>>>> " 'properties.group.id' = 'real1'," + >>>>>>>> " 'format' = 'json'," + >>>>>>>> " 'scan.startup.mode' = 'earliest-offset'" + >>>>>>>> ")"; >>>>>>>> >>>>>>>> >>>>>>>> 具体见附件 有打印 >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>> >>>>>>> >>>>> >>> |
Free forum by Nabble | Edit this page |