flink sql cdc sum 结果出现NULL

classic Classic list List threaded Threaded
12 messages Options
Reply | Threaded
Open this post in threaded view
|

flink sql cdc sum 结果出现NULL

kandy.wang
--mysql表
CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
   `id` INT UNSIGNED AUTO_INCREMENT,
   `spu_id` BIGINT NOT NULL,
   `leaving_price`  DECIMAL(10, 5)
   PRIMARY KEY ( `id` ),
   unique key idx_spu_id (spu_id)
)ENGINE=InnoDB DEFAULT CHARSET=utf8


--flink表
CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
   `spu_id` BIGINT ,
   `leaving_price`  DECIMAL(10, 5),
    PRIMARY KEY ( `spu_id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
   'url' = 'jdbc:mysql://...',
   'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
   'username' = '...',
   'password' = '..'
);


--binlog 2mysql

insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg

SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price

FROM hive.database.table

group by v_spu_id;


hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。


问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
有什么好的排查思路么?





Reply | Threaded
Open this post in threaded view
|

Re: flink sql cdc sum 结果出现NULL

Jark
Administrator
你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
值的,以验证你的自定义 format 没有问题。

Best,
Jark

On Thu, 19 Nov 2020 at 22:41, kandy.wang <[hidden email]> wrote:

> --mysql表
> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>    `id` INT UNSIGNED AUTO_INCREMENT,
>    `spu_id` BIGINT NOT NULL,
>    `leaving_price`  DECIMAL(10, 5)
>    PRIMARY KEY ( `id` ),
>    unique key idx_spu_id (spu_id)
> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>
>
> --flink表
> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>    `spu_id` BIGINT ,
>    `leaving_price`  DECIMAL(10, 5),
>     PRIMARY KEY ( `spu_id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>    'url' = 'jdbc:mysql://...',
>    'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>    'username' = '...',
>    'password' = '..'
> );
>
>
> --binlog 2mysql
>
> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>
> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>
> FROM hive.database.table
>
> group by v_spu_id;
>
>
> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>
>
> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
> 有什么好的排查思路么?
>
>
>
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: flink sql cdc sum 结果出现NULL

kandy.wang
hi Jark:

打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before update_after,format逻辑是应该这么写的吧。




在 2020-11-19 23:13:19,"Jark Wu" <[hidden email]> 写道:

>你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
>值的,以验证你的自定义 format 没有问题。
>
>Best,
>Jark
>
>On Thu, 19 Nov 2020 at 22:41, kandy.wang <[hidden email]> wrote:
>
>> --mysql表
>> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>>    `id` INT UNSIGNED AUTO_INCREMENT,
>>    `spu_id` BIGINT NOT NULL,
>>    `leaving_price`  DECIMAL(10, 5)
>>    PRIMARY KEY ( `id` ),
>>    unique key idx_spu_id (spu_id)
>> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>>
>>
>> --flink表
>> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>>    `spu_id` BIGINT ,
>>    `leaving_price`  DECIMAL(10, 5),
>>     PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'jdbc',
>>    'url' = 'jdbc:mysql://...',
>>    'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>>    'username' = '...',
>>    'password' = '..'
>> );
>>
>>
>> --binlog 2mysql
>>
>> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>>
>> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>>
>> FROM hive.database.table
>>
>> group by v_spu_id;
>>
>>
>> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>>
>>
>> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
>> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> 有什么好的排查思路么?
>>
>>
>>
>>
>>
>>
Reply | Threaded
Open this post in threaded view
|

Re:Re: flink sql cdc sum 结果出现NULL

kandy.wang
In reply to this post by Jark
hi Jark:


打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况

自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before update_after,format逻辑是应该这么写的吧。




在 2020-11-19 23:13:19,"Jark Wu" <[hidden email]> 写道:

>你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
>值的,以验证你的自定义 format 没有问题。
>
>Best,
>Jark
>
>On Thu, 19 Nov 2020 at 22:41, kandy.wang <[hidden email]> wrote:
>
>> --mysql表
>> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>>    `id` INT UNSIGNED AUTO_INCREMENT,
>>    `spu_id` BIGINT NOT NULL,
>>    `leaving_price`  DECIMAL(10, 5)
>>    PRIMARY KEY ( `id` ),
>>    unique key idx_spu_id (spu_id)
>> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>>
>>
>> --flink表
>> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>>    `spu_id` BIGINT ,
>>    `leaving_price`  DECIMAL(10, 5),
>>     PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'jdbc',
>>    'url' = 'jdbc:mysql://...',
>>    'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>>    'username' = '...',
>>    'password' = '..'
>> );
>>
>>
>> --binlog 2mysql
>>
>> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>>
>> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>>
>> FROM hive.database.table
>>
>> group by v_spu_id;
>>
>>
>> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>>
>>
>> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
>> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> 有什么好的排查思路么?
>>
>>
>>
>>
>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: Re: flink sql cdc sum 结果出现NULL

