With a Flink SQL CREATE TABLE Kafka sink table and checkpointing enabled, how do I configure the SQL sink table to use two-phase-commit transactions for an exactly-once consistency guarantee?
The official documentation says: "Consistency guarantees: By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with checkpointing enabled." So a sink created with CREATE TABLE defaults to at-least-once.
Hi, you can try customizing Kafka011TableSourceSinkFactory and Kafka011TableSink to get exactly-once.
Kafka011TableSink:

    @Override
    protected SinkFunction<Row> createKafkaProducer(
            String topic,
            Properties properties,
            SerializationSchema<Row> serializationSchema,
            Optional<FlinkKafkaPartitioner<Row>> partitioner) {
        return new FlinkKafkaProducer011<>(
                topic,
                new KeyedSerializationSchemaWrapper<>(serializationSchema),
                properties,
                partitioner,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
                5);
    }

If you want to change how the table options are handled, see KafkaTableSourceSinkFactoryBase.

Reference:
https://jxeditor.github.io/2020/06/11/FlinkSQL%E5%9C%A8%E4%BD%BF%E7%94%A8%E5%88%9B%E5%BB%BA%E8%A1%A8%E8%AF%AD%E5%8F%A5%E6%97%B6%E7%9A%84%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/
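One practical detail when switching the producer to Semantic.EXACTLY_ONCE: Kafka brokers cap producer transactions at transaction.max.timeout.ms (15 minutes by default), while Flink's transactional Kafka producer defaults to a 1-hour transaction.timeout.ms, so the job fails at startup unless one of the two is adjusted. A minimal sketch, assuming the Properties instance is the same one handed into createKafkaProducer above; the 15-minute value is only illustrative:

    // Keep the producer transaction timeout within the broker's
    // transaction.max.timeout.ms (15 min by default) but comfortably
    // above the checkpoint interval, so transactions opened between
    // checkpoints are not aborted by the broker.
    properties.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));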
We are planning to implement this feature; for details see https://issues.apache.org/jira/browse/FLINK-15221
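For anyone reading this on a newer release: later Flink versions added an exactly-once switch to the Kafka SQL connector itself (a 'sink.semantic' option around Flink 1.12, superseded by 'sink.delivery-guarantee' in later releases). A minimal sketch under that assumption, with placeholder topic and broker addresses:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ExactlyOnceKafkaSinkDdl {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());

            // DDL for a transactional Kafka sink on Flink 1.12+.
            // 'sink.semantic' = 'exactly-once' only takes effect when
            // checkpointing is enabled; topic and servers are placeholders,
            // and the 'json' format requires flink-json on the classpath.
            tEnv.executeSql(
                    "CREATE TABLE kafka_sink (\n" +
                    "  user_id STRING,\n" +
                    "  cnt BIGINT\n" +
                    ") WITH (\n" +
                    "  'connector' = 'kafka',\n" +
                    "  'topic' = 'output_topic',\n" +
                    "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                    "  'format' = 'json',\n" +
                    "  'sink.semantic' = 'exactly-once'\n" +
                    ")");
        }
    }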
Thanks. I took a look at the issue; Fix Version/s is None, so there is no telling when it will land. I'll stick with the DataStream API for now.
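For the DataStream fallback, the documented route to an exactly-once Kafka sink is FlinkKafkaProducer with Semantic.EXACTLY_ONCE plus checkpointing; downstream consumers must also read with isolation.level=read_committed, otherwise they still see records from aborted transactions. A minimal sketch against the universal Kafka connector as of Flink 1.11 (topic, brokers, and the 60-second checkpoint interval are placeholders, and the source is a stand-in for a real pipeline):

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

    public class ExactlyOnceKafkaJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Transactions are committed when checkpoints complete, so checkpointing is mandatory.
            env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

            DataStream<String> results = env.fromElements("a", "b", "c"); // placeholder source

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            // Must not exceed the broker's transaction.max.timeout.ms (15 min by default).
            props.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));

            results.addSink(new FlinkKafkaProducer<>(
                    "output_topic",
                    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                    props,
                    FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

            env.execute("exactly-once kafka sink");
        }
    }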
We are going to build our data computing system based on Flink SQL.
For now, with Flink 1.11.0, we have reached a milestone: consuming from Kafka, selecting from a dynamic table, and writing the results to MySQL. But when we tested exactly-once end to end, we ran into a problem, and the official Flink SQL documentation was of no help. I need your help.

The SQL file:

    -- source; a computed column uses uuid() to generate a uuid per row
    CREATE TABLE user_log (
        user_id VARCHAR,
        item_id VARCHAR,
        category_id VARCHAR,
        behavior VARCHAR,
        ts TIMESTAMP(3),
        uuid as uuid()
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'user_behavior',
        'connector.startup-mode' = 'earliest-offset',
        'connector.properties.0.key' = 'zookeeper.connect',
        'connector.properties.0.value' = 'localhost:2181',
        'connector.properties.1.key' = 'bootstrap.servers',
        'connector.properties.1.value' = 'localhost:9092',
        'connector.properties.2.key' = 'group.id',
        'connector.properties.2.value' = 'test-consumer-group12',
        'update-mode' = 'append',
        'format.type' = 'json',
        'format.derive-schema' = 'true'
    );

    -- sink
    CREATE TABLE pvuv_sink (
        uuid varchar,
        dt VARCHAR,
        pv BIGINT,
        uv BIGINT
    ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://localhost:3306/flink_test',
        'connector.table' = 'pvuv_sink13',
        'connector.username' = 'root',
        'connector.password' = '123456',
        'connector.write.flush.max-rows' = '1',
        'connector.sink.semantic' = 'exactly-once'
    );

    INSERT INTO pvuv_sink
    SELECT
        uuid,
        DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
        COUNT(*) AS pv,
        COUNT(DISTINCT user_id) AS uv
    FROM user_log
    GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00'), uuid;

The SQL parsing and submission program:

    /**
     * Parses the command line and submits the SQL statements; this is the entry point of the project.
     */
    public class SqlSubmit {

        public static void main(String[] args) throws Exception {
            // parse the command-line arguments
            final CliOptions options = CliOptionsParser.parseClient(args);
            // hand the parsed options to SqlSubmit
            SqlSubmit submit = new SqlSubmit(options);
            // run the program
            submit.run();
        }

        // --------------------------------------------------------------------------------------------

        private String sqlFilePath;
        private TableEnvironment tEnv;

        // get the path of the SQL file to execute
        private SqlSubmit(CliOptions options) {
            this.sqlFilePath = options.getSqlFilePath();
        }

        private void run() throws Exception {
            // create the Flink execution context
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            this.tEnv = StreamTableEnvironment.create(environment,
                    EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build());
            // read all lines of the SQL file into a list of strings
            List<String> sql = Files.readAllLines(Paths.get(sqlFilePath));
            List<SqlCommandParser.SqlCommandCall> calls = SqlCommandParser.parse(sql);
            if (calls.size() == 0) {
                // no sql to execute
                throw new RuntimeException("There is no sql statement to execute, please check your sql file: " + sqlFilePath);
            }
            for (SqlCommandParser.SqlCommandCall call : calls) {
                // System.out.println(call.command.toString());
                callCommand(call);
            }
        }

        // --------------------------------------------------------------------------------------------

        private void callCommand(SqlCommandParser.SqlCommandCall cmdCall) {
            switch (cmdCall.command) {
                case SET:
                    callSet(cmdCall);
                    break;
                case CREATE_TABLE:
                    callCreateTable(cmdCall);
                    break;
                case INSERT_INTO:
                    callInsertInto(cmdCall);
                    break;
                default:
                    throw new RuntimeException("Unsupported command: " + cmdCall.command);
            }
        }

        private void callSet(SqlCommandParser.SqlCommandCall cmdCall) {
            String key = cmdCall.operands[0];
            String value = cmdCall.operands[1];
            tEnv.getConfig().getConfiguration().setString(key, value);
            System.out.println("Set " + key + " --> " + value + " successfully");
        }

        private void callCreateTable(SqlCommandParser.SqlCommandCall cmdCall) {
            String ddl = cmdCall.operands[0];
            try {
                tEnv.executeSql(ddl);
            } catch (SqlParserException e) {
                throw new RuntimeException("SQL parse failed:\n" + ddl + "\n", e);
            }
            String tableName = ddl.split("\\s+")[2];
            System.out.println("Created table " + tableName + " successfully");
        }

        private void callInsertInto(SqlCommandParser.SqlCommandCall cmdCall) {
            String dml = cmdCall.operands[0];
            Optional<JobClient> jobClient;
            try {
                TableResult result = tEnv.executeSql(dml);
                jobClient = result.getJobClient();
            } catch (SqlParserException e) {
                throw new RuntimeException("SQL parse failed:\n" + dml + "\n", e);
            }
            if (jobClient.isPresent()) {
                JobID jobID = jobClient.get().getJobID();
                System.out.println("Job submitted successfully, JobId: " + jobID);
            }
        }
    }

The shell script used to submit the job:

    #!/bin/bash
    export FLINK_HOME=/Users/hulc/developEnv/flink-1.11.0
    sql_file=$2

    # check FLINK_HOME
    if [ -z "$FLINK_HOME" ]; then
        echo "Please set FLINK_HOME or configure it in this script"
        exit 1
    fi

    # check the number of arguments
    if [ $# -lt 2 ]; then
        echo "Usage: ./sql-submit.sh -f <sql-file>"
        exit 1
    fi

    # the jar this script depends on; the name is hard-coded here and could be passed in later
    # SQL_JAR=./flink-sql-submit-1.0-SNAPSHOT.jar
    SQL_JAR=./target/flink-test1-1.0-SNAPSHOT.jar

    # check that the jar can be loaded
    if [ -f $SQL_JAR ]; then
        echo "`date +%Y-%m-%d" "%H:%M:%S` load jars from ${SQL_JAR}"
    else
        echo "failed to load dependent jars for sql-submit.sh, please specify it"
        exit 1
    fi

    # check that the sql file exists
    if [ ! -f $sql_file ]; then
        echo "sql file $sql_file does not exist, please check the path"
        exit 1
    fi

    # submit command; the parallelism, fully qualified main class and project jar are hard-coded here
    # $1 is -f, the flag indicating the sql file to execute
    # $sql_file is the sql file to execute
    if [ $1 = "-f" ]; then
        $FLINK_HOME/bin/flink run -p 1 -c SqlSubmit /Users/hulc/develop/flink-test1/target/flink-test1-1.0-SNAPSHOT.jar $1 $sql_file
    else
        echo "Usage: ./sql-submit.sh -f <sql-file>"
        exit 1
    fi
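One thing that stands out in the submitter above: the StreamExecutionEnvironment never enables checkpointing, and without checkpoints neither at-least-once nor exactly-once applies end to end. A minimal sketch of the change inside run(); the 60-second interval and EXACTLY_ONCE mode are illustrative choices, not values from the original post:

    // Inside SqlSubmit#run(), before creating the table environment
    // (requires org.apache.flink.streaming.api.CheckpointingMode):
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    // Checkpointing is a prerequisite for any delivery guarantee of the sinks.
    environment.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
    this.tEnv = StreamTableEnvironment.create(environment,
            EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build());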