flink 1.11: error writing computation results to MySQL

9 messages
flink 1.11: error writing computation results to MySQL

咿咿呀呀
Hi everyone, I have a question. Flink reads from Kafka and writes to MySQL; the program reports no errors, but no data at all is written to MySQL. The code is below; it runs on Linux, launched directly with `python *.py`. Full code follows.


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source="""
CREATE TABLE kafka_source_tab (
 id VARCHAR,   
 alarm_id VARCHAR,   
 trck_id VARCHAR


) WITH (
 'connector' = 'kafka',
 'topic' = 'alarm_test_g',   
 'scan.startup.mode' = 'earliest-offset',
 'properties.bootstrap.servers' = '10.2.2.73:9092',  -- Kafka broker address (2181 is the ZooKeeper port and does not belong here)
 'format' = 'json' 
)
"""

sink="""
CREATE TABLE g_source_tab (
 id VARCHAR,   
 alarm_id VARCHAR,     
 trck_id VARCHAR


) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false', 
 'table-name' = 'g',   
 'username' = 'root',
 'password' = '123456t',
 'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)



t_env.execute_sql(source)
t_env.execute_sql(sink)


source = t_env.from_path("kafka_source_tab")\
        .select("id,alarm_id,trck_id")
source.execute_insert("g_source_tab")

Re: flink 1.11: error writing computation results to MySQL

Leonard Xu
Hi,

I took a quick look at the code and it seems fine. Does the Kafka topic alarm_test_g actually contain data? You could also check for malformed records by consuming the topic directly with ./bin/kafka-console-consumer.sh. That is my main suspicion.

Best,
Leonard Xu

> On July 13, 2020 at 20:06, 小学生 <[hidden email]> wrote:
>
> [quoted original message elided; full code appears above]


Re: flink 1.11: error writing computation results to MySQL

咿咿呀呀
I checked; the topic does have data. The same code works fine on Flink 1.10 using insert_into, which is why this is so puzzling.

Re: flink 1.11: error writing computation results to MySQL

13122260573@163.com
In reply to this post by Leonard Xu
Is new data still coming in? This looks a lot like this JIRA issue:
https://issues.apache.org/jira/browse/FLINK-15262




On July 13, 2020 at 20:38, Leonard Xu <[hidden email]> wrote:

[quoted message elided; see above]

Re: flink 1.11: error writing computation results to MySQL

咿咿呀呀
I don't think so; that issue is against 1.10. When I run this program it finishes very quickly rather than staying up.

Re: flink 1.11: error writing computation results to MySQL

Leonard Xu
Hi, I see it now.
source.execute_insert("g_source_tab") returns a TableResult object; if you do not explicitly wait for the job to finish, the call returns immediately and the script exits. Try this (using your `source` table variable):

source.execute_insert("g_source_tab") \
    .get_job_client() \
    .get_job_execution_result() \
    .result()

This is a change introduced by FLIP-84 to better handle the return values of Table programs.

Best,
Leonard Xu

> On July 13, 2020 at 20:57, 小学生 <[hidden email]> wrote:
>
> [quoted message elided]


Re: flink 1.11: error writing computation results to MySQL

咿咿呀呀
Yes, I tried it and it works now. May I ask what the TableResult object is designed for? I don't quite understand it. Thanks!

Re: flink 1.11: error writing computation results to MySQL

godfrey he
Flink 1.11 changed how StreamTableEnvironment.execute()
and StreamExecutionEnvironment.execute() run jobs. In brief:
1. StreamTableEnvironment.execute() can only run jobs built with the sqlUpdate and insertInto methods;
2. once a Table has been converted to a DataStream, the job can only be run via StreamExecutionEnvironment.execute();
3. the newly introduced TableEnvironment.executeSql() and StatementSet.execute() methods execute SQL jobs directly
(submitting them asynchronously), with no further call to StreamTableEnvironment.execute()
or StreamExecutionEnvironment.execute() needed.

Jobs submitted via TableEnvironment.executeSql() and StatementSet.execute()
are asynchronous, so a local test run will not wait for the final result before exiting. To address this, 1.12 plans to introduce an await method
[3]; the code is still under review.

TableResult describes the result of executing one statement. For SELECT and INSERT, TableResult also contains a JobClient
[4]
for operating on the corresponding job, e.g. fetching the job status, cancelling it, or waiting for it to finish. TableResult's collect method also returns the schema and result rows of the statement, e.g.
the output of select/show.
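The asynchronous-submission behavior described above can be sketched with plain Python futures (a conceptual analogy only; the names below are illustrative, not the PyFlink API):

```python
from concurrent.futures import ThreadPoolExecutor
import time

def insert_job():
    # Stands in for a streaming INSERT job that needs time to produce output.
    time.sleep(0.2)
    return "rows written"

executor = ThreadPoolExecutor(max_workers=1)

# Like executeSql()/execute_insert(), submit() returns a handle immediately
# while the job keeps running in the background.
future = executor.submit(insert_job)

# Without a blocking call such as this, the main script can reach its end
# before the job has written anything, which is why the MySQL table stayed
# empty: the handle must be awaited, analogous to
# get_job_client().get_job_execution_result().result() on a TableResult.
outcome = future.result()
print(outcome)
```

The handle-plus-explicit-wait pattern is the design point: submission and completion are decoupled, so callers choose whether to block, poll status, or cancel.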


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset
[3] https://issues.apache.org/jira/browse/FLINK-18337
[4]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API

Best,
Godfrey


小学生 <[hidden email]> wrote on Monday, July 13, 2020 at 21:12:

> 嗯嗯,尝试了,这下没问题了,想问下这个TableResult对象,设计的目的是啥呢,不是特别懂呢,谢谢!
>

Re: flink 1.11: error writing computation results to MySQL

咿咿呀呀
Got it, thanks!