flink.version: 1.13
Two problems were found when sinking to Hive:
1. parquet.compression cannot be set
Reading the source: when sinking to Hive with table.exec.hive.fallback-mapred-writer=false, Flink uses the native parquet & orc writers, the same as the filesystem sink. With connector=filesystem you can set parquet.compression=snappy and it works. The Hive sink, however, reads the option only from jobConf or from sd.getSerdeInfo().getParameters(), as in the code below. In the actual Hive table, parquet.compression=snappy is stored in the table properties (TBLPROPERTIES), so formatConf never picks it up. The filesystem sink reads parquet.compression from ReadableConfig, which is why setting parquet.compression=snappy works there.
```java
// Hive sink: the format config is assembled from jobConf plus the SerDe
// parameters only; table properties (TBLPROPERTIES) are never consulted.
Configuration formatConf = new Configuration(jobConf);
sd.getSerdeInfo().getParameters().forEach(formatConf::set);
return Optional.of(
        ParquetRowDataBuilder.createWriterFactory(
                formatType, formatConf, hiveVersion.startsWith("3.")));
```

```sql
-- filesystem table (the same schema is used for the Hive table comparison)
CREATE TABLE result_filesystem (
  user_id INT,
  order_amount DECIMAL(5,2),
  dt STRING,
  hr STRING
) PARTITIONED BY (dt, hr)
WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://xxx',
  'format' = 'parquet',
  'parquet.compression' = 'snappy'
);
```
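To make the mismatch concrete, here is a minimal, self-contained sketch of the property-merge logic described above. It is a simplification, not Flink's actual code: plain `Map`s stand in for Hadoop's `Configuration` and the Hive `StorageDescriptor`, and `buildFormatConf` is a hypothetical name for the merge step. The point it illustrates is that a key stored only in table properties never reaches the merged format config.

```java
import java.util.HashMap;
import java.util.Map;

public class FormatConfDemo {
    // Hypothetical simplification of the Hive sink's config assembly:
    // only jobConf entries and SerDe parameters are merged.
    static Map<String, String> buildFormatConf(Map<String, String> jobConf,
                                               Map<String, String> serdeParams) {
        Map<String, String> formatConf = new HashMap<>(jobConf);
        formatConf.putAll(serdeParams);
        return formatConf;
    }

    public static void main(String[] args) {
        Map<String, String> jobConf = new HashMap<>();
        Map<String, String> serdeParams = new HashMap<>();

        // parquet.compression set via TBLPROPERTIES lands in the table
        // properties, which the merge above never consults.
        Map<String, String> tableProps = new HashMap<>();
        tableProps.put("parquet.compression", "snappy");

        Map<String, String> formatConf = buildFormatConf(jobConf, serdeParams);
        // The writer therefore never sees the compression setting.
        System.out.println(formatConf.containsKey("parquet.compression"));
    }
}
```

Since the code does read sd.getSerdeInfo().getParameters(), a possible workaround (untested here) would be declaring the option in SERDEPROPERTIES rather than TBLPROPERTIES when creating the Hive table.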
https://issues.apache.org/jira/browse/FLINK-27777

2. The filesystem sink and the Hive native parquet sink produce files of very different sizes
Writing the same data, the single file produced per checkpoint by the filesystem sink (with no parquet.compression set) and by the Hive sink differ greatly in size.
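The size gap is consistent with one path writing compressed pages and the other not: columnar data is highly repetitive and compresses well. The sketch below illustrates that effect only; it uses GZIP from the JDK as a stand-in (snappy is not in the standard library, and the ratio here is illustrative, not a measurement of parquet output).

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.GZIPOutputStream;

public class CompressionSizeDemo {
    public static void main(String[] args) throws Exception {
        // Repetitive, column-like rows, similar in spirit to partition data.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 10_000; i++) {
            sb.append("user_id=42,dt=2022-05-01,hr=10\n");
        }
        byte[] raw = sb.toString().getBytes("UTF-8");

        // Compress with GZIP (a stand-in for snappy, illustration only).
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(raw);
        }
        byte[] compressed = bos.toByteArray();

        // Repetitive data shrinks by more than an order of magnitude,
        // which is why an uncompressed writer produces much larger files.
        System.out.println(raw.length > 10 * compressed.length);
    }
}
```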