Flink 1.11.1: upsert does not take effect with Flink SQL and JDBC when a primary key is set

鱼子酱
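I am writing to a MySQL database with Flink SQL. A primary key is set, but records with the same primary key are not updated; instead, the number of rows keeps growing.
Is this not supported yet, or am I using it incorrectly?
Version: Flink 1.11.1

The two key SQL statements are as follows: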

    create table open_and_close_terminal_minute_1 (
      request_date varchar
      ,terminal_no varchar
      ,logon_time varchar
      ,logout_time varchar
      ,insert_time varchar
      ,PRIMARY KEY (request_date,terminal_no) NOT ENFORCED
    ) with (
      'connector' = 'jd……
      'url' = 'jdbc:mys……se',
      'table-name' = 'c……,
      'driver' = 'com.m……
      'username' = 'ana……
      'password' = 'ana……
      'sink.buffer-flus……
    )

    upsert into open_and_close_terminal_minute_1
    select request_date ,terminal_no ,logon_time ,logout_time
        ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19)
    from
    (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,10) as request_date
        ,cast(terminalNo as varchar) as terminal_no
        ,DATE_FORMAT(min(times),'yyyy-MM-dd HH:mm:ss.SSS') as logon_time
        ,DATE_FORMAT(max(times),'yyyy-MM-dd HH:mm:ss.SSS') as logout_time
        from caslog INNER join itoa_b_terminal_shop for system_time as of caslog.proc_time
        on cast(caslog.terminalNo as varchar)= itoa_b_terminal_shop.rowkey
        where
        errCode=0 and attr=0
        group by TUMBLE(times, INTERVAL '1' MINUTE),terminalNo
    )
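
To make the expected and the observed behavior concrete, here is a minimal sketch. It is not the Flink job: it uses Python's built-in sqlite3 as a stand-in for MySQL, and SQLite's INSERT OR REPLACE as a stand-in for an upsert write. The table `t`, its three columns, and the sample values are made up for illustration. It shows that the same two writes leave one row when the physical table has a key on (request_date, terminal_no), and two rows when it has none. Whether that is what happens in the job above is an assumption; the post does not confirm how the MySQL table is defined.

```python
import sqlite3


def count_rows_after_writes(with_primary_key: bool) -> int:
    """Write two rows sharing one (request_date, terminal_no) pair; return the row count."""
    conn = sqlite3.connect(":memory:")
    # Hypothetical table; only the presence of the key constraint differs between runs.
    key_clause = ", PRIMARY KEY (request_date, terminal_no)" if with_primary_key else ""
    conn.execute(
        "CREATE TABLE t (request_date TEXT, terminal_no TEXT, logout_time TEXT"
        + key_clause
        + ")"
    )
    # Made-up sample data: same key, later logout_time in the second row.
    rows = [
        ("2020-08-01", "T1", "2020-08-01 10:00:59"),
        ("2020-08-01", "T1", "2020-08-01 10:01:59"),
    ]
    for row in rows:
        # INSERT OR REPLACE only replaces when a key constraint would be violated;
        # without any constraint it behaves like a plain INSERT.
        conn.execute("INSERT OR REPLACE INTO t VALUES (?, ?, ?)", row)
    count = conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]
    conn.close()
    return count


if __name__ == "__main__":
    print("with primary key:", count_rows_after_writes(True))
    print("without primary key:", count_rows_after_writes(False))
```

The run without the key reproduces the symptom described above: the second write has nothing to conflict with, so it is stored as a new row.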



--
Sent from: http://apache-flink.147419.n8.nabble.com/