flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert不生效

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert不生效

鱼子酱
请问使用mysql数据库时,使用flinksql,已经设置主键的情况下,相同主键的记录没有更新,而是越来越多,
是目前不支持还是我使用的方法不对呢?
版本:flink 1.11.1

关键的2个sql如下

    create table open_and_close_terminal_minute_1 (
      request_date varchar
      ,terminal_no varchar
      ,logon_time varchar
      ,logout_time varchar
      ,insert_time varchar
      ,PRIMARY KEY (request_date,terminal_no) NOT ENFORCED
    ) with (
      'connector' = 'jd……
      'url' = 'jdbc:mys……se',
      'table-name' = 'c……,
      'driver' = 'com.m……
      'username' = 'ana……
      'password' = 'ana……
      'sink.buffer-flus……
    )

    upsert into open_and_close_terminal_minute_1
    select request_date ,terminal_no ,logon_time ,logout_time
,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19)  from
    (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,10) as request_date
        ,cast(terminalNo as varchar) as terminal_no
        ,DATE_FORMAT(min(times),'yyyy-MM-dd HH:mm:ss.SSS') as logon_time
        ,DATE_FORMAT(max(times),'yyyy-MM-dd HH:mm:ss.SSS') as logout_time
        from caslog INNER join itoa_b_terminal_shop  for system_time as of
caslog.proc_time
        on cast(caslog.terminalNo as varchar)= itoa_b_terminal_shop.rowkey
        where
        errCode=0 and attr=0
        group by TUMBLE(times, INTERVAL '1' MINUTE),terminalNo

    )



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re:flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert不生效

hailongwang
Hi,
这个版本是支持的。
其中插入语句是 "insert into " 而不是 “update into”?

在 2020-11-16 17:04:23,"鱼子酱" <[hidden email]> 写道:

>请问使用mysql数据库时,使用flinksql,已经设置主键的情况下,相同主键的记录没有更新,而是越来越多,
>是目前不支持还是我使用的方法不对呢?
>版本:flink 1.11.1
>
>关键的2个sql如下
>
>    create table open_and_close_terminal_minute_1 (
>      request_date varchar
>      ,terminal_no varchar
>      ,logon_time varchar
>      ,logout_time varchar
>      ,insert_time varchar
>      ,PRIMARY KEY (request_date,terminal_no) NOT ENFORCED
>    ) with (
>      'connector' = 'jd……
>      'url' = 'jdbc:mys……se',
>      'table-name' = 'c……,
>      'driver' = 'com.m……
>      'username' = 'ana……
>      'password' = 'ana……
>      'sink.buffer-flus……
>    )
>
>    upsert into open_and_close_terminal_minute_1
>    select request_date ,terminal_no ,logon_time ,logout_time
>,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19)  from
>    (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,10) as request_date
>        ,cast(terminalNo as varchar) as terminal_no
>        ,DATE_FORMAT(min(times),'yyyy-MM-dd HH:mm:ss.SSS') as logon_time
>        ,DATE_FORMAT(max(times),'yyyy-MM-dd HH:mm:ss.SSS') as logout_time
>        from caslog INNER join itoa_b_terminal_shop  for system_time as of
>caslog.proc_time
>        on cast(caslog.terminalNo as varchar)= itoa_b_terminal_shop.rowkey
>        where
>        errCode=0 and attr=0
>        group by TUMBLE(times, INTERVAL '1' MINUTE),terminalNo
>
>    )
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Re:flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert不生效

鱼子酱
我写的是upsert呀。。。
insert into 我也测试了,也不行。

是MySQL数据库本身里面的表需要建立一个主键吗?还是只有flink里面建表的时候写就行呢?



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

回复: Re:flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert不生效

史 正超
你的sql里用的是 Tumble窗口,不是一个回撤流,不会有更新的,只有insert
________________________________
发件人: 鱼子酱 <[hidden email]>
发送时间: 2020年11月17日 1:12
收件人: [hidden email] <[hidden email]>
主题: Re: Re:flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert不生效

我写的是upsert呀。。。
insert into 我也测试了,也不行。

是MySQL数据库本身里面的表需要建立一个主键吗?还是只有flink里面建表的时候写就行呢?



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Re:flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert不生效

zongsforce
In reply to this post by 鱼子酱
需要建立mysql表request_date,terminal_no的联合主键,mysql的upsert是基于"INSERT INTO ... ON
DUPLICATE KEY UPDATE..."实现的。
ddl中可以不声明mysql主健。

鱼子酱 <[hidden email]>于2020年11月17日 周二09:13写道:

> 我写的是upsert呀。。。
> insert into 我也测试了,也不行。
>
> 是MySQL数据库本身里面的表需要建立一个主键吗?还是只有flink里面建表的时候写就行呢?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re:Re: Re:flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert不生效

hailongwang
Hello,
   我使用 MySQLDialect 在本地确认了下,
   1. 在数据库需要建主键,因为建了主键 “INSERT INTO ... ON DUPLICATE KEY UPDATE”[1] 语句的 upsert 语义才会生效。
   2. 需要在 DDL 中定义 'PRIMARY KEY',因为需要根据 ‘PRIMARY KEY’ 确认是否使用 'upsert query' [2]


[1] https://github.com/apache/flink/blob/7eb514a59f6fd117c3535ec4bebc40a375f30b63/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/MySQLDialect.java#L76
[2] https://github.com/apache/flink/blob/7eb514a59f6fd117c3535ec4bebc40a375f30b63/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormatBuilder.java#L98





在 2020-11-17 09:28:14,"Tio Planto" <[hidden email]> 写道:

>需要建立mysql表request_date,terminal_no的联合主键,mysql的upsert是基于"INSERT INTO ... ON
>DUPLICATE KEY UPDATE..."实现的。
>ddl中可以不声明mysql主健。
>
>鱼子酱 <[hidden email]>于2020年11月17日 周二09:13写道:
>
>> 我写的是upsert呀。。。
>> insert into 我也测试了,也不行。
>>
>> 是MySQL数据库本身里面的表需要建立一个主键吗?还是只有flink里面建表的时候写就行呢?
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert不生效

Leonard Xu
In reply to this post by 鱼子酱
Hi,
你确定是在Flink SQL 里使用 upsert 语法? 我理解是不支持的

另外你flink里声明connector DDL 中的主键应该和你在Mysql表的主键一致。

祝好
Leonard

> 在 2020年11月17日,09:12,鱼子酱 <[hidden email]> 写道:
>
> upsert