flink 1.11.2 fileSystem source table 读取 fileSystem sink table 分区错误问题

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

flink 1.11.2 fileSystem source table 读取 fileSystem sink table 分区错误问题

范未太
1.问题描述

基于flink filesystem connect 创建create table  source_test(id string,name string dayno sring,`hour` string)  partitioned (dayno ,`hour`) with('connector'='filesystm',path='xxxxx/data/')
报错堆栈如下:
|
ava.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.CommandLineWrapper.main(CommandLineWrapper.java:66)
Caused by: java.util.NoSuchElementException: key not found: hour
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow$1.apply(PartitionPruner.scala:155)
at org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow$1.apply(PartitionPruner.scala:153)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.flink.table.planner.plan.utils.PartitionPruner$.org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow(PartitionPruner.scala:153)
at org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$prunePartitions$1.apply(PartitionPruner.scala:130)
at org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$prunePartitions$1.apply(PartitionPruner.scala:129)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.flink.table.planner.plan.utils.PartitionPruner$.prunePartitions(PartitionPruner.scala:129)
at org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoLegacyTableSourceScanRule.internalPartitionPrune$1(PushPartitionIntoLegacyTableSourceScanRule.scala:134)
at org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoLegacyTableSourceScanRule.onMatch(PushPartitionIntoLegacyTableSourceScanRule.scala:144)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
at com.oppo.recdata.datapipe.flink.table.FlinkTableExecution.start(FlinkTableExecution.java:52)
at com.oppo.recdata.datapipe.Datapipe.entryPoint(Datapipe.java:110)
at com.oppo.recdata.datapipe.Datapipe.run(Datapipe.java:48)
at com.oppo.recdata.datapipe.DatapipeFlink.main(DatapipeFlink.java:13)
... 5 more

|




FileSystemTableSource 源代码:
|
public List<Map<String, String>> getPartitions() {
try {
return (List)PartitionPathUtils.searchPartSpecAndPaths(this.path.getFileSystem(), this.path, this.partitionKeys.size()).stream().map((tuple2) -> {
return (LinkedHashMap)tuple2.f0;
}).map((spec) -> {
                LinkedHashMap<String, String> ret = new LinkedHashMap();
spec.forEach((k, v) -> {
                    String var10000 = (String)ret.put(k, this.defaultPartName.equals(v) ? null : v);
});
                return ret;
}).collect(Collectors.toList());
} catch (Exception var2) {
throw new TableException("Fetch partitions fail.", var2);
}
    }

public FileSystemTableSource applyPartitionPruning(List<Map<String, String>> remainingPartitions) {
return new FileSystemTableSource(this.schema, this.path, this.partitionKeys, this.defaultPartName, this.properties, remainingPartitions, this.selectFields, this.limit, this.filters);
}

public FileSystemTableSource projectFields(int[] fields) {
return new FileSystemTableSource(this.schema, this.path, this.partitionKeys, this.defaultPartName, this.properties, this.readPartitions, fields, this.limit, this.filters);
}

public FileSystemTableSource applyLimit(long limit) {
return new FileSystemTableSource(this.schema, this.path, this.partitionKeys, this.defaultPartName, this.properties, this.readPartitions, this.selectFields, limit, this.filters);
}

public boolean isLimitPushedDown() {
return this.limit != null;
}

public FileSystemTableSource applyPredicate(List<Expression> predicates) {
return new FileSystemTableSource(this.schema, this.path, this.partitionKeys, this.defaultPartName, this.properties, this.readPartitions, this.selectFields, this.limit, new ArrayList(predicates));
}

public boolean isFilterPushedDown() {
return this.filters != null;
}

private int[] readFields() {
return this.selectFields == null ? IntStream.range(0, this.schema.getFieldCount()).toArray() : this.selectFields;
}