Jark
Administrator
实现上应该没什么问题。

1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
2. 是否开启 mini-batch了?

Best,
Jark

On Fri, 20 Nov 2020 at 11:44, kandy.wang <[hidden email]> wrote:

> hi Jark:
>
>
> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
>
> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before
> update_after,format逻辑是应该这么写的吧。
>
>
>
>
> 在 2020-11-19 23:13:19,"Jark Wu" <[hidden email]> 写道:
> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
> >值的,以验证你的自定义 format 没有问题。
> >
> >Best,
> >Jark
> >
> >On Thu, 19 Nov 2020 at 22:41, kandy.wang <[hidden email]> wrote:
> >
> >> --mysql表
> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
> >>    `id` INT UNSIGNED AUTO_INCREMENT,
> >>    `spu_id` BIGINT NOT NULL,
> >>    `leaving_price`  DECIMAL(10, 5)
> >>    PRIMARY KEY ( `id` ),
> >>    unique key idx_spu_id (spu_id)
> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
> >>
> >>
> >> --flink表
> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> (
> >>    `spu_id` BIGINT ,
> >>    `leaving_price`  DECIMAL(10, 5),
> >>     PRIMARY KEY ( `spu_id`) NOT ENFORCED
> >> ) WITH (
> >>   'connector' = 'jdbc',
> >>    'url' = 'jdbc:mysql://...',
> >>    'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
> >>    'username' = '...',
> >>    'password' = '..'
> >> );
> >>
> >>
> >> --binlog 2mysql
> >>
> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> >>
> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
> >>
> >> FROM hive.database.table
> >>
> >> group by v_spu_id;
> >>
> >>
> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
> >>
> >>
> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
> >> 有什么好的排查思路么?
> >>
> >>
> >>
> >>
> >>
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re: flink sql cdc sum 结果出现NULL

kandy.wang
1.是的。  这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。

2. 没有开启


在 2020-11-20 11:49:44,"Jark Wu" <[hidden email]> 写道:

>实现上应该没什么问题。
>
>1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
>2. 是否开启 mini-batch了?
>
>Best,
>Jark
>
>On Fri, 20 Nov 2020 at 11:44, kandy.wang <[hidden email]> wrote:
>
>> hi Jark:
>>
>>
>> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
>> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
>>
>> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before
>> update_after,format逻辑是应该这么写的吧。
>>
>>
>>
>>
>> 在 2020-11-19 23:13:19,"Jark Wu" <[hidden email]> 写道:
>> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
>> >值的,以验证你的自定义 format 没有问题。
>> >
>> >Best,
>> >Jark
>> >
>> >On Thu, 19 Nov 2020 at 22:41, kandy.wang <[hidden email]> wrote:
>> >
>> >> --mysql表
>> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>> >>    `id` INT UNSIGNED AUTO_INCREMENT,
>> >>    `spu_id` BIGINT NOT NULL,
>> >>    `leaving_price`  DECIMAL(10, 5)
>> >>    PRIMARY KEY ( `id` ),
>> >>    unique key idx_spu_id (spu_id)
>> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>> >>
>> >>
>> >> --flink表
>> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> (
>> >>    `spu_id` BIGINT ,
>> >>    `leaving_price`  DECIMAL(10, 5),
>> >>     PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> >> ) WITH (
>> >>   'connector' = 'jdbc',
>> >>    'url' = 'jdbc:mysql://...',
>> >>    'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>> >>    'username' = '...',
>> >>    'password' = '..'
>> >> );
>> >>
>> >>
>> >> --binlog 2mysql
>> >>
>> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >>
>> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>> >>
>> >> FROM hive.database.table
>> >>
>> >> group by v_spu_id;
>> >>
>> >>
>> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>> >>
>> >>
>> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
>> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> >> 有什么好的排查思路么?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>>
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re: flink sql cdc sum 结果出现NULL

kandy.wang
In reply to this post by Jark






1.是的。  这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。

2. 没有开启




在 2020-11-20 11:49:44,"Jark Wu" <[hidden email]> 写道:

