Hi all,
Could someone help me take a look at this issue? Thanks!
Processing logic
- Read data from Kafka with the DataStream API into a DataStream ds1, roughly as follows:
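(This is only a sketch; the broker address, topic, group id, and the String/SimpleStringSchema types are placeholders, not the real values:)
Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");   // placeholder address
props.setProperty("group.id", "music-copyright-group");  // placeholder group id
FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>("music_topic", new SimpleStringSchema(), props);  // placeholder topic
// in the real job the records are parsed into a POJO with url/md5/utime/... fields
DataStream<String> ds1 = streamEnv.addSource(consumer);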
- Create a tableEnv and register the Hive catalog:
tableEnv.registerCatalog(catalogName, catalog);
tableEnv.useCatalog(catalogName);
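(where catalog is a HiveCatalog instance, constructed roughly like this; the default database name and hive-conf directory below are placeholders:)
HiveCatalog catalog = new HiveCatalog(catalogName, "rt_dwd", "/etc/hive/conf");  // placeholder db and conf dir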
- Declare a table with ds1 as its source:
Table sourceTable = tableEnv.fromDataStream(ds1);
String sourceTableName = "music_source";
tableEnv.createTemporaryView(sourceTableName, sourceTable);
- Create a Hive table:
CREATE TABLE `dwd_music_copyright_test`(
`url` string COMMENT 'url',
`md5` string COMMENT 'md5',
`utime` bigint COMMENT 'timestamp',
`title` string COMMENT 'song title',
`singer` string COMMENT 'singer',
`company` string COMMENT 'company',
`level` int COMMENT 'confidence: 0 = title word segmentation, 1 = acrcloud result, 3 = manually labeled')
PARTITIONED BY (
`dt` string,
`hour` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
'hdfs://Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test'
TBLPROPERTIES (
'connector'='HiveCatalog',
'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
'sink.partition-commit.delay'='1 min',
'sink.partition-commit.policy.kind'='metastore,success-file',
'sink.partition-commit.trigger'='partition-time',
'sink.rolling-policy.check-interval'='30s',
'sink.rolling-policy.rollover-interval'='1min',
'sink.rolling-policy.file-size'='1MB');
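(The DDL above is executed from the Flink job with the SQL dialect switched to Hive; a minimal sketch, assuming the table is created from the job rather than directly in Hive:)
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);     // Hive DDL requires the Hive dialect
tableEnv.executeSql(createTableDdl);                     // createTableDdl = the CREATE TABLE statement above
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);  // switch back for the streaming insert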
- Insert the data from the view created in step 3 into dwd_music_copyright_test.
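(Roughly the following; how dt and hour are derived from utime here is only an illustration, not the exact SQL:)
tableEnv.executeSql(
    "INSERT INTO rt_dwd.dwd_music_copyright_test " +
    "SELECT url, md5, utime, title, singer, company, level, " +
    "       FROM_UNIXTIME(utime, 'yyyy-MM-dd'), FROM_UNIXTIME(utime, 'HH') " +  // dt, hour partition columns
    "FROM music_source");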
Environment
Flink: 1.11
Kafka: 1.1.1
Hadoop: 2.6.0
Hive: 1.2.0
Problem
After the job has been running, I found that some partitions were not created in the Hive catalog; for example, the hour=02 and hour=03 partitions below are missing:
show partitions rt_dwd.dwd_music_copyright_test;
| dt=2020-08-29/hour=00 |
| dt=2020-08-29/hour=01 |
| dt=2020-08-29/hour=04 |
| dt=2020-08-29/hour=05 |
But the corresponding files do exist under the HDFS directory:
$ hadoop fs -du -h /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/
4.5 K 13.4 K /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=00
2.0 K 6.1 K /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=01
1.7 K 5.1 K /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=02
1.3 K 3.8 K /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=03
3.1 K 9.2 K /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=04
And after manually running ADD PARTITION, the data can be read normally.
From the Flink Web UI I can see that during the run some checkpoints failed on the StreamingFileCommitter operator.
My questions:
- Does exactly-once only guarantee writing the sink files, but not updating the catalog?
- If so, is there a way to work around this?
- For EXACTLY_ONCE, is it necessary to set the Kafka consumer parameters isolation.level=read_committed and enable.auto.commit=false? Or is the following configuration already enough to guarantee EXACTLY_ONCE?
streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
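For context, checkpointing itself is enabled with an interval (a sketch; the 1-minute interval is just a placeholder value), and the two Kafka consumer properties the last question refers to would go on the consumer's Properties like this:
streamEnv.enableCheckpointing(60 * 1000L, CheckpointingMode.EXACTLY_ONCE);  // checkpoint every 1 min (placeholder interval)
props.setProperty("isolation.level", "read_committed");  // the consumer properties asked about above
props.setProperty("enable.auto.commit", "false");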