HI!
这边做测试时遇到一个问题: 在流应用中使用了一个mysql jdbc的source作为维表,为了优化处理效率使用了Lookup Cache,下面是注册的表: bsTableEnv.executeSql("CREATE TABLE tm_dealers (dealer_code STRING,is_valid DECIMAL(10,0),proctime AS PROCTIME(),PRIMARY KEY (dealer_code) NOT ENFORCED\n" + ") WITH (" + "'connector' = 'jdbc'," + "'url' = 'jdbc:mysql://10.0.15.83:3306/flink-test?useSSL=false'," + "'table-name' = 'tm_dealers'," + "'driver' = 'com.mysql.cj.jdbc.Driver'," + "'username' = 'root'," + "'password' = 'Cdh2020:1'," + "'lookup.cache.max-rows' = '500',"+ "'lookup.cache.ttl' = '1800s',"+ "'sink.buffer-flush.interval' = '60s'"+ ")"); 我发现这样的话checkpoint配置会失效,不能触发检查点,日志报如下错误: job bad9f419433f78d24e703e659b169917 is notin state RUNNING but FINISHED instead. Aborting checkpoint. 进入WEB UI 看一下视图发现该Execution处于FINISHED状态,FINISHED状态无法进行checkpoint,这种有其它办法吗? 感谢大佬指导一下,拜谢! | | 刘海 | | [hidden email] | 签名由网易邮箱大师定制 |
+1, 目前也遇到了
在 2021-01-21 17:52:06,"刘海" <[hidden email]> 写道: >HI! >这边做测试时遇到一个问题: >在流应用中使用了一个mysql jdbc的source作为维表,为了优化处理效率使用了Lookup Cache,下面是注册的表: >bsTableEnv.executeSql("CREATE TABLE tm_dealers (dealer_code STRING,is_valid DECIMAL(10,0),proctime AS PROCTIME(),PRIMARY KEY (dealer_code) NOT ENFORCED\n" + >") WITH (" + >"'connector' = 'jdbc'," + >"'url' = 'jdbc:mysql://10.0.15.83:3306/flink-test?useSSL=false'," + >"'table-name' = 'tm_dealers'," + >"'driver' = 'com.mysql.cj.jdbc.Driver'," + >"'username' = 'root'," + >"'password' = 'Cdh2020:1'," + >"'lookup.cache.max-rows' = '500',"+ >"'lookup.cache.ttl' = '1800s',"+ >"'sink.buffer-flush.interval' = '60s'"+ >")"); > > >我发现这样的话checkpoint配置会失效,不能触发检查点,日志报如下错误: >job bad9f419433f78d24e703e659b169917 is notin state RUNNING but FINISHED instead. Aborting checkpoint. > > >进入WEB UI 看一下视图发现该Execution处于FINISHED状态,FINISHED状态无法进行checkpoint,这种有其它办法吗? > > >感谢大佬指导一下,拜谢! >| | >刘海 >| >| >[hidden email] >| >签名由网易邮箱大师定制 |
In reply to this post by 刘海
hi
你需要使用 Temporal Table Join 的语法,具体操作可以参考官网 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html ----- Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Best Wishes
JasonLee |
In reply to this post by anonnius
hi: 今天又试了下, 我这边出现问题是因为: join时使用的问题 照成的
应该使用这种语法 -- temporal join the JDBC table as a dimension tableSELECT*FROMmyTopicLEFTJOINMyUserTableFORSYSTEM_TIMEASOFmyTopic.proctimeONmyTopic.key=MyUserTable.id; 而不是 SELECT*FROMmyTopic aLEFTJOINMyUserTablebON a.id = b.id 文档连接在这里, https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#lookup-cache 希望对你有帮助 在 2021-04-14 16:47:04,"anonnius" <[hidden email]> 写道: >+1, 目前也遇到了 >在 2021-01-21 17:52:06,"刘海" <[hidden email]> 写道: >>HI! >>这边做测试时遇到一个问题: >>在流应用中使用了一个mysql jdbc的source作为维表,为了优化处理效率使用了Lookup Cache,下面是注册的表: >>bsTableEnv.executeSql("CREATE TABLE tm_dealers (dealer_code STRING,is_valid DECIMAL(10,0),proctime AS PROCTIME(),PRIMARY KEY (dealer_code) NOT ENFORCED\n" + >>") WITH (" + >>"'connector' = 'jdbc'," + >>"'url' = 'jdbc:mysql://10.0.15.83:3306/flink-test?useSSL=false'," + >>"'table-name' = 'tm_dealers'," + >>"'driver' = 'com.mysql.cj.jdbc.Driver'," + >>"'username' = 'root'," + >>"'password' = 'Cdh2020:1'," + >>"'lookup.cache.max-rows' = '500',"+ >>"'lookup.cache.ttl' = '1800s',"+ >>"'sink.buffer-flush.interval' = '60s'"+ >>")"); >> >> >>我发现这样的话checkpoint配置会失效,不能触发检查点,日志报如下错误: >>job bad9f419433f78d24e703e659b169917 is notin state RUNNING but FINISHED instead. Aborting checkpoint. >> >> >>进入WEB UI 看一下视图发现该Execution处于FINISHED状态,FINISHED状态无法进行checkpoint,这种有其它办法吗? >> >> >>感谢大佬指导一下,拜谢! >>| | >>刘海 >>| >>| >>[hidden email] >>| >>签名由网易邮箱大师定制 |
重新格式下, 不好意思
hi: 今天又试了下, 我这边出现问题是因为: join时使用的语法问题 照成的 应该使用这种语法 -- temporal join the JDBC table as a dimension table SELECT * FROM myTopic LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime ON myTopic.key = MyUserTable.id; 而不是 SELECT * FROM myTopic a LEFTJOIN MyUserTable b ON a.id = b.id -------------- hi: 今天又试了下, 我这边出现问题是因为: join时使用的问题 照成的 应该使用这种语法 -- temporal join the JDBC table as a dimension tableSELECT*FROMmyTopicLEFTJOINMyUserTableFORSYSTEM_TIMEASOFmyTopic.proctimeONmyTopic.key=MyUserTable.id; 而不是 SELECT*FROMmyTopic aLEFTJOINMyUserTablebON a.id = b.id 文档连接在这里, https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#lookup-cache 希望对你有帮助 在 2021-04-19 18:54:38,"anonnius" <[hidden email]> 写道: >hi: 今天又试了下, 我这边出现问题是因为: join时使用的问题 照成的 > >应该使用这种语法 >-- temporal join the JDBC table as a dimension tableSELECT*FROMmyTopicLEFTJOINMyUserTableFORSYSTEM_TIMEASOFmyTopic.proctimeONmyTopic.key=MyUserTable.id; >而不是 >SELECT*FROMmyTopic aLEFTJOINMyUserTablebON a.id = b.id >文档连接在这里, https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#lookup-cache > >希望对你有帮助 > > > > > > > > >在 2021-04-14 16:47:04,"anonnius" <[hidden email]> 写道: >>+1, 目前也遇到了 >>在 2021-01-21 17:52:06,"刘海" <[hidden email]> 写道: >>>HI! >>>这边做测试时遇到一个问题: >>>在流应用中使用了一个mysql jdbc的source作为维表,为了优化处理效率使用了Lookup Cache,下面是注册的表: >>>bsTableEnv.executeSql("CREATE TABLE tm_dealers (dealer_code STRING,is_valid DECIMAL(10,0),proctime AS PROCTIME(),PRIMARY KEY (dealer_code) NOT ENFORCED\n" + >>>") WITH (" + >>>"'connector' = 'jdbc'," + >>>"'url' = 'jdbc:mysql://10.0.15.83:3306/flink-test?useSSL=false'," + >>>"'table-name' = 'tm_dealers'," + >>>"'driver' = 'com.mysql.cj.jdbc.Driver'," + >>>"'username' = 'root'," + >>>"'password' = 'Cdh2020:1'," + >>>"'lookup.cache.max-rows' = '500',"+ >>>"'lookup.cache.ttl' = '1800s',"+ >>>"'sink.buffer-flush.interval' = '60s'"+ >>>")"); >>> >>> >>>我发现这样的话checkpoint配置会失效,不能触发检查点,日志报如下错误: >>>job bad9f419433f78d24e703e659b169917 is notin state RUNNING but FINISHED instead. Aborting checkpoint. >>> >>> >>>进入WEB UI 看一下视图发现该Execution处于FINISHED状态,FINISHED状态无法进行checkpoint,这种有其它办法吗? >>> >>> >>>感谢大佬指导一下,拜谢! >>>| | >>>刘海 >>>| >>>| >>>[hidden email] >>>| >>>签名由网易邮箱大师定制 |
Free forum by Nabble | Edit this page |