--mysql表
CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( `id` INT UNSIGNED AUTO_INCREMENT, `spu_id` BIGINT NOT NULL, `leaving_price` DECIMAL(10, 5) PRIMARY KEY ( `id` ), unique key idx_spu_id (spu_id) )ENGINE=InnoDB DEFAULT CHARSET=utf8 --flink表 CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( `spu_id` BIGINT , `leaving_price` DECIMAL(10, 5), PRIMARY KEY ( `spu_id`) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://...', 'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', 'username' = '...', 'password' = '..' ); --binlog 2mysql insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price FROM hive.database.table group by v_spu_id; hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 有什么好的排查思路么? |
Administrator
|
你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
值的,以验证你的自定义 format 没有问题。 Best, Jark On Thu, 19 Nov 2020 at 22:41, kandy.wang <[hidden email]> wrote: > --mysql表 > CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( > `id` INT UNSIGNED AUTO_INCREMENT, > `spu_id` BIGINT NOT NULL, > `leaving_price` DECIMAL(10, 5) > PRIMARY KEY ( `id` ), > unique key idx_spu_id (spu_id) > )ENGINE=InnoDB DEFAULT CHARSET=utf8 > > > --flink表 > CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( > `spu_id` BIGINT , > `leaving_price` DECIMAL(10, 5), > PRIMARY KEY ( `spu_id`) NOT ENFORCED > ) WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://...', > 'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', > 'username' = '...', > 'password' = '..' > ); > > > --binlog 2mysql > > insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > > SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price > > FROM hive.database.table > > group by v_spu_id; > > > hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 > > > 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price > 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 > 有什么好的排查思路么? > > > > > > |
hi Jark:
打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before update_after,format逻辑是应该这么写的吧。 在 2020-11-19 23:13:19,"Jark Wu" <[hidden email]> 写道: >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >值的,以验证你的自定义 format 没有问题。 > >Best, >Jark > >On Thu, 19 Nov 2020 at 22:41, kandy.wang <[hidden email]> wrote: > >> --mysql表 >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >> `id` INT UNSIGNED AUTO_INCREMENT, >> `spu_id` BIGINT NOT NULL, >> `leaving_price` DECIMAL(10, 5) >> PRIMARY KEY ( `id` ), >> unique key idx_spu_id (spu_id) >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> --flink表 >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( >> `spu_id` BIGINT , >> `leaving_price` DECIMAL(10, 5), >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> ) WITH ( >> 'connector' = 'jdbc', >> 'url' = 'jdbc:mysql://...', >> 'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> 'username' = '...', >> 'password' = '..' >> ); >> >> >> --binlog 2mysql >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> FROM hive.database.table >> >> group by v_spu_id; >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> 有什么好的排查思路么? >> >> >> >> >> >> |
In reply to this post by Jark
hi Jark:
打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before update_after,format逻辑是应该这么写的吧。 在 2020-11-19 23:13:19,"Jark Wu" <[hidden email]> 写道: >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >值的,以验证你的自定义 format 没有问题。 > >Best, >Jark > >On Thu, 19 Nov 2020 at 22:41, kandy.wang <[hidden email]> wrote: > >> --mysql表 >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >> `id` INT UNSIGNED AUTO_INCREMENT, >> `spu_id` BIGINT NOT NULL, >> `leaving_price` DECIMAL(10, 5) >> PRIMARY KEY ( `id` ), >> unique key idx_spu_id (spu_id) >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> --flink表 >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( >> `spu_id` BIGINT , >> `leaving_price` DECIMAL(10, 5), >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> ) WITH ( >> 'connector' = 'jdbc', >> 'url' = 'jdbc:mysql://...', >> 'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> 'username' = '...', >> 'password' = '..' >> ); >> >> >> --binlog 2mysql >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> FROM hive.database.table >> >> group by v_spu_id; >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> 有什么好的排查思路么? >> >> >> >> >> >> |
Administrator
|
实现上应该没什么问题。
1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? 2. 是否开启 mini-batch了? Best, Jark On Fri, 20 Nov 2020 at 11:44, kandy.wang <[hidden email]> wrote: > hi Jark: > > > 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price > 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 > > 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before > update_after,format逻辑是应该这么写的吧。 > > > > > 在 2020-11-19 23:13:19,"Jark Wu" <[hidden email]> 写道: > >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null > >值的,以验证你的自定义 format 没有问题。 > > > >Best, > >Jark > > > >On Thu, 19 Nov 2020 at 22:41, kandy.wang <[hidden email]> wrote: > > > >> --mysql表 > >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( > >> `id` INT UNSIGNED AUTO_INCREMENT, > >> `spu_id` BIGINT NOT NULL, > >> `leaving_price` DECIMAL(10, 5) > >> PRIMARY KEY ( `id` ), > >> unique key idx_spu_id (spu_id) > >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 > >> > >> > >> --flink表 > >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > ( > >> `spu_id` BIGINT , > >> `leaving_price` DECIMAL(10, 5), > >> PRIMARY KEY ( `spu_id`) NOT ENFORCED > >> ) WITH ( > >> 'connector' = 'jdbc', > >> 'url' = 'jdbc:mysql://...', > >> 'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', > >> 'username' = '...', > >> 'password' = '..' > >> ); > >> > >> > >> --binlog 2mysql > >> > >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > >> > >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price > >> > >> FROM hive.database.table > >> > >> group by v_spu_id; > >> > >> > >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 > >> > >> > >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price > >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 > >> 有什么好的排查思路么? > >> > >> > >> > >> > >> > >> > |
1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。
2. 没有开启 在 2020-11-20 11:49:44,"Jark Wu" <[hidden email]> 写道: >实现上应该没什么问题。 > >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >2. 是否开启 mini-batch了? > >Best, >Jark > >On Fri, 20 Nov 2020 at 11:44, kandy.wang <[hidden email]> wrote: > >> hi Jark: >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 >> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before >> update_after,format逻辑是应该这么写的吧。 >> >> >> >> >> 在 2020-11-19 23:13:19,"Jark Wu" <[hidden email]> 写道: >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >> >值的,以验证你的自定义 format 没有问题。 >> > >> >Best, >> >Jark >> > >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang <[hidden email]> wrote: >> > >> >> --mysql表 >> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >> >> `id` INT UNSIGNED AUTO_INCREMENT, >> >> `spu_id` BIGINT NOT NULL, >> >> `leaving_price` DECIMAL(10, 5) >> >> PRIMARY KEY ( `id` ), >> >> unique key idx_spu_id (spu_id) >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> >> >> >> --flink表 >> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> ( >> >> `spu_id` BIGINT , >> >> `leaving_price` DECIMAL(10, 5), >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> >> ) WITH ( >> >> 'connector' = 'jdbc', >> >> 'url' = 'jdbc:mysql://...', >> >> 'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> >> 'username' = '...', >> >> 'password' = '..' >> >> ); >> >> >> >> >> >> --binlog 2mysql >> >> >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> >> >> FROM hive.database.table >> >> >> >> group by v_spu_id; >> >> >> >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> >> 有什么好的排查思路么? >> >> >> >> >> >> >> >> >> >> >> >> >> |
In reply to this post by Jark
1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 2. 没有开启 在 2020-11-20 11:49:44,"Jark Wu" <[hidden email]> 写道: >实现上应该没什么问题。 > >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >2. 是否开启 mini-batch了? > >Best, >Jark > >On Fri, 20 Nov 2020 at 11:44, kandy.wang <[hidden email]> wrote: > >> hi Jark: >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 >> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before >> update_after,format逻辑是应该这么写的吧。 >> >> >> >> >> 在 2020-11-19 23:13:19,"Jark Wu" <[hidden email]> 写道: >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >> >值的,以验证你的自定义 format 没有问题。 >> > >> >Best, >> >Jark >> > >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang <[hidden email]> wrote: >> > >> >> --mysql表 >> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >> >> `id` INT UNSIGNED AUTO_INCREMENT, >> >> `spu_id` BIGINT NOT NULL, >> >> `leaving_price` DECIMAL(10, 5) >> >> PRIMARY KEY ( `id` ), >> >> unique key idx_spu_id (spu_id) >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> >> >> >> --flink表 >> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> ( >> >> `spu_id` BIGINT , >> >> `leaving_price` DECIMAL(10, 5), >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> >> ) WITH ( >> >> 'connector' = 'jdbc', >> >> 'url' = 'jdbc:mysql://...', >> >> 'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> >> 'username' = '...', >> >> 'password' = '..' >> >> ); >> >> >> >> >> >> --binlog 2mysql >> >> >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> >> >> FROM hive.database.table >> >> >> >> group by v_spu_id; >> >> >> >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> >> 有什么好的排查思路么? >> >> >> >> >> >> >> >> >> >> >> >> >> |
Administrator
|
1. 没有初始的全量数据可能是会有问题的。
3. 你的 format 再解析 update 时,时先发的 before 还是 after? 4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不? On Fri, 20 Nov 2020 at 12:46, kandy.wang <[hidden email]> wrote: > > > > > > > 1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 > > 2. 没有开启 > > > > > 在 2020-11-20 11:49:44,"Jark Wu" <[hidden email]> 写道: > >实现上应该没什么问题。 > > > >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? > >2. 是否开启 mini-batch了? > > > >Best, > >Jark > > > >On Fri, 20 Nov 2020 at 11:44, kandy.wang <[hidden email]> wrote: > > > >> hi Jark: > >> > >> > >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price > >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 > >> > >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 > update_before > >> update_after,format逻辑是应该这么写的吧。 > >> > >> > >> > >> > >> 在 2020-11-19 23:13:19,"Jark Wu" <[hidden email]> 写道: > >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null > >> >值的,以验证你的自定义 format 没有问题。 > >> > > >> >Best, > >> >Jark > >> > > >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang <[hidden email]> wrote: > >> > > >> >> --mysql表 > >> >> CREATE TABLE IF NOT EXISTS > `mysql_realtime_leaving_price_spu_index_agg`( > >> >> `id` INT UNSIGNED AUTO_INCREMENT, > >> >> `spu_id` BIGINT NOT NULL, > >> >> `leaving_price` DECIMAL(10, 5) > >> >> PRIMARY KEY ( `id` ), > >> >> unique key idx_spu_id (spu_id) > >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 > >> >> > >> >> > >> >> --flink表 > >> >> CREATE TABLE > hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > >> ( > >> >> `spu_id` BIGINT , > >> >> `leaving_price` DECIMAL(10, 5), > >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED > >> >> ) WITH ( > >> >> 'connector' = 'jdbc', > >> >> 'url' = 'jdbc:mysql://...', > >> >> 'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', > >> >> 'username' = '...', > >> >> 'password' = '..' > >> >> ); > >> >> > >> >> > >> >> --binlog 2mysql > >> >> > >> >> insert into > hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > >> >> > >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price > >> >> > >> >> FROM hive.database.table > >> >> > >> >> group by v_spu_id; > >> >> > >> >> > >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 > >> >> > >> >> > >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price > >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 > >> >> 有什么好的排查思路么? > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> > |
1.没有初始的全量数据可能是会有问题的
这个怎么理解,默认情况,就是从kafka group-sets 消费的,怎么才能保证全量数据? 我们这个binlog同步都是增量同步。不会做一次初始化的全量同步。 2.先发的before 后发的after 3. 数据在kafka里,是按照mysql的id主键hash的。是有序的,group key 的所有数据不能保证 都在同 一个 partition 中。由于是按照主键id hash的。 在 2020-11-20 13:25:53,"Jark Wu" <[hidden email]> 写道: >1. 没有初始的全量数据可能是会有问题的。 > >3. 你的 format 再解析 update 时,时先发的 before 还是 after? >4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不? > >On Fri, 20 Nov 2020 at 12:46, kandy.wang <[hidden email]> wrote: > >> >> >> >> >> >> >> 1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 >> >> 2. 没有开启 >> >> >> >> >> 在 2020-11-20 11:49:44,"Jark Wu" <[hidden email]> 写道: >> >实现上应该没什么问题。 >> > >> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >> >2. 是否开启 mini-batch了? >> > >> >Best, >> >Jark >> > >> >On Fri, 20 Nov 2020 at 11:44, kandy.wang <[hidden email]> wrote: >> > >> >> hi Jark: >> >> >> >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price >> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 >> >> >> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 >> update_before >> >> update_after,format逻辑是应该这么写的吧。 >> >> >> >> >> >> >> >> >> >> 在 2020-11-19 23:13:19,"Jark Wu" <[hidden email]> 写道: >> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >> >> >值的,以验证你的自定义 format 没有问题。 >> >> > >> >> >Best, >> >> >Jark >> >> > >> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang <[hidden email]> wrote: >> >> > >> >> >> --mysql表 >> >> >> CREATE TABLE IF NOT EXISTS >> `mysql_realtime_leaving_price_spu_index_agg`( >> >> >> `id` INT UNSIGNED AUTO_INCREMENT, >> >> >> `spu_id` BIGINT NOT NULL, >> >> >> `leaving_price` DECIMAL(10, 5) >> >> >> PRIMARY KEY ( `id` ), >> >> >> unique key idx_spu_id (spu_id) >> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> >> >> >> >> >> >> --flink表 >> >> >> CREATE TABLE >> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> ( >> >> >> `spu_id` BIGINT , >> >> >> `leaving_price` DECIMAL(10, 5), >> >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> >> >> ) WITH ( >> >> >> 'connector' = 'jdbc', >> >> >> 'url' = 'jdbc:mysql://...', >> >> >> 'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> >> >> 'username' = '...', >> >> >> 'password' = '..' >> >> >> ); >> >> >> >> >> >> >> >> >> --binlog 2mysql >> >> >> >> >> >> insert into >> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> >> >> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> >> >> >> >> FROM hive.database.table >> >> >> >> >> >> group by v_spu_id; >> >> >> >> >> >> >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> >> >> >> >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> >> >> 有什么好的排查思路么? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> |
In reply to this post by Jark
1.没有初始的全量数据可能是会有问题的
这个怎么理解,默认情况,就是从kafka group-sets 消费的,怎么才能保证全量数据? 我们这个binlog同步都是增量同步。不会做一次初始化的全量同步。 2.先发的before 后发的after 3. 数据在kafka里,是按照mysql的id主键hash的。是有序的,group key 的所有数据不能保证 都在同 一个 partition 中。由于是按照主键id hash的 在 2020-11-20 13:25:53,"Jark Wu" <[hidden email]> 写道: >1. 没有初始的全量数据可能是会有问题的。 > >3. 你的 format 再解析 update 时,时先发的 before 还是 after? >4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不? > >On Fri, 20 Nov 2020 at 12:46, kandy.wang <[hidden email]> wrote: > >> >> >> >> >> >> >> 1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 >> >> 2. 没有开启 >> >> >> >> >> 在 2020-11-20 11:49:44,"Jark Wu" <[hidden email]> 写道: >> >实现上应该没什么问题。 >> > >> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >> >2. 是否开启 mini-batch了? >> > >> >Best, >> >Jark >> > >> >On Fri, 20 Nov 2020 at 11:44, kandy.wang <[hidden email]> wrote: >> > >> >> hi Jark: >> >> >> >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price >> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 >> >> >> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 >> update_before >> >> update_after,format逻辑是应该这么写的吧。 >> >> >> >> >> >> >> >> >> >> 在 2020-11-19 23:13:19,"Jark Wu" <[hidden email]> 写道: >> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >> >> >值的,以验证你的自定义 format 没有问题。 >> >> > >> >> >Best, >> >> >Jark >> >> > >> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang <[hidden email]> wrote: >> >> > >> >> >> --mysql表 >> >> >> CREATE TABLE IF NOT EXISTS >> `mysql_realtime_leaving_price_spu_index_agg`( >> >> >> `id` INT UNSIGNED AUTO_INCREMENT, >> >> >> `spu_id` BIGINT NOT NULL, >> >> >> `leaving_price` DECIMAL(10, 5) >> >> >> PRIMARY KEY ( `id` ), >> >> >> unique key idx_spu_id (spu_id) >> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> >> >> >> >> >> >> --flink表 >> >> >> CREATE TABLE >> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> ( >> >> >> `spu_id` BIGINT , >> >> >> `leaving_price` DECIMAL(10, 5), >> >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> >> >> ) WITH ( >> >> >> 'connector' = 'jdbc', >> >> >> 'url' = 'jdbc:mysql://...', >> >> >> 'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> >> >> 'username' = '...', >> >> >> 'password' = '..' >> >> >> ); >> >> >> >> >> >> >> >> >> --binlog 2mysql >> >> >> >> >> >> insert into >> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> >> >> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> >> >> >> >> FROM hive.database.table >> >> >> >> >> >> group by v_spu_id; >> >> >> >> >> >> >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> >> >> >> >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> >> >> 有什么好的排查思路么? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> |
In reply to this post by kandy.wang
是不是你的decimal字段长度太短了,计算结果超出了精度范围导致null的出现
> 2020年11月19日 下午10:41,kandy.wang <[hidden email]> 写道: > > --mysql表 > CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( > `id` INT UNSIGNED AUTO_INCREMENT, > `spu_id` BIGINT NOT NULL, > `leaving_price` DECIMAL(10, 5) > PRIMARY KEY ( `id` ), > unique key idx_spu_id (spu_id) > )ENGINE=InnoDB DEFAULT CHARSET=utf8 > > > --flink表 > CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( > `spu_id` BIGINT , > `leaving_price` DECIMAL(10, 5), > PRIMARY KEY ( `spu_id`) NOT ENFORCED > ) WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://...', > 'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', > 'username' = '...', > 'password' = '..' > ); > > > --binlog 2mysql > > insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > > SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price > > FROM hive.database.table > > group by v_spu_id; > > > hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 > > > 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 > 有什么好的排查思路么? > > > > > |
@Jianzhi Zhang 嗯,是这个原因,感谢 回复。 就是decimal的精度问题 在 2020-12-01 13:24:23,"Jianzhi Zhang" <[hidden email]> 写道: >是不是你的decimal字段长度太短了,计算结果超出了精度范围导致null的出现 > >> 2020年11月19日 下午10:41,kandy.wang <[hidden email]> 写道: >> >> --mysql表 >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >> `id` INT UNSIGNED AUTO_INCREMENT, >> `spu_id` BIGINT NOT NULL, >> `leaving_price` DECIMAL(10, 5) >> PRIMARY KEY ( `id` ), >> unique key idx_spu_id (spu_id) >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> --flink表 >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( >> `spu_id` BIGINT , >> `leaving_price` DECIMAL(10, 5), >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> ) WITH ( >> 'connector' = 'jdbc', >> 'url' = 'jdbc:mysql://...', >> 'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> 'username' = '...', >> 'password' = '..' >> ); >> >> >> --binlog 2mysql >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> FROM hive.database.table >> >> group by v_spu_id; >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> 有什么好的排查思路么? >> >> >> >> >> |
Free forum by Nabble | Edit this page |