pyflink-udf issue feedback

pyflink sql: selecting a field name that contains a special character

whh_960101
Hi, I'm using pyflink and my code is as follows; I'm running into this problem:

source = st_env.from_path('source') # st_env is a StreamTableEnvironment, 'source' is the Kafka source table
table = source.select("@timestamp").execute_insert('sink').get_job_client().get_job_execution_result().result()

The field name @timestamp in the Kafka source's json messages is fixed, and I need to read that field for processing. Since @timestamp contains the special character @ as well as the keyword timestamp, the code above fails with a SQL parse error. How should I change it? I searched online and tried wrapping it in ``, '' and "", but none of them work.

Hoping you can provide an answer. Thanks!
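One direction that might help (a sketch only, not verified against this job; it assumes 'source' and 'sink' are registered as in the snippet above): Flink SQL quotes identifiers with backticks, so routing the projection through sql_query instead of the Table API expression string may sidestep the parse error:

# Sketch: read the @timestamp column via SQL, where backtick-quoted identifiers
# are accepted by the parser, and alias it to a plain name before writing to the sink.
table = st_env.sql_query("SELECT `@timestamp` AS ts FROM source")
table.execute_insert('sink').get_job_client().get_job_execution_result().result()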










 


pyflink sql: UDFs in both select and where; one of the UDFs stops working

whh_960101
In reply to this post by Xingbo Huang
Hi, I'm using pyflink and my code is as follows; I'm running into this problem:

source = st_env.from_path('source') # st_env is a StreamTableEnvironment, 'source' is the Kafka source table
# udf1 is used only in the where clause (input_types = DataTypes.STRING(), result_type = DataTypes.BOOLEAN())
table = source.select("msg").where("udf1(msg) = true").execute_insert('sink').get_job_client().get_job_execution_result().result()

With this, the printed result is filtered correctly.

But if I add another udf, udf2 (input_types = DataTypes.STRING(), result_type = DataTypes.STRING(), which simply turns msg into a new string), in the select clause:
table = source.select("udf2(msg)").where("udf1(msg) = true").execute_insert('sink').get_job_client().get_job_execution_result().result()
then the where filter stops working and all the data is printed.

Putting the where first doesn't help, and neither does switching to filter:
table = source.where("udf1(msg) = true").select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()

Do the udfs in select and where conflict with each other? How can this be solved?
Hoping you can provide an answer. Thanks!










 





 





 

pyflink: how to print the contents of a Table object for debugging

whh_960101
In reply to this post by Xingbo Huang
Hi, I'm using pyflink and my code is as follows; I'm running into this problem:

source = st_env.from_path('source') # st_env is a StreamTableEnvironment, 'source' is the Kafka source table
# udf1 (input_types = DataTypes.STRING(), result_type = DataTypes.BOOLEAN())
table = source.select("msg").where("udf1(msg) = true")

When I single-step through this, print(table) just gives <pyflink.table.table.Table object at 0x7f888fb2cef0>.
Does pyflink have a way to convert a Table into something printable?
Hoping you can provide an answer. Thanks!










 





 





 





 

Re: pyflink: how to print the contents of a Table object for debugging

Xingbo Huang
Hi,
If you want to output the contents of a table, there are two convenient ways:
1. table.to_pandas()
2. use the print connector, see [1]

If you are interested in pyflink, this doc [2] can help you get started quickly.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/print.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html

Best,
Xingbo
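For reference, a minimal sketch of the two approaches (the sink table name 'print_sink' and its single STRING column are assumptions, not from the original post):

# Option 1: pull the Table to the client and print it as a pandas DataFrame
print(table.to_pandas())

# Option 2: declare a print sink table and insert the Table into it
st_env.execute_sql("""
    CREATE TABLE print_sink (
        msg STRING
    ) WITH (
        'connector' = 'print'
    )
""")
table.execute_insert('print_sink').get_job_client().get_job_execution_result().result()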


Re: pyflink sql: UDFs in both select and where; one of the UDFs stops working

Xingbo Huang
In reply to this post by whh_960101
Hi,
We have an example that covers your case [1]; you need to check what the logic in your udf1 looks like.


[1]
https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67

Best,
Xingbo


Re: Re: pyflink sql: UDFs in both select and where; one of the UDFs stops working

whh_960101
@udf(input_types=DataTypes.STRING(), result_type=DataTypes.BOOLEAN())
def udf1(msg):  # udf1 simply checks the log field in msg for error keywords
    if msg is None:
        return False
    msg_dic = json.loads(msg.strip())
    log = msg_dic.get('log').lower()
    if 'error' in log or 'fail' in log:
        return True
    else:
        return False

@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())
def udf2(msg):  # udf2 simply extracts the log field from msg
    if msg is None:
        return ''
    msg_dic = json.loads(msg.strip())
    log = msg_dic.get('log')
    return log

I don't think the two udfs conflict with each other, do they?










Re: Re: pyflink sql: UDFs in both select and where; one of the UDFs stops working

whh_960101
In reply to this post by Xingbo Huang
hi,
I just adapted your example [1]: I built a source table with from_elements and then applied my udfs:
source.where("udf1(msg) = true").select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
The printed result filters out exactly the data I want.

But when the source table is built through the StreamTableEnvironment by connecting to Kafka and reading from the message queue, i.e. the earlier style:
source = st_env.from_path('source')
source.where("udf1(msg) = true").select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
then the where filter stops working and all the data is printed.

However, if the udf is only used in where, i.e.
source.where("udf1(msg) = true").select("msg").execute_insert('sink').get_job_client().get_job_execution_result().result()
the printed result is correctly filtered.

Where could this problem be coming from?
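For reference, a minimal self-contained sketch of the from_elements-based check described above (the udf bodies, data and the memory setting are simplified assumptions, not the original code):

from pyflink.table import EnvironmentSettings, StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
st_env = StreamTableEnvironment.create(environment_settings=env_settings)
# On 1.11 the Python UDF worker may need memory reserved explicitly:
st_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", "true")

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.BOOLEAN())
def udf1(msg):  # simplified stand-in for the real filtering udf
    return msg is not None and 'error' in msg.lower()

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def udf2(msg):  # simplified stand-in for the real extraction udf
    return (msg or '').upper()

st_env.register_function("udf1", udf1)
st_env.register_function("udf2", udf2)

source = st_env.from_elements([("an error line",), ("a normal line",)], ['msg'])
result = source.where("udf1(msg) = true").select("udf2(msg)")
print(result.to_pandas())  # only the row containing 'error' should remain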











 

Re: pyflink sql: UDFs in both select and where; one of the UDFs stops working

Dian Fu
Could you print out the plan and take a look? For how to print the plan, see: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html#explain-tables
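A sketch of what that looks like, assuming pyflink 1.11's Table.explain() and that 'table' is the Table built from the where/select chain before calling execute_insert:

# Print the abstract syntax tree, optimized logical plan and physical execution plan
print(table.explain())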



Re: Re: pyflink sql: UDFs in both select and where; one of the UDFs stops working

whh_960101
== Abstract Syntax Tree ==

LogicalProject(_c0=[log_get($1)], _c1=[_UTF-16LE''], _c2=[_UTF-16LE''], _c3=[_UTF-16LE'ERROR'], _c4=[_UTF-16LE'Asia/Shanghai'], _c5=[_UTF-16LE'@timestamp'], kubernetes$container$name=[$3.container.name], clusterName=[$2])
+- LogicalFilter(condition=[error_exist($1)])
   +- LogicalTableScan(table=[[default_catalog, default_database, source, source: [KafkaTableSource(@timestamp, message, clusterName, kubernetes)]]])

== Optimized Logical Plan ==

Calc(select=[f00 AS _c0, _UTF-16LE'' AS _c1, _UTF-16LE'' AS _c2, _UTF-16LE'ERROR' AS _c3, _UTF-16LE'Asia/Shanghai' AS _c4, _UTF-16LE'@timestamp' AS _c5, f0 AS kubernetes$container$name, clusterName])
+- PythonCalc(select=[f0, clusterName, log_get(message) AS f00])
   +- Calc(select=[message, clusterName, kubernetes.container.name AS f0])
      +- PythonCalc(select=[message, kubernetes, clusterName, error_exist(message) AS f0])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, source, source: [KafkaTableSource(@timestamp, message, clusterName, kubernetes)]]], fields=[@timestamp, message, clusterName, kubernetes])

== Physical Execution Plan ==

Stage 1 : Data Source
 content : Source: KafkaTableSource(@timestamp, message, clusterName, kubernetes)

 Stage 2 : Operator
  content : SourceConversion(table=[default_catalog.default_database.source, source: [KafkaTableSource(@timestamp, message, clusterName, kubernetes)]], fields=[@timestamp, message, clusterName, kubernetes])
  ship_strategy : FORWARD

  Stage 3 : Operator
   content : StreamExecPythonCalc
   ship_strategy : FORWARD

   Stage 4 : Operator
    content : Calc(select=[message, clusterName, kubernetes.container.name AS f0])
    ship_strategy : FORWARD

    Stage 5 : Operator
     content : StreamExecPythonCalc
     ship_strategy : FORWARD

     Stage 6 : Operator
      content : Calc(select=[f00 AS _c0, _UTF-16LE'' AS _c1, _UTF-16LE'' AS _c2, _UTF-16LE'ERROR' AS _c3, _UTF-16LE'Asia/Shanghai' AS _c4, _UTF-16LE'@timestamp' AS _c5, f0 AS kubernetes$container$name, clusterName])
      ship_strategy : FORWARD


















Re: Re: pyflink sql: UDFs in both select and where; one of the UDFs stops working

whh_960101
In reply to this post by Dian Fu
I've extracted the relevant part of the plan, around where the data is filtered:

== Abstract Syntax Tree ==

+- LogicalFilter(condition=[error_exist($1)])

== Optimized Logical Plan ==

      +- PythonCalc(select=[message, kubernetes, clusterName, error_exist(message) AS f0])
# I suspect this is where things go wrong: this should not be a select, it should be a where or filter, given that LogicalFilter(condition=[error_exist($1)]) already appears above

== Physical Execution Plan ==

  Stage 3 : Operator
   content : StreamExecPythonCalc
   ship_strategy : FORWARD


















Re: pyflink sql: UDFs in both select and where; one of the UDFs stops working

Dian Fu
This is a bug; I've created a JIRA for it: https://issues.apache.org/jira/browse/FLINK-19675

It occurs when a single Calc contains a Python UDF, a where condition and a composite (nested) column access at the same time.

Until it is fixed, you can work around it like this:

tmp_table = st_env.from_path("source")\
    .select("kubernetes.get('container').get('name') as name, message, clusterName, '@timestamp' "
            "as ts")

data_stream = st_env._j_tenv.toAppendStream(tmp_table._j_table, tmp_table._j_table.getSchema().toRowType())
table = Table(st_env._j_tenv.fromDataStream(data_stream, "name, message, clusterName, ts"), st_env)

table\
    .where("error_exist(message) = true")\
    .select("log_get(message),'','','ERROR','Asia/Shanghai', 'ts',name,clusterName") \
    .execute_insert("sink").get_job_client().get_job_execution_result().result()




pyflink 1.11.0 kafka connector: what to do when access credentials are required

whh_960101
In reply to this post by Dian Fu
CREATE TABLE kafkaTable (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'csv',
    'scan.startup.mode' = 'earliest-offset'
)

Hi, if I create kafkaTable with a SQL statement like the above and the Kafka nodes require access credentials, there is no username/password option here. How should I handle that?
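Not an authoritative answer, but one common approach, sketched under the assumption that the brokers use SASL_PLAINTEXT with the PLAIN mechanism: the Kafka connector forwards every option prefixed with 'properties.' to the underlying Kafka client, so credentials can be passed through the WITH clause (the username/password values below are placeholders; adjust protocol and mechanism to the actual cluster):

st_env.execute_sql("""
    CREATE TABLE kafkaTable (
        user_id BIGINT,
        item_id BIGINT,
        category_id BIGINT,
        behavior STRING,
        ts TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_behavior',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'testGroup',
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.sasl.mechanism' = 'PLAIN',
        'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="..." password="...";',
        'format' = 'csv',
        'scan.startup.mode' = 'earliest-offset'
    )
""")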

pyflink 1.11.0 error: JobExecutionException: Job execution failed.

whh_960101
In reply to this post by Dian Fu
Hi all, I'm processing a Kafka source Table and printing a nested json result, and I get JobExecutionException: Job execution failed. How can I fix this? The code and the error are below:

    @udf(input_types=DataTypes.STRING(), result_type=DataTypes.BOOLEAN())
    def error_exist(message):
        if message is None:
            return False
        mes_dic = json.loads(message.strip())
        log = mes_dic.get('log').lower().strip()
        if 'error' in log:
            return True
        else:
            return False

    @udf(input_types=DataTypes.STRING(), result_type=DataTypes.ROW([DataTypes.FIELD("content", DataTypes.STRING())]))
    def error_get(message):  
        if message is None:
            return ''
        mes_dic = json.loads(message.strip())
        log = mes_dic.get('log')
        return json.dumps({"content":log.strip()})

    @udf(input_types=[DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING()], result_type=DataTypes.ROW([\
            DataTypes.FIELD("appId", DataTypes.STRING()),DataTypes.FIELD("hostname", DataTypes.STRING()), \
            DataTypes.FIELD("level", DataTypes.STRING()),DataTypes.FIELD("timeZone", DataTypes.STRING()), \
            DataTypes.FIELD("timestamp", DataTypes.DOUBLE()),DataTypes.FIELD("container", DataTypes.STRING()), \
            DataTypes.FIELD("clusterName", DataTypes.STRING()),DataTypes.FIELD("clusterChinese", DataTypes.STRING())]))
    def headers_get(message,container,clusterName):
        mes_dic = json.loads(message.strip())
        tz_utc = mes_dic.get('time')
        tz_utc = tz_utc[:tz_utc.rfind('.') + 7]
        from_zone = tz.gettz('UTC')
        dt_utc = datetime.strptime(tz_utc, '%Y-%m-%dT%H:%M:%S.%f')
        dt_utc = dt_utc.replace(tzinfo=from_zone)
        dt_ts = dt_utc.timestamp()

        map_df = pd.read_csv('cc_log_dev_map.csv')
        clusterChinese = map_df.loc[map_df.cs_English == clusterName, 'cs_Chinese'].values[0]

        return json.dumps({'appId':'','hostname':'','level':'ERROR','timeZone':'Asia/Shanghai','timestamp':dt_ts,\
                           'container':container,'clusterName':clusterName,'clusterChinese':clusterChinese})

#st_env.execute_sql("""
#        CREATE TABLE source(
#           message STRING,
#           clusterName STRING,
#           kubernetes ROW<container ROW<name STRING>>
#        ) WITH(
#            'connector' = 'kafka',
#        )
#    """)

st_env.execute_sql("""
        CREATE TABLE sink(
            body ROW<content STRING>,
            headers ROW<appId STRING,hostname STRING,level STRING,timeZone STRING,\
            `timestamp` DOUBLE,container STRING,clusterName STRING,clusterChinese STRING>
        ) WITH(
            'connector' = 'print',
        )
    """)tmp_table =  st_env.from_path("source") \
        .select("message,kubernetes.get('container').get('name') as container,clusterName")
 
    data_stream = st_env._j_tenv.toAppendStream(tmp_table._j_table,tmp_table._j_table.getSchema().toRowType())
    table = Table(st_env._j_tenv.fromDataStream(data_stream,"message,clusterName,container"),st_env)

    sink_table = table \
        .where("error_exist(message) = true") \
        .select("error_get(message) as body, headers_get(message,container,clusterName) as headers")
       
    sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()报错:File "log_parse_kafka.py", line 180, in from_kafka_to_oracle_demo
    sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()
  File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/common/completable_future.py", line 78, in result
    return self._py_class(self._j_completable_future.get())
  File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o150.get.
: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
 at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
 at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
 at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
 at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
 at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
 at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
 at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
 at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
 at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
 at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
 at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
 at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
 at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
 at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
 at akka.dispatch.OnComplete.internal(Future.scala:264)
 at akka.dispatch.OnComplete.internal(Future.scala:261)
 at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
 at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
 at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
 at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
 at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
 at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
 at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
 at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
 at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
 at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
 at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
 at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
 at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
 at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
 at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
 at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Re: pyflink 1.11.0 error: JobExecutionException: Job execution failed.

Dian Fu
The stack trace looks incomplete; do you have a more complete one?



Re: pyflink 1.11.0 error: JobExecutionException: Job execution failed.

Dian Fu
Both of these UDFs (error_get and headers_get) actually return strings, but their result types are declared as Row. If you really need to return a Row type, the udf has to return a Row object.
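A sketch of error_get returning an actual Row that matches its declared result_type (the Row import path is an assumption for 1.11; in later releases Row is also exposed as pyflink.common.Row):

import json
from pyflink.table import DataTypes
from pyflink.table.types import Row
from pyflink.table.udf import udf

@udf(input_types=DataTypes.STRING(),
     result_type=DataTypes.ROW([DataTypes.FIELD("content", DataTypes.STRING())]))
def error_get(message):
    if message is None:
        return Row("")                      # one value per FIELD of the declared ROW type
    log = json.loads(message.strip()).get('log')
    return Row(log.strip())                 # a Row object, not a json.dumps(...) string

headers_get would need the analogous change, returning a single Row whose eight values line up with its declared FIELDs instead of a json string.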



Re: Re: pyflink 1.11.0 error: JobExecutionException: Job execution failed.

whh_960101
How is a Row object represented in Python? As a dict?






在 2020-10-20 20:35:22,"Dian Fu" <[hidden email]> 写道:

>你这两个UDF(error_get和headers_get)实际的返回值类型都是string,但是却标成Row类型。如果确实需要返回Row类型,udf的返回值需要返回一个Row类型的对象。
>
>> 在 2020年10月20日,下午7:56,Dian Fu <[hidden email]> 写道:
>>
>> 错误堆栈看着似乎不太完整,有更完整的堆栈吗?
>>
>>> 在 2020年10月20日,下午7:38,whh_960101 <[hidden email]> 写道:
>>>
>>> Hi,各位大佬,我处理kafka source Table,print 一个嵌套的json结果,报错JobExecutionException: Job execution failed. 这个问题该怎么解决,代码和报错信息如下:@udf(input_types=DataTypes.STRING(), result_type=DataTypes.BOOLEAN())
>>>   def error_exist(message):
>>>       if message is None:
>>>           return False
>>>       mes_dic = json.loads(message.strip())
>>>       log = mes_dic.get('log').lower().strip()
>>>       if 'error' in log:
>>>           return True
>>>       else:
>>>           return False
>>>
>>>   @udf(input_types=DataTypes.STRING(), result_type=DataTypes.ROW([DataTypes.FIELD("content", DataTypes.STRING())]))
>>>   def error_get(message):  
>>>       if message is None:
>>>           return ''
>>>       mes_dic = json.loads(message.strip())
>>>       log = mes_dic.get('log')
>>>       return json.dumps({"content":log.strip()})
>>>
>>>   @udf(input_types=[DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING()], result_type=DataTypes.ROW([\
>>>           DataTypes.FIELD("appId", DataTypes.STRING()),DataTypes.FIELD("hostname", DataTypes.STRING()), \
>>>           DataTypes.FIELD("level", DataTypes.STRING()),DataTypes.FIELD("timeZone", DataTypes.STRING()), \
>>>           DataTypes.FIELD("timestamp", DataTypes.DOUBLE()),DataTypes.FIELD("container", DataTypes.STRING()), \
>>>           DataTypes.FIELD("clusterName", DataTypes.STRING()),DataTypes.FIELD("clusterChinese", DataTypes.STRING())]))
>>>   def headers_get(message,container,clusterName):
>>>       mes_dic = json.loads(message.strip())
>>>       tz_utc = mes_dic.get('time')
>>>       tz_utc = tz_utc[:tz_utc.rfind('.') + 7]
>>>       from_zone = tz.gettz('UTC')
>>>       dt_utc = datetime.strptime(tz_utc, '%Y-%m-%dT%H:%M:%S.%f')
>>>       dt_utc = dt_utc.replace(tzinfo=from_zone)
>>>       dt_ts = dt_utc.timestamp()
>>>
>>>       map_df = pd.read_csv('cc_log_dev_map.csv')
>>>       clusterChinese = map_df.loc[map_df.cs_English == clusterName, 'cs_Chinese'].values[0]
>>>
>>>       return json.dumps({'appId':'','hostname':'','level':'ERROR','timeZone':'Asia/Shanghai','timestamp':dt_ts,\
>>>                          'container':container,'clusterName':clusterName,'clusterChinese':clusterChinese})
>>>
>>> #st_env.execute_sql("""
>>> #        CREATE TABLE source(
>>> #           message STRING,
>>> #           clusterName STRING,
>>> #           kubernetes ROW<container ROW<name STRING>>
>>> #        ) WITH(
>>> #            'connector' = 'kafka',
>>> #        )
>>> #    """)
>>>
>>> st_env.execute_sql("""
>>>       CREATE TABLE sink(
>>>           body ROW<content STRING>,
>>>           headers ROW<appId STRING,hostname STRING,level STRING,timeZone STRING,\
>>>           `timestamp` DOUBLE,container STRING,clusterName STRING,clusterChinese STRING>
>>>       ) WITH(
>>>           'connector' = 'print',
>>>       )
>>>   """)tmp_table =  st_env.from_path("source") \
>>>       .select("message,kubernetes.get('container').get('name') as container,clusterName")
>>>
>>>   data_stream = st_env._j_tenv.toAppendStream(tmp_table._j_table,tmp_table._j_table.getSchema().toRowType())
>>>   table = Table(st_env._j_tenv.fromDataStream(data_stream,"message,clusterName,container"),st_env)
>>>
>>>   sink_table = table \
>>>       .where("error_exist(message) = true") \
>>>       .select("error_get(message) as body, headers_get(message,container,clusterName) as headers")
>>>
>>>   sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()报错:File "log_parse_kafka.py", line 180, in from_kafka_to_oracle_demo
>>>   sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()
>>> File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/common/completable_future.py", line 78, in result
>>>   return self._py_class(self._j_completable_future.get())
>>> File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1286, in __call__
>>>   answer, self.gateway_client, self.target_id, self.name)
>>> File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 147, in deco
>>>   return f(*a, **kw)
>>> File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
>>>   format(target_id, ".", name), value)
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o150.get.
>>> : java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>> at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>> at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>> at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>> at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>> at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>>> at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
>>> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>>> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>>> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>>> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>>> at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
>>> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>>> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>>> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>>> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>>> at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
>>> at akka.dispatch.OnComplete.internal(Future.scala:264)
>>> at akka.dispatch.OnComplete.internal(Future.scala:261)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>> at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
>>> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
>>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
>>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>> at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
Reply | Threaded
Open this post in threaded view
|

Re: pyflink1.11.0 error: JobExecutionException: Job execution failed.

Dian Fu
PyFlink has a Row class for this: pyflink.table.Row
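For example, a minimal sketch of error_get that returns a Row object instead of a JSON string (the imports are spelled out here as an assumption; the declared result_type is the one from the snippet quoted below):

    import json

    from pyflink.table import DataTypes, Row
    from pyflink.table.udf import udf

    @udf(input_types=DataTypes.STRING(),
         result_type=DataTypes.ROW([DataTypes.FIELD("content", DataTypes.STRING())]))
    def error_get(message):
        # Return a Row whose fields match the declared ROW result_type,
        # instead of the string produced by json.dumps.
        if message is None:
            return Row(content='')
        mes_dic = json.loads(message.strip())
        log = mes_dic.get('log')
        return Row(content=log.strip())

headers_get would need the same treatment: return a Row with one field per entry in its declared result_type.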

> On Oct 21, 2020, at 9:05 AM, whh_960101 <[hidden email]> wrote:
>
> How is a Row-type object represented in Python, as a dict?
>
>
>
>
>
>
> On 2020-10-20 20:35:22, "Dian Fu" <[hidden email]> wrote:
>> The actual return types of both of your UDFs (error_get and headers_get) are string, but they are declared as Row. If you really need to return a Row type, the UDF has to return a Row object.
>>
>>> On Oct 20, 2020, at 7:56 PM, Dian Fu <[hidden email]> wrote:
>>>
>>> The error stack looks a bit incomplete; do you have a more complete stack trace?
>>>
>>>> On Oct 20, 2020, at 7:38 PM, whh_960101 <[hidden email]> wrote:
>>>>
>>>> Hi everyone, I am processing a Kafka source Table and printing a nested JSON result, and the job fails with JobExecutionException: Job execution failed. How can I fix this? The code and error message are as follows:
>>>>
>>>>  @udf(input_types=DataTypes.STRING(), result_type=DataTypes.BOOLEAN())
>>>>  def error_exist(message):
>>>>      if message is None:
>>>>          return False
>>>>      mes_dic = json.loads(message.strip())
>>>>      log = mes_dic.get('log').lower().strip()
>>>>      if 'error' in log:
>>>>          return True
>>>>      else:
>>>>          return False
>>>>
>>>>  @udf(input_types=DataTypes.STRING(), result_type=DataTypes.ROW([DataTypes.FIELD("content", DataTypes.STRING())]))
>>>>  def error_get(message):  
>>>>      if message is None:
>>>>          return ''
>>>>      mes_dic = json.loads(message.strip())
>>>>      log = mes_dic.get('log')
>>>>      return json.dumps({"content":log.strip()})
>>>>
>>>>  @udf(input_types=[DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING()], result_type=DataTypes.ROW([\
>>>>          DataTypes.FIELD("appId", DataTypes.STRING()),DataTypes.FIELD("hostname", DataTypes.STRING()), \
>>>>          DataTypes.FIELD("level", DataTypes.STRING()),DataTypes.FIELD("timeZone", DataTypes.STRING()), \
>>>>          DataTypes.FIELD("timestamp", DataTypes.DOUBLE()),DataTypes.FIELD("container", DataTypes.STRING()), \
>>>>          DataTypes.FIELD("clusterName", DataTypes.STRING()),DataTypes.FIELD("clusterChinese", DataTypes.STRING())]))
>>>>  def headers_get(message,container,clusterName):
>>>>      mes_dic = json.loads(message.strip())
>>>>      tz_utc = mes_dic.get('time')
>>>>      tz_utc = tz_utc[:tz_utc.rfind('.') + 7]
>>>>      from_zone = tz.gettz('UTC')
>>>>      dt_utc = datetime.strptime(tz_utc, '%Y-%m-%dT%H:%M:%S.%f')
>>>>      dt_utc = dt_utc.replace(tzinfo=from_zone)
>>>>      dt_ts = dt_utc.timestamp()
>>>>
>>>>      map_df = pd.read_csv('cc_log_dev_map.csv')
>>>>      clusterChinese = map_df.loc[map_df.cs_English == clusterName, 'cs_Chinese'].values[0]
>>>>
>>>>      return json.dumps({'appId':'','hostname':'','level':'ERROR','timeZone':'Asia/Shanghai','timestamp':dt_ts,\
>>>>                         'container':container,'clusterName':clusterName,'clusterChinese':clusterChinese})
>>>>
>>>> #st_env.execute_sql("""
>>>> #        CREATE TABLE source(
>>>> #           message STRING,
>>>> #           clusterName STRING,
>>>> #           kubernetes ROW<container ROW<name STRING>>
>>>> #        ) WITH(
>>>> #            'connector' = 'kafka',
>>>> #        )
>>>> #    """)
>>>>
>>>> st_env.execute_sql("""
>>>>      CREATE TABLE sink(
>>>>          body ROW<content STRING>,
>>>>          headers ROW<appId STRING,hostname STRING,level STRING,timeZone STRING,\
>>>>          `timestamp` DOUBLE,container STRING,clusterName STRING,clusterChinese STRING>
>>>>      ) WITH(
>>>>          'connector' = 'print',
>>>>      )
>>>>  """)tmp_table =  st_env.from_path("source") \
>>>>      .select("message,kubernetes.get('container').get('name') as container,clusterName")
>>>>
>>>>  data_stream = st_env._j_tenv.toAppendStream(tmp_table._j_table,tmp_table._j_table.getSchema().toRowType())
>>>>  table = Table(st_env._j_tenv.fromDataStream(data_stream,"message,clusterName,container"),st_env)
>>>>
>>>>  sink_table = table \
>>>>      .where("error_exist(message) = true") \
>>>>      .select("error_get(message) as body, headers_get(message,container,clusterName) as headers")
>>>>
>>>>  sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()
>>>>
>>>> Error:
>>>> File "log_parse_kafka.py", line 180, in from_kafka_to_oracle_demo
>>>>  sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()
>>>> File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/common/completable_future.py", line 78, in result
>>>>  return self._py_class(self._j_completable_future.get())
>>>> File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1286, in __call__
>>>>  answer, self.gateway_client, self.target_id, self.name)
>>>> File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 147, in deco
>>>>  return f(*a, **kw)
>>>> File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
>>>>  format(target_id, ".", name), value)
>>>> py4j.protocol.Py4JJavaError: An error occurred while calling o150.get.
>>>> : java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>

Reply | Threaded
Open this post in threaded view
|

pyflink1.11.0: how to set the username and password in the connector when the elasticsearch host requires authentication

whh_960101
In reply to this post by Dian Fu
Hi all, if I sink data to an elasticsearch host that requires authentication, how do I provide the username and password in the elasticsearch connector? I wrote the table definition following the example in the official documentation, but I could not find any options for the username and password, and the job fails with Unauthorized.

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html

CREATE TABLE myUserTable (
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'users'
);

Connector Options:

connector (required, String): Specify what connector to use, valid values are:
    elasticsearch-6: connect to Elasticsearch 6.x cluster
    elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster
hosts (required, String): One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9092;http://host_name:9093'.
index (required, String): Elasticsearch index for every record. Can be a static index (e.g. 'myIndex') or a dynamic index (e.g. 'index-{log_ts|yyyy-MM-dd}'). See the following Dynamic Index section for more details.
document-type (required in 6.x, String): Elasticsearch document type. Not necessary anymore in elasticsearch-7.
document-id.key-delimiter (optional, default '_', String): Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3".
failure-handler (optional, default 'fail', String): Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are:
    fail: throws an exception if a request fails and thus causes a job failure.
    ignore: ignores failures and drops the request.
    retry_rejected: re-adds requests that have failed due to queue capacity saturation.
    custom class name: for failure handling with an ActionRequestFailureHandler subclass.
sink.flush-on-checkpoint (optional, default true, Boolean): Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong guarantees for at-least-once delivery of action requests.
sink.bulk-flush.max-actions (optional, default 1000, Integer): Maximum number of buffered actions per bulk request. Can be set to '0' to disable it.
sink.bulk-flush.max-size (optional, default 2mb, MemorySize): Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to disable it.
sink.bulk-flush.interval (optional, default 1s, Duration): The interval to flush buffered actions. Can be set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set, allowing for complete async processing of buffered actions.
sink.bulk-flush.backoff.strategy (optional, default DISABLED, String): Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are:
    DISABLED: no retry performed, i.e. fail after the first request error.
    CONSTANT: wait for backoff delay between retries.
    EXPONENTIAL: initially wait for backoff delay and increase exponentially between retries.
sink.bulk-flush.backoff.max-retries (optional, default 8, Integer): Maximum number of backoff retries.
sink.bulk-flush.backoff.delay (optional, default 50ms, Duration): Delay between each backoff attempt. For CONSTANT backoff, this is simply the delay between each retry. For EXPONENTIAL backoff, this is the initial base delay.
connection.max-retry-timeout (optional, Duration): Maximum timeout between retries.
connection.path-prefix (optional, String): Prefix string to be added to every REST communication, e.g., '/v1'.
format (optional, default 'json', String): The Elasticsearch connector supports specifying a format. The format must produce a valid json document. By default it uses the built-in 'json' format. Please refer to the JSON Format page for more details.
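What I was hoping to write is something along these lines (note: the 'username' and 'password' options below are only my guess at what such a configuration might look like; they do not appear anywhere in the 1.11 option list above):

    # Hypothetical sketch only: 'username' / 'password' are assumed option
    # names and are NOT part of the 1.11 connector options quoted above.
    st_env.execute_sql("""
        CREATE TABLE es_sink(
            user_id STRING,
            user_name STRING
        ) WITH(
            'connector' = 'elasticsearch-7',
            'hosts' = 'http://localhost:9200',
            'index' = 'users',
            'username' = 'elastic',
            'password' = 'my_password'
        )
    """)

Is there a supported way to pass these credentials in pyflink 1.11, or does this require a newer connector version or a custom connector?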






 