Re: flinksql1.11中主键声明的问题

classic Classic list List threaded Threaded
9 messages Options
Reply | Threaded
Open this post in threaded view
|

Re: flinksql1.11中主键声明的问题

Leonard Xu
Hi,

你这还是connector的with参数里不是新 connector的写法[1],会走到老代码,老代码不支持声明PK的。
在老代码里,PK是通过query推导的,你用inner join替换left join后,应该是能够推断出PK了,所以没有报错。
我理解你把connector的with参数更新成新的就解决问题了。

Best
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options>

>
> def register_rides_source(st_env):
>    source_ddl = \
>    """
>    create table source1(
>     id int,
>     time1 varchar ,
>     type string
>     ) with (
>    'connector.type' = 'kafka',
>    'connector.topic' = 'tp1',
>    'connector.startup-mode' = 'latest-offset',
>    'connector.properties.bootstrap.servers' = 'localhost:9092',
>    'connector.properties.zookeeper.connect' = 'localhost:2181',
>    'format.type' = 'json',
>    'connector.version' = 'universal',
>    'update-mode' = 'append'
>     )
>    “""
Reply | Threaded
Open this post in threaded view
|

回复: Re: flinksql1.11中主键声明的问题

琴师
您好:

       非常感谢您的建议,我已经成功解决了这个问题,但是我又发现了一个新的问题,我这里设置的超时时间是一分钟或者超时行数是5000行,
我在这期间更新了维表数据,但是我发现已经超过了超时时间,输出结果仍然没有被更新,是我理解的有问题么?
我尝试了停止输入流数据直到达到超时时间后仍然没有更新维表,除非停止整个程序,否则我的维表数据都不会被更新。
请问这个问题有解决的办法么?

def register_mysql_source(st_env):
    source_ddl = \
    """
    CREATE TABLE dim_mysql (
    id int,  --
    type varchar --
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3390/test',
    'table-name' = 'flink_test',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = '****',
    'password' = '****',
    'lookup.cache.max-rows' = '5000',
    'lookup.cache.ttl' = '1s',
    'lookup.max-retries' = '3'
    )
    """    
    st_env.sql_update(source_ddl)
 


                                      感谢!




琴师

 
发件人: Leonard Xu
发送时间: 2020-07-22 10:54
收件人: user-zh
主题: Re: flinksql1.11中主键声明的问题
Hi,
 
你这还是connector的with参数里不是新 connector的写法[1],会走到老代码,老代码不支持声明PK的。
在老代码里,PK是通过query推导的,你用inner join替换left join后,应该是能够推断出PK了,所以没有报错。
我理解你把connector的with参数更新成新的就解决问题了。
 
Best
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options>

>
> def register_rides_source(st_env):
>    source_ddl = \
>    """
>    create table source1(
>     id int,
>     time1 varchar ,
>     type string
>     ) with (
>    'connector.type' = 'kafka',
>    'connector.topic' = 'tp1',
>    'connector.startup-mode' = 'latest-offset',
>    'connector.properties.bootstrap.servers' = 'localhost:9092',
>    'connector.properties.zookeeper.connect' = 'localhost:2181',
>    'format.type' = 'json',
>    'connector.version' = 'universal',
>    'update-mode' = 'append'
>     )
>    “""
Reply | Threaded
Open this post in threaded view
|

Re: flinksql1.11中主键声明的问题

Leonard Xu
In reply to this post by Leonard Xu
Hello
你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表
去look up该表时的数据,维表的更新是不会retract之前的历史记录的。

祝好
Leonard Xu


> 在 2020年7月22日,14:13,[hidden email] 写道:
>
> 输出结果仍然没有被更新

Reply | Threaded
Open this post in threaded view
|

回复: flinksql1.11中主键声明的问题

琴师
你好:


可能是我描述的不清楚,我了解这个机制,我的意思维表更新后,即便已经达到了超时的时间,新的输出结果还是用维表历史缓存数据,
我感觉上是维表没有刷新缓存,但是我不知道这为什么。


谢谢


------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2020年7月22日(星期三) 下午2:42
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: flinksql1.11中主键声明的问题



Hello
你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表
去look up该表时的数据,维表的更新是不会retract之前的历史记录的。

祝好
Leonard Xu


&gt; 在 2020年7月22日,14:13,[hidden email] 写道:
&gt;
&gt; 输出结果仍然没有被更新
Reply | Threaded
Open this post in threaded view
|

Re: flinksql1.11中主键声明的问题

Leonard Xu
Hi,

  我试了下应该是会更新缓存的,你有能复现的例子吗?

祝好

> 在 2020年7月22日,14:50,奇怪的不朽琴师 <[hidden email]> 写道:
>
> 你好:
>
>
> 可能是我描述的不清楚,我了解这个机制,我的意思维表更新后,即便已经达到了超时的时间,新的输出结果还是用维表历史缓存数据,
> 我感觉上是维表没有刷新缓存,但是我不知道这为什么。
>
>
> 谢谢
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
> 发送时间:&nbsp;2020年7月22日(星期三) 下午2:42
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;Re: flinksql1.11中主键声明的问题
>
>
>
> Hello
> 你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表
> 去look up该表时的数据,维表的更新是不会retract之前的历史记录的。
>
> 祝好
> Leonard Xu
>
>
> &gt; 在 2020年7月22日,14:13,[hidden email] 写道:
> &gt;
> &gt; 输出结果仍然没有被更新

Reply | Threaded
Open this post in threaded view
|

回复: Re: flinksql1.11中主键声明的问题

琴师
你好:
下面是我的代码,我用的版本是1.11.0,数据库是TIDB,我跑的是demo数据,维表只有两行。

我的输入流如下,每秒新增一条写入到kafka
 topic = 'tp1'
    for i  in  range(1,10000) :
        stime=datetime.datetime.now().strftime('%Y%m%d%H%M%S')
        msg = {}
        msg['id']= i
        msg['time1']= stime
        msg['type']=1
        print(msg)
        send_msg(topic, msg)
        time.sleep(1)

{'id': 1, 'time1': '20200722140624', 'type': 1}
{'id': 2, 'time1': '20200722140625', 'type': 1}
{'id': 3, 'time1': '20200722140626', 'type': 1}
{'id': 4, 'time1': '20200722140627', 'type': 1}
{'id': 5, 'time1': '20200722140628', 'type': 1}
{'id': 6, 'time1': '20200722140629', 'type': 1}
{'id': 7, 'time1': '20200722140631', 'type': 1}
{'id': 8, 'time1': '20200722140632', 'type': 1}

维表数据如下
id    type
2 err
1 err

我在程序正常期间更新了维表,但是后续输出的结果显示维表还是之前的缓存数据,事实上已经远远大于超时时间了,甚至我停下输入流,直到达到超时时间后再次输入,新的结果还是输出旧的维表数据


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
from pyflink.table.window import Tumble


def from_kafka_to_kafka_demo():

    # use blink table planner
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
    st_env = StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings)

    # register source and sink
    register_rides_source(st_env)
    register_rides_sink(st_env)
    register_mysql_source(st_env)
 

    st_env.sql_update("insert into flink_result select  cast(t1.id as int) as id,cast(t2.type as varchar),cast( t1.time1 as bigint) as rowtime from source1 t1 left join dim_mysql t2 on t1.type=cast(t2.id as varchar) ")
    st_env.execute("2-from_kafka_to_kafka")
   


def register_rides_source(st_env):
    source_ddl = \
    """
    create table source1(
     id int,
     time1 varchar ,
     type string
     ) with (
    'connector' = 'kafka',
    'topic' = 'tp1',
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
     )
    """
    st_env.sql_update(source_ddl)

def register_mysql_source(st_env):
    source_ddl = \
    """
    CREATE TABLE dim_mysql (
    id int,  --
    type varchar --
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3390/test',
    'table-name' = 'flink_test',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = '***',
    'password' = '***',
    'lookup.cache.max-rows' = '5000',
    'lookup.cache.ttl' = '1s',
    'lookup.max-retries' = '3'
    )
    """    
    st_env.sql_update(source_ddl)

def register_rides_sink(st_env):
    sink_ddl = \
    """
    CREATE TABLE flink_result (
    id int,  
    type varchar,
    rtime bigint,
    primary key(id)  NOT ENFORCED
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3390/test',
    'table-name' = 'flink_result',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = '***',
    'password' = '***',
    'sink.buffer-flush.max-rows' = '5000',
    'sink.buffer-flush.interval' = '2s',
    'sink.max-retries' = '3'
    )
    """
    st_env.sql_update(sink_ddl)


if __name__ == '__main__':
    from_kafka_to_kafka_demo()



初学者
PyFlink爱好者
琴师

 
发件人: Leonard Xu
发送时间: 2020-07-22 15:05
收件人: user-zh
主题: Re: flinksql1.11中主键声明的问题
Hi,
 
  我试了下应该是会更新缓存的,你有能复现的例子吗?
 
祝好

> 在 2020年7月22日,14:50,奇怪的不朽琴师 <[hidden email]> 写道:
>
> 你好:
>
>
> 可能是我描述的不清楚,我了解这个机制,我的意思维表更新后,即便已经达到了超时的时间,新的输出结果还是用维表历史缓存数据,
> 我感觉上是维表没有刷新缓存,但是我不知道这为什么。
>
>
> 谢谢
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
> 发送时间:&nbsp;2020年7月22日(星期三) 下午2:42
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;Re: flinksql1.11中主键声明的问题
>
>
>
> Hello
> 你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表
> 去look up该表时的数据,维表的更新是不会retract之前的历史记录的。
>
> 祝好
> Leonard Xu
>
>
> &gt; 在 2020年7月22日,14:13,[hidden email] 写道:
> &gt;
> &gt; 输出结果仍然没有被更新
 
Reply | Threaded
Open this post in threaded view
|

Re: flinksql1.11中主键声明的问题

Leonard Xu
In reply to this post by Leonard Xu
Hi,

看了下query,你没有使用维表join语法 FOR SYSTEM_TIME AS OF ,这样直接做的regular join,mysql表是bounded的,第一次读完就不会再读了,所以不会更新。
维表join才会按照你设置的时间去look up 最新的数据,维表是我们常说的temporal table(时态表)的一种,参考[1] 中的 temporal table join


祝好
Leonard Xu
[1]  https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins>

