请教大佬们: 一个最简单pyflink UDF跑起来,报 Failed to create stage bundle factory!
INFO:root:Initializing python harness: 在IdeaIJ上可以运行、大家有遇到过吗?谢谢~ /opt/module/flink-1.11.1/bin/flink run -m localhost:8081 -pyexec /opt/python36/bin/python3 -py udf.py [hadoop@hadoop01 pyflink]$ /opt/python36/bin/python3 -V Python 3.6.5 [hadoop@hadoop01 pyflink]$ pip list | grep flink apache-flink 1.12.0 ==================== from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, DataTypes from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json from pyflink.table.udf import udf, TableFunction, ScalarFunction env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) t_env = StreamTableEnvironment.create(env) t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", 'true') add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT()) t_env.register_function("add", add) t_env.sql_update(""" CREATE TABLE mySource ( a bigint, b bigint ) WITH ( 'connector' = 'kafka', 'topic' = 'udf', 'properties.bootstrap.servers' = 'hadoop01:9092', 'properties.group.id' = 'pyflinkUDF', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ) """) t_env.sql_update(""" CREATE TABLE mySink ( a bigint, b bigint ) WITH ( 'connector' = 'print' ) """) t_env.sql_update("insert into mySink select a, add(a,b) from mySource") t_env.execute("job") ==================== [hadoop@hadoop01 pyflink]$ /opt/module/flink-1.11.1/bin/flink run -m localhost:8081 -pyexec /opt/python36/bin/python3 -py udf.py SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/module/flink-1.11.1/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/module/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/module/hbase/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] Job has been submitted with JobID 67fc6d3f4f3f97339345202f4de53366 Traceback (most recent call last): File "udf.py", line 63, in <module> t_env.execute("job") File "/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1057, in execute File "/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__ File "/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco File "/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute. : java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 67fc6d3f4f3f97339345202f4de53366) at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116) at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 67fc6d3f4f3f97339345202f4de53366) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:114) ... 19 more Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing python harness: /opt/python36/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_boot.py --id=1-1 --logging_endpoint=localhost:37175 --artifact_endpoint=localhost:16168 --provision_endpoint=localhost:38949 --control_endpoint=localhost:36619 Failed to set up logging handler, continuing without. Traceback (most recent call last): File "/opt/python36/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 94, in main fn_log_handler = FnApiLogRecordHandler(logging_service_descriptor) File "/opt/python36/lib/python3.6/site-packages/apache_beam/runners/worker/log_handler.py", line 75, in __init__ grpc.channel_ready_future(ch).result(timeout=60) File "/opt/python36/lib/python3.6/site-packages/grpc/_utilities.py", line 140, in result self._block(timeout) File "/opt/python36/lib/python3.6/site-packages/grpc/_utilities.py", line 86, in _block raise grpc.FutureTimeoutError() grpc.FutureTimeoutError WARNING:apache_beam.runners.worker.sdk_worker_main:No session file found: /tmp/staged/pickled_main_session. Functions defined in __main__ (interactive session) may fail. WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['--options_id=0.0', '--app_name=AbstractPythonFunctionRunner'] ERROR:apache_beam.runners.worker.sdk_worker_main:Python sdk harness failed: Traceback (most recent call last): File "/opt/python36/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 158, in main sdk_pipeline_options.view_as(ProfilingOptions))).run() File "/opt/python36/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 162, in __init__ grpc.channel_ready_future(self._control_channel).result(timeout=60) File "/opt/python36/lib/python3.6/site-packages/grpc/_utilities.py", line 140, in result self._block(timeout) File "/opt/python36/lib/python3.6/site-packages/grpc/_utilities.py", line 86, in _block raise grpc.FutureTimeoutError() grpc.FutureTimeoutError Traceback (most recent call last): File "/opt/python36/lib/python3.6/runpy.py", line 193, in _run_module_as_main "__main__", mod_spec) File "/opt/python36/lib/python3.6/runpy.py", line 85, in _run_code exec(code, run_globals) File "/opt/python36/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py", line 29, in <module> apache_beam.runners.worker.sdk_worker_main.main(sys.argv) File "/opt/python36/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 158, in main sdk_pipeline_options.view_as(ProfilingOptions))).run() File "/opt/python36/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 162, in __init__ grpc.channel_ready_future(self._control_channel).result(timeout=60) File "/opt/python36/lib/python3.6/site-packages/grpc/_utilities.py", line 140, in result self._block(timeout) File "/opt/python36/lib/python3.6/site-packages/grpc/_utilities.py", line 86, in _block raise grpc.FutureTimeoutError() grpc.FutureTimeoutError at org.apache.flink.python.AbstractPythonFunctionRunner.createStageBundleFactory(AbstractPythonFunctionRunner.java:197) at org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:164) at org.apache.flink.table.runtime.runners.python.scalar.AbstractGeneralPythonScalarFunctionRunner.open(AbstractGeneralPythonScalarFunctionRunner.java:65) at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractStatelessFunctionOperator.java:186) at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:143) at org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:131) at org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:88) at org.apache.flink.table.runtime.operators.python.scalar.AbstractRowDataPythonScalarFunctionOperator.open(AbstractRowDataPythonScalarFunctionOperator.java:80) at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.open(RowDataPythonScalarFunctionOperator.java:64) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 0 at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:331) at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:320) at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:250) at org.apache.flink.python.AbstractPythonFunctionRunner.createStageBundleFactory(AbstractPythonFunctionRunner.java:195) ... 16 more Caused by: java.lang.IllegalStateException: Process died with exit code 0 at org.apache.beam.runners.fnexecution.environment.ProcessManager$RunningProcess.isAliveOrThrow(ProcessManager.java:72) at org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:137) at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:200) at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:184) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154) at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044) ... 24 more org.apache.flink.client.program.ProgramAbortException at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
看起来似乎是因为Flink集群的版本和PyFlink的版本不一致导致的:集群装的Flink是1.11.1,PyFlink是1.12.0?
先把版本都统一一下,再试试。 > 在 2021年2月8日,上午10:28,陈康 <[hidden email]> 写道: > > 请教大佬们: 一个最简单pyflink UDF跑起来,报 Failed to create stage bundle factory! > INFO:root:Initializing python harness: 在IdeaIJ上可以运行、大家有遇到过吗?谢谢~ > > /opt/module/flink-1.11.1/bin/flink run -m localhost:8081 -pyexec > /opt/python36/bin/python3 -py udf.py > > [hadoop@hadoop01 pyflink]$ /opt/python36/bin/python3 -V > Python 3.6.5 > [hadoop@hadoop01 pyflink]$ pip list | grep flink > apache-flink 1.12.0 > > ==================== > > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.table import StreamTableEnvironment, DataTypes > from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, > Json > from pyflink.table.udf import udf, TableFunction, ScalarFunction > > env = StreamExecutionEnvironment.get_execution_environment() > env.set_parallelism(1) > t_env = StreamTableEnvironment.create(env) > t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", > 'true') > > add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], > DataTypes.BIGINT()) > t_env.register_function("add", add) > > t_env.sql_update(""" > CREATE TABLE mySource ( > a bigint, > b bigint > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'udf', > 'properties.bootstrap.servers' = 'hadoop01:9092', > 'properties.group.id' = 'pyflinkUDF', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'json' > ) > """) > t_env.sql_update(""" > CREATE TABLE mySink ( > a bigint, > b bigint > ) WITH ( > 'connector' = 'print' > ) > """) > t_env.sql_update("insert into mySink select a, add(a,b) from mySource") > t_env.execute("job") > > ==================== > > > [hadoop@hadoop01 pyflink]$ /opt/module/flink-1.11.1/bin/flink run -m > localhost:8081 -pyexec /opt/python36/bin/python3 -py udf.py > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/opt/module/flink-1.11.1/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/opt/module/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/opt/module/hbase/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type > [org.apache.logging.slf4j.Log4jLoggerFactory] > Job has been submitted with JobID 67fc6d3f4f3f97339345202f4de53366 > Traceback (most recent call last): > File "udf.py", line 63, in <module> > t_env.execute("job") > File > "/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/table/table_environment.py", > line 1057, in execute > File > "/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 1286, in __call__ > File > "/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/util/exceptions.py", > line 147, in deco > File > "/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", > line 328, in get_return_value > py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute. > : java.util.concurrent.ExecutionException: > org.apache.flink.client.program.ProgramInvocationException: Job failed > (JobID: 67fc6d3f4f3f97339345202f4de53366) > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) > at > org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116) > at > org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) > at > org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > at > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > at > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.client.program.ProgramInvocationException: Job > failed (JobID: 67fc6d3f4f3f97339345202f4de53366) > at > org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116) > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) > at > org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) > at > org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) > at > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ... 1 more > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) > at > org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:114) > ... 19 more > Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503) > at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.RuntimeException: Failed to create stage bundle > factory! INFO:root:Initializing python harness: > /opt/python36/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_boot.py > --id=1-1 --logging_endpoint=localhost:37175 > --artifact_endpoint=localhost:16168 --provision_endpoint=localhost:38949 > --control_endpoint=localhost:36619 > Failed to set up logging handler, continuing without. > Traceback (most recent call last): > File > "/opt/python36/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker_main.py", > line 94, in main > fn_log_handler = FnApiLogRecordHandler(logging_service_descriptor) > File > "/opt/python36/lib/python3.6/site-packages/apache_beam/runners/worker/log_handler.py", > line 75, in __init__ > grpc.channel_ready_future(ch).result(timeout=60) > File "/opt/python36/lib/python3.6/site-packages/grpc/_utilities.py", line > 140, in result > self._block(timeout) > File "/opt/python36/lib/python3.6/site-packages/grpc/_utilities.py", line > 86, in _block > raise grpc.FutureTimeoutError() > grpc.FutureTimeoutError > WARNING:apache_beam.runners.worker.sdk_worker_main:No session file found: > /tmp/staged/pickled_main_session. Functions defined in __main__ (interactive > session) may fail. > WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: > ['--options_id=0.0', '--app_name=AbstractPythonFunctionRunner'] > ERROR:apache_beam.runners.worker.sdk_worker_main:Python sdk harness failed: > Traceback (most recent call last): > File > "/opt/python36/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker_main.py", > line 158, in main > sdk_pipeline_options.view_as(ProfilingOptions))).run() > File > "/opt/python36/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 162, in __init__ > grpc.channel_ready_future(self._control_channel).result(timeout=60) > File "/opt/python36/lib/python3.6/site-packages/grpc/_utilities.py", line > 140, in result > self._block(timeout) > File "/opt/python36/lib/python3.6/site-packages/grpc/_utilities.py", line > 86, in _block > raise grpc.FutureTimeoutError() > grpc.FutureTimeoutError > Traceback (most recent call last): > File "/opt/python36/lib/python3.6/runpy.py", line 193, in > _run_module_as_main > "__main__", mod_spec) > File "/opt/python36/lib/python3.6/runpy.py", line 85, in _run_code > exec(code, run_globals) > File > "/opt/python36/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py", > line 29, in <module> > apache_beam.runners.worker.sdk_worker_main.main(sys.argv) > File > "/opt/python36/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker_main.py", > line 158, in main > sdk_pipeline_options.view_as(ProfilingOptions))).run() > File > "/opt/python36/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 162, in __init__ > grpc.channel_ready_future(self._control_channel).result(timeout=60) > File "/opt/python36/lib/python3.6/site-packages/grpc/_utilities.py", line > 140, in result > self._block(timeout) > File "/opt/python36/lib/python3.6/site-packages/grpc/_utilities.py", line > 86, in _block > raise grpc.FutureTimeoutError() > grpc.FutureTimeoutError > > at > org.apache.flink.python.AbstractPythonFunctionRunner.createStageBundleFactory(AbstractPythonFunctionRunner.java:197) > at > org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:164) > at > org.apache.flink.table.runtime.runners.python.scalar.AbstractGeneralPythonScalarFunctionRunner.open(AbstractGeneralPythonScalarFunctionRunner.java:65) > at > org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractStatelessFunctionOperator.java:186) > at > org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:143) > at > org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.open(AbstractStatelessFunctionOperator.java:131) > at > org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:88) > at > org.apache.flink.table.runtime.operators.python.scalar.AbstractRowDataPythonScalarFunctionOperator.open(AbstractRowDataPythonScalarFunctionOperator.java:80) > at > org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.open(RowDataPythonScalarFunctionOperator.java:64) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) > at java.lang.Thread.run(Thread.java:748) > Caused by: > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: > java.lang.IllegalStateException: Process died with exit code 0 > at > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050) > at > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) > at > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) > at > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) > at > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) > at > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:331) > at > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:320) > at > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:250) > at > org.apache.flink.python.AbstractPythonFunctionRunner.createStageBundleFactory(AbstractPythonFunctionRunner.java:195) > ... 16 more > Caused by: java.lang.IllegalStateException: Process died with exit code 0 > at > org.apache.beam.runners.fnexecution.environment.ProcessManager$RunningProcess.isAliveOrThrow(ProcessManager.java:72) > at > org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:137) > at > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:200) > at > org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:184) > at > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528) > at > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277) > at > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154) > at > org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044) > ... 24 more > > org.apache.flink.client.program.ProgramAbortException > at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) > at > org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) > > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
感谢回复...切换了版本...运行报错如下图 。。
[hadoop@hadoop01 bin]$ pip list | grep flink apache-flink 1.11.1 ======== Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 397c590a9c19b173a83a4476f8eeaca0 timed out. ... 26 more <http://apache-flink.147419.n8.nabble.com/file/t1280/gc.png> 观察到TM 年轻代gc频繁.. ============== [hadoop@hadoop01 pyflink]$ /opt/module/flink-1.11.1/bin/flink run -m localhost:8081 -pyexec /opt/python36/bin/python3 -py udf.py SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/module/flink-1.11.1/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/module/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/module/hbase/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory] Job has been submitted with JobID a27f139f01ef951d832cfa8382523e4f Traceback (most recent call last): File "udf.py", line 63, in <module> t_env.execute("job") File "/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 1057, in execute File "/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__ File "/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco File "/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute. : java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: a27f139f01ef951d832cfa8382523e4f) at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116) at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: a27f139f01ef951d832cfa8382523e4f) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:114) ... 19 more Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503) at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49) at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1710) at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1287) at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1255) at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:954) at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173) at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165) at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732) at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537) at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149) at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManagerInternal(SlotPoolImpl.java:818) at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManager(SlotPoolImpl.java:777) at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:435) at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1193) at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 397c590a9c19b173a83a4476f8eeaca0 timed out. ... 26 more org.apache.flink.client.program.ProgramAbortException at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
这个问题应该有人问过,你搜搜看。
另外,如果GC频繁的话,把内存调大看看~ > 在 2021年2月8日,下午5:14,陈康 <[hidden email]> 写道: > > 感谢回复...切换了版本...运行报错如下图 。。 > [hadoop@hadoop01 bin]$ pip list | grep flink > apache-flink 1.11.1 > ======== > Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager > with id 397c590a9c19b173a83a4476f8eeaca0 timed out. > ... 26 more > > <http://apache-flink.147419.n8.nabble.com/file/t1280/gc.png> 观察到TM > 年轻代gc频繁.. > ============== > [hadoop@hadoop01 pyflink]$ /opt/module/flink-1.11.1/bin/flink run -m > localhost:8081 -pyexec /opt/python36/bin/python3 -py udf.py > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/opt/module/flink-1.11.1/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/opt/module/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/opt/module/hbase/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type > [org.apache.logging.slf4j.Log4jLoggerFactory] > Job has been submitted with JobID a27f139f01ef951d832cfa8382523e4f > Traceback (most recent call last): > File "udf.py", line 63, in <module> > t_env.execute("job") > File > "/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/table/table_environment.py", > line 1057, in execute > File > "/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", > line 1286, in __call__ > File > "/opt/module/flink-1.11.1/opt/python/pyflink.zip/pyflink/util/exceptions.py", > line 147, in deco > File > "/opt/module/flink-1.11.1/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", > line 328, in get_return_value > py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute. > : java.util.concurrent.ExecutionException: > org.apache.flink.client.program.ProgramInvocationException: Job failed > (JobID: a27f139f01ef951d832cfa8382523e4f) > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) > at > org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116) > at > org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) > at > org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > at > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > at > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.client.program.ProgramInvocationException: Job > failed (JobID: a27f139f01ef951d832cfa8382523e4f) > at > org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116) > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) > at > org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) > at > org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561) > at > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929) > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ... 1 more > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) > at > org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:114) > ... 19 more > Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503) > at > org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1710) > at > org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1287) > at > org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1255) > at > org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:954) > at > org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173) > at > org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537) > at > org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManagerInternal(SlotPoolImpl.java:818) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManager(SlotPoolImpl.java:777) > at > org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:435) > at > org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1193) > at > org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager > with id 397c590a9c19b173a83a4476f8eeaca0 timed out. > ... 26 more > > org.apache.flink.client.program.ProgramAbortException > at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) > at > org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |