Flink-1.12支持kafka join jdbc维表吗

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink-1.12支持kafka join jdbc维表吗

amenhub@163.com
hi,

请问kafka join jdbc维表数据而不是join jdbc的changelog,支持吗?

在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished,这样的话按理来说不管维表数据怎么变kafka都join不到维表数据了呀?

CREATE TABLE orders (
    order_id STRING,
    currency STRING,
    amount INT,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic_flink',
    'properties.bootstrap.servers' = '10.3.12.113:9092',
    'properties.group.id' = 'flink',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
)

CREATE TABLE latest_rates (
    currency STRING,
    rate DECIMAL(38, 10),
    currency_time TIMESTAMP(3),
    WATERMARK FOR currency_time AS currency_time,
    PRIMARY KEY (currency) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.3.12.113:3306/base?useUnicode=true&characterEncoding=utf8&serverTimezone=PRC&useSSL=false'
    'username' = 'root',
    'password' = 'root1234',
    'table-name' = 'latest_rates',
    'lookup.cache.max-rows' = '1',
    'lookup.cache.ttl' = '1min'
)

SELECT
    o.order_id,
    o.order_time,
    o.amount * r.rate AS amount,
    r.currency
FROM orders AS o
LEFT JOIN latest_rates FOR SYSTEM_TIME AS OF o.order_time r
ON o.currency = r.currency"

best,
amenhub





Reply | Threaded
Open this post in threaded view
|

Re: Flink-1.12支持kafka join jdbc维表吗

Leonard Xu
Hi
> 在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished

这是正常的,jdbc connector实现的表就是bounded的,只会scan一次,一次读完,之后数据库表的数据更新是没有去捕捉的,connector也没有很好的办法去实时监控数据库表的更新并广播到下游节点。

如果想要有获取实时更新的维表并做基于event time语义的维表关联,那么推荐的方式就是接入数据库表的binlog(changelog), 用主流去 temporal join changelog流 实现关联维表的准确版本。

另外,如果对维表实时性要求不高,也没有延迟join的需求,你也可以用运行时lookup的方式完成维表join(即:JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime),这种方式是在每条主流的数据到来时去查询一次外部DB,这种方式支持你参数中写的   'lookup.cache.max-rows' = '1',   'lookup.cache.ttl' = ‘1min’ 优化。


祝好,
Leonard

Reply | Threaded
Open this post in threaded view
|

回复: Re: Flink-1.12支持kafka join jdbc维表吗

amenhub@163.com
感谢@Leonard Xu 的回复,

这么讲的话算是比较清晰了,所以如果想要基于事件时间进行jdbc维表Join,首先需要将jdbc维表的changelog数据接入kafka再进行join,这也是官网给的例子,对吗?

>>> 另外,如果对维表实时性要求不高,也没有延迟join的需求,你也可以用运行时lookup的方式完成维表join(即:JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime),这种方式是在每条主流的数据到来时去查询一次外部DB,这种方式支持你参数中写的   'lookup.cache.max-rows' = '1',   'lookup.cache.ttl' = ‘1min’ 优化。
你说的这种方式就是好像基于处理时间的join~

best,
amenhub


 
发件人: Leonard Xu
发送时间: 2020-12-21 14:44
收件人: user-zh
主题: Re: Flink-1.12支持kafka join jdbc维表吗
Hi
> 在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished
 
这是正常的,jdbc connector实现的表就是bounded的,只会scan一次,一次读完,之后数据库表的数据更新是没有去捕捉的,connector也没有很好的办法去实时监控数据库表的更新并广播到下游节点。
 
如果想要有获取实时更新的维表并做基于event time语义的维表关联,那么推荐的方式就是接入数据库表的binlog(changelog), 用主流去 temporal join changelog流 实现关联维表的准确版本。
 
另外,如果对维表实时性要求不高,也没有延迟join的需求,你也可以用运行时lookup的方式完成维表join(即:JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime),这种方式是在每条主流的数据到来时去查询一次外部DB,这种方式支持你参数中写的   'lookup.cache.max-rows' = '1',   'lookup.cache.ttl' = ‘1min’ 优化。
 
 
祝好,
Leonard
 
Reply | Threaded
Open this post in threaded view
|

Re: Flink-1.12支持kafka join jdbc维表吗

Leonard Xu
>  
> 这么讲的话算是比较清晰了,所以如果想要基于事件时间进行jdbc维表Join,首先需要将jdbc维表的changelog数据接入kafka再进行join,这也是官网给的例子,对吗?

是的


> 你说的这种方式就是好像基于处理时间的join~
是的,基于处理时间的维表join和大家熟知的lookup关联, 语法都是一样的,因为两者语义是一样的,就是在运行时关联最新的维表数据,只是两者实现方式不同,lookup 关联维表只是一种实现方式,实现方式是运行时每条数据都去查询数据库(语义上就是关联了最新的维表数据),关联维表也有其他的实现方式,比如把维表最新的数据维护放在state里,在运行时每条数据去和state中的数据关联。

祝好
Leonard



>  
>
>
>
> 发件人: Leonard Xu
> 发送时间: 2020-12-21 14:44
> 收件人: user-zh
> 主题: Re: Flink-1.12支持kafka join jdbc维表吗
> Hi
>> 在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished
>
> 这是正常的,jdbc connector实现的表就是bounded的,只会scan一次,一次读完,之后数据库表的数据更新是没有去捕捉的,connector也没有很好的办法去实时监控数据库表的更新并广播到下游节点。
>
> 如果想要有获取实时更新的维表并做基于event time语义的维表关联,那么推荐的方式就是接入数据库表的binlog(changelog), 用主流去 temporal join changelog流 实现关联维表的准确版本。
>
> 另外,如果对维表实时性要求不高,也没有延迟join的需求,你也可以用运行时lookup的方式完成维表join(即:JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime),这种方式是在每条主流的数据到来时去查询一次外部DB,这种方式支持你参数中写的   'lookup.cache.max-rows' = '1',   'lookup.cache.ttl' = ‘1min’ 优化。
>
>
> 祝好,
> Leonard
>

Reply | Threaded
Open this post in threaded view
|

回复: Re: Flink-1.12支持kafka join jdbc维表吗

amenhub@163.com
今天又博学了,谢谢!



 
发件人: Leonard Xu
发送时间: 2020-12-21 15:01
收件人: user-zh
主题: Re: Flink-1.12支持kafka join jdbc维表吗
>  
> 这么讲的话算是比较清晰了,所以如果想要基于事件时间进行jdbc维表Join,首先需要将jdbc维表的changelog数据接入kafka再进行join,这也是官网给的例子,对吗?
 
是的
 
 
> 你说的这种方式就是好像基于处理时间的join~
是的,基于处理时间的维表join和大家熟知的lookup关联, 语法都是一样的,因为两者语义是一样的,就是在运行时关联最新的维表数据,只是两者实现方式不同,lookup 关联维表只是一种实现方式,实现方式是运行时每条数据都去查询数据库(语义上就是关联了最新的维表数据),关联维表也有其他的实现方式,比如把维表最新的数据维护放在state里,在运行时每条数据去和state中的数据关联。
 
祝好
Leonard
 
 
 

>  
>
>
>
> 发件人: Leonard Xu
> 发送时间: 2020-12-21 14:44
> 收件人: user-zh
> 主题: Re: Flink-1.12支持kafka join jdbc维表吗
> Hi
>> 在给jdbc声明主键和watermark后,使用官网的事件时间join语法,任务启动后发现jdbc算子一次性读取了所有jdbc维表数据,状态变成了finished
>
> 这是正常的,jdbc connector实现的表就是bounded的,只会scan一次,一次读完,之后数据库表的数据更新是没有去捕捉的,connector也没有很好的办法去实时监控数据库表的更新并广播到下游节点。
>
> 如果想要有获取实时更新的维表并做基于event time语义的维表关联,那么推荐的方式就是接入数据库表的binlog(changelog), 用主流去 temporal join changelog流 实现关联维表的准确版本。
>
> 另外,如果对维表实时性要求不高,也没有延迟join的需求,你也可以用运行时lookup的方式完成维表join(即:JOIN latest_rates FOR SYSTEM_TIME AS OF o.proctime),这种方式是在每条主流的数据到来时去查询一次外部DB,这种方式支持你参数中写的   'lookup.cache.max-rows' = '1',   'lookup.cache.ttl' = ‘1min’ 优化。
>
>
> 祝好,
> Leonard
>