Flink version: 1.11.2, Python version: 3.6, apache-flink==1.11.2, running Flink on YARN in per-job mode.
The job is written with PyFlink: it reads from Kafka, uses a UDF (relying on the third-party package IPy) to check whether an IP address falls inside an IP range, and then writes the result back to Kafka.

Main code:

    t_env.get_config().get_configuration().set_string('taskmanager.memory.task.off-heap.size', '128m')
    t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")
    t_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.0.jar; ")

    t_env.add_python_archive("venv.zip")
    t_env.get_config().set_python_executable("venv.zip/venv/bin/python")

    @udf(input_types=[DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.INT())
    def judge_ip(src_ip, dst_ip):
        import IPy
        .....

    t_env.register_function('judge_ip', judge_ip)

Main error output:

    Traceback (most recent call last):
      File "traffic-tuple-sf.py", line 59, in <module>
        t_env.register_function('judge_ip', judge_ip)
      File "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 876, in register_function
      File "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
      File "/opt/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
      File "/opt/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o5.registerFunction.
    : org.apache.flink.table.api.ValidationException: Function class 'class org.apache.flink.table.functions.python.PythonScalarFunction' is not serializable. Make sure that the class is self-contained (i.e. no references to outer classes) and all inner fields are serializable as well.
        at org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:349)
        at org.apache.flink.table.functions.UserDefinedFunctionHelper.prepareInstance(UserDefinedFunctionHelper.java:204)
        at org.apache.flink.table.catalog.FunctionCatalog.registerTempSystemScalarFunction(FunctionCatalog.java:383)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerFunction(TableEnvironmentImpl.java:357)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:567)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:831)
    Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @1311d9fb
        at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:357)
        at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
        at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:177)
        at java.base/java.lang.reflect.Field.setAccessible(Field.java:171)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:104)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
        at org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:346)
        ... 14 more

Could anyone take a look at what is going wrong here and how to fix it? Thanks!
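[Editor's note: the body of judge_ip is elided above. For readers unfamiliar with the containment check it performs, here is a minimal standalone sketch using the standard-library ipaddress module instead of IPy; the function signature mirrors the UDF above, but the logic shown is an illustration, not the poster's actual code.]

```python
import ipaddress

def judge_ip(src_ip, dst_ip):
    # Return 1 if src_ip falls inside the network denoted by dst_ip
    # (e.g. "10.0.0.0/24"), else 0 -- an IPy-style "ip in range" test.
    try:
        return 1 if ipaddress.ip_address(src_ip) in ipaddress.ip_network(dst_ip) else 0
    except ValueError:
        # Malformed addresses or networks are treated as "not contained".
        return 0

print(judge_ip("10.0.0.5", "10.0.0.0/24"))    # -> 1
print(judge_ip("192.168.1.9", "10.0.0.0/24")) # -> 0
```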
What is your JDK version? This looks like a Java environment problem. Here is a similar question [1]; see whether it helps.
[1] https://stackoverflow.com/questions/41265266/how-to-solve-inaccessibleobjectexception-unable-to-make-member-accessible-m
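[Editor's note: the InaccessibleObjectException in the "Caused by" is the JDK 9+ module system refusing the reflective access Flink's ClosureCleaner relies on. Flink 1.11 officially supports Java 8 and 11, so using one of those JDKs is the clean fix; if a newer JDK cannot be avoided, a common workaround is to open the package explicitly. The following is a sketch under that assumption:]

```shell
# Pass --add-opens to the JVMs Flink starts, via the env var that
# Flink's launch scripts read (equivalently, set env.java.opts in
# flink-conf.yaml). This re-allows the reflective access that the
# JDK module system blocks by default on Java 16+.
export FLINK_ENV_JAVA_OPTS="--add-opens java.base/java.lang=ALL-UNNAMED"
echo "$FLINK_ENV_JAVA_OPTS"
```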
Hi, the cluster has six CentOS 7.8 cloud servers. Five of them report java version "1.8.0_202", and one reports java version "16" 2021-03-16. Could that matter? I submitted the job from one of the "1.8.0_202" machines.
Submit command: /opt/flink-1.11.2/bin/flink run -m yarn-cluster -ynm traffic -ys 1 -ytm 1024m -p 1 -py traffic.py

--
Best,
MagicHuang
Could you first try running it in local mode, to narrow the problem down? Judging from the error, it fails at compile time, so you should be able to reproduce it on the machine you are submitting the job from.
Thanks! After repeated debugging it turned out the venv archive I had built was the problem. That is solved now and the job runs on the cluster. Thank you!
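[Editor's note: for anyone hitting a similar venv problem, one common way to build a self-contained environment archive for add_python_archive() is sketched below. The --copies flag and the package list are assumptions; adjust them to your own job.]

```shell
# Build the venv with real file copies instead of symlinks, so it still
# works after being shipped to and extracted on the YARN worker nodes.
python3 -m venv --copies venv
# Install the job's dependencies into it (needs network; shown for reference):
#   venv/bin/pip install apache-flink==1.11.2 IPy
# Pack it up as the archive referenced by add_python_archive("venv.zip"),
# using the stdlib zipfile CLI so no external zip binary is required.
python3 -m zipfile -c venv.zip venv
```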