官方例子:
https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html 按照例子写了程序,也安装了pyflink | python -m pip install apache-flink | 代码: | from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, DataTypes from pyflink.table.descriptors import Schema, OldCsv, FileSystem from pyflink.table.udf import udf env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) t_env = StreamTableEnvironment.create(env) add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT()) t_env.register_function("add", add) t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/src.txt')) \ .with_format(OldCsv() .field('a', DataTypes.BIGINT()) .field('b', DataTypes.BIGINT())) \ .with_schema(Schema() .field('a', DataTypes.BIGINT()) .field('b', DataTypes.BIGINT())) \ .create_temporary_table('mySource') t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/tar.txt')) \ .with_format(OldCsv() .field('sum', DataTypes.BIGINT())) \ .with_schema(Schema() .field('sum', DataTypes.BIGINT())) \ .create_temporary_table('mySink') t_env.from_path('mySource')\ .select("add(a, b)") \ .insert_into('mySink') t_env.execute("tutorial_job") | 执行: | python test_pyflink.py | 报错: | Traceback (most recent call last): File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco return f(*a, **kw) File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute. : org.apache.flink.table.api.TableException: The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb. The Task Off-Heap Memory can be configured using the configuration key 'taskmanager.memory.task.off-heap.size'. at org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.checkPythonWorkerMemory(CommonPythonBase.scala:158) at org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getMergedConfiguration(CommonPythonBase.scala:119) at org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getConfig(CommonPythonBase.scala:102) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.getConfig(StreamExecPythonCalc.scala:35) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:61) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:35) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:106) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:745) During handling of the above exception, another exception occurred: Traceback (most recent call last): File "test_pyflink.py", line 34, in <module> t_env.execute("tutorial_job") File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py", line 1057, in execute return JobExecutionResult(self._j_tenv.execute(job_name)) File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", line 1286, in __call__ answer, self.gateway_client, self.target_id, self.name) File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 154, in deco raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace) pyflink.util.exceptions.TableException: "The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb. The Task Off-Heap Memory can be configured using the configuration key 'taskmanager.memory.task.off-heap.size'." | 里面提到还要配置taskmanager.memory.task.off-heap.size这个属性吗 , 我找到..\Python\Python37\Lib\site-packages\pyflink\conf下面的flink-conf.yaml 增加了taskmanager.memory.task.off-heap.size: 100m 但是还是报一样的错误 请问用python安装的flink去哪里配置属性 |
Hi,
你需要添加配置,如果你没有使用RocksDB作为statebackend的话,你直接配置t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)就行,如果你用了的话,就需要配置off-heap memory了,table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')。你可以参考文档上的例子,以及对应的note说明[1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#scalar-functions Best, Xingbo chenxuying <[hidden email]> 于2020年7月21日周二 上午11:36写道: > 官方例子: > https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html > 按照例子写了程序,也安装了pyflink > | > python -m pip install apache-flink > | > 代码: > | > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.table import StreamTableEnvironment, DataTypes > from pyflink.table.descriptors import Schema, OldCsv, FileSystem > from pyflink.table.udf import udf > > > env = StreamExecutionEnvironment.get_execution_environment() > env.set_parallelism(1) > t_env = StreamTableEnvironment.create(env) > > > add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], > DataTypes.BIGINT()) > > > t_env.register_function("add", add) > > > t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/src.txt')) > \ > .with_format(OldCsv() > .field('a', DataTypes.BIGINT()) > .field('b', DataTypes.BIGINT())) \ > .with_schema(Schema() > .field('a', DataTypes.BIGINT()) > .field('b', DataTypes.BIGINT())) \ > .create_temporary_table('mySource') > > > t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/tar.txt')) > \ > .with_format(OldCsv() > .field('sum', DataTypes.BIGINT())) \ > .with_schema(Schema() > .field('sum', DataTypes.BIGINT())) \ > .create_temporary_table('mySink') > > > t_env.from_path('mySource')\ > .select("add(a, b)") \ > .insert_into('mySink') > > > t_env.execute("tutorial_job") > | > > 执行: > > | > python test_pyflink.py > | > > 报错: > > > | > Traceback (most recent call last): > File > "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", > line 147, in deco > return f(*a, **kw) > File > "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py", > line 328, in get_return_value > format(target_id, ".", name), value) > py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute. > : org.apache.flink.table.api.TableException: The configured Task Off-Heap > Memory 0 bytes is less than the least required Python worker Memory 79 mb. > The Task Off-Heap Memory can be configured using the configuration key > 'taskmanager.memory.task.off-heap.size'. > at > org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.checkPythonWorkerMemory(CommonPythonBase.scala:158) > at > org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getMergedConfiguration(CommonPythonBase.scala:119) > at > org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getConfig(CommonPythonBase.scala:102) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.getConfig(StreamExecPythonCalc.scala:35) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:61) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:35) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:106) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at > org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > at > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > at > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.lang.Thread.run(Thread.java:745) > > > > > During handling of the above exception, another exception occurred: > > > Traceback (most recent call last): > File "test_pyflink.py", line 34, in <module> > t_env.execute("tutorial_job") > File > "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py", > line 1057, in execute > return JobExecutionResult(self._j_tenv.execute(job_name)) > File > "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", > line 1286, in __call__ > answer, self.gateway_client, self.target_id, self.name) > File > "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", > line 154, in deco > raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace) > pyflink.util.exceptions.TableException: "The configured Task Off-Heap > Memory 0 bytes is less than the least required Python worker Memory 79 mb. > The Task Off-Heap Memory can be configured using the configuration key > 'taskmanager.memory.task.off-heap.size'." > | > > > > > 里面提到还要配置taskmanager.memory.task.off-heap.size这个属性吗 , > > 我找到..\Python\Python37\Lib\site-packages\pyflink\conf下面的flink-conf.yaml > > 增加了taskmanager.memory.task.off-heap.size: 100m > > 但是还是报一样的错误 > > 请问用python安装的flink去哪里配置属性 > > > |
你好
明白了,感谢 , 我文档没看清楚哈 在 2020-07-21 11:44:23,"Xingbo Huang" <[hidden email]> 写道: >Hi, >你需要添加配置,如果你没有使用RocksDB作为statebackend的话,你直接配置t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", >True)就行,如果你用了的话,就需要配置off-heap >memory了,table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", >'80m')。你可以参考文档上的例子,以及对应的note说明[1] > >[1] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#scalar-functions > >Best, >Xingbo > > >chenxuying <[hidden email]> 于2020年7月21日周二 上午11:36写道: > >> 官方例子: >> https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html >> 按照例子写了程序,也安装了pyflink >> | >> python -m pip install apache-flink >> | >> 代码: >> | >> from pyflink.datastream import StreamExecutionEnvironment >> from pyflink.table import StreamTableEnvironment, DataTypes >> from pyflink.table.descriptors import Schema, OldCsv, FileSystem >> from pyflink.table.udf import udf >> >> >> env = StreamExecutionEnvironment.get_execution_environment() >> env.set_parallelism(1) >> t_env = StreamTableEnvironment.create(env) >> >> >> add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], >> DataTypes.BIGINT()) >> >> >> t_env.register_function("add", add) >> >> >> t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/src.txt')) >> \ >> .with_format(OldCsv() >> .field('a', DataTypes.BIGINT()) >> .field('b', DataTypes.BIGINT())) \ >> .with_schema(Schema() >> .field('a', DataTypes.BIGINT()) >> .field('b', DataTypes.BIGINT())) \ >> .create_temporary_table('mySource') >> >> >> t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/tar.txt')) >> \ >> .with_format(OldCsv() >> .field('sum', DataTypes.BIGINT())) \ >> .with_schema(Schema() >> .field('sum', DataTypes.BIGINT())) \ >> .create_temporary_table('mySink') >> >> >> t_env.from_path('mySource')\ >> .select("add(a, b)") \ >> .insert_into('mySink') >> >> >> t_env.execute("tutorial_job") >> | >> >> 执行: >> >> | >> python test_pyflink.py >> | >> >> 报错: >> >> >> | >> Traceback (most recent call last): >> File >> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", >> line 147, in deco >> return f(*a, **kw) >> File >> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py", >> line 328, in get_return_value >> format(target_id, ".", name), value) >> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute. >> : org.apache.flink.table.api.TableException: The configured Task Off-Heap >> Memory 0 bytes is less than the least required Python worker Memory 79 mb. >> The Task Off-Heap Memory can be configured using the configuration key >> 'taskmanager.memory.task.off-heap.size'. >> at >> org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.checkPythonWorkerMemory(CommonPythonBase.scala:158) >> at >> org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getMergedConfiguration(CommonPythonBase.scala:119) >> at >> org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getConfig(CommonPythonBase.scala:102) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.getConfig(StreamExecPythonCalc.scala:35) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:61) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:35) >> at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:106) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) >> at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) >> at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) >> at >> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67) >> at >> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) >> at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >> at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >> at scala.collection.Iterator$class.foreach(Iterator.scala:891) >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) >> at >> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) >> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) >> at >> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) >> at scala.collection.AbstractTraversable.map(Traversable.scala:104) >> at >> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66) >> at >> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166) >> at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) >> at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240) >> at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:497) >> at >> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) >> at >> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) >> at >> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) >> at >> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) >> at >> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) >> at >> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) >> at java.lang.Thread.run(Thread.java:745) >> >> >> >> >> During handling of the above exception, another exception occurred: >> >> >> Traceback (most recent call last): >> File "test_pyflink.py", line 34, in <module> >> t_env.execute("tutorial_job") >> File >> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py", >> line 1057, in execute >> return JobExecutionResult(self._j_tenv.execute(job_name)) >> File >> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", >> line 1286, in __call__ >> answer, self.gateway_client, self.target_id, self.name) >> File >> "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", >> line 154, in deco >> raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace) >> pyflink.util.exceptions.TableException: "The configured Task Off-Heap >> Memory 0 bytes is less than the least required Python worker Memory 79 mb. >> The Task Off-Heap Memory can be configured using the configuration key >> 'taskmanager.memory.task.off-heap.size'." >> | >> >> >> >> >> 里面提到还要配置taskmanager.memory.task.off-heap.size这个属性吗 , >> >> 我找到..\Python\Python37\Lib\site-packages\pyflink\conf下面的flink-conf.yaml >> >> 增加了taskmanager.memory.task.off-heap.size: 100m >> >> 但是还是报一样的错误 >> >> 请问用python安装的flink去哪里配置属性 >> >> >> |
Free forum by Nabble | Edit this page |