Flink 1.11.2

A custom source generates data in a loop and sinks it to Kafka. The job is deployed to YARN in application mode, and jobmanager.log reports the error below (containers were allocated for both the JobManager and the TaskManager, and both logs show the same error):

2021-01-21 10:52:17,742 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-21 10:52:17,742 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-21 10:52:17,742 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job kafkaSink -- flink ???kafka??????? (315f9a7b42afb08b4de1841a5b3c0f76) switched from state RUNNING to RESTARTING.
2021-01-21 10:52:18,743 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job kafkaSink -- flink ???kafka??????? (315f9a7b42afb08b4de1841a5b3c0f76) switched from state RESTARTING to RUNNING.
2021-01-21 10:52:18,743 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250) switched from CREATED to SCHEDULED.
2021-01-21 10:52:18,743 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250) switched from SCHEDULED to DEPLOYING.
2021-01-21 10:52:18,744 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: Custom Source -> Sink: Unnamed (1/1) (attempt #229) to container_1611044725922_0017_01_000002 @ slave02 (dataPort=39278)
2021-01-21 10:52:18,748 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250) switched from DEPLOYING to RUNNING.
2021-01-21 10:52:18,753 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@75c6d62a.
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432) ~[quickstart-0.1.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1141) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1242) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1238) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:940) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1111) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) ~[quickstart-0.1.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[quickstart-0.1.jar:?]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[quickstart-0.1.jar:?]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374) ~[quickstart-0.1.jar:?]
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392) ~[quickstart-0.1.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359) ~[quickstart-0.1.jar:?]
    ... 23 more
2021-01-21 10:52:18,754 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-21 10:52:18,754 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-21 10:52:18,754 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job kafkaSink -- flink ???kafka??????? (315f9a7b42afb08b4de1841a5b3c0f76) switched from state RUNNING to RESTARTING.

The same job runs without problems in per-job mode.

Assuming it was a version issue, I switched to flink-1.12.1 and ran the job again. It then failed with the error below (only the JobManager container was allocated, no TaskManager):

2021-01-21 10:58:45,129 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Could not start cluster entrypoint YarnJobClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnJobClusterEntrypoint.
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190) ~[quickstart-0.1.jar:?]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:520) [quickstart-0.1.jar:?]
    at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:99) [flink-dist_2.11-1.12.1.jar:1.12.1]
Caused by: java.lang.AbstractMethodError: org.apache.flink.runtime.resourcemanager.ResourceManagerFactory.createResourceManager(Lorg/apache/flink/configuration/Configuration;Lorg/apache/flink/runtime/clusterframework/types/ResourceID;Lorg/apache/flink/runtime/rpc/RpcService;Lorg/apache/flink/runtime/highavailability/HighAvailabilityServices;Lorg/apache/flink/runtime/heartbeat/HeartbeatServices;Lorg/apache/flink/runtime/rpc/FatalErrorHandler;Lorg/apache/flink/runtime/entrypoint/ClusterInformation;Ljava/lang/String;Lorg/apache/flink/runtime/metrics/groups/ResourceManagerMetricGroup;Lorg/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices;)Lorg/apache/flink/runtime/resourcemanager/ResourceManager;
    at org.apache.flink.runtime.resourcemanager.ResourceManagerFactory.createResourceManager(ResourceManagerFactory.java:61) ~[quickstart-0.1.jar:?]
    at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:167) ~[quickstart-0.1.jar:?]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:219) ~[quickstart-0.1.jar:?]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:172) ~[quickstart-0.1.jar:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_231]
    at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_231]
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) ~[flink-shaded-hadoop-2-uber-2.8.3-8.0.jar:2.8.3-8.0]
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) ~[quickstart-0.1.jar:?]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:171) ~[quickstart-0.1.jar:?]
    ... 2 more

--
Sent from: http://apache-flink.147419.n8.nabble.com/