请问使用mysql数据库时,使用flinksql,已经设置主键的情况下,相同主键的记录没有更新,而是越来越多,
是目前不支持还是我使用的方法不对呢? 版本:flink 1.11.1 关键的2个sql如下 create table open_and_close_terminal_minute_1 ( request_date varchar ,terminal_no varchar ,logon_time varchar ,logout_time varchar ,insert_time varchar ,PRIMARY KEY (request_date,terminal_no) NOT ENFORCED ) with ( 'connector' = 'jd…… 'url' = 'jdbc:mys……se', 'table-name' = 'c……, 'driver' = 'com.m…… 'username' = 'ana…… 'password' = 'ana…… 'sink.buffer-flus…… ) upsert into open_and_close_terminal_minute_1 select request_date ,terminal_no ,logon_time ,logout_time ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19) from ( select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,10) as request_date ,cast(terminalNo as varchar) as terminal_no ,DATE_FORMAT(min(times),'yyyy-MM-dd HH:mm:ss.SSS') as logon_time ,DATE_FORMAT(max(times),'yyyy-MM-dd HH:mm:ss.SSS') as logout_time from caslog INNER join itoa_b_terminal_shop for system_time as of caslog.proc_time on cast(caslog.terminalNo as varchar)= itoa_b_terminal_shop.rowkey where errCode=0 and attr=0 group by TUMBLE(times, INTERVAL '1' MINUTE),terminalNo ) -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Hi,
这个版本是支持的。 其中插入语句是 "insert into " 而不是 “update into”? 在 2020-11-16 17:04:23,"鱼子酱" <[hidden email]> 写道: >请问使用mysql数据库时,使用flinksql,已经设置主键的情况下,相同主键的记录没有更新,而是越来越多, >是目前不支持还是我使用的方法不对呢? >版本:flink 1.11.1 > >关键的2个sql如下 > > create table open_and_close_terminal_minute_1 ( > request_date varchar > ,terminal_no varchar > ,logon_time varchar > ,logout_time varchar > ,insert_time varchar > ,PRIMARY KEY (request_date,terminal_no) NOT ENFORCED > ) with ( > 'connector' = 'jd…… > 'url' = 'jdbc:mys……se', > 'table-name' = 'c……, > 'driver' = 'com.m…… > 'username' = 'ana…… > 'password' = 'ana…… > 'sink.buffer-flus…… > ) > > upsert into open_and_close_terminal_minute_1 > select request_date ,terminal_no ,logon_time ,logout_time >,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19) from > ( select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' >MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,10) as request_date > ,cast(terminalNo as varchar) as terminal_no > ,DATE_FORMAT(min(times),'yyyy-MM-dd HH:mm:ss.SSS') as logon_time > ,DATE_FORMAT(max(times),'yyyy-MM-dd HH:mm:ss.SSS') as logout_time > from caslog INNER join itoa_b_terminal_shop for system_time as of >caslog.proc_time > on cast(caslog.terminalNo as varchar)= itoa_b_terminal_shop.rowkey > where > errCode=0 and attr=0 > group by TUMBLE(times, INTERVAL '1' MINUTE),terminalNo > > ) > > > >-- >Sent from: http://apache-flink.147419.n8.nabble.com/ |
我写的是upsert呀。。。
insert into 我也测试了,也不行。 是MySQL数据库本身里面的表需要建立一个主键吗?还是只有flink里面建表的时候写就行呢? -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
你的sql里用的是 Tumble窗口,不是一个回撤流,不会有更新的,只有insert
________________________________ 发件人: 鱼子酱 <[hidden email]> 发送时间: 2020年11月17日 1:12 收件人: [hidden email] <[hidden email]> 主题: Re: Re:flink 1.11.1 使用flinksql,jdbc ,后台数据库设置主键的情况下,upsert不生效 我写的是upsert呀。。。 insert into 我也测试了,也不行。 是MySQL数据库本身里面的表需要建立一个主键吗?还是只有flink里面建表的时候写就行呢? -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
In reply to this post by 鱼子酱
需要建立mysql表request_date,terminal_no的联合主键,mysql的upsert是基于"INSERT INTO ... ON
DUPLICATE KEY UPDATE..."实现的。 ddl中可以不声明mysql主健。 鱼子酱 <[hidden email]>于2020年11月17日 周二09:13写道: > 我写的是upsert呀。。。 > insert into 我也测试了,也不行。 > > 是MySQL数据库本身里面的表需要建立一个主键吗?还是只有flink里面建表的时候写就行呢? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
Hello,
我使用 MySQLDialect 在本地确认了下, 1. 在数据库需要建主键,因为建了主键 “INSERT INTO ... ON DUPLICATE KEY UPDATE”[1] 语句的 upsert 语义才会生效。 2. 需要在 DDL 中定义 'PRIMARY KEY',因为需要根据 ‘PRIMARY KEY’ 确认是否使用 'upsert query' [2] [1] https://github.com/apache/flink/blob/7eb514a59f6fd117c3535ec4bebc40a375f30b63/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/MySQLDialect.java#L76 [2] https://github.com/apache/flink/blob/7eb514a59f6fd117c3535ec4bebc40a375f30b63/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicOutputFormatBuilder.java#L98 在 2020-11-17 09:28:14,"Tio Planto" <[hidden email]> 写道: >需要建立mysql表request_date,terminal_no的联合主键,mysql的upsert是基于"INSERT INTO ... ON >DUPLICATE KEY UPDATE..."实现的。 >ddl中可以不声明mysql主健。 > >鱼子酱 <[hidden email]>于2020年11月17日 周二09:13写道: > >> 我写的是upsert呀。。。 >> insert into 我也测试了,也不行。 >> >> 是MySQL数据库本身里面的表需要建立一个主键吗?还是只有flink里面建表的时候写就行呢? >> >> >> >> -- >> Sent from: http://apache-flink.147419.n8.nabble.com/ |
In reply to this post by 鱼子酱
Hi,
你确定是在Flink SQL 里使用 upsert 语法? 我理解是不支持的 另外你flink里声明connector DDL 中的主键应该和你在Mysql表的主键一致。 祝好 Leonard > 在 2020年11月17日,09:12,鱼子酱 <[hidden email]> 写道: > > upsert |
Free forum by Nabble | Edit this page |