Flink fails to add partitions to the catalog when using StreamingFileSink


MuChen

hi, all:

Could anyone help take a look at this issue? Many thanks!

Processing logic

  1. Use the DataStream API to read data from Kafka and write it into a DataStream ds1 (see the sketch after this list).
  2. Create a new tableEnv and register the Hive catalog:
tableEnv.registerCatalog(catalogName, catalog);
tableEnv.useCatalog(catalogName);
  3. Declare a table that uses ds1 as its source:
Table sourceTable = tableEnv.fromDataStream(ds1);
String sourceTableName = "music_source";
tableEnv.createTemporaryView(sourceTableName, sourceTable);
  4. Create a Hive table:
CREATE TABLE `dwd_music_copyright_test`(
  `url` string COMMENT 'url',
  `md5` string COMMENT 'md5',
  `utime` bigint COMMENT 'time',
  `title` string COMMENT 'song title',
  `singer` string COMMENT 'performer',
  `company` string COMMENT 'company',
  `level` int COMMENT 'confidence: 0 = from title word segmentation, 1 = result returned by acrcloud, 3 = manually labeled')
PARTITIONED BY (
  `dt` string,
  `hour` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test'
TBLPROPERTIES (
  'connector'='HiveCatalog',
  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
  'sink.partition-commit.delay'='1 min',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'sink.partition-commit.trigger'='partition-time',
  'sink.rolling-policy.check-interval'='30s',
  'sink.rolling-policy.rollover-interval'='1min',
  'sink.rolling-policy.file-size'='1MB');
  5. Insert the data from the table in step 3 into dwd_music_copyright_test (see the sketch below).
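
A simplified sketch of steps 1 and 5 (the topic name, broker address, group id, and the column list in the INSERT are placeholders, parsing of the raw Kafka records is omitted, and tableEnv is the one created in step 2):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Step 1: read the Kafka topic into ds1 (topic name and brokers are placeholders;
// mapping the raw String records into columns is omitted here)
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "kafka-broker:9092");
kafkaProps.setProperty("group.id", "music-copyright-job");
DataStream<String> ds1 = streamEnv.addSource(
        new FlinkKafkaConsumer<>("music_topic", new SimpleStringSchema(), kafkaProps));

// Step 5: insert the registered view into the Hive table through the tableEnv from step 2
// (the column list is assumed; it has to match the DDL above, dt/hour included)
tableEnv.executeSql(
        "INSERT INTO rt_dwd.dwd_music_copyright_test "
        + "SELECT url, md5, utime, title, singer, company, `level`, dt, hour FROM music_source");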

Environment

flink:1.11
kafka:1.1.1
hadoop:2.6.0
hive:1.2.0

Problem

After the job ran, I found that some partitions were not created in the Hive catalog. As shown below, the hour=02 and hour=03 partitions are missing:

show partitions rt_dwd.dwd_music_copyright_test;

| dt=2020-08-29/hour=00  |
| dt=2020-08-29/hour=01  |
| dt=2020-08-29/hour=04  |
| dt=2020-08-29/hour=05  |

However, files have been generated under the corresponding HDFS directories:

$ hadoop fs -du -h /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/
4.5 K   13.4 K  /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=00
2.0 K   6.1 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=01
1.7 K   5.1 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=02
1.3 K   3.8 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=03
3.1 K   9.2 K   /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=04

And after manually adding the partitions (ALTER TABLE ... ADD PARTITION), the data can be read normally.

In the Flink Web UI I can see that during the run some checkpoints failed at the StreamingFileCommitter:



My questions:

  1. Does exactly-once only guarantee writing the sink files, and not guarantee updating the catalog?
  2. If so, is there a way to solve this?
  3. For EXACTLY_ONCE, is it necessary to set the Kafka consumer parameters isolation.level=read_committed and enable.auto.commit=false? Or do the following settings already guarantee EXACTLY_ONCE? (A sketch of those consumer settings follows the snippet below.)
streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
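
To make question 3 concrete, the consumer settings I mean would be passed to the Kafka source roughly like this (broker address is a placeholder; the two values are the ones being asked about, not a recommendation):

import java.util.Properties;

// The two consumer properties from question 3, passed into the FlinkKafkaConsumer
// together with the usual connection settings
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "kafka-broker:9092");
kafkaProps.setProperty("isolation.level", "read_committed");
kafkaProps.setProperty("enable.auto.commit", "false");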