关于flink 写于kafka时的transactionId 生成问题

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

关于flink 写于kafka时的transactionId 生成问题

star
hi,

在写入kafka的时候自动生成了一个transactionId,请问这个id生成的方式是什么,我自己指定好像并不起作用。
作业我已经停止了,但是看kafka的日志还是不断的在刷Initialized transactionalId………. ,而且该程序再此启动就会报:
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
错误,请问有什么办法可以避免这个问题?
下面是kafka的日志,一直在刷,很多都是好几天之前的任务了



2019-08-29 17:08:48,911] INFO [TransactionCoordinator id=0] Initialized transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-3 with producerId 1011 and producer epoch 14255 on partition __transaction_state-36 (kafka.coordinator.transaction.TransactionCoordinator)
[2019-08-29 17:08:49,244] INFO [TransactionCoordinator id=0] Initialized transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-4 with producerId 1012 and producer epoch 16544 on partition __transaction_state-35 (kafka.coordinator.transaction.TransactionCoordinator)
[2019-08-29 17:08:49,518] INFO [TransactionCoordinator id=0] Initialized transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-1 with producerId 1013 and producer epoch 15460 on partition __transaction_state-38 (kafka.coordinator.transaction.TransactionCoordinator)
[2019-08-29 17:08:49,786] INFO [TransactionCoordinator id=0] Initialized transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-2 with producerId 1014 and producer epoch 7781 on partition __transaction_state-37 (kafka.coordinator.transaction.TransactionCoordinator)
[2019-08-29 17:08:50,054] INFO [TransactionCoordinator id=0] Initialized transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-0 with producerId 1015 and producer epoch 7529 on partition __transaction_state-39 (kafka.coordinator.transaction.TransactionCoordinator)
[2019-08-29 17:08:50,310] INFO [TransactionCoordinator id=0] Initialized transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-3 with producerId 1011 and producer epoch 14256 on partition __transaction_state-36 (kafka.coordinator.transaction.TransactionCoordinator)
[2019-08-29 17:08:51,078] INFO [TransactionCoordinator id=0] Initialized transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-4 with producerId 1012 and producer epoch 16545 on partition __transaction_state-35 (kafka.coordinator.transaction.TransactionCoordinator)
[2019-08-29 17:08:53,047] INFO [TransactionCoordinator id=0] Initialized transactionalId Split Reader: Custom File Source -> (Sink: Print to Std. Out, Sink: Unnamed)-7b96fd3ac98d0efc878b17e02839b3d6-1 with producerId 1013 and producer epoch 15461 on partition __transaction_state-38 (kafka.coordinator.transaction.TransactionCoordinator)

Reply | Threaded
Open this post in threaded view
|

Re: 关于flink 写于kafka时的transactionId 生成问题

Wesley Peng
Hi

on 2019/8/29 17:13, ddwcg wrote:
> 作业我已经停止了,但是看kafka的日志还是不断的在刷Initialized transactionalId………. ,而且该程序再此启动就会报:
> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
> 错误,请问有什么办法可以避免这个问题?

Maybe you want to check all the broker and producers have the same
timezone setup, and all time are synchronous.

regards.
Reply | Threaded
Open this post in threaded view
|

Re: 关于flink 写于kafka时的transactionId 生成问题

star
broker就一个,flink集群的时钟确实和broker的不一样,是不同的机房,能自己指定transactionalId吗,两个机房的调整成一样怕影响其他的应用

> 在 2019年8月29日,17:45,Wesley Peng <[hidden email]> 写道:
>
> Hi
>
> on 2019/8/29 17:13, ddwcg wrote:
>> 作业我已经停止了,但是看kafka的日志还是不断的在刷Initialized transactionalId………. ,而且该程序再此启动就会报:
>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
>> 错误,请问有什么办法可以避免这个问题?
>
> Maybe you want to check all the broker and producers have the same timezone setup, and all time are synchronous.
>
> regards.
>



Reply | Threaded
Open this post in threaded view
|

Re: 关于flink 写于kafka时的transactionId 生成问题

Wesley Peng
Hi

on 2019/8/29 17:50, ddwcg wrote:
> broker就一个,flink集群的时钟确实和broker的不一样,是不同的机房,能自己指定transactionalId吗,两个机房的调整成一样怕影响其他的应用

AFAIK the transID is generated by systems.

regards.