public DataType getProducedDataType() {
int[] fields = this.readFields();
String[] schemaFieldNames = this.schema.getFieldNames();
DataType[] schemaTypes = this.schema.getFieldDataTypes();
        return (DataType)DataTypes.ROW((Field[])Arrays.stream(fields).mapToObj((i) -> {
return DataTypes.FIELD(schemaFieldNames[i], schemaTypes[i]);
}).toArray((x$0) -> {
return new Field[x$0];
})).bridgedTo(RowData.class);
}

public TableSchema getTableSchema() {
return this.schema;
}

public String explainSource() {
return TableConnectorUtils.generateRuntimeName(this.getClass(), this.getTableSchema().getFieldNames()) + (this.readPartitions == null ? "" : ", readPartitions=" + this.readPartitions) + (this.selectFields == null ? "" : ", selectFields=" + Arrays.toString(this.selectFields)) + (this.limit == null ? "" : ", limit=" + this.limit) + (this.filters == null ? "" : ", filters=" + this.filtersString());
}

private String filtersString() {
return (String)this.filters.stream().map(Expression::asSummaryString).collect(Collectors.joining(","));
}
}


|




2、报错主要是获取分区路径拿到了.开头的目录导致分区字段与路径不匹配

|

public List<Map<String, String>> getPartitions() {
try {
return (List)PartitionPathUtils.searchPartSpecAndPaths(this.path.getFileSystem(), this.path, this.partitionKeys.size()).stream().map((tuple2) -> {
return (LinkedHashMap)tuple2.f0;
}).map((spec) -> {
LinkedHashMap<String, String> ret = new LinkedHashMap();
spec.forEach((k, v) -> {
String var10000 = (String)ret.put(k, this.defaultPartName.equals(v) ? null : v);
});
return ret;
}).collect(Collectors.toList());
} catch (Exception var2) {
throw new TableException("Fetch partitions fail.", var2);
}
}








|




3.searchPartSpecAndPaths代码


|

public static List<Tuple2<LinkedHashMap<String, String>, Path>> searchPartSpecAndPaths(FileSystem fs, Path path, int partitionNumber) {

//根据分去字段个数递归获得分区目录
FileStatus[] generatedParts = getFileStatusRecurse(path, partitionNumber, fs);

//数据 generatedParts

// 数据目录 hdfs://xxx-hdfs/merge/all/.staging_1622167234684/cp-0

// 数据目录 hdfs://xxx-hdfs/merge/all/dayno=20210531/hour-11

List<Tuple2<LinkedHashMap<String, String>, Path>> ret = new ArrayList();
FileStatus[] var5 = generatedParts;
int var6 = generatedParts.length;

for(int var7 = 0; var7 < var6; ++var7) {
FileStatus part = var5[var7];

//判断是否是合理的分区
if (!isHiddenFile(part)) {
ret.add(new Tuple2(extractPartitionSpecFromPath(part.getPath()), part.getPath()));
}
}

return ret;
}




|

4.递归代码:

|
private static FileStatus[] getFileStatusRecurse(Path path, int expectLevel, FileSystem fs) {
    ArrayList result = new ArrayList();

    try {
        FileStatus fileStatus = fs.getFileStatus(path);
listStatusRecursively(fs, fileStatus, 0, expectLevel, result);
} catch (IOException var5) {
return new FileStatus[0];
}

return (FileStatus[])result.toArray(new FileStatus[0]);
}


|

5.获取文件路径

|
//这儿只取了最后一个文件名判断 所以两次分区的情况下.开头的路径还是读了进来


private static boolean isHiddenFile(FileStatus fileStatus) {
    String name = fileStatus.getPath().getName();
    return name.startsWith("_") || name.startsWith(".");
}


|




基于上述问题描述

1.有没有大佬遇到过类似的问题,以及如何解决的

2.sink table 写数据的时候 .staging_1622167234684文件如何保证写入成功和高效,完成任务后清空

3.分区读取是不是要根据分区个数获取对应层级的文件名称判断是否是一个合理的path