>实现上应该没什么问题。
>
>1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
>2. 是否开启 mini-batch了?
>
>Best,
>Jark
>
>On Fri, 20 Nov 2020 at 11:44, kandy.wang <[hidden email]> wrote:
>
>> hi Jark:
>>
>>
>> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
>> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
>>
>> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before
>> update_after,format逻辑是应该这么写的吧。
>>
>>
>>
>>
>> 在 2020-11-19 23:13:19,"Jark Wu" <[hidden email]> 写道:
>> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
>> >值的,以验证你的自定义 format 没有问题。
>> >
>> >Best,
>> >Jark
>> >
>> >On Thu, 19 Nov 2020 at 22:41, kandy.wang <[hidden email]> wrote:
>> >
>> >> --mysql表
>> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>> >>    `id` INT UNSIGNED AUTO_INCREMENT,
>> >>    `spu_id` BIGINT NOT NULL,
>> >>    `leaving_price`  DECIMAL(10, 5)
>> >>    PRIMARY KEY ( `id` ),
>> >>    unique key idx_spu_id (spu_id)
>> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>> >>
>> >>
>> >> --flink表
>> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> (
>> >>    `spu_id` BIGINT ,
>> >>    `leaving_price`  DECIMAL(10, 5),
>> >>     PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> >> ) WITH (
>> >>   'connector' = 'jdbc',
>> >>    'url' = 'jdbc:mysql://...',
>> >>    'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>> >>    'username' = '...',
>> >>    'password' = '..'
>> >> );
>> >>
>> >>
>> >> --binlog 2mysql
>> >>
>> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >>
>> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>> >>
>> >> FROM hive.database.table
>> >>
>> >> group by v_spu_id;
>> >>
>> >>
>> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>> >>
>> >>
>> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
>> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> >> 有什么好的排查思路么?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Re: flink sql cdc sum 结果出现NULL

Jark
Administrator
1. 没有初始的全量数据可能是会有问题的。

3. 你的 format 再解析 update 时,时先发的 before 还是 after?
4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不?

On Fri, 20 Nov 2020 at 12:46, kandy.wang <[hidden email]> wrote:

>
>
>
>
>
>
> 1.是的。  这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。
>
> 2. 没有开启
>
>
>
>
> 在 2020-11-20 11:49:44,"Jark Wu" <[hidden email]> 写道:
> >实现上应该没什么问题。
> >
> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
> >2. 是否开启 mini-batch了?
> >
> >Best,
> >Jark
> >
> >On Fri, 20 Nov 2020 at 11:44, kandy.wang <[hidden email]> wrote:
> >
> >> hi Jark:
> >>
> >>
> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
> >>
> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条
> update_before
> >> update_after,format逻辑是应该这么写的吧。
> >>
> >>
> >>
> >>
> >> 在 2020-11-19 23:13:19,"Jark Wu" <[hidden email]> 写道:
> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
> >> >值的,以验证你的自定义 format 没有问题。
> >> >
> >> >Best,
> >> >Jark
> >> >
> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang <[hidden email]> wrote:
> >> >
> >> >> --mysql表
> >> >> CREATE TABLE IF NOT EXISTS
> `mysql_realtime_leaving_price_spu_index_agg`(
> >> >>    `id` INT UNSIGNED AUTO_INCREMENT,
> >> >>    `spu_id` BIGINT NOT NULL,
> >> >>    `leaving_price`  DECIMAL(10, 5)
> >> >>    PRIMARY KEY ( `id` ),
> >> >>    unique key idx_spu_id (spu_id)
> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
> >> >>
> >> >>
> >> >> --flink表
> >> >> CREATE TABLE
> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> >> (
> >> >>    `spu_id` BIGINT ,
> >> >>    `leaving_price`  DECIMAL(10, 5),
> >> >>     PRIMARY KEY ( `spu_id`) NOT ENFORCED
> >> >> ) WITH (
> >> >>   'connector' = 'jdbc',
> >> >>    'url' = 'jdbc:mysql://...',
> >> >>    'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
> >> >>    'username' = '...',
> >> >>    'password' = '..'
> >> >> );
> >> >>
> >> >>
> >> >> --binlog 2mysql
> >> >>
> >> >> insert into
> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> >> >>
> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
> >> >>
> >> >> FROM hive.database.table
> >> >>
> >> >> group by v_spu_id;
> >> >>
> >> >>
> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
> >> >>
> >> >>
> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
> >> >> 有什么好的排查思路么?
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re: Re: flink sql cdc sum 结果出现NULL

kandy.wang
1.没有初始的全量数据可能是会有问题的
这个怎么理解,默认情况,就是从kafka group-sets 消费的,怎么才能保证全量数据? 我们这个binlog同步都是增量同步。不会做一次初始化的全量同步。
2.先发的before 后发的after


3. 数据在kafka里,是按照mysql的id主键hash的。是有序的,group key 的所有数据不能保证 都在同 一个 partition 中。由于是按照主键id hash的。





在 2020-11-20 13:25:53,"Jark Wu" <[hidden email]> 写道:

>1. 没有初始的全量数据可能是会有问题的。
>
>3. 你的 format 再解析 update 时,时先发的 before 还是 after?
>4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不?
>
>On Fri, 20 Nov 2020 at 12:46, kandy.wang <[hidden email]> wrote:
>
>>
>>
>>
>>
>>
>>
>> 1.是的。  这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。
>>
>> 2. 没有开启
>>
>>
>>
>>
>> 在 2020-11-20 11:49:44,"Jark Wu" <[hidden email]> 写道:
>> >实现上应该没什么问题。
>> >
>> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
>> >2. 是否开启 mini-batch了?
>> >
>> >Best,
>> >Jark
>> >
>> >On Fri, 20 Nov 2020 at 11:44, kandy.wang <[hidden email]> wrote:
>> >
>> >> hi Jark:
>> >>
>> >>
>> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
>> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
>> >>
>> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条
>> update_before
>> >> update_after,format逻辑是应该这么写的吧。
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-11-19 23:13:19,"Jark Wu" <[hidden email]> 写道:
>> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
>> >> >值的,以验证你的自定义 format 没有问题。
>> >> >
>> >> >Best,
>> >> >Jark
>> >> >
>> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang <[hidden email]> wrote:
>> >> >
>> >> >> --mysql表
>> >> >> CREATE TABLE IF NOT EXISTS
>> `mysql_realtime_leaving_price_spu_index_agg`(
>> >> >>    `id` INT UNSIGNED AUTO_INCREMENT,
>> >> >>    `spu_id` BIGINT NOT NULL,
>> >> >>    `leaving_price`  DECIMAL(10, 5)
>> >> >>    PRIMARY KEY ( `id` ),
>> >> >>    unique key idx_spu_id (spu_id)
>> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>> >> >>
>> >> >>
>> >> >> --flink表
>> >> >> CREATE TABLE
>> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >> (
>> >> >>    `spu_id` BIGINT ,
>> >> >>    `leaving_price`  DECIMAL(10, 5),
>> >> >>     PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> >> >> ) WITH (
>> >> >>   'connector' = 'jdbc',
>> >> >>    'url' = 'jdbc:mysql://...',
>> >> >>    'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>> >> >>    'username' = '...',
>> >> >>    'password' = '..'
>> >> >> );
>> >> >>
>> >> >>
>> >> >> --binlog 2mysql
>> >> >>
>> >> >> insert into
>> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >> >>
>> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>> >> >>
>> >> >> FROM hive.database.table
>> >> >>
>> >> >> group by v_spu_id;
>> >> >>
>> >> >>
>> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>> >> >>
>> >> >>
>> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
>> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> >> >> 有什么好的排查思路么?
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >>
>>
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re: Re: flink sql cdc sum 结果出现NULL

kandy.wang
In reply to this post by Jark
1.没有初始的全量数据可能是会有问题的
这个怎么理解,默认情况,就是从kafka group-sets 消费的,怎么才能保证全量数据? 我们这个binlog同步都是增量同步。不会做一次初始化的全量同步。
2.先发的before 后发的after


3. 数据在kafka里,是按照mysql的id主键hash的。是有序的,group key 的所有数据不能保证 都在同 一个 partition 中。由于是按照主键id hash的








在 2020-11-20 13:25:53,"Jark Wu" <[hidden email]> 写道:

>1. 没有初始的全量数据可能是会有问题的。
>
>3. 你的 format 再解析 update 时,时先发的 before 还是 after?
>4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不?
>
>On Fri, 20 Nov 2020 at 12:46, kandy.wang <[hidden email]> wrote:
>
>>
>>
>>
>>
>>
>>
>> 1.是的。  这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。
>>
>> 2. 没有开启
>>
>>
>>
>>
>> 在 2020-11-20 11:49:44,"Jark Wu" <[hidden email]> 写道:
>> >实现上应该没什么问题。
>> >
>> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
>> >2. 是否开启 mini-batch了?
>> >
>> >Best,
>> >Jark
>> >
>> >On Fri, 20 Nov 2020 at 11:44, kandy.wang <[hidden email]> wrote:
>> >
>> >> hi Jark:
>> >>
>> >>
>> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
>> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
>> >>
>> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条
>> update_before
>> >> update_after,format逻辑是应该这么写的吧。
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-11-19 23:13:19,"Jark Wu" <[hidden email]> 写道:
>> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
>> >> >值的,以验证你的自定义 format 没有问题。
>> >> >
>> >> >Best,
>> >> >Jark
>> >> >
>> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang <[hidden email]> wrote:
>> >> >
>> >> >> --mysql表
>> >> >> CREATE TABLE IF NOT EXISTS
>> `mysql_realtime_leaving_price_spu_index_agg`(
>> >> >>    `id` INT UNSIGNED AUTO_INCREMENT,
>> >> >>    `spu_id` BIGINT NOT NULL,
>> >> >>    `leaving_price`  DECIMAL(10, 5)
>> >> >>    PRIMARY KEY ( `id` ),
>> >> >>    unique key idx_spu_id (spu_id)
>> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>> >> >>
>> >> >>
>> >> >> --flink表
>> >> >> CREATE TABLE
>> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >> (
>> >> >>    `spu_id` BIGINT ,
>> >> >>    `leaving_price`  DECIMAL(10, 5),
>> >> >>     PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> >> >> ) WITH (
>> >> >>   'connector' = 'jdbc',
>> >> >>    'url' = 'jdbc:mysql://...',
>> >> >>    'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>> >> >>    'username' = '...',
>> >> >>    'password' = '..'
>> >> >> );
>> >> >>
>> >> >>
>> >> >> --binlog 2mysql
>> >> >>
>> >> >> insert into
>> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >> >>
>> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>> >> >>
>> >> >> FROM hive.database.table
>> >> >>
>> >> >> group by v_spu_id;
>> >> >>
>> >> >>
>> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>> >> >>
>> >> >>
>> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
>> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> >> >> 有什么好的排查思路么?
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >>
>>
Reply | Threaded
Open this post in threaded view
|

Re: flink sql cdc sum 结果出现NULL

Jianzhi Zhang
In reply to this post by kandy.wang
是不是你的decimal字段长度太短了,计算结果超出了精度范围导致null的出现

> 2020年11月19日 下午10:41,kandy.wang <[hidden email]> 写道:
>
> --mysql表
> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>   `id` INT UNSIGNED AUTO_INCREMENT,
>   `spu_id` BIGINT NOT NULL,
>   `leaving_price`  DECIMAL(10, 5)
>   PRIMARY KEY ( `id` ),
>   unique key idx_spu_id (spu_id)
> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>
>
> --flink表
> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>   `spu_id` BIGINT ,
>   `leaving_price`  DECIMAL(10, 5),
>    PRIMARY KEY ( `spu_id`) NOT ENFORCED
> ) WITH (
>  'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://...',
>   'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>   'username' = '...',
>   'password' = '..'
> );
>
>
> --binlog 2mysql
>
> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>
> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>
> FROM hive.database.table
>
> group by v_spu_id;
>
>
> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>
>
> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
> 有什么好的排查思路么?
>
>
>
>
>

Reply | Threaded
Open this post in threaded view
|

Re:Re: flink sql cdc sum 结果出现NULL

kandy.wang









@Jianzhi Zhang

嗯,是这个原因,感谢 回复。 就是decimal的精度问题




在 2020-12-01 13:24:23,"Jianzhi Zhang" <[hidden email]> 写道:

>是不是你的decimal字段长度太短了,计算结果超出了精度范围导致null的出现
>
>> 2020年11月19日 下午10:41,kandy.wang <[hidden email]> 写道:
>>
>> --mysql表
>> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>>   `id` INT UNSIGNED AUTO_INCREMENT,
>>   `spu_id` BIGINT NOT NULL,
>>   `leaving_price`  DECIMAL(10, 5)
>>   PRIMARY KEY ( `id` ),
>>   unique key idx_spu_id (spu_id)
>> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>>
>>
>> --flink表
>> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>>   `spu_id` BIGINT ,
>>   `leaving_price`  DECIMAL(10, 5),
>>    PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> ) WITH (
>>  'connector' = 'jdbc',
>>   'url' = 'jdbc:mysql://...',
>>   'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>>   'username' = '...',
>>   'password' = '..'
>> );
>>
>>
>> --binlog 2mysql
>>
>> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>>
>> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>>
>> FROM hive.database.table
>>
>> group by v_spu_id;
>>
>>
>> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>>
>>
>> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> 有什么好的排查思路么?
>>
>>
>>
>>
>>