Flink sink to Kafka fails with "Failed to construct kafka producer"


lp
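Flink 1.11.2.
A custom source generates data in a loop and sinks it to Kafka.
The job is deployed to YARN in application mode.
jobmanager.log shows the following error (containers were allocated for both the JobManager and the TaskManager, and both report the same error):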

2021-01-21 10:52:17,742 INFO
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - Calculating tasks to restart to recover the failed task
cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-21 10:52:17,742 INFO
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - 1 tasks should be restarted to recover the failed task
cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-21 10:52:17,742 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job
kafkaSink -- flink ???kafka??????? (315f9a7b42afb08b4de1841a5b3c0f76)
switched from state RUNNING to RESTARTING.
2021-01-21 10:52:18,743 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job
kafkaSink -- flink ???kafka??????? (315f9a7b42afb08b4de1841a5b3c0f76)
switched from state RESTARTING to RUNNING.
2021-01-21 10:52:18,743 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250)
switched from CREATED to SCHEDULED.
2021-01-21 10:52:18,743 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250)
switched from SCHEDULED to DEPLOYING.
2021-01-21 10:52:18,744 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Deploying
Source: Custom Source -> Sink: Unnamed (1/1) (attempt #229) to
container_1611044725922_0017_01_000002 @ slave02 (dataPort=39278)
2021-01-21 10:52:18,748 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250)
switched from DEPLOYING to RUNNING.
2021-01-21 10:52:18,753 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250)
switched from RUNNING to FAILED on
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@75c6d62a.
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
~[quickstart-0.1.jar:?]
        at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1141)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1242)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1238)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:940)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1111)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
~[quickstart-0.1.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
~[quickstart-0.1.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
~[quickstart-0.1.jar:?]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]
Caused by: org.apache.kafka.common.KafkaException: class
org.apache.kafka.common.serialization.ByteArraySerializer is not an instance
of org.apache.kafka.common.serialization.Serializer
        at
org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)
~[quickstart-0.1.jar:?]
        at
org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392)
~[quickstart-0.1.jar:?]
        at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359)
~[quickstart-0.1.jar:?]
        ... 23 more
2021-01-21 10:52:18,754 INFO
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - Calculating tasks to restart to recover the failed task
cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-21 10:52:18,754 INFO
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
[] - 1 tasks should be restarted to recover the failed task
cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-21 10:52:18,754 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job
kafkaSink -- flink ???kafka??????? (315f9a7b42afb08b4de1841a5b3c0f76)
switched from state RUNNING to RESTARTING.
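
A message of the form "class X is not an instance of Y", where X normally does implement Y, often points to the two classes having been loaded by different class loaders, though the log above does not prove that on its own. One way to check is to print where each class comes from. The following is only a minimal sketch: the class name `ClassOrigin` and its `describe` helper are hypothetical names chosen for illustration, and it assumes the check is run on the same classpath as the failing task (for example by calling `describe` from inside the job).

```java
import java.net.URL;
import java.security.CodeSource;

public class ClassOrigin {
    /** Describes the class loader and the code source (jar) a class was loaded from. */
    static String describe(Class<?> c) {
        ClassLoader loader = c.getClassLoader();
        CodeSource source = c.getProtectionDomain().getCodeSource();
        URL location = (source == null) ? null : source.getLocation();
        return c.getName()
            + " loader=" + (loader == null ? "bootstrap" : loader.toString())
            + " location=" + (location == null ? "unknown" : location.toString());
    }

    public static void main(String[] args) {
        // Default class names are taken from the stack trace; others can be passed as arguments.
        String[] names = args.length > 0 ? args : new String[] {
            "org.apache.kafka.common.serialization.Serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer"
        };
        for (String name : names) {
            try {
                System.out.println(describe(Class.forName(name)));
            } catch (ClassNotFoundException e) {
                System.out.println(name + " not found on this classpath");
            }
        }
    }
}
```

If the two Kafka classes report different loaders or different jar locations, that would be consistent with a duplicated kafka-clients copy on the classpath.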

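With per-job mode, the job runs without any problem.

Suspecting a version problem, I switched to flink-1.12.1 and ran the job again, which produced the following error (only the JobManager container was allocated, no TaskManager):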
2021-01-21 10:58:45,129 ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Could not
start cluster entrypoint YarnJobClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
initialize the cluster entrypoint YarnJobClusterEntrypoint.
        at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:520)
[quickstart-0.1.jar:?]
        at
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:99)
[flink-dist_2.11-1.12.1.jar:1.12.1]
Caused by: java.lang.AbstractMethodError:
org.apache.flink.runtime.resourcemanager.ResourceManagerFactory.createResourceManager(Lorg/apache/flink/configuration/Configuration;Lorg/apache/flink/runtime/clusterframework/types/ResourceID;Lorg/apache/flink/runtime/rpc/RpcService;Lorg/apache/flink/runtime/highavailability/HighAvailabilityServices;Lorg/apache/flink/runtime/heartbeat/HeartbeatServices;Lorg/apache/flink/runtime/rpc/FatalErrorHandler;Lorg/apache/flink/runtime/entrypoint/ClusterInformation;Ljava/lang/String;Lorg/apache/flink/runtime/metrics/groups/ResourceManagerMetricGroup;Lorg/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices;)Lorg/apache/flink/runtime/resourcemanager/ResourceManager;
        at
org.apache.flink.runtime.resourcemanager.ResourceManagerFactory.createResourceManager(ResourceManagerFactory.java:61)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:167)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:219)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:172)
~[quickstart-0.1.jar:?]
        at java.security.AccessController.doPrivileged(Native Method)
~[?:1.8.0_231]
        at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_231]
        at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
~[flink-shaded-hadoop-2-uber-2.8.3-8.0.jar:2.8.3-8.0]
        at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
~[quickstart-0.1.jar:?]
        at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:171)
~[quickstart-0.1.jar:?]
        ... 2 more
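
In the trace above, `org.apache.flink.runtime` frames are tagged `quickstart-0.1.jar` while `YarnJobClusterEntrypoint` is tagged `flink-dist_2.11-1.12.1.jar`, which suggests the job jar may bundle its own copy of Flink runtime classes. A quick way to check what a fat jar contains is to count its class entries under given package prefixes. This is a minimal sketch only: `JarPrefixCount` and `count` are hypothetical names, and the two prefixes are assumptions picked from the stack traces in this post.

```java
import java.io.IOException;
import java.util.Enumeration;
import java.util.Map;
import java.util.TreeMap;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class JarPrefixCount {
    /** Counts .class entries in the jar under each of the given path prefixes. */
    static Map<String, Integer> count(String jarPath, String... prefixes) throws IOException {
        Map<String, Integer> counts = new TreeMap<>();
        for (String p : prefixes) {
            counts.put(p, 0);
        }
        try (JarFile jar = new JarFile(jarPath)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (!name.endsWith(".class")) {
                    continue; // skip directories and resources
                }
                for (String p : prefixes) {
                    if (name.startsWith(p)) {
                        counts.merge(p, 1, Integer::sum);
                    }
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.err.println("usage: JarPrefixCount <path-to-jar>");
            return;
        }
        // Prefixes chosen from the packages that appear in the stack traces.
        System.out.println(count(args[0], "org/apache/flink/runtime/", "org/apache/kafka/"));
    }
}
```

Running it against quickstart-0.1.jar would show whether Flink runtime classes are packaged inside the job jar. A non-zero count for `org/apache/flink/runtime/` would be consistent with the job jar carrying Flink classes from a different version than the installed flink-dist.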



--
Sent from: http://apache-flink.147419.n8.nabble.com/