flink sql 读取mysql

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

flink sql 读取mysql

liunaihua521
hi!
    版本:flink  1.10
            mysql 5.7.24

    需求场景是:
            使用flink SQL,读取kafka数据,和mysql的维表数据,之后进行join操作,如何配置mysql connector,才能使维表数据修改后,flink能读取到更新后的数据进行join操作?

    现在本地测试时,维表的DDL是:
            
    但是去mysql修改了数据后,join操作还是旧数据.

望大神们指点方向,提前谢谢了.
        

Reply | Threaded
Open this post in threaded view
|

Re: flink sql 读取mysql

Leonard Xu
Hello
图挂了,可以贴下DDL吗?另外你没有使用维表join语法 FOR SYSTEM_TIME AS OF[1]

祝好
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins>

> 在 2020年7月24日,14:14,liunaihua521 <[hidden email]> 写道:
>
> hi!
>     版本:flink  1.10
>             mysql 5.7.24
>
>     需求场景是:
>             使用flink SQL,读取kafka数据,和mysql的维表数据,之后进行join操作,如何配置mysql connector,才能使维表数据修改后,flink能读取到更新后的数据进行join操作?
>
>     现在本地测试时,维表的DDL是:
>            
>     但是去mysql修改了数据后,join操作还是旧数据.
>
> 望大神们指点方向,提前谢谢了.
>        
>
>

Reply | Threaded
Open this post in threaded view
|

Re: flink sql 读取mysql

Leonard Xu

另外社区中文邮件交流直接发邮件到user-[hidden email] <mailto:[hidden email]>就可以了,不用发user-[hidden email] <mailto:[hidden email]> 这个地址。


> 在 2020年7月24日,14:25,Leonard Xu <[hidden email]> 写道:
>
> Hello
> 图挂了,可以贴下DDL吗?另外你没有使用维表join语法 FOR SYSTEM_TIME AS OF[1]
>
> 祝好
> Leonard Xu
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins>
>
>> 在 2020年7月24日,14:14,liunaihua521 <[hidden email] <mailto:[hidden email]>> 写道:
>>
>> hi!
>>     版本:flink  1.10
>>             mysql 5.7.24
>>
>>     需求场景是:
>>             使用flink SQL,读取kafka数据,和mysql的维表数据,之后进行join操作,如何配置mysql connector,才能使维表数据修改后,flink能读取到更新后的数据进行join操作?
>>
>>     现在本地测试时,维表的DDL是:
>>            
>>     但是去mysql修改了数据后,join操作还是旧数据.
>>
>> 望大神们指点方向,提前谢谢了.
>>        
>>
>>
>

Reply | Threaded
Open this post in threaded view
|

回复: flink sql 读取mysql

liunaihua521
In reply to this post by Leonard Xu
hi!
    您好,我明白您的意思了,并且看了下网上的资料,改完后如下


DDL:
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME()
) WITH (
    'connector.type' = 'kafka',  -- kafka connector
    'connector.version' = 'universal',  -- universal 支持 0.11 以上的版本
    'connector.topic' = 'user_behavior',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset',  -- 从起始 offset 开始读取
    'connector.properties.zookeeper.connect' = '',  -- zk 地址
    'connector.properties.bootstrap.servers' = '',  -- broker 地址
    'format.type' = 'json'  -- 数据源格式为 json
);




CREATE TABLE category_info (
parent_id BIGINT, -- 商品大类
    category_id BIGINT  -- 商品详细类目
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://:3306/flinkdemo',
    'connector.table' = 'category_info',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = '',
    'connector.password' = '',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);


SQL:


SELECT U.user_id, U.item_id, U.behavior, C.parent_id, C.category_id
FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.category_id;


但是执行SQL报错了(由于代码在办公环境粘不出来,就手打如下部分):
org.apache.flink.table.api.SqlParserExcption:Sql parse failed.Encountered "timestamp,"at line
Was expecting one of:
"CURSOR"...
"EXISTS"...
"NOT"...
"ROW"...
"("...


一直调试不好,望指教




在2020年7月24日 14:25,Leonard Xu<[hidden email]> 写道:
Hello
图挂了,可以贴下DDL吗?另外你没有使用维表join语法 FOR SYSTEM_TIME AS OF[1]

祝好
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins>

在 2020年7月24日,14:14,liunaihua521 <[hidden email]> 写道:

hi!
版本:flink  1.10
mysql 5.7.24

需求场景是:
使用flink SQL,读取kafka数据,和mysql的维表数据,之后进行join操作,如何配置mysql connector,才能使维表数据修改后,flink能读取到更新后的数据进行join操作?

现在本地测试时,维表的DDL是:

但是去mysql修改了数据后,join操作还是旧数据.

望大神们指点方向,提前谢谢了.




Reply | Threaded
Open this post in threaded view
|

Re: flink sql 读取mysql

Leonard Xu
Hello

这个报错一般是sql格式错误,比如中英文逗号等,你可以检查下你的SQL语句

祝好
Leonard Xu
> 在 2020年7月24日,16:20,liunaihua521 <[hidden email]> 写道:
>
> org.apache.flink.table.api.SqlParserExcption:Sql parse failed.Encountered "timestamp,"at line
> Was expecting one of:
> "CURSOR"...

Reply | Threaded
Open this post in threaded view
|

Re: flink sql 读取mysql

admin
In reply to this post by liunaihua521
 'connector.properties.zookeeper.connect' = '',  -- zk 地址
   'connector.properties.bootstrap.servers' = '',  -- broker 地址

'connector.username' = '',
   'connector.password' = ‘',
这几行有问题吧

> 2020年7月24日 下午4:20,liunaihua521 <[hidden email]> 写道:
>
>  'connector.properties.zookeeper.connect' = '',  -- zk 地址
>    'connector.properties.bootstrap.servers' = '',  -- broker 地址

Reply | Threaded
Open this post in threaded view
|

Re: flink sql 读取mysql

Caizhi Weng
Hi,

关于数据修改后还是读到旧数据的问题,可能是因为配置了 cache。我看到超时时间配置的是 'connector.lookup.cache.ttl' =
'10min',也就是说数据修改后最长要 10 分钟 Flink 才会读到修改后的数据。

admin <[hidden email]> 于2020年7月24日周五 下午7:32写道:

>  'connector.properties.zookeeper.connect' = '',  -- zk 地址
>    'connector.properties.bootstrap.servers' = '',  -- broker 地址
>
> 'connector.username' = '',
>    'connector.password' = ‘',
> 这几行有问题吧
>
> > 2020年7月24日 下午4:20,liunaihua521 <[hidden email]> 写道:
> >
> >  'connector.properties.zookeeper.connect' = '',  -- zk 地址
> >    'connector.properties.bootstrap.servers' = '',  -- broker 地址
>
>