FlinkKafkaProducer transactions: transactionalId collision issue report


FlinkKafkaProducer transactions: transactionalId collision issue report

李杰
*I. Scenario*

  Flink job topology: source (Kafka) -> data processing (WordCount logic) -> sink (Kafka)

  1. Job A:
         source_topic: word_count_topic
         sink_topic: result_01
         group_id: test-group01

  2. Job B:
         source_topic: word_count_topic
         sink_topic: result_02
         group_id: test-group02

3. Both jobs run from the same jar and the same code; only group.id and sink_topic differ.

4. FlinkKafkaProducer uses EXACTLY_ONCE semantics, writing data to the topic with Kafka transactions (a sketch of how both jobs construct the sink follows).
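
For reference, a minimal sketch of how both jobs build the sink. The broker address and the `wordCounts` stream are placeholders; the constructor is the one from the universal flink-connector-kafka, and the exact signature may differ by Flink version:

    import java.util.Optional;
    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092"); // placeholder address
    // keep the transaction timeout below the broker's transaction.max.timeout.ms
    props.setProperty("transaction.timeout.ms", "900000");

    // Job A uses "result_01" here; Job B uses "result_02". Nothing else differs.
    FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
            "result_01",
            new SimpleStringSchema(),
            props,
            Optional.empty(),                             // default partitioner
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE,     // transactional writes
            FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
    wordCounts.addSink(sink);                             // wordCounts: the job's result stream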

*Symptom:*
  As the error logs below show, the transactionalIds of the two jobs interfere with each other.

*JobManager error log:*
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1227)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:741)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:90)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:231)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:64)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

*TaskManager error log:*
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1227)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:837)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:605)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:504)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
2020-06-04 16:48:36,817 INFO  org.apache.flink.runtime.taskmanager.Task - pend-name -> (Sink: pnt-name, Sink: sink-name) (1/1) (db4ee0c44888e866b3d26d39b34a0bd8) switched from RUNNING to FAILED.



*II. Preliminary analysis*

*1. What is happening*
  Although these are two independent Flink jobs writing transactional messages to different topics of the same Kafka cluster, they end up with identical transactionalIds;
that is how two distinct jobs come to share the same transactionalId.

*2. transactionalId generation scheme*
     transactionalId = operator name shown in the Flink UI + "-" + a 32-character string (mapped 1:1 from the operator uid) + "-" + counter

     Given this formula, if the operator name and uid are the same, the resulting transactionalIds must also be the same.

*Relevant code:*
    transactionalIdsGenerator = new TransactionalIdsGenerator(
            getRuntimeContext().getTaskName() + "-"
                    + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
            getRuntimeContext().getIndexOfThisSubtask(),
            getRuntimeContext().getNumberOfParallelSubtasks(),
            kafkaProducersPoolSize,
            SAFE_SCALE_DOWN_FACTOR);
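
To make the collision concrete, here is an illustrative (non-Flink) computation of the prefix that this constructor receives. The name and uid hash values are hypothetical, but when neither job sets an explicit name or uid, Flink derives both from the job graph structure, which is identical for the two jobs:

    // Both jobs auto-generate the same operator name and the same 32-character
    // uid hash, because their job graphs are structurally identical.
    String taskName = "Sink: Unnamed";                            // hypothetical auto-generated name
    String operatorUniqueID = "cbc357ccb763df2852fee8c4fc7d55f2"; // hypothetical 32-char uid hash
    String prefix = taskName + "-" + operatorUniqueID;

    // The generator appends a numeric counter per subtask / pooled producer,
    // so subtask 0 of Job A and subtask 0 of Job B draw the very same ids.
    String transactionalId = prefix + "-" + 0;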

*3. When identical transactionalIds arise*
     Identical transactionalIds can occur in the following cases:
     1. (As in the scenario above) multiple running instances launched from the same jar, differing only in topic and group.id.
     2. Different jobs with a similar code structure, where neither explicitly sets the name and uid of the FlinkKafkaProducer sink operator.

*III. How to avoid identical transactionalIds*
     1. Always give the operator a name and uid that cannot collide with any other job (see the sketch after this list),
     or
     2. Override the TransactionalIdsGenerator logic, e.g. replace the 32-character uid-derived string with a UUID.
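
For option 1, a minimal sketch of pinning a job-specific name and uid on the sink via the DataStream API; the identifier strings are placeholders to be chosen per job:

    // Job A: a name/uid no other job uses makes the transactionalId prefix
    // (name + uid hash) unique to this job.
    wordCounts
        .addSink(sink)
        .name("kafka-sink-job-a")   // placeholder, job-specific name
        .uid("kafka-sink-job-a");   // placeholder, job-specific uid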

*   Based on the following two points, I believe using a UUID is acceptable:*
    For a newly submitted Flink job, the transactionalIds would be generated with the UUID, stored in the userContext, and ultimately persisted in state.

For a job restored from state, Flink's existing logic does not regenerate transactionalIds; it retrieves the userContext from state, takes the transactionalIds from it, and keeps cycling through those same ids from then on.
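
As a sketch of option 2 (the generator is not pluggable in the stock connector, so this only illustrates the replacement prefix, not a drop-in change):

    import java.util.UUID;

    // Replace the uid-derived 32-character string with a random UUID. Since the
    // generated ids are written to the userContext in state on first submission
    // and reused on restore, the randomness does not break recovery.
    String prefix = getRuntimeContext().getTaskName() + "-"
            + UUID.randomUUID().toString().replace("-", "");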

*IV. Question*
*       Has anyone run into this problem in practice? How did you solve it?*

*Thank you!*