hi,
请问kafka join jdbc维表数据而不是join jdbc的changelog,支持吗? 在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished,这样的话按理来说不管维表数据怎么变kafka都join不到维表数据了呀? CREATE TABLE orders ( order_id STRING, currency STRING, amount INT, order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time ) WITH ( 'connector' = 'kafka', 'topic' = 'topic_flink', 'properties.bootstrap.servers' = '10.3.12.113:9092', 'properties.group.id' = 'flink', 'format' = 'json', 'scan.startup.mode' = 'latest-offset' ) CREATE TABLE latest_rates ( currency STRING, rate DECIMAL(38, 10), currency_time TIMESTAMP(3), WATERMARK FOR currency_time AS currency_time, PRIMARY KEY (currency) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://10.3.12.113:3306/base?useUnicode=true&characterEncoding=utf8&serverTimezone=PRC&useSSL=false' 'username' = 'root', 'password' = 'root1234', 'table-name' = 'latest_rates', 'lookup.cache.max-rows' = '1', 'lookup.cache.ttl' = '1min' ) SELECT o.order_id, o.order_time, o.amount * r.rate AS amount, r.currency FROM orders AS o LEFT JOIN latest_rates FOR SYSTEM_TIME AS OF o.order_time r ON o.currency = r.currency" best, amenhub |
Hi
> 在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished 这是正常的,jdbc connector实现的表就是bounded的,只会scan一次,一次读完,之后数据库表的数据更新是没有去捕捉的,connector也没有很好的办法去实时监控数据库表的更新并广播到下游节点。 如果想要有获取实时更新的维表并做基于event time语义的维表关联,那么推荐的方式就是接入数据库表的binlog(changelog), 用主流去 temporal join changelog流 实现关联维表的准确版本。 另外,如果对维表实时性要求不高,也没有延迟join的需求,你也可以用运行时lookup的方式完成维表join(即:JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime),这种方式是在每条主流的数据到来时去查询一次外部DB,这种方式支持你参数中写的 'lookup.cache.max-rows' = '1', 'lookup.cache.ttl' = ‘1min’ 优化。 祝好, Leonard |
感谢@Leonard Xu 的回复,
这么讲的话算是比较清晰了,所以如果想要基于事件时间进行jdbc维表Join,首先需要将jdbc维表的changelog数据接入kafka再进行join,这也是官网给的例子,对吗? >>> 另外,如果对维表实时性要求不高,也没有延迟join的需求,你也可以用运行时lookup的方式完成维表join(即:JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime),这种方式是在每条主流的数据到来时去查询一次外部DB,这种方式支持你参数中写的 'lookup.cache.max-rows' = '1', 'lookup.cache.ttl' = ‘1min’ 优化。 你说的这种方式就是好像基于处理时间的join~ best, amenhub 发件人: Leonard Xu 发送时间: 2020-12-21 14:44 收件人: user-zh 主题: Re: Flink-1.12支持kafka join jdbc维表吗 Hi > 在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished 这是正常的,jdbc connector实现的表就是bounded的,只会scan一次,一次读完,之后数据库表的数据更新是没有去捕捉的,connector也没有很好的办法去实时监控数据库表的更新并广播到下游节点。 如果想要有获取实时更新的维表并做基于event time语义的维表关联,那么推荐的方式就是接入数据库表的binlog(changelog), 用主流去 temporal join changelog流 实现关联维表的准确版本。 另外,如果对维表实时性要求不高,也没有延迟join的需求,你也可以用运行时lookup的方式完成维表join(即:JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime),这种方式是在每条主流的数据到来时去查询一次外部DB,这种方式支持你参数中写的 'lookup.cache.max-rows' = '1', 'lookup.cache.ttl' = ‘1min’ 优化。 祝好, Leonard |
>
> 这么讲的话算是比较清晰了,所以如果想要基于事件时间进行jdbc维表Join,首先需要将jdbc维表的changelog数据接入kafka再进行join,这也是官网给的例子,对吗? 是的 > 你说的这种方式就是好像基于处理时间的join~ 是的,基于处理时间的维表join和大家熟知的lookup关联, 语法都是一样的,因为两者语义是一样的,就是在运行时关联最新的维表数据,只是两者实现方式不同,lookup 关联维表只是一种实现方式,实现方式是运行时每条数据都去查询数据库(语义上就是关联了最新的维表数据),关联维表也有其他的实现方式,比如把维表最新的数据维护放在state里,在运行时每条数据去和state中的数据关联。 祝好 Leonard > > > > > 发件人: Leonard Xu > 发送时间: 2020-12-21 14:44 > 收件人: user-zh > 主题: Re: Flink-1.12支持kafka join jdbc维表吗 > Hi >> 在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished > > 这是正常的,jdbc connector实现的表就是bounded的,只会scan一次,一次读完,之后数据库表的数据更新是没有去捕捉的,connector也没有很好的办法去实时监控数据库表的更新并广播到下游节点。 > > 如果想要有获取实时更新的维表并做基于event time语义的维表关联,那么推荐的方式就是接入数据库表的binlog(changelog), 用主流去 temporal join changelog流 实现关联维表的准确版本。 > > 另外,如果对维表实时性要求不高,也没有延迟join的需求,你也可以用运行时lookup的方式完成维表join(即:JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime),这种方式是在每条主流的数据到来时去查询一次外部DB,这种方式支持你参数中写的 'lookup.cache.max-rows' = '1', 'lookup.cache.ttl' = ‘1min’ 优化。 > > > 祝好, > Leonard > |
今天又博学了,谢谢!
发件人: Leonard Xu 发送时间: 2020-12-21 15:01 收件人: user-zh 主题: Re: Flink-1.12支持kafka join jdbc维表吗 > > 这么讲的话算是比较清晰了,所以如果想要基于事件时间进行jdbc维表Join,首先需要将jdbc维表的changelog数据接入kafka再进行join,这也是官网给的例子,对吗? 是的 > 你说的这种方式就是好像基于处理时间的join~ 是的,基于处理时间的维表join和大家熟知的lookup关联, 语法都是一样的,因为两者语义是一样的,就是在运行时关联最新的维表数据,只是两者实现方式不同,lookup 关联维表只是一种实现方式,实现方式是运行时每条数据都去查询数据库(语义上就是关联了最新的维表数据),关联维表也有其他的实现方式,比如把维表最新的数据维护放在state里,在运行时每条数据去和state中的数据关联。 祝好 Leonard > > > > > 发件人: Leonard Xu > 发送时间: 2020-12-21 14:44 > 收件人: user-zh > 主题: Re: Flink-1.12支持kafka join jdbc维表吗 > Hi >> 在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished > > 这是正常的,jdbc connector实现的表就是bounded的,只会scan一次,一次读完,之后数据库表的数据更新是没有去捕捉的,connector也没有很好的办法去实时监控数据库表的更新并广播到下游节点。 > > 如果想要有获取实时更新的维表并做基于event time语义的维表关联,那么推荐的方式就是接入数据库表的binlog(changelog), 用主流去 temporal join changelog流 实现关联维表的准确版本。 > > 另外,如果对维表实时性要求不高,也没有延迟join的需求,你也可以用运行时lookup的方式完成维表join(即:JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime),这种方式是在每条主流的数据到来时去查询一次外部DB,这种方式支持你参数中写的 'lookup.cache.max-rows' = '1', 'lookup.cache.ttl' = ‘1min’ 优化。 > > > 祝好, > Leonard > |
Free forum by Nabble | Edit this page |