Hi,
We have the following requirement: after streaming data into storage, partitions should be added automatically and small files should be merged. I followed a custom partition commit policy for merging small files that I found online [1]. In my tests, this custom policy works fine with the filesystem connector: the small files are merged and the target file is produced. Since the built-in metastore policy can only be used on Hive tables, I also tested writing into a Hive table through a Hive catalog. There, automatic partition creation works, but small-file merging does not: the merged target file is never produced, and no exception is thrown at all. The strange part is that the same code works when writing to HDFS but not when writing to Hive, even though, judging from the source, the Hive sink is also built on StreamingFileSink underneath. I have been digging into this for two days without making progress. Has anyone run into this before, or does anyone have ideas on how to track it down?

The policy code is as follows:

public class ParquetFileMergingCommitPolicy implements PartitionCommitPolicy {
    private static final Logger LOGGER = LoggerFactory.getLogger(ParquetFileMergingCommitPolicy.class);

    @Override
    public void commit(Context context) throws Exception {
        LOGGER.info("begin to merge files. partition path is {}.", context.partitionPath().toUri().toString());
        Configuration conf = new Configuration();
        conf.set(FileSystem.FS_DEFAULT_NAME_KEY, context.partitionPath().toUri().getHost());
        FileSystem fs = FileSystem.get(conf);

        String partitionPath = context.partitionPath().getPath();
        List<Path> files = listAllFiles(fs, new Path(partitionPath), "part-");
        // The number of files to merge is printed correctly here.
        LOGGER.info("{} files in path {}", files.size(), partitionPath);

        MessageType schema = getParquetSchema(files, conf);
        if (schema == null) {
            return;
        }
        // The schema is also printed correctly.
        LOGGER.info("Fetched parquet schema: {}", schema.toString());

        Path result = merge(partitionPath, schema, files, fs);
        LOGGER.info("Files merged into {}", result.toString());
    }

    private List<Path> listAllFiles(FileSystem fs, Path dir, String prefix) throws IOException {
        List<Path> result = new ArrayList<>();
        RemoteIterator<LocatedFileStatus> dirIterator = fs.listFiles(dir, false);
        while (dirIterator.hasNext()) {
            LocatedFileStatus fileStatus = dirIterator.next();
            Path filePath = fileStatus.getPath();
            if (fileStatus.isFile() && filePath.getName().startsWith(prefix)) {
                result.add(filePath);
            }
        }
        return result;
    }

    private MessageType getParquetSchema(List<Path> files, Configuration conf) throws IOException {
        if (files.size() == 0) {
            return null;
        }
        HadoopInputFile inputFile = HadoopInputFile.fromPath(files.get(0), conf);
        ParquetFileReader reader = ParquetFileReader.open(inputFile);
        ParquetMetadata metadata = reader.getFooter();
        MessageType schema = metadata.getFileMetaData().getSchema();
        reader.close();
        return schema;
    }

    private Path merge(String partitionPath, MessageType schema, List<Path> files, FileSystem fs) throws IOException {
        Path mergeDest = new Path(partitionPath + "/result-" + System.currentTimeMillis() + ".parquet");
        ParquetWriter<Group> writer = ExampleParquetWriter.builder(mergeDest)
                .withType(schema)
                .withConf(fs.getConf())
                .withWriteMode(Mode.OVERWRITE)
                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build();

        for (Path file : files) {
            ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
                    .withConf(fs.getConf())
                    .build();
            Group data;
            while ((data = reader.read()) != null) {
                writer.write(data);
            }
            reader.close();
        }
        // The data size is also printed correctly.
        LOGGER.info("data size is [{}]", writer.getDataSize());
        try {
            writer.close();
        } catch (Exception e) {
            // No exception is ever logged here.
            LOGGER.error("flush failed!!!!", e);
        }
        if (!fs.exists(mergeDest)) {
            LOGGER.warn("result file does not exist.");
        }
        for (Path file : files) {
            fs.delete(file, false);
        }
        return mergeDest;
    }
}

I took a quick look at the ParquetWriter source. In

ParquetWriter<Group> writer = ExampleParquetWriter.builder(mergeDest)
        .withType(schema)
        .withConf(fs.getConf())
        .withWriteMode(Mode.CREATE)
        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .build()

the destination file is created during the final build() call, which suggests that file creation is already failing at that step. I also tried creating the file with FileSystem.create: the file can be created, but subsequent writes never end up in it either.
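In case it helps the discussion, this is the kind of standalone check I plan to run next (just a sketch; the partition path below is a placeholder for whatever the policy's first log line prints in the Hive job). It builds the Configuration the same way commit() does and logs which FileSystem actually gets resolved, to rule out the merged file quietly landing on a different filesystem than expected:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsResolutionCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder: replace with the exact partition path the policy logs for the Hive run.
        Path partitionPath = new Path("hdfs://xxx/warehouse/test.db/sink_to_hive/day=2021-01-01/hour=00");

        String host = partitionPath.toUri().getHost();
        // A null host here would itself be a useful signal.
        System.out.println("partition scheme/host: " + partitionPath.toUri().getScheme() + " / " + host);

        Configuration conf = new Configuration();
        if (host != null) {
            // Same trick commit() uses: derive fs.defaultFS from the partition path's host.
            conf.set(FileSystem.FS_DEFAULT_NAME_KEY, host);
        }
        FileSystem fs = FileSystem.get(conf);

        System.out.println("resolved FileSystem class: " + fs.getClass().getName());
        System.out.println("resolved FileSystem URI:   " + fs.getUri());
        System.out.println("partition dir visible:     " + fs.exists(partitionPath));
    }
}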
The SQL for writing to HDFS:

CREATE TABLE test_kafka (
    tuid STRING,
    device STRING,
    active_time BIGINT,
    process_time BIGINT,
    pkg_cn_name STRING,
    pkg_en_name STRING,
    os STRING,
    appid INT,
    dtu STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'test_kafka',
    'properties.bootstrap.servers' = 'xxx:9092',
    'properties.group.id' = 'test-1',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json',
    'properties.flink.partition-discovery.interval-millis' = '300000'
);

CREATE TABLE test_hdfs (
    `day` STRING,
    `hour` STRING,
    tuid STRING,
    device STRING,
    active_time BIGINT,
    process_time BIGINT,
    pkg_cn_name STRING,
    pkg_en_name STRING,
    os STRING,
    appid INT,
    dtu STRING
) PARTITIONED BY (`day`, `hour`) WITH (
    'connector' = 'filesystem',
    'format' = 'parquet',
    'path' = 'hdfs://xxx/test.db/test_flink_sql',
    'parquet.compression' = 'SNAPPY',
    'sink.partition-commit.policy.kind' = 'success-file,custom',
    'sink.partition-commit.success-file.name' = '_SUCCESS',
    'sink.partition-commit.policy.class' = 'policy.ParquetFileMergingCommitPolicy'
);

INSERT INTO test_hdfs
SELECT
    from_unixtime(process_time, 'yyyy-MM-dd') AS `day`,
    from_unixtime(process_time, 'HH') AS `hour`,
    tuid,
    device,
    active_time,
    process_time,
    pkg_cn_name,
    pkg_en_name,
    os,
    appid,
    dtu
FROM test_kafka;

The code for writing to Hive:

public static void main(String[] args) {
    StreamExecutionEnvironment env = EnvUtil.OptionEOSEnv(60L);
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    tableEnv.getConfig().setIdleStateRetentionTime(Time.minutes(60), Time.minutes(120));

    String name = "myhive";
    String defaultDatabase = "test";
    String hiveConfDir = "/opt/hive-conf"; // a local path
    HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
    tableEnv.registerCatalog("myhive", hive);
    tableEnv.useCatalog("myhive");

    tableEnv.executeSql("DROP TABLE IF EXISTS test_kafka");
    tableEnv.executeSql("CREATE TABLE test_kafka (\n" +
            "  tuid STRING,\n" +
            "  device STRING,\n" +
            "  active_time BIGINT,\n" +
            "  process_time BIGINT,\n" +
            "  pkg_cn_name STRING,\n" +
            "  pkg_en_name STRING,\n" +
            "  os STRING,\n" +
            "  appid INT,\n" +
            "  dtu STRING\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'test_kafka',\n" +
            "  'properties.bootstrap.servers' = 'xxx:9092',\n" +
            "  'properties.group.id' = 'test-2',\n" +
            "  'scan.startup.mode' = 'latest-offset',\n" +
            "  'format' = 'json',\n" +
            "  'properties.flink.partition-discovery.interval-millis' = '300000'\n" +
            ")");

    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    tableEnv.executeSql("DROP TABLE IF EXISTS test.sink_to_hive");
    tableEnv.executeSql("CREATE EXTERNAL TABLE test.sink_to_hive (\n" +
            "  tuid STRING,\n" +
            "  device STRING,\n" +
            "  active_time BIGINT,\n" +
            "  process_time BIGINT,\n" +
            "  pkg_cn_name STRING,\n" +
            "  pkg_en_name STRING,\n" +
            "  os STRING,\n" +
            "  appid INT,\n" +
            "  dtu STRING\n" +
            ") PARTITIONED BY (`day` STRING, `hour` STRING) STORED AS PARQUET\n" +
            "TBLPROPERTIES (\n" +
            "  'parquet.compression' = 'SNAPPY',\n" +
            "  'sink.partition-commit.policy.kind' = 'metastore,success-file,custom',\n" +
            "  'sink.partition-commit.success-file.name' = '_SUCCESS',\n" +
            "  'sink.partition-commit.policy.class' = 'policy.ParquetFileMergingCommitPolicy'\n" +
            ")");

    tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
    tableEnv.executeSql("insert into test.sink_to_hive\n" +
            "select\n" +
            "  tuid,\n" +
            "  device,\n" +
            "  active_time,\n" +
            "  process_time,\n" +
            "  pkg_cn_name,\n" +
            "  pkg_en_name,\n" +
            "  os,\n" +
            "  appid,\n" +
            "  dtu,\n" +
            "  from_unixtime(process_time,'yyyy-MM-dd') as `day`,\n" +
            "  from_unixtime(process_time,'HH') as `hour`\n" +
            "from test_kafka");
}
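I am also considering writing a small Parquet file into the Hive table's partition directory completely outside of Flink, using the same builder calls merge() uses, to see whether a target file appears in that directory at all. A rough sketch (the path and the tiny schema are placeholders, not the real table schema):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class StandaloneParquetWriteTest {
    public static void main(String[] args) throws Exception {
        // Placeholder partition directory of the Hive table; replace with the path the
        // policy logs as "begin to merge files. partition path is ...".
        Path dest = new Path("hdfs://xxx/warehouse/test.db/sink_to_hive/day=2021-01-01/hour=00/smoke-test.parquet");

        // Minimal placeholder schema; the real table schema is not needed for this check.
        MessageType schema = MessageTypeParser.parseMessageType(
                "message smoke { required binary tuid (UTF8); required int64 active_time; }");

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(dest.toUri(), conf);

        // Same builder calls as merge() above.
        ParquetWriter<Group> writer = ExampleParquetWriter.builder(dest)
                .withType(schema)
                .withConf(conf)
                .withWriteMode(Mode.OVERWRITE)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build();

        Group row = new SimpleGroupFactory(schema).newGroup();
        row.append("tuid", "smoke-test");
        row.append("active_time", 1L);
        writer.write(row);
        writer.close();

        System.out.println("exists after close: " + fs.exists(dest));
    }
}

If this standalone write also fails to produce a file, the problem would presumably be in the environment (filesystem resolution, permissions, classpath) rather than in the commit policy logic itself.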
[1] https://developer.aliyun.com/article/770822