> 在 2020年7月23日,09:06,琴师 <[hidden email]> 写道:
>
>
> HI:
> 我是使用场景是这样的,首先开启计算脚本,开启流输入,此时的数据中维表数据如下,此时输出的计算结果如下
>>> 维表          
>>> id    type  
>>> 2 err
>>> 1 err
>>> 结果
>>>
> 1 err 20200723085754
> 2 err 20200723085755
> 3 err 20200723085756
> 4 err 20200723085757
>
> 然后我更新了数据库维表数据,新的数据库维表数据如下,此时输出的结果并没有随着维表的改变而改变,时间已经超过了缓存刷新时间2s:
>>> 维表          
>>> id    type  
>>> 2 acc
>>> 1 acc
>>> 结果
>
> 94 err 20200723084455
> 95 err 20200723084456
> 96 err 20200723084457
> 97 err 20200723084458
> 98 err 20200723084459
> 99 err 20200723084500
> 100 err 20200723084501
>
> 然后我断开了流输入,间隔时间大于缓存刷新时间,然后重新输入流,但是我的新输出结果仅仅是更新了与流有关的时间字段,与维表相关的字段仍没有得到更新。
>
> 请问我的使用过程哪里不对么,请帮我指出来,万分感谢!
>
>
> 谢谢!
>
>
>>>
>>>
>
>
> ------------------ 原始邮件 ------------------
> 发件人: "Leonard Xu" <[hidden email]>;
> 发送时间: 2020年7月22日(星期三) 晚上9:39
> 收件人: "琴师"<[hidden email]>;
> 主题: Re: flinksql1.11中主键声明的问题
>
> <[hidden email]>
>
> 代码应该没问题的,我源码和本地都复现了下,你检查下你使用方式
>
> 祝好
>
>> 在 2020年7月22日,16:39,Leonard Xu <[hidden email] <mailto:[hidden email]>> 写道:
>>
>> HI,
>> 我看了维表这块的代码,应该没啥问题的,晚点我本地环境复现确认下哈。
>>
>>
>>> 在 2020年7月22日,16:27,琴师 <[hidden email] <mailto:[hidden email]>> 写道:
>>>
>>>
>>> HI
>>> 我附录了我的代码,现在基本上测通了流程,卡在维表刷新这里,不能刷新的话很受打击。HELP!!
>>> 谢谢
>>>
>>> ------------------ 原始邮件 ------------------
>>> 发件人: "琴师" <[hidden email] <mailto:[hidden email]>>;
>>> 发送时间: 2020年7月22日(星期三) 下午3:17
>>> 收件人: "user-zh"<[hidden email] <mailto:[hidden email]>>;
>>> 主题: 回复: Re: flinksql1.11中主键声明的问题
>>>
>>> 你好:
>>> 下面是我的代码,我用的版本是1.11.0,数据库是TIDB,我跑的是demo数据,维表只有两行。
>>>
>>> 我的输入流如下,每秒新增一条写入到kafka
>>>  topic = 'tp1'
>>>     for i  in  range(1,10000) :
>>>         stime=datetime.datetime.now().strftime('%Y%m%d%H%M%S')
>>>         msg = {}
>>>         msg['id']= i
>>>         msg['time1']= stime
>>>         msg['type']=1
>>>         print(msg)
>>>         send_msg(topic, msg)
>>>         time.sleep(1)
>>>
>>> {'id': 1, 'time1': '20200722140624', 'type': 1}
>>> {'id': 2, 'time1': '20200722140625', 'type': 1}
>>> {'id': 3, 'time1': '20200722140626', 'type': 1}
>>> {'id': 4, 'time1': '20200722140627', 'type': 1}
>>> {'id': 5, 'time1': '20200722140628', 'type': 1}
>>> {'id': 6, 'time1': '20200722140629', 'type': 1}
>>> {'id': 7, 'time1': '20200722140631', 'type': 1}
>>> {'id': 8, 'time1': '20200722140632', 'type': 1}
>>>
>>> 维表数据如下
>>> id    type
>>> 2 err
>>> 1 err
>>>
>>> 我在程序正常期间更新了维表,但是后续输出的结果显示维表还是之前的缓存数据,事实上已经远远大于超时时间了,甚至我停下输入流,直到达到超时时间后再次输入,新的结果还是输出旧的维表数据
>>>
>>>
>>> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
>>> from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink
>>> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
>>> from pyflink.table.window import Tumble
>>>
>>>
>>> def from_kafka_to_kafka_demo():
>>>
>>>     # use blink table planner
>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>     env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>     env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
>>>     st_env = StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings)
>>>
>>>     # register source and sink
>>>     register_rides_source(st_env)
>>>     register_rides_sink(st_env)
>>>     register_mysql_source(st_env)
>>>  
>>>
>>>     st_env.sql_update("insert into flink_result select  cast(t1.id <http://t1.id/> as int) as id,cast(t2.type as varchar),cast( t1.time1 as bigint) as rowtime from source1 t1 left join dim_mysql t2 on t1.type=cast(t2.id <http://t2.id/> as varchar) ")
>>>     st_env.execute("2-from_kafka_to_kafka")
>>>    
>>>
>>>
>>> def register_rides_source(st_env):
>>>     source_ddl = \
>>>     """
>>>     create table source1(
>>>      id int,
>>>      time1 varchar ,
>>>      type string
>>>      ) with (
>>>     'connector' = 'kafka',
>>>     'topic' = 'tp1',
>>>     'scan.startup.mode' = 'latest-offset',
>>>     'properties.bootstrap.servers' = 'localhost:9092',
>>>     'format' = 'json'
>>>      )
>>>     """
>>>     st_env.sql_update(source_ddl)
>>>
>>> def register_mysql_source(st_env):
>>>     source_ddl = \
>>>     """
>>>     CREATE TABLE dim_mysql (
>>>     id int,  --
>>>     type varchar --
>>>     ) WITH (
>>>     'connector' = 'jdbc',
>>>     'url' = 'jdbc:mysql://localhost:3390/test' <>,
>>>     'table-name' = 'flink_test',
>>>     'driver' = 'com.mysql.cj.jdbc.Driver',
>>>     'username' = '***',
>>>     'password' = '***',
>>>     'lookup.cache.max-rows' = '5000',
>>>     'lookup.cache.ttl' = '1s',
>>>     'lookup.max-retries' = '3'
>>>     )
>>>     """    
>>>     st_env.sql_update(source_ddl)
>>>
>>> def register_rides_sink(st_env):
>>>     sink_ddl = \
>>>     """
>>>     CREATE TABLE flink_result (
>>>     id int,  
>>>     type varchar,
>>>     rtime bigint,
>>>     primary key(id)  NOT ENFORCED
>>>     ) WITH (
>>>     'connector' = 'jdbc',
>>>     'url' = 'jdbc:mysql://localhost:3390/test' <>,
>>>     'table-name' = 'flink_result',
>>>     'driver' = 'com.mysql.cj.jdbc.Driver',
>>>     'username' = '***',
>>>     'password' = '***',
>>>     'sink.buffer-flush.max-rows' = '5000',
>>>     'sink.buffer-flush.interval' = '2s',
>>>     'sink.max-retries' = '3'
>>>     )
>>>     """
>>>     st_env.sql_update(sink_ddl)
>>>
>>>
>>> if __name__ == '__main__':
>>>     from_kafka_to_kafka_demo()
>>>
>>> 初学者
>>> PyFlink爱好者
>>> 琴师
>>>
>>>  
>>> 发件人: Leonard Xu <mailto:[hidden email]>
>>> 发送时间: 2020-07-22 15:05
>>> 收件人: user-zh <mailto:[hidden email]>
>>> 主题: Re: flinksql1.11中主键声明的问题
>>> Hi,
>>>  
>>>   我试了下应该是会更新缓存的,你有能复现的例子吗?
>>>  
>>> 祝好
>>> > 在 2020年7月22日,14:50,奇怪的不朽琴师 <[hidden email] <mailto:[hidden email]>> 写道:
>>> >
>>> > 你好:
>>> >
>>> >
>>> > 可能是我描述的不清楚,我了解这个机制,我的意思维表更新后,即便已经达到了超时的时间,新的输出结果还是用维表历史缓存数据,
>>> > 我感觉上是维表没有刷新缓存,但是我不知道这为什么。
>>> >
>>> >
>>> > 谢谢
>>> >
>>> >
>>> > ------------------&nbsp;原始邮件&nbsp;------------------
>>> > 发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email] <mailto:[hidden email]>&gt;;
>>> > 发送时间:&nbsp;2020年7月22日(星期三) 下午2:42
>>> > 收件人:&nbsp;"user-zh"<[hidden email] <mailto:[hidden email]>&gt;;
>>> >
>>> > 主题:&nbsp;Re: flinksql1.11中主键声明的问题
>>> >
>>> >
>>> >
>>> > Hello
>>> > 你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表
>>> > 去look up该表时的数据,维表的更新是不会retract之前的历史记录的。
>>> >
>>> > 祝好
>>> > Leonard Xu
>>> >
>>> >
>>> > &gt; 在 2020年7月22日,14:13,[hidden email] <http://qq.com/> 写道:
>>> > &gt;
>>> > &gt; 输出结果仍然没有被更新
>>
>

Reply | Threaded
Open this post in threaded view
|

回复: flinksql1.11中主键声明的问题

琴师
好的感谢,果然还是理解上有些问题!




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2020年7月23日(星期四) 上午9:30
收件人:&nbsp;"琴师"<[hidden email]&gt;;
抄送:&nbsp;"user-zh"<[hidden email]&gt;;
主题:&nbsp;Re: flinksql1.11中主键声明的问题



Hi,

看了下query,你没有使用维表join语法 FOR SYSTEM_TIME AS OF ,这样直接做的regular join,mysql表是bounded的,第一次读完就不会再读了,所以不会更新。
维表join才会按照你设置的时间去look up 最新的数据,维表是我们常说的temporal table(时态表)的一种,参考[1] 中的 temporal table join


祝好
Leonard Xu
[1]&nbsp; https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins&gt;

&gt; 在 2020年7月23日,09:06,琴师 <[hidden email]&gt; 写道:
&gt;
&gt;
&gt; HI:
&gt; 我是使用场景是这样的,首先开启计算脚本,开启流输入,此时的数据中维表数据如下,此时输出的计算结果如下
&gt;&gt;&gt; 维表&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
&gt;&gt;&gt; id&nbsp;&nbsp;&nbsp; type&nbsp;&nbsp;
&gt;&gt;&gt; 2 err
&gt;&gt;&gt; 1 err
&gt;&gt;&gt; 结果
&gt;&gt;&gt;
&gt; 1 err 20200723085754
&gt; 2 err 20200723085755
&gt; 3 err 20200723085756
&gt; 4 err 20200723085757
&gt;
&gt; 然后我更新了数据库维表数据,新的数据库维表数据如下,此时输出的结果并没有随着维表的改变而改变,时间已经超过了缓存刷新时间2s:
&gt;&gt;&gt; 维表&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
&gt;&gt;&gt; id&nbsp;&nbsp;&nbsp; type&nbsp;&nbsp;
&gt;&gt;&gt; 2 acc
&gt;&gt;&gt; 1 acc
&gt;&gt;&gt; 结果
&gt;
&gt; 94 err 20200723084455
&gt; 95 err 20200723084456
&gt; 96 err 20200723084457
&gt; 97 err 20200723084458
&gt; 98 err 20200723084459
&gt; 99 err 20200723084500
&gt; 100 err 20200723084501
&gt;
&gt; 然后我断开了流输入,间隔时间大于缓存刷新时间,然后重新输入流,但是我的新输出结果仅仅是更新了与流有关的时间字段,与维表相关的字段仍没有得到更新。
&gt;
&gt; 请问我的使用过程哪里不对么,请帮我指出来,万分感谢!
&gt;
&gt;
&gt; 谢谢!
&gt;
&gt;
&gt;&gt;&gt;
&gt;&gt;&gt;
&gt;
&gt;
&gt; ------------------ 原始邮件 ------------------
&gt; 发件人: "Leonard Xu" <[hidden email]&gt;;
&gt; 发送时间: 2020年7月22日(星期三) 晚上9:39
&gt; 收件人: "琴师"<[hidden email]&gt;;
&gt; 主题: Re: flinksql1.11中主键声明的问题
&gt;
&gt; <[hidden email]&gt;
&gt;
&gt; 代码应该没问题的,我源码和本地都复现了下,你检查下你使用方式
&gt;
&gt; 祝好
&gt;
&gt;&gt; 在 2020年7月22日,16:39,Leonard Xu <[hidden email] <mailto:[hidden email]&gt;&gt; 写道:
&gt;&gt;
&gt;&gt; HI,
&gt;&gt; 我看了维表这块的代码,应该没啥问题的,晚点我本地环境复现确认下哈。
&gt;&gt;
&gt;&gt;
&gt;&gt;&gt; 在 2020年7月22日,16:27,琴师 <[hidden email] <mailto:[hidden email]&gt;&gt; 写道:
&gt;&gt;&gt;
&gt;&gt;&gt;
&gt;&gt;&gt; HI
&gt;&gt;&gt; 我附录了我的代码,现在基本上测通了流程,卡在维表刷新这里,不能刷新的话很受打击。HELP!!
&gt;&gt;&gt; 谢谢
&gt;&gt;&gt;
&gt;&gt;&gt; ------------------ 原始邮件 ------------------
&gt;&gt;&gt; 发件人: "琴师" <[hidden email] <mailto:[hidden email]&gt;&gt;;
&gt;&gt;&gt; 发送时间: 2020年7月22日(星期三) 下午3:17
&gt;&gt;&gt; 收件人: "user-zh"<[hidden email] <mailto:[hidden email]&gt;&gt;;
&gt;&gt;&gt; 主题: 回复: Re: flinksql1.11中主键声明的问题
&gt;&gt;&gt;
&gt;&gt;&gt; 你好:
&gt;&gt;&gt; 下面是我的代码,我用的版本是1.11.0,数据库是TIDB,我跑的是demo数据,维表只有两行。
&gt;&gt;&gt;
&gt;&gt;&gt; 我的输入流如下,每秒新增一条写入到kafka
&gt;&gt;&gt;&nbsp; topic = 'tp1'
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; for i&nbsp; in&nbsp; range(1,10000) :
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; stime=datetime.datetime.now().strftime('%Y%m%d%H%M%S')
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; msg = {}
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; msg['id']= i
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; msg['time1']= stime
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; msg['type']=1
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; print(msg)
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; send_msg(topic, msg)
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; time.sleep(1)
&gt;&gt;&gt;
&gt;&gt;&gt; {'id': 1, 'time1': '20200722140624', 'type': 1}
&gt;&gt;&gt; {'id': 2, 'time1': '20200722140625', 'type': 1}
&gt;&gt;&gt; {'id': 3, 'time1': '20200722140626', 'type': 1}
&gt;&gt;&gt; {'id': 4, 'time1': '20200722140627', 'type': 1}
&gt;&gt;&gt; {'id': 5, 'time1': '20200722140628', 'type': 1}
&gt;&gt;&gt; {'id': 6, 'time1': '20200722140629', 'type': 1}
&gt;&gt;&gt; {'id': 7, 'time1': '20200722140631', 'type': 1}
&gt;&gt;&gt; {'id': 8, 'time1': '20200722140632', 'type': 1}
&gt;&gt;&gt;
&gt;&gt;&gt; 维表数据如下
&gt;&gt;&gt; id&nbsp;&nbsp;&nbsp; type
&gt;&gt;&gt; 2 err
&gt;&gt;&gt; 1 err
&gt;&gt;&gt;
&gt;&gt;&gt; 我在程序正常期间更新了维表,但是后续输出的结果显示维表还是之前的缓存数据,事实上已经远远大于超时时间了,甚至我停下输入流,直到达到超时时间后再次输入,新的结果还是输出旧的维表数据
&gt;&gt;&gt;
&gt;&gt;&gt;
&gt;&gt;&gt; from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
&gt;&gt;&gt; from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink
&gt;&gt;&gt; from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
&gt;&gt;&gt; from pyflink.table.window import Tumble
&gt;&gt;&gt;
&gt;&gt;&gt;
&gt;&gt;&gt; def from_kafka_to_kafka_demo():
&gt;&gt;&gt;
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; # use blink table planner
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; env = StreamExecutionEnvironment.get_execution_environment()
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; st_env = StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings)
&gt;&gt;&gt;
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; # register source and sink
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; register_rides_source(st_env)
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; register_rides_sink(st_env)
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; register_mysql_source(st_env)
&gt;&gt;&gt;&nbsp;
&gt;&gt;&gt;
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; st_env.sql_update("insert into flink_result select&nbsp; cast(t1.id <http://t1.id/&gt; as int) as id,cast(t2.type as varchar),cast( t1.time1 as bigint) as rowtime from source1 t1 left join dim_mysql t2 on t1.type=cast(t2.id <http://t2.id/&gt; as varchar) ")
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; st_env.execute("2-from_kafka_to_kafka")
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;
&gt;&gt;&gt;
&gt;&gt;&gt;
&gt;&gt;&gt; def register_rides_source(st_env):
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; source_ddl = \
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; """
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; create table source1(
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; id int,
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; time1 varchar ,
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; type string
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ) with (
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'connector' = 'kafka',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'topic' = 'tp1',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'scan.startup.mode' = 'latest-offset',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'properties.bootstrap.servers' = 'localhost:9092',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'format' = 'json'
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; )
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; """
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; st_env.sql_update(source_ddl)
&gt;&gt;&gt;
&gt;&gt;&gt; def register_mysql_source(st_env):
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; source_ddl = \
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; """
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; CREATE TABLE dim_mysql (
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; id int,&nbsp; --
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; type varchar --
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; ) WITH (
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'connector' = 'jdbc',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'url' = 'jdbc:mysql://localhost:3390/test' <&gt;,
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'table-name' = 'flink_test',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'driver' = 'com.mysql.cj.jdbc.Driver',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'username' = '***',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'password' = '***',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'lookup.cache.max-rows' = '5000',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'lookup.cache.ttl' = '1s',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'lookup.max-retries' = '3'
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; )
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; """&nbsp;&nbsp;&nbsp;
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; st_env.sql_update(source_ddl)
&gt;&gt;&gt;
&gt;&gt;&gt; def register_rides_sink(st_env):
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; sink_ddl = \
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; """
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; CREATE TABLE flink_result (
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; id int,&nbsp;
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; type varchar,
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; rtime bigint,
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; primary key(id)&nbsp; NOT ENFORCED
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; ) WITH (
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'connector' = 'jdbc',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'url' = 'jdbc:mysql://localhost:3390/test' <&gt;,
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'table-name' = 'flink_result',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'driver' = 'com.mysql.cj.jdbc.Driver',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'username' = '***',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'password' = '***',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'sink.buffer-flush.max-rows' = '5000',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'sink.buffer-flush.interval' = '2s',
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; 'sink.max-retries' = '3'
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; )
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; """
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; st_env.sql_update(sink_ddl)
&gt;&gt;&gt;
&gt;&gt;&gt;
&gt;&gt;&gt; if __name__ == '__main__':
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp; from_kafka_to_kafka_demo()
&gt;&gt;&gt;
&gt;&gt;&gt; 初学者
&gt;&gt;&gt; PyFlink爱好者
&gt;&gt;&gt; 琴师
&gt;&gt;&gt;
&gt;&gt;&gt;&nbsp;
&gt;&gt;&gt; 发件人: Leonard Xu <mailto:[hidden email]&gt;
&gt;&gt;&gt; 发送时间: 2020-07-22 15:05
&gt;&gt;&gt; 收件人: user-zh <mailto:[hidden email]&gt;
&gt;&gt;&gt; 主题: Re: flinksql1.11中主键声明的问题
&gt;&gt;&gt; Hi,
&gt;&gt;&gt;&nbsp;
&gt;&gt;&gt;&nbsp;&nbsp; 我试了下应该是会更新缓存的,你有能复现的例子吗?
&gt;&gt;&gt;&nbsp;
&gt;&gt;&gt; 祝好
&gt;&gt;&gt; &gt; 在 2020年7月22日,14:50,奇怪的不朽琴师 <[hidden email] <mailto:[hidden email]&gt;&gt; 写道:
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; 你好:
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; 可能是我描述的不清楚,我了解这个机制,我的意思维表更新后,即便已经达到了超时的时间,新的输出结果还是用维表历史缓存数据,
&gt;&gt;&gt; &gt; 我感觉上是维表没有刷新缓存,但是我不知道这为什么。
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; 谢谢
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt;&gt;&gt; &gt; 发件人:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "user-zh"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <[hidden email] <mailto:[hidden email]&gt;&amp;gt;;
&gt;&gt;&gt; &gt; 发送时间:&amp;nbsp;2020年7月22日(星期三) 下午2:42
&gt;&gt;&gt; &gt; 收件人:&amp;nbsp;"user-zh"<[hidden email] <mailto:[hidden email]&gt;&amp;gt;;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; 主题:&amp;nbsp;Re: flinksql1.11中主键声明的问题
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; Hello
&gt;&gt;&gt; &gt; 你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表
&gt;&gt;&gt; &gt; 去look up该表时的数据,维表的更新是不会retract之前的历史记录的。
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; 祝好
&gt;&gt;&gt; &gt; Leonard Xu
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; &amp;gt; 在 2020年7月22日,14:13,[hidden email] <http://qq.com/&gt; 写道:
&gt;&gt;&gt; &gt; &amp;gt;
&gt;&gt;&gt; &gt; &amp;gt; 输出结果仍然没有被更新
&gt;&gt;
&gt;
Reply | Threaded
Open this post in threaded view
|

