Error when writing from Flink to Kafka with EOS enabled

仙剑……情动人间
Hi All,


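I am writing from Flink to Kafka with exactly-once (EOS) semantics, but I am hitting the error below. I would appreciate help from anyone who has run into this before.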


0-05-21 16:52:15,057 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (f65b2869d898a050238c53f9fbc9573b) switched from DEPLOYING to RUNNING.
2020-05-21 16:52:15,062 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Process-Broadcast -> Map -> Sink: Unnamed (1/1) (d0739aa81367223f83a63a86307fffb3) switched from DEPLOYING to RUNNING.
2020-05-21 16:52:15,276 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Co-Process-Broadcast -> Map -> Sink: Unnamed (1/1) (d0739aa81367223f83a63a86307fffb3) switched from RUNNING to FAILED.
org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Could not find a coordinator with type TRANSACTION with key Co-Process-Broadcast -> Map -> Sink: Unnamed-c4ffe334eee7821772b24597621064ce-32 due tounexpected error: The server experienced an unexpected error when processing the request.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
    at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1101)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1037)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Could not find a coordinator with type TRANSACTION with key Co-Process-Broadcast -> Map -> Sink: Unnamed-c4ffe334eee7821772b24597621064ce-32 due tounexpected error: The server experienced an unexpected error when processing the request.
    at org.apache.kafka.clients.producer.internals.TransactionManager$FindCoordinatorHandler.handleResponse(TransactionManager.java:1142)
    at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:288)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
    ... 1 more

Re: Error when writing from Flink to Kafka with EOS enabled

Yun Gao
Hello, which versions of Flink and Kafka are you using?


------------------------------------------------------------------
From: 仙剑……情动人间 <[hidden email]>
Date: 2020-05-21 17:17:03
To: user-zh <[hidden email]>
Subject: Error when writing from Flink to Kafka with EOS enabled
