The author has deleted this message.
> On Jan 15, 2021, 15:19, 花乞丐 <[hidden email]> wrote:
>
> I am consuming messages from Kafka and writing them into Hive. Right now the partitions are not being committed, and the reason is that the watermark is negative; I don't understand where this negative watermark comes from.
> <http://apache-flink.147419.n8.nabble.com/file/t1257/E4142DB1-E410-43e8-8653-2B90D0A998EA.png>
> I have also specified a watermark in my code, but it does not seem to take effect when I debug.
Please post your code so we can see how you are using it.
刘小红 <[hidden email]>

On Jan 15, 2021, 17:40, xufengfeng <[hidden email]> wrote:
> Could it be that the data contains timestamps in the future? I ran into that once before.
package com.zallsteel.flink.app.log;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.zallsteel.flink.entity.ChangelogVO;
import com.zallsteel.flink.entity.OrderInfo;
import com.zallsteel.flink.utils.ConfigUtils;
import lombok.SneakyThrows;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.text.ParseException;
import java.time.Duration;
import java.util.Date;
import java.util.Properties;

/**
 * @desc Test MySQL CDC to Hive
 */
public class MySQLCDC2HiveApp {
    public static void main(String[] args) {
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set the parallelism
        env.setParallelism(6);
        // enable checkpointing
        env.enableCheckpointing(60000);
        env.getConfig().setAutoWatermarkInterval(200);
        // set up the Flink SQL environment
        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        // create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
        // set the checkpointing mode
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
                CheckpointingMode.EXACTLY_ONCE);
        // set the checkpoint interval
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                Duration.ofMinutes(1));
        // catalog name
        String catalogName = "devHive";
        // create the HiveCatalog
        HiveCatalog hiveCatalog = new HiveCatalog(catalogName,
                "default",
                "/home/beggar/tools/apache-hive-3.1.2-bin/conf",
                "/home/beggar/tools/hadoop-3.1.1/etc/hadoop",
                "3.1.2"
        );
        // register the Hive catalog
        tableEnv.registerCatalog(catalogName, hiveCatalog);
        // use the Hive catalog
        tableEnv.useCatalog(catalogName);
        // create the MySQL CDC database
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
        // create the MySQL CDC table
        tableEnv.executeSql("DROP TABLE IF EXISTS cdc.order_info");
        tableEnv.executeSql("CREATE TABLE cdc.order_info(\n" +
                " id BIGINT,\n" +
                " user_id BIGINT,\n" +
                " create_time TIMESTAMP,\n" +
                " operate_time TIMESTAMP,\n" +
                " province_id INT,\n" +
                " order_status STRING,\n" +
                " total_amount DECIMAL(10, 5)\n" +
                " ) WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'beggar',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'root',\n" +
                " 'password' = '123456',\n" +
                " 'database-name' = 'cdc',\n" +
                " 'table-name' = 'order_info'\n" +
                ")");
        // create the Kafka source
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
        tableEnv.executeSql("DROP TABLE IF EXISTS kafka.order_info");
        tableEnv.executeSql("CREATE TABLE kafka.order_info (\n" +
                "id BIGINT,\n" +
                "user_id BIGINT,\n" +
                "create_time TIMESTAMP,\n" +
                "operate_time TIMESTAMP,\n" +
                "province_id INT,\n" +
                "order_status STRING,\n" +
                "total_amount DECIMAL(10, 5)\n" +
                ") WITH (\n" +
                "'connector' = 'kafka',\n" +
                "'topic' = 'order_info',\n" +
                "'scan.startup.mode' = 'earliest-offset',\n" +
                "'properties.bootstrap.servers' = 'beggar.dev:9092',\n" +
                "'format' = 'changelog-json'\n" +
                ")");
        // insert data into the Kafka table
        tableEnv.executeSql("INSERT INTO kafka.order_info\n" +
                "SELECT id, user_id, create_time, operate_time,province_id,order_status,total_amount\n" +
                "FROM cdc.order_info");
        // build a custom stream that carries the op field
        Properties kafkaConfig = ConfigUtils.getKafkaConfig();
        FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
                "order_info",
                new SimpleStringSchema(),
                kafkaConfig
        ).setStartFromEarliest();
        DataStreamSource<String> streamSource = env.addSource(consumer);
        String[] fieldNames = {"id", "user_id", "create_time", "operate_time", "province_id", "order_status", "total_amount", "op"};
        TypeInformation[] types = {Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.INT, Types.INT, Types.DOUBLE, Types.STRING};
        SingleOutputStreamOperator<Row> ds2 = streamSource.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String value) throws Exception {
                Gson gson = new Gson();
                ChangelogVO changelogVO = gson.fromJson(value, ChangelogVO.class);
                String op = changelogVO.getOp();
                int arity = fieldNames.length;
                Row row = new Row(arity);
                row.setField(0, changelogVO.getData().getId());
                row.setField(1, changelogVO.getData().getUserId());
                row.setField(2, changelogVO.getData().getCreateTime());
                row.setField(3, changelogVO.getData().getOperateTime());
                row.setField(4, changelogVO.getData().getProviceId());
                row.setField(5, changelogVO.getData().getOrderStatus());
                row.setField(6, changelogVO.getData().getTotalAmount());
                String operation = getOperation(op);
                row.setField(7, operation);
                return row;
            }

            private String getOperation(String op) {
                String operation = "INSERT";
                for (RowKind rk : RowKind.values()) {
                    if (rk.shortString().equals(op)) {
                        switch (rk) {
                            case UPDATE_BEFORE:
                                operation = "UPDATE-BEFORE";
                                break;
                            case UPDATE_AFTER:
                                operation = "UPDATE-AFTER";
                                break;
                            case DELETE:
                                operation = "DELETE";
                                break;
                            case INSERT:
                            default:
                                operation = "INSERT";
                                break;
                        }
                        break;
                    }
                }
                return operation;
            }
        }, new RowTypeInfo(types, fieldNames));
        // assign the watermark
        ds2.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2)));
        tableEnv.createTemporaryView("merged_order_info", ds2);
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
        tableEnv.executeSql("DROP TABLE IF EXISTS ods.order_info");
        tableEnv.executeSql("CREATE TABLE ods.order_info (\n" +
                " id BIGINT,\n" +
                " user_id BIGINT,\n" +
                " create_time STRING,\n" +
                " operate_time STRING,\n" +
                " province_id INT,\n" +
                " order_status INT,\n" +
                " total_amount DOUBLE,\n" +
                " op STRING \n" +
                ") PARTITIONED BY (dt STRING, hr STRING,sec STRING) STORED AS parquet TBLPROPERTIES (\n" +
                " 'partition.time-extractor.timestamp-pattern'='$dt $hr:$sec:00',\n" +
                " 'sink.partition-commit.trigger'='partition-time',\n" +
                " 'sink.partition-commit.delay'='1 min',\n" +
                " 'sink.partition-commit.policy.kind'='metastore,success-file'\n" +
                ")");
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        TableResult tableResult = tableEnv.executeSql("INSERT INTO ods.order_info\n" +
                "SELECT \n" +
                "id,\n" +
                "user_id,\n" +
                "create_time,\n" +
                "operate_time,\n" +
                "province_id,\n" +
                "order_status,\n" +
                "total_amount,\n" +
                "op,\n" +
                "DATE_FORMAT(TO_TIMESTAMP(create_time,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd') as dt,\n" +
                "DATE_FORMAT(TO_TIMESTAMP(create_time,'yyyy-MM-dd HH:mm:ss'),'HH') as hr,\n" +
                "DATE_FORMAT(TO_TIMESTAMP(create_time,'yyyy-MM-dd HH:mm:ss'),'mm') as sec\n" +
                "FROM merged_order_info"
        );
        try {
            tableEnv.execute("mysqlcdc to hive");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
In reply to this post by 18500348251@163.com
The code is attached (the same MySQLCDC2HiveApp posted above). Right now the data is being written to HDFS and files are produced, but the watermark I added has no effect, so the metastore is never updated and never receives the partition information. I have to run the command in the Hive shell first: hive (ods)> msck repair table order_info. Only after that can the data be queried. Debugging shows that at partition-commit time a partition is only committed when the watermark is greater than the value extracted from the partition plus the commit delay, but right now the watermark is always Long.MIN_VALUE, so the partition can never be committed. I have already set a watermark in the code, so is my watermark setup wrong? Any advice would be appreciated!
This usually happens when one of the Kafka partitions has no data, so the overall watermark does not advance. In that case you generally need to manually configure an idle source [1]. Note that the community's watermark push-down currently has some issues [2], which are being fixed.

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout
[2] https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel

On Mon, Jan 18, 2021, 11:42, 花乞丐 <[hidden email]> wrote:
> The code is attached; the watermark stays at Long.MIN_VALUE, so the partitions are never committed. Is my watermark setup wrong?
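To make the idle-source suggestion concrete, a small sketch of the two knobs involved, with example values only (the 1-minute timeouts are assumptions, not recommendations):

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class IdleSourceSketch {

    // SQL/Table sources: the table.exec.source.idle-timeout option from [1] lets the
    // planner advance the watermark even when some Kafka partitions stay empty.
    public static void configureIdleTimeout(StreamTableEnvironment tableEnv) {
        tableEnv.getConfig().getConfiguration()
                .setString("table.exec.source.idle-timeout", "1 min");
    }

    // DataStream sources: the same idea expressed on the WatermarkStrategy itself,
    // so an idle partition stops holding back the overall watermark.
    public static WatermarkStrategy<Row> strategyWithIdleness() {
        return WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withIdleness(Duration.ofMinutes(1));
    }
}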
In reply to this post by 花乞丐
Thanks everyone for the patient answers. I have found the problem: there was an issue with how I was using the watermark. It was my own mistake, sorry about that.

<http://apache-flink.147419.n8.nabble.com/file/t1257/%E4%BD%BF%E7%94%A8%E5%A7%BF%E5%8A%BF.jpg>

After that change the partition data still could not be committed. Further debugging showed that the watermark value is now fine, but Flink's toMills method uses UTC time, so the value extracted from the partition is 8 hours larger than the original value, which keeps the watermark smaller than partition_time + commitDelay. All that remains is to handle that accordingly.

<http://apache-flink.147419.n8.nabble.com/file/t1257/%E6%97%B6%E5%8C%BA%E9%97%AE%E9%A2%98.jpg>
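For completeness, one workaround sometimes used for the 8-hour offset described above is a custom partition-time extractor that shifts the extracted time back before it is compared with the watermark. The sketch below assumes the Flink 1.12-era org.apache.flink.table.filesystem.PartitionTimeExtractor interface and the dt/hr/sec partition layout from the DDL earlier in the thread; the class name and the hard-coded 8-hour shift are assumptions for illustration, not necessarily the handling the original poster ended up with.

package com.zallsteel.flink.app.log;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

import org.apache.flink.table.filesystem.PartitionTimeExtractor;

public class Utc8PartitionTimeExtractor implements PartitionTimeExtractor {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Override
    public LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues) {
        // Rebuild "dt hr:sec:00", mirroring the timestamp-pattern in the table DDL,
        // then subtract the 8 hours added by the UTC-based conversion.
        String dt = partitionValues.get(partitionKeys.indexOf("dt"));
        String hr = partitionValues.get(partitionKeys.indexOf("hr"));
        String sec = partitionValues.get(partitionKeys.indexOf("sec"));
        LocalDateTime partitionTime = LocalDateTime.parse(dt + " " + hr + ":" + sec + ":00", FORMAT);
        return partitionTime.minusHours(8);
    }
}

// Wired into the Hive table with options along the lines of:
//   'partition.time-extractor.kind' = 'custom',
//   'partition.time-extractor.class' = 'com.zallsteel.flink.app.log.Utc8PartitionTimeExtractor'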