回复: flinksql1.11中主键声明的问题

琴师
HI:


听取了大佬的建议,已实现所需功能,非常感谢!


------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "琴师"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2020年7月23日(星期四) 上午9:35
收件人:&nbsp;"user-zh"<[hidden email]&gt;;
抄送:&nbsp;"user-zh"<[hidden email]&gt;;
主题:&nbsp;回复: flinksql1.11中主键声明的问题



好的感谢,果然还是理解上有些问题!




------------------ 原始邮件 ------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2020年7月23日(星期四) 上午9:30
收件人:&nbsp;"琴师"<[hidden email]&gt;;
抄送:&nbsp;"user-zh"<[hidden email]&gt;;
主题:&nbsp;Re: flinksql1.11中主键声明的问题



Hi,

看了下query,你没有使用维表join语法 FOR SYSTEM_TIME AS OF ,这样直接做的regular join,mysql表是bounded的,第一次读完就不会再读了,所以不会更新。
维表join才会按照你设置的时间去look up 最新的数据,维表是我们常说的temporal table(时态表)的一种,参考[1] 中的 temporal table join


祝好
Leonard Xu
[1]&nbsp; https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins&gt;

&gt; 在 2020年7月23日,09:06,琴师 <[hidden email]&gt; 写道:
&gt;
&gt;
&gt; HI:
&gt; 我是使用场景是这样的,首先开启计算脚本,开启流输入,此时的数据中维表数据如下,此时输出的计算结果如下
&gt;&gt;&gt; 维表&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp;
&gt;&gt;&gt; id&nbsp; &nbsp; type &nbsp;
&gt;&gt;&gt; 2 err
&gt;&gt;&gt; 1 err
&gt;&gt;&gt; 结果
&gt;&gt;&gt;
&gt; 1 err 20200723085754
&gt; 2 err 20200723085755
&gt; 3 err 20200723085756
&gt; 4 err 20200723085757
&gt;
&gt; 然后我更新了数据库维表数据,新的数据库维表数据如下,此时输出的结果并没有随着维表的改变而改变,时间已经超过了缓存刷新时间2s:
&gt;&gt;&gt; 维表&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp;
&gt;&gt;&gt; id&nbsp; &nbsp; type &nbsp;
&gt;&gt;&gt; 2 acc
&gt;&gt;&gt; 1 acc
&gt;&gt;&gt; 结果
&gt;
&gt; 94 err 20200723084455
&gt; 95 err 20200723084456
&gt; 96 err 20200723084457
&gt; 97 err 20200723084458
&gt; 98 err 20200723084459
&gt; 99 err 20200723084500
&gt; 100 err 20200723084501
&gt;
&gt; 然后我断开了流输入,间隔时间大于缓存刷新时间,然后重新输入流,但是我的新输出结果仅仅是更新了与流有关的时间字段,与维表相关的字段仍没有得到更新。
&gt;
&gt; 请问我的使用过程哪里不对么,请帮我指出来,万分感谢!
&gt;
&gt;
&gt; 谢谢!
&gt;
&gt;
&gt;&gt;&gt;
&gt;&gt;&gt;
&gt;
&gt;
&gt; ------------------ 原始邮件 ------------------
&gt; 发件人: "Leonard Xu" <[hidden email]&gt;;
&gt; 发送时间: 2020年7月22日(星期三) 晚上9:39
&gt; 收件人: "琴师"<[hidden email]&gt;;
&gt; 主题: Re: flinksql1.11中主键声明的问题
&gt;
&gt; <[hidden email]&gt;
&gt;
&gt; 代码应该没问题的,我源码和本地都复现了下,你检查下你使用方式
&gt;
&gt; 祝好
&gt;
&gt;&gt; 在 2020年7月22日,16:39,Leonard Xu <[hidden email] <mailto:[hidden email]&gt;&gt; 写道:
&gt;&gt;
&gt;&gt; HI,
&gt;&gt; 我看了维表这块的代码,应该没啥问题的,晚点我本地环境复现确认下哈。
&gt;&gt;
&gt;&gt;
&gt;&gt;&gt; 在 2020年7月22日,16:27,琴师 <[hidden email] <mailto:[hidden email]&gt;&gt; 写道:
&gt;&gt;&gt;
&gt;&gt;&gt;
&gt;&gt;&gt; HI
&gt;&gt;&gt; 我附录了我的代码,现在基本上测通了流程,卡在维表刷新这里,不能刷新的话很受打击。HELP!!
&gt;&gt;&gt; 谢谢
&gt;&gt;&gt;
&gt;&gt;&gt; ------------------ 原始邮件 ------------------
&gt;&gt;&gt; 发件人: "琴师" <[hidden email] <mailto:[hidden email]&gt;&gt;;
&gt;&gt;&gt; 发送时间: 2020年7月22日(星期三) 下午3:17
&gt;&gt;&gt; 收件人: "user-zh"<[hidden email] <mailto:[hidden email]&gt;&gt;;
&gt;&gt;&gt; 主题: 回复: Re: flinksql1.11中主键声明的问题
&gt;&gt;&gt;
&gt;&gt;&gt; 你好:
&gt;&gt;&gt; 下面是我的代码,我用的版本是1.11.0,数据库是TIDB,我跑的是demo数据,维表只有两行。
&gt;&gt;&gt;
&gt;&gt;&gt; 我的输入流如下,每秒新增一条写入到kafka
&gt;&gt;&gt;&nbsp; topic = 'tp1'
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; for i&nbsp; in&nbsp; range(1,10000) :
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp; stime=datetime.datetime.now().strftime('%Y%m%d%H%M%S')
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp; msg = {}
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp; msg['id']= i
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp; msg['time1']= stime
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp; msg['type']=1
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp; print(msg)
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp; send_msg(topic, msg)
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp; time.sleep(1)
&gt;&gt;&gt;
&gt;&gt;&gt; {'id': 1, 'time1': '20200722140624', 'type': 1}
&gt;&gt;&gt; {'id': 2, 'time1': '20200722140625', 'type': 1}
&gt;&gt;&gt; {'id': 3, 'time1': '20200722140626', 'type': 1}
&gt;&gt;&gt; {'id': 4, 'time1': '20200722140627', 'type': 1}
&gt;&gt;&gt; {'id': 5, 'time1': '20200722140628', 'type': 1}
&gt;&gt;&gt; {'id': 6, 'time1': '20200722140629', 'type': 1}
&gt;&gt;&gt; {'id': 7, 'time1': '20200722140631', 'type': 1}
&gt;&gt;&gt; {'id': 8, 'time1': '20200722140632', 'type': 1}
&gt;&gt;&gt;
&gt;&gt;&gt; 维表数据如下
&gt;&gt;&gt; id&nbsp; &nbsp; type
&gt;&gt;&gt; 2 err
&gt;&gt;&gt; 1 err
&gt;&gt;&gt;
&gt;&gt;&gt; 我在程序正常期间更新了维表,但是后续输出的结果显示维表还是之前的缓存数据,事实上已经远远大于超时时间了,甚至我停下输入流,直到达到超时时间后再次输入,新的结果还是输出旧的维表数据
&gt;&gt;&gt;
&gt;&gt;&gt;
&gt;&gt;&gt; from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
&gt;&gt;&gt; from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink
&gt;&gt;&gt; from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
&gt;&gt;&gt; from pyflink.table.window import Tumble
&gt;&gt;&gt;
&gt;&gt;&gt;
&gt;&gt;&gt; def from_kafka_to_kafka_demo():
&gt;&gt;&gt;
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; # use blink table planner
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; env = StreamExecutionEnvironment.get_execution_environment()
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; st_env = StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings)
&gt;&gt;&gt;
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; # register source and sink
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; register_rides_source(st_env)
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; register_rides_sink(st_env)
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; register_mysql_source(st_env)
&gt;&gt;&gt;&nbsp;
&gt;&gt;&gt;
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; st_env.sql_update("insert into flink_result select&nbsp; cast(t1.id <http://t1.id/&gt; as int) as id,cast(t2.type as varchar),cast( t1.time1 as bigint) as rowtime from source1 t1 left join dim_mysql t2 on t1.type=cast(t2.id <http://t2.id/&gt; as varchar) ")
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; st_env.execute("2-from_kafka_to_kafka")
&gt;&gt;&gt;&nbsp; &nbsp;
&gt;&gt;&gt;
&gt;&gt;&gt;
&gt;&gt;&gt; def register_rides_source(st_env):
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; source_ddl = \
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; """
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; create table source1(
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; &nbsp; id int,
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; &nbsp; time1 varchar ,
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; &nbsp; type string
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; &nbsp; ) with (
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'connector' = 'kafka',
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'topic' = 'tp1',
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'scan.startup.mode' = 'latest-offset',
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'properties.bootstrap.servers' = 'localhost:9092',
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'format' = 'json'
&gt;&gt;&gt;&nbsp;&nbsp;&nbsp; &nbsp; )
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; """
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; st_env.sql_update(source_ddl)
&gt;&gt;&gt;
&gt;&gt;&gt; def register_mysql_source(st_env):
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; source_ddl = \
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; """
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; CREATE TABLE dim_mysql (
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; id int,&nbsp; --
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; type varchar --
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; ) WITH (
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'connector' = 'jdbc',
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'url' = 'jdbc:mysql://localhost:3390/test' <&gt;,
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'table-name' = 'flink_test',
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'driver' = 'com.mysql.cj.jdbc.Driver',
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'username' = '***',
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'password' = '***',
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'lookup.cache.max-rows' = '5000',
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'lookup.cache.ttl' = '1s',
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'lookup.max-retries' = '3'
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; )
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; """&nbsp; &nbsp;
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; st_env.sql_update(source_ddl)
&gt;&gt;&gt;
&gt;&gt;&gt; def register_rides_sink(st_env):
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; sink_ddl = \
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; """
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; CREATE TABLE flink_result (
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; id int,&nbsp;
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; type varchar,
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; rtime bigint,
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; primary key(id)&nbsp; NOT ENFORCED
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; ) WITH (
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'connector' = 'jdbc',
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'url' = 'jdbc:mysql://localhost:3390/test' <&gt;,
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'table-name' = 'flink_result',
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'driver' = 'com.mysql.cj.jdbc.Driver',
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'username' = '***',
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'password' = '***',
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'sink.buffer-flush.max-rows' = '5000',
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'sink.buffer-flush.interval' = '2s',
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; 'sink.max-retries' = '3'
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; )
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; """
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; st_env.sql_update(sink_ddl)
&gt;&gt;&gt;
&gt;&gt;&gt;
&gt;&gt;&gt; if __name__ == '__main__':
&gt;&gt;&gt;&nbsp;&nbsp; &nbsp; from_kafka_to_kafka_demo()
&gt;&gt;&gt;
&gt;&gt;&gt; 初学者
&gt;&gt;&gt; PyFlink爱好者
&gt;&gt;&gt; 琴师
&gt;&gt;&gt;
&gt;&gt;&gt;&nbsp;
&gt;&gt;&gt; 发件人: Leonard Xu <mailto:[hidden email]&gt;
&gt;&gt;&gt; 发送时间: 2020-07-22 15:05
&gt;&gt;&gt; 收件人: user-zh <mailto:[hidden email]&gt;
&gt;&gt;&gt; 主题: Re: flinksql1.11中主键声明的问题
&gt;&gt;&gt; Hi,
&gt;&gt;&gt;&nbsp;
&gt;&gt;&gt; &nbsp; 我试了下应该是会更新缓存的,你有能复现的例子吗?
&gt;&gt;&gt;&nbsp;
&gt;&gt;&gt; 祝好
&gt;&gt;&gt; &gt; 在 2020年7月22日,14:50,奇怪的不朽琴师 <[hidden email] <mailto:[hidden email]&gt;&gt; 写道:
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; 你好:
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; 可能是我描述的不清楚,我了解这个机制,我的意思维表更新后,即便已经达到了超时的时间,新的输出结果还是用维表历史缓存数据,
&gt;&gt;&gt; &gt; 我感觉上是维表没有刷新缓存,但是我不知道这为什么。
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; 谢谢
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt;&gt;&gt; &gt; 发件人:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp; "user-zh"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp; <[hidden email] <mailto:[hidden email]&gt;&amp;gt;;
&gt;&gt;&gt; &gt; 发送时间:&amp;nbsp;2020年7月22日(星期三) 下午2:42
&gt;&gt;&gt; &gt; 收件人:&amp;nbsp;"user-zh"<[hidden email] <mailto:[hidden email]&gt;&amp;gt;;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; 主题:&amp;nbsp;Re: flinksql1.11中主键声明的问题
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; Hello
&gt;&gt;&gt; &gt; 你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表
&gt;&gt;&gt; &gt; 去look up该表时的数据,维表的更新是不会retract之前的历史记录的。
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; 祝好
&gt;&gt;&gt; &gt; Leonard Xu
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt;
&gt;&gt;&gt; &gt; &amp;gt; 在 2020年7月22日,14:13,[hidden email] <http://qq.com/&gt; 写道:
&gt;&gt;&gt; &gt; &amp;gt;
&gt;&gt;&gt; &gt; &amp;gt; 输出结果仍然没有被更新
&gt;&gt;
&gt;