*1. Scenario*
Flink job logic: source (Kafka) -> data process (word-count logic) -> sink (Kafka)

1. Job A: source_topic: word_count_topic, sink_topic: result_01, group_id: test-group01
2. Job B: source_topic: word_count_topic, sink_topic: result_02, group_id: test-group02
3. Both jobs run the same jar and the same code; only group.id and sink_topic differ.
4. The FlinkKafkaProducer uses EXACTLY_ONCE semantics, i.e. it writes to the sink topic with Kafka transactions.
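For reference, the shared job looks roughly like the following. This is a minimal sketch rather than the actual code: the `WordCountJob` class, the `args` plumbing, the broker address, and the checkpoint interval are made up for illustration, and the word-count aggregation itself is omitted.

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

    public class WordCountJob {
        public static void main(String[] args) throws Exception {
            // Per-job parameters -- the ONLY things that differ between jobs A and B.
            String groupId   = args[0]; // "test-group01" / "test-group02"
            String sinkTopic = args[1]; // "result_01"    / "result_02"

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000); // the EXACTLY_ONCE sink commits on checkpoints

            Properties consumerProps = new Properties();
            consumerProps.setProperty("bootstrap.servers", "broker:9092");
            consumerProps.setProperty("group.id", groupId);

            DataStream<String> lines = env.addSource(
                    new FlinkKafkaConsumer<>("word_count_topic", new SimpleStringSchema(), consumerProps));

            // ... word-count aggregation omitted; assume `results` is its output ...
            DataStream<String> results = lines;

            Properties producerProps = new Properties();
            producerProps.setProperty("bootstrap.servers", "broker:9092");
            // Keep transactions alive longer than the checkpoint interval.
            producerProps.setProperty("transaction.timeout.ms", "900000");

            results.addSink(new FlinkKafkaProducer<>(
                    sinkTopic,
                    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                    producerProps,
                    FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

            env.execute("word-count");
        }
    }

Note that nothing here sets an explicit name or uid on the sink operator, which matters below.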
*Symptom:*

The error logs below show the transactionalIds of the two jobs interfering with each other.

*Error log on the JobManager side:*

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1227)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:741)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:90)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:231)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:64)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

*Error log on the TaskManager side:*

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1227)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:837)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:605)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:504)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
2020-06-04 16:48:36,817 INFO org.apache.flink.runtime.taskmanager.Task - pend-name -> (Sink: pnt-name, Sink: sink-name) (1/1) (db4ee0c44888e866b3d26d39b34a0bd8) switched from RUNNING to FAILED.

*2. Preliminary analysis*

*1) Symptom*

Although these are two independent Flink jobs, writing transactional messages to different topics of the same Kafka cluster, they turn out to use identical transactionalIds; that is why distinct jobs end up fencing each other's producers.

*2) transactionalId generation strategy*

transactionalId = the operator name shown in the Flink UI + "-" + a 32-character string (in 1:1 correspondence with the operator uid) + "-" + a counter

Given this formula, if two jobs have the same operator name and uid, the resulting transactionalIds are necessarily the same.

*Relevant code (from FlinkKafkaProducer):*

    transactionalIdsGenerator = new TransactionalIdsGenerator(
            getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
            getRuntimeContext().getIndexOfThisSubtask(),
            getRuntimeContext().getNumberOfParallelSubtasks(),
            kafkaProducersPoolSize,
            SAFE_SCALE_DOWN_FACTOR);

*3) When identical transactionalIds occur*

The same transactionalId may be generated in the following situations:
1. Multiple running instances launched from the same jar, differing only in topic and group.id (as in the scenario described at the top).
2. Different jobs with a similar code structure, where neither explicitly sets the name and uid of the FlinkKafkaProducer sink operator.

*3. How to avoid identical transactionalIds*

1. Always give the operator name and uid values that cannot collide with those of other jobs (see the first sketch below); or
2. Override the TransactionalIdsGenerator logic, replacing the uid-derived 32-character string with a UUID (see the second sketch below).

*Based on the following two points, I think the UUID approach is viable:*

- For a newly submitted Flink job, the transactionalIds would be generated with the UUID, kept in the userContext, and ultimately stored in state.
- For a job recovered from state, Flink's existing logic does not regenerate transactionalIds; it retrieves the userContext from state, takes the transactionalIds from it, and keeps cycling through them from then on.
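For option 1, the fix is just to pin a per-job name and uid on the sink. A sketch, assuming `producer` is the EXACTLY_ONCE FlinkKafkaProducer from the first sketch; the id strings are arbitrary examples:

    // Job A -- give the sink a name and uid that no other job uses.
    // Since the transactionalId prefix is taskName + "-" + operatorUniqueID,
    // and the operatorUniqueID is derived from the uid, this makes the
    // prefix unique per job.
    results.addSink(producer)
           .name("kafka-sink-job-a")
           .uid("kafka-sink-job-a");

    // Job B sets different values, e.g. "kafka-sink-job-b".

This keeps the ids stable across restarts of the same job (the uid is fixed in code), while still separating the two jobs.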
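For option 2, the change would live inside FlinkKafkaProducer itself. A hypothetical patch to the snippet shown in section 2, not upstream Flink code:

    // Hypothetical patch: replace the uid-derived 32-character string with a
    // random UUID (requires: import java.util.UUID;).
    transactionalIdsGenerator = new TransactionalIdsGenerator(
            getRuntimeContext().getTaskName() + "-" + UUID.randomUUID(),
            getRuntimeContext().getIndexOfThisSubtask(),
            getRuntimeContext().getNumberOfParallelSubtasks(),
            kafkaProducersPoolSize,
            SAFE_SCALE_DOWN_FACTOR);

As argued above, a job restored from state reuses the transactionalIds stored in its userContext and never re-runs the generator, so the random prefix only takes effect for brand-new submissions.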
*4. Question*

*Has anyone run into this problem in practice? How did you solve it?*

*Thanks, everyone!*