Flink on Yarn: after a job restart, the TaskManager seems to lose the jar files under lib

4 messages

Flink on Yarn: after a job restart, the TaskManager seems to lose the jar files under lib

xiao cai
As in the subject: with Flink on Yarn, after the job restarts the TaskManager seems to lose the jar files under lib.


My job is a Kafka source -> HBase sink.


After the job is allocated a new container, class files that used to be present are missing when the task starts. I suspect the newly requested container does not pick up the resources under lib. Should the lib resources be placed on HDFS? How should that be configured?
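One way to do this (a sketch with hypothetical paths, not a confirmed fix for this job) is Flink 1.11's `yarn.provided.lib.dirs` option, which lets every container localize the lib jars from HDFS instead of relying on the submitting node:

```shell
# Hypothetical paths; adjust to your cluster.
# Upload the Flink lib directory (including the connector jars) to HDFS once:
hdfs dfs -mkdir -p /flink/1.11.0/lib
hdfs dfs -put -f "$FLINK_HOME"/lib/*.jar /flink/1.11.0/lib

# Submit pointing at the pre-provisioned lib; every container YARN allocates,
# including replacements after a failover, fetches the jars from HDFS:
flink run -t yarn-per-job \
  -Dyarn.provided.lib.dirs="hdfs:///flink/1.11.0/lib" \
  my-job.jar
```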


Best
xiao cai


Error stack:
2020-08-19 11:23:08,099 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Received 1 containers.
2020-08-19 11:23:08,100 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Received 1 containers with resource <memory:2048, vCores:4>, 1 pending container requests.
2020-08-19 11:23:08,100 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - TaskExecutor container_e07_1596440446172_0094_01_000069 will be started on 10.3.15.22 with TaskExecutorProcessSpec {cpuCores=4.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0 bytes, networkMemSize=128.000mb (134217730 bytes), managedMemorySize=512.000mb (536870920 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=192.000mb (201326592 bytes)}.
2020-08-19 11:23:08,101 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Creating container launch context for TaskManagers
2020-08-19 11:23:08,101 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Starting TaskManagers
2020-08-19 11:23:08,102 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Removing container request Capability[<memory:2048, vCores:4>]Priority[1].
2020-08-19 11:23:08,102 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Accepted 1 requested containers, returned 0 excess containers, 0 pending container requests of resource <memory:2048, vCores:4>.
2020-08-19 11:23:08,102 INFO  org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Processing Event EventType: START_CONTAINER for Container container_e07_1596440446172_0094_01_000069
2020-08-19 11:23:10,851 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler [] - Unhandled exception.
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: No TaskExecutor registered under container_e07_1596440446172_0094_01_000068.
at org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:560) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source) ~[?:?]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_191]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.0.jar:1.11.0]
2020-08-19 11:23:10,987 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Registering TaskManager with ResourceID container_e07_1596440446172_0094_01_000069 (akka.tcp://flink@10.3.15.22:37461/user/rpc/taskmanager_0) at ResourceManager
2020-08-19 11:23:11,029 WARN  org.apache.flink.runtime.taskmanager.TaskManagerLocation     [] - No hostname could be resolved for the IP address 10.3.15.22, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
2020-08-19 11:23:11,043 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (1/2) (dbc71bdd0b8f72a1a2573a59b7e08d64) switched from SCHEDULED to DEPLOYING.
2020-08-19 11:23:11,043 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Deploying Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (1/2) (attempt #68) to container_e07_1596440446172_0094_01_000069 @ 10.3.15.22 (dataPort=34755)
2020-08-19 11:23:11,043 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (2/2) (4bae00129d9af9489cd29441d4540963) switched from SCHEDULED to DEPLOYING.
2020-08-19 11:23:11,043 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Deploying Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (2/2) (attempt #68) to container_e07_1596440446172_0094_01_000069 @ 10.3.15.22 (dataPort=34755)
2020-08-19 11:23:11,043 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Window(TumblingEventTimeWindows(1000), EventTimeTrigger, PassThroughWindowFunction) -> SourceConversion(table=[default_catalog.default_database.catalog_source, source: [KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp)]], fields=[id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp]) -> Calc(select=[CAST(id) AS rowkey, (id ROW name ROW kafka_partition ROW event_time ROW write_time ROW snapshot_time ROW max_snapshot_time) AS EXPR$1]) -> SinkConversionToTuple2 -> Sink: HBaseUpsertTableSink(rowkey, cf) (1/1) (93ff2cf68229f33a00851bd93f2b7f85) switched from SCHEDULED to DEPLOYING.
2020-08-19 11:23:11,043 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Deploying Window(TumblingEventTimeWindows(1000), EventTimeTrigger, PassThroughWindowFunction) -> SourceConversion(table=[default_catalog.default_database.catalog_source, source: [KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp)]], fields=[id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp]) -> Calc(select=[CAST(id) AS rowkey, (id ROW name ROW kafka_partition ROW event_time ROW write_time ROW snapshot_time ROW max_snapshot_time) AS EXPR$1]) -> SinkConversionToTuple2 -> Sink: HBaseUpsertTableSink(rowkey, cf) (1/1) (attempt #68) to container_e07_1596440446172_0094_01_000069 @ 10.3.15.22 (dataPort=34755)
2020-08-19 11:23:11,161 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (1/2) (dbc71bdd0b8f72a1a2573a59b7e08d64) switched from DEPLOYING to RUNNING.
2020-08-19 11:23:11,161 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Window(TumblingEventTimeWindows(1000), EventTimeTrigger, PassThroughWindowFunction) -> SourceConversion(table=[default_catalog.default_database.catalog_source, source: [KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp)]], fields=[id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp]) -> Calc(select=[CAST(id) AS rowkey, (id ROW name ROW kafka_partition ROW event_time ROW write_time ROW snapshot_time ROW max_snapshot_time) AS EXPR$1]) -> SinkConversionToTuple2 -> Sink: HBaseUpsertTableSink(rowkey, cf) (1/1) (93ff2cf68229f33a00851bd93f2b7f85) switched from DEPLOYING to RUNNING.
2020-08-19 11:23:11,162 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (2/2) (4bae00129d9af9489cd29441d4540963) switched from DEPLOYING to RUNNING.
2020-08-19 11:23:12,733 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (1/2) (dbc71bdd0b8f72a1a2573a59b7e08d64) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@71e7e541.
java.lang.NoClassDefFoundError: Could not initialize class xxxx

Re: Flink on Yarn: after a job restart, the TaskManager seems to lose the jar files under lib

Congxian Qiu
Hi
     Which version of Flink are you running, and what is the expected behavior?
     From the log you posted, the class xxxx cannot be found. Is xxxx inside one of the jars you placed under lib? Also, did the job fail on its very first run, or did it fail while recovering after a failover?
Best,
Congxian
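To narrow down which jar is expected to provide the class, one can scan the lib directory on the submitting node (the class name here is a hypothetical stand-in for the one in the NoClassDefFoundError):

```shell
# Hypothetical class; replace with the internal path of the missing class.
CLASS='com/example/Xxxx.class'
for j in "$FLINK_HOME"/lib/*.jar; do
  # jar files are zip archives, so unzip -l lists their entries
  if unzip -l "$j" 2>/dev/null | grep -q "$CLASS"; then
    echo "found $CLASS in $j"
  fi
done
```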


xiao cai <[hidden email]> wrote on Wed, Aug 19, 2020, 12:50 PM:


Re: Flink on Yarn: after a job restart, the TaskManager seems to lose the jar files under lib

xiao cai
In reply to this post by xiao cai
Hi
The Flink version is 1.11.0.
The expected behavior is to write data from Kafka into an HBase table in real time.
The class xxx is inside one of the jars under lib.
The job runs normally at first, and the jar can be found. It then failed during runtime, entered the restarting state, and has been switching between running and restarting ever since.
I submit the job from node 20, and the container runs on node 22. The jars in lib are all on node 20, so my guess is that the lib jars are lost when a new container is allocated while the job is running.
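If the jars only exist on the submitting node, one sketch (directory path hypothetical) is to ship them explicitly at submission time with `-yt`/`--yarnship`, so YARN puts them in the distributed cache and localizes them into every container it allocates, including replacement containers:

```shell
# Hypothetical directory holding the connector/UDF jars on the submit node (node 20).
# --yarnship copies the whole directory into each container's classpath.
flink run -m yarn-cluster \
  -yt /opt/flink-extra-libs \
  my-job.jar
```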


Best,
xiao cai


Original message
From: Congxian Qiu<[hidden email]>
To: user-zh<[hidden email]>
Sent: Wed, Aug 19, 2020, 13:34
Subject: Re: Flink on Yarn: after a job restart, the TaskManager seems to lose the jar files under lib



Re: Flink on Yarn: after a job restart, the TaskManager seems to lose the jar files in lib

范超
In reply to this post by xiao cai
I ran into the same thing after enabling failover restart for my job: YARN simply requested new containers while the old ones were never further processed, which kept producing the same error you are seeing, because the old container no longer had a TaskExecutor bound to it:
No TaskExecutor registered under container_xxxx.
In the end I just wrote a script that reloads the application via a savepoint.
Hope this helps.
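
For reference, a savepoint-based reload of the kind described above could look roughly like the sketch below. This is not the actual script; the job ID, YARN application ID, savepoint directory, and jar path are placeholders you would substitute with your own values, and the `flink stop -p` / `flink run -s` flags are the standard Flink 1.11 CLI options:

```shell
#!/usr/bin/env bash
# Sketch: reload a Flink-on-YARN job via savepoint.
# Stop the job with a savepoint, then resubmit the same jar restoring from it.
# JOB_ID, YARN_APP_ID, SAVEPOINT_DIR, and JOB_JAR are placeholders.
set -euo pipefail

JOB_ID="<flink-job-id>"
YARN_APP_ID="<yarn-application-id>"
SAVEPOINT_DIR="hdfs:///flink/savepoints"
JOB_JAR="/path/to/job.jar"

# Stop the job and trigger a savepoint; the CLI prints the savepoint path on success.
SAVEPOINT_PATH=$(flink stop -p "$SAVEPOINT_DIR" -yid "$YARN_APP_ID" "$JOB_ID" \
  | grep -o 'hdfs://[^ ]*' | tail -n1)

# Resubmit the same jar as a fresh YARN application, restoring state from the savepoint.
flink run -d -m yarn-cluster -s "$SAVEPOINT_PATH" "$JOB_JAR"
```

Because the resubmission starts a fresh YARN application, every new container localizes a clean copy of the lib resources, which sidesteps the stale-container problem.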

-----Original Message-----
From: xiao cai [mailto:[hidden email]]
Sent: Wednesday, August 19, 2020 12:50
To: user-zh <[hidden email]>
Subject: Flink on Yarn: after a job restart, the TaskManager seems to lose the jar files in lib

As the subject says: when running Flink on Yarn, after the job restarts the TaskManager seems to lose the jar files in lib.


My job is a kafka source -> hbase sink.


After the job is allocated a new container, it fails on startup because class files that previously existed can no longer be found. I suspect the newly allocated container does not pick up the resources in lib. Should the lib resources be placed on HDFS? If so, how should this be configured?
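
If placing lib on HDFS is the route taken, one option available since Flink 1.11 is the `yarn.provided.lib.dirs` setting, which makes every YARN container localize the cluster libraries from a shared HDFS directory instead of relying on per-submission uploads. A minimal config sketch, with placeholder HDFS paths:

```yaml
# flink-conf.yaml -- point YARN deployments at pre-uploaded lib directories on HDFS.
# The paths below are placeholders; upload your Flink lib/ and plugins/ there first, e.g.:
#   hdfs dfs -mkdir -p /flink/1.11 && hdfs dfs -put $FLINK_HOME/lib /flink/1.11/lib
yarn.provided.lib.dirs: hdfs:///flink/1.11/lib;hdfs:///flink/1.11/plugins
```

With this in place, each TaskManager container fetches the jars from HDFS at launch, so a restarted job's new containers should see the same lib contents as the original ones.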


Best
xiao cai


Error stack trace:
2020-08-19 11:23:08,099 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Received 1 containers.
2020-08-19 11:23:08,100 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Received 1 containers with resource <memory:2048, vCores:4>, 1 pending container requests.
2020-08-19 11:23:08,100 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - TaskExecutor container_e07_1596440446172_0094_01_000069 will be started on 10.3.15.22 with TaskExecutorProcessSpec {cpuCores=4.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0 bytes, networkMemSize=128.000mb (134217730 bytes), managedMemorySize=512.000mb (536870920 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=192.000mb (201326592 bytes)}.
2020-08-19 11:23:08,101 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Creating container launch context for TaskManagers
2020-08-19 11:23:08,101 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Starting TaskManagers
2020-08-19 11:23:08,102 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Removing container request Capability[<memory:2048, vCores:4>]Priority[1].
2020-08-19 11:23:08,102 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Accepted 1 requested containers, returned 0 excess containers, 0 pending container requests of resource <memory:2048, vCores:4>.
2020-08-19 11:23:08,102 INFO  org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Processing Event EventType: START_CONTAINER for Container container_e07_1596440446172_0094_01_000069
2020-08-19 11:23:10,851 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler [] - Unhandled exception.
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: No TaskExecutor registered under container_e07_1596440446172_0094_01_000068.
at org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:560) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source) ~[?:?]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_191]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.0.jar:1.11.0]
2020-08-19 11:23:10,987 INFO  org.apache.flink.yarn.YarnResourceManager                    [] - Registering TaskManager with ResourceID container_e07_1596440446172_0094_01_000069 (akka.tcp://flink@10.3.15.22:37461/user/rpc/taskmanager_0) at ResourceManager
2020-08-19 11:23:11,029 WARN  org.apache.flink.runtime.taskmanager.TaskManagerLocation     [] - No hostname could be resolved for the IP address 10.3.15.22, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
2020-08-19 11:23:11,043 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (1/2) (dbc71bdd0b8f72a1a2573a59b7e08d64) switched from SCHEDULED to DEPLOYING.
2020-08-19 11:23:11,043 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Deploying Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (1/2) (attempt #68) to container_e07_1596440446172_0094_01_000069 @ 10.3.15.22 (dataPort=34755)
2020-08-19 11:23:11,043 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (2/2) (4bae00129d9af9489cd29441d4540963) switched from SCHEDULED to DEPLOYING.
2020-08-19 11:23:11,043 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Deploying Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (2/2) (attempt #68) to container_e07_1596440446172_0094_01_000069 @ 10.3.15.22 (dataPort=34755)
2020-08-19 11:23:11,043 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Window(TumblingEventTimeWindows(1000), EventTimeTrigger, PassThroughWindowFunction) -> SourceConversion(table=[default_catalog.default_database.catalog_source, source: [KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp)]], fields=[id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp]) -> Calc(select=[CAST(id) AS rowkey, (id ROW name ROW kafka_partition ROW event_time ROW write_time ROW snapshot_time ROW max_snapshot_time) AS EXPR$1]) -> SinkConversionToTuple2 -> Sink: HBaseUpsertTableSink(rowkey, cf) (1/1) (93ff2cf68229f33a00851bd93f2b7f85) switched from SCHEDULED to DEPLOYING.
2020-08-19 11:23:11,043 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Deploying Window(TumblingEventTimeWindows(1000), EventTimeTrigger, PassThroughWindowFunction) -> SourceConversion(table=[default_catalog.default_database.catalog_source, source: [KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp)]], fields=[id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp]) -> Calc(select=[CAST(id) AS rowkey, (id ROW name ROW kafka_partition ROW event_time ROW write_time ROW snapshot_time ROW max_snapshot_time) AS EXPR$1]) -> SinkConversionToTuple2 -> Sink: HBaseUpsertTableSink(rowkey, cf) (1/1) (attempt #68) to container_e07_1596440446172_0094_01_000069 @ 10.3.15.22 (dataPort=34755)
2020-08-19 11:23:11,161 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (1/2) (dbc71bdd0b8f72a1a2573a59b7e08d64) switched from DEPLOYING to RUNNING.
2020-08-19 11:23:11,161 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Window(TumblingEventTimeWindows(1000), EventTimeTrigger, PassThroughWindowFunction) -> SourceConversion(table=[default_catalog.default_database.catalog_source, source: [KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp)]], fields=[id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp]) -> Calc(select=[CAST(id) AS rowkey, (id ROW name ROW kafka_partition ROW event_time ROW write_time ROW snapshot_time ROW max_snapshot_time) AS EXPR$1]) -> SinkConversionToTuple2 -> Sink: HBaseUpsertTableSink(rowkey, cf) (1/1) (93ff2cf68229f33a00851bd93f2b7f85) switched from DEPLOYING to RUNNING.
2020-08-19 11:23:11,162 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (2/2) (4bae00129d9af9489cd29441d4540963) switched from DEPLOYING to RUNNING.
2020-08-19 11:23:12,733 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: KafkaTableSource(id, name, kafka_partition, event_time, write_time, snapshot_time, max_snapshot_time, proc_time, wpt_event_time, wpt_partition, wpt_offset, wpt_timestamp) (1/2) (dbc71bdd0b8f72a1a2573a59b7e08d64) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@71e7e541.
java.lang.NoClassDefFoundError: Could not initialize class xxxx