FlinkKafkaProducer: timeout initializing transactional state after enabling Exactly-Once


FlinkKafkaProducer: timeout initializing transactional state after enabling Exactly-Once

陈赋赟
I'm using FlinkKafkaProducer in Flink with Exactly-Once semantics enabled. The first deployment to the test environment started up fine, but when I rolled out a new version I killed the previous job and redeployed it, and then the following problem appeared: the logs show a timeout exception while initializing transactional state during checkpointing.
The exact exception is as follows:


The checkpoint interval is set to 30 seconds.
The producer transaction timeout (transaction.timeout.ms) is set to 5 minutes.
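
For reference, below is a minimal sketch of how such a job might be wired up with the universal Flink Kafka connector. The topic name, bootstrap address, serialization schema, and source are placeholders assumed for illustration; they are not taken from the original job.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceKafkaSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(30_000);  // checkpoint every 30 s, as described above

        Properties producerConfig = new Properties();
        producerConfig.setProperty("bootstrap.servers", "broker:9092");  // placeholder address
        // 5-minute transaction timeout, as described above; it must not exceed the
        // broker-side transaction.max.timeout.ms (15 minutes by default).
        producerConfig.setProperty("transaction.timeout.ms", String.valueOf(5 * 60 * 1000));

        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "output-topic",                                              // placeholder topic
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                producerConfig,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        env.fromElements("a", "b", "c")   // stand-in for the real Interval Join pipeline
                .addSink(producer);

        env.execute("exactly-once kafka sink");
    }
}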


 


Re: FlinkKafkaProducer: timeout initializing transactional state after enabling Exactly-Once

陈赋赟




2019-09-02 10:24:28,599 INFO  org.apache.flink.runtime.taskmanager.Task                     - Interval Join -> Sink: Unnamed (1/4) (e8b85b6f144879efbb0b4209f226c69b) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.



On 2019-09-02 11:29:35, "陈赋赟" <[hidden email]> wrote:

I'm using FlinkKafkaProducer in Flink with Exactly-Once semantics enabled. The first deployment to the test environment started up fine, but when I rolled out a new version I killed the previous job and redeployed it, and then the following problem appeared: the logs show a timeout exception while initializing transactional state during checkpointing.
The exact exception is as follows:


The checkpoint interval is set to 30 seconds.
The producer transaction timeout (transaction.timeout.ms) is set to 5 minutes.




 

Re: FlinkKafkaProducer: timeout initializing transactional state after enabling Exactly-Once

Wesley Peng
Hi

on 2019/9/2 11:49, 陈赋赟 wrote:
> 2019-09-02 10:24:28,599 INFO  org.apache.flink.runtime.taskmanager.Task                     - Interval Join -> Sink: Unnamed (1/4) (e8b85b6f144879efbb0b4209f226c69b) switched from RUNNING to FAILED.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.

You may reference this:

https://stackoverflow.com/questions/54295588/kafka-streams-failed-to-rebalance-error

Possible options:

- As this answer says, switch off Exactly Once for your streamer (see the sketch after this list). It then doesn't use transactions and all seems to work OK. Not helpful if you require EOS or some other client code requires transactions.
- Restart any brokers that are reporting warnings, to force them to re-resolve the IP address. They would need to be restarted in a way that they don't change IP address themselves. Not usually possible in Kubernetes.
- Defect raised: KAFKA-7958 - Transactions are broken with Kubernetes-hosted brokers.
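
If the first option is acceptable, the only change on the Flink side is the Semantic argument passed to the sink. A minimal sketch of that fragment is below, replacing the producer construction in the sketch from the first message; the topic name, schema, and broker address are placeholder assumptions, not values from this thread.

Properties producerConfig = new Properties();
producerConfig.setProperty("bootstrap.servers", "broker:9092");  // placeholder address

// With AT_LEAST_ONCE the sink writes with a plain, non-transactional producer,
// so it never calls initTransactions() and this timeout cannot occur, at the
// cost of possible duplicate records after a failure and restart.
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "output-topic",                                              // placeholder topic
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        producerConfig,
        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);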

Update 2019-02-20: This may have been resolved in Kafka 2.1.1 (Confluent 5.1.2), released today. See the linked issue.