pyflink udf中发送rest api会导致udf被调用两次

classic Classic list List threaded Threaded
10 messages Options
lgs
Reply | Threaded
Open this post in threaded view
|

pyflink udf中发送rest api会导致udf被调用两次

lgs
Hi,

我观察到一个现象:我定义了一个tumble window,调用一个python udf,在这个udf里面使用requests发送rest api。
log显示这个udf会被调用两次。相隔不到一秒。这个是什么原因?requests库跟beam冲突了?

2020-07-09 17:44:17,501 INFO  flink_test_stream_time_kafka.py:22                          
[] - start to ad
2020-07-09 17:44:17,530 INFO  flink_test_stream_time_kafka.py:63                          
[] - start to send rest api.
2020-07-09 17:44:17,532 INFO  flink_test_stream_time_kafka.py:69                          
[] - Receive: {"Received": "successful"}
2020-07-09 17:44:17,579 INFO
/home/sysadmin/miniconda3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:564
[] - Creating insecure state channel for localhost:57954.
2020-07-09 17:44:17,580 INFO
/home/sysadmin/miniconda3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:571
[] - State channel established.
2020-07-09 17:44:17,584 INFO
/home/sysadmin/miniconda3/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py:526
[] - Creating client data channel for localhost:60902
2020-07-09 17:44:17,591 INFO
org.apache.beam.runners.fnexecution.data.GrpcDataService     [] - Beam Fn
Data client connected.
2020-07-09 17:44:17,761 INFO  flink_test_stream_time_kafka.py:22                          
[] - start to ad
2020-07-09 17:44:17,810 INFO  flink_test_stream_time_kafka.py:63                          
[] - start to send rest api.
2020-07-09 17:44:17,812 INFO  flink_test_stream_time_kafka.py:69                          
[] - Receive: {"Received": "successful"}



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: pyflink udf中发送rest api会导致udf被调用两次

Dian Fu
Table API的作业在执行之前会经过一系列的rule优化,最终的执行计划,存在一个UDF调用多次的可能,你可以把执行计划打印出来看看(TableEnvironment#explain)。

具体原因,需要看一下作业逻辑。可以发一下你的作业吗?可重现代码即可。

> 在 2020年7月9日,下午5:50,lgs <[hidden email]> 写道:
>
> Hi,
>
> 我观察到一个现象:我定义了一个tumble window,调用一个python udf,在这个udf里面使用requests发送rest api。
> log显示这个udf会被调用两次。相隔不到一秒。这个是什么原因?requests库跟beam冲突了?
>
> 2020-07-09 17:44:17,501 INFO  flink_test_stream_time_kafka.py:22                          
> [] - start to ad
> 2020-07-09 17:44:17,530 INFO  flink_test_stream_time_kafka.py:63                          
> [] - start to send rest api.
> 2020-07-09 17:44:17,532 INFO  flink_test_stream_time_kafka.py:69                          
> [] - Receive: {"Received": "successful"}
> 2020-07-09 17:44:17,579 INFO
> /home/sysadmin/miniconda3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:564
> [] - Creating insecure state channel for localhost:57954.
> 2020-07-09 17:44:17,580 INFO
> /home/sysadmin/miniconda3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:571
> [] - State channel established.
> 2020-07-09 17:44:17,584 INFO
> /home/sysadmin/miniconda3/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py:526
> [] - Creating client data channel for localhost:60902
> 2020-07-09 17:44:17,591 INFO
> org.apache.beam.runners.fnexecution.data.GrpcDataService     [] - Beam Fn
> Data client connected.
> 2020-07-09 17:44:17,761 INFO  flink_test_stream_time_kafka.py:22                          
> [] - start to ad
> 2020-07-09 17:44:17,810 INFO  flink_test_stream_time_kafka.py:63                          
> [] - start to send rest api.
> 2020-07-09 17:44:17,812 INFO  flink_test_stream_time_kafka.py:69                          
> [] - Receive: {"Received": "successful"}
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

lgs
Reply | Threaded
Open this post in threaded view
|

Re: pyflink udf中发送rest api会导致udf被调用两次

lgs
This post was updated on .
谢谢提示。
我打印出来explain,发现确实调用了两次udf,是那个eventtime.isNotNull导致的,与发送rest api无关:



    st_env.scan("source") \
         .where("action === 'Insert'") \
       
.window(Tumble.over("1.hour").on("actionTime").alias("hourlywindow")) \
         .group_by("hourlywindow") \
         .select("action.max as action1, conv_string(eventTime.collect) as
etlist, hourlywindow.start as time1") \
         .select("action1 as action, hbf_thres(etlist) as eventtime, time1
as actiontime") \
         .filter("eventtime.isNotNull") \
         .insert_into("alarm_ad")


LegacySink(name=[`default_catalog`.`default_database`.`alarm_ad`],
fields=[action, eventtime, actiontime])
+- Calc(select=[EXPR$0 AS action, f0 AS eventtime, EXPR$2 AS actiontime])
   +- PythonCalc(select=[EXPR$0, EXPR$2, simple_udf(f0) AS f0])
      +- Calc(select=[EXPR$0, EXPR$2, UDFLength(EXPR$1) AS f0], where=[IS
NOT NULL(f0)])
         +- PythonCalc(select=[EXPR$0, EXPR$1, EXPR$2, simple_udf(f0) AS
f0])
            +- Calc(select=[EXPR$0, EXPR$1, EXPR$2, UDFLength(EXPR$1) AS
f0])
               +-
GroupWindowAggregate(window=[TumblingGroupWindow('hourlywindow, actionTime,
3600000)], properties=[EXPR$2], select=[MAX(action) AS EXPR$0,
COLLECT(eventTime) AS EXPR$1, start('hourlywindow) AS EXPR$2])
                  +- Exchange(distribution=[single])
                     +- Calc(select=[recordId, action, originalState,
newState, originalCause, newCause, ser_name, enb, eventTime, ceasedTime,
duration, acked, pmdId, pmdTime, actionTime], where=[=(action,
_UTF-16LE'Insert')])
                        +- Reused(reference_id=[1])

我这里是想过滤python udf的返回,如果返回是空,我就不要sink。是我的sql写错了吗?




--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: pyflink udf中发送rest api会导致udf被调用两次

Dian Fu
好的,针对你这个case,这个是个已知问题:https://issues.apache.org/jira/browse/FLINK-15973 <https://issues.apache.org/jira/browse/FLINK-15973>,暂时还没有修复。


你可以这样改写一下,应该可以绕过去这个问题:

 table = st_env.scan("source") \
        .where("action === 'Insert'") \
        .window(Tumble.over("1.hour").on("actionTime").alias("hourlywindow")) \
        .group_by("hourlywindow") \
        .select("action.max as action1, conv_string(eventTime.collect) as etlist, hourlywindow.start as time1") \
        .select("action1 as action, hbf_thres(etlist) as eventtime, time1as actiontime")

st_env.create_temporary_view("tmp", table)
st_env.scan("tmp").filter("eventtime.isNotNull").insert_into("alarm_ad")


> 在 2020年7月10日,上午10:08,lgs <[hidden email]> 写道:
>
> 谢谢提示。
> 我打印出来explain,发现确实调用了两次udf,条件是那个eventtime.isNotNull:
>
>
>
>    st_env.scan("source") \
>         .where("action === 'Insert'") \
>
> .window(Tumble.over("1.hour").on("actionTime").alias("hourlywindow")) \
>         .group_by("hourlywindow") \
>         .select("action.max as action1, conv_string(eventTime.collect) as
> etlist, hourlywindow.start as time1") \
>         .select("action1 as action, hbf_thres(etlist) as eventtime, time1
> as actiontime") \
> *         .filter("eventtime.isNotNull") \
> *         .insert_into("alarm_ad")
>
>
> LegacySink(name=[`default_catalog`.`default_database`.`alarm_ad`],
> fields=[action, eventtime, actiontime])
> +- Calc(select=[EXPR$0 AS action, f0 AS eventtime, EXPR$2 AS actiontime])
> *   +- PythonCalc(select=[EXPR$0, EXPR$2, simple_udf(f0) AS f0])
>      +- Calc(select=[EXPR$0, EXPR$2, UDFLength(EXPR$1) AS f0], where=[IS
> NOT NULL(f0)])
> *         +- PythonCalc(select=[EXPR$0, EXPR$1, EXPR$2, simple_udf(f0) AS
> f0])
>            +- Calc(select=[EXPR$0, EXPR$1, EXPR$2, UDFLength(EXPR$1) AS
> f0])
>               +-
> GroupWindowAggregate(window=[TumblingGroupWindow('hourlywindow, actionTime,
> 3600000)], properties=[EXPR$2], select=[MAX(action) AS EXPR$0,
> COLLECT(eventTime) AS EXPR$1, start('hourlywindow) AS EXPR$2])
>                  +- Exchange(distribution=[single])
>                     +- Calc(select=[recordId, action, originalState,
> newState, originalCause, newCause, ser_name, enb, eventTime, ceasedTime,
> duration, acked, pmdId, pmdTime, actionTime], where=[=(action,
> _UTF-16LE'Insert')])
>                        +- Reused(reference_id=[1])
>
> 我这里是想过滤python udf的返回,如果返回是空,我就不要sink。是我的sql写错了吗?
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

lgs
Reply | Threaded
Open this post in threaded view
|

Re: pyflink udf中发送rest api会导致udf被调用两次

lgs
谢谢建议。
我照着代码试了一下,发现还是一样的结果。
udf还是会被调用两次



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: pyflink udf中发送rest api会导致udf被调用两次

Dian Fu
这样再试试?

tmp_table = st_env.scan("source") \
        .where("action === 'Insert'") \
        .window(Tumble.over("1.hour").on("actionTime").alias("hourlywindow")) \
        .group_by("hourlywindow") \
        .select("action.max as action1, conv_string(eventTime.collect) as etlist, hourlywindow.start as time1") \
        .select("action1 as action, hbf_thres(etlist) as eventtime, time1 as actiontime")

ds = st_env._j_tenv.toAppendStream(tmp_table._j_table, tmp_table._j_table.getSchema().toRowType())
table = Table(st_env._j_tenv.fromDataStream(ds, "action, eventtime, actiontime"))
table.filter("eventtime.isNotNull").insert_into("alarm_ad")

> 在 2020年7月10日,下午2:44,lgs <[hidden email]> 写道:
>
> 谢谢建议。
> 我照着代码试了一下,发现还是一样的结果。
> udf还是会被调用两次
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

lgs
Reply | Threaded
Open this post in threaded view
|

Re: pyflink udf中发送rest api会导致udf被调用两次

lgs
这次可以了。谢谢

另外还有一个问题请教一下:
我实际上是有另一个sink,source是同一个。
第一个sink是直接保存kafka数据到DB。
第二个sink是读取kafka,tumble window,然后在udf里面去读取DB。

要怎么样保证第一个sink写完了DB,然后第二个sink的udf能读取到最新的数据?

代码的顺序就能保证吗?




--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: pyflink udf中发送rest api会导致udf被调用两次

Dian Fu
我不太明白你说的“代码顺序”指的什么?

据我所知,应该没有什么太好的办法。从执行图上来看,这2个之间没有依赖关系,所以也就无法保证先后顺序。

如果必须这样干的话,你得从业务的角度想一下,改造一下业务逻辑。


> 在 2020年7月10日,下午4:10,lgs <[hidden email]> 写道:
>
> 这次可以了。谢谢
>
> 另外还有一个问题请教一下:
> 我实际上是有另一个sink,source是同一个。
> 第一个sink是直接保存kafka数据到DB。
> 第二个sink是读取kafka,tumble window,然后在udf里面去读取DB。
>
> 要怎么样保证第一个sink写完了DB,然后第二个sink的udf能读取到最新的数据?
>
> 代码的顺序就能保证吗?
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

lgs
Reply | Threaded
Open this post in threaded view
|

Re: pyflink udf中发送rest api会导致udf被调用两次

lgs
代码顺序是指我先写第一个sink的代码,再写第二个sink的代码。

我设置了'connector.write.flush.max-rows' = '1'
第一个sink没有窗口,所以直接写了

第二个sink有窗口,所以是会在一个小时的最后触发。

可能这样就能保证第二个sink能够读到最新的数据。




--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: pyflink udf中发送rest api会导致udf被调用两次

Dian Fu
大部分情况下,可以work,但是有一些边界的情况,可能会有问题。比如第一个sink的作业,由于某种原因,处理得比较慢,延迟比较大?

也就是说,通常情况下可能没有问题,但是由于这2个作业之间没有任何依赖关系,这个先后顺序是得不到保证的。

我觉得你可以测一下,如果能接受那些极端情况,就可以。

> 在 2020年7月10日,下午5:08,lgs <[hidden email]> 写道:
>
> 代码顺序是指我先写第一个sink的代码,再写第二个sink的代码。
>
> 我设置了'connector.write.flush.max-rows' = '1'
> 第一个sink没有窗口,所以直接写了
>
> 第二个sink有窗口,所以是会在一个小时的最后触发。
>
> 可能这样就能保证第二个sink能够读到最新的数据。
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/