flink 1.11 upsert结果出错

classic Classic list List threaded Threaded
14 messages Options
Reply | Threaded
Open this post in threaded view
|

flink 1.11 upsert结果出错

咿咿呀呀
各位大佬好,请教一个问题flink从Kafka读数,写入mysql,对mysql结果根据主键进行数据更新,看官网是支持“on DUPLICATE”的,但是在执行中报错是这个导致的语法问题。完整代码如下,是在linux下,直接python *.py执行的。请问下这个是不支持吗,还是怎么写呢!


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source="""
CREATE TABLE kafka_source_tab (     
 trck_id VARCHAR,
 score  INT


) WITH (
 'connector' = 'kafka',
 'topic' = 'alarm_test_g',   
 'scan.startup.mode' = 'earliest-offset',
 'properties.bootstrap.servers' = '10.2.2.73:2181',
 'properties.bootstrap.servers' = '10.2.2.73:9092',
 'format' = 'json' 
)
"""

sink="""
CREATE TABLE g_source_tab (
 trck_id VARCHAR,
 score  INT,

PRIMARY KEY (trck_id) NOT ENFORCED
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false', 
 'table-name' = 'g',   
 'username' = 'root',
 'password' = '123456t',
 'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)



t_env.execute_sql(source)
t_env.execute_sql(sink)


table_result1=t_env.execute_sql('''Insert into g_source_tab (`trck_id`,`score`) VALUES (select
                       trck_id,score from kafka_source_tab ) ON DUPLICATE KEY UPDATE score=score+1''')

table_result1.get_job_client().get_job_execution_result().result()
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 upsert结果出错

Leonard Xu
Hello,

> 在 2020年7月14日,17:56,小学生 <[hidden email]> 写道:
>
> ON DUPLICATE KEY UPDATE

这个语法 Flink 还不支持的,官网上说的 Flink 的 JDBC connector 实现 幂等写入[1]的方式,就是有相同pk的数据在写入数据库时,翻译成数据库 upsert SQL的方式,这里说的语法是数据库的 SQL 语法 。


Best,
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#idempotent-writes <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#idempotent-writes>

Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 upsert结果出错

咿咿呀呀
嗯嗯,谢谢大佬的解答,还有一个问题就是sql自己的语法是支持增量式的比如score=score+1,现在flink1.11特性反应成数据库 upsert SQL的方式,其实是全量的更新同Pk的记录吧,并达不到增量的情况吧。
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 upsert结果出错

Leonard Xu
是的,目前是更新相同pk的记录,如果需要统计相同pk的记录, Flink表不声明PK就是append 写入,就会有写入多条记录,(DB里的表也不声明pk,不然insert会报错)。

祝好


> 在 2020年7月14日,18:21,小学生 <[hidden email]> 写道:
>
> 嗯嗯,谢谢大佬的解答,还有一个问题就是sql自己的语法是支持增量式的比如score=score+1,现在flink1.11特性反应成数据库 upsert SQL的方式,其实是全量的更新同Pk的记录吧,并达不到增量的情况吧。

Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 upsert结果出错

咿咿呀呀
嗯嗯,谢谢大佬的理解,还有一个问题,就是除了update,这个我看新性能也支持delete的,但是没找到相关的部分,delete这个是否类似:delete table1 where score=1;烦请大佬帮忙解答下,不胜感激。
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 upsert结果出错

Leonard Xu
Hi,

基本类似的,具体拼delete sql会根据 pk 来, 可以看下delete executor的代码[1]

祝好,
Leonard Xu
【1】https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/BufferReduceStatementExecutor.java#L89 <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/BufferReduceStatementExecutor.java#L89>

> 在 2020年7月15日,11:05,小学生 <[hidden email]> 写道:
>
> 嗯嗯,谢谢大佬的理解,还有一个问题,就是除了update,这个我看新性能也支持delete的,但是没找到相关的部分,delete这个是否类似:delete table1 where score=1;烦请大佬帮忙解答下,不胜感激。

Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 upsert结果出错

咿咿呀呀
嗯嗯,麻烦问下Python版本的相关资料有吗
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 upsert结果出错

咿咿呀呀
In reply to this post by Leonard Xu
各位大佬好,由于不是特别懂java,所以麻烦问下pyflink里面有相关mysql的delete吗,官网没看到,谢谢!
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 upsert结果出错

Leonard Xu
Hi,

我理解 pyflink 底层也会走到你看到的java代码, 我对 pyflink 不是很熟, cc xingbo 补充下。

祝好
Leonard Xu

> 在 2020年7月16日,11:04,小学生 <[hidden email]> 写道:
>
> 各位大佬好,由于不是特别懂java,所以麻烦问下pyflink里面有相关mysql的delete吗,官网没看到,谢谢!

Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 upsert结果出错

Xingbo Huang
Hi,
Leonard 说的是对的,除了udf的部分,pyflink的所有的api都是调用的java端的功能,如果java端没有,pyflink就不支持

Best,
Xingbo

Leonard Xu <[hidden email]> 于2020年7月16日周四 上午11:09写道:

> Hi,
>
> 我理解 pyflink 底层也会走到你看到的java代码, 我对 pyflink 不是很熟, cc xingbo 补充下。
>
> 祝好
> Leonard Xu
>
> > 在 2020年7月16日,11:04,小学生 <[hidden email]> 写道:
> >
> > 各位大佬好,由于不是特别懂java,所以麻烦问下pyflink里面有相关mysql的delete吗,官网没看到,谢谢!
>
>
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 upsert结果出错

咿咿呀呀
谢谢两位大佬的解答,但是理解有点抽象,不太清楚,有没有pyflink下一个简单例子呢。
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 upsert结果出错

Xingbo Huang
Hi,
你需要什么样的例子,如果你用的table/sql的话,在官方文档对应的地方都有java/scala/python的对应写法。如果是python
udf相关的东西,你可以参考[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/

Best,
Xingbo

小学生 <[hidden email]> 于2020年7月16日周四 上午11:14写道:

> 谢谢两位大佬的解答,但是理解有点抽象,不太清楚,有没有pyflink下一个简单例子呢。
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 upsert结果出错

咿咿呀呀
您好,比如说我这个例子,我使用delete就出错了,我想知道是啥原因呢,
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source="""
CREATE TABLE source_tab (
&nbsp;trck_id VARCHAR,
&nbsp;score&nbsp; INT,
PRIMARY KEY (trck_id) NOT ENFORCED
) WITH (
&nbsp;'connector' = 'jdbc',
&nbsp;'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',&nbsp;
&nbsp;'table-name' = 'g',&nbsp; &nbsp;
&nbsp;'username' = 'root',
&nbsp;'password' = '123456t',
&nbsp;'sink.buffer-flush.interval' = '1s'
)
"""
sink="""
CREATE TABLE sink_tab (
&nbsp;trck_id VARCHAR,
&nbsp;score&nbsp; INT,
PRIMARY KEY (trck_id) NOT ENFORCED
) WITH (
&nbsp;'connector' = 'jdbc',
&nbsp;'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',&nbsp;
&nbsp;'table-name' = 'g_copy',&nbsp; &nbsp;
&nbsp;'username' = 'root',
&nbsp;'password' = '123456t',
&nbsp;'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)


t_env.execute_sql(source)
t_env.execute_sql(sink)


t_env.execute_sql('''delete from source_tab where trck_id='aew'&nbsp; ''')
table_result1=t_env.execute_sql('''insert into&nbsp; sink_tab select * from source_tab ''')
table_result1.get_job_client().get_job_execution_result().result()
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.11 upsert结果出错

Leonard Xu


> 在 2020年7月16日,11:44,小学生 <[hidden email]> 写道:
>
> t_env.execute_sql('''delete from source_tab where trck_id='aew'&nbsp; ''')

你这张表定义的是 Flink 中的表,这张表对应的是你外部系统(MySQL数据库)中的表,Flink 不支持 表上 的DELETE [1], Flink 是一个计算引擎,
主要场景是读取、写入外部系统,修改外部系统的数据目前只发生在写入(insert)的时候,并且主要是为了保证数据一致性语义,需要往下游系统发Delete消息,
这个delete的消息的处理都是各个connector自己处理的,用户不用显示地调用delete, 你可以参考[2]了解更多。

祝好
[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/ <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/>
[2]https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/dynamic_tables.html <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/dynamic_tables.html>