陈帅 wrote on Sat, 31 Oct 2020 at 12:06:

I want to use the Flink SQL mysql-cdc connector to sync a MySQL table into Hive in real time. When I run the job it throws:

Exception in thread "main" org.apache.flink.table.api.TableException:
AppendStreamTableSink doesn't support consuming update and delete changes
which is produced by node TableSourceScan(table=[[hive_catalog, cdc, team]],
fields=[team_id, team_name, create_time, update_time])

My questions:
1. Is this because Hive 2 does not support delete/update? Would switching to Hive 3 make it work?
2. To support this scenario, do I need to put a Kafka layer in the middle (using the changelog-json format), i.e. cdc -> kafka, then kafka -> hive streaming? Thanks!
3. The time produced by DATE_FORMAT is UTC. How do I convert it to GMT+8 — only through a UDF?

The SQL statements are as follows:

CREATE DATABASE IF NOT EXISTS cdc

DROP TABLE IF EXISTS cdc.team

CREATE TABLE team(
    team_id BIGINT,
    team_name STRING,
    create_time TIMESTAMP,
    update_time TIMESTAMP,
    proctime as proctime()
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'test',
    'table-name' = 'team'
)

CREATE DATABASE IF NOT EXISTS ods

DROP TABLE IF EXISTS ods.team

CREATE TABLE ods.team (
    team_id BIGINT,
    team_name STRING,
    create_time TIMESTAMP,
    update_time TIMESTAMP
) PARTITIONED BY (
    ts_date STRING,
    ts_hour STRING,
    ts_minute STRING
) STORED AS PARQUET TBLPROPERTIES (
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.delay' = '1 min',
    'sink.partition-commit.policy.kind' = 'metastore,success-file',
    'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
)

INSERT INTO ods.team
SELECT team_id, team_name, create_time, update_time,
    my_date_format(create_time, 'yyyy-MM-dd', 'Asia/Shanghai'),
    my_date_format(create_time, 'HH', 'Asia/Shanghai'),
    my_date_format(create_time, 'mm', 'Asia/Shanghai')
FROM cdc.team
罗显宴 <[hidden email]> wrote on Sat, 31 Oct 2020 at 13:26:

Hive 3 can do it, Hive 2 cannot, and adding Kafka would not change that by itself — before Hive 3, in-place changes to warehouse data are generally not supported. I'm not sure this answer is correct; corrections welcome.
Jark Wu <[hidden email]> wrote on Sat, 31 Oct 2020 at 13:45:
1. Yes. Hive currently cannot consume a changelog directly, mainly because Hive's support for CDC is not good. Even the Hive ACID/transaction feature is rarely used, because it integrates poorly with other compute engines.

2. The cdc -> kafka -> hive streaming approach is feasible, but kafka -> hive streaming is essentially a raw-data sync: what ends up in Hive is still the cdc log, with no real-time merging. You have to write your own query in Hive to merge it. The merge process is described in this article[1].

3. You can use ts + INTERVAL '8' HOUR.

PS: In 1.12 we plan to let Hive write changelog data directly, so cdc -> hive streaming will work without the Kafka layer in between. Once the data is in Hive, though, you will still need a separate query to merge it in real time.

Best,
Jark
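To make point 3 concrete — a minimal sketch, not part of Jark's reply, assuming the table being read is append-only at that point (for example the kafka.team table defined in the follow-up below):

SELECT team_id, team_name, create_time, update_time,
    DATE_FORMAT(create_time + INTERVAL '8' HOUR, 'yyyy-MM-dd') AS ts_date,
    DATE_FORMAT(create_time + INTERVAL '8' HOUR, 'HH')         AS ts_hour,
    DATE_FORMAT(create_time + INTERVAL '8' HOUR, 'mm')         AS ts_minute
FROM kafka.team

And for point 2, one possible shape of the Hive-side merge query — only a sketch, assuming the raw table keeps an op column (as the follow-up code below does) and ignoring ties between update before/after images; the article[1] describes the actual approach:

SELECT team_id, team_name, create_time, update_time
FROM (
    SELECT t.*, ROW_NUMBER() OVER (PARTITION BY team_id ORDER BY update_time DESC) AS rn
    FROM ods.team t
) latest
WHERE rn = 1 AND op <> 'DELETE'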
陈帅 <[hidden email]> wrote on Sat, 31 Oct 2020 at 17:25:

Thanks, Jark, for the detailed answer. I tried it along the lines you suggested. The problem I hit: without Hive partitioning, writing and reading both work fine, but once I enable time partitioning on the Hive table the write succeeds, yet the Hive shell returns no data. The table structure is correct. (I have the partition parts commented out in the code below.) Could you help me see where I went wrong?

A sample cdc -> kafka message looks like this:
{"data":{"team_id":1001,"team_name":"Sun","create_time":"2020-10-31 11:25:38","update_time":"2020-10-31 11:25:38"},"op":"+I"}

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.time.Duration;
import java.util.Properties;

public class MysqlCDC2Hive {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        streamEnv.setParallelism(3);
        streamEnv.enableCheckpointing(60000);

        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings);
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
                CheckpointingMode.EXACTLY_ONCE);
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                Duration.ofMinutes(1));

        String catalogName = "hive_catalog";
        HiveCatalog catalog = new HiveCatalog(
                catalogName,
                "default",
                "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
                "2.3.4"
        );
        tableEnv.registerCatalog(catalogName, catalog);
        tableEnv.useCatalog(catalogName);

        MyDateFormat2 myDateFormat = new MyDateFormat2();
        tableEnv.registerFunction("my_date_format", myDateFormat);

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
        tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
        tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
                "  team_id INT,\n" +
                "  team_name STRING,\n" +
                "  create_time TIMESTAMP,\n" +
                "  update_time TIMESTAMP,\n" +
                "  proctime as proctime()\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'localhost',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'root',\n" +
                "  'database-name' = 'test',\n" +
                "  'table-name' = 'team'\n" +
                ")");

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
        tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team");
        tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
                "  team_id INT,\n" +
                "  team_name STRING,\n" +
                "  create_time TIMESTAMP,\n" +
                "  update_time TIMESTAMP\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'team',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'format' = 'changelog-json'\n" +
                ")");

        tableEnv.executeSql("INSERT INTO kafka.team \n" +
                "SELECT team_id, team_name, create_time, update_time \n" +
                "FROM cdc.team");

        // build a DataStream that also carries the op (change kind) field
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
                "team",
                new SimpleStringSchema(),
                properties
        ).setStartFromEarliest();

        DataStream<String> ds = streamEnv.addSource(consumer);

        String[] fieldNames = {"team_id", "team_name", "create_time", "update_time", "op"};
        TypeInformation[] types = {Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING};
        DataStream<Row> ds2 = ds.map(str -> {
            JSONObject jsonObject = JSON.parseObject(str);
            String op = jsonObject.getString("op");
            JSONObject data = jsonObject.getJSONObject("data");
            int arity = fieldNames.length;
            Row row = new Row(arity);
            row.setField(0, data.get("team_id"));
            row.setField(1, data.get("team_name"));
            row.setField(2, data.get("create_time"));
            row.setField(3, data.get("update_time"));
            String operation = getOperation(op);
            row.setField(4, operation);
            return row;
        }, new RowTypeInfo(types, fieldNames));

        tableEnv.registerDataStream("merged_team", ds2);

        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
        tableEnv.executeSql("DROP TABLE IF EXISTS ods.team");
        tableEnv.executeSql("CREATE TABLE ods.team (\n" +
                "  team_id INT,\n" +
                "  team_name STRING,\n" +
                "  create_time STRING,\n" +
                "  update_time STRING,\n" +
                "  op STRING\n" +
//                ") PARTITIONED BY (\n" +
//                "  ts_date STRING,\n" +
//                "  ts_hour STRING,\n" +
//                "  ts_minute STRING\n" +
                ") STORED AS PARQUET TBLPROPERTIES (\n" +
                "  'sink.partition-commit.trigger' = 'partition-time',\n" +
                "  'sink.partition-commit.delay' = '1 min',\n" +
                "  'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
                "  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'\n" +
                ")");

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("INSERT INTO ods.team \n" +
                "SELECT team_id, team_name, create_time, update_time, op \n" +
//                "  DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyyMMdd') as ts_date, \n" +
//                "  DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'HH') as ts_hour, \n" +
//                "  DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'mm') as ts_minute \n" +
                "FROM merged_team");
        tableEnv.execute("MysqlCDC2Hive2");

        streamEnv.execute("");
    }

    private static String getOperation(String op) {
        String operation = "INSERT";
        for (RowKind rk : RowKind.values()) {
            if (rk.shortString().equals(op)) {
                switch (rk) {
                    case UPDATE_BEFORE:
                    case UPDATE_AFTER:
                        operation = "UPDATE";
                        break;
                    case DELETE:
                        operation = "DELETE";
                        break;
                    case INSERT:
                    default:
                        operation = "INSERT";
                        break;
                }
                break;
            }
        }
        return operation;
    }
}
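For reference on the op field that getOperation() has to map (a clarifying note; only "+I" appears in the sample above, and the full set below is taken from Flink's RowKind short strings rather than from the thread itself):

{"data":{...},"op":"+I"}   -> INSERT
{"data":{...},"op":"-U"}   -> UPDATE (before image)
{"data":{...},"op":"+U"}   -> UPDATE (after image)
{"data":{...},"op":"-D"}   -> DELETE

so the switch in getOperation() collapses -U/+U to UPDATE and -D to DELETE.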
Jark Wu <[hidden email]> wrote on Sun, 1 Nov 2020 at 11:04:
Can you check whether the Hive files are actually being generated?

Looking at your code above, there is no watermark in the kafka -> hive part of the pipeline, while the "partition-time" trigger policy is driven by watermarks[1], so that may be why no data shows up in Hive.

Best,
Jark

[1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#sink-partition-commit-trigger
陈帅 <[hidden email]> wrote on Sun, 1 Nov 2020 at 16:43:

I checked: the Hive files are generated, under the partitions I defined. Following your suggestion I added a watermark on the ds2 stream; after running, the Hive files are still generated, but the Hive shell still returns no data.

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

public class MysqlCDC2Hive {

    private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        streamEnv.setParallelism(3);
        streamEnv.enableCheckpointing(60000);

        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings);
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
                CheckpointingMode.EXACTLY_ONCE);
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                Duration.ofMinutes(1));

        String catalogName = "hive_catalog";
        HiveCatalog catalog = new HiveCatalog(
                catalogName,
                "default",
                "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
                "2.3.4"
        );
        tableEnv.registerCatalog(catalogName, catalog);
        tableEnv.useCatalog(catalogName);

        MyDateFormat2 myDateFormat = new MyDateFormat2();
        tableEnv.registerFunction("my_date_format", myDateFormat);

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
        tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
        tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
                "  team_id INT,\n" +
                "  team_name STRING,\n" +
                "  create_time TIMESTAMP,\n" +
                "  update_time TIMESTAMP,\n" +
                "  proctime as proctime()\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'localhost',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'root',\n" +
                "  'database-name' = 'test',\n" +
                "  'table-name' = 'team'\n" +
                ")");

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
        tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team");
        tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
                "  team_id INT,\n" +
                "  team_name STRING,\n" +
                "  create_time TIMESTAMP,\n" +
                "  update_time TIMESTAMP\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'team',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'format' = 'changelog-json'\n" +
                ")");

        tableEnv.executeSql("INSERT INTO kafka.team \n" +
                "SELECT team_id, team_name, create_time, update_time \n" +
                "FROM cdc.team");

        // build a DataStream that also carries the op (change kind) field
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test1");

        FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
                "team",
                new SimpleStringSchema(),
                properties
        ).setStartFromEarliest();

        DataStream<String> ds = streamEnv.addSource(consumer);

        String[] fieldNames = {"team_id", "team_name", "create_time", "update_time", "op"};
        TypeInformation[] types = {Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING};
        DataStream<Row> ds2 = ds.map(str -> {
            JSONObject jsonObject = JSON.parseObject(str);
            String op = jsonObject.getString("op");
            JSONObject data = jsonObject.getJSONObject("data");
            int arity = fieldNames.length;
            Row row = new Row(arity);
            row.setField(0, data.get("team_id"));
            row.setField(1, data.get("team_name"));
            row.setField(2, data.get("create_time"));
            row.setField(3, data.get("update_time"));
            String operation = getOperation(op);
            row.setField(4, operation);
            return row;
        }, new RowTypeInfo(types, fieldNames))
                // newly added per Jark's suggestion: event-time timestamps and watermarks on ds2
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(1)) {
                    @Override
                    public long extractTimestamp(Row row) {
                        String dt = (String) row.getField(2);
                        LocalDateTime ldt = LocalDateTime.parse(dt, dtf);
                        Instant instant = ldt.atZone(ZoneId.systemDefault()).toInstant();
                        long timeInMillis = instant.toEpochMilli();
                        return timeInMillis;
                    }
                });

        tableEnv.registerDataStream("merged_team", ds2);

        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
        tableEnv.executeSql("DROP TABLE IF EXISTS ods.team");
        tableEnv.executeSql("CREATE TABLE ods.team (\n" +
                "  team_id INT,\n" +
                "  team_name STRING,\n" +
                "  create_time STRING,\n" +
                "  update_time STRING,\n" +
                "  op STRING\n" +
                ") PARTITIONED BY (\n" +
                "  dt STRING,\n" +
                "  hr STRING,\n" +
                "  mi STRING\n" +
                ") STORED AS PARQUET TBLPROPERTIES (\n" +
                "  'sink.partition-commit.trigger' = 'partition-time',\n" +
                "  'sink.partition-commit.delay' = '1 min',\n" +
                "  'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
                "  'partition.time-extractor.timestamp-pattern' = '$dt $hr:$mi:00'\n" +
                ")");

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("INSERT INTO ods.team \n" +
                "SELECT team_id, team_name, create_time, update_time, op, \n" +
                "  DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyyMMdd') as dt, \n" +
                "  DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'HH') as hr, \n" +
                "  DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'mm') as mi \n" +
                "FROM merged_team");
        tableEnv.execute("MysqlCDC2Hive2");

        streamEnv.execute("");
    }

    private static String getOperation(String op) {
        String operation = "INSERT";
        for (RowKind rk : RowKind.values()) {
            if (rk.shortString().equals(op)) {
                switch (rk) {
                    case UPDATE_BEFORE:
                    case UPDATE_AFTER:
                        operation = "UPDATE";
                        break;
                    case DELETE:
                        operation = "DELETE";
                        break;
                    case INSERT:
                    default:
                        operation = "INSERT";
                        break;
                }
                break;
            }
        }
        return operation;
    }
}
陈帅 <[hidden email]> wrote on Sun, 1 Nov 2020 at 17:10:

Before I added the watermark and the partitioning, writing to Hive and querying it back both worked; it is only after enabling partitioning that the Hive files are generated but cannot be queried, so I don't think it is really about whether the watermark is set.

The generated Hive partition files have paths like
/user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3
'root',\n" + > " 'database-name' = 'test',\n" + > " 'table-name' = 'team'\n" + > ")"); > > tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka"); > tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team"); > tableEnv.executeSql("CREATE TABLE kafka.team (\n" + > " team_id INT,\n" + > " team_name STRING,\n" + > " create_time TIMESTAMP,\n" + > " update_time TIMESTAMP\n" + > ") WITH (\n" + > " 'connector' = 'kafka',\n" + > " 'topic' = 'team',\n" + > " 'scan.startup.mode' = 'earliest-offset',\n" + > " 'properties.bootstrap.servers' = 'localhost:9092',\n" + > " 'format' = 'changelog-json'\n" + > ")"); > > tableEnv.executeSql("INSERT INTO kafka.team \n" + > "SELECT team_id, team_name, create_time, update_time \n" + > "FROM cdc.team"); > > // 定义带op字段的stream > Properties properties = new Properties(); > properties.setProperty("bootstrap.servers", "localhost:9092"); > properties.setProperty("group.id", "test1`"); > > FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>( > "team", > new SimpleStringSchema(), > properties > ).setStartFromEarliest(); > > DataStream<String> ds = streamEnv.addSource(consumer); > > String[] fieldNames = {"team_id", "team_name", "create_time", > "update_time", "op"}; > TypeInformation[] types = {Types.INT, Types.STRING, Types.STRING, > Types.STRING, Types.STRING}; > DataStream<Row> ds2 = ds.map(str -> { > JSONObject jsonObject = JSON.parseObject(str); > String op = jsonObject.getString("op"); > JSONObject data = jsonObject.getJSONObject("data"); > int arity = fieldNames.length; > Row row = new Row(arity); > row.setField(0, data.get("team_id")); > row.setField(1, data.get("team_name")); > row.setField(2, data.get("create_time")); > row.setField(3, data.get("update_time")); > String operation = getOperation(op); > row.setField(4, operation); > > return row; > }, new RowTypeInfo(types, fieldNames)) > > > > > > > > > *.assignTimestampsAndWatermarks(new > BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(1)) { > @Override public long extractTimestamp(Row row) { > String dt = (String) row.getField(2); LocalDateTime ldt = > LocalDateTime.parse(dt, dtf); Instant instant = > ldt.atZone(ZoneId.systemDefault()).toInstant(); long > timeInMillis = instant.toEpochMilli(); return timeInMillis; > } });* > > tableEnv.registerDataStream("merged_team", ds2); > > tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > > tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods"); > tableEnv.executeSql("DROP TABLE IF EXISTS ods.team"); > > tableEnv.executeSql("CREATE TABLE ods.team (\n" + > " team_id INT,\n" + > " team_name STRING,\n" + > " create_time STRING,\n" + > " update_time STRING,\n" + > " op STRING\n" + > ") PARTITIONED BY (\n" + > " dt STRING,\n" + > " hr STRING,\n" + > " mi STRING\n" + > ") STORED AS PARQUET TBLPROPERTIES (\n" + > " 'sink.partition-commit.trigger' = 'partition-time',\n" + > " 'sink.partition-commit.delay' = '1 min',\n" + > " 'sink.partition-commit.policy.kind' = > 'metastore,success-file',\n" + > " 'partition.time-extractor.timestamp-pattern' = '$dt > $hr:$mi:00'\n" + > ")"); > > tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > tableEnv.executeSql("INSERT INTO ods.team \n" + > "SELECT team_id, team_name, create_time, update_time, op, > \n" + > " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > HH:mm:ss'), 'yyyyMMdd') as dt, \n" + > " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > HH:mm:ss'), 'HH') as hr, \n" + > " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > HH:mm:ss'), 'mm') as mi \n" + > "FROM merged_team"); > 
tableEnv.execute("MysqlCDC2Hive2"); > > streamEnv.execute(""); > } > > private static String getOperation(String op) { > String operation = "INSERT"; > for (RowKind rk : RowKind.values()) { > if (rk.shortString().equals(op)) { > switch (rk) { > case UPDATE_BEFORE: > case UPDATE_AFTER: > operation = "UPDATE"; > break; > case DELETE: > operation = "DELETE"; > break; > case INSERT: > default: > operation = "INSERT"; > break; > } > break; > } > } > return operation; > } > } > > Jark Wu <[hidden email]> 于2020年11月1日周日 上午11:04写道: > >> 你检查一下 hive 文件是否正常生成了? >> >> 我看你上面的代码,kafka->hive 流程中是没有 watermark 的,而"partition-time" 的 trigger >> policy 是基于 watermark 驱动的,所以可能是这个原因导致 hive 中没有数据。 >> >> Best, >> Jark >> >> >> [1]: >> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#sink-partition-commit-trigger >> >> On Sat, 31 Oct 2020 at 17:25, 陈帅 <[hidden email]> wrote: >> >>> 谢谢Jark细致解答,我按照你给的思路试了下。遇到一个问题是,在不开hive分区的情况下写入和读取是没有问题的,但在开启hive表时间分区后,写入是成功了,然而通过hive >>> shell查不到数据,表结构是正确的。(代码我注释掉了) 能帮忙看下是哪里写得不对吗? >>> >>> cdc -> kafka示例消息如下 >>> {"data":{"team_id":1001,"team_name":"Sun","create_time":"2020-10-31 >>> 11:25:38","update_time":"2020-10-31 11:25:38"},"op":"+I"} >>> >>> import com.alibaba.fastjson.JSON; >>> import com.alibaba.fastjson.JSONObject; >>> import org.apache.flink.api.common.serialization.SimpleStringSchema; >>> import org.apache.flink.api.common.typeinfo.TypeInformation; >>> import org.apache.flink.api.common.typeinfo.Types; >>> import org.apache.flink.api.java.typeutils.RowTypeInfo; >>> import org.apache.flink.streaming.api.CheckpointingMode; >>> import org.apache.flink.streaming.api.TimeCharacteristic; >>> import org.apache.flink.streaming.api.datastream.DataStream; >>> import >>> org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; >>> import >>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; >>> import >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; >>> import org.apache.flink.table.api.EnvironmentSettings; >>> import org.apache.flink.table.api.SqlDialect; >>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; >>> import org.apache.flink.table.catalog.hive.HiveCatalog; >>> import org.apache.flink.types.Row; >>> import org.apache.flink.types.RowKind; >>> >>> import java.time.Duration; >>> import java.util.Properties; >>> >>> public class MysqlCDC2Hive { >>> public static void main(String[] args) throws Exception { >>> StreamExecutionEnvironment streamEnv = >>> StreamExecutionEnvironment.getExecutionEnvironment(); >>> >>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >>> streamEnv.setParallelism(3); >>> streamEnv.enableCheckpointing(60000); >>> >>> EnvironmentSettings tableEnvSettings = >>> EnvironmentSettings.newInstance() >>> .useBlinkPlanner() >>> .inStreamingMode() >>> .build(); >>> StreamTableEnvironment tableEnv = >>> StreamTableEnvironment.create(streamEnv, tableEnvSettings); >>> >>> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, >>> CheckpointingMode.EXACTLY_ONCE); >>> >>> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, >>> Duration.ofMinutes(1)); >>> >>> String catalogName = "hive_catalog"; >>> HiveCatalog catalog = new HiveCatalog( >>> catalogName, >>> "default", >>> "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf", >>> "2.3.4" >>> ); >>> 
tableEnv.registerCatalog(catalogName, catalog); >>> tableEnv.useCatalog(catalogName); >>> >>> MyDateFormat2 myDateFormat = new MyDateFormat2(); >>> tableEnv.registerFunction("my_date_format", myDateFormat); >>> >>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc"); >>> tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team"); >>> tableEnv.executeSql("CREATE TABLE cdc.team(\n" + >>> " team_id INT,\n" + >>> " team_name STRING,\n" + >>> " create_time TIMESTAMP,\n" + >>> " update_time TIMESTAMP,\n" + >>> " proctime as proctime()\n" + >>> ") WITH (\n" + >>> " 'connector' = 'mysql-cdc',\n" + >>> " 'hostname' = 'localhost',\n" + >>> " 'port' = '3306',\n" + >>> " 'username' = 'root',\n" + >>> " 'password' = 'root',\n" + >>> " 'database-name' = 'test',\n" + >>> " 'table-name' = 'team'\n" + >>> ")"); >>> >>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka"); >>> tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team"); >>> tableEnv.executeSql("CREATE TABLE kafka.team (\n" + >>> " team_id INT,\n" + >>> " team_name STRING,\n" + >>> " create_time TIMESTAMP,\n" + >>> " update_time TIMESTAMP\n" + >>> ") WITH (\n" + >>> " 'connector' = 'kafka',\n" + >>> " 'topic' = 'team',\n" + >>> " 'scan.startup.mode' = 'earliest-offset',\n" + >>> " 'properties.bootstrap.servers' = 'localhost:9092',\n" >>> + >>> " 'format' = 'changelog-json'\n" + >>> ")"); >>> >>> tableEnv.executeSql("INSERT INTO kafka.team \n" + >>> "SELECT team_id, team_name, create_time, update_time \n" >>> + >>> "FROM cdc.team"); >>> >>> // 定义带op字段的stream >>> Properties properties = new Properties(); >>> properties.setProperty("bootstrap.servers", "localhost:9092"); >>> properties.setProperty("group.id", "test"); >>> >>> FlinkKafkaConsumerBase<String> consumer = new >>> FlinkKafkaConsumer<>( >>> "team", >>> new SimpleStringSchema(), >>> properties >>> ).setStartFromEarliest(); >>> >>> DataStream<String> ds = streamEnv.addSource(consumer); >>> >>> String[] fieldNames = {"team_id", "team_name", "create_time", >>> "update_time", "op"}; >>> TypeInformation[] types = {Types.INT, Types.STRING, >>> Types.STRING, Types.STRING, Types.STRING}; >>> DataStream<Row> ds2 = ds.map(str -> { >>> JSONObject jsonObject = JSON.parseObject(str); >>> String op = jsonObject.getString("op"); >>> JSONObject data = jsonObject.getJSONObject("data"); >>> int arity = fieldNames.length; >>> Row row = new Row(arity); >>> row.setField(0, data.get("team_id")); >>> row.setField(1, data.get("team_name")); >>> row.setField(2, data.get("create_time")); >>> row.setField(3, data.get("update_time")); >>> String operation = getOperation(op); >>> row.setField(4, operation); >>> >>> return row; >>> }, new RowTypeInfo(types, fieldNames)); >>> >>> tableEnv.registerDataStream("merged_team", ds2); >>> >>> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); >>> >>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods"); >>> tableEnv.executeSql("DROP TABLE IF EXISTS ods.team"); >>> >>> tableEnv.executeSql("CREATE TABLE ods.team (\n" + >>> " team_id INT,\n" + >>> " team_name STRING,\n" + >>> " create_time STRING,\n" + >>> " update_time STRING,\n" + >>> " op STRING\n" + >>> // ") PARTITIONED BY (\n" + >>> // " ts_date STRING,\n" + >>> // " ts_hour STRING,\n" + >>> // " ts_minute STRING\n" + >>> ") STORED AS PARQUET TBLPROPERTIES (\n" + >>> " 'sink.partition-commit.trigger' = >>> 'partition-time',\n" + >>> " 'sink.partition-commit.delay' = '1 min',\n" + >>> " 'sink.partition-commit.policy.kind' = >>> 'metastore,success-file',\n" + >>> " 'partition.time-extractor.timestamp-pattern' = >>> 
'$ts_date $ts_hour:$ts_minute:00'\n" + >>> ")"); >>> >>> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); >>> tableEnv.executeSql("INSERT INTO ods.team \n" + >>> "SELECT team_id, team_name, create_time, update_time, op >>> \n" + >>> // " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd >>> HH:mm:ss'), 'yyyyMMdd') as ts_date, \n" + >>> // " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd >>> HH:mm:ss'), 'HH') as ts_hour, \n" + >>> // " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd >>> HH:mm:ss'), 'mm') as ts_minute \n" + >>> "FROM merged_team"); >>> tableEnv.execute("MysqlCDC2Hive2"); >>> >>> streamEnv.execute(""); >>> } >>> >>> private static String getOperation(String op) { >>> String operation = "INSERT"; >>> for (RowKind rk : RowKind.values()) { >>> if (rk.shortString().equals(op)) { >>> switch (rk) { >>> case UPDATE_BEFORE: >>> case UPDATE_AFTER: >>> operation = "UPDATE"; >>> break; >>> case DELETE: >>> operation = "DELETE"; >>> break; >>> case INSERT: >>> default: >>> operation = "INSERT"; >>> break; >>> } >>> break; >>> } >>> } >>> return operation; >>> } >>> } >>> >>> Jark Wu <[hidden email]> 于2020年10月31日周六 下午1:45写道: >>> >>>> 1. 是的。目前 Hive不支持直接消费 changlog ,这个主要原因是 hive 对 cdc 的支持不是很好。即使是 hive >>>> ACID/transaction 功能,由于其与其他计算引擎集成的不好,也鲜有人用。 >>>> >>>> 2. cdc -> kafka -> hive streaming 的方案是可行的,不过 kafka -> hive streaming >>>> 相当于原始数据同步,到 hive 中仍然是 cdc logs 内容,并没有实时合并,需要用户自己写 query 在 hive >>>> 中进行合并。merge过程可以参考这篇文章[1]。 >>>> >>>> 3. 你可以 ts + INTERVAL '8' HOUR >>>> >>>> PS: 在1.12中,我们计划让 hive 也能直接写 changelog 数据,这样的话 cdc 可以直接 -> hive >>>> streaming,不需要中间的 kafka。 不过到了 hive 中后,仍然需要另外写 query 将数据做实时merge。 >>>> >>>> Best, >>>> Jark >>>> >>>> On Sat, 31 Oct 2020 at 13:26, 罗显宴 <[hidden email]> wrote: >>>> >>>>> hive3可以hive2不可以,换了kafka也没用吧,hive3之前一般都不支持数据仓库的更改。不知道回答的对不对,欢迎指正。 >>>>> >>>>> >>>>> | | >>>>> 罗显宴 >>>>> | >>>>> | >>>>> 邮箱:[hidden email] >>>>> | >>>>> >>>>> 签名由 网易邮箱大师 定制 >>>>> >>>>> 在2020年10月31日 12:06,陈帅 写道: >>>>> 我想使用flink sql的mysql-cdc connector直接将mysql表数据实时同步进hive,运行后抛 >>>>> >>>>> Exception in thread "main" org.apache.flink.table.api.TableException: >>>>> AppendStreamTableSink doesn't support consuming update and delete >>>>> changes >>>>> which is produced by node TableSourceScan(table=[[hive_catalog, cdc, >>>>> team]], fields=[team_id, team_name, create_time, update_time]) >>>>> >>>>> 我的问题: >>>>> 1. 是不是因为hive2不支持delete/update,如果换hive 3能否支持呢? >>>>> 2. 如果要支持这种场景是不是中间需要加一层kafka介质(通过 changelog-json 格式),即cdc -> >>>>> kafka,然后kafka >>>>> -> hive streaming? 谢谢! >>>>> 3. DATE_FORMAT函数出来的时间是UTC的,怎么转成GMT+8,只能通过UDF么? 
|
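A note on question 3 quoted above: following the "ts + INTERVAL '8' HOUR" suggestion, the GMT+8 partition values can be derived with the built-in DATE_FORMAT instead of a my_date_format UDF. A minimal sketch, reusing the table and column names from this thread and assuming the partitioned ods.team variant with ts_date/ts_hour/ts_minute and a create_time that really is in UTC:

INSERT INTO ods.team  -- assumes the partitioned ods.team with ts_date/ts_hour/ts_minute
SELECT team_id, team_name, create_time, update_time,
       -- shift the UTC timestamp to GMT+8 before formatting the partition values
       DATE_FORMAT(create_time + INTERVAL '8' HOUR, 'yyyy-MM-dd') AS ts_date,
       DATE_FORMAT(create_time + INTERVAL '8' HOUR, 'HH') AS ts_hour,
       DATE_FORMAT(create_time + INTERVAL '8' HOUR, 'mm') AS ts_minute
FROM cdc.team

The same shift can be applied wherever DATE_FORMAT(TO_TIMESTAMP(...)) is used later in the thread; it only changes the value being formatted, not the sink configuration.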
Switched the Hive table to TEXTFILE storage so that the files it writes can be downloaded and their contents inspected (a partition-check sketch follows at the end of this message):
") STORED AS TEXTFILE TBLPROPERTIES (" 这是生成的hive表建表语句 hive> show create table team; OK CREATE TABLE `team`( `team_id` int, `team_name` string, `create_time` string, `update_time` string, `op` string) PARTITIONED BY ( `dt` string, `hr` string, `mi` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat' LOCATION 'hdfs://localhost:9000/user/hive/warehouse/ods.db/team' TBLPROPERTIES ( 'is_generic'='false', 'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00', 'sink.partition-commit.delay'='1 min', 'sink.partition-commit.policy.kind'='metastore,success-file', 'sink.partition-commit.trigger'='partition-time', 'transient_lastDdlTime'='1604222266') Time taken: 0.252 seconds, Fetched: 25 row(s) 另外,下载了hive文件内容如下 1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31 11:25:38<0x01>INSERT 还是查询不到结果 hive> select * from team; OK Time taken: 0.326 seconds 陈帅 <[hidden email]> 于2020年11月1日周日 下午5:10写道: > > 之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。 > 生成的hive分区文件路径类似于 /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/ > part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3 > > 陈帅 <[hidden email]> 于2020年11月1日周日 下午4:43写道: > >> 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive >> shell查不到数据。 >> >> import com.alibaba.fastjson.JSON; >> import com.alibaba.fastjson.JSONObject; >> import org.apache.flink.api.common.serialization.SimpleStringSchema; >> import org.apache.flink.api.common.typeinfo.TypeInformation; >> import org.apache.flink.api.common.typeinfo.Types; >> import org.apache.flink.api.java.typeutils.RowTypeInfo; >> import org.apache.flink.streaming.api.CheckpointingMode; >> import org.apache.flink.streaming.api.TimeCharacteristic; >> import org.apache.flink.streaming.api.datastream.DataStream; >> import >> org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; >> import >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >> import >> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; >> import org.apache.flink.streaming.api.windowing.time.Time; >> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; >> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; >> import org.apache.flink.table.api.EnvironmentSettings; >> import org.apache.flink.table.api.SqlDialect; >> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; >> import org.apache.flink.table.catalog.hive.HiveCatalog; >> import org.apache.flink.types.Row; >> import org.apache.flink.types.RowKind; >> >> import java.time.Duration; >> import java.time.Instant; >> import java.time.LocalDateTime; >> import java.time.ZoneId; >> import java.time.format.DateTimeFormatter; >> import java.util.Properties; >> >> public class MysqlCDC2Hive { >> >> private static final DateTimeFormatter dtf = >> DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); >> >> public static void main(String[] args) throws Exception { >> StreamExecutionEnvironment streamEnv = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> >> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >> streamEnv.setParallelism(3); >> streamEnv.enableCheckpointing(60000); >> >> EnvironmentSettings tableEnvSettings = >> EnvironmentSettings.newInstance() >> .useBlinkPlanner() >> .inStreamingMode() >> .build(); 
>> StreamTableEnvironment tableEnv = >> StreamTableEnvironment.create(streamEnv, tableEnvSettings); >> >> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, >> CheckpointingMode.EXACTLY_ONCE); >> >> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, >> Duration.ofMinutes(1)); >> >> String catalogName = "hive_catalog"; >> HiveCatalog catalog = new HiveCatalog( >> catalogName, >> "default", >> "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf", >> "2.3.4" >> ); >> tableEnv.registerCatalog(catalogName, catalog); >> tableEnv.useCatalog(catalogName); >> >> MyDateFormat2 myDateFormat = new MyDateFormat2(); >> tableEnv.registerFunction("my_date_format", myDateFormat); >> >> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc"); >> tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team"); >> tableEnv.executeSql("CREATE TABLE cdc.team(\n" + >> " team_id INT,\n" + >> " team_name STRING,\n" + >> " create_time TIMESTAMP,\n" + >> " update_time TIMESTAMP,\n" + >> " proctime as proctime()\n" + >> ") WITH (\n" + >> " 'connector' = 'mysql-cdc',\n" + >> " 'hostname' = 'localhost',\n" + >> " 'port' = '3306',\n" + >> " 'username' = 'root',\n" + >> " 'password' = 'root',\n" + >> " 'database-name' = 'test',\n" + >> " 'table-name' = 'team'\n" + >> ")"); >> >> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka"); >> tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team"); >> tableEnv.executeSql("CREATE TABLE kafka.team (\n" + >> " team_id INT,\n" + >> " team_name STRING,\n" + >> " create_time TIMESTAMP,\n" + >> " update_time TIMESTAMP\n" + >> ") WITH (\n" + >> " 'connector' = 'kafka',\n" + >> " 'topic' = 'team',\n" + >> " 'scan.startup.mode' = 'earliest-offset',\n" + >> " 'properties.bootstrap.servers' = 'localhost:9092',\n" + >> " 'format' = 'changelog-json'\n" + >> ")"); >> >> tableEnv.executeSql("INSERT INTO kafka.team \n" + >> "SELECT team_id, team_name, create_time, update_time \n" + >> "FROM cdc.team"); >> >> // 定义带op字段的stream >> Properties properties = new Properties(); >> properties.setProperty("bootstrap.servers", "localhost:9092"); >> properties.setProperty("group.id", "test1`"); >> >> FlinkKafkaConsumerBase<String> consumer = new >> FlinkKafkaConsumer<>( >> "team", >> new SimpleStringSchema(), >> properties >> ).setStartFromEarliest(); >> >> DataStream<String> ds = streamEnv.addSource(consumer); >> >> String[] fieldNames = {"team_id", "team_name", "create_time", >> "update_time", "op"}; >> TypeInformation[] types = {Types.INT, Types.STRING, Types.STRING, >> Types.STRING, Types.STRING}; >> DataStream<Row> ds2 = ds.map(str -> { >> JSONObject jsonObject = JSON.parseObject(str); >> String op = jsonObject.getString("op"); >> JSONObject data = jsonObject.getJSONObject("data"); >> int arity = fieldNames.length; >> Row row = new Row(arity); >> row.setField(0, data.get("team_id")); >> row.setField(1, data.get("team_name")); >> row.setField(2, data.get("create_time")); >> row.setField(3, data.get("update_time")); >> String operation = getOperation(op); >> row.setField(4, operation); >> >> return row; >> }, new RowTypeInfo(types, fieldNames)) >> >> >> >> >> >> >> >> >> *.assignTimestampsAndWatermarks(new >> BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(1)) { >> @Override public long extractTimestamp(Row row) { >> String dt = (String) row.getField(2); LocalDateTime ldt = >> LocalDateTime.parse(dt, dtf); Instant instant = >> ldt.atZone(ZoneId.systemDefault()).toInstant(); long >> timeInMillis = 
instant.toEpochMilli(); return timeInMillis; >> } });* >> >> tableEnv.registerDataStream("merged_team", ds2); >> >> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); >> >> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods"); >> tableEnv.executeSql("DROP TABLE IF EXISTS ods.team"); >> >> tableEnv.executeSql("CREATE TABLE ods.team (\n" + >> " team_id INT,\n" + >> " team_name STRING,\n" + >> " create_time STRING,\n" + >> " update_time STRING,\n" + >> " op STRING\n" + >> ") PARTITIONED BY (\n" + >> " dt STRING,\n" + >> " hr STRING,\n" + >> " mi STRING\n" + >> ") STORED AS PARQUET TBLPROPERTIES (\n" + >> " 'sink.partition-commit.trigger' = 'partition-time',\n" >> + >> " 'sink.partition-commit.delay' = '1 min',\n" + >> " 'sink.partition-commit.policy.kind' = >> 'metastore,success-file',\n" + >> " 'partition.time-extractor.timestamp-pattern' = '$dt >> $hr:$mi:00'\n" + >> ")"); >> >> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); >> tableEnv.executeSql("INSERT INTO ods.team \n" + >> "SELECT team_id, team_name, create_time, update_time, op, >> \n" + >> " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd >> HH:mm:ss'), 'yyyyMMdd') as dt, \n" + >> " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd >> HH:mm:ss'), 'HH') as hr, \n" + >> " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd >> HH:mm:ss'), 'mm') as mi \n" + >> "FROM merged_team"); >> tableEnv.execute("MysqlCDC2Hive2"); >> >> streamEnv.execute(""); >> } >> >> private static String getOperation(String op) { >> String operation = "INSERT"; >> for (RowKind rk : RowKind.values()) { >> if (rk.shortString().equals(op)) { >> switch (rk) { >> case UPDATE_BEFORE: >> case UPDATE_AFTER: >> operation = "UPDATE"; >> break; >> case DELETE: >> operation = "DELETE"; >> break; >> case INSERT: >> default: >> operation = "INSERT"; >> break; >> } >> break; >> } >> } >> return operation; >> } >> } >> >> Jark Wu <[hidden email]> 于2020年11月1日周日 上午11:04写道: >> >>> 你检查一下 hive 文件是否正常生成了? >>> >>> 我看你上面的代码,kafka->hive 流程中是没有 watermark 的,而"partition-time" 的 trigger >>> policy 是基于 watermark 驱动的,所以可能是这个原因导致 hive 中没有数据。 >>> >>> Best, >>> Jark >>> >>> >>> [1]: >>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#sink-partition-commit-trigger >>> >>> On Sat, 31 Oct 2020 at 17:25, 陈帅 <[hidden email]> wrote: >>> >>>> 谢谢Jark细致解答,我按照你给的思路试了下。遇到一个问题是,在不开hive分区的情况下写入和读取是没有问题的,但在开启hive表时间分区后,写入是成功了,然而通过hive >>>> shell查不到数据,表结构是正确的。(代码我注释掉了) 能帮忙看下是哪里写得不对吗? 
>>>> >>>> cdc -> kafka示例消息如下 >>>> {"data":{"team_id":1001,"team_name":"Sun","create_time":"2020-10-31 >>>> 11:25:38","update_time":"2020-10-31 11:25:38"},"op":"+I"} >>>> >>>> import com.alibaba.fastjson.JSON; >>>> import com.alibaba.fastjson.JSONObject; >>>> import org.apache.flink.api.common.serialization.SimpleStringSchema; >>>> import org.apache.flink.api.common.typeinfo.TypeInformation; >>>> import org.apache.flink.api.common.typeinfo.Types; >>>> import org.apache.flink.api.java.typeutils.RowTypeInfo; >>>> import org.apache.flink.streaming.api.CheckpointingMode; >>>> import org.apache.flink.streaming.api.TimeCharacteristic; >>>> import org.apache.flink.streaming.api.datastream.DataStream; >>>> import >>>> org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; >>>> import >>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; >>>> import >>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; >>>> import org.apache.flink.table.api.EnvironmentSettings; >>>> import org.apache.flink.table.api.SqlDialect; >>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; >>>> import org.apache.flink.table.catalog.hive.HiveCatalog; >>>> import org.apache.flink.types.Row; >>>> import org.apache.flink.types.RowKind; >>>> >>>> import java.time.Duration; >>>> import java.util.Properties; >>>> >>>> public class MysqlCDC2Hive { >>>> public static void main(String[] args) throws Exception { >>>> StreamExecutionEnvironment streamEnv = >>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>> >>>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >>>> streamEnv.setParallelism(3); >>>> streamEnv.enableCheckpointing(60000); >>>> >>>> EnvironmentSettings tableEnvSettings = >>>> EnvironmentSettings.newInstance() >>>> .useBlinkPlanner() >>>> .inStreamingMode() >>>> .build(); >>>> StreamTableEnvironment tableEnv = >>>> StreamTableEnvironment.create(streamEnv, tableEnvSettings); >>>> >>>> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, >>>> CheckpointingMode.EXACTLY_ONCE); >>>> >>>> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, >>>> Duration.ofMinutes(1)); >>>> >>>> String catalogName = "hive_catalog"; >>>> HiveCatalog catalog = new HiveCatalog( >>>> catalogName, >>>> "default", >>>> "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf", >>>> "2.3.4" >>>> ); >>>> tableEnv.registerCatalog(catalogName, catalog); >>>> tableEnv.useCatalog(catalogName); >>>> >>>> MyDateFormat2 myDateFormat = new MyDateFormat2(); >>>> tableEnv.registerFunction("my_date_format", myDateFormat); >>>> >>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc"); >>>> tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team"); >>>> tableEnv.executeSql("CREATE TABLE cdc.team(\n" + >>>> " team_id INT,\n" + >>>> " team_name STRING,\n" + >>>> " create_time TIMESTAMP,\n" + >>>> " update_time TIMESTAMP,\n" + >>>> " proctime as proctime()\n" + >>>> ") WITH (\n" + >>>> " 'connector' = 'mysql-cdc',\n" + >>>> " 'hostname' = 'localhost',\n" + >>>> " 'port' = '3306',\n" + >>>> " 'username' = 'root',\n" + >>>> " 'password' = 'root',\n" + >>>> " 'database-name' = 'test',\n" + >>>> " 'table-name' = 'team'\n" + >>>> ")"); >>>> >>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka"); >>>> tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team"); >>>> tableEnv.executeSql("CREATE TABLE kafka.team 
(\n" + >>>> " team_id INT,\n" + >>>> " team_name STRING,\n" + >>>> " create_time TIMESTAMP,\n" + >>>> " update_time TIMESTAMP\n" + >>>> ") WITH (\n" + >>>> " 'connector' = 'kafka',\n" + >>>> " 'topic' = 'team',\n" + >>>> " 'scan.startup.mode' = 'earliest-offset',\n" + >>>> " 'properties.bootstrap.servers' = >>>> 'localhost:9092',\n" + >>>> " 'format' = 'changelog-json'\n" + >>>> ")"); >>>> >>>> tableEnv.executeSql("INSERT INTO kafka.team \n" + >>>> "SELECT team_id, team_name, create_time, update_time >>>> \n" + >>>> "FROM cdc.team"); >>>> >>>> // 定义带op字段的stream >>>> Properties properties = new Properties(); >>>> properties.setProperty("bootstrap.servers", "localhost:9092"); >>>> properties.setProperty("group.id", "test"); >>>> >>>> FlinkKafkaConsumerBase<String> consumer = new >>>> FlinkKafkaConsumer<>( >>>> "team", >>>> new SimpleStringSchema(), >>>> properties >>>> ).setStartFromEarliest(); >>>> >>>> DataStream<String> ds = streamEnv.addSource(consumer); >>>> >>>> String[] fieldNames = {"team_id", "team_name", "create_time", >>>> "update_time", "op"}; >>>> TypeInformation[] types = {Types.INT, Types.STRING, >>>> Types.STRING, Types.STRING, Types.STRING}; >>>> DataStream<Row> ds2 = ds.map(str -> { >>>> JSONObject jsonObject = JSON.parseObject(str); >>>> String op = jsonObject.getString("op"); >>>> JSONObject data = jsonObject.getJSONObject("data"); >>>> int arity = fieldNames.length; >>>> Row row = new Row(arity); >>>> row.setField(0, data.get("team_id")); >>>> row.setField(1, data.get("team_name")); >>>> row.setField(2, data.get("create_time")); >>>> row.setField(3, data.get("update_time")); >>>> String operation = getOperation(op); >>>> row.setField(4, operation); >>>> >>>> return row; >>>> }, new RowTypeInfo(types, fieldNames)); >>>> >>>> tableEnv.registerDataStream("merged_team", ds2); >>>> >>>> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); >>>> >>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods"); >>>> tableEnv.executeSql("DROP TABLE IF EXISTS ods.team"); >>>> >>>> tableEnv.executeSql("CREATE TABLE ods.team (\n" + >>>> " team_id INT,\n" + >>>> " team_name STRING,\n" + >>>> " create_time STRING,\n" + >>>> " update_time STRING,\n" + >>>> " op STRING\n" + >>>> // ") PARTITIONED BY (\n" + >>>> // " ts_date STRING,\n" + >>>> // " ts_hour STRING,\n" + >>>> // " ts_minute STRING\n" + >>>> ") STORED AS PARQUET TBLPROPERTIES (\n" + >>>> " 'sink.partition-commit.trigger' = >>>> 'partition-time',\n" + >>>> " 'sink.partition-commit.delay' = '1 min',\n" + >>>> " 'sink.partition-commit.policy.kind' = >>>> 'metastore,success-file',\n" + >>>> " 'partition.time-extractor.timestamp-pattern' = >>>> '$ts_date $ts_hour:$ts_minute:00'\n" + >>>> ")"); >>>> >>>> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); >>>> tableEnv.executeSql("INSERT INTO ods.team \n" + >>>> "SELECT team_id, team_name, create_time, update_time, >>>> op \n" + >>>> // " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd >>>> HH:mm:ss'), 'yyyyMMdd') as ts_date, \n" + >>>> // " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd >>>> HH:mm:ss'), 'HH') as ts_hour, \n" + >>>> // " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd >>>> HH:mm:ss'), 'mm') as ts_minute \n" + >>>> "FROM merged_team"); >>>> tableEnv.execute("MysqlCDC2Hive2"); >>>> >>>> streamEnv.execute(""); >>>> } >>>> >>>> private static String getOperation(String op) { >>>> String operation = "INSERT"; >>>> for (RowKind rk : RowKind.values()) { >>>> if (rk.shortString().equals(op)) { >>>> switch (rk) { >>>> case UPDATE_BEFORE: >>>> case UPDATE_AFTER: 
>>>> operation = "UPDATE"; >>>> break; >>>> case DELETE: >>>> operation = "DELETE"; >>>> break; >>>> case INSERT: >>>> default: >>>> operation = "INSERT"; >>>> break; >>>> } >>>> break; >>>> } >>>> } >>>> return operation; >>>> } >>>> } >>>> >>>> Jark Wu <[hidden email]> 于2020年10月31日周六 下午1:45写道: >>>> >>>>> 1. 是的。目前 Hive不支持直接消费 changlog ,这个主要原因是 hive 对 cdc 的支持不是很好。即使是 hive >>>>> ACID/transaction 功能,由于其与其他计算引擎集成的不好,也鲜有人用。 >>>>> >>>>> 2. cdc -> kafka -> hive streaming 的方案是可行的,不过 kafka -> hive streaming >>>>> 相当于原始数据同步,到 hive 中仍然是 cdc logs 内容,并没有实时合并,需要用户自己写 query 在 hive >>>>> 中进行合并。merge过程可以参考这篇文章[1]。 >>>>> >>>>> 3. 你可以 ts + INTERVAL '8' HOUR >>>>> >>>>> PS: 在1.12中,我们计划让 hive 也能直接写 changelog 数据,这样的话 cdc 可以直接 -> hive >>>>> streaming,不需要中间的 kafka。 不过到了 hive 中后,仍然需要另外写 query 将数据做实时merge。 >>>>> >>>>> Best, >>>>> Jark >>>>> >>>>> On Sat, 31 Oct 2020 at 13:26, 罗显宴 <[hidden email]> wrote: >>>>> >>>>>> hive3可以hive2不可以,换了kafka也没用吧,hive3之前一般都不支持数据仓库的更改。不知道回答的对不对,欢迎指正。 >>>>>> >>>>>> >>>>>> | | >>>>>> 罗显宴 >>>>>> | >>>>>> | >>>>>> 邮箱:[hidden email] >>>>>> | >>>>>> >>>>>> 签名由 网易邮箱大师 定制 >>>>>> >>>>>> 在2020年10月31日 12:06,陈帅 写道: >>>>>> 我想使用flink sql的mysql-cdc connector直接将mysql表数据实时同步进hive,运行后抛 >>>>>> >>>>>> Exception in thread "main" org.apache.flink.table.api.TableException: >>>>>> AppendStreamTableSink doesn't support consuming update and delete >>>>>> changes >>>>>> which is produced by node TableSourceScan(table=[[hive_catalog, cdc, >>>>>> team]], fields=[team_id, team_name, create_time, update_time]) >>>>>> >>>>>> 我的问题: >>>>>> 1. 是不是因为hive2不支持delete/update,如果换hive 3能否支持呢? >>>>>> 2. 如果要支持这种场景是不是中间需要加一层kafka介质(通过 changelog-json 格式),即cdc -> >>>>>> kafka,然后kafka >>>>>> -> hive streaming? 谢谢! >>>>>> 3. DATE_FORMAT函数出来的时间是UTC的,怎么转成GMT+8,只能通过UDF么? >>>>>> >>>>>> sql语句如下 >>>>>> >>>>>> CREATE DATABASE IF NOT EXISTS cdc >>>>>> >>>>>> DROP TABLE IF EXISTS cdc.team >>>>>> >>>>>> CREATE TABLE team( >>>>>> team_id BIGINT, >>>>>> team_name STRING, >>>>>> create_time TIMESTAMP, >>>>>> update_time TIMESTAMP, >>>>>> proctime as proctime() >>>>>> ) WITH ( >>>>>> 'connector' = 'mysql-cdc', >>>>>> 'hostname' = 'localhost', >>>>>> 'port' = '3306', >>>>>> 'username' = 'root', >>>>>> 'password' = 'root', >>>>>> 'database-name' = 'test', >>>>>> 'table-name' = 'team' >>>>>> ) >>>>>> >>>>>> CREATE DATABASE IF NOT EXISTS ods >>>>>> >>>>>> DROP TABLE IF EXISTS ods.team >>>>>> >>>>>> CREATE TABLE ods.team ( >>>>>> team_id BIGINT, >>>>>> team_name STRING, >>>>>> create_time TIMESTAMP, >>>>>> update_time TIMESTAMP, >>>>>> ) PARTITIONED BY ( >>>>>> ts_date STRING, >>>>>> ts_hour STRING, >>>>>> ts_minute STRING, >>>>>> ) STORED AS PARQUET TBLPROPERTIES ( >>>>>> 'sink.partition-commit.trigger' = 'partition-time', >>>>>> 'sink.partition-commit.delay' = '1 min', >>>>>> 'sink.partition-commit.policy.kind' = 'metastore,success-file', >>>>>> 'partition.time-extractor.timestamp-pattern' = '$ts_date >>>>>> $ts_hour:$ts_minute:00' >>>>>> ) >>>>>> >>>>>> INSERT INTO ods.team >>>>>> SELECT team_id, team_name, create_time, update_time, >>>>>> my_date_format(create_time,'yyyy-MM-dd', 'Asia/Shanghai'), >>>>>> my_date_format(create_time,'HH', 'Asia/Shanghai'), >>>>>> my_date_format(create_time,'mm', 'Asia/Shanghai') >>>>>> FROM cdc.team >>>>>> >>>>> |
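One way to narrow down the symptom above (a data file exists under the table location and contains the row, yet SELECT returns nothing): check whether the partition was ever registered in the metastore, and register it by hand if it was not. A Hive CLI sketch; the dt/hr/mi values are only the ones visible in the example row and would need adjusting, and ADD PARTITION is just a manual stand-in for what the 'metastore' commit policy should be doing automatically:

USE ods;
-- if this prints nothing, the files were written but no partition was ever committed,
-- which is exactly why SELECT comes back empty
SHOW PARTITIONS team;
-- manual registration of one partition (msck repair table team does the same for all of them)
ALTER TABLE team ADD IF NOT EXISTS PARTITION (dt='20201031', hr='11', mi='25');
SELECT * FROM team WHERE dt='20201031' AND hr='11' AND mi='25';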
Finally: after running "msck repair table team;" in the hive shell, the data that was written does become queryable. Does that mean Flink hive streaming cannot register Hive partitions automatically, or am I simply using it the wrong way? (See the process-time trigger sketch after this message.)
|
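On the question of whether the Flink Hive streaming sink registers partitions by itself: with 'sink.partition-commit.policy.kind' = 'metastore,success-file' it should, but only once the commit trigger fires. The 'partition-time' trigger needs a watermark to pass the extracted partition time plus the delay, and it also needs '$dt $hr:$mi:00' to resolve to something parseable as a timestamp; with dt written as 'yyyyMMdd' that may never happen, which alone could keep partitions from ever being committed (a guess worth checking, not a confirmed diagnosis). A hedged alternative is to commit on processing time, which drops the watermark dependency entirely. Sketch of the sink table with only the trigger changed, names taken from this thread, to be executed under the Hive dialect as in the code above:

CREATE TABLE ods.team (
  team_id INT,
  team_name STRING,
  create_time STRING,
  update_time STRING,
  op STRING
) PARTITIONED BY (dt STRING, hr STRING, mi STRING)
STORED AS PARQUET TBLPROPERTIES (
  -- commit based on processing time: no watermark and no timestamp-pattern needed
  'sink.partition-commit.trigger' = 'process-time',
  'sink.partition-commit.delay' = '1 min',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
)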
Hello,

The partition commit policy you configured, 'sink.partition-commit.policy.kind' = 'metastore,success-file', both registers the partition in the Hive metastore and writes a _SUCCESS file into the partition directory. Check whether that _SUCCESS file has actually been generated; if it has not, then Hive not seeing the data is likewise caused by the partition metadata never having been committed. The commit latency appears to depend on the checkpoint interval plus the 'sink.partition-commit.delay' setting, so you could try waiting roughly the sum of the two and then querying Hive again (a small verification sketch follows at the end of this message).

Best regards,
________________________________
发件人: 陈帅 <[hidden email]>
发送时间: 2020年11月1日 下午 05:36
收件人: Jark Wu <[hidden email]>
抄送: user-zh <[hidden email]>
主题: Re: flink mysql cdc + hive streaming疑问
java.time.Duration; >>> import java.time.Instant; >>> import java.time.LocalDateTime; >>> import java.time.ZoneId; >>> import java.time.format.DateTimeFormatter; >>> import java.util.Properties; >>> >>> public class MysqlCDC2Hive { >>> >>> private static final DateTimeFormatter dtf = >>> DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); >>> >>> public static void main(String[] args) throws Exception { >>> StreamExecutionEnvironment streamEnv = >>> StreamExecutionEnvironment.getExecutionEnvironment(); >>> >>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >>> streamEnv.setParallelism(3); >>> streamEnv.enableCheckpointing(60000); >>> >>> EnvironmentSettings tableEnvSettings = >>> EnvironmentSettings.newInstance() >>> .useBlinkPlanner() >>> .inStreamingMode() >>> .build(); >>> StreamTableEnvironment tableEnv = >>> StreamTableEnvironment.create(streamEnv, tableEnvSettings); >>> >>> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, >>> CheckpointingMode.EXACTLY_ONCE); >>> >>> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, >>> Duration.ofMinutes(1)); >>> >>> String catalogName = "hive_catalog"; >>> HiveCatalog catalog = new HiveCatalog( >>> catalogName, >>> "default", >>> "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf", >>> "2.3.4" >>> ); >>> tableEnv.registerCatalog(catalogName, catalog); >>> tableEnv.useCatalog(catalogName); >>> >>> MyDateFormat2 myDateFormat = new MyDateFormat2(); >>> tableEnv.registerFunction("my_date_format", myDateFormat); >>> >>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc"); >>> tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team"); >>> tableEnv.executeSql("CREATE TABLE cdc.team(\n" + >>> " team_id INT,\n" + >>> " team_name STRING,\n" + >>> " create_time TIMESTAMP,\n" + >>> " update_time TIMESTAMP,\n" + >>> " proctime as proctime()\n" + >>> ") WITH (\n" + >>> " 'connector' = 'mysql-cdc',\n" + >>> " 'hostname' = 'localhost',\n" + >>> " 'port' = '3306',\n" + >>> " 'username' = 'root',\n" + >>> " 'password' = 'root',\n" + >>> " 'database-name' = 'test',\n" + >>> " 'table-name' = 'team'\n" + >>> ")"); >>> >>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka"); >>> tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team"); >>> tableEnv.executeSql("CREATE TABLE kafka.team (\n" + >>> " team_id INT,\n" + >>> " team_name STRING,\n" + >>> " create_time TIMESTAMP,\n" + >>> " update_time TIMESTAMP\n" + >>> ") WITH (\n" + >>> " 'connector' = 'kafka',\n" + >>> " 'topic' = 'team',\n" + >>> " 'scan.startup.mode' = 'earliest-offset',\n" + >>> " 'properties.bootstrap.servers' = 'localhost:9092',\n" >>> + >>> " 'format' = 'changelog-json'\n" + >>> ")"); >>> >>> tableEnv.executeSql("INSERT INTO kafka.team \n" + >>> "SELECT team_id, team_name, create_time, update_time \n" >>> + >>> "FROM cdc.team"); >>> >>> // 定义带op字段的stream >>> Properties properties = new Properties(); >>> properties.setProperty("bootstrap.servers", "localhost:9092"); >>> properties.setProperty("group.id", "test1`"); >>> >>> FlinkKafkaConsumerBase<String> consumer = new >>> FlinkKafkaConsumer<>( >>> "team", >>> new SimpleStringSchema(), >>> properties >>> ).setStartFromEarliest(); >>> >>> DataStream<String> ds = streamEnv.addSource(consumer); >>> >>> String[] fieldNames = {"team_id", "team_name", "create_time", >>> "update_time", "op"}; >>> TypeInformation[] types = {Types.INT, Types.STRING, >>> Types.STRING, Types.STRING, Types.STRING}; >>> DataStream<Row> ds2 = ds.map(str -> { >>> 
JSONObject jsonObject = JSON.parseObject(str); >>> String op = jsonObject.getString("op"); >>> JSONObject data = jsonObject.getJSONObject("data"); >>> int arity = fieldNames.length; >>> Row row = new Row(arity); >>> row.setField(0, data.get("team_id")); >>> row.setField(1, data.get("team_name")); >>> row.setField(2, data.get("create_time")); >>> row.setField(3, data.get("update_time")); >>> String operation = getOperation(op); >>> row.setField(4, operation); >>> >>> return row; >>> }, new RowTypeInfo(types, fieldNames)) >>> >>> >>> >>> >>> >>> >>> >>> >>> *.assignTimestampsAndWatermarks(new >>> BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(1)) { >>> @Override public long extractTimestamp(Row row) { >>> String dt = (String) row.getField(2); LocalDateTime ldt = >>> LocalDateTime.parse(dt, dtf); Instant instant = >>> ldt.atZone(ZoneId.systemDefault()).toInstant(); long >>> timeInMillis = instant.toEpochMilli(); return timeInMillis; >>> } });* >>> >>> tableEnv.registerDataStream("merged_team", ds2); >>> >>> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); >>> >>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods"); >>> tableEnv.executeSql("DROP TABLE IF EXISTS ods.team"); >>> >>> tableEnv.executeSql("CREATE TABLE ods.team (\n" + >>> " team_id INT,\n" + >>> " team_name STRING,\n" + >>> " create_time STRING,\n" + >>> " update_time STRING,\n" + >>> " op STRING\n" + >>> ") PARTITIONED BY (\n" + >>> " dt STRING,\n" + >>> " hr STRING,\n" + >>> " mi STRING\n" + >>> ") STORED AS PARQUET TBLPROPERTIES (\n" + >>> " 'sink.partition-commit.trigger' = >>> 'partition-time',\n" + >>> " 'sink.partition-commit.delay' = '1 min',\n" + >>> " 'sink.partition-commit.policy.kind' = >>> 'metastore,success-file',\n" + >>> " 'partition.time-extractor.timestamp-pattern' = '$dt >>> $hr:$mi:00'\n" + >>> ")"); >>> >>> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); >>> tableEnv.executeSql("INSERT INTO ods.team \n" + >>> "SELECT team_id, team_name, create_time, update_time, >>> op, \n" + >>> " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd >>> HH:mm:ss'), 'yyyyMMdd') as dt, \n" + >>> " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd >>> HH:mm:ss'), 'HH') as hr, \n" + >>> " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd >>> HH:mm:ss'), 'mm') as mi \n" + >>> "FROM merged_team"); >>> tableEnv.execute("MysqlCDC2Hive2"); >>> >>> streamEnv.execute(""); >>> } >>> >>> private static String getOperation(String op) { >>> String operation = "INSERT"; >>> for (RowKind rk : RowKind.values()) { >>> if (rk.shortString().equals(op)) { >>> switch (rk) { >>> case UPDATE_BEFORE: >>> case UPDATE_AFTER: >>> operation = "UPDATE"; >>> break; >>> case DELETE: >>> operation = "DELETE"; >>> break; >>> case INSERT: >>> default: >>> operation = "INSERT"; >>> break; >>> } >>> break; >>> } >>> } >>> return operation; >>> } >>> } >>> >>> Jark Wu <[hidden email]> 于2020年11月1日周日 上午11:04写道: >>> >>>> 你检查一下 hive 文件是否正常生成了? >>>> >>>> 我看你上面的代码,kafka->hive 流程中是没有 watermark 的,而"partition-time" 的 trigger >>>> policy 是基于 watermark 驱动的,所以可能是这个原因导致 hive 中没有数据。 >>>> >>>> Best, >>>> Jark >>>> >>>> >>>> [1]: >>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#sink-partition-commit-trigger >>>> >>>> On Sat, 31 Oct 2020 at 17:25, 陈帅 <[hidden email]> wrote: >>>> >>>>> 谢谢Jark细致解答,我按照你给的思路试了下。遇到一个问题是,在不开hive分区的情况下写入和读取是没有问题的,但在开启hive表时间分区后,写入是成功了,然而通过hive >>>>> shell查不到数据,表结构是正确的。(代码我注释掉了) 能帮忙看下是哪里写得不对吗? 
>>>>> >>>>> cdc -> kafka示例消息如下 >>>>> {"data":{"team_id":1001,"team_name":"Sun","create_time":"2020-10-31 >>>>> 11:25:38","update_time":"2020-10-31 11:25:38"},"op":"+I"} >>>>> >>>>> import com.alibaba.fastjson.JSON; >>>>> import com.alibaba.fastjson.JSONObject; >>>>> import org.apache.flink.api.common.serialization.SimpleStringSchema; >>>>> import org.apache.flink.api.common.typeinfo.TypeInformation; >>>>> import org.apache.flink.api.common.typeinfo.Types; >>>>> import org.apache.flink.api.java.typeutils.RowTypeInfo; >>>>> import org.apache.flink.streaming.api.CheckpointingMode; >>>>> import org.apache.flink.streaming.api.TimeCharacteristic; >>>>> import org.apache.flink.streaming.api.datastream.DataStream; >>>>> import >>>>> org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; >>>>> import >>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >>>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; >>>>> import >>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; >>>>> import org.apache.flink.table.api.EnvironmentSettings; >>>>> import org.apache.flink.table.api.SqlDialect; >>>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; >>>>> import org.apache.flink.table.catalog.hive.HiveCatalog; >>>>> import org.apache.flink.types.Row; >>>>> import org.apache.flink.types.RowKind; >>>>> >>>>> import java.time.Duration; >>>>> import java.util.Properties; >>>>> >>>>> public class MysqlCDC2Hive { >>>>> public static void main(String[] args) throws Exception { >>>>> StreamExecutionEnvironment streamEnv = >>>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>>> >>>>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >>>>> streamEnv.setParallelism(3); >>>>> streamEnv.enableCheckpointing(60000); >>>>> >>>>> EnvironmentSettings tableEnvSettings = >>>>> EnvironmentSettings.newInstance() >>>>> .useBlinkPlanner() >>>>> .inStreamingMode() >>>>> .build(); >>>>> StreamTableEnvironment tableEnv = >>>>> StreamTableEnvironment.create(streamEnv, tableEnvSettings); >>>>> >>>>> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, >>>>> CheckpointingMode.EXACTLY_ONCE); >>>>> >>>>> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, >>>>> Duration.ofMinutes(1)); >>>>> >>>>> String catalogName = "hive_catalog"; >>>>> HiveCatalog catalog = new HiveCatalog( >>>>> catalogName, >>>>> "default", >>>>> "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf", >>>>> "2.3.4" >>>>> ); >>>>> tableEnv.registerCatalog(catalogName, catalog); >>>>> tableEnv.useCatalog(catalogName); >>>>> >>>>> MyDateFormat2 myDateFormat = new MyDateFormat2(); >>>>> tableEnv.registerFunction("my_date_format", myDateFormat); >>>>> >>>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc"); >>>>> tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team"); >>>>> tableEnv.executeSql("CREATE TABLE cdc.team(\n" + >>>>> " team_id INT,\n" + >>>>> " team_name STRING,\n" + >>>>> " create_time TIMESTAMP,\n" + >>>>> " update_time TIMESTAMP,\n" + >>>>> " proctime as proctime()\n" + >>>>> ") WITH (\n" + >>>>> " 'connector' = 'mysql-cdc',\n" + >>>>> " 'hostname' = 'localhost',\n" + >>>>> " 'port' = '3306',\n" + >>>>> " 'username' = 'root',\n" + >>>>> " 'password' = 'root',\n" + >>>>> " 'database-name' = 'test',\n" + >>>>> " 'table-name' = 'team'\n" + >>>>> ")"); >>>>> >>>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka"); >>>>> 
tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team"); >>>>> tableEnv.executeSql("CREATE TABLE kafka.team (\n" + >>>>> " team_id INT,\n" + >>>>> " team_name STRING,\n" + >>>>> " create_time TIMESTAMP,\n" + >>>>> " update_time TIMESTAMP\n" + >>>>> ") WITH (\n" + >>>>> " 'connector' = 'kafka',\n" + >>>>> " 'topic' = 'team',\n" + >>>>> " 'scan.startup.mode' = 'earliest-offset',\n" + >>>>> " 'properties.bootstrap.servers' = >>>>> 'localhost:9092',\n" + >>>>> " 'format' = 'changelog-json'\n" + >>>>> ")"); >>>>> >>>>> tableEnv.executeSql("INSERT INTO kafka.team \n" + >>>>> "SELECT team_id, team_name, create_time, update_time >>>>> \n" + >>>>> "FROM cdc.team"); >>>>> >>>>> // 定义带op字段的stream >>>>> Properties properties = new Properties(); >>>>> properties.setProperty("bootstrap.servers", "localhost:9092"); >>>>> properties.setProperty("group.id", "test"); >>>>> >>>>> FlinkKafkaConsumerBase<String> consumer = new >>>>> FlinkKafkaConsumer<>( >>>>> "team", >>>>> new SimpleStringSchema(), >>>>> properties >>>>> ).setStartFromEarliest(); >>>>> >>>>> DataStream<String> ds = streamEnv.addSource(consumer); >>>>> >>>>> String[] fieldNames = {"team_id", "team_name", "create_time", >>>>> "update_time", "op"}; >>>>> TypeInformation[] types = {Types.INT, Types.STRING, >>>>> Types.STRING, Types.STRING, Types.STRING}; >>>>> DataStream<Row> ds2 = ds.map(str -> { >>>>> JSONObject jsonObject = JSON.parseObject(str); >>>>> String op = jsonObject.getString("op"); >>>>> JSONObject data = jsonObject.getJSONObject("data"); >>>>> int arity = fieldNames.length; >>>>> Row row = new Row(arity); >>>>> row.setField(0, data.get("team_id")); >>>>> row.setField(1, data.get("team_name")); >>>>> row.setField(2, data.get("create_time")); >>>>> row.setField(3, data.get("update_time")); >>>>> String operation = getOperation(op); >>>>> row.setField(4, operation); >>>>> >>>>> return row; >>>>> }, new RowTypeInfo(types, fieldNames)); >>>>> >>>>> tableEnv.registerDataStream("merged_team", ds2); >>>>> >>>>> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); >>>>> >>>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods"); >>>>> tableEnv.executeSql("DROP TABLE IF EXISTS ods.team"); >>>>> >>>>> tableEnv.executeSql("CREATE TABLE ods.team (\n" + >>>>> " team_id INT,\n" + >>>>> " team_name STRING,\n" + >>>>> " create_time STRING,\n" + >>>>> " update_time STRING,\n" + >>>>> " op STRING\n" + >>>>> // ") PARTITIONED BY (\n" + >>>>> // " ts_date STRING,\n" + >>>>> // " ts_hour STRING,\n" + >>>>> // " ts_minute STRING\n" + >>>>> ") STORED AS PARQUET TBLPROPERTIES (\n" + >>>>> " 'sink.partition-commit.trigger' = >>>>> 'partition-time',\n" + >>>>> " 'sink.partition-commit.delay' = '1 min',\n" + >>>>> " 'sink.partition-commit.policy.kind' = >>>>> 'metastore,success-file',\n" + >>>>> " 'partition.time-extractor.timestamp-pattern' = >>>>> '$ts_date $ts_hour:$ts_minute:00'\n" + >>>>> ")"); >>>>> >>>>> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); >>>>> tableEnv.executeSql("INSERT INTO ods.team \n" + >>>>> "SELECT team_id, team_name, create_time, update_time, >>>>> op \n" + >>>>> // " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd >>>>> HH:mm:ss'), 'yyyyMMdd') as ts_date, \n" + >>>>> // " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd >>>>> HH:mm:ss'), 'HH') as ts_hour, \n" + >>>>> // " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd >>>>> HH:mm:ss'), 'mm') as ts_minute \n" + >>>>> "FROM merged_team"); >>>>> tableEnv.execute("MysqlCDC2Hive2"); >>>>> >>>>> streamEnv.execute(""); >>>>> } >>>>> >>>>> private static String 
getOperation(String op) { >>>>> String operation = "INSERT"; >>>>> for (RowKind rk : RowKind.values()) { >>>>> if (rk.shortString().equals(op)) { >>>>> switch (rk) { >>>>> case UPDATE_BEFORE: >>>>> case UPDATE_AFTER: >>>>> operation = "UPDATE"; >>>>> break; >>>>> case DELETE: >>>>> operation = "DELETE"; >>>>> break; >>>>> case INSERT: >>>>> default: >>>>> operation = "INSERT"; >>>>> break; >>>>> } >>>>> break; >>>>> } >>>>> } >>>>> return operation; >>>>> } >>>>> } >>>>> >>>>> Jark Wu <[hidden email]> 于2020年10月31日周六 下午1:45写道: >>>>> >>>>>> 1. 是的。目前 Hive不支持直接消费 changlog ,这个主要原因是 hive 对 cdc 的支持不是很好。即使是 hive >>>>>> ACID/transaction 功能,由于其与其他计算引擎集成的不好,也鲜有人用。 >>>>>> >>>>>> 2. cdc -> kafka -> hive streaming 的方案是可行的,不过 kafka -> hive streaming >>>>>> 相当于原始数据同步,到 hive 中仍然是 cdc logs 内容,并没有实时合并,需要用户自己写 query 在 hive >>>>>> 中进行合并。merge过程可以参考这篇文章[1]。 >>>>>> >>>>>> 3. 你可以 ts + INTERVAL '8' HOUR >>>>>> >>>>>> PS: 在1.12中,我们计划让 hive 也能直接写 changelog 数据,这样的话 cdc 可以直接 -> hive >>>>>> streaming,不需要中间的 kafka。 不过到了 hive 中后,仍然需要另外写 query 将数据做实时merge。 >>>>>> >>>>>> Best, >>>>>> Jark >>>>>> >>>>>> On Sat, 31 Oct 2020 at 13:26, 罗显宴 <[hidden email]> wrote: >>>>>> >>>>>>> hive3可以hive2不可以,换了kafka也没用吧,hive3之前一般都不支持数据仓库的更改。不知道回答的对不对,欢迎指正。 >>>>>>> >>>>>>> >>>>>>> | | >>>>>>> 罗显宴 >>>>>>> | >>>>>>> | >>>>>>> 邮箱:[hidden email] >>>>>>> | >>>>>>> >>>>>>> 签名由 网易邮箱大师 定制 >>>>>>> >>>>>>> 在2020年10月31日 12:06,陈帅 写道: >>>>>>> 我想使用flink sql的mysql-cdc connector直接将mysql表数据实时同步进hive,运行后抛 >>>>>>> >>>>>>> Exception in thread "main" org.apache.flink.table.api.TableException: >>>>>>> AppendStreamTableSink doesn't support consuming update and delete >>>>>>> changes >>>>>>> which is produced by node TableSourceScan(table=[[hive_catalog, cdc, >>>>>>> team]], fields=[team_id, team_name, create_time, update_time]) >>>>>>> >>>>>>> 我的问题: >>>>>>> 1. 是不是因为hive2不支持delete/update,如果换hive 3能否支持呢? >>>>>>> 2. 如果要支持这种场景是不是中间需要加一层kafka介质(通过 changelog-json 格式),即cdc -> >>>>>>> kafka,然后kafka >>>>>>> -> hive streaming? 谢谢! >>>>>>> 3. DATE_FORMAT函数出来的时间是UTC的,怎么转成GMT+8,只能通过UDF么? 
>>>>>>> >>>>>>> sql语句如下 >>>>>>> >>>>>>> CREATE DATABASE IF NOT EXISTS cdc >>>>>>> >>>>>>> DROP TABLE IF EXISTS cdc.team >>>>>>> >>>>>>> CREATE TABLE team( >>>>>>> team_id BIGINT, >>>>>>> team_name STRING, >>>>>>> create_time TIMESTAMP, >>>>>>> update_time TIMESTAMP, >>>>>>> proctime as proctime() >>>>>>> ) WITH ( >>>>>>> 'connector' = 'mysql-cdc', >>>>>>> 'hostname' = 'localhost', >>>>>>> 'port' = '3306', >>>>>>> 'username' = 'root', >>>>>>> 'password' = 'root', >>>>>>> 'database-name' = 'test', >>>>>>> 'table-name' = 'team' >>>>>>> ) >>>>>>> >>>>>>> CREATE DATABASE IF NOT EXISTS ods >>>>>>> >>>>>>> DROP TABLE IF EXISTS ods.team >>>>>>> >>>>>>> CREATE TABLE ods.team ( >>>>>>> team_id BIGINT, >>>>>>> team_name STRING, >>>>>>> create_time TIMESTAMP, >>>>>>> update_time TIMESTAMP, >>>>>>> ) PARTITIONED BY ( >>>>>>> ts_date STRING, >>>>>>> ts_hour STRING, >>>>>>> ts_minute STRING, >>>>>>> ) STORED AS PARQUET TBLPROPERTIES ( >>>>>>> 'sink.partition-commit.trigger' = 'partition-time', >>>>>>> 'sink.partition-commit.delay' = '1 min', >>>>>>> 'sink.partition-commit.policy.kind' = 'metastore,success-file', >>>>>>> 'partition.time-extractor.timestamp-pattern' = '$ts_date >>>>>>> $ts_hour:$ts_minute:00' >>>>>>> ) >>>>>>> >>>>>>> INSERT INTO ods.team >>>>>>> SELECT team_id, team_name, create_time, update_time, >>>>>>> my_date_format(create_time,'yyyy-MM-dd', 'Asia/Shanghai'), >>>>>>> my_date_format(create_time,'HH', 'Asia/Shanghai'), >>>>>>> my_date_format(create_time,'mm', 'Asia/Shanghai') >>>>>>> FROM cdc.team >>>>>>> >>>>>> |
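The replies quoted above note that the rows reaching Hive along the cdc -> kafka -> hive streaming path are still raw cdc logs and have to be merged by a separate query. A rough sketch of what such a merge could look like as a batch job follows; the class name MergeTeamChangelog and the non-partitioned target table dwd.team are made up, team_id is assumed to be the primary key, and update_time is assumed to order the changes.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class MergeTeamChangelog {
    public static void main(String[] args) throws Exception {
        // Batch environment: INSERT OVERWRITE on a Hive table is only supported in batch mode.
        TableEnvironment batchEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // Same catalog settings as in the streaming job above.
        HiveCatalog catalog = new HiveCatalog(
                "hive_catalog",
                "default",
                "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
                "2.3.4");
        batchEnv.registerCatalog("hive_catalog", catalog);
        batchEnv.useCatalog("hive_catalog");

        // Keep only the latest change per team_id and drop keys whose latest change is a DELETE.
        // Note: the thread's getOperation() maps both UPDATE_BEFORE and UPDATE_AFTER to 'UPDATE',
        // so a production merge would keep the original row kinds to break ties between them.
        batchEnv.executeSql(
                "INSERT OVERWRITE dwd.team " +
                "SELECT team_id, team_name, create_time, update_time " +
                "FROM ( " +
                "  SELECT team_id, team_name, create_time, update_time, op, " +
                "         ROW_NUMBER() OVER (PARTITION BY team_id ORDER BY update_time DESC) AS rn " +
                "  FROM ods.team " +
                ") t " +
                "WHERE rn = 1 AND op <> 'DELETE'");
    }
}

The equivalent statement can also be run periodically from the hive shell (as INSERT OVERWRITE TABLE dwd.team ...) once the target table exists.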
Hi,
正常情况是可以自动提交分区的,我看你commit policy指定了metastore+success-file,可以检查一下分区目录下success file是否创建了。如果success file也没有的话说明没有触发分区提交。另外提交分区时会打印类似这样的日志,可以在log中查找一下 LOG.info("Partition {} of table {} is ready to be committed", partSpec, tableIdentifier); LOG.info("Committed partition {} to metastore", partitionSpec); LOG.info("Committed partition {} with success file", context.partitionSpec()); On Sun, Nov 1, 2020 at 5:36 PM 陈帅 <[hidden email]> wrote: > 最后,在hive shell中执行 “msck repair table team;” 命令后就能查询到写的数据了,难道flink hive > streaming不能自动注册hive分区吗?还是我使用的姿势不对? > > 陈帅 <[hidden email]> 于2020年11月1日周日 下午5:24写道: > > > 改用 TEXTFILE 存储hive表数据以便下载hive文件观察内容 > > ") STORED AS TEXTFILE TBLPROPERTIES (" > > > > 这是生成的hive表建表语句 > > > > hive> show create table team; > > OK > > CREATE TABLE `team`( > > `team_id` int, > > `team_name` string, > > `create_time` string, > > `update_time` string, > > `op` string) > > PARTITIONED BY ( > > `dt` string, > > `hr` string, > > `mi` string) > > ROW FORMAT SERDE > > 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' > > STORED AS INPUTFORMAT > > 'org.apache.hadoop.mapred.TextInputFormat' > > OUTPUTFORMAT > > 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat' > > LOCATION > > 'hdfs://localhost:9000/user/hive/warehouse/ods.db/team' > > TBLPROPERTIES ( > > 'is_generic'='false', > > 'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00', > > 'sink.partition-commit.delay'='1 min', > > 'sink.partition-commit.policy.kind'='metastore,success-file', > > 'sink.partition-commit.trigger'='partition-time', > > 'transient_lastDdlTime'='1604222266') > > Time taken: 0.252 seconds, Fetched: 25 row(s) > > > > 另外,下载了hive文件内容如下 > > 1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31 > 11:25:38<0x01>INSERT > > > > 还是查询不到结果 > > hive> select * from team; > > OK > > Time taken: 0.326 seconds > > > > 陈帅 <[hidden email]> 于2020年11月1日周日 下午5:10写道: > > > >> > >> > 之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。 > >> 生成的hive分区文件路径类似于 > /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/ > >> part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3 > >> > >> 陈帅 <[hidden email]> 于2020年11月1日周日 下午4:43写道: > >> > >>> > 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive > >>> shell查不到数据。 > >>> > >>> import com.alibaba.fastjson.JSON; > >>> import com.alibaba.fastjson.JSONObject; > >>> import org.apache.flink.api.common.serialization.SimpleStringSchema; > >>> import org.apache.flink.api.common.typeinfo.TypeInformation; > >>> import org.apache.flink.api.common.typeinfo.Types; > >>> import org.apache.flink.api.java.typeutils.RowTypeInfo; > >>> import org.apache.flink.streaming.api.CheckpointingMode; > >>> import org.apache.flink.streaming.api.TimeCharacteristic; > >>> import org.apache.flink.streaming.api.datastream.DataStream; > >>> import > >>> > org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; > >>> import > >>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > >>> import > >>> > org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; > >>> import org.apache.flink.streaming.api.windowing.time.Time; > >>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; > >>> import > >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; > >>> import org.apache.flink.table.api.EnvironmentSettings; > >>> import org.apache.flink.table.api.SqlDialect; > >>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > >>> import 
org.apache.flink.table.catalog.hive.HiveCatalog; > >>> import org.apache.flink.types.Row; > >>> import org.apache.flink.types.RowKind; > >>> > >>> import java.time.Duration; > >>> import java.time.Instant; > >>> import java.time.LocalDateTime; > >>> import java.time.ZoneId; > >>> import java.time.format.DateTimeFormatter; > >>> import java.util.Properties; > >>> > >>> public class MysqlCDC2Hive { > >>> > >>> private static final DateTimeFormatter dtf = > >>> DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); > >>> > >>> public static void main(String[] args) throws Exception { > >>> StreamExecutionEnvironment streamEnv = > >>> StreamExecutionEnvironment.getExecutionEnvironment(); > >>> > >>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > >>> streamEnv.setParallelism(3); > >>> streamEnv.enableCheckpointing(60000); > >>> > >>> EnvironmentSettings tableEnvSettings = > >>> EnvironmentSettings.newInstance() > >>> .useBlinkPlanner() > >>> .inStreamingMode() > >>> .build(); > >>> StreamTableEnvironment tableEnv = > >>> StreamTableEnvironment.create(streamEnv, tableEnvSettings); > >>> > >>> > tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, > >>> CheckpointingMode.EXACTLY_ONCE); > >>> > >>> > tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, > >>> Duration.ofMinutes(1)); > >>> > >>> String catalogName = "hive_catalog"; > >>> HiveCatalog catalog = new HiveCatalog( > >>> catalogName, > >>> "default", > >>> "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf", > >>> "2.3.4" > >>> ); > >>> tableEnv.registerCatalog(catalogName, catalog); > >>> tableEnv.useCatalog(catalogName); > >>> > >>> MyDateFormat2 myDateFormat = new MyDateFormat2(); > >>> tableEnv.registerFunction("my_date_format", myDateFormat); > >>> > >>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc"); > >>> tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team"); > >>> tableEnv.executeSql("CREATE TABLE cdc.team(\n" + > >>> " team_id INT,\n" + > >>> " team_name STRING,\n" + > >>> " create_time TIMESTAMP,\n" + > >>> " update_time TIMESTAMP,\n" + > >>> " proctime as proctime()\n" + > >>> ") WITH (\n" + > >>> " 'connector' = 'mysql-cdc',\n" + > >>> " 'hostname' = 'localhost',\n" + > >>> " 'port' = '3306',\n" + > >>> " 'username' = 'root',\n" + > >>> " 'password' = 'root',\n" + > >>> " 'database-name' = 'test',\n" + > >>> " 'table-name' = 'team'\n" + > >>> ")"); > >>> > >>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka"); > >>> tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team"); > >>> tableEnv.executeSql("CREATE TABLE kafka.team (\n" + > >>> " team_id INT,\n" + > >>> " team_name STRING,\n" + > >>> " create_time TIMESTAMP,\n" + > >>> " update_time TIMESTAMP\n" + > >>> ") WITH (\n" + > >>> " 'connector' = 'kafka',\n" + > >>> " 'topic' = 'team',\n" + > >>> " 'scan.startup.mode' = 'earliest-offset',\n" + > >>> " 'properties.bootstrap.servers' = > 'localhost:9092',\n" > >>> + > >>> " 'format' = 'changelog-json'\n" + > >>> ")"); > >>> > >>> tableEnv.executeSql("INSERT INTO kafka.team \n" + > >>> "SELECT team_id, team_name, create_time, update_time > \n" > >>> + > >>> "FROM cdc.team"); > >>> > >>> // 定义带op字段的stream > >>> Properties properties = new Properties(); > >>> properties.setProperty("bootstrap.servers", "localhost:9092"); > >>> properties.setProperty("group.id", "test1`"); > >>> > >>> FlinkKafkaConsumerBase<String> consumer = new > >>> FlinkKafkaConsumer<>( > >>> "team", > >>> new SimpleStringSchema(), > >>> 
properties > >>> ).setStartFromEarliest(); > >>> > >>> DataStream<String> ds = streamEnv.addSource(consumer); > >>> > >>> String[] fieldNames = {"team_id", "team_name", "create_time", > >>> "update_time", "op"}; > >>> TypeInformation[] types = {Types.INT, Types.STRING, > >>> Types.STRING, Types.STRING, Types.STRING}; > >>> DataStream<Row> ds2 = ds.map(str -> { > >>> JSONObject jsonObject = JSON.parseObject(str); > >>> String op = jsonObject.getString("op"); > >>> JSONObject data = jsonObject.getJSONObject("data"); > >>> int arity = fieldNames.length; > >>> Row row = new Row(arity); > >>> row.setField(0, data.get("team_id")); > >>> row.setField(1, data.get("team_name")); > >>> row.setField(2, data.get("create_time")); > >>> row.setField(3, data.get("update_time")); > >>> String operation = getOperation(op); > >>> row.setField(4, operation); > >>> > >>> return row; > >>> }, new RowTypeInfo(types, fieldNames)) > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> *.assignTimestampsAndWatermarks(new > >>> BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(1)) { > >>> @Override public long extractTimestamp(Row row) { > >>> String dt = (String) row.getField(2); LocalDateTime ldt > = > >>> LocalDateTime.parse(dt, dtf); Instant instant = > >>> ldt.atZone(ZoneId.systemDefault()).toInstant(); long > >>> timeInMillis = instant.toEpochMilli(); return > timeInMillis; > >>> } });* > >>> > >>> tableEnv.registerDataStream("merged_team", ds2); > >>> > >>> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > >>> > >>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods"); > >>> tableEnv.executeSql("DROP TABLE IF EXISTS ods.team"); > >>> > >>> tableEnv.executeSql("CREATE TABLE ods.team (\n" + > >>> " team_id INT,\n" + > >>> " team_name STRING,\n" + > >>> " create_time STRING,\n" + > >>> " update_time STRING,\n" + > >>> " op STRING\n" + > >>> ") PARTITIONED BY (\n" + > >>> " dt STRING,\n" + > >>> " hr STRING,\n" + > >>> " mi STRING\n" + > >>> ") STORED AS PARQUET TBLPROPERTIES (\n" + > >>> " 'sink.partition-commit.trigger' = > >>> 'partition-time',\n" + > >>> " 'sink.partition-commit.delay' = '1 min',\n" + > >>> " 'sink.partition-commit.policy.kind' = > >>> 'metastore,success-file',\n" + > >>> " 'partition.time-extractor.timestamp-pattern' = '$dt > >>> $hr:$mi:00'\n" + > >>> ")"); > >>> > >>> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > >>> tableEnv.executeSql("INSERT INTO ods.team \n" + > >>> "SELECT team_id, team_name, create_time, update_time, > >>> op, \n" + > >>> " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > >>> HH:mm:ss'), 'yyyyMMdd') as dt, \n" + > >>> " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > >>> HH:mm:ss'), 'HH') as hr, \n" + > >>> " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > >>> HH:mm:ss'), 'mm') as mi \n" + > >>> "FROM merged_team"); > >>> tableEnv.execute("MysqlCDC2Hive2"); > >>> > >>> streamEnv.execute(""); > >>> } > >>> > >>> private static String getOperation(String op) { > >>> String operation = "INSERT"; > >>> for (RowKind rk : RowKind.values()) { > >>> if (rk.shortString().equals(op)) { > >>> switch (rk) { > >>> case UPDATE_BEFORE: > >>> case UPDATE_AFTER: > >>> operation = "UPDATE"; > >>> break; > >>> case DELETE: > >>> operation = "DELETE"; > >>> break; > >>> case INSERT: > >>> default: > >>> operation = "INSERT"; > >>> break; > >>> } > >>> break; > >>> } > >>> } > >>> return operation; > >>> } > >>> } > >>> > >>> Jark Wu <[hidden email]> 于2020年11月1日周日 上午11:04写道: > >>> > >>>> 你检查一下 hive 文件是否正常生成了? 
> >>>> > >>>> 我看你上面的代码,kafka->hive 流程中是没有 watermark 的,而"partition-time" 的 trigger > >>>> policy 是基于 watermark 驱动的,所以可能是这个原因导致 hive 中没有数据。 > >>>> > >>>> Best, > >>>> Jark > >>>> > >>>> > >>>> [1]: > >>>> > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#sink-partition-commit-trigger > >>>> > >>>> On Sat, 31 Oct 2020 at 17:25, 陈帅 <[hidden email]> wrote: > >>>> > >>>>> > 谢谢Jark细致解答,我按照你给的思路试了下。遇到一个问题是,在不开hive分区的情况下写入和读取是没有问题的,但在开启hive表时间分区后,写入是成功了,然而通过hive > >>>>> shell查不到数据,表结构是正确的。(代码我注释掉了) 能帮忙看下是哪里写得不对吗? > >>>>> > >>>>> cdc -> kafka示例消息如下 > >>>>> {"data":{"team_id":1001,"team_name":"Sun","create_time":"2020-10-31 > >>>>> 11:25:38","update_time":"2020-10-31 11:25:38"},"op":"+I"} > >>>>> > >>>>> import com.alibaba.fastjson.JSON; > >>>>> import com.alibaba.fastjson.JSONObject; > >>>>> import org.apache.flink.api.common.serialization.SimpleStringSchema; > >>>>> import org.apache.flink.api.common.typeinfo.TypeInformation; > >>>>> import org.apache.flink.api.common.typeinfo.Types; > >>>>> import org.apache.flink.api.java.typeutils.RowTypeInfo; > >>>>> import org.apache.flink.streaming.api.CheckpointingMode; > >>>>> import org.apache.flink.streaming.api.TimeCharacteristic; > >>>>> import org.apache.flink.streaming.api.datastream.DataStream; > >>>>> import > >>>>> > org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; > >>>>> import > >>>>> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > >>>>> import > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; > >>>>> import > >>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; > >>>>> import org.apache.flink.table.api.EnvironmentSettings; > >>>>> import org.apache.flink.table.api.SqlDialect; > >>>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > >>>>> import org.apache.flink.table.catalog.hive.HiveCatalog; > >>>>> import org.apache.flink.types.Row; > >>>>> import org.apache.flink.types.RowKind; > >>>>> > >>>>> import java.time.Duration; > >>>>> import java.util.Properties; > >>>>> > >>>>> public class MysqlCDC2Hive { > >>>>> public static void main(String[] args) throws Exception { > >>>>> StreamExecutionEnvironment streamEnv = > >>>>> StreamExecutionEnvironment.getExecutionEnvironment(); > >>>>> > >>>>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > >>>>> streamEnv.setParallelism(3); > >>>>> streamEnv.enableCheckpointing(60000); > >>>>> > >>>>> EnvironmentSettings tableEnvSettings = > >>>>> EnvironmentSettings.newInstance() > >>>>> .useBlinkPlanner() > >>>>> .inStreamingMode() > >>>>> .build(); > >>>>> StreamTableEnvironment tableEnv = > >>>>> StreamTableEnvironment.create(streamEnv, tableEnvSettings); > >>>>> > >>>>> > tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, > >>>>> CheckpointingMode.EXACTLY_ONCE); > >>>>> > >>>>> > tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, > >>>>> Duration.ofMinutes(1)); > >>>>> > >>>>> String catalogName = "hive_catalog"; > >>>>> HiveCatalog catalog = new HiveCatalog( > >>>>> catalogName, > >>>>> "default", > >>>>> "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf", > >>>>> "2.3.4" > >>>>> ); > >>>>> tableEnv.registerCatalog(catalogName, catalog); > >>>>> tableEnv.useCatalog(catalogName); > >>>>> > >>>>> MyDateFormat2 myDateFormat = new MyDateFormat2(); > >>>>> tableEnv.registerFunction("my_date_format", myDateFormat); > >>>>> > >>>>> 
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc"); > >>>>> tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team"); > >>>>> tableEnv.executeSql("CREATE TABLE cdc.team(\n" + > >>>>> " team_id INT,\n" + > >>>>> " team_name STRING,\n" + > >>>>> " create_time TIMESTAMP,\n" + > >>>>> " update_time TIMESTAMP,\n" + > >>>>> " proctime as proctime()\n" + > >>>>> ") WITH (\n" + > >>>>> " 'connector' = 'mysql-cdc',\n" + > >>>>> " 'hostname' = 'localhost',\n" + > >>>>> " 'port' = '3306',\n" + > >>>>> " 'username' = 'root',\n" + > >>>>> " 'password' = 'root',\n" + > >>>>> " 'database-name' = 'test',\n" + > >>>>> " 'table-name' = 'team'\n" + > >>>>> ")"); > >>>>> > >>>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka"); > >>>>> tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team"); > >>>>> tableEnv.executeSql("CREATE TABLE kafka.team (\n" + > >>>>> " team_id INT,\n" + > >>>>> " team_name STRING,\n" + > >>>>> " create_time TIMESTAMP,\n" + > >>>>> " update_time TIMESTAMP\n" + > >>>>> ") WITH (\n" + > >>>>> " 'connector' = 'kafka',\n" + > >>>>> " 'topic' = 'team',\n" + > >>>>> " 'scan.startup.mode' = 'earliest-offset',\n" + > >>>>> " 'properties.bootstrap.servers' = > >>>>> 'localhost:9092',\n" + > >>>>> " 'format' = 'changelog-json'\n" + > >>>>> ")"); > >>>>> > >>>>> tableEnv.executeSql("INSERT INTO kafka.team \n" + > >>>>> "SELECT team_id, team_name, create_time, update_time > >>>>> \n" + > >>>>> "FROM cdc.team"); > >>>>> > >>>>> // 定义带op字段的stream > >>>>> Properties properties = new Properties(); > >>>>> properties.setProperty("bootstrap.servers", > "localhost:9092"); > >>>>> properties.setProperty("group.id", "test"); > >>>>> > >>>>> FlinkKafkaConsumerBase<String> consumer = new > >>>>> FlinkKafkaConsumer<>( > >>>>> "team", > >>>>> new SimpleStringSchema(), > >>>>> properties > >>>>> ).setStartFromEarliest(); > >>>>> > >>>>> DataStream<String> ds = streamEnv.addSource(consumer); > >>>>> > >>>>> String[] fieldNames = {"team_id", "team_name", "create_time", > >>>>> "update_time", "op"}; > >>>>> TypeInformation[] types = {Types.INT, Types.STRING, > >>>>> Types.STRING, Types.STRING, Types.STRING}; > >>>>> DataStream<Row> ds2 = ds.map(str -> { > >>>>> JSONObject jsonObject = JSON.parseObject(str); > >>>>> String op = jsonObject.getString("op"); > >>>>> JSONObject data = jsonObject.getJSONObject("data"); > >>>>> int arity = fieldNames.length; > >>>>> Row row = new Row(arity); > >>>>> row.setField(0, data.get("team_id")); > >>>>> row.setField(1, data.get("team_name")); > >>>>> row.setField(2, data.get("create_time")); > >>>>> row.setField(3, data.get("update_time")); > >>>>> String operation = getOperation(op); > >>>>> row.setField(4, operation); > >>>>> > >>>>> return row; > >>>>> }, new RowTypeInfo(types, fieldNames)); > >>>>> > >>>>> tableEnv.registerDataStream("merged_team", ds2); > >>>>> > >>>>> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > >>>>> > >>>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods"); > >>>>> tableEnv.executeSql("DROP TABLE IF EXISTS ods.team"); > >>>>> > >>>>> tableEnv.executeSql("CREATE TABLE ods.team (\n" + > >>>>> " team_id INT,\n" + > >>>>> " team_name STRING,\n" + > >>>>> " create_time STRING,\n" + > >>>>> " update_time STRING,\n" + > >>>>> " op STRING\n" + > >>>>> // ") PARTITIONED BY (\n" + > >>>>> // " ts_date STRING,\n" + > >>>>> // " ts_hour STRING,\n" + > >>>>> // " ts_minute STRING\n" + > >>>>> ") STORED AS PARQUET TBLPROPERTIES (\n" + > >>>>> " 'sink.partition-commit.trigger' = > >>>>> 'partition-time',\n" + > >>>>> " 
'sink.partition-commit.delay' = '1 min',\n" + > >>>>> " 'sink.partition-commit.policy.kind' = > >>>>> 'metastore,success-file',\n" + > >>>>> " 'partition.time-extractor.timestamp-pattern' = > >>>>> '$ts_date $ts_hour:$ts_minute:00'\n" + > >>>>> ")"); > >>>>> > >>>>> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > >>>>> tableEnv.executeSql("INSERT INTO ods.team \n" + > >>>>> "SELECT team_id, team_name, create_time, update_time, > >>>>> op \n" + > >>>>> // " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > >>>>> HH:mm:ss'), 'yyyyMMdd') as ts_date, \n" + > >>>>> // " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > >>>>> HH:mm:ss'), 'HH') as ts_hour, \n" + > >>>>> // " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > >>>>> HH:mm:ss'), 'mm') as ts_minute \n" + > >>>>> "FROM merged_team"); > >>>>> tableEnv.execute("MysqlCDC2Hive2"); > >>>>> > >>>>> streamEnv.execute(""); > >>>>> } > >>>>> > >>>>> private static String getOperation(String op) { > >>>>> String operation = "INSERT"; > >>>>> for (RowKind rk : RowKind.values()) { > >>>>> if (rk.shortString().equals(op)) { > >>>>> switch (rk) { > >>>>> case UPDATE_BEFORE: > >>>>> case UPDATE_AFTER: > >>>>> operation = "UPDATE"; > >>>>> break; > >>>>> case DELETE: > >>>>> operation = "DELETE"; > >>>>> break; > >>>>> case INSERT: > >>>>> default: > >>>>> operation = "INSERT"; > >>>>> break; > >>>>> } > >>>>> break; > >>>>> } > >>>>> } > >>>>> return operation; > >>>>> } > >>>>> } > >>>>> > >>>>> Jark Wu <[hidden email]> 于2020年10月31日周六 下午1:45写道: > >>>>> > >>>>>> 1. 是的。目前 Hive不支持直接消费 changlog ,这个主要原因是 hive 对 cdc 的支持不是很好。即使是 hive > >>>>>> ACID/transaction 功能,由于其与其他计算引擎集成的不好,也鲜有人用。 > >>>>>> > >>>>>> 2. cdc -> kafka -> hive streaming 的方案是可行的,不过 kafka -> hive streaming > >>>>>> 相当于原始数据同步,到 hive 中仍然是 cdc logs 内容,并没有实时合并,需要用户自己写 query 在 hive > >>>>>> 中进行合并。merge过程可以参考这篇文章[1]。 > >>>>>> > >>>>>> 3. 你可以 ts + INTERVAL '8' HOUR > >>>>>> > >>>>>> PS: 在1.12中,我们计划让 hive 也能直接写 changelog 数据,这样的话 cdc 可以直接 -> hive > >>>>>> streaming,不需要中间的 kafka。 不过到了 hive 中后,仍然需要另外写 query 将数据做实时merge。 > >>>>>> > >>>>>> Best, > >>>>>> Jark > >>>>>> > >>>>>> On Sat, 31 Oct 2020 at 13:26, 罗显宴 <[hidden email]> wrote: > >>>>>> > >>>>>>> hive3可以hive2不可以,换了kafka也没用吧,hive3之前一般都不支持数据仓库的更改。不知道回答的对不对,欢迎指正。 > >>>>>>> > >>>>>>> > >>>>>>> | | > >>>>>>> 罗显宴 > >>>>>>> | > >>>>>>> | > >>>>>>> 邮箱:[hidden email] > >>>>>>> | > >>>>>>> > >>>>>>> 签名由 网易邮箱大师 定制 > >>>>>>> > >>>>>>> 在2020年10月31日 12:06,陈帅 写道: > >>>>>>> 我想使用flink sql的mysql-cdc connector直接将mysql表数据实时同步进hive,运行后抛 > >>>>>>> > >>>>>>> Exception in thread "main" > org.apache.flink.table.api.TableException: > >>>>>>> AppendStreamTableSink doesn't support consuming update and delete > >>>>>>> changes > >>>>>>> which is produced by node TableSourceScan(table=[[hive_catalog, > cdc, > >>>>>>> team]], fields=[team_id, team_name, create_time, update_time]) > >>>>>>> > >>>>>>> 我的问题: > >>>>>>> 1. 是不是因为hive2不支持delete/update,如果换hive 3能否支持呢? > >>>>>>> 2. 如果要支持这种场景是不是中间需要加一层kafka介质(通过 changelog-json 格式),即cdc -> > >>>>>>> kafka,然后kafka > >>>>>>> -> hive streaming? 谢谢! > >>>>>>> 3. DATE_FORMAT函数出来的时间是UTC的,怎么转成GMT+8,只能通过UDF么? 
> >>>>>>> > >>>>>>> sql语句如下 > >>>>>>> > >>>>>>> CREATE DATABASE IF NOT EXISTS cdc > >>>>>>> > >>>>>>> DROP TABLE IF EXISTS cdc.team > >>>>>>> > >>>>>>> CREATE TABLE team( > >>>>>>> team_id BIGINT, > >>>>>>> team_name STRING, > >>>>>>> create_time TIMESTAMP, > >>>>>>> update_time TIMESTAMP, > >>>>>>> proctime as proctime() > >>>>>>> ) WITH ( > >>>>>>> 'connector' = 'mysql-cdc', > >>>>>>> 'hostname' = 'localhost', > >>>>>>> 'port' = '3306', > >>>>>>> 'username' = 'root', > >>>>>>> 'password' = 'root', > >>>>>>> 'database-name' = 'test', > >>>>>>> 'table-name' = 'team' > >>>>>>> ) > >>>>>>> > >>>>>>> CREATE DATABASE IF NOT EXISTS ods > >>>>>>> > >>>>>>> DROP TABLE IF EXISTS ods.team > >>>>>>> > >>>>>>> CREATE TABLE ods.team ( > >>>>>>> team_id BIGINT, > >>>>>>> team_name STRING, > >>>>>>> create_time TIMESTAMP, > >>>>>>> update_time TIMESTAMP, > >>>>>>> ) PARTITIONED BY ( > >>>>>>> ts_date STRING, > >>>>>>> ts_hour STRING, > >>>>>>> ts_minute STRING, > >>>>>>> ) STORED AS PARQUET TBLPROPERTIES ( > >>>>>>> 'sink.partition-commit.trigger' = 'partition-time', > >>>>>>> 'sink.partition-commit.delay' = '1 min', > >>>>>>> 'sink.partition-commit.policy.kind' = 'metastore,success-file', > >>>>>>> 'partition.time-extractor.timestamp-pattern' = '$ts_date > >>>>>>> $ts_hour:$ts_minute:00' > >>>>>>> ) > >>>>>>> > >>>>>>> INSERT INTO ods.team > >>>>>>> SELECT team_id, team_name, create_time, update_time, > >>>>>>> my_date_format(create_time,'yyyy-MM-dd', 'Asia/Shanghai'), > >>>>>>> my_date_format(create_time,'HH', 'Asia/Shanghai'), > >>>>>>> my_date_format(create_time,'mm', 'Asia/Shanghai') > >>>>>>> FROM cdc.team > >>>>>>> > >>>>>> > -- Best regards! Rui Li |
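Following up on the success-file / metastore check suggested at the top of the reply above, the same question can also be answered programmatically by asking the HiveCatalog which partitions were actually committed. A minimal sketch, reusing the catalog settings from the quoted job; only the class name is invented.

import java.util.List;

import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class ListCommittedPartitions {
    public static void main(String[] args) throws Exception {
        HiveCatalog catalog = new HiveCatalog(
                "hive_catalog",
                "default",
                "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
                "2.3.4");
        catalog.open();
        try {
            // Partitions show up here only after the commit policy has run.
            List<CatalogPartitionSpec> partitions =
                    catalog.listPartitions(new ObjectPath("ods", "team"));
            // If the parquet files already exist on HDFS but this list stays empty,
            // the partition commit was never triggered (no metastore entry, no success file).
            partitions.forEach(spec -> System.out.println(spec.getPartitionSpec()));
        } finally {
            catalog.close();
        }
    }
}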
- You can use proc-time
- Or add a watermark in the **UTC** time zone on your source. Note that it must be **UTC**: SQL watermarks are always defined in **UTC**.

On Mon, Nov 2, 2020 at 10:38 AM Rui Li <[hidden email]> wrote:

> Hi,
>
> Normally partitions should be committed automatically. Since your commit policy
> is metastore+success-file, check whether the success file was created under the
> partition directory; if there is no success file either, the partition commit was
> never triggered. Committing a partition also prints log lines like the following,
> which you can search for in the logs:
>
> LOG.info("Partition {} of table {} is ready to be committed", partSpec, tableIdentifier);
>
> LOG.info("Committed partition {} to metastore", partitionSpec);
>
> LOG.info("Committed partition {} with success file", context.partitionSpec());

--
Best,
Jingsong Lee
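To make the two suggestions above concrete, here is a rough sketch against the job quoted in this thread. Option 1 is a one-property change; option 2 replaces the ZoneId.systemDefault() conversion in the quoted extractor with ZoneOffset.UTC, so the generated watermark should line up with the partition time that the default extractor derives from '$dt $hr:$mi:00'. Only the class name UtcCreateTimeExtractor is invented.

// Option 1 (sketch): in the ods.team DDL, switch
//     'sink.partition-commit.trigger' = 'partition-time'
// to
//     'sink.partition-commit.trigger' = 'process-time'
// so that partition commits no longer depend on watermarks at all.

// Option 2 (sketch): keep 'partition-time' but generate the watermark in UTC.
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;

public class UtcCreateTimeExtractor extends BoundedOutOfOrdernessTimestampExtractor<Row> {

    private static final DateTimeFormatter DTF =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public UtcCreateTimeExtractor() {
        super(Time.minutes(1)); // same out-of-orderness bound as in the quoted code
    }

    @Override
    public long extractTimestamp(Row row) {
        // create_time sits at index 2 of the Row built from the changelog-json message.
        // Interpreting it as UTC (instead of ZoneId.systemDefault()) keeps the watermark
        // comparable with the partition time extracted from '$dt $hr:$mi:00'.
        LocalDateTime ldt = LocalDateTime.parse((String) row.getField(2), DTF);
        return ldt.toInstant(ZoneOffset.UTC).toEpochMilli();
    }
}

// Usage, in place of the anonymous extractor in the quoted job:
// DataStream<Row> ds2 = ds.map(...).assignTimestampsAndWatermarks(new UtcCreateTimeExtractor());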
Hi!
看到你代码里,将增删改信息当做一个字段存到了Hive表中,那么到最后这些操作是怎么合并的呢? 发件人: Rui Li 发送时间: 2020-11-02 10:38 收件人: user-zh 抄送: Jark Wu 主题: Re: flink mysql cdc + hive streaming疑问 Hi, 正常情况是可以自动提交分区的,我看你commit policy指定了metastore+success-file,可以检查一下分区目录下success file是否创建了。如果success file也没有的话说明没有触发分区提交。另外提交分区时会打印类似这样的日志,可以在log中查找一下 LOG.info("Partition {} of table {} is ready to be committed", partSpec, tableIdentifier); LOG.info("Committed partition {} to metastore", partitionSpec); LOG.info("Committed partition {} with success file", context.partitionSpec()); On Sun, Nov 1, 2020 at 5:36 PM 陈帅 <[hidden email]> wrote: > 最后,在hive shell中执行 “msck repair table team;” 命令后就能查询到写的数据了,难道flink hive > streaming不能自动注册hive分区吗?还是我使用的姿势不对? > > 陈帅 <[hidden email]> 于2020年11月1日周日 下午5:24写道: > > > 改用 TEXTFILE 存储hive表数据以便下载hive文件观察内容 > > ") STORED AS TEXTFILE TBLPROPERTIES (" > > > > 这是生成的hive表建表语句 > > > > hive> show create table team; > > OK > > CREATE TABLE `team`( > > `team_id` int, > > `team_name` string, > > `create_time` string, > > `update_time` string, > > `op` string) > > PARTITIONED BY ( > > `dt` string, > > `hr` string, > > `mi` string) > > ROW FORMAT SERDE > > 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' > > STORED AS INPUTFORMAT > > 'org.apache.hadoop.mapred.TextInputFormat' > > OUTPUTFORMAT > > 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat' > > LOCATION > > 'hdfs://localhost:9000/user/hive/warehouse/ods.db/team' > > TBLPROPERTIES ( > > 'is_generic'='false', > > 'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00', > > 'sink.partition-commit.delay'='1 min', > > 'sink.partition-commit.policy.kind'='metastore,success-file', > > 'sink.partition-commit.trigger'='partition-time', > > 'transient_lastDdlTime'='1604222266') > > Time taken: 0.252 seconds, Fetched: 25 row(s) > > > > 另外,下载了hive文件内容如下 > > 1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31 > 11:25:38<0x01>INSERT > > > > 还是查询不到结果 > > hive> select * from team; > > OK > > Time taken: 0.326 seconds > > > > 陈帅 <[hidden email]> 于2020年11月1日周日 下午5:10写道: > > > >> > >> > 之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。 > >> 生成的hive分区文件路径类似于 > /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/ > >> part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3 > >> > >> 陈帅 <[hidden email]> 于2020年11月1日周日 下午4:43写道: > >> > >>> > 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive > >>> shell查不到数据。 > >>> > >>> import com.alibaba.fastjson.JSON; > >>> import com.alibaba.fastjson.JSONObject; > >>> import org.apache.flink.api.common.serialization.SimpleStringSchema; > >>> import org.apache.flink.api.common.typeinfo.TypeInformation; > >>> import org.apache.flink.api.common.typeinfo.Types; > >>> import org.apache.flink.api.java.typeutils.RowTypeInfo; > >>> import org.apache.flink.streaming.api.CheckpointingMode; > >>> import org.apache.flink.streaming.api.TimeCharacteristic; > >>> import org.apache.flink.streaming.api.datastream.DataStream; > >>> import > >>> > org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; > >>> import > >>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > >>> import > >>> > org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; > >>> import org.apache.flink.streaming.api.windowing.time.Time; > >>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; > >>> import > >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; > >>> import 
org.apache.flink.table.api.EnvironmentSettings; > >>> import org.apache.flink.table.api.SqlDialect; > >>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > >>> import org.apache.flink.table.catalog.hive.HiveCatalog; > >>> import org.apache.flink.types.Row; > >>> import org.apache.flink.types.RowKind; > >>> > >>> import java.time.Duration; > >>> import java.time.Instant; > >>> import java.time.LocalDateTime; > >>> import java.time.ZoneId; > >>> import java.time.format.DateTimeFormatter; > >>> import java.util.Properties; > >>> > >>> public class MysqlCDC2Hive { > >>> > >>> private static final DateTimeFormatter dtf = > >>> DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); > >>> > >>> public static void main(String[] args) throws Exception { > >>> StreamExecutionEnvironment streamEnv = > >>> StreamExecutionEnvironment.getExecutionEnvironment(); > >>> > >>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > >>> streamEnv.setParallelism(3); > >>> streamEnv.enableCheckpointing(60000); > >>> > >>> EnvironmentSettings tableEnvSettings = > >>> EnvironmentSettings.newInstance() > >>> .useBlinkPlanner() > >>> .inStreamingMode() > >>> .build(); > >>> StreamTableEnvironment tableEnv = > >>> StreamTableEnvironment.create(streamEnv, tableEnvSettings); > >>> > >>> > tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, > >>> CheckpointingMode.EXACTLY_ONCE); > >>> > >>> > tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, > >>> Duration.ofMinutes(1)); > >>> > >>> String catalogName = "hive_catalog"; > >>> HiveCatalog catalog = new HiveCatalog( > >>> catalogName, > >>> "default", > >>> "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf", > >>> "2.3.4" > >>> ); > >>> tableEnv.registerCatalog(catalogName, catalog); > >>> tableEnv.useCatalog(catalogName); > >>> > >>> MyDateFormat2 myDateFormat = new MyDateFormat2(); > >>> tableEnv.registerFunction("my_date_format", myDateFormat); > >>> > >>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc"); > >>> tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team"); > >>> tableEnv.executeSql("CREATE TABLE cdc.team(\n" + > >>> " team_id INT,\n" + > >>> " team_name STRING,\n" + > >>> " create_time TIMESTAMP,\n" + > >>> " update_time TIMESTAMP,\n" + > >>> " proctime as proctime()\n" + > >>> ") WITH (\n" + > >>> " 'connector' = 'mysql-cdc',\n" + > >>> " 'hostname' = 'localhost',\n" + > >>> " 'port' = '3306',\n" + > >>> " 'username' = 'root',\n" + > >>> " 'password' = 'root',\n" + > >>> " 'database-name' = 'test',\n" + > >>> " 'table-name' = 'team'\n" + > >>> ")"); > >>> > >>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka"); > >>> tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team"); > >>> tableEnv.executeSql("CREATE TABLE kafka.team (\n" + > >>> " team_id INT,\n" + > >>> " team_name STRING,\n" + > >>> " create_time TIMESTAMP,\n" + > >>> " update_time TIMESTAMP\n" + > >>> ") WITH (\n" + > >>> " 'connector' = 'kafka',\n" + > >>> " 'topic' = 'team',\n" + > >>> " 'scan.startup.mode' = 'earliest-offset',\n" + > >>> " 'properties.bootstrap.servers' = > 'localhost:9092',\n" > >>> + > >>> " 'format' = 'changelog-json'\n" + > >>> ")"); > >>> > >>> tableEnv.executeSql("INSERT INTO kafka.team \n" + > >>> "SELECT team_id, team_name, create_time, update_time > \n" > >>> + > >>> "FROM cdc.team"); > >>> > >>> // 定义带op字段的stream > >>> Properties properties = new Properties(); > >>> properties.setProperty("bootstrap.servers", "localhost:9092"); > >>> 
properties.setProperty("group.id", "test1`"); > >>> > >>> FlinkKafkaConsumerBase<String> consumer = new > >>> FlinkKafkaConsumer<>( > >>> "team", > >>> new SimpleStringSchema(), > >>> properties > >>> ).setStartFromEarliest(); > >>> > >>> DataStream<String> ds = streamEnv.addSource(consumer); > >>> > >>> String[] fieldNames = {"team_id", "team_name", "create_time", > >>> "update_time", "op"}; > >>> TypeInformation[] types = {Types.INT, Types.STRING, > >>> Types.STRING, Types.STRING, Types.STRING}; > >>> DataStream<Row> ds2 = ds.map(str -> { > >>> JSONObject jsonObject = JSON.parseObject(str); > >>> String op = jsonObject.getString("op"); > >>> JSONObject data = jsonObject.getJSONObject("data"); > >>> int arity = fieldNames.length; > >>> Row row = new Row(arity); > >>> row.setField(0, data.get("team_id")); > >>> row.setField(1, data.get("team_name")); > >>> row.setField(2, data.get("create_time")); > >>> row.setField(3, data.get("update_time")); > >>> String operation = getOperation(op); > >>> row.setField(4, operation); > >>> > >>> return row; > >>> }, new RowTypeInfo(types, fieldNames)) > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> *.assignTimestampsAndWatermarks(new > >>> BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(1)) { > >>> @Override public long extractTimestamp(Row row) { > >>> String dt = (String) row.getField(2); LocalDateTime ldt > = > >>> LocalDateTime.parse(dt, dtf); Instant instant = > >>> ldt.atZone(ZoneId.systemDefault()).toInstant(); long > >>> timeInMillis = instant.toEpochMilli(); return > timeInMillis; > >>> } });* > >>> > >>> tableEnv.registerDataStream("merged_team", ds2); > >>> > >>> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > >>> > >>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods"); > >>> tableEnv.executeSql("DROP TABLE IF EXISTS ods.team"); > >>> > >>> tableEnv.executeSql("CREATE TABLE ods.team (\n" + > >>> " team_id INT,\n" + > >>> " team_name STRING,\n" + > >>> " create_time STRING,\n" + > >>> " update_time STRING,\n" + > >>> " op STRING\n" + > >>> ") PARTITIONED BY (\n" + > >>> " dt STRING,\n" + > >>> " hr STRING,\n" + > >>> " mi STRING\n" + > >>> ") STORED AS PARQUET TBLPROPERTIES (\n" + > >>> " 'sink.partition-commit.trigger' = > >>> 'partition-time',\n" + > >>> " 'sink.partition-commit.delay' = '1 min',\n" + > >>> " 'sink.partition-commit.policy.kind' = > >>> 'metastore,success-file',\n" + > >>> " 'partition.time-extractor.timestamp-pattern' = '$dt > >>> $hr:$mi:00'\n" + > >>> ")"); > >>> > >>> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > >>> tableEnv.executeSql("INSERT INTO ods.team \n" + > >>> "SELECT team_id, team_name, create_time, update_time, > >>> op, \n" + > >>> " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > >>> HH:mm:ss'), 'yyyyMMdd') as dt, \n" + > >>> " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > >>> HH:mm:ss'), 'HH') as hr, \n" + > >>> " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > >>> HH:mm:ss'), 'mm') as mi \n" + > >>> "FROM merged_team"); > >>> tableEnv.execute("MysqlCDC2Hive2"); > >>> > >>> streamEnv.execute(""); > >>> } > >>> > >>> private static String getOperation(String op) { > >>> String operation = "INSERT"; > >>> for (RowKind rk : RowKind.values()) { > >>> if (rk.shortString().equals(op)) { > >>> switch (rk) { > >>> case UPDATE_BEFORE: > >>> case UPDATE_AFTER: > >>> operation = "UPDATE"; > >>> break; > >>> case DELETE: > >>> operation = "DELETE"; > >>> break; > >>> case INSERT: > >>> default: > >>> operation = "INSERT"; > >>> break; > >>> } > >>> break; > >>> } 
> >>> } > >>> return operation; > >>> } > >>> } > >>> > >>> Jark Wu <[hidden email]> 于2020年11月1日周日 上午11:04写道: > >>> > >>>> 你检查一下 hive 文件是否正常生成了? > >>>> > >>>> 我看你上面的代码,kafka->hive 流程中是没有 watermark 的,而"partition-time" 的 trigger > >>>> policy 是基于 watermark 驱动的,所以可能是这个原因导致 hive 中没有数据。 > >>>> > >>>> Best, > >>>> Jark > >>>> > >>>> > >>>> [1]: > >>>> > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#sink-partition-commit-trigger > >>>> > >>>> On Sat, 31 Oct 2020 at 17:25, 陈帅 <[hidden email]> wrote: > >>>> > >>>>> > 谢谢Jark细致解答,我按照你给的思路试了下。遇到一个问题是,在不开hive分区的情况下写入和读取是没有问题的,但在开启hive表时间分区后,写入是成功了,然而通过hive > >>>>> shell查不到数据,表结构是正确的。(代码我注释掉了) 能帮忙看下是哪里写得不对吗? > >>>>> > >>>>> cdc -> kafka示例消息如下 > >>>>> {"data":{"team_id":1001,"team_name":"Sun","create_time":"2020-10-31 > >>>>> 11:25:38","update_time":"2020-10-31 11:25:38"},"op":"+I"} > >>>>> > >>>>> import com.alibaba.fastjson.JSON; > >>>>> import com.alibaba.fastjson.JSONObject; > >>>>> import org.apache.flink.api.common.serialization.SimpleStringSchema; > >>>>> import org.apache.flink.api.common.typeinfo.TypeInformation; > >>>>> import org.apache.flink.api.common.typeinfo.Types; > >>>>> import org.apache.flink.api.java.typeutils.RowTypeInfo; > >>>>> import org.apache.flink.streaming.api.CheckpointingMode; > >>>>> import org.apache.flink.streaming.api.TimeCharacteristic; > >>>>> import org.apache.flink.streaming.api.datastream.DataStream; > >>>>> import > >>>>> > org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; > >>>>> import > >>>>> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > >>>>> import > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; > >>>>> import > >>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; > >>>>> import org.apache.flink.table.api.EnvironmentSettings; > >>>>> import org.apache.flink.table.api.SqlDialect; > >>>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > >>>>> import org.apache.flink.table.catalog.hive.HiveCatalog; > >>>>> import org.apache.flink.types.Row; > >>>>> import org.apache.flink.types.RowKind; > >>>>> > >>>>> import java.time.Duration; > >>>>> import java.util.Properties; > >>>>> > >>>>> public class MysqlCDC2Hive { > >>>>> public static void main(String[] args) throws Exception { > >>>>> StreamExecutionEnvironment streamEnv = > >>>>> StreamExecutionEnvironment.getExecutionEnvironment(); > >>>>> > >>>>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > >>>>> streamEnv.setParallelism(3); > >>>>> streamEnv.enableCheckpointing(60000); > >>>>> > >>>>> EnvironmentSettings tableEnvSettings = > >>>>> EnvironmentSettings.newInstance() > >>>>> .useBlinkPlanner() > >>>>> .inStreamingMode() > >>>>> .build(); > >>>>> StreamTableEnvironment tableEnv = > >>>>> StreamTableEnvironment.create(streamEnv, tableEnvSettings); > >>>>> > >>>>> > tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, > >>>>> CheckpointingMode.EXACTLY_ONCE); > >>>>> > >>>>> > tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, > >>>>> Duration.ofMinutes(1)); > >>>>> > >>>>> String catalogName = "hive_catalog"; > >>>>> HiveCatalog catalog = new HiveCatalog( > >>>>> catalogName, > >>>>> "default", > >>>>> "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf", > >>>>> "2.3.4" > >>>>> ); > >>>>> tableEnv.registerCatalog(catalogName, catalog); > >>>>> tableEnv.useCatalog(catalogName); > >>>>> > >>>>> 
MyDateFormat2 myDateFormat = new MyDateFormat2(); > >>>>> tableEnv.registerFunction("my_date_format", myDateFormat); > >>>>> > >>>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc"); > >>>>> tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team"); > >>>>> tableEnv.executeSql("CREATE TABLE cdc.team(\n" + > >>>>> " team_id INT,\n" + > >>>>> " team_name STRING,\n" + > >>>>> " create_time TIMESTAMP,\n" + > >>>>> " update_time TIMESTAMP,\n" + > >>>>> " proctime as proctime()\n" + > >>>>> ") WITH (\n" + > >>>>> " 'connector' = 'mysql-cdc',\n" + > >>>>> " 'hostname' = 'localhost',\n" + > >>>>> " 'port' = '3306',\n" + > >>>>> " 'username' = 'root',\n" + > >>>>> " 'password' = 'root',\n" + > >>>>> " 'database-name' = 'test',\n" + > >>>>> " 'table-name' = 'team'\n" + > >>>>> ")"); > >>>>> > >>>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka"); > >>>>> tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team"); > >>>>> tableEnv.executeSql("CREATE TABLE kafka.team (\n" + > >>>>> " team_id INT,\n" + > >>>>> " team_name STRING,\n" + > >>>>> " create_time TIMESTAMP,\n" + > >>>>> " update_time TIMESTAMP\n" + > >>>>> ") WITH (\n" + > >>>>> " 'connector' = 'kafka',\n" + > >>>>> " 'topic' = 'team',\n" + > >>>>> " 'scan.startup.mode' = 'earliest-offset',\n" + > >>>>> " 'properties.bootstrap.servers' = > >>>>> 'localhost:9092',\n" + > >>>>> " 'format' = 'changelog-json'\n" + > >>>>> ")"); > >>>>> > >>>>> tableEnv.executeSql("INSERT INTO kafka.team \n" + > >>>>> "SELECT team_id, team_name, create_time, update_time > >>>>> \n" + > >>>>> "FROM cdc.team"); > >>>>> > >>>>> // 定义带op字段的stream > >>>>> Properties properties = new Properties(); > >>>>> properties.setProperty("bootstrap.servers", > "localhost:9092"); > >>>>> properties.setProperty("group.id", "test"); > >>>>> > >>>>> FlinkKafkaConsumerBase<String> consumer = new > >>>>> FlinkKafkaConsumer<>( > >>>>> "team", > >>>>> new SimpleStringSchema(), > >>>>> properties > >>>>> ).setStartFromEarliest(); > >>>>> > >>>>> DataStream<String> ds = streamEnv.addSource(consumer); > >>>>> > >>>>> String[] fieldNames = {"team_id", "team_name", "create_time", > >>>>> "update_time", "op"}; > >>>>> TypeInformation[] types = {Types.INT, Types.STRING, > >>>>> Types.STRING, Types.STRING, Types.STRING}; > >>>>> DataStream<Row> ds2 = ds.map(str -> { > >>>>> JSONObject jsonObject = JSON.parseObject(str); > >>>>> String op = jsonObject.getString("op"); > >>>>> JSONObject data = jsonObject.getJSONObject("data"); > >>>>> int arity = fieldNames.length; > >>>>> Row row = new Row(arity); > >>>>> row.setField(0, data.get("team_id")); > >>>>> row.setField(1, data.get("team_name")); > >>>>> row.setField(2, data.get("create_time")); > >>>>> row.setField(3, data.get("update_time")); > >>>>> String operation = getOperation(op); > >>>>> row.setField(4, operation); > >>>>> > >>>>> return row; > >>>>> }, new RowTypeInfo(types, fieldNames)); > >>>>> > >>>>> tableEnv.registerDataStream("merged_team", ds2); > >>>>> > >>>>> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > >>>>> > >>>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods"); > >>>>> tableEnv.executeSql("DROP TABLE IF EXISTS ods.team"); > >>>>> > >>>>> tableEnv.executeSql("CREATE TABLE ods.team (\n" + > >>>>> " team_id INT,\n" + > >>>>> " team_name STRING,\n" + > >>>>> " create_time STRING,\n" + > >>>>> " update_time STRING,\n" + > >>>>> " op STRING\n" + > >>>>> // ") PARTITIONED BY (\n" + > >>>>> // " ts_date STRING,\n" + > >>>>> // " ts_hour STRING,\n" + > >>>>> // " ts_minute STRING\n" + > >>>>> ") STORED AS 
PARQUET TBLPROPERTIES (\n" + > >>>>> " 'sink.partition-commit.trigger' = > >>>>> 'partition-time',\n" + > >>>>> " 'sink.partition-commit.delay' = '1 min',\n" + > >>>>> " 'sink.partition-commit.policy.kind' = > >>>>> 'metastore,success-file',\n" + > >>>>> " 'partition.time-extractor.timestamp-pattern' = > >>>>> '$ts_date $ts_hour:$ts_minute:00'\n" + > >>>>> ")"); > >>>>> > >>>>> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > >>>>> tableEnv.executeSql("INSERT INTO ods.team \n" + > >>>>> "SELECT team_id, team_name, create_time, update_time, > >>>>> op \n" + > >>>>> // " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > >>>>> HH:mm:ss'), 'yyyyMMdd') as ts_date, \n" + > >>>>> // " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > >>>>> HH:mm:ss'), 'HH') as ts_hour, \n" + > >>>>> // " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > >>>>> HH:mm:ss'), 'mm') as ts_minute \n" + > >>>>> "FROM merged_team"); > >>>>> tableEnv.execute("MysqlCDC2Hive2"); > >>>>> > >>>>> streamEnv.execute(""); > >>>>> } > >>>>> > >>>>> private static String getOperation(String op) { > >>>>> String operation = "INSERT"; > >>>>> for (RowKind rk : RowKind.values()) { > >>>>> if (rk.shortString().equals(op)) { > >>>>> switch (rk) { > >>>>> case UPDATE_BEFORE: > >>>>> case UPDATE_AFTER: > >>>>> operation = "UPDATE"; > >>>>> break; > >>>>> case DELETE: > >>>>> operation = "DELETE"; > >>>>> break; > >>>>> case INSERT: > >>>>> default: > >>>>> operation = "INSERT"; > >>>>> break; > >>>>> } > >>>>> break; > >>>>> } > >>>>> } > >>>>> return operation; > >>>>> } > >>>>> } > >>>>> > >>>>> Jark Wu <[hidden email]> 于2020年10月31日周六 下午1:45写道: > >>>>> > >>>>>> 1. 是的。目前 Hive不支持直接消费 changlog ,这个主要原因是 hive 对 cdc 的支持不是很好。即使是 hive > >>>>>> ACID/transaction 功能,由于其与其他计算引擎集成的不好,也鲜有人用。 > >>>>>> > >>>>>> 2. cdc -> kafka -> hive streaming 的方案是可行的,不过 kafka -> hive streaming > >>>>>> 相当于原始数据同步,到 hive 中仍然是 cdc logs 内容,并没有实时合并,需要用户自己写 query 在 hive > >>>>>> 中进行合并。merge过程可以参考这篇文章[1]。 > >>>>>> > >>>>>> 3. 你可以 ts + INTERVAL '8' HOUR > >>>>>> > >>>>>> PS: 在1.12中,我们计划让 hive 也能直接写 changelog 数据,这样的话 cdc 可以直接 -> hive > >>>>>> streaming,不需要中间的 kafka。 不过到了 hive 中后,仍然需要另外写 query 将数据做实时merge。 > >>>>>> > >>>>>> Best, > >>>>>> Jark > >>>>>> > >>>>>> On Sat, 31 Oct 2020 at 13:26, 罗显宴 <[hidden email]> wrote: > >>>>>> > >>>>>>> hive3可以hive2不可以,换了kafka也没用吧,hive3之前一般都不支持数据仓库的更改。不知道回答的对不对,欢迎指正。 > >>>>>>> > >>>>>>> > >>>>>>> | | > >>>>>>> 罗显宴 > >>>>>>> | > >>>>>>> | > >>>>>>> 邮箱:[hidden email] > >>>>>>> | > >>>>>>> > >>>>>>> 签名由 网易邮箱大师 定制 > >>>>>>> > >>>>>>> 在2020年10月31日 12:06,陈帅 写道: > >>>>>>> 我想使用flink sql的mysql-cdc connector直接将mysql表数据实时同步进hive,运行后抛 > >>>>>>> > >>>>>>> Exception in thread "main" > org.apache.flink.table.api.TableException: > >>>>>>> AppendStreamTableSink doesn't support consuming update and delete > >>>>>>> changes > >>>>>>> which is produced by node TableSourceScan(table=[[hive_catalog, > cdc, > >>>>>>> team]], fields=[team_id, team_name, create_time, update_time]) > >>>>>>> > >>>>>>> 我的问题: > >>>>>>> 1. 是不是因为hive2不支持delete/update,如果换hive 3能否支持呢? > >>>>>>> 2. 如果要支持这种场景是不是中间需要加一层kafka介质(通过 changelog-json 格式),即cdc -> > >>>>>>> kafka,然后kafka > >>>>>>> -> hive streaming? 谢谢! > >>>>>>> 3. DATE_FORMAT函数出来的时间是UTC的,怎么转成GMT+8,只能通过UDF么? 
--
Best regards!
Rui Li
In reply to this post by Rui Li
How should this UTC time be set? The root cause of not being able to see the data in Hive is that the partition information was never committed to the metastore;
you will find that the files are generated but there is no _SUCCESS file.
But specifying it like this does not work either?
tEnv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));

The inserts/updates/deletes are only recorded as markers on the rows in Hive; the data can be merged afterwards, e.g. with a join (a sketch of such a merge query follows after the quoted messages below).



[hidden email]

From: [hidden email]
Sent: 2020-11-02 13:37
To: user-zh
Subject: Re: Re: flink mysql cdc + hive streaming疑问
Hi!
I see that in your code the insert/update/delete information is stored as a column in the Hive table. How do these operations get merged in the end?




From: Rui Li
Sent: 2020-11-02 10:38
To: user-zh
Cc: Jark Wu
Subject: Re: flink mysql cdc + hive streaming疑问
Hi,
Normally the partitions are committed automatically. I see your commit policy is metastore+success-file, so please check whether the success file was created in the partition directory. If the success file is missing as well, the partition commit was never triggered. Also, logs like the following are printed when a partition is committed, so you can search for them:
LOG.info("Partition {} of table {} is ready to be committed", partSpec, tableIdentifier);
LOG.info("Committed partition {} to metastore", partitionSpec);
LOG.info("Committed partition {} with success file", context.partitionSpec());
On Sun, Nov 1, 2020 at 5:36 PM 陈帅 <[hidden email]> wrote:
> In the end, after running "msck repair table team;" in the hive shell I can query the data that was written. Doesn't flink hive streaming register hive partitions automatically, or am I using it the wrong way?
>
> 陈帅 <[hidden email]> wrote on Sunday, Nov 1, 2020 at 5:24 PM:
>
> > Switched the hive table to TEXTFILE storage so the hive files can be downloaded and inspected
> > ") STORED AS TEXTFILE TBLPROPERTIES ("
> >
> > This is the generated hive table DDL
> >
> > hive> show create table team;
> > OK
> > CREATE TABLE `team`(
> >   `team_id` int,
> >   `team_name` string,
> >   `create_time` string,
> >   `update_time` string,
> >   `op` string)
> > PARTITIONED BY (
> >   `dt` string,
> >   `hr` string,
> >   `mi` string)
> > ROW FORMAT SERDE
> >   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
> > STORED AS INPUTFORMAT
> >   'org.apache.hadoop.mapred.TextInputFormat'
> > OUTPUTFORMAT
> >   'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
> > LOCATION
> >   'hdfs://localhost:9000/user/hive/warehouse/ods.db/team'
> > TBLPROPERTIES (
> >   'is_generic'='false',
> >   'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
> >   'sink.partition-commit.delay'='1 min',
> >   'sink.partition-commit.policy.kind'='metastore,success-file',
> >   'sink.partition-commit.trigger'='partition-time',
> >   'transient_lastDdlTime'='1604222266')
> > Time taken: 0.252 seconds, Fetched: 25 row(s)
> >
> > In addition, the downloaded hive file contains
> > 1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31 11:25:38<0x01>INSERT
> >
> > but the query still returns nothing
> > hive> select * from team;
> > OK
> > Time taken: 0.326 seconds
> >
> > 陈帅 <[hidden email]> wrote on Sunday, Nov 1, 2020 at 5:10 PM:
> >
> > > Before the watermark and the partitioning were added, writing to hive and querying it worked fine. Only after partitioning was enabled are the hive files generated but not queryable, so I don't think it depends on whether the watermark is set.
> > > The generated hive partition path looks like /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3
> > >
> > > 陈帅 <[hidden email]> wrote on Sunday, Nov 1, 2020 at 4:43 PM:
> > >
> > > > I checked and the hive files are generated with the partitions I defined. Following your suggestion I added a watermark on the ds2 stream; after running it the hive files are still generated, but the hive shell still returns no data.
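Regarding the question above about how the INSERT/UPDATE/DELETE markers eventually get merged: one common approach, shown here only as a sketch using the ods.team table and the op/update_time columns from this thread, is a window-function de-duplication in Hive rather than a literal join. It treats update_time as the version column, keeps the latest row per team_id, and drops keys whose latest operation was a DELETE; whether update_time is a reliable version column is an assumption here.

-- Hive query sketch: reconstruct the latest state of each team_id from the change rows stored in ods.team.
SELECT team_id, team_name, create_time, update_time
FROM (
    SELECT team_id, team_name, create_time, update_time, op,
           ROW_NUMBER() OVER (PARTITION BY team_id ORDER BY update_time DESC) AS rn
    FROM ods.team
) latest
WHERE rn = 1
  AND op <> 'DELETE';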
Thanks everyone for the patient answers. The problem has finally been found.
After enabling debug logging and setting a breakpoint in the commitablePartitions method of the PartitionTimeCommitTrigger class, I found that the program throws an exception when executing line 127,

LocalDateTime partTime = extractor.extract(partitionKeys, extractPartitionValues(new Path(partition)));

java.time.format.DateTimeParseException: Text '20201101 17:14:00' could not be parsed at index 8

Tracing it down, the partition commit fails because of the time format. The offending line is

    " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyyMMdd') as dt, \n" +

and it should be changed to

    " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd') as dt, \n" +

According to the Flink documentation for the 'partition.time-extractor.timestamp-pattern' option, the supported format is yyyy-MM-dd HH:mm:ss, so the dt part has to be written as yyyy-MM-dd rather than yyyyMMdd.
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/filesystem.html

Thanks again, everyone!
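For reference, a sketch of the corrected INSERT with all three partition columns lined up against the 'partition.time-extractor.timestamp-pattern' = '$dt $hr:$mi:00' setting used above; the table and view names (ods.team, merged_team) come from the code quoted earlier in the thread, and only the dt format differs from the original statement.

-- The partition-time extractor rebuilds '$dt $hr:$mi:00' from the partition values and, per the
-- filesystem connector documentation linked above, parses it as 'yyyy-MM-dd HH:mm:ss',
-- so dt has to look like 2020-11-01 rather than 20201101.
INSERT INTO ods.team
SELECT team_id, team_name, create_time, update_time, op,
       DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd') AS dt,
       DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'HH') AS hr,
       DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'mm') AS mi
FROM merged_team;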
> > > > 陈帅 <[hidden email]> 于2020年11月1日周日 下午5:24写道: > > > > > 改用 TEXTFILE 存储hive表数据以便下载hive文件观察内容 > > > ") STORED AS TEXTFILE TBLPROPERTIES (" > > > > > > 这是生成的hive表建表语句 > > > > > > hive> show create table team; > > > OK > > > CREATE TABLE `team`( > > > `team_id` int, > > > `team_name` string, > > > `create_time` string, > > > `update_time` string, > > > `op` string) > > > PARTITIONED BY ( > > > `dt` string, > > > `hr` string, > > > `mi` string) > > > ROW FORMAT SERDE > > > 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' > > > STORED AS INPUTFORMAT > > > 'org.apache.hadoop.mapred.TextInputFormat' > > > OUTPUTFORMAT > > > 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat' > > > LOCATION > > > 'hdfs://localhost:9000/user/hive/warehouse/ods.db/team' > > > TBLPROPERTIES ( > > > 'is_generic'='false', > > > 'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00', > > > 'sink.partition-commit.delay'='1 min', > > > 'sink.partition-commit.policy.kind'='metastore,success-file', > > > 'sink.partition-commit.trigger'='partition-time', > > > 'transient_lastDdlTime'='1604222266') > > > Time taken: 0.252 seconds, Fetched: 25 row(s) > > > > > > 另外,下载了hive文件内容如下 > > > 1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31 > > 11:25:38<0x01>INSERT > > > > > > 还是查询不到结果 > > > hive> select * from team; > > > OK > > > Time taken: 0.326 seconds > > > > > > 陈帅 <[hidden email]> 于2020年11月1日周日 下午5:10写道: > > > > > >> > > >> > > > 之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。 > > >> 生成的hive分区文件路径类似于 > > /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/ > > >> part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3 > > >> > > >> 陈帅 <[hidden email]> 于2020年11月1日周日 下午4:43写道: > > >> > > >>> > > > 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive > > >>> shell查不到数据。 > > >>> > > >>> import com.alibaba.fastjson.JSON; > > >>> import com.alibaba.fastjson.JSONObject; > > >>> import org.apache.flink.api.common.serialization.SimpleStringSchema; > > >>> import org.apache.flink.api.common.typeinfo.TypeInformation; > > >>> import org.apache.flink.api.common.typeinfo.Types; > > >>> import org.apache.flink.api.java.typeutils.RowTypeInfo; > > >>> import org.apache.flink.streaming.api.CheckpointingMode; > > >>> import org.apache.flink.streaming.api.TimeCharacteristic; > > >>> import org.apache.flink.streaming.api.datastream.DataStream; > > >>> import > > >>> > > org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; > > >>> import > > >>> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > > >>> import > > >>> > > > org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; > > >>> import org.apache.flink.streaming.api.windowing.time.Time; > > >>> import > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; > > >>> import > > >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; > > >>> import org.apache.flink.table.api.EnvironmentSettings; > > >>> import org.apache.flink.table.api.SqlDialect; > > >>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > > >>> import org.apache.flink.table.catalog.hive.HiveCatalog; > > >>> import org.apache.flink.types.Row; > > >>> import org.apache.flink.types.RowKind; > > >>> > > >>> import java.time.Duration; > > >>> import java.time.Instant; > > >>> import java.time.LocalDateTime; > > >>> import java.time.ZoneId; > > >>> import java.time.format.DateTimeFormatter; > > >>> import 
java.util.Properties; > > >>> > > >>> public class MysqlCDC2Hive { > > >>> > > >>> private static final DateTimeFormatter dtf = > > >>> DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); > > >>> > > >>> public static void main(String[] args) throws Exception { > > >>> StreamExecutionEnvironment streamEnv = > > >>> StreamExecutionEnvironment.getExecutionEnvironment(); > > >>> > > >>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > > >>> streamEnv.setParallelism(3); > > >>> streamEnv.enableCheckpointing(60000); > > >>> > > >>> EnvironmentSettings tableEnvSettings = > > >>> EnvironmentSettings.newInstance() > > >>> .useBlinkPlanner() > > >>> .inStreamingMode() > > >>> .build(); > > >>> StreamTableEnvironment tableEnv = > > >>> StreamTableEnvironment.create(streamEnv, tableEnvSettings); > > >>> > > >>> > > > tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, > > >>> CheckpointingMode.EXACTLY_ONCE); > > >>> > > >>> > > > tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, > > >>> Duration.ofMinutes(1)); > > >>> > > >>> String catalogName = "hive_catalog"; > > >>> HiveCatalog catalog = new HiveCatalog( > > >>> catalogName, > > >>> "default", > > >>> "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf", > > >>> "2.3.4" > > >>> ); > > >>> tableEnv.registerCatalog(catalogName, catalog); > > >>> tableEnv.useCatalog(catalogName); > > >>> > > >>> MyDateFormat2 myDateFormat = new MyDateFormat2(); > > >>> tableEnv.registerFunction("my_date_format", myDateFormat); > > >>> > > >>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc"); > > >>> tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team"); > > >>> tableEnv.executeSql("CREATE TABLE cdc.team(\n" + > > >>> " team_id INT,\n" + > > >>> " team_name STRING,\n" + > > >>> " create_time TIMESTAMP,\n" + > > >>> " update_time TIMESTAMP,\n" + > > >>> " proctime as proctime()\n" + > > >>> ") WITH (\n" + > > >>> " 'connector' = 'mysql-cdc',\n" + > > >>> " 'hostname' = 'localhost',\n" + > > >>> " 'port' = '3306',\n" + > > >>> " 'username' = 'root',\n" + > > >>> " 'password' = 'root',\n" + > > >>> " 'database-name' = 'test',\n" + > > >>> " 'table-name' = 'team'\n" + > > >>> ")"); > > >>> > > >>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka"); > > >>> tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team"); > > >>> tableEnv.executeSql("CREATE TABLE kafka.team (\n" + > > >>> " team_id INT,\n" + > > >>> " team_name STRING,\n" + > > >>> " create_time TIMESTAMP,\n" + > > >>> " update_time TIMESTAMP\n" + > > >>> ") WITH (\n" + > > >>> " 'connector' = 'kafka',\n" + > > >>> " 'topic' = 'team',\n" + > > >>> " 'scan.startup.mode' = 'earliest-offset',\n" + > > >>> " 'properties.bootstrap.servers' = > > 'localhost:9092',\n" > > >>> + > > >>> " 'format' = 'changelog-json'\n" + > > >>> ")"); > > >>> > > >>> tableEnv.executeSql("INSERT INTO kafka.team \n" + > > >>> "SELECT team_id, team_name, create_time, update_time > > \n" > > >>> + > > >>> "FROM cdc.team"); > > >>> > > >>> // 定义带op字段的stream > > >>> Properties properties = new Properties(); > > >>> properties.setProperty("bootstrap.servers", > "localhost:9092"); > > >>> properties.setProperty("group.id", "test1`"); > > >>> > > >>> FlinkKafkaConsumerBase<String> consumer = new > > >>> FlinkKafkaConsumer<>( > > >>> "team", > > >>> new SimpleStringSchema(), > > >>> properties > > >>> ).setStartFromEarliest(); > > >>> > > >>> DataStream<String> ds = streamEnv.addSource(consumer); > > >>> > > >>> String[] 
fieldNames = {"team_id", "team_name", "create_time", > > >>> "update_time", "op"}; > > >>> TypeInformation[] types = {Types.INT, Types.STRING, > > >>> Types.STRING, Types.STRING, Types.STRING}; > > >>> DataStream<Row> ds2 = ds.map(str -> { > > >>> JSONObject jsonObject = JSON.parseObject(str); > > >>> String op = jsonObject.getString("op"); > > >>> JSONObject data = jsonObject.getJSONObject("data"); > > >>> int arity = fieldNames.length; > > >>> Row row = new Row(arity); > > >>> row.setField(0, data.get("team_id")); > > >>> row.setField(1, data.get("team_name")); > > >>> row.setField(2, data.get("create_time")); > > >>> row.setField(3, data.get("update_time")); > > >>> String operation = getOperation(op); > > >>> row.setField(4, operation); > > >>> > > >>> return row; > > >>> }, new RowTypeInfo(types, fieldNames)) > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> > > >>> *.assignTimestampsAndWatermarks(new > > >>> BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(1)) { > > >>> @Override public long extractTimestamp(Row row) { > > >>> String dt = (String) row.getField(2); LocalDateTime > ldt > > = > > >>> LocalDateTime.parse(dt, dtf); Instant instant = > > >>> ldt.atZone(ZoneId.systemDefault()).toInstant(); long > > >>> timeInMillis = instant.toEpochMilli(); return > > timeInMillis; > > >>> } });* > > >>> > > >>> tableEnv.registerDataStream("merged_team", ds2); > > >>> > > >>> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > > >>> > > >>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods"); > > >>> tableEnv.executeSql("DROP TABLE IF EXISTS ods.team"); > > >>> > > >>> tableEnv.executeSql("CREATE TABLE ods.team (\n" + > > >>> " team_id INT,\n" + > > >>> " team_name STRING,\n" + > > >>> " create_time STRING,\n" + > > >>> " update_time STRING,\n" + > > >>> " op STRING\n" + > > >>> ") PARTITIONED BY (\n" + > > >>> " dt STRING,\n" + > > >>> " hr STRING,\n" + > > >>> " mi STRING\n" + > > >>> ") STORED AS PARQUET TBLPROPERTIES (\n" + > > >>> " 'sink.partition-commit.trigger' = > > >>> 'partition-time',\n" + > > >>> " 'sink.partition-commit.delay' = '1 min',\n" + > > >>> " 'sink.partition-commit.policy.kind' = > > >>> 'metastore,success-file',\n" + > > >>> " 'partition.time-extractor.timestamp-pattern' = > '$dt > > >>> $hr:$mi:00'\n" + > > >>> ")"); > > >>> > > >>> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > > >>> tableEnv.executeSql("INSERT INTO ods.team \n" + > > >>> "SELECT team_id, team_name, create_time, update_time, > > >>> op, \n" + > > >>> " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > > >>> HH:mm:ss'), 'yyyyMMdd') as dt, \n" + > > >>> " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > > >>> HH:mm:ss'), 'HH') as hr, \n" + > > >>> " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > > >>> HH:mm:ss'), 'mm') as mi \n" + > > >>> "FROM merged_team"); > > >>> tableEnv.execute("MysqlCDC2Hive2"); > > >>> > > >>> streamEnv.execute(""); > > >>> } > > >>> > > >>> private static String getOperation(String op) { > > >>> String operation = "INSERT"; > > >>> for (RowKind rk : RowKind.values()) { > > >>> if (rk.shortString().equals(op)) { > > >>> switch (rk) { > > >>> case UPDATE_BEFORE: > > >>> case UPDATE_AFTER: > > >>> operation = "UPDATE"; > > >>> break; > > >>> case DELETE: > > >>> operation = "DELETE"; > > >>> break; > > >>> case INSERT: > > >>> default: > > >>> operation = "INSERT"; > > >>> break; > > >>> } > > >>> break; > > >>> } > > >>> } > > >>> return operation; > > >>> } > > >>> } > > >>> > > >>> Jark Wu <[hidden email]> 于2020年11月1日周日 
上午11:04写道: > > >>> > > >>>> 你检查一下 hive 文件是否正常生成了? > > >>>> > > >>>> 我看你上面的代码,kafka->hive 流程中是没有 watermark 的,而"partition-time" 的 trigger > > >>>> policy 是基于 watermark 驱动的,所以可能是这个原因导致 hive 中没有数据。 > > >>>> > > >>>> Best, > > >>>> Jark > > >>>> > > >>>> > > >>>> [1]: > > >>>> > > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#sink-partition-commit-trigger > > >>>> > > >>>> On Sat, 31 Oct 2020 at 17:25, 陈帅 <[hidden email]> wrote: > > >>>> > > >>>>> > > > 谢谢Jark细致解答,我按照你给的思路试了下。遇到一个问题是,在不开hive分区的情况下写入和读取是没有问题的,但在开启hive表时间分区后,写入是成功了,然而通过hive > > >>>>> shell查不到数据,表结构是正确的。(代码我注释掉了) 能帮忙看下是哪里写得不对吗? > > >>>>> > > >>>>> cdc -> kafka示例消息如下 > > >>>>> {"data":{"team_id":1001,"team_name":"Sun","create_time":"2020-10-31 > > >>>>> 11:25:38","update_time":"2020-10-31 11:25:38"},"op":"+I"} > > >>>>> > > >>>>> import com.alibaba.fastjson.JSON; > > >>>>> import com.alibaba.fastjson.JSONObject; > > >>>>> import > org.apache.flink.api.common.serialization.SimpleStringSchema; > > >>>>> import org.apache.flink.api.common.typeinfo.TypeInformation; > > >>>>> import org.apache.flink.api.common.typeinfo.Types; > > >>>>> import org.apache.flink.api.java.typeutils.RowTypeInfo; > > >>>>> import org.apache.flink.streaming.api.CheckpointingMode; > > >>>>> import org.apache.flink.streaming.api.TimeCharacteristic; > > >>>>> import org.apache.flink.streaming.api.datastream.DataStream; > > >>>>> import > > >>>>> > > org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; > > >>>>> import > > >>>>> > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > > >>>>> import > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; > > >>>>> import > > >>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; > > >>>>> import org.apache.flink.table.api.EnvironmentSettings; > > >>>>> import org.apache.flink.table.api.SqlDialect; > > >>>>> import > org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > > >>>>> import org.apache.flink.table.catalog.hive.HiveCatalog; > > >>>>> import org.apache.flink.types.Row; > > >>>>> import org.apache.flink.types.RowKind; > > >>>>> > > >>>>> import java.time.Duration; > > >>>>> import java.util.Properties; > > >>>>> > > >>>>> public class MysqlCDC2Hive { > > >>>>> public static void main(String[] args) throws Exception { > > >>>>> StreamExecutionEnvironment streamEnv = > > >>>>> StreamExecutionEnvironment.getExecutionEnvironment(); > > >>>>> > > >>>>> > streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > > >>>>> streamEnv.setParallelism(3); > > >>>>> streamEnv.enableCheckpointing(60000); > > >>>>> > > >>>>> EnvironmentSettings tableEnvSettings = > > >>>>> EnvironmentSettings.newInstance() > > >>>>> .useBlinkPlanner() > > >>>>> .inStreamingMode() > > >>>>> .build(); > > >>>>> StreamTableEnvironment tableEnv = > > >>>>> StreamTableEnvironment.create(streamEnv, tableEnvSettings); > > >>>>> > > >>>>> > > > tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, > > >>>>> CheckpointingMode.EXACTLY_ONCE); > > >>>>> > > >>>>> > > > tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, > > >>>>> Duration.ofMinutes(1)); > > >>>>> > > >>>>> String catalogName = "hive_catalog"; > > >>>>> HiveCatalog catalog = new HiveCatalog( > > >>>>> catalogName, > > >>>>> "default", > > >>>>> "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf", > > >>>>> "2.3.4" > > >>>>> ); > > >>>>> 
tableEnv.registerCatalog(catalogName, catalog); > > >>>>> tableEnv.useCatalog(catalogName); > > >>>>> > > >>>>> MyDateFormat2 myDateFormat = new MyDateFormat2(); > > >>>>> tableEnv.registerFunction("my_date_format", myDateFormat); > > >>>>> > > >>>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc"); > > >>>>> tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team"); > > >>>>> tableEnv.executeSql("CREATE TABLE cdc.team(\n" + > > >>>>> " team_id INT,\n" + > > >>>>> " team_name STRING,\n" + > > >>>>> " create_time TIMESTAMP,\n" + > > >>>>> " update_time TIMESTAMP,\n" + > > >>>>> " proctime as proctime()\n" + > > >>>>> ") WITH (\n" + > > >>>>> " 'connector' = 'mysql-cdc',\n" + > > >>>>> " 'hostname' = 'localhost',\n" + > > >>>>> " 'port' = '3306',\n" + > > >>>>> " 'username' = 'root',\n" + > > >>>>> " 'password' = 'root',\n" + > > >>>>> " 'database-name' = 'test',\n" + > > >>>>> " 'table-name' = 'team'\n" + > > >>>>> ")"); > > >>>>> > > >>>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka"); > > >>>>> tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team"); > > >>>>> tableEnv.executeSql("CREATE TABLE kafka.team (\n" + > > >>>>> " team_id INT,\n" + > > >>>>> " team_name STRING,\n" + > > >>>>> " create_time TIMESTAMP,\n" + > > >>>>> " update_time TIMESTAMP\n" + > > >>>>> ") WITH (\n" + > > >>>>> " 'connector' = 'kafka',\n" + > > >>>>> " 'topic' = 'team',\n" + > > >>>>> " 'scan.startup.mode' = 'earliest-offset',\n" + > > >>>>> " 'properties.bootstrap.servers' = > > >>>>> 'localhost:9092',\n" + > > >>>>> " 'format' = 'changelog-json'\n" + > > >>>>> ")"); > > >>>>> > > >>>>> tableEnv.executeSql("INSERT INTO kafka.team \n" + > > >>>>> "SELECT team_id, team_name, create_time, > update_time > > >>>>> \n" + > > >>>>> "FROM cdc.team"); > > >>>>> > > >>>>> // 定义带op字段的stream > > >>>>> Properties properties = new Properties(); > > >>>>> properties.setProperty("bootstrap.servers", > > "localhost:9092"); > > >>>>> properties.setProperty("group.id", "test"); > > >>>>> > > >>>>> FlinkKafkaConsumerBase<String> consumer = new > > >>>>> FlinkKafkaConsumer<>( > > >>>>> "team", > > >>>>> new SimpleStringSchema(), > > >>>>> properties > > >>>>> ).setStartFromEarliest(); > > >>>>> > > >>>>> DataStream<String> ds = streamEnv.addSource(consumer); > > >>>>> > > >>>>> String[] fieldNames = {"team_id", "team_name", > "create_time", > > >>>>> "update_time", "op"}; > > >>>>> TypeInformation[] types = {Types.INT, Types.STRING, > > >>>>> Types.STRING, Types.STRING, Types.STRING}; > > >>>>> DataStream<Row> ds2 = ds.map(str -> { > > >>>>> JSONObject jsonObject = JSON.parseObject(str); > > >>>>> String op = jsonObject.getString("op"); > > >>>>> JSONObject data = jsonObject.getJSONObject("data"); > > >>>>> int arity = fieldNames.length; > > >>>>> Row row = new Row(arity); > > >>>>> row.setField(0, data.get("team_id")); > > >>>>> row.setField(1, data.get("team_name")); > > >>>>> row.setField(2, data.get("create_time")); > > >>>>> row.setField(3, data.get("update_time")); > > >>>>> String operation = getOperation(op); > > >>>>> row.setField(4, operation); > > >>>>> > > >>>>> return row; > > >>>>> }, new RowTypeInfo(types, fieldNames)); > > >>>>> > > >>>>> tableEnv.registerDataStream("merged_team", ds2); > > >>>>> > > >>>>> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > > >>>>> > > >>>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods"); > > >>>>> tableEnv.executeSql("DROP TABLE IF EXISTS ods.team"); > > >>>>> > > >>>>> tableEnv.executeSql("CREATE TABLE ods.team (\n" + > > >>>>> " team_id INT,\n" + > > 
        tableEnv.executeSql("CREATE TABLE ods.team (\n" +
                "    team_id INT,\n" +
                "    team_name STRING,\n" +
                "    create_time STRING,\n" +
                "    update_time STRING,\n" +
                "    op STRING\n" +
//                ") PARTITIONED BY (\n" +
//                "    ts_date STRING,\n" +
//                "    ts_hour STRING,\n" +
//                "    ts_minute STRING\n" +
                ") STORED AS PARQUET TBLPROPERTIES (\n" +
                "    'sink.partition-commit.trigger' = 'partition-time',\n" +
                "    'sink.partition-commit.delay' = '1 min',\n" +
                "    'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
                "    'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'\n" +
                ")");

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("INSERT INTO ods.team \n" +
                "SELECT team_id, team_name, create_time, update_time, op \n" +
//                "    DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyyMMdd') as ts_date, \n" +
//                "    DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'HH') as ts_hour, \n" +
//                "    DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'mm') as ts_minute \n" +
                "FROM merged_team");
        tableEnv.execute("MysqlCDC2Hive2");

        streamEnv.execute("");
    }

    private static String getOperation(String op) {
        String operation = "INSERT";
        for (RowKind rk : RowKind.values()) {
            if (rk.shortString().equals(op)) {
                switch (rk) {
                    case UPDATE_BEFORE:
                    case UPDATE_AFTER:
                        operation = "UPDATE";
                        break;
                    case DELETE:
                        operation = "DELETE";
                        break;
                    case INSERT:
                    default:
                        operation = "INSERT";
                        break;
                }
                break;
            }
        }
        return operation;
    }
}
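To make Jark's hint about the missing watermark concrete: with 'partition-time' as the commit trigger, a partition is only committed once a watermark passes its extracted partition time, and the kafka -> hive leg above never produces one. A minimal sketch of the simpler workaround, untested and reusing the ods.team layout from the code above, is to let the sink commit on processing time instead:

-- Sketch (untested), same layout as the quoted code: with 'process-time'
-- the sink commits a partition after a processing-time delay, so no
-- event-time attribute or watermark is required.
CREATE TABLE ods.team (
    team_id INT,
    team_name STRING,
    create_time STRING,
    update_time STRING,
    op STRING
) PARTITIONED BY (
    ts_date STRING,
    ts_hour STRING,
    ts_minute STRING
) STORED AS PARQUET TBLPROPERTIES (
    'sink.partition-commit.trigger' = 'process-time',
    'sink.partition-commit.delay' = '1 min',
    'sink.partition-commit.policy.kind' = 'metastore,success-file'
);

If 'partition-time' has to be kept, the alternative is to give this leg an event-time attribute: assign timestamps and watermarks on the parsed DataStream (for example from create_time) and expose it as a rowtime column when registering merged_team, so that a watermark actually reaches the Hive sink and the partition-time trigger can fire.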
--
Best regards!
Rui Li