hi!
版本:flink 1.10 mysql 5.7.24 需求场景是: 使用flink SQL,读取kafka数据,和mysql的维表数据,之后进行join操作,如何配置mysql connector,才能使维表数据修改后,flink能读取到更新后的数据进行join操作? 现在本地测试时,维表的DDL是: 但是去mysql修改了数据后,join操作还是旧数据. 望大神们指点方向,提前谢谢了. |
Hello
图挂了,可以贴下DDL吗?另外你没有使用维表join语法 FOR SYSTEM_TIME AS OF[1] 祝好 Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins> > 在 2020年7月24日,14:14,liunaihua521 <[hidden email]> 写道: > > hi! > 版本:flink 1.10 > mysql 5.7.24 > > 需求场景是: > 使用flink SQL,读取kafka数据,和mysql的维表数据,之后进行join操作,如何配置mysql connector,才能使维表数据修改后,flink能读取到更新后的数据进行join操作? > > 现在本地测试时,维表的DDL是: > > 但是去mysql修改了数据后,join操作还是旧数据. > > 望大神们指点方向,提前谢谢了. > > > |
另外社区中文邮件交流直接发邮件到user-[hidden email] <mailto:[hidden email]>就可以了,不用发user-[hidden email] <mailto:[hidden email]> 这个地址。 > 在 2020年7月24日,14:25,Leonard Xu <[hidden email]> 写道: > > Hello > 图挂了,可以贴下DDL吗?另外你没有使用维表join语法 FOR SYSTEM_TIME AS OF[1] > > 祝好 > Leonard Xu > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins> > >> 在 2020年7月24日,14:14,liunaihua521 <[hidden email] <mailto:[hidden email]>> 写道: >> >> hi! >> 版本:flink 1.10 >> mysql 5.7.24 >> >> 需求场景是: >> 使用flink SQL,读取kafka数据,和mysql的维表数据,之后进行join操作,如何配置mysql connector,才能使维表数据修改后,flink能读取到更新后的数据进行join操作? >> >> 现在本地测试时,维表的DDL是: >> >> 但是去mysql修改了数据后,join操作还是旧数据. >> >> 望大神们指点方向,提前谢谢了. >> >> >> > |
In reply to this post by Leonard Xu
hi!
您好,我明白您的意思了,并且看了下网上的资料,改完后如下 DDL: CREATE TABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3), proctime as PROCTIME() ) WITH ( 'connector.type' = 'kafka', -- kafka connector 'connector.version' = 'universal', -- universal 支持 0.11 以上的版本 'connector.topic' = 'user_behavior', -- kafka topic 'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取 'connector.properties.zookeeper.connect' = '', -- zk 地址 'connector.properties.bootstrap.servers' = '', -- broker 地址 'format.type' = 'json' -- 数据源格式为 json ); CREATE TABLE category_info ( parent_id BIGINT, -- 商品大类 category_id BIGINT -- 商品详细类目 ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://:3306/flinkdemo', 'connector.table' = 'category_info', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = '', 'connector.password' = '', 'connector.lookup.cache.max-rows' = '5000', 'connector.lookup.cache.ttl' = '10min' ); SQL: SELECT U.user_id, U.item_id, U.behavior, C.parent_id, C.category_id FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C ON U.category_id = C.category_id; 但是执行SQL报错了(由于代码在办公环境粘不出来,就手打如下部分): org.apache.flink.table.api.SqlParserExcption:Sql parse failed.Encountered "timestamp,"at line Was expecting one of: "CURSOR"... "EXISTS"... "NOT"... "ROW"... "("... 一直调试不好,望指教 在2020年7月24日 14:25,Leonard Xu<[hidden email]> 写道: Hello 图挂了,可以贴下DDL吗?另外你没有使用维表join语法 FOR SYSTEM_TIME AS OF[1] 祝好 Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins> 在 2020年7月24日,14:14,liunaihua521 <[hidden email]> 写道: hi! 版本:flink 1.10 mysql 5.7.24 需求场景是: 使用flink SQL,读取kafka数据,和mysql的维表数据,之后进行join操作,如何配置mysql connector,才能使维表数据修改后,flink能读取到更新后的数据进行join操作? 现在本地测试时,维表的DDL是: 但是去mysql修改了数据后,join操作还是旧数据. 望大神们指点方向,提前谢谢了. |
Hello
这个报错一般是sql格式错误,比如中英文逗号等,你可以检查下你的SQL语句 祝好 Leonard Xu > 在 2020年7月24日,16:20,liunaihua521 <[hidden email]> 写道: > > org.apache.flink.table.api.SqlParserExcption:Sql parse failed.Encountered "timestamp,"at line > Was expecting one of: > "CURSOR"... |
In reply to this post by liunaihua521
'connector.properties.zookeeper.connect' = '', -- zk 地址
'connector.properties.bootstrap.servers' = '', -- broker 地址 'connector.username' = '', 'connector.password' = ‘', 这几行有问题吧 > 2020年7月24日 下午4:20,liunaihua521 <[hidden email]> 写道: > > 'connector.properties.zookeeper.connect' = '', -- zk 地址 > 'connector.properties.bootstrap.servers' = '', -- broker 地址 |
Hi,
关于数据修改后还是读到旧数据的问题,可能是因为配置了 cache。我看到超时时间配置的是 'connector.lookup.cache.ttl' = '10min',也就是说数据修改后最长要 10 分钟 Flink 才会读到修改后的数据。 admin <[hidden email]> 于2020年7月24日周五 下午7:32写道: > 'connector.properties.zookeeper.connect' = '', -- zk 地址 > 'connector.properties.bootstrap.servers' = '', -- broker 地址 > > 'connector.username' = '', > 'connector.password' = ‘', > 这几行有问题吧 > > > 2020年7月24日 下午4:20,liunaihua521 <[hidden email]> 写道: > > > > 'connector.properties.zookeeper.connect' = '', -- zk 地址 > > 'connector.properties.bootstrap.servers' = '', -- broker 地址 > > |
Free forum by Nabble | Edit this page |