Can not set compression.type when sink hive table

smallwong
Flink version: 1.13
I ran into two problems when sinking to a Hive table.

1. parquet.compression cannot be set
Reading the source code: with table.exec.hive.fallback-mapred-writer=false, the Hive sink uses the native parquet & orc writers, just like the filesystem sink. With connector=filesystem I can set parquet.compression=snappy and it works, because the filesystem sink reads parquet.compression from the ReadableConfig. The Hive sink, however, builds the writer configuration only from the jobConf and from sd.getSerdeInfo().getParameters(), as in the code below. In the actual Hive table, parquet.compression=snappy is stored in the table properties, so formatConf never picks it up.

```
Configuration formatConf = new Configuration(jobConf);
sd.getSerdeInfo().getParameters().forEach(formatConf::set);
return Optional.of(
        ParquetRowDataBuilder.createWriterFactory(
                formatType, formatConf, hiveVersion.startsWith("3.")));
```

```
-- filesystem table (the hive table has the same schema)
CREATE TABLE result_filesystem (
    user_id INT,
    order_amount DECIMAL(5,2),
    dt STRING,
    hr STRING
) PARTITIONED BY (dt, hr)
WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://xxx',
    'format' = 'parquet',
    'parquet.compression' = 'snappy'
);
```
https://issues.apache.org/jira/browse/FLINK-27777
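
For reference, a minimal sketch of the kind of change this report is asking for, written against the snippet above: also copy the table's TBLPROPERTIES into the Configuration handed to the native parquet writer. This is only an illustration under my own assumptions (the helper class and the tableParameters argument standing in for the Hive table's getParameters() are hypothetical), not the actual FLINK-27777 patch.

```
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;

public final class FormatConfSketch {

    /**
     * Builds the Configuration for the native writer. The tableParameters map is
     * assumed to come from the Hive metastore Table's getParameters(), i.e. the
     * TBLPROPERTIES where 'parquet.compression'='snappy' actually lives.
     */
    public static Configuration buildFormatConf(
            Configuration jobConf,
            StorageDescriptor sd,
            Map<String, String> tableParameters) {
        Configuration formatConf = new Configuration(jobConf);
        // What the current code already copies: the SerDe properties.
        sd.getSerdeInfo().getParameters().forEach(formatConf::set);
        // The missing piece described above: the table properties as well,
        // so parquet.compression set on the table becomes visible to the writer.
        tableParameters.forEach(formatConf::set);
        return formatConf;
    }
}
```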

2. The filesystem sink and the hive native parquet sink produce files of very different sizes
Writing the same data, the single file produced per checkpoint by the filesystem sink (with no parquet.compression set) and by the hive sink differ greatly in size.
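
To narrow down where the size gap comes from, one option is to read the parquet footer of one file from each sink and compare the compression codec recorded per column chunk. A small diagnostic sketch using the parquet-mr API (the file paths passed on the command line are placeholders for one file from each sink):

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public final class PrintParquetCodec {

    public static void main(String[] args) throws Exception {
        // Pass e.g. one file written by the filesystem sink and one by the hive sink.
        for (String file : args) {
            try (ParquetFileReader reader =
                    ParquetFileReader.open(
                            HadoopInputFile.fromPath(new Path(file), new Configuration()))) {
                for (BlockMetaData block : reader.getFooter().getBlocks()) {
                    for (ColumnChunkMetaData column : block.getColumns()) {
                        System.out.println(
                                file + " " + column.getPath() + " codec=" + column.getCodec());
                    }
                }
            }
        }
    }
}
```

Comparing the reported codecs for the two files should show whether the size gap comes from different compression settings or from something else, such as how rows are spread across files per checkpoint.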