Hi, I'm using PyFlink with the code below and have run into the following problem:

source = st_env.from_path('source')  # st_env is a StreamTableEnvironment, 'source' is a Kafka source table
table = source.select("@timestamp").execute_insert('sink').get_job_client().get_job_execution_result().result()

The field name @timestamp in the JSON coming from the Kafka source is fixed, and I need to read that field for processing. Because @timestamp contains the special character @ and the keyword timestamp, the code above fails with a SQL parse error. How should I change this? I searched online and tried wrapping the name in ``, '' and "", but none of them work.
Hoping you can help, thanks!
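A hedged sketch of one thing to try (not from the thread): the Table API expression strings and the SQL parser handle identifiers differently, so going through SQL with a backtick-quoted identifier may behave differently from select("..."). Whether it parses for this source, and the alias name ts, are untested assumptions.

# untested sketch: reference the column through SQL instead of select("...")
result_table = st_env.sql_query("SELECT `@timestamp` AS ts FROM source")
result_table.execute_insert('sink').get_job_client().get_job_execution_result().result()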
In reply to this post by Xingbo Huang
Hi, I'm using PyFlink with the code below and have run into the following problem:

source = st_env.from_path('source')  # st_env is a StreamTableEnvironment, 'source' is a Kafka source table

# udf1 is used only in the where clause (input_types = DataTypes.STRING(), result_type = DataTypes.BOOLEAN())
table = source.select("msg").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()

With this, the printed result is filtered exactly as expected.

But if I add another UDF, udf2 (input_types = DataTypes.STRING(), result_type = DataTypes.STRING()), in the select clause:
table = source.select("udf2(msg)").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
then the where filter stops working and all rows are printed.

Moving the where in front does not help, nor does switching to filter:
table = source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()

How can this be solved? Hoping you can help, thanks!
In reply to this post by Xingbo Huang
Hi, I'm using PyFlink with the code below and have run into the following problem:

source = st_env.from_path('source')  # st_env is a StreamTableEnvironment, 'source' is a Kafka source table

# udf1 is used only in the where clause (input_types = DataTypes.STRING(), result_type = DataTypes.BOOLEAN())
table = source.select("msg").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()

With this, the printed result is filtered exactly as expected.

But if I add another UDF, udf2 (input_types = DataTypes.STRING(), result_type = DataTypes.STRING(), it simply transforms msg into a new String), in the select clause:
table = source.select("udf2(msg)").where(udf1(msg)=True).execute_insert('sink').get_job_client().get_job_execution_result().result()
then the where filter stops working and all rows are printed.

Moving the where in front does not help, nor does switching to filter:
table = source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()

Do UDFs in select and where conflict with each other? How can this be solved? Hoping you can help, thanks!
In reply to this post by Xingbo Huang
Hi, I'm using PyFlink with the code below and have run into the following problem:

source = st_env.from_path('source')  # st_env is a StreamTableEnvironment, 'source' is a Kafka source table
# udf1 (input_types = DataTypes.STRING(), result_type = DataTypes.BOOLEAN())
table = source.select("msg").where(udf1(msg)=True)

When I single-step through this, print(table) gives <pyflink.table.table.Table object at 0x7f888fb2cef0>.
Does PyFlink have a way to turn a Table into something printable? Hoping you can help, thanks!
Hi,
There are two convenient ways to output the contents of a table:
1. table.to_pandas()
2. Use the print connector, see [1].

If you are interested in PyFlink, this doc [2] can help you get started quickly.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/print.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html

Best,
Xingbo

whh_960101 <[hidden email]> wrote on Thu, Oct 15, 2020 at 4:39 PM:
> [...]
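A minimal sketch of both options, assuming Flink 1.11 and a small bounded table built with from_elements (to_pandas() needs a bounded result to finish; the field name and values here are made up):

from pyflink.table import DataTypes

# option 1: collect the bounded table to the client as a pandas DataFrame
t = st_env.from_elements(
    [('error: disk full',), ('all good',)],
    DataTypes.ROW([DataTypes.FIELD("msg", DataTypes.STRING())]))
print(t.to_pandas())

# option 2: declare a print sink table and insert into it
st_env.execute_sql("""
    CREATE TABLE print_sink (
        msg STRING
    ) WITH (
        'connector' = 'print'
    )
""")
t.execute_insert('print_sink').get_job_client().get_job_execution_result().result()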
In reply to this post by whh_960101
Hi,
We have a test case that covers your scenario [1]; you need to check what the logic inside your udf1 looks like.

[1] https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67

Best,
Xingbo

whh_960101 <[hidden email]> wrote on Thu, Oct 15, 2020 at 2:30 PM:
> [...]
@udf(input_types =DataTypes.STRING(), result_type = DataTypes.BOOLEAN())
def udf1(msg):
    # udf1 simply looks for error keywords in the log
    if msg is None:
        return ''
    msg_dic = json.loads(msg.strip())
    log = msg_dic.get('log').lower()
    if 'error' in log or 'fail' in log:
        return True
    else:
        return False

@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())
def udf2(msg):
    # udf2 simply extracts the log field from msg
    if msg is None:
        return ''
    msg_dic = json.loads(msg.strip())
    log = msg_dic.get('log')
    return log

The two UDFs don't conflict with each other, do they?

At 2020-10-15 16:57:39, "Xingbo Huang" <[hidden email]> wrote:
> [...]
In reply to this post by Xingbo Huang
hi,
I just modified your example [1] to build a source table with from_elements and then used my UDFs:
source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
The printed result is filtered exactly the way I want.

But when the source table is built from Kafka through the StreamTableEnvironment, i.e. the earlier style:
source = st_env.from_path('source').where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
the where filter stops working and all rows are printed.

Could you tell me where the problem is?

At 2020-10-15 16:57:39, "Xingbo Huang" <[hidden email]> wrote:
> [...]
In reply to this post by Xingbo Huang
hi,
I just modified your example [1] to build a source table with from_elements and then used my UDFs:
source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
The printed result is filtered exactly the way I want.

But when the source table is built from Kafka through the StreamTableEnvironment, i.e. the earlier style:
source = st_env.from_path('source')
source.where(udf1(msg)=True).select("udf2(msg)").execute_insert('sink').get_job_client().get_job_execution_result().result()
the where filter stops working and all rows are printed.

Whereas using the UDF only in the where clause, i.e.
source.where(udf1(msg)=True).select("msg").execute_insert('sink').get_job_client().get_job_execution_result().result()
prints the filtered result.

Could you tell me where the problem is?

At 2020-10-15 16:57:39, "Xingbo Huang" <[hidden email]> wrote:
> [...]
Could you print the plan and take a look? See this for how to print the plan: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html#explain-tables
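For reference, a minimal sketch of printing a plan in PyFlink 1.11 (the table and UDF names are the ones from this thread, so treat them as assumptions about your job):

source = st_env.from_path('source')
table = source.where("udf1(msg) = true").select("udf2(msg)")
# Table.explain() returns the AST, the optimized logical plan and the physical execution plan
print(table.explain())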
> On Oct 15, 2020, at 7:02 PM, whh_960101 <[hidden email]> wrote:
> [...]
== Abstract Syntax Tree ==
LogicalProject(_c0=[log_get($1)], _c1=[_UTF-16LE''], _c2=[_UTF-16LE''], _c3=[_UTF-16LE'ERROR'], _c4=[_UTF-16LE'Asia/Shanghai'], _c5=[_UTF-16LE'@timestamp'], kubernetes$container$name=[$3.container.name], clusterName=[$2])
+- LogicalFilter(condition=[error_exist($1)])
   +- LogicalTableScan(table=[[default_catalog, default_database, source, source: [KafkaTableSource(@timestamp, message, clusterName, kubernetes)]]])

== Optimized Logical Plan ==
Calc(select=[f00 AS _c0, _UTF-16LE'' AS _c1, _UTF-16LE'' AS _c2, _UTF-16LE'ERROR' AS _c3, _UTF-16LE'Asia/Shanghai' AS _c4, _UTF-16LE'@timestamp' AS _c5, f0 AS kubernetes$container$name, clusterName])
+- PythonCalc(select=[f0, clusterName, log_get(message) AS f00])
   +- Calc(select=[message, clusterName, kubernetes.container.name AS f0])
      +- PythonCalc(select=[message, kubernetes, clusterName, error_exist(message) AS f0])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, source, source: [KafkaTableSource(@timestamp, message, clusterName, kubernetes)]]], fields=[@timestamp, message, clusterName, kubernetes])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : Source: KafkaTableSource(@timestamp, message, clusterName, kubernetes)

Stage 2 : Operator
    content : SourceConversion(table=[default_catalog.default_database.source, source: [KafkaTableSource(@timestamp, message, clusterName, kubernetes)]], fields=[@timestamp, message, clusterName, kubernetes])
    ship_strategy : FORWARD

Stage 3 : Operator
    content : StreamExecPythonCalc
    ship_strategy : FORWARD

Stage 4 : Operator
    content : Calc(select=[message, clusterName, kubernetes.container.name AS f0])
    ship_strategy : FORWARD

Stage 5 : Operator
    content : StreamExecPythonCalc
    ship_strategy : FORWARD

Stage 6 : Operator
    content : Calc(select=[f00 AS _c0, _UTF-16LE'' AS _c1, _UTF-16LE'' AS _c2, _UTF-16LE'ERROR' AS _c3, _UTF-16LE'Asia/Shanghai' AS _c4, _UTF-16LE'@timestamp' AS _c5, f0 AS kubernetes$container$name, clusterName])
    ship_strategy : FORWARD

At 2020-10-15 20:59:12, "Dian Fu" <[hidden email]> wrote:
> [...]
In reply to this post by Dian Fu
I've extracted the relevant part of the plan.
Around the data filtering step:

== Abstract Syntax Tree ==
+- LogicalFilter(condition=[error_exist($1)])

== Optimized Logical Plan ==
+- PythonCalc(select=[message, kubernetes, clusterName, error_exist(message) AS f0])
# I suspect this is where it goes wrong: this should not be a select, it should be a where or filter, since there is already LogicalFilter(condition=[error_exist($1)]) above.

== Physical Execution Plan ==
Stage 3 : Operator
    content : StreamExecPythonCalc
    ship_strategy : FORWARD

At 2020-10-15 20:59:12, "Dian Fu" <[hidden email]> wrote:
> [...]
This problem is a bug; I have created a JIRA for it: https://issues.apache.org/jira/browse/FLINK-19675
It occurs when a single Calc contains a Python UDF, a where condition and a composite (nested) column access at the same time.

Until it is fixed, you can work around it like this:

tmp_table = st_env.from_path("source")\
    .select("kubernetes.get('container').get('name') as name, message, clusterName, '@timestamp' "
            "as ts")

data_stream = st_env._j_tenv.toAppendStream(tmp_table._j_table, tmp_table._j_table.getSchema().toRowType())
table = Table(st_env._j_tenv.fromDataStream(data_stream, "name, message, clusterName, ts"), st_env)

table\
    .where("error_exist(message) = true")\
    .select("log_get(message),'','','ERROR','Asia/Shanghai', 'ts',name,clusterName") \
    .execute_insert("sink").get_job_client().get_job_execution_result().result()

> On Oct 16, 2020, at 10:47 AM, whh_960101 <[hidden email]> wrote:
> [...]
In reply to this post by Dian Fu
CREATE TABLE kafkaTable (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'csv',
    'scan.startup.mode' = 'earliest-offset'
)

Hi, if I create kafkaTable with a SQL statement like this, and the Kafka brokers require credentials to access, how do I handle that? There is no username/password option here.
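Not an answer from this thread, just a hedged sketch: Kafka client settings are usually passed with the 'properties.' prefix, so SASL/PLAIN credentials would look roughly like the options below. Whether your Flink version passes these particular keys through to the Kafka client, and whether your brokers actually use SASL/PLAIN, are assumptions you would need to verify.

# hypothetical example; verify the option pass-through against your Flink/Kafka setup
st_env.execute_sql("""
    CREATE TABLE kafkaTable (
        user_id BIGINT,
        item_id BIGINT,
        category_id BIGINT,
        behavior STRING,
        ts TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_behavior',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'testGroup',
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.sasl.mechanism' = 'PLAIN',
        'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="..." password="...";',
        'format' = 'csv',
        'scan.startup.mode' = 'earliest-offset'
    )
""")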
In reply to this post by Dian Fu
Hi all, I'm processing a Kafka source Table and printing a nested JSON result, and the job fails with JobExecutionException: Job execution failed. How do I solve this? Code and error below:

@udf(input_types=DataTypes.STRING(), result_type=DataTypes.BOOLEAN())
def error_exist(message):
    if message is None:
        return False
    mes_dic = json.loads(message.strip())
    log = mes_dic.get('log').lower().strip()
    if 'error' in log:
        return True
    else:
        return False

@udf(input_types=DataTypes.STRING(), result_type=DataTypes.ROW([DataTypes.FIELD("content", DataTypes.STRING())]))
def error_get(message):
    if message is None:
        return ''
    mes_dic = json.loads(message.strip())
    log = mes_dic.get('log')
    return json.dumps({"content":log.strip()})

@udf(input_types=[DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING()], result_type=DataTypes.ROW([\
    DataTypes.FIELD("appId", DataTypes.STRING()),DataTypes.FIELD("hostname", DataTypes.STRING()), \
    DataTypes.FIELD("level", DataTypes.STRING()),DataTypes.FIELD("timeZone", DataTypes.STRING()), \
    DataTypes.FIELD("timestamp", DataTypes.DOUBLE()),DataTypes.FIELD("container", DataTypes.STRING()), \
    DataTypes.FIELD("clusterName", DataTypes.STRING()),DataTypes.FIELD("clusterChinese", DataTypes.STRING())]))
def headers_get(message,container,clusterName):
    mes_dic = json.loads(message.strip())
    tz_utc = mes_dic.get('time')
    tz_utc = tz_utc[:tz_utc.rfind('.') + 7]
    from_zone = tz.gettz('UTC')
    dt_utc = datetime.strptime(tz_utc, '%Y-%m-%dT%H:%M:%S.%f')
    dt_utc = dt_utc.replace(tzinfo=from_zone)
    dt_ts = dt_utc.timestamp()

    map_df = pd.read_csv('cc_log_dev_map.csv')
    clusterChinese = map_df.loc[map_df.cs_English == clusterName, 'cs_Chinese'].values[0]

    return json.dumps({'appId':'','hostname':'','level':'ERROR','timeZone':'Asia/Shanghai','timestamp':dt_ts,\
        'container':container,'clusterName':clusterName,'clusterChinese':clusterChinese})

#st_env.execute_sql("""
#    CREATE TABLE source(
#        message STRING,
#        clusterName STRING,
#        kubernetes ROW<container ROW<name STRING>>
#    ) WITH(
#        'connector' = 'kafka',
#    )
#    """)

st_env.execute_sql("""
    CREATE TABLE sink(
        body ROW<content STRING>,
        headers ROW<appId STRING,hostname STRING,level STRING,timeZone STRING,\
            `timestamp` DOUBLE,container STRING,clusterName STRING,clusterChinese STRING>
    ) WITH(
        'connector' = 'print',
    )
    """)

tmp_table = st_env.from_path("source") \
    .select("message,kubernetes.get('container').get('name') as container,clusterName")

data_stream = st_env._j_tenv.toAppendStream(tmp_table._j_table,tmp_table._j_table.getSchema().toRowType())
table = Table(st_env._j_tenv.fromDataStream(data_stream,"message,clusterName,container"),st_env)

sink_table = table \
    .where("error_exist(message) = true") \
    .select("error_get(message) as body, headers_get(message,container,clusterName) as headers")

sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()

Error:

File "log_parse_kafka.py", line 180, in from_kafka_to_oracle_demo
    sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()
  File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/common/completable_future.py", line 78, in result
    return self._py_class(self._j_completable_future.get())
  File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o150.get.
: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
    at akka.dispatch.OnComplete.internal(Future.scala:264)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
The error stack trace looks incomplete; is there a more complete stack trace?
> On Oct 20, 2020, at 7:38 PM, whh_960101 <[hidden email]> wrote:
> [...]
The actual return value of these two UDFs (error_get and headers_get) is a string in both cases, but you declared the result type as Row. If you really need to return a Row type, the UDF has to return a Row object.
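A minimal sketch of how error_get could return a Row instead of a JSON string (using pyflink.table.Row, which is mentioned further down the thread; an untested rewrite of the code from the question, not a confirmed fix):

import json
from pyflink.table import DataTypes, Row
from pyflink.table.udf import udf

@udf(input_types=DataTypes.STRING(),
     result_type=DataTypes.ROW([DataTypes.FIELD("content", DataTypes.STRING())]))
def error_get(message):
    # return a Row whose fields match the declared ROW result type,
    # rather than a JSON-encoded string
    if message is None:
        return Row(content='')
    log = json.loads(message.strip()).get('log')
    return Row(content=log.strip())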
> On Oct 20, 2020, at 7:56 PM, Dian Fu <[hidden email]> wrote:
> [...]
How is a Row object represented in Python? As a dict?
At 2020-10-20 20:35:22, "Dian Fu" <[hidden email]> wrote:
> [...]
PyFlink has a Row class: pyflink.table.Row.
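For reference, a minimal sketch (based on the error_get UDF quoted below; the same pattern applies to headers_get) of a UDF that returns a Row object matching its declared ROW result type instead of a JSON string:

import json

from pyflink.table import DataTypes, Row
from pyflink.table.udf import udf

# The declared result type is ROW<content STRING>, so the UDF must return a
# pyflink.table.Row whose single field is named "content".
@udf(input_types=DataTypes.STRING(),
     result_type=DataTypes.ROW([DataTypes.FIELD("content", DataTypes.STRING())]))
def error_get(message):
    if message is None:
        return Row(content='')
    log = json.loads(message.strip()).get('log')
    return Row(content=log.strip())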
> On Oct 21, 2020, at 9:05 AM, whh_960101 <[hidden email]> wrote:
>
> How is a Row-typed object represented in Python -- as a dict?
>
> On 2020-10-20 20:35:22, "Dian Fu" <[hidden email]> wrote:
>> The actual return values of these two UDFs (error_get and headers_get) are strings, but they are declared as Row types. If you really need to return a Row type, the UDF has to return a Row object.
>>
>>> On Oct 20, 2020, at 7:56 PM, Dian Fu <[hidden email]> wrote:
>>>
>>> The error stack looks incomplete -- do you have a more complete stack trace?
>>>
>>>> On Oct 20, 2020, at 7:38 PM, whh_960101 <[hidden email]> wrote:
>>>>
>>>> Hi all, I am processing a Kafka source Table and printing a nested JSON result, and the job fails with JobExecutionException: Job execution failed. How can I fix this? Code and error message below:
>>>>
>>>> @udf(input_types=DataTypes.STRING(), result_type=DataTypes.BOOLEAN())
>>>> def error_exist(message):
>>>>     if message is None:
>>>>         return False
>>>>     mes_dic = json.loads(message.strip())
>>>>     log = mes_dic.get('log').lower().strip()
>>>>     if 'error' in log:
>>>>         return True
>>>>     else:
>>>>         return False
>>>>
>>>> @udf(input_types=DataTypes.STRING(), result_type=DataTypes.ROW([DataTypes.FIELD("content", DataTypes.STRING())]))
>>>> def error_get(message):
>>>>     if message is None:
>>>>         return ''
>>>>     mes_dic = json.loads(message.strip())
>>>>     log = mes_dic.get('log')
>>>>     return json.dumps({"content": log.strip()})
>>>>
>>>> @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.ROW([\
>>>>     DataTypes.FIELD("appId", DataTypes.STRING()), DataTypes.FIELD("hostname", DataTypes.STRING()), \
>>>>     DataTypes.FIELD("level", DataTypes.STRING()), DataTypes.FIELD("timeZone", DataTypes.STRING()), \
>>>>     DataTypes.FIELD("timestamp", DataTypes.DOUBLE()), DataTypes.FIELD("container", DataTypes.STRING()), \
>>>>     DataTypes.FIELD("clusterName", DataTypes.STRING()), DataTypes.FIELD("clusterChinese", DataTypes.STRING())]))
>>>> def headers_get(message, container, clusterName):
>>>>     mes_dic = json.loads(message.strip())
>>>>     tz_utc = mes_dic.get('time')
>>>>     tz_utc = tz_utc[:tz_utc.rfind('.') + 7]
>>>>     from_zone = tz.gettz('UTC')
>>>>     dt_utc = datetime.strptime(tz_utc, '%Y-%m-%dT%H:%M:%S.%f')
>>>>     dt_utc = dt_utc.replace(tzinfo=from_zone)
>>>>     dt_ts = dt_utc.timestamp()
>>>>
>>>>     map_df = pd.read_csv('cc_log_dev_map.csv')
>>>>     clusterChinese = map_df.loc[map_df.cs_English == clusterName, 'cs_Chinese'].values[0]
>>>>
>>>>     return json.dumps({'appId': '', 'hostname': '', 'level': 'ERROR', 'timeZone': 'Asia/Shanghai', 'timestamp': dt_ts,\
>>>>         'container': container, 'clusterName': clusterName, 'clusterChinese': clusterChinese})
>>>>
>>>> #st_env.execute_sql("""
>>>> #    CREATE TABLE source(
>>>> #        message STRING,
>>>> #        clusterName STRING,
>>>> #        kubernetes ROW<container ROW<name STRING>>
>>>> #    ) WITH(
>>>> #        'connector' = 'kafka',
>>>> #    )
>>>> #    """)
>>>>
>>>> st_env.execute_sql("""
>>>>     CREATE TABLE sink(
>>>>         body ROW<content STRING>,
>>>>         headers ROW<appId STRING, hostname STRING, level STRING, timeZone STRING,\
>>>>             `timestamp` DOUBLE, container STRING, clusterName STRING, clusterChinese STRING>
>>>>     ) WITH(
>>>>         'connector' = 'print',
>>>>     )
>>>>     """)
>>>>
>>>> tmp_table = st_env.from_path("source") \
>>>>     .select("message, kubernetes.get('container').get('name') as container, clusterName")
>>>>
>>>> data_stream = st_env._j_tenv.toAppendStream(tmp_table._j_table, tmp_table._j_table.getSchema().toRowType())
>>>> table = Table(st_env._j_tenv.fromDataStream(data_stream, "message,clusterName,container"), st_env)
>>>>
>>>> sink_table = table \
>>>>     .where("error_exist(message) = true") \
>>>>     .select("error_get(message) as body, headers_get(message,container,clusterName) as headers")
>>>>
>>>> sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()
>>>>
>>>> Error:
>>>> File "log_parse_kafka.py", line 180, in from_kafka_to_oracle_demo
>>>>     sink_table.execute_insert("kafka_sink").get_job_client().get_job_execution_result().result()
>>>> File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/common/completable_future.py", line 78, in result
>>>>     return self._py_class(self._j_completable_future.get())
>>>> File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1286, in __call__
>>>>     answer, self.gateway_client, self.target_id, self.name)
>>>> File "/home/cdh272705/.local/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 147, in deco
>>>>     return f(*a, **kw)
>>>> File "/home/cdh272705/.local/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
>>>>     format(target_id, ".", name), value)
>>>> py4j.protocol.Py4JJavaError: An error occurred while calling o150.get.
>>>> : java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>>> at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>>> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>>> at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>>> at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>> at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>> at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>>>> at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:186)
>>>> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>>>> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>>>> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>>>> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>>>> at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
>>>> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>>>> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>>>> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>>>> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>>>> at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:892)
>>>> at akka.dispatch.OnComplete.internal(Future.scala:264)
>>>> at akka.dispatch.OnComplete.internal(Future.scala:261)
>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>> at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
>>>> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
>>>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
>>>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>> at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
|
In reply to this post by Dian Fu
Hi everyone: if the Elasticsearch host I want to sink data to requires authentication, how do I pass the username and password to the elasticsearch connector? I wrote the DDL following the example in the official docs, but I could not find any option for a username and password, and the job fails with an Unauthorized error.
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html

CREATE TABLE myUserTable (
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'users'
);

Connector Options:
- connector (required, String, no default): Specify what connector to use. Valid values are: elasticsearch-6 (connect to an Elasticsearch 6.x cluster), elasticsearch-7 (connect to an Elasticsearch 7.x and later versions cluster).
- hosts (required, String, no default): One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9092;http://host_name:9093'.
- index (required, String, no default): Elasticsearch index for every record. Can be a static index (e.g. 'myIndex') or a dynamic index (e.g. 'index-{log_ts|yyyy-MM-dd}'). See the Dynamic Index section for more details.
- document-type (required in 6.x, String, no default): Elasticsearch document type. Not necessary anymore in elasticsearch-7.
- document-id.key-delimiter (optional, String, default "_"): Delimiter for composite keys ("_" by default), e.g. "$" would result in IDs "KEY1$KEY2$KEY3".
- failure-handler (optional, String, default fail): Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are: fail (throws an exception if a request fails and thus causes a job failure), ignore (ignores failures and drops the request), retry_rejected (re-adds requests that have failed due to queue capacity saturation), or a custom class name (for failure handling with an ActionRequestFailureHandler subclass).
- sink.flush-on-checkpoint (optional, Boolean, default true): Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong guarantees for at-least-once delivery of action requests.
- sink.bulk-flush.max-actions (optional, Integer, default 1000): Maximum number of buffered actions per bulk request. Can be set to '0' to disable it.
- sink.bulk-flush.max-size (optional, MemorySize, default 2mb): Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to disable it.
- sink.bulk-flush.interval (optional, Duration, default 1s): The interval to flush buffered actions. Can be set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set, allowing for complete async processing of buffered actions.
- sink.bulk-flush.backoff.strategy (optional, String, default DISABLED): Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are: DISABLED (no retry performed, i.e. fail after the first request error), CONSTANT (wait for backoff delay between retries), EXPONENTIAL (initially wait for backoff delay and increase exponentially between retries).
- sink.bulk-flush.backoff.max-retries (optional, Integer, default 8): Maximum number of backoff retries.
- sink.bulk-flush.backoff.delay (optional, Duration, default 50ms): Delay between each backoff attempt. For CONSTANT backoff, this is simply the delay between each retry. For EXPONENTIAL backoff, this is the initial base delay.
- connection.max-retry-timeout (optional, Duration, no default): Maximum timeout between retries.
- connection.path-prefix (optional, String, no default): Prefix string to be added to every REST communication, e.g. '/v1'.
- format (optional, String, default json): The Elasticsearch connector supports specifying a format. The format must produce a valid JSON document. By default it uses the built-in 'json' format. Please refer to the JSON Format page for more details.
|
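For completeness, a hedged sketch of the kind of DDL this would need. The 'username' / 'password' options are an assumption here: they are not part of the 1.11 options table above and only appear in later releases of the elasticsearch connector, so verify them against the docs of the Flink version actually in use.

# Hedged sketch: 'username' / 'password' are assumed option names, not listed
# in the Flink 1.11 options table above; replace the placeholder credentials.
# st_env is the StreamTableEnvironment created elsewhere in the job.
st_env.execute_sql("""
    CREATE TABLE es_sink (
        user_id STRING,
        user_name STRING,
        uv BIGINT,
        pv BIGINT,
        PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
        'connector' = 'elasticsearch-7',
        'hosts' = 'http://localhost:9200',
        'index' = 'users',
        'username' = 'elastic',
        'password' = '<es-password>'
    )
""")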