Problem parsing MySQL binlog from Kafka


Problem parsing MySQL binlog from Kafka

air23
Hello. This is the SQL I use for parsing. I want to read the `data` and `table` fields of the binlog. Why can I read `table` but not `data`?

private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
" `data` VARCHAR , " +
" `table` VARCHAR " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'order_source'," +
" 'properties.bootstrap.servers' = '***'," +
" 'properties.group.id' = 'real1'," +
" 'format' = 'json'," +
" 'scan.startup.mode' = 'earliest-offset'" +
")";


See the attachment for details; it includes the printed output.


 


Re: Problem parsing MySQL binlog from Kafka

Jark
Administrator
Hi,
Your attachment does not seem to have been uploaded.

On Mon, 27 Jul 2020 at 18:17, air23 <[hidden email]> wrote:


Re: Problem parsing MySQL binlog from Kafka

air23
Let me upload it again.

On 27 Jul 2020 at 18:55, [hidden email] wrote:

Re: Problem parsing MySQL binlog from Kafka

Jark
Administrator
Sorry, I still cannot see the attachment.
If it is text, you can paste it directly into the email.

On Mon, 27 Jul 2020 at 19:22, air23 <[hidden email]> wrote:


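Re: Re: Problem parsing MySQL binlog from Kafka

hechuan
In reply to this post by air23
Hi,
Attachments, including images and the like, probably do not get through to the list.
You can only reply in plain text and paste code. If you really need an image, upload it to another site and share a link to it.

On 2020-07-27 19:21:51, "air23" <[hidden email]> wrote: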


Re: Re: Problem parsing MySQL binlog from Kafka

air23
In reply to this post by Jark
Hello, the test code is as follows:


private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
        " `data` VARCHAR , " +
        " `table` VARCHAR " +
        ") WITH (" +
        " 'connector' = 'kafka'," +
        " 'topic' = 'source_databases'," +
        " 'properties.bootstrap.servers' = '***'," +
        " 'properties.group.id' = 'real1'," +
        " 'format' = 'json'," +
        " 'scan.startup.mode' = 'earliest-offset'" +
        ")";

public static void main(String[] args) throws Exception {
    // Blink planner table environment
    StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

    // Register the Kafka source table
    TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
    tableResult.print();

    // Query every column and print the rows as an append-only stream
    Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
    bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);

    bsEnv.execute("aa");
}




The output is as follows. `data` is always empty. The messages are MySQL binlog records in the format produced by Canal.
,order_operation_time
,inventory_batch_log
,order_log
,order_address_book
,product_inventory
,order_physical_relation
,bil_business_attach
,picking_detail
,picking_detail
,orders




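One more question: in version 1.11, can the Blink planner not convert a DataStream into a Table?
All the examples I have seen use useOldPlanner for that conversion.
Thanks.

On 2020-07-27 19:44:10, "Jark Wu" <[hidden email]> wrote: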


Re: Re: Problem parsing MySQL binlog from Kafka

Jark
Administrator
Do you have a sample of the JSON data in Kafka?
Have you checked the TaskManager logs for any exceptions?



On Tue, 28 Jul 2020 at 09:40, air23 <[hidden email]> wrote:


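Re: Re: Re: Problem parsing MySQL binlog from Kafka

air23
The format is shown below. It is the binlog format produced by Canal, and `data` holds an array. That field simply cannot be read as a string, while every other field can be read.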


{
    "data":[
        {
            "op_id":"97037138",
            "order_id":"84172164"
        }
    ],
    "database":"order_11",
    "es":1595720375000,
    "id":17469027,
    "isDdl":false,
    "mysqlType":{
        "op_id":"int(11)",
        "order_id":"int(11)"
    },
    "old":null,
    "pkNames":[
        "op_id"
    ],
    "sql":"",
    "sqlType":{
        "op_id":4,
        "order_id":4
    },
    "table":"order_product",
    "ts":1595720375837,
    "type":"INSERT"
}

















On 2020-07-28 14:44:35, "Jark Wu" <[hidden email]> wrote:


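Re: Re: Re: Problem parsing MySQL binlog from Kafka

Jark
Administrator
This happens because "data" is a complex structure, not a plain string. As of 1.11, reading it as a string is not supported.
1.12 already supports reading a complex structure as a string type.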

Best,
Jark

On Tue, 28 Jul 2020 at 15:36, air23 <[hidden email]> wrote:


Re: Re: Re: Problem parsing MySQL binlog from Kafka

smq
In reply to this post by air23
Which version of Flink are you using?


Re: Problem parsing MySQL binlog from Kafka

admin
In reply to this post by air23
`data` is not a string. You can declare it as ARRAY<ROW<op_id STRING, order_id STRING>>.
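As a minimal sketch of that suggestion, the DDL from the original post could be rewritten with `data` typed as an array of rows. The class name `CanalKafkaDdl`, the helper `ddl`, and the constant `FLATTEN_SQL` are hypothetical names introduced here for illustration. The flattening query assumes that `CROSS JOIN UNNEST` over an `ARRAY<ROW<...>>` column behaves as described in the Flink SQL documentation, and it only fits topics where every message has these two columns.

```java
class CanalKafkaDdl {

    // Hypothetical helper: builds the source DDL with `data` declared as
    // ARRAY<ROW<...>> instead of VARCHAR. The column list matches only the
    // sample message in this thread (op_id, order_id).
    static String ddl(String topic, String bootstrapServers) {
        return "CREATE TABLE kafkaTable (\n"
                + "  `data` ARRAY<ROW<op_id STRING, order_id STRING>>,\n"
                + "  `table` STRING\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = '" + topic + "',\n"
                + "  'properties.bootstrap.servers' = '" + bootstrapServers + "',\n"
                + "  'properties.group.id' = 'real1',\n"
                + "  'format' = 'json',\n"
                + "  'scan.startup.mode' = 'earliest-offset'\n"
                + ")";
    }

    // Assumed query shape: expands each element of `data` into its own row.
    static final String FLATTEN_SQL =
            "SELECT `table`, t.op_id, t.order_id "
                    + "FROM kafkaTable CROSS JOIN UNNEST(`data`) AS t (op_id, order_id)";

    public static void main(String[] args) {
        // Print both statements; in a job they would be passed to
        // executeSql(...) and sqlQuery(...) respectively.
        System.out.println(ddl("source_databases", "***"));
        System.out.println(FLATTEN_SQL);
    }
}
```

The two strings would replace `KAFKA_SQL` and the `select *` query in the test program posted earlier in the thread.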


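Re: Re: Problem parsing MySQL binlog from Kafka

air23
Hello.
I suspect that may well be the cause. However, this topic carries the binlog of an entire database with many tables, so the shape ARRAY<ROW<op_id STRING, order_id STRING>> is not fixed across messages.
So I would like to parse the messages with the DataStream API and then, depending on the source table, turn them into different tables. But it looks like the Blink planner cannot convert a DataStream into a Table, or perhaps I just have not found an example.
Thanks.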
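The first step of that plan, reading the table name and the raw `data` payload from each message without fixing a schema, could look roughly like the sketch below. It uses only the JDK so that it stays self-contained; a real job would more likely rely on a JSON library. The class `CanalEnvelope` and its methods are hypothetical names for illustration, and the scanner assumes well-formed JSON whose keys contain no escaped characters.

```java
class CanalEnvelope {

    // Returns the raw JSON text of a top-level field, or null if it is absent.
    static String rawValue(String json, String key) {
        int depth = 0;
        int i = 0;
        int n = json.length();
        while (i < n) {
            char c = json.charAt(i);
            if (c == '"') {
                int end = skipString(json, i);
                if (depth == 1) {
                    // At depth 1 a string followed by ':' is a top-level key.
                    int j = skipSpaces(json, end);
                    if (j < n && json.charAt(j) == ':'
                            && json.substring(i + 1, end - 1).equals(key)) {
                        int vStart = skipSpaces(json, j + 1);
                        return json.substring(vStart, skipValue(json, vStart));
                    }
                }
                i = end;
            } else {
                if (c == '{' || c == '[') depth++;
                else if (c == '}' || c == ']') depth--;
                i++;
            }
        }
        return null;
    }

    // Returns the top-level "table" field without quotes, or null.
    static String tableName(String json) {
        String raw = rawValue(json, "table");
        if (raw == null || raw.length() < 2 || raw.charAt(0) != '"') return null;
        return raw.substring(1, raw.length() - 1);
    }

    // i points at an opening quote; returns the index just past the closing quote.
    static int skipString(String s, int i) {
        int j = i + 1;
        while (j < s.length()) {
            char c = s.charAt(j);
            if (c == '\\') j += 2;
            else if (c == '"') return j + 1;
            else j++;
        }
        return s.length();
    }

    static int skipSpaces(String s, int i) {
        while (i < s.length() && Character.isWhitespace(s.charAt(i))) i++;
        return i;
    }

    // Returns the index just past the JSON value that starts at i.
    static int skipValue(String s, int i) {
        int n = s.length();
        if (i >= n) return n;
        char c = s.charAt(i);
        if (c == '"') return skipString(s, i);
        if (c == '{' || c == '[') {
            int depth = 0;
            while (i < n) {
                c = s.charAt(i);
                if (c == '"') { i = skipString(s, i); continue; }
                if (c == '{' || c == '[') depth++;
                else if (c == '}' || c == ']') { depth--; if (depth == 0) return i + 1; }
                i++;
            }
            return n;
        }
        // Number, true, false or null: read up to the next delimiter.
        while (i < n && ",}]".indexOf(s.charAt(i)) < 0 && !Character.isWhitespace(s.charAt(i))) i++;
        return i;
    }

    public static void main(String[] args) {
        String msg = "{\"data\":[{\"op_id\":\"97037138\",\"order_id\":\"84172164\"}],"
                + "\"database\":\"order_11\",\"table\":\"order_product\",\"type\":\"INSERT\"}";
        System.out.println(tableName(msg) + " -> " + rawValue(msg, "data"));
    }
}
```

Inside a DataStream job, a function like `tableName` could serve as the routing key, with the raw `data` text parsed later against the schema of the matching target table.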

















On 2020-07-28 16:05:55, "admin" <[hidden email]> wrote:


Re: Re: Re: Re: Problem parsing MySQL binlog from Kafka

air23
In reply to this post by smq
Hello, I am using version <flink.version>1.11.1</flink.version>.



















On 2020-07-28 16:02:30, "明启 孙" <[hidden email]> wrote:


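Re: Re: Re: Re: Problem parsing MySQL binlog from Kafka

air23
In reply to this post by Jark
Hello, understood, thank you. Because this topic carries the binlog of many tables, parsing `data` as an array does not work either: the length and the fields differ from table to table.
I would also like to ask whether, in version 1.11, a DataStream can be converted into a Blink planner Table. The examples I have seen all seem to use useOldPlanner for the conversion,
but I want to use Blink to write into Hive. My requirement is binlog -> Flink -> Hive, with each MySQL table written to its own Hive table.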

















On 2020-07-28 16:02:18, "Jark Wu" <[hidden email]> wrote:

>因为 "data" 是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。
>1.12 中已经支持读取复杂结构为 string 类型了。
>
>Best,
>Jark
>
>On Tue, 28 Jul 2020 at 15:36, air23 <[hidden email]> wrote:
>
>> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>>
>>
>> {
>>     "data":[
>>         {
>>             "op_id":"97037138",
>>             "order_id":"84172164"
>>         }
>>     ],
>>     "database":"order_11",
>>     "es":1595720375000,
>>     "id":17469027,
>>     "isDdl":false,
>>     "mysqlType":{
>>         "op_id":"int(11)",
>>         "order_id":"int(11)"
>>     },
>>     "old":null,
>>     "pkNames":[
>>         "op_id"
>>     ],
>>     "sql":"",
>>     "sqlType":{
>>         "op_id":4,
>>         "order_id":4
>>     },
>>     "table":"order_product",
>>     "ts":1595720375837,
>>     "type":"INSERT"
>> }
>>
>> At 2020-07-28 14:44:35, "Jark Wu" <[hidden email]> wrote:
>> >Do you have a sample of the JSON data in Kafka?
>> >Have you checked the TaskManager logs for exceptions?
>> >
>> >
>> >
>> >On Tue, 28 Jul 2020 at 09:40, air23 <[hidden email]> wrote:
>> >
>> >> Hello, the test code is as follows:
>> >>
>> >> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> >> import org.apache.flink.table.api.EnvironmentSettings;
>> >> import org.apache.flink.table.api.Table;
>> >> import org.apache.flink.table.api.TableResult;
>> >> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>> >> import org.apache.flink.types.Row;
>> >>
>> >> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>> >>         " `data` VARCHAR , " +
>> >>         " `table` VARCHAR " +
>> >>         ") WITH (" +
>> >>         " 'connector' = 'kafka'," +
>> >>         " 'topic' = 'source_databases'," +
>> >>         " 'properties.bootstrap.servers' = '***'," +
>> >>         " 'properties.group.id' = 'real1'," +
>> >>         " 'format' = 'json'," +
>> >>         " 'scan.startup.mode' = 'earliest-offset'" +
>> >>         ")";
>> >>
>> >> public static void main(String[] args) throws Exception {
>> >>     // Blink planner table environment in streaming mode
>> >>     StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>> >>     EnvironmentSettings bsSettings =
>> >>             EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> >>     StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
>> >>
>> >>     // Register the Kafka source table
>> >>     TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>> >>     tableResult.print();
>> >>
>> >>     // Query every row and print it as an append-only stream
>> >>     Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>> >>     bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>> >>
>> >>     bsEnv.execute("aa");
>> >> }
>> >>
>> >> The output is as follows. `data` is always empty. The data format is MySQL binlog as parsed by Canal.
>> >> ,order_operation_time
>> >> ,inventory_batch_log
>> >> ,order_log
>> >> ,order_address_book
>> >> ,product_inventory
>> >> ,order_physical_relation
>> >> ,bil_business_attach
>> >> ,picking_detail
>> >> ,picking_detail
>> >> ,orders
>> >>
>> >> One more question: in version 1.11, can Blink not convert a DataStream to a Table?
>> >> All the examples I have seen use useOldPlanner for the conversion.
>> >> Thanks

Re: Problem parsing MySQL binlog from Kafka

admin
Converting it directly to a string is not yet supported in 1.11; it will be fixed in 1.12. See the JIRA issue [1].

[1] https://issues.apache.org/jira/browse/FLINK-18002
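
Until that fix is available, one possible workaround is to read each Kafka record as a plain string and cut the raw `data` text out by hand. The sketch below shows only that extraction step in plain Java, with no JSON library. The class `CanalRawField` and its method are hypothetical names. It assumes well-formed, Canal-style JSON in which the key is written as `"field":` with no space before the colon, and in which the first occurrence of that key is the top-level one.

```java
/** Hypothetical helper: returns the raw JSON text of one field of a Canal message. */
final class CanalRawField {

    /**
     * Returns the raw JSON text that follows the first occurrence of "field": in json,
     * or null if the key is not present. Arrays and objects are returned with their
     * brackets, strings with their quotes, and scalars as written.
     */
    static String rawValue(String json, String field) {
        String key = "\"" + field + "\":";
        int keyPos = json.indexOf(key);
        if (keyPos < 0) {
            return null;
        }
        int i = keyPos + key.length();
        while (i < json.length() && Character.isWhitespace(json.charAt(i))) {
            i++; // skip whitespace between the colon and the value
        }
        int begin = i;
        int depth = 0;            // nesting depth of [] and {} inside the value
        boolean inString = false; // true while scanning inside a JSON string
        for (; i < json.length(); i++) {
            char c = json.charAt(i);
            if (inString) {
                if (c == '\\') {
                    i++; // skip the escaped character
                } else if (c == '"') {
                    inString = false;
                    if (depth == 0) {
                        return json.substring(begin, i + 1); // value was a string
                    }
                }
            } else if (c == '"') {
                inString = true;
            } else if (c == '[' || c == '{') {
                depth++;
            } else if (c == ']' || c == '}') {
                if (depth == 0) {
                    return json.substring(begin, i).trim(); // scalar ending the enclosing object
                }
                depth--;
                if (depth == 0) {
                    return json.substring(begin, i + 1); // value was an array or object
                }
            } else if (c == ',' && depth == 0) {
                return json.substring(begin, i).trim(); // scalar followed by another field
            }
        }
        return json.substring(begin).trim();
    }
}
```

In a 1.11 job this could be called from a map function over a stream read with `FlinkKafkaConsumer` and `SimpleStringSchema`, so that `table` and the raw `data` text travel together as two strings. A proper JSON parser would be more robust than this hand-written scan if a dependency is acceptable.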
