关于FLINK PYTHON UDF

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

关于FLINK PYTHON UDF

秦寒

你好

       我在使用kafka produce数据后,在python中使用UDF做一个add function,但是最后的sink文件里面没有任何数据,

如果不用UDF的话直接获取一个数据在最后的sink文件里面是有数据的如下所示,DEBUG很久也不清楚是什么原因是否能帮忙分下

 

Kafka用的测试数据 {"a": "bbb", "b": 3, "c": 1}

 

 

 

 

测试结果

Kafka用的测试数据 {"a": "bbb", "b": 3, "c": 1}

 

 

 

st_env.from_path("source")\
    .select(
"b.cast(LONG) as b1, c.cast(LONG) as c1") \
   
.select("add(b1,c1)") \ 无任何输出
    .insert_into(
"result_tab")

无任何输出

 

 

st_env.from_path("source")\
    .select(
"b.cast(LONG) as b1, c.cast(LONG) as c1") \
   
.select("c1")\   #正常输出

    .insert_into(
"result_tab")

正确输出

Reply | Threaded
Open this post in threaded view
|

Re: 关于FLINK PYTHON UDF

Xingbo Huang
Hi,
我刚刚在本地完全模拟了你的数据和核心的代码,是可以在sink里拿到结果的。
我把我的测试代码放到附件里面了,
你可以参考一下,如果还是不行的话,可以提供下你的代码再帮你看一下

Best,
Xingbo


秦寒 <[hidden email]> 于2020年4月15日周三 下午3:16写道:

你好

       我在使用kafka produce数据后,在python中使用UDF做一个add function,但是最后的sink文件里面没有任何数据,

如果不用UDF的话直接获取一个数据在最后的sink文件里面是有数据的如下所示,DEBUG很久也不清楚是什么原因是否能帮忙分下

 

Kafka用的测试数据 {"a": "bbb", "b": 3, "c": 1}

 

 

 

 

测试结果

Kafka用的测试数据 {"a": "bbb", "b": 3, "c": 1}

 

 

 

st_env.from_path("source")\
    .select(
"b.cast(LONG) as b1, c.cast(LONG) as c1") \
   
.select("add(b1,c1)") \ 无任何输出
    .insert_into(
"result_tab")

无任何输出

 

 

st_env.from_path("source")\
    .select(
"b.cast(LONG) as b1, c.cast(LONG) as c1") \
   
.select("c1")\   #正常输出

    .insert_into(
"result_tab")

正确输出


kafka_test.py (3K) Download Attachment