各位大佬好,请教一个问题flink从Kafka读数,写入mysql,对mysql结果根据主键进行数据更新,看官网是支持“on DUPLICATE”的,但是在执行中报错是这个导致的语法问题。完整代码如下,是在linux下,直接python *.py执行的。请问下这个是不支持吗,还是怎么写呢!
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode from pyflink.table import StreamTableEnvironment, EnvironmentSettings source=""" CREATE TABLE kafka_source_tab ( trck_id VARCHAR, score INT ) WITH ( 'connector' = 'kafka', 'topic' = 'alarm_test_g', 'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = '10.2.2.73:2181', 'properties.bootstrap.servers' = '10.2.2.73:9092', 'format' = 'json' ) """ sink=""" CREATE TABLE g_source_tab ( trck_id VARCHAR, score INT, PRIMARY KEY (trck_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false', 'table-name' = 'g', 'username' = 'root', 'password' = '123456t', 'sink.buffer-flush.interval' = '1s' ) """ env = StreamExecutionEnvironment.get_execution_environment() env.set_stream_time_characteristic(TimeCharacteristic.EventTime) env.set_parallelism(1) env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build() t_env = StreamTableEnvironment.create(env, environment_settings=env_settings) t_env.execute_sql(source) t_env.execute_sql(sink) table_result1=t_env.execute_sql('''Insert into g_source_tab (`trck_id`,`score`) VALUES (select trck_id,score from kafka_source_tab ) ON DUPLICATE KEY UPDATE score=score+1''') table_result1.get_job_client().get_job_execution_result().result() |
Hello,
> 在 2020年7月14日,17:56,小学生 <[hidden email]> 写道: > > ON DUPLICATE KEY UPDATE 这个语法 Flink 还不支持的,官网上说的 Flink 的 JDBC connector 实现 幂等写入[1]的方式,就是有相同pk的数据在写入数据库时,翻译成数据库 upsert SQL的方式,这里说的语法是数据库的 SQL 语法 。 Best, Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#idempotent-writes <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#idempotent-writes> |
嗯嗯,谢谢大佬的解答,还有一个问题就是sql自己的语法是支持增量式的比如score=score+1,现在flink1.11特性反应成数据库 upsert SQL的方式,其实是全量的更新同Pk的记录吧,并达不到增量的情况吧。
|
是的,目前是更新相同pk的记录,如果需要统计相同pk的记录, Flink表不声明PK就是append 写入,就会有写入多条记录,(DB里的表也不声明pk,不然insert会报错)。
祝好 > 在 2020年7月14日,18:21,小学生 <[hidden email]> 写道: > > 嗯嗯,谢谢大佬的解答,还有一个问题就是sql自己的语法是支持增量式的比如score=score+1,现在flink1.11特性反应成数据库 upsert SQL的方式,其实是全量的更新同Pk的记录吧,并达不到增量的情况吧。 |
嗯嗯,谢谢大佬的理解,还有一个问题,就是除了update,这个我看新性能也支持delete的,但是没找到相关的部分,delete这个是否类似:delete table1 where score=1;烦请大佬帮忙解答下,不胜感激。
|
Hi,
基本类似的,具体拼delete sql会根据 pk 来, 可以看下delete executor的代码[1] 祝好, Leonard Xu 【1】https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/BufferReduceStatementExecutor.java#L89 <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/BufferReduceStatementExecutor.java#L89> > 在 2020年7月15日,11:05,小学生 <[hidden email]> 写道: > > 嗯嗯,谢谢大佬的理解,还有一个问题,就是除了update,这个我看新性能也支持delete的,但是没找到相关的部分,delete这个是否类似:delete table1 where score=1;烦请大佬帮忙解答下,不胜感激。 |
嗯嗯,麻烦问下Python版本的相关资料有吗
|
In reply to this post by Leonard Xu
各位大佬好,由于不是特别懂java,所以麻烦问下pyflink里面有相关mysql的delete吗,官网没看到,谢谢!
|
Hi,
我理解 pyflink 底层也会走到你看到的java代码, 我对 pyflink 不是很熟, cc xingbo 补充下。 祝好 Leonard Xu > 在 2020年7月16日,11:04,小学生 <[hidden email]> 写道: > > 各位大佬好,由于不是特别懂java,所以麻烦问下pyflink里面有相关mysql的delete吗,官网没看到,谢谢! |
Hi,
Leonard 说的是对的,除了udf的部分,pyflink的所有的api都是调用的java端的功能,如果java端没有,pyflink就不支持 Best, Xingbo Leonard Xu <[hidden email]> 于2020年7月16日周四 上午11:09写道: > Hi, > > 我理解 pyflink 底层也会走到你看到的java代码, 我对 pyflink 不是很熟, cc xingbo 补充下。 > > 祝好 > Leonard Xu > > > 在 2020年7月16日,11:04,小学生 <[hidden email]> 写道: > > > > 各位大佬好,由于不是特别懂java,所以麻烦问下pyflink里面有相关mysql的delete吗,官网没看到,谢谢! > > |
谢谢两位大佬的解答,但是理解有点抽象,不太清楚,有没有pyflink下一个简单例子呢。
|
Hi,
你需要什么样的例子,如果你用的table/sql的话,在官方文档对应的地方都有java/scala/python的对应写法。如果是python udf相关的东西,你可以参考[1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/ Best, Xingbo 小学生 <[hidden email]> 于2020年7月16日周四 上午11:14写道: > 谢谢两位大佬的解答,但是理解有点抽象,不太清楚,有没有pyflink下一个简单例子呢。 |
您好,比如说我这个例子,我使用delete就出错了,我想知道是啥原因呢,
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode from pyflink.table import StreamTableEnvironment, EnvironmentSettings source=""" CREATE TABLE source_tab ( trck_id VARCHAR, score INT, PRIMARY KEY (trck_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false', 'table-name' = 'g', 'username' = 'root', 'password' = '123456t', 'sink.buffer-flush.interval' = '1s' ) """ sink=""" CREATE TABLE sink_tab ( trck_id VARCHAR, score INT, PRIMARY KEY (trck_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false', 'table-name' = 'g_copy', 'username' = 'root', 'password' = '123456t', 'sink.buffer-flush.interval' = '1s' ) """ env = StreamExecutionEnvironment.get_execution_environment() env.set_stream_time_characteristic(TimeCharacteristic.EventTime) env.set_parallelism(1) env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build() t_env = StreamTableEnvironment.create(env, environment_settings=env_settings) t_env.execute_sql(source) t_env.execute_sql(sink) t_env.execute_sql('''delete from source_tab where trck_id='aew' ''') table_result1=t_env.execute_sql('''insert into sink_tab select * from source_tab ''') table_result1.get_job_client().get_job_execution_result().result() |
> 在 2020年7月16日,11:44,小学生 <[hidden email]> 写道: > > t_env.execute_sql('''delete from source_tab where trck_id='aew' ''') 你这张表定义的是 Flink 中的表,这张表对应的是你外部系统(MySQL数据库)中的表,Flink 不支持 表上 的DELETE [1], Flink 是一个计算引擎, 主要场景是读取、写入外部系统,修改外部系统的数据目前只发生在写入(insert)的时候,并且主要是为了保证数据一致性语义,需要往下游系统发Delete消息, 这个delete的消息的处理都是各个connector自己处理的,用户不用显示地调用delete, 你可以参考[2]了解更多。 祝好 [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/ <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/> [2]https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/dynamic_tables.html <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/dynamic_tables.html> |
Free forum by Nabble | Edit this page |