Error when restoring from a savepoint with Kafka exactly-once semantics

周瑞
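Hello: with Kafka in exactly-once mode, restoring from a savepoint fails with an error. My initial investigation suggests that the Kafka transaction is using an old epoch. How should this be handled?
// TODO: pass these values in via configuration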
env.setParallelism(1);
env.enableCheckpointing(600000L, CheckpointingMode.EXACTLY_ONCE);

// Checkpoint cleanup policy: retain checkpoints even when the job is explicitly cancelled
env.getCheckpointConfig()
        .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);

// TODO: HDFS must be used in production
env.setStateBackend(new FsStateBackend("hdfs://10.10.98.226:8020/tmp/checkpoint66"));

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

public static final String TABLE_NAME = "KafkaTable";
public static final String COLUMN_NAME = "source_value";

public static final String KAFKA_TABLE_FORMAT =
        "CREATE TABLE "+TABLE_NAME+" (\n" +
                "  "+COLUMN_NAME+" STRING\n" +
                ") WITH (\n" +
                "   'connector' = 'kafka',\n" +
                "   'topic' = '%s',\n" +
                "   'properties.bootstrap.servers' = '%s',\n" +
                "   'sink.semantic' = 'exactly-once',\n" +
                "   'properties.transaction.timeout.ms' = '30000',\n" +
                "   'format' = 'dbz-json'\n" +
                ")\n";
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId,
 or the producer's transaction has been expired by the broker. while recovering transaction KafkaTransactionState [transactionalId=Source:
TableSourceScan(table=[[default_catalog, default_database, debezium_source]], fields=[data]) -> Sink: Sink(table=[default_catalog.default_database.KafkaTable],
fields=[data])-7df19f87deec5680128845fd9a6ca18d-0, producerId=239009, epoch=216]. Presumably this transaction has been already committed before

Re: Error when restoring from a savepoint with Kafka exactly-once semantics

r pp
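'properties.transaction.timeout.ms' = '30000' is set far too short: after 30 s the broker has already expired the transaction for that transactionalId. Most likely the job cannot even finish restarting within that window.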
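A minimal sketch of the same table definition with the transaction timeout passed in as a parameter instead of being hard-coded to 30 s. The class name `KafkaSinkDdl`, the method `buildDdl`, the topic, the broker address and the 15-minute value are illustrative assumptions. 900000 ms is the Kafka broker's default `transaction.max.timeout.ms`; a producer timeout above that is rejected by the broker, so going higher also means raising that broker setting.

```java
// Hypothetical helper; only the DDL text mirrors the table from the question.
public final class KafkaSinkDdl {
    public static final String TABLE_NAME = "KafkaTable";
    public static final String COLUMN_NAME = "source_value";

    // Same definition as in the question, except that the transaction
    // timeout is a %d placeholder instead of the fixed '30000' (30 s).
    private static final String KAFKA_TABLE_FORMAT =
            "CREATE TABLE " + TABLE_NAME + " (\n" +
            "  " + COLUMN_NAME + " STRING\n" +
            ") WITH (\n" +
            "   'connector' = 'kafka',\n" +
            "   'topic' = '%s',\n" +
            "   'properties.bootstrap.servers' = '%s',\n" +
            "   'sink.semantic' = 'exactly-once',\n" +
            "   'properties.transaction.timeout.ms' = '%d',\n" +
            "   'format' = 'dbz-json'\n" +
            ")\n";

    // Fills in topic, bootstrap servers and transaction timeout (milliseconds).
    public static String buildDdl(String topic, String bootstrapServers, long transactionTimeoutMs) {
        return String.format(KAFKA_TABLE_FORMAT, topic, bootstrapServers, transactionTimeoutMs);
    }

    public static void main(String[] args) {
        // Assumed value: 15 minutes, equal to the broker's default
        // transaction.max.timeout.ms (900000 ms).
        long transactionTimeoutMs = 15 * 60 * 1000L;
        // Placeholder topic and broker address, for illustration only.
        System.out.println(buildDdl("my-topic", "localhost:9092", transactionTimeoutMs));
    }
}
```

The resulting string would then be passed to `tableEnv.executeSql(...)` as before; only the timeout line changes.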
From the official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/
Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions
that were started before taking a checkpoint, after recovering from the
said checkpoint. If the time between Flink application crash and completed
restart is larger than Kafka’s transaction timeout there will be data loss
(Kafka will automatically abort transactions that exceeded timeout time).
Having this in mind, please configure your transaction timeout
appropriately to your expected down times.
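A rough way to check that constraint against the settings in the question. It assumes that in EXACTLY_ONCE mode the sink begins a transaction at one checkpoint and commits it only after the next checkpoint completes, so each transaction already stays open for about one checkpoint interval before any failure happens; the time left for a restart is then roughly the transaction timeout minus the checkpoint interval. `TransactionTimeoutCheck` and its methods are hypothetical helpers, not Flink or Kafka APIs, and the calculation ignores checkpoint duration.

```java
// Hypothetical sanity check; not part of Flink or Kafka.
public final class TransactionTimeoutCheck {
    // Kafka broker default for transaction.max.timeout.ms (15 minutes).
    public static final long DEFAULT_BROKER_MAX_TIMEOUT_MS = 900_000L;

    // Approximate downtime (ms) that can be tolerated before the broker aborts
    // an open transaction; zero or negative means even normal operation is at risk.
    public static long toleratedDowntimeMs(long checkpointIntervalMs, long transactionTimeoutMs) {
        return transactionTimeoutMs - checkpointIntervalMs;
    }

    // Throws if the timeout is above the broker limit or not longer than the interval.
    public static void validate(long checkpointIntervalMs, long transactionTimeoutMs, long brokerMaxTimeoutMs) {
        if (transactionTimeoutMs > brokerMaxTimeoutMs) {
            throw new IllegalArgumentException("transaction.timeout.ms (" + transactionTimeoutMs
                    + ") exceeds broker transaction.max.timeout.ms (" + brokerMaxTimeoutMs + ")");
        }
        if (toleratedDowntimeMs(checkpointIntervalMs, transactionTimeoutMs) <= 0) {
            throw new IllegalArgumentException("transaction.timeout.ms (" + transactionTimeoutMs
                    + ") is not longer than the checkpoint interval (" + checkpointIntervalMs + ")");
        }
    }

    public static void main(String[] args) {
        long checkpointIntervalMs = 600_000L; // 10 minutes, as in the question

        // Original setting: a 30 s timeout against a 10 min interval is rejected.
        try {
            validate(checkpointIntervalMs, 30_000L, DEFAULT_BROKER_MAX_TIMEOUT_MS);
        } catch (IllegalArgumentException e) {
            System.out.println("30000 ms rejected: " + e.getMessage());
        }

        // A 15 min timeout passes and leaves roughly 5 min for a restart.
        validate(checkpointIntervalMs, 900_000L, DEFAULT_BROKER_MAX_TIMEOUT_MS);
        System.out.println("tolerated downtime ~ "
                + toleratedDowntimeMs(checkpointIntervalMs, 900_000L) + " ms");
    }
}
```

With the values from the question, the 30 s timeout fails before any downtime is even considered. If longer outages must be survived, either shorten the checkpoint interval or raise both the broker's `transaction.max.timeout.ms` and the sink's `transaction.timeout.ms`.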

On Tue, Jun 1, 2021 at 3:45 PM, 周瑞 <[hidden email]> wrote:

--
Best,
  pp