Hi:
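Thanks for the reply; that is indeed one approach.

Personally, though, I think it would be better if the jar files in the local lib directory were uploaded to HDFS when the first container starts, so that the containers started later during failover could all fetch them from HDFS. That should avoid this problem. It seems the community optimized jar copying in 1.11; I am still reading up on it and will follow up when I have made progress.

Best,
xiao cai

Original message
From: 范超 <[hidden email]>
To: [hidden email] <[hidden email]>
Sent: Thursday, August 20, 2020, 09:11
Subject: Re: Flink on Yarn: after a job restart, the TaskManager seems to lose the jar files in lib

I previously enabled failover restart for a job and likewise found that YARN simply requested a new container while the old container was not handled any further, which kept producing the same error you are seeing: the old container has no task executor bound to it.
No TaskExecutor registered under container_xxxx.
In the end I just wrote a script that reloads the application via a savepoint. Hope this helps.

-----Original Message-----
From: xiao cai [mailto:[hidden email]]
Sent: Wednesday, August 19, 2020, 12:50
To: user-zh <[hidden email]>
Subject: Flink on Yarn: after a job restart, the TaskManager seems to lose the jar files in lib

As the subject says: when a job is started with Flink on Yarn, the TaskManager seems to lose the jar files in lib after the job restarts.
My job is Kafka source -> HBase sink.
After the job obtains a new container, class files that were originally present are missing when the job starts. I suspect the newly requested container did not receive the resources in lib. Should the resources in lib be placed on HDFS? If so, how should that be configured?

Best
xiao cai

Error stack:
2020-08-19 11:23:08,099 INFO org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers.
2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers with resource <memory:2048, vCores:4>, 1 pending container requests.
2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager [] - TaskExecutor container_e07_1596440446172_0094_01_000069 will be started on 10.3.15.22 with TaskExecutorProcessSpec {cpuCores=4.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0 bytes, networkMemSize=128.000mb (134217730 bytes), managedMemorySize=512.000mb (536870920 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=192.000mb (201326592 bytes)}.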
2020-08-19 11:23:08,101 INFO org.apache.flink.yarn.YarnResourceManager [] - Creating container launch context for TaskManagers
2020-08-19 11:23:08,101 INFO org.apache.flink.yarn.YarnResourceManager [] - Starting TaskManagers
2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] - Removing container request Capability[<memory:2048, vCores:4>]Priority[1].
2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] - Accepted 1 requested containers, returned 0 excess containers, 0 pending container requests of resource <memory:2048, vCores:4>.
2020-08-19 11:23:08,102 INFO org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Processing Event EventType: START_CONTAINER for Container container_e07_1596440446172_0094_01_000069
2020-08-19 11:23:10,851 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler [] - Unhandled exception.
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: No TaskExecutor registered under container_e07_1596440446172_0094_01_000068.
    at org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:560) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_191]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.0.jar:1.11.0]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.0.jar:1.11.0]
2020-08-19 11:23:10,987 INFO org.apache.flink.yarn.YarnResourceManager [] - Registering TaskManager with ResourceID container_e07_1596440446172_0094_01_000069 (akka.tcp://flink@10.3.15.22:37461/user/rpc/taskmanager_0) at ResourceManager
2020-08-19 11:23:11,029 WARN org.apache.flink.runtime.taskmanager.TaskManagerLocation [] - No hostname could be resolved for the IP address 10.3.15.22, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
2020-08-19 11:23:11,043 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (1/2) (dbc71bdd0b8f72a1a2573a59b7e08d64) switched from SCHEDULED to DEPLOYING.
2020-08-19 11:23:11,043 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (1/2) (attempt #68) to container_e07_1596440446172_0094_01_000069 @ 10.3.15.22 (dataPort=34755)
2020-08-19 11:23:11,043 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (2/2) (4bae00129d9af9489cd29441d4540963) switched from SCHEDULED to DEPLOYING.
2020-08-19 11:23:11,043 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (2/2) (attempt #68) to container_e07_1596440446172_0094_01_000069 @ 10.3.15.22 (dataPort=34755)
2020-08-19 11:23:11,043 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Window(TumblingEventTimeWindows(1000), EventTimeTrigger, PassThroughWindowFunction) -> SourceConversion(table=[default_catalog.default_database.catalog_source, source: [KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp)]], fields=[id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp]) -> Calc(select=[CAST(id) AS rowkey, (id ROW name ROW kafka_partition ROW event_time ROW write_time ROW snapshot_time ROW max_snapshot_time) AS EXPR$1]) -> SinkConversionToTuple2 -> Sink: HBaseUpsertTableSink(rowkey, cf) (1/1) (93ff2cf68229f33a00851bd93f2b7f85) switched from SCHEDULED to DEPLOYING.
2020-08-19 11:23:11,043 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Window(TumblingEventTimeWindows(1000), EventTimeTrigger, PassThroughWindowFunction) -> SourceConversion(table=[default_catalog.default_database.catalog_source, source: [KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp)]], fields=[id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp]) -> Calc(select=[CAST(id) AS rowkey, (id ROW name ROW kafka_partition ROW event_time ROW write_time ROW snapshot_time ROW max_snapshot_time) AS EXPR$1]) -> SinkConversionToTuple2 -> Sink: HBaseUpsertTableSink(rowkey, cf) (1/1) (attempt #68) to container_e07_1596440446172_0094_01_000069 @ 10.3.15.22 (dataPort=34755)
2020-08-19 11:23:11,161 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (1/2) (dbc71bdd0b8f72a1a2573a59b7e08d64) switched from DEPLOYING to RUNNING.
2020-08-19 11:23:11,161 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Window(TumblingEventTimeWindows(1000), EventTimeTrigger, PassThroughWindowFunction) -> SourceConversion(table=[default_catalog.default_database.catalog_source, source: [KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp)]], fields=[id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp]) -> Calc(select=[CAST(id) AS rowkey, (id ROW name ROW kafka_partition ROW event_time ROW write_time ROW snapshot_time ROW max_snapshot_time) AS EXPR$1]) -> SinkConversionToTuple2 -> Sink: HBaseUpsertTableSink(rowkey, cf) (1/1) (93ff2cf68229f33a00851bd93f2b7f85) switched from DEPLOYING to RUNNING.
2020-08-19 11:23:11,162 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (2/2) (4bae00129d9af9489cd29441d4540963) switched from DEPLOYING to RUNNING.
2020-08-19 11:23:12,733 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (1/2) (dbc71bdd0b8f72a1a2573a59b7e08d64) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@71e7e541.
java.lang.NoClassDefFoundError: Could not initialize class xxxx
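
The savepoint-based reload mentioned in the quoted reply was not shared in the thread. A minimal sketch of what such a script might build is given below, assuming a YARN session identified by its application ID and the Flink 1.11 CLI options `stop -p`, `run -s`, and `-yid`. The function names, job ID, application ID, and paths are all hypothetical placeholders, not anything taken from the original script.

```python
import shlex
from typing import List


def stop_with_savepoint_cmd(job_id: str, yarn_app_id: str, savepoint_dir: str) -> List[str]:
    """Build `flink stop`, which takes a savepoint and then stops the job.

    Assumption: the job runs in a YARN session addressed via -yid.
    """
    return ["flink", "stop", "-p", savepoint_dir, "-yid", yarn_app_id, job_id]


def resume_cmd(savepoint_path: str, job_jar: str, yarn_app_id: str) -> List[str]:
    """Build `flink run -s`, which resubmits the job from a savepoint (detached)."""
    return ["flink", "run", "-d", "-s", savepoint_path, "-yid", yarn_app_id, job_jar]


def render(cmd: List[str]) -> str:
    """Render an argument list as a shell-safe string for logging or review."""
    return " ".join(shlex.quote(part) for part in cmd)


if __name__ == "__main__":
    # Hypothetical identifiers; substitute the real job ID, YARN application ID and paths.
    print(render(stop_with_savepoint_cmd("<jobId>", "<yarnAppId>", "hdfs:///flink/savepoints")))
    print(render(resume_cmd("hdfs:///flink/savepoints/<savepoint>", "my-job.jar", "<yarnAppId>")))
```

The sketch only assembles the commands; running them (for example with `subprocess.run`) and extracting the savepoint path from the output of `flink stop` are left out, because the thread does not describe how the original script handled those steps.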
Hi
In theory, if the job starts successfully the first time, later failovers should also recover normally. Can you reproduce this consistently? If so, it may be a bug.

Best,
Congxian
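
To check whether the failure reproduces on every restart, the JobManager log can be summarized mechanically. The sketch below is one possible way to do that, assuming the log follows the format shown in the error stack earlier in the thread; the function name `summarize` and the returned keys are made up for illustration. It also looks for `ExceptionInInitializerError`, since a `NoClassDefFoundError` with the message "Could not initialize class" generally means that an earlier static initialization of that class failed in the same JVM, which may or may not be related to missing jars here.

```python
import re
from collections import Counter
from typing import Dict

# Patterns modelled on the log lines quoted in this thread.
ATTEMPT_RE = re.compile(r"\(attempt #(\d+)\) to (container_\w+)")
FAILED_RE = re.compile(r"switched from RUNNING to FAILED")
ERROR_RE = re.compile(
    r"java\.lang\.(?:NoClassDefFoundError|ClassNotFoundException|ExceptionInInitializerError)[^\n]*"
)


def summarize(log_text: str) -> Dict[str, object]:
    """Summarize restart attempts and class-loading failures in a JobManager log."""
    attempts = [(int(num), container) for num, container in ATTEMPT_RE.findall(log_text)]
    errors = Counter(match.group(0).strip() for match in ERROR_RE.finditer(log_text))
    return {
        # Highest restart attempt number seen in "Deploying ..." lines.
        "max_attempt": max((num for num, _ in attempts), default=0),
        # Distinct containers that tasks were deployed to.
        "containers": sorted({container for _, container in attempts}),
        # Number of RUNNING -> FAILED transitions.
        "failed_transitions": len(FAILED_RE.findall(log_text)),
        # Class-loading related error lines and how often each occurred.
        "class_loading_errors": dict(errors),
    }
```

If every attempt on a freshly allocated container ends with the same class-loading error, that would support the "consistently reproducible" case asked about above; the TaskManager logs of the new containers would still be needed to see the first underlying exception.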
Hi
Yes, it reproduces consistently: after a failover, the classes inside the jar files in lib can no longer be found, and the only way out is to restart the job. That said, I started the job on YARN in CLI mode and have not tried per-job or application mode. I plan to try application mode over the next couple of days, pointing the jar location at HDFS, to see whether the problem still reproduces.

Best,
xiao cai
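Files under $FLINK_HOME/lib are always uploaded to HDFS and registered as LocalResources, so after a JM/TM failover they can be downloaded and loaded again.

Could you post the JM/TM logs that report the ClassNotFound error? That would make the problem easier to analyze.

Best,
Yang

On Tue, Aug 25, 2020 at 9:30 AM, xiao cai <[hidden email]> wrote:

> Hi
>
> Yes, it reproduces reliably: after a failover, classes from the jars in lib can no longer be found, and the only way out is a restart. However, I start the job on YARN in CLI mode and have not tried per-job or application mode. I plan to try application mode in the next couple of days, with the jar location pointing to HDFS, to see whether the problem still reproduces.
>
> Best,
> xiao cai
>
> Original message
> From: Congxian Qiu<[hidden email]>
> To: user-zh<[hidden email]>
> Sent: Monday, August 24, 2020, 20:39
> Subject: Re: Reply: Flink on Yarn: after a job restart, the TaskManager seems to lose the jar files in lib
>
> Hi
> In theory, if the job starts the first time, later failovers should also recover normally. Does this reproduce reliably on your side? If it does, it may be a bug.
> Best,
> Congxian
>
> On Thu, Aug 20, 2020 at 2:27 PM, xiao cai <[hidden email]> wrote:
>
> Hi:
> Thanks for the reply, that is indeed one approach.
> Still, my feeling is that if the jars in the local lib directory were uploaded to HDFS when the first container starts, and the containers started after a failover all fetched them from HDFS, this problem should not occur. It seems the community optimized jar copying in version 1.11; I am still reading up on that. I will follow up when I make progress.
>
> Best,
> xiao cai
>
> Original message
> From: 范超<[hidden email]>
> To: [hidden email]<[hidden email]>
> Sent: Thursday, August 20, 2020, 09:11
> Subject: Reply: Flink on Yarn: after a job restart, the TaskManager seems to lose the jar files in lib
>
> > I previously enabled failover restart for a job and saw the same thing: YARN simply requested a new container and the old container was not handled any further, which kept producing the error you see, where the old container has no task executor bound to it: No TaskExecutor registered under containe_xxxx.
> > In the end I just wrote a script that reloads the application via a savepoint. Hope this helps.
> >
> > -----Original message-----
> > From: xiao cai [mailto:[hidden email]]
> > Sent: Wednesday, August 19, 2020, 12:50
> > To: user-zh <[hidden email]>
> > Subject: Flink on Yarn: after a job restart, the TaskManager seems to lose the jar files in lib
> >
> > As the subject says: with Flink on Yarn, after a job restart the TaskManager seems to lose the jar files in lib.
> > My job is kafka source -> hbase sink.
> > After the job obtains a new container, class files that were previously present are missing when the job starts. I suspect the newly requested container did not get the resources in lib. Should the resources in lib be placed on HDFS? How should this be configured?
> >
> > Best
> > xiao cai
> >
> > Error stack:
> > 2020-08-19 11:23:08,099 INFO org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers.
> > 2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers with resource <memory:2048, vCores:4>, 1 pending container requests.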
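A side note on the last line of the error stack, offered as a hypothesis and not as a diagnosis of this job: on the JVM, `NoClassDefFoundError: Could not initialize class X` usually means that X was found but its static initializer failed on an earlier attempt in the same JVM. That is a different situation from a jar missing from the classpath, which typically shows up as `ClassNotFoundException` or as `NoClassDefFoundError` with only the class name in the message. If that applies here, an earlier `ExceptionInInitializerError` in the TaskManager log would carry the real cause. The sketch below reproduces the pattern in plain Java; `InitFailureDemo` and `Fragile` are hypothetical names chosen for illustration, not Flink or HBase classes.

```java
// Minimal sketch (hypothetical names): shows how a failing static initializer
// turns into "NoClassDefFoundError: Could not initialize class ..." on later use.
class InitFailureDemo {

    /** Hypothetical stand-in for the class reported as "xxxx" in the log. */
    static class Fragile {
        // Not a compile-time constant, so reading it triggers class initialization.
        static final int VALUE = compute();

        private static int compute() {
            throw new IllegalStateException("static initializer failed");
        }
    }

    /** Uses Fragile and returns the simple name of whatever Throwable is raised. */
    static String touch() {
        try {
            return String.valueOf(Fragile.VALUE);
        } catch (Throwable t) {
            return t.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(touch()); // first use:  ExceptionInInitializerError
        System.out.println(touch()); // later uses: NoClassDefFoundError
    }
}
```

If this is what happens in the TaskManager, searching its log for the first `ExceptionInInitializerError` that mentions the affected class is likely more informative than the later `NoClassDefFoundError` lines.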