Hi,
你这还是connector的with参数里不是新 connector的写法[1],会走到老代码,老代码不支持声明PK的。 在老代码里,PK是通过query推导的,你用inner join替换left join后,应该是能够推断出PK了,所以没有报错。 我理解你把connector的with参数更新成新的就解决问题了。 Best Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options> > > def register_rides_source(st_env): > source_ddl = \ > """ > create table source1( > id int, > time1 varchar , > type string > ) with ( > 'connector.type' = 'kafka', > 'connector.topic' = 'tp1', > 'connector.startup-mode' = 'latest-offset', > 'connector.properties.bootstrap.servers' = 'localhost:9092', > 'connector.properties.zookeeper.connect' = 'localhost:2181', > 'format.type' = 'json', > 'connector.version' = 'universal', > 'update-mode' = 'append' > ) > “"" |
您好:
非常感谢您的建议,我已经成功解决了这个问题,但是我又发现了一个新的问题,我这里设置的超时时间是一分钟或者超时行数是5000行, 我在这期间更新了维表数据,但是我发现已经超过了超时时间,输出结果仍然没有被更新,是我理解的有问题么? 我尝试了停止输入流数据直到达到超时时间后仍然没有更新维表,除非停止整个程序,否则我的维表数据都不会被更新。 请问这个问题有解决的办法么? def register_mysql_source(st_env): source_ddl = \ """ CREATE TABLE dim_mysql ( id int, -- type varchar -- ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3390/test', 'table-name' = 'flink_test', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = '****', 'password' = '****', 'lookup.cache.max-rows' = '5000', 'lookup.cache.ttl' = '1s', 'lookup.max-retries' = '3' ) """ st_env.sql_update(source_ddl) 感谢! 琴师 发件人: Leonard Xu 发送时间: 2020-07-22 10:54 收件人: user-zh 主题: Re: flinksql1.11中主键声明的问题 Hi, 你这还是connector的with参数里不是新 connector的写法[1],会走到老代码,老代码不支持声明PK的。 在老代码里,PK是通过query推导的,你用inner join替换left join后,应该是能够推断出PK了,所以没有报错。 我理解你把connector的with参数更新成新的就解决问题了。 Best Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options> > > def register_rides_source(st_env): > source_ddl = \ > """ > create table source1( > id int, > time1 varchar , > type string > ) with ( > 'connector.type' = 'kafka', > 'connector.topic' = 'tp1', > 'connector.startup-mode' = 'latest-offset', > 'connector.properties.bootstrap.servers' = 'localhost:9092', > 'connector.properties.zookeeper.connect' = 'localhost:2181', > 'format.type' = 'json', > 'connector.version' = 'universal', > 'update-mode' = 'append' > ) > “"" |
In reply to this post by Leonard Xu
Hello
你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表 去look up该表时的数据,维表的更新是不会retract之前的历史记录的。 祝好 Leonard Xu > 在 2020年7月22日,14:13,[hidden email] 写道: > > 输出结果仍然没有被更新 |
你好:
可能是我描述的不清楚,我了解这个机制,我的意思维表更新后,即便已经达到了超时的时间,新的输出结果还是用维表历史缓存数据, 我感觉上是维表没有刷新缓存,但是我不知道这为什么。 谢谢 ------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2020年7月22日(星期三) 下午2:42 收件人: "user-zh"<[hidden email]>; 主题: Re: flinksql1.11中主键声明的问题 Hello 你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表 去look up该表时的数据,维表的更新是不会retract之前的历史记录的。 祝好 Leonard Xu > 在 2020年7月22日,14:13,[hidden email] 写道: > > 输出结果仍然没有被更新 |
Hi,
我试了下应该是会更新缓存的,你有能复现的例子吗? 祝好 > 在 2020年7月22日,14:50,奇怪的不朽琴师 <[hidden email]> 写道: > > 你好: > > > 可能是我描述的不清楚,我了解这个机制,我的意思维表更新后,即便已经达到了超时的时间,新的输出结果还是用维表历史缓存数据, > 我感觉上是维表没有刷新缓存,但是我不知道这为什么。 > > > 谢谢 > > > ------------------ 原始邮件 ------------------ > 发件人: "user-zh" <[hidden email]>; > 发送时间: 2020年7月22日(星期三) 下午2:42 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: flinksql1.11中主键声明的问题 > > > > Hello > 你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表 > 去look up该表时的数据,维表的更新是不会retract之前的历史记录的。 > > 祝好 > Leonard Xu > > > > 在 2020年7月22日,14:13,[hidden email] 写道: > > > > 输出结果仍然没有被更新 |
你好:
下面是我的代码,我用的版本是1.11.0,数据库是TIDB,我跑的是demo数据,维表只有两行。 我的输入流如下,每秒新增一条写入到kafka topic = 'tp1' for i in range(1,10000) : stime=datetime.datetime.now().strftime('%Y%m%d%H%M%S') msg = {} msg['id']= i msg['time1']= stime msg['type']=1 print(msg) send_msg(topic, msg) time.sleep(1) {'id': 1, 'time1': '20200722140624', 'type': 1} {'id': 2, 'time1': '20200722140625', 'type': 1} {'id': 3, 'time1': '20200722140626', 'type': 1} {'id': 4, 'time1': '20200722140627', 'type': 1} {'id': 5, 'time1': '20200722140628', 'type': 1} {'id': 6, 'time1': '20200722140629', 'type': 1} {'id': 7, 'time1': '20200722140631', 'type': 1} {'id': 8, 'time1': '20200722140632', 'type': 1} 维表数据如下 id type 2 err 1 err 我在程序正常期间更新了维表,但是后续输出的结果显示维表还是之前的缓存数据,事实上已经远远大于超时时间了,甚至我停下输入流,直到达到超时时间后再次输入,新的结果还是输出旧的维表数据 from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime from pyflink.table.window import Tumble def from_kafka_to_kafka_demo(): # use blink table planner env = StreamExecutionEnvironment.get_execution_environment() env.set_stream_time_characteristic(TimeCharacteristic.EventTime) env_settings = EnvironmentSettings.Builder().use_blink_planner().build() st_env = StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings) # register source and sink register_rides_source(st_env) register_rides_sink(st_env) register_mysql_source(st_env) st_env.sql_update("insert into flink_result select cast(t1.id as int) as id,cast(t2.type as varchar),cast( t1.time1 as bigint) as rowtime from source1 t1 left join dim_mysql t2 on t1.type=cast(t2.id as varchar) ") st_env.execute("2-from_kafka_to_kafka") def register_rides_source(st_env): source_ddl = \ """ create table source1( id int, time1 varchar , type string ) with ( 'connector' = 'kafka', 'topic' = 'tp1', 'scan.startup.mode' = 'latest-offset', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json' ) """ st_env.sql_update(source_ddl) def register_mysql_source(st_env): source_ddl = \ """ CREATE TABLE dim_mysql ( id int, -- type varchar -- ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3390/test', 'table-name' = 'flink_test', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = '***', 'password' = '***', 'lookup.cache.max-rows' = '5000', 'lookup.cache.ttl' = '1s', 'lookup.max-retries' = '3' ) """ st_env.sql_update(source_ddl) def register_rides_sink(st_env): sink_ddl = \ """ CREATE TABLE flink_result ( id int, type varchar, rtime bigint, primary key(id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3390/test', 'table-name' = 'flink_result', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = '***', 'password' = '***', 'sink.buffer-flush.max-rows' = '5000', 'sink.buffer-flush.interval' = '2s', 'sink.max-retries' = '3' ) """ st_env.sql_update(sink_ddl) if __name__ == '__main__': from_kafka_to_kafka_demo() 初学者 PyFlink爱好者 琴师 发件人: Leonard Xu 发送时间: 2020-07-22 15:05 收件人: user-zh 主题: Re: flinksql1.11中主键声明的问题 Hi, 我试了下应该是会更新缓存的,你有能复现的例子吗? 祝好 > 在 2020年7月22日,14:50,奇怪的不朽琴师 <[hidden email]> 写道: > > 你好: > > > 可能是我描述的不清楚,我了解这个机制,我的意思维表更新后,即便已经达到了超时的时间,新的输出结果还是用维表历史缓存数据, > 我感觉上是维表没有刷新缓存,但是我不知道这为什么。 > > > 谢谢 > > > ------------------ 原始邮件 ------------------ > 发件人: "user-zh" <[hidden email]>; > 发送时间: 2020年7月22日(星期三) 下午2:42 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: flinksql1.11中主键声明的问题 > > > > Hello > 你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表 > 去look up该表时的数据,维表的更新是不会retract之前的历史记录的。 > > 祝好 > Leonard Xu > > > > 在 2020年7月22日,14:13,[hidden email] 写道: > > > > 输出结果仍然没有被更新 |
In reply to this post by Leonard Xu
Hi,
看了下query,你没有使用维表join语法 FOR SYSTEM_TIME AS OF ,这样直接做的regular join,mysql表是bounded的,第一次读完就不会再读了,所以不会更新。 维表join才会按照你设置的时间去look up 最新的数据,维表是我们常说的temporal table(时态表)的一种,参考[1] 中的 temporal table join 祝好 Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins> > 在 2020年7月23日,09:06,琴师 <[hidden email]> 写道: > > > HI: > 我是使用场景是这样的,首先开启计算脚本,开启流输入,此时的数据中维表数据如下,此时输出的计算结果如下 >>> 维表 >>> id type >>> 2 err >>> 1 err >>> 结果 >>> > 1 err 20200723085754 > 2 err 20200723085755 > 3 err 20200723085756 > 4 err 20200723085757 > > 然后我更新了数据库维表数据,新的数据库维表数据如下,此时输出的结果并没有随着维表的改变而改变,时间已经超过了缓存刷新时间2s: >>> 维表 >>> id type >>> 2 acc >>> 1 acc >>> 结果 > > 94 err 20200723084455 > 95 err 20200723084456 > 96 err 20200723084457 > 97 err 20200723084458 > 98 err 20200723084459 > 99 err 20200723084500 > 100 err 20200723084501 > > 然后我断开了流输入,间隔时间大于缓存刷新时间,然后重新输入流,但是我的新输出结果仅仅是更新了与流有关的时间字段,与维表相关的字段仍没有得到更新。 > > 请问我的使用过程哪里不对么,请帮我指出来,万分感谢! > > > 谢谢! > > >>> >>> > > > ------------------ 原始邮件 ------------------ > 发件人: "Leonard Xu" <[hidden email]>; > 发送时间: 2020年7月22日(星期三) 晚上9:39 > 收件人: "琴师"<[hidden email]>; > 主题: Re: flinksql1.11中主键声明的问题 > > <[hidden email]> > > 代码应该没问题的,我源码和本地都复现了下,你检查下你使用方式 > > 祝好 > >> 在 2020年7月22日,16:39,Leonard Xu <[hidden email] <mailto:[hidden email]>> 写道: >> >> HI, >> 我看了维表这块的代码,应该没啥问题的,晚点我本地环境复现确认下哈。 >> >> >>> 在 2020年7月22日,16:27,琴师 <[hidden email] <mailto:[hidden email]>> 写道: >>> >>> >>> HI >>> 我附录了我的代码,现在基本上测通了流程,卡在维表刷新这里,不能刷新的话很受打击。HELP!! >>> 谢谢 >>> >>> ------------------ 原始邮件 ------------------ >>> 发件人: "琴师" <[hidden email] <mailto:[hidden email]>>; >>> 发送时间: 2020年7月22日(星期三) 下午3:17 >>> 收件人: "user-zh"<[hidden email] <mailto:[hidden email]>>; >>> 主题: 回复: Re: flinksql1.11中主键声明的问题 >>> >>> 你好: >>> 下面是我的代码,我用的版本是1.11.0,数据库是TIDB,我跑的是demo数据,维表只有两行。 >>> >>> 我的输入流如下,每秒新增一条写入到kafka >>> topic = 'tp1' >>> for i in range(1,10000) : >>> stime=datetime.datetime.now().strftime('%Y%m%d%H%M%S') >>> msg = {} >>> msg['id']= i >>> msg['time1']= stime >>> msg['type']=1 >>> print(msg) >>> send_msg(topic, msg) >>> time.sleep(1) >>> >>> {'id': 1, 'time1': '20200722140624', 'type': 1} >>> {'id': 2, 'time1': '20200722140625', 'type': 1} >>> {'id': 3, 'time1': '20200722140626', 'type': 1} >>> {'id': 4, 'time1': '20200722140627', 'type': 1} >>> {'id': 5, 'time1': '20200722140628', 'type': 1} >>> {'id': 6, 'time1': '20200722140629', 'type': 1} >>> {'id': 7, 'time1': '20200722140631', 'type': 1} >>> {'id': 8, 'time1': '20200722140632', 'type': 1} >>> >>> 维表数据如下 >>> id type >>> 2 err >>> 1 err >>> >>> 我在程序正常期间更新了维表,但是后续输出的结果显示维表还是之前的缓存数据,事实上已经远远大于超时时间了,甚至我停下输入流,直到达到超时时间后再次输入,新的结果还是输出旧的维表数据 >>> >>> >>> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic >>> from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink >>> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime >>> from pyflink.table.window import Tumble >>> >>> >>> def from_kafka_to_kafka_demo(): >>> >>> # use blink table planner >>> env = StreamExecutionEnvironment.get_execution_environment() >>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime) >>> env_settings = EnvironmentSettings.Builder().use_blink_planner().build() >>> st_env = StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings) >>> >>> # register source and sink >>> register_rides_source(st_env) >>> register_rides_sink(st_env) >>> register_mysql_source(st_env) >>> >>> >>> st_env.sql_update("insert into flink_result select cast(t1.id <http://t1.id/> as int) as id,cast(t2.type as varchar),cast( t1.time1 as bigint) as rowtime from source1 t1 left join dim_mysql t2 on t1.type=cast(t2.id <http://t2.id/> as varchar) ") >>> st_env.execute("2-from_kafka_to_kafka") >>> >>> >>> >>> def register_rides_source(st_env): >>> source_ddl = \ >>> """ >>> create table source1( >>> id int, >>> time1 varchar , >>> type string >>> ) with ( >>> 'connector' = 'kafka', >>> 'topic' = 'tp1', >>> 'scan.startup.mode' = 'latest-offset', >>> 'properties.bootstrap.servers' = 'localhost:9092', >>> 'format' = 'json' >>> ) >>> """ >>> st_env.sql_update(source_ddl) >>> >>> def register_mysql_source(st_env): >>> source_ddl = \ >>> """ >>> CREATE TABLE dim_mysql ( >>> id int, -- >>> type varchar -- >>> ) WITH ( >>> 'connector' = 'jdbc', >>> 'url' = 'jdbc:mysql://localhost:3390/test' <>, >>> 'table-name' = 'flink_test', >>> 'driver' = 'com.mysql.cj.jdbc.Driver', >>> 'username' = '***', >>> 'password' = '***', >>> 'lookup.cache.max-rows' = '5000', >>> 'lookup.cache.ttl' = '1s', >>> 'lookup.max-retries' = '3' >>> ) >>> """ >>> st_env.sql_update(source_ddl) >>> >>> def register_rides_sink(st_env): >>> sink_ddl = \ >>> """ >>> CREATE TABLE flink_result ( >>> id int, >>> type varchar, >>> rtime bigint, >>> primary key(id) NOT ENFORCED >>> ) WITH ( >>> 'connector' = 'jdbc', >>> 'url' = 'jdbc:mysql://localhost:3390/test' <>, >>> 'table-name' = 'flink_result', >>> 'driver' = 'com.mysql.cj.jdbc.Driver', >>> 'username' = '***', >>> 'password' = '***', >>> 'sink.buffer-flush.max-rows' = '5000', >>> 'sink.buffer-flush.interval' = '2s', >>> 'sink.max-retries' = '3' >>> ) >>> """ >>> st_env.sql_update(sink_ddl) >>> >>> >>> if __name__ == '__main__': >>> from_kafka_to_kafka_demo() >>> >>> 初学者 >>> PyFlink爱好者 >>> 琴师 >>> >>> >>> 发件人: Leonard Xu <mailto:[hidden email]> >>> 发送时间: 2020-07-22 15:05 >>> 收件人: user-zh <mailto:[hidden email]> >>> 主题: Re: flinksql1.11中主键声明的问题 >>> Hi, >>> >>> 我试了下应该是会更新缓存的,你有能复现的例子吗? >>> >>> 祝好 >>> > 在 2020年7月22日,14:50,奇怪的不朽琴师 <[hidden email] <mailto:[hidden email]>> 写道: >>> > >>> > 你好: >>> > >>> > >>> > 可能是我描述的不清楚,我了解这个机制,我的意思维表更新后,即便已经达到了超时的时间,新的输出结果还是用维表历史缓存数据, >>> > 我感觉上是维表没有刷新缓存,但是我不知道这为什么。 >>> > >>> > >>> > 谢谢 >>> > >>> > >>> > ------------------ 原始邮件 ------------------ >>> > 发件人: "user-zh" <[hidden email] <mailto:[hidden email]>>; >>> > 发送时间: 2020年7月22日(星期三) 下午2:42 >>> > 收件人: "user-zh"<[hidden email] <mailto:[hidden email]>>; >>> > >>> > 主题: Re: flinksql1.11中主键声明的问题 >>> > >>> > >>> > >>> > Hello >>> > 你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表 >>> > 去look up该表时的数据,维表的更新是不会retract之前的历史记录的。 >>> > >>> > 祝好 >>> > Leonard Xu >>> > >>> > >>> > > 在 2020年7月22日,14:13,[hidden email] <http://qq.com/> 写道: >>> > > >>> > > 输出结果仍然没有被更新 >> > |
好的感谢,果然还是理解上有些问题!
------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2020年7月23日(星期四) 上午9:30 收件人: "琴师"<[hidden email]>; 抄送: "user-zh"<[hidden email]>; 主题: Re: flinksql1.11中主键声明的问题 Hi, 看了下query,你没有使用维表join语法 FOR SYSTEM_TIME AS OF ,这样直接做的regular join,mysql表是bounded的,第一次读完就不会再读了,所以不会更新。 维表join才会按照你设置的时间去look up 最新的数据,维表是我们常说的temporal table(时态表)的一种,参考[1] 中的 temporal table join 祝好 Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins> > 在 2020年7月23日,09:06,琴师 <[hidden email]> 写道: > > > HI: > 我是使用场景是这样的,首先开启计算脚本,开启流输入,此时的数据中维表数据如下,此时输出的计算结果如下 >>> 维表 >>> id type >>> 2 err >>> 1 err >>> 结果 >>> > 1 err 20200723085754 > 2 err 20200723085755 > 3 err 20200723085756 > 4 err 20200723085757 > > 然后我更新了数据库维表数据,新的数据库维表数据如下,此时输出的结果并没有随着维表的改变而改变,时间已经超过了缓存刷新时间2s: >>> 维表 >>> id type >>> 2 acc >>> 1 acc >>> 结果 > > 94 err 20200723084455 > 95 err 20200723084456 > 96 err 20200723084457 > 97 err 20200723084458 > 98 err 20200723084459 > 99 err 20200723084500 > 100 err 20200723084501 > > 然后我断开了流输入,间隔时间大于缓存刷新时间,然后重新输入流,但是我的新输出结果仅仅是更新了与流有关的时间字段,与维表相关的字段仍没有得到更新。 > > 请问我的使用过程哪里不对么,请帮我指出来,万分感谢! > > > 谢谢! > > >>> >>> > > > ------------------ 原始邮件 ------------------ > 发件人: "Leonard Xu" <[hidden email]>; > 发送时间: 2020年7月22日(星期三) 晚上9:39 > 收件人: "琴师"<[hidden email]>; > 主题: Re: flinksql1.11中主键声明的问题 > > <[hidden email]> > > 代码应该没问题的,我源码和本地都复现了下,你检查下你使用方式 > > 祝好 > >> 在 2020年7月22日,16:39,Leonard Xu <[hidden email] <mailto:[hidden email]>> 写道: >> >> HI, >> 我看了维表这块的代码,应该没啥问题的,晚点我本地环境复现确认下哈。 >> >> >>> 在 2020年7月22日,16:27,琴师 <[hidden email] <mailto:[hidden email]>> 写道: >>> >>> >>> HI >>> 我附录了我的代码,现在基本上测通了流程,卡在维表刷新这里,不能刷新的话很受打击。HELP!! >>> 谢谢 >>> >>> ------------------ 原始邮件 ------------------ >>> 发件人: "琴师" <[hidden email] <mailto:[hidden email]>>; >>> 发送时间: 2020年7月22日(星期三) 下午3:17 >>> 收件人: "user-zh"<[hidden email] <mailto:[hidden email]>>; >>> 主题: 回复: Re: flinksql1.11中主键声明的问题 >>> >>> 你好: >>> 下面是我的代码,我用的版本是1.11.0,数据库是TIDB,我跑的是demo数据,维表只有两行。 >>> >>> 我的输入流如下,每秒新增一条写入到kafka >>> topic = 'tp1' >>> for i in range(1,10000) : >>> stime=datetime.datetime.now().strftime('%Y%m%d%H%M%S') >>> msg = {} >>> msg['id']= i >>> msg['time1']= stime >>> msg['type']=1 >>> print(msg) >>> send_msg(topic, msg) >>> time.sleep(1) >>> >>> {'id': 1, 'time1': '20200722140624', 'type': 1} >>> {'id': 2, 'time1': '20200722140625', 'type': 1} >>> {'id': 3, 'time1': '20200722140626', 'type': 1} >>> {'id': 4, 'time1': '20200722140627', 'type': 1} >>> {'id': 5, 'time1': '20200722140628', 'type': 1} >>> {'id': 6, 'time1': '20200722140629', 'type': 1} >>> {'id': 7, 'time1': '20200722140631', 'type': 1} >>> {'id': 8, 'time1': '20200722140632', 'type': 1} >>> >>> 维表数据如下 >>> id type >>> 2 err >>> 1 err >>> >>> 我在程序正常期间更新了维表,但是后续输出的结果显示维表还是之前的缓存数据,事实上已经远远大于超时时间了,甚至我停下输入流,直到达到超时时间后再次输入,新的结果还是输出旧的维表数据 >>> >>> >>> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic >>> from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink >>> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime >>> from pyflink.table.window import Tumble >>> >>> >>> def from_kafka_to_kafka_demo(): >>> >>> # use blink table planner >>> env = StreamExecutionEnvironment.get_execution_environment() >>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime) >>> env_settings = EnvironmentSettings.Builder().use_blink_planner().build() >>> st_env = StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings) >>> >>> # register source and sink >>> register_rides_source(st_env) >>> register_rides_sink(st_env) >>> register_mysql_source(st_env) >>> >>> >>> st_env.sql_update("insert into flink_result select cast(t1.id <http://t1.id/> as int) as id,cast(t2.type as varchar),cast( t1.time1 as bigint) as rowtime from source1 t1 left join dim_mysql t2 on t1.type=cast(t2.id <http://t2.id/> as varchar) ") >>> st_env.execute("2-from_kafka_to_kafka") >>> >>> >>> >>> def register_rides_source(st_env): >>> source_ddl = \ >>> """ >>> create table source1( >>> id int, >>> time1 varchar , >>> type string >>> ) with ( >>> 'connector' = 'kafka', >>> 'topic' = 'tp1', >>> 'scan.startup.mode' = 'latest-offset', >>> 'properties.bootstrap.servers' = 'localhost:9092', >>> 'format' = 'json' >>> ) >>> """ >>> st_env.sql_update(source_ddl) >>> >>> def register_mysql_source(st_env): >>> source_ddl = \ >>> """ >>> CREATE TABLE dim_mysql ( >>> id int, -- >>> type varchar -- >>> ) WITH ( >>> 'connector' = 'jdbc', >>> 'url' = 'jdbc:mysql://localhost:3390/test' <>, >>> 'table-name' = 'flink_test', >>> 'driver' = 'com.mysql.cj.jdbc.Driver', >>> 'username' = '***', >>> 'password' = '***', >>> 'lookup.cache.max-rows' = '5000', >>> 'lookup.cache.ttl' = '1s', >>> 'lookup.max-retries' = '3' >>> ) >>> """ >>> st_env.sql_update(source_ddl) >>> >>> def register_rides_sink(st_env): >>> sink_ddl = \ >>> """ >>> CREATE TABLE flink_result ( >>> id int, >>> type varchar, >>> rtime bigint, >>> primary key(id) NOT ENFORCED >>> ) WITH ( >>> 'connector' = 'jdbc', >>> 'url' = 'jdbc:mysql://localhost:3390/test' <>, >>> 'table-name' = 'flink_result', >>> 'driver' = 'com.mysql.cj.jdbc.Driver', >>> 'username' = '***', >>> 'password' = '***', >>> 'sink.buffer-flush.max-rows' = '5000', >>> 'sink.buffer-flush.interval' = '2s', >>> 'sink.max-retries' = '3' >>> ) >>> """ >>> st_env.sql_update(sink_ddl) >>> >>> >>> if __name__ == '__main__': >>> from_kafka_to_kafka_demo() >>> >>> 初学者 >>> PyFlink爱好者 >>> 琴师 >>> >>> >>> 发件人: Leonard Xu <mailto:[hidden email]> >>> 发送时间: 2020-07-22 15:05 >>> 收件人: user-zh <mailto:[hidden email]> >>> 主题: Re: flinksql1.11中主键声明的问题 >>> Hi, >>> >>> 我试了下应该是会更新缓存的,你有能复现的例子吗? >>> >>> 祝好 >>> > 在 2020年7月22日,14:50,奇怪的不朽琴师 <[hidden email] <mailto:[hidden email]>> 写道: >>> > >>> > 你好: >>> > >>> > >>> > 可能是我描述的不清楚,我了解这个机制,我的意思维表更新后,即便已经达到了超时的时间,新的输出结果还是用维表历史缓存数据, >>> > 我感觉上是维表没有刷新缓存,但是我不知道这为什么。 >>> > >>> > >>> > 谢谢 >>> > >>> > >>> > ------------------&nbsp;原始邮件&nbsp;------------------ >>> > 发件人: "user-zh" <[hidden email] <mailto:[hidden email]>&gt;; >>> > 发送时间:&nbsp;2020年7月22日(星期三) 下午2:42 >>> > 收件人:&nbsp;"user-zh"<[hidden email] <mailto:[hidden email]>&gt;; >>> > >>> > 主题:&nbsp;Re: flinksql1.11中主键声明的问题 >>> > >>> > >>> > >>> > Hello >>> > 你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表 >>> > 去look up该表时的数据,维表的更新是不会retract之前的历史记录的。 >>> > >>> > 祝好 >>> > Leonard Xu >>> > >>> > >>> > &gt; 在 2020年7月22日,14:13,[hidden email] <http://qq.com/> 写道: >>> > &gt; >>> > &gt; 输出结果仍然没有被更新 >> > |
HI:
听取了大佬的建议,已实现所需功能,非常感谢! ------------------ 原始邮件 ------------------ 发件人: "琴师" <[hidden email]>; 发送时间: 2020年7月23日(星期四) 上午9:35 收件人: "user-zh"<[hidden email]>; 抄送: "user-zh"<[hidden email]>; 主题: 回复: flinksql1.11中主键声明的问题 好的感谢,果然还是理解上有些问题! ------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2020年7月23日(星期四) 上午9:30 收件人: "琴师"<[hidden email]>; 抄送: "user-zh"<[hidden email]>; 主题: Re: flinksql1.11中主键声明的问题 Hi, 看了下query,你没有使用维表join语法 FOR SYSTEM_TIME AS OF ,这样直接做的regular join,mysql表是bounded的,第一次读完就不会再读了,所以不会更新。 维表join才会按照你设置的时间去look up 最新的数据,维表是我们常说的temporal table(时态表)的一种,参考[1] 中的 temporal table join 祝好 Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins> > 在 2020年7月23日,09:06,琴师 <[hidden email]> 写道: > > > HI: > 我是使用场景是这样的,首先开启计算脚本,开启流输入,此时的数据中维表数据如下,此时输出的计算结果如下 >>> 维表 >>> id type >>> 2 err >>> 1 err >>> 结果 >>> > 1 err 20200723085754 > 2 err 20200723085755 > 3 err 20200723085756 > 4 err 20200723085757 > > 然后我更新了数据库维表数据,新的数据库维表数据如下,此时输出的结果并没有随着维表的改变而改变,时间已经超过了缓存刷新时间2s: >>> 维表 >>> id type >>> 2 acc >>> 1 acc >>> 结果 > > 94 err 20200723084455 > 95 err 20200723084456 > 96 err 20200723084457 > 97 err 20200723084458 > 98 err 20200723084459 > 99 err 20200723084500 > 100 err 20200723084501 > > 然后我断开了流输入,间隔时间大于缓存刷新时间,然后重新输入流,但是我的新输出结果仅仅是更新了与流有关的时间字段,与维表相关的字段仍没有得到更新。 > > 请问我的使用过程哪里不对么,请帮我指出来,万分感谢! > > > 谢谢! > > >>> >>> > > > ------------------ 原始邮件 ------------------ > 发件人: "Leonard Xu" <[hidden email]>; > 发送时间: 2020年7月22日(星期三) 晚上9:39 > 收件人: "琴师"<[hidden email]>; > 主题: Re: flinksql1.11中主键声明的问题 > > <[hidden email]> > > 代码应该没问题的,我源码和本地都复现了下,你检查下你使用方式 > > 祝好 > >> 在 2020年7月22日,16:39,Leonard Xu <[hidden email] <mailto:[hidden email]>> 写道: >> >> HI, >> 我看了维表这块的代码,应该没啥问题的,晚点我本地环境复现确认下哈。 >> >> >>> 在 2020年7月22日,16:27,琴师 <[hidden email] <mailto:[hidden email]>> 写道: >>> >>> >>> HI >>> 我附录了我的代码,现在基本上测通了流程,卡在维表刷新这里,不能刷新的话很受打击。HELP!! >>> 谢谢 >>> >>> ------------------ 原始邮件 ------------------ >>> 发件人: "琴师" <[hidden email] <mailto:[hidden email]>>; >>> 发送时间: 2020年7月22日(星期三) 下午3:17 >>> 收件人: "user-zh"<[hidden email] <mailto:[hidden email]>>; >>> 主题: 回复: Re: flinksql1.11中主键声明的问题 >>> >>> 你好: >>> 下面是我的代码,我用的版本是1.11.0,数据库是TIDB,我跑的是demo数据,维表只有两行。 >>> >>> 我的输入流如下,每秒新增一条写入到kafka >>> topic = 'tp1' >>> for i in range(1,10000) : >>> stime=datetime.datetime.now().strftime('%Y%m%d%H%M%S') >>> msg = {} >>> msg['id']= i >>> msg['time1']= stime >>> msg['type']=1 >>> print(msg) >>> send_msg(topic, msg) >>> time.sleep(1) >>> >>> {'id': 1, 'time1': '20200722140624', 'type': 1} >>> {'id': 2, 'time1': '20200722140625', 'type': 1} >>> {'id': 3, 'time1': '20200722140626', 'type': 1} >>> {'id': 4, 'time1': '20200722140627', 'type': 1} >>> {'id': 5, 'time1': '20200722140628', 'type': 1} >>> {'id': 6, 'time1': '20200722140629', 'type': 1} >>> {'id': 7, 'time1': '20200722140631', 'type': 1} >>> {'id': 8, 'time1': '20200722140632', 'type': 1} >>> >>> 维表数据如下 >>> id type >>> 2 err >>> 1 err >>> >>> 我在程序正常期间更新了维表,但是后续输出的结果显示维表还是之前的缓存数据,事实上已经远远大于超时时间了,甚至我停下输入流,直到达到超时时间后再次输入,新的结果还是输出旧的维表数据 >>> >>> >>> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic >>> from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink >>> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime >>> from pyflink.table.window import Tumble >>> >>> >>> def from_kafka_to_kafka_demo(): >>> >>> # use blink table planner >>> env = StreamExecutionEnvironment.get_execution_environment() >>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime) >>> env_settings = EnvironmentSettings.Builder().use_blink_planner().build() >>> st_env = StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings) >>> >>> # register source and sink >>> register_rides_source(st_env) >>> register_rides_sink(st_env) >>> register_mysql_source(st_env) >>> >>> >>> st_env.sql_update("insert into flink_result select cast(t1.id <http://t1.id/> as int) as id,cast(t2.type as varchar),cast( t1.time1 as bigint) as rowtime from source1 t1 left join dim_mysql t2 on t1.type=cast(t2.id <http://t2.id/> as varchar) ") >>> st_env.execute("2-from_kafka_to_kafka") >>> >>> >>> >>> def register_rides_source(st_env): >>> source_ddl = \ >>> """ >>> create table source1( >>> id int, >>> time1 varchar , >>> type string >>> ) with ( >>> 'connector' = 'kafka', >>> 'topic' = 'tp1', >>> 'scan.startup.mode' = 'latest-offset', >>> 'properties.bootstrap.servers' = 'localhost:9092', >>> 'format' = 'json' >>> ) >>> """ >>> st_env.sql_update(source_ddl) >>> >>> def register_mysql_source(st_env): >>> source_ddl = \ >>> """ >>> CREATE TABLE dim_mysql ( >>> id int, -- >>> type varchar -- >>> ) WITH ( >>> 'connector' = 'jdbc', >>> 'url' = 'jdbc:mysql://localhost:3390/test' <>, >>> 'table-name' = 'flink_test', >>> 'driver' = 'com.mysql.cj.jdbc.Driver', >>> 'username' = '***', >>> 'password' = '***', >>> 'lookup.cache.max-rows' = '5000', >>> 'lookup.cache.ttl' = '1s', >>> 'lookup.max-retries' = '3' >>> ) >>> """ >>> st_env.sql_update(source_ddl) >>> >>> def register_rides_sink(st_env): >>> sink_ddl = \ >>> """ >>> CREATE TABLE flink_result ( >>> id int, >>> type varchar, >>> rtime bigint, >>> primary key(id) NOT ENFORCED >>> ) WITH ( >>> 'connector' = 'jdbc', >>> 'url' = 'jdbc:mysql://localhost:3390/test' <>, >>> 'table-name' = 'flink_result', >>> 'driver' = 'com.mysql.cj.jdbc.Driver', >>> 'username' = '***', >>> 'password' = '***', >>> 'sink.buffer-flush.max-rows' = '5000', >>> 'sink.buffer-flush.interval' = '2s', >>> 'sink.max-retries' = '3' >>> ) >>> """ >>> st_env.sql_update(sink_ddl) >>> >>> >>> if __name__ == '__main__': >>> from_kafka_to_kafka_demo() >>> >>> 初学者 >>> PyFlink爱好者 >>> 琴师 >>> >>> >>> 发件人: Leonard Xu <mailto:[hidden email]> >>> 发送时间: 2020-07-22 15:05 >>> 收件人: user-zh <mailto:[hidden email]> >>> 主题: Re: flinksql1.11中主键声明的问题 >>> Hi, >>> >>> 我试了下应该是会更新缓存的,你有能复现的例子吗? >>> >>> 祝好 >>> > 在 2020年7月22日,14:50,奇怪的不朽琴师 <[hidden email] <mailto:[hidden email]>> 写道: >>> > >>> > 你好: >>> > >>> > >>> > 可能是我描述的不清楚,我了解这个机制,我的意思维表更新后,即便已经达到了超时的时间,新的输出结果还是用维表历史缓存数据, >>> > 我感觉上是维表没有刷新缓存,但是我不知道这为什么。 >>> > >>> > >>> > 谢谢 >>> > >>> > >>> > ------------------&nbsp;原始邮件&nbsp;------------------ >>> > 发件人: "user-zh" <[hidden email] <mailto:[hidden email]>&gt;; >>> > 发送时间:&nbsp;2020年7月22日(星期三) 下午2:42 >>> > 收件人:&nbsp;"user-zh"<[hidden email] <mailto:[hidden email]>&gt;; >>> > >>> > 主题:&nbsp;Re: flinksql1.11中主键声明的问题 >>> > >>> > >>> > >>> > Hello >>> > 你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表 >>> > 去look up该表时的数据,维表的更新是不会retract之前的历史记录的。 >>> > >>> > 祝好 >>> > Leonard Xu >>> > >>> > >>> > &gt; 在 2020年7月22日,14:13,[hidden email] <http://qq.com/> 写道: >>> > &gt; >>> > &gt; 输出结果仍然没有被更新 >> > |
Free forum by Nabble | Edit this page |