Hi Zhefu PENG,
How did you install beam, and which version is it? Flink 1.10 needs apache-beam 2.15; master needs apache-beam 2.19. BTW, to share your question I have forwarded it to the Chinese user mailing list so we can all discuss it together.

Best,
Jincheng

Zhefu PENG <[hidden email]> wrote on Fri, Mar 20, 2020 at 17:12:
> Hi Jincheng,
>
> On the second point from yesterday I tried two approaches: 1. run my pyflink.py in local mode; 2. fix the environment on the cluster nodes.
>
> 1. In local mode we found that when apache-beam was installed, the installation was for some reason incomplete and the _bz2 module was missing. After adding it, the pyflink.py script runs normally, and its udf works as well.
>
> 2. Building on the successful local run, we followed your advice and updated the environment on all worker nodes: each is now on Python 3.6 with apache_beam and apache-flink installed. But running the script with the udf in cluster mode still fails, and googling turned up no answer, so I attach the error log and hope for your help (since local mode already works, I omit the code). Thank you very much!
>
> Looking forward to your reply,
> Zhefu PENG
>
> Zhefu PENG <[hidden email]> wrote on Thu, Mar 19, 2020 at 23:14:
>> Hi Jincheng,
>>
>> Thank you for such a quick and detailed reply!
>>
>> On the first point: after adding flink-csv-1.10.0-sql-jar.jar to Flink's lib directory as you suggested, it runs successfully. I had already seen and downloaded the first jar from the Kafka demo (based on Flink 1.9) in your blog, so a small suggestion: you might document for future users that flink-csv-1.10.0-sql-jar.jar is also required :). Of course I may simply have missed it while reading; either way, thanks for the help.
>>
>> On the second point: because of how my department is organized I currently have no access to the worker nodes, but one thing makes me curious: I only installed Python 3.5+ on the machine that launches the script, and apart from udfs everything works (e.g. SQL built-ins such as concat, or simple calls such as add_columns()). Should I read this as: using all of pyflink's features requires Python 3.5+ across the whole cluster, while for the simple features only the submitting machine needs to comply?
>> Still on the second point: I just re-ran the script, hoping to reproduce the earlier error log for my mentor, but this time a new error appeared:
>>
>> java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory
>>     at org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:173)
>>     at org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractPythonScalarFunctionOperator.java:193)
>>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:139)
>>     at org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:143)
>>     at org.apache.flink.table.runtime.operators.python.PythonScalarFunctionOperator.open(PythonScalarFunctionOperator.java:73)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>>
>> I suspect the cause is, as you said, a non-conforming worker environment; I will ask colleagues to check tomorrow morning and then try your suggestions. I would also appreciate answers to my questions above; I have only just graduated and started working, so please forgive me if they seem rather basic!
>>
>> Thanks again for your reply; I will fix the errors as soon as I can based on your advice.
>> Zhefu PENG
>>
>> jincheng sun <[hidden email]> wrote on Thu, Mar 19, 2020 at 21:08:
>>> Hi Zhefu PENG,
>>>
>>> The likely causes of the problems above are:
>>>
>>> 1. pyflink does not ship the kafka connector jar or the csv format jar by default; they need to be added to pyflink's lib directory:
>>>
>>> $ PYFLINK_LIB=`python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/lib')"`
>>> $ cd $PYFLINK_LIB
>>> $ curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar
>>> $ curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-csv/1.10.0/flink-csv-1.10.0-sql-jar.jar
>>>
>>> 2. A possible cause is that the workers have no Python 3+ environment, or the environment lacks apache-beam. On a worker machine, try: python --version to check the Python version, and pip list to see whether apache-beam is present; if not, run: python -m pip install apache-flink
>>>
>>> Hope this helps; we can keep discussing if problems remain.
>>>
>>> Best,
>>> Jincheng
>>>
>>> Zhefu PENG <[hidden email]> wrote on Thu, Mar 19, 2020 at 20:13:
>>>> Hi,
>>>>
>>>> I found your blog online; I deeply admire your work on developing and promoting pyflink. Our department is evaluating Flink for business needs, and I wrote a simple demo to get a feel for it, but hit two problems (the second is currently the bigger blocker; for the first I used a workaround :)):
>>>>
>>>> 1. Writing data to Kafka in Csv format fails. (From the community docs I already learned to use Csv() instead of OldCsv(), and changed that.) From the error message I suspected a missing jar (as with the Json format earlier), but another doc says the csv format should be built-in. As a workaround I currently output in json format.
>>>>
>>>> The error message:
>>>>
>>>> py4j.protocol.Py4JJavaError: An error occurred while calling o62.insertInto.
>>>> : org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.SerializationSchemaFactory' in the classpath.
>>>>
>>>> Reason: No factory supports all properties.
>>>>
>>>> The matching candidates:
>>>> org.apache.flink.formats.csv.CsvRowFormatFactory
>>>> Unsupported property keys:
>>>> format.fields.#.name
>>>> format.fields.#.data-type
>>>>
>>>> The table registration code:
>>>>
>>>> table_env.connect(Kafka()
>>>>                   .version("universal")
>>>>                   .topic(kafka_write_topic)
>>>>                   .property(kafka_server, ','.join(kafka_server_list))
>>>>                   .property(kafka_zookeeper, ','.join(kafka_server_list))) \
>>>>     .with_format(Csv()
>>>>                  .schema(DataTypes.ROW([DataTypes.FIELD("a", DataTypes.STRING()),
>>>>                                         DataTypes.FIELD("b", DataTypes.STRING()),
>>>>                                         DataTypes.FIELD("c", DataTypes.STRING())]))) \
>>>>     .with_schema(Schema()
>>>>                  .field("a", DataTypes.STRING())
>>>>                  .field("b", DataTypes.STRING())
>>>>                  .field("c", DataTypes.STRING())) \
>>>>     .create_temporary_table(table_name_output)
>>>>
>>>> 2. After the workaround I tried to enrich the job with a custom python udf. I defined the udf following the steps in the provided demo, but once the job runs it times out after a while. My guess is that the pyflink-udf-runner.sh script is never launched, although the script can be found inside the opt/flink-python_2.11-1.10.0.jar that the job depends on.
>>>>
>>>> The error message:
>>>>
>>>> 2020-03-19 17:02:23,273 INFO org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory - Still waiting for startup of environment '/matrix/data/hadoop/data4/mapred/usercache/root/appcache/application_1584436506937_1231/python-dist-1e474e56-46a7-4ae6-bc4a-871ba86917a6/pyflink-udf-runner.sh' for worker id 1
>>>>
>>>> The relevant code:
>>>>
>>>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>>>> def if_length_enough(devid):
>>>>     res = devid + " func"
>>>>     return res
>>>>
>>>> table_env.register_function("if_length_enough", if_length_enough)
>>>>
>>>> table_env.from_path(table_name_input) \
>>>>     .select("a, b, if_length_enough(c)") \
>>>>     .insert_into(table_name_output)
>>>>
>>>> These two errors have troubled me all afternoon; any help would be greatly appreciated!
>>>> Looking forward to your reply.
>>>>
>>>> Zhefu PENG
>>>>
|
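The environment checks suggested in this exchange (a recent enough Python on every worker, apache-beam importable, plus the stdlib _bz2 module that turned out to be missing above) can be scripted so each node reports its own problems. A minimal sketch, assuming it is copied to and run on each worker node; the default module list is illustrative, not an official requirement:

```python
import importlib.util
import sys


def check_worker_env(min_version=(3, 5), required=("_bz2", "apache_beam")):
    """Return a list of problems found in this machine's Python environment."""
    problems = []
    if sys.version_info < min_version:
        problems.append("Python %s is older than %s"
                        % (sys.version.split()[0], ".".join(map(str, min_version))))
    for mod in required:
        # find_spec returns None when a top-level module cannot be located.
        if importlib.util.find_spec(mod) is None:
            problems.append("missing module: %s" % mod)
    return problems


if __name__ == "__main__":
    print(check_worker_env() or "environment looks OK")
```

An empty result means the node passes these particular checks; it does not rule out other mismatches (e.g. differing apache-flink versions across nodes).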
Hi Zhefu,

Thanks for sharing the details of how you solved the problem; that is a great contribution to the community!

1. On subscribing: I'd like to confirm whether you followed [1]. Taking the Chinese user list ([hidden email]) as an example, you need to send a mail to ([hidden email]), i.e. the original list address with "subscribe" added. You will then receive a confirmation mail, "confirm subscribe to user-zh@flink.apache.org", which you need to reply to in order to confirm.

2. On the JAR conflict: the flink-python JAR already bundles the core jars of flink-python's Beam dependency, so I'd like to understand why beam-related jars existed on your cluster in the first place. The case you provided is a good one; it suggests we could optimize PyFlink's dependency on Beam, e.g. by relocating beam. I have created a community improvement JIRA [2].

3. On packaging: the Python environment archive uploaded to PyFlink must be portable across machines, so indeed it cannot contain symlinks. If the environment is created with virtualenv, add the --always-copy option. Also, if the cluster machines already have a prepared Python 3.5+ environment, you can skip uploading an environment archive and use set_python_executable("python3") to tell the cluster which Python interpreter to use. Besides virtualenv, conda/miniconda can also create virtual environments; they are much larger, but worth considering when virtualenv does not work for some reason (e.g. a .so file the source Python interpreter depends on does not exist on the cluster).

Thanks again for sharing how you solved the problem and for the feedback!

Zhefu PENG <[hidden email]> wrote on Tue, Mar 24, 2020 at 21:33:
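The packaging constraint in point 3 above (no symlinks in the uploaded environment) can be sketched as a short recipe. This is an illustrative sketch, not from the thread: it uses the stdlib venv module's --copies flag as the analogue of virtualenv's --always-copy, and the directory and archive names are arbitrary:

```shell
# Build a relocatable Python environment: --copies copies the interpreter
# into the venv instead of symlinking it, so the tree survives being
# zipped and unpacked on another machine.
python3 -m venv --copies venv
# ./venv/bin/pip install apache-flink   # plus whatever the udfs import
# Archive the directory; the zip is what gets shipped to the cluster.
python3 -m zipfile -c venv.zip venv
```

With virtualenv instead, `virtualenv --always-copy venv` plays the same role as the first step.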
Hi Jincheng,
I am hitting the same problem now. When the udf runs it prints this log:

2020-08-07 03:06:45,920 INFO org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory [] - Still waiting for startup of environment '/usr/local/lib64/python3.6/site-packages/pyflink/bin/pyflink-udf-runner.sh' for worker id 1-1

and after a while pyflink exits. I am running in local mode. My beam version is 2.19; installing the latest 2.23 gives the same problem. The apache-flink version is 1.11.0.

Is there any way to debug this error?

--
Sent from: http://apache-flink.147419.n8.nabble.com/
|
Flink 1.11.0 depends on beam 2.19.0, so don't use the latest beam 2.23.0 for now.

As for the problem itself: during startup, the Python process failed for some reason; common causes are missing dependencies, a wrong Python version, and so on. The log file (the one containing the log line you posted) should normally show the detailed cause. Is there really no detailed failure reason in your log file?

lgs <[hidden email]> wrote on Fri, Aug 7, 2020 at 13:44:
> Hi Jincheng,
>
> I am hitting the same problem now. When the udf runs it prints this log:
> [...]
|
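The version pairing repeated across this thread (Flink 1.10 with beam 2.15, Flink 1.11.x with beam 2.19.0) can be kept in a small lookup. A sketch for illustration only; it records just the pairs mentioned in these emails, not an official compatibility matrix:

```python
from typing import Optional

# Beam versions that the replies in this thread pair with each Flink release.
FLINK_TO_BEAM = {
    "1.10": "2.15",
    "1.11": "2.19.0",
}


def expected_beam(flink_version: str) -> Optional[str]:
    """Return the Beam version this thread recommends for a Flink release."""
    major_minor = ".".join(flink_version.split(".")[:2])
    return FLINK_TO_BEAM.get(major_minor)


print(expected_beam("1.11.0"))  # "2.19.0", per the reply above
```

Pinning the matching version explicitly (e.g. `pip install apache-beam==2.19.0` for Flink 1.11.x) avoids pip pulling in a newer, incompatible release.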
Hi, jincheng.
I've recently hit a similar problem; do you have any ideas?

The flink-python jars are all present, and the versions are right.

The version is 1.11.1. The error mainly appears after introducing a udf; before that everything was fine.

Packaging the Python environment with virtualenv also had no effect.

The full error:

Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory
    at org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:152)
    at org.apache.flink.table.runtime.runners.python.scalar.AbstractGeneralPythonScalarFunctionRunner.open(AbstractGeneralPythonScalarFunctionRunner.java:65)
    at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractStatelessFunctionOperator.java:186)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:143)
    at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:131)
    at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:88)
    at org.apache.flink.table.runtime.operators.python.scalar.AbstractRowDataPythonScalarFunctionOperator.open(AbstractRowDataPythonScalarFunctionOperator.java:80)
    at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.open(RowDataPythonScalarFunctionOperator.java:64)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)

org.apache.flink.client.program.ProgramAbortException
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
|
Hi,
The error is a NoClassDefFoundError, which usually means a jar conflict. Did you install another version of apache-beam yourself (1.11.1 depends on beam 2.19.0)? Alternatively, check whether one of the other jars that the Beam class PipelineOptions depends on conflicts with something on your classpath.

Best,
Xingbo

jing <[hidden email]> wrote on Tue, Nov 3, 2020 at 10:16:
> Hi, jincheng.
>
> I've recently hit a similar problem; do you have any ideas?
>
> The flink-python jars are all present, and the versions are right.
> [...]
|
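One way to act on this advice is to scan the classpath directories for duplicate copies of the Beam class. A hypothetical helper, not part of the thread; the directory path in the trailing comment and the default class name are assumptions:

```python
import zipfile
from pathlib import Path


def jars_containing(lib_dir, class_suffix="PipelineOptionsFactory.class"):
    """List every jar in lib_dir that bundles an entry ending in class_suffix.

    More than one hit for the same class is the kind of conflict that can
    surface as a NoClassDefFoundError at class-initialization time.
    """
    hits = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        with zipfile.ZipFile(jar) as zf:
            if any(name.endswith(class_suffix) for name in zf.namelist()):
                hits.append(jar.name)
    return hits


# e.g. jars_containing("/opt/flink/lib") -- more than one hit means two jars
# both ship the Beam PipelineOptionsFactory class.
```

Running it over `$FLINK_HOME/lib` and over any user-supplied jars shows at a glance which jars carry the conflicting class.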
Hi, xingbo.
After checking, I did not find any other beam version; the flink-python version is 2.11-1.11.1. With or without pip install apache-beam==2.19.0, the problem is the same.

The udf example code does not pass either, while running it locally with python demo.py works fine.

Only flink run -m localhost:8081 -py demo.py keeps hitting this problem. I also tried pyflink-shell.sh remote localhost 8081, with the same result. After a local start-cluster.sh, PYFLINK_CLIENT_EXECUTABLE=/usr/local/bin/python3 flink run -py demo.py behaves the same.

The example code:

import logging
import sys

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf


def word_count():
    content = "line Licensed to the Apache Software Foundation ASF under one " \
              "line or more contributor license agreements See the NOTICE file " \
              "line distributed with this work for additional information " \
              "line regarding copyright ownership The ASF licenses this file " \
              "to you under the Apache License Version the " \
              "License you may not use this file except in compliance " \
              "with the License"
    t_env = StreamTableEnvironment.create(
        StreamExecutionEnvironment.get_execution_environment(),
        environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
    )
    t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
    sink_ddl = """
        create table Results(word VARCHAR, `count` BIGINT) with ('connector' = 'print')
    """
    add = udf(lambda i: i + 1024, [DataTypes.BIGINT()], DataTypes.BIGINT())
    t_env.register_function("add_test", add)
    t_env.sql_update(sink_ddl)
    elements = [(word, 1) for word in content.split(" ")]
    t_env.from_elements(elements, ["word", "count"]) \
        .group_by("word") \
        .select("word, add_test(count(1)) as count") \
        .insert_into("Results")
    t_env.execute("word_count")


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    word_count()

The environment is based on the official docker image flink:1.11.1-scala_2.11. JobManager and TaskManager are both healthy, and without the udf all jobs run fine. The only extra jars installed are the jdbc, kafka and es connectors, plus the csv jar.

Does this setup need anything else installed, or some configuration change?

The log shows:

2020-11-03 09:24:05,792 ERROR org.apache.flink.client.python.PythonDriver [] - Run python process failed
java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:88) ~[flink-python_2.11-1.11.1.jar:1.11.1]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_265]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_265]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_265]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) [flink-dist_2.11-1.11.1.jar:1.11.1]
2020-11-03 09:24:05,798 ERROR org.apache.flink.client.cli.CliFrontend [] - Fatal error while running command line interface.
org.apache.flink.client.program.ProgramAbortException: null
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95) ~[flink-python_2.11-1.11.1.jar:1.11.1]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_265]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_265]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_265]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) [flink-dist_2.11-1.11.1.jar:1.11.1]
org.apache.flink.client.program.ProgramAbortException
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
|
Hi,
1. Was the earlier error resolved after reinstalling pyflink (does a local python demo.py work)? And did that earlier error occur when running locally, or only when submitting remotely?

2. The current error happens at compile time during job submission, before the job even runs. The console you submit from should show the error log; could you provide it?

Best,
Xingbo

jing <[hidden email]> wrote on Tue, Nov 3, 2020 at 17:36:
> Hi, xingbo.
> After checking, I did not find any other beam version; the flink-python version is 2.11-1.11.1. With or without pip install apache-beam==2.19.0, the problem is the same.
> [...]
|
1. 重新安装之后并没有解决。
本地提交和远程提交都有问题。用 python demo.py 的方式是正常的。 2. 作业是已经提交了。 有在提示 Job has been submitted with JobID 05fcaebfec3aca731df408418ebea80c 然后立马会出现下面的错误: 即:Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory Traceback (most recent call last): File "docs/examples/udf/demo.py", line 37, in <module> word_count() File "docs/examples/udf/demo.py", line 32, in word_count t_env.execute("word_count") File "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 1057, in execute return JobExecutionResult(self._j_tenv.execute(job_name)) File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco return f(*a, **kw) File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o4.execute. 
: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 05fcaebfec3aca731df408418ebea80c)
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
    at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 05fcaebfec3aca731df408418ebea80c)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:114)
    ... 18 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
    at jdk.internal.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.beam.sdk.options.PipelineOptionsFactory
    at org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:152)
    at org.apache.flink.table.runtime.runners.python.scalar.AbstractGeneralPythonScalarFunctionRunner.open(AbstractGeneralPythonScalarFunctionRunner.java:65)
    at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractStatelessFunctionOperator.java:186)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:143)
    at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:131)
    at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:88)
    at org.apache.flink.table.runtime.operators.python.scalar.AbstractRowDataPythonScalarFunctionOperator.open(AbstractRowDataPythonScalarFunctionOperator.java:80)
    at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.open(RowDataPythonScalarFunctionOperator.java:64)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.base/java.lang.Thread.run(Thread.java:834)

org.apache.flink.client.program.ProgramAbortException
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi,
1. python demo.py runs fine, which shows that neither your code nor the local Python environment you ran it in has a problem.
2. The error you are giving now is not the same as the compile error in your previous mail; let's handle the problems one at a time.
3. This runtime error means the beam package in the Python environment of your cluster runtime is in conflict. As I answered at the beginning, you have to go and check whether the Python environment on the cluster nodes meets the requirements.

Best,
Xingbo

jing <[hidden email]> wrote on Tue, Nov 3, 2020 at 6:09 PM:
> 1. Reinstalling did not solve it.
> Both local submission and remote submission fail; running with
> python demo.py works fine.
> 2. The job does get submitted:
> it prints "Job has been submitted with JobID 05fcaebfec3aca731df408418ebea80c"
> and then the error below appears immediately,
> i.e. Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.beam.sdk.options.PipelineOptionsFactory
> ...
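A minimal sketch of the check Xingbo suggests: on every node of the cluster, verify that the Python environment has the apache-beam version the installed PyFlink release pins. The version table below is an assumption based on this thread (Flink 1.10 expects apache-beam 2.15, 1.11/master at the time expected 2.19); both helper names are hypothetical, and `importlib.metadata` assumes Python 3.8+ (on 3.7 use the `importlib_metadata` backport). Verify the pins against your PyFlink release's setup.py before relying on them.

```python
# Sketch: check whether the apache-beam installed in this environment
# matches what a given PyFlink release expects. Run on each worker node.
from importlib import metadata

# Assumed pins taken from the thread; confirm for your Flink version.
EXPECTED_BEAM = {"1.10": "2.15.0", "1.11": "2.19.0"}


def beam_matches(flink_minor: str, installed_beam: str) -> bool:
    """True when the installed beam version is the one PyFlink pins."""
    return EXPECTED_BEAM.get(flink_minor) == installed_beam


def check_this_node(flink_minor: str) -> bool:
    """False means this node's Python environment needs fixing."""
    try:
        return beam_matches(flink_minor, metadata.version("apache-beam"))
    except metadata.PackageNotFoundError:
        return False  # apache-beam is missing entirely from this environment


if __name__ == "__main__":
    print(check_this_node("1.11"))
```

If the check fails on any node, reinstalling PyFlink there (e.g. pip install --force-reinstall apache-flink==<your version>) pulls in the matching apache-beam, which is the usual fix for the NoClassDefFoundError above.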