flink 1.8 内的StreamExecutionEnvironment 对于 FileInputFormat 多file path 不兼容问题咨询

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

flink 1.8 内的StreamExecutionEnvironment 对于 FileInputFormat 多file path 不兼容问题咨询

王智
我在使用flink 1.8 自定义 FileInputFormat 的时候遇到了不兼容问题,初步看了源代码,不知道自己得出的结论是否正确,并想了解一下后续趋势和进展,麻烦各位大佬抽空指点一下,先行感谢~~




问题1: StreamExecutionEnvironment 为什么要做这样的限制?ContinuousFileMonitoringFunction 的作用是什么?

相关的代码描述如下




StreamExecutionEnvironment 对 FileInputFormat 对象有特殊的处理逻辑
if (inputFormat instanceof FileInputFormat) {
   @SuppressWarnings("unchecked")
   FileInputFormat<OUT&gt; format = (FileInputFormat<OUT&gt;) inputFormat;

   source = createFileInput(format, typeInfo, "Custom File source",
         FileProcessingMode.PROCESS_ONCE, -1);
} else {
   source = createInput(inputFormat, typeInfo, "Custom Source");
}
return source;



createFileInput 方法内 使用 ContinuousFileMonitoringFunction 对 inputFormat 进行处理,在其构造函数中,对 FileInputFormat<OUT&gt; format 进行了一些条件约束


Preconditions.checkArgument(
   format.getFilePaths().length == 1,
   "FileInputFormats with multiple paths are not supported yet.");



这里就将 FileInputFormat 限制为只能添加一个file path。




问题2: 在flink 1.10 版本情况是否有改善?(在 FileInputFormat.supportsMultiPaths 方法中我看到flink 2.0 中,所有的FileInputFormat 都会支持多路径)


/**
 * Override this method to supports multiple paths.
 * When this method will be removed, all FileInputFormats have to support multiple paths.
 *
 * @return True if the FileInputFormat supports multiple paths, false otherwise.
 *
 * @deprecated Will be removed for Flink 2.0.
 */
@Deprecated
public boolean supportsMultiPaths() {
   return false;
}
Reply | Threaded
Open this post in threaded view
|

Re: flink 1.8 内的StreamExecutionEnvironment 对于 FileInputFormat 多file path 不兼容问题咨询

JingsongLee
Hi,

你的需求是什么?下列哪种?
- 1.想用unbounded source,continuous的file source,监控文件夹,发送新文件,且需要支持多文件夹
- 2.只是想用bounded的input format,需要支持多文件

如果是1,现在仍然不支持。
如果是2,那你可以用env.addSource(new InputFormatSourceFunction(..)...)来支持多文件。

Best,
Jingsong Lee


------------------------------------------------------------------
From:王智 <[hidden email]>
Send Time:2020年3月4日(星期三) 17:34
To:user-zh <[hidden email]>
Subject:flink 1.8 内的StreamExecutionEnvironment 对于 FileInputFormat 多file path 不兼容问题咨询

我在使用flink 1.8 自定义 FileInputFormat 的时候遇到了不兼容问题,初步看了源代码,不知道自己得出的结论是否正确,并想了解一下后续趋势和进展,麻烦各位大佬抽空指点一下,先行感谢~~




问题1: StreamExecutionEnvironment 为什么要做这样的限制?ContinuousFileMonitoringFunction 的作用是什么?

相关的代码描述如下




StreamExecutionEnvironment 对 FileInputFormat 对象有特殊的处理逻辑
if (inputFormat instanceof FileInputFormat) {
   @SuppressWarnings("unchecked")
   FileInputFormat<OUT&gt; format = (FileInputFormat<OUT&gt;) inputFormat;

   source = createFileInput(format, typeInfo, "Custom File source",
         FileProcessingMode.PROCESS_ONCE, -1);
} else {
   source = createInput(inputFormat, typeInfo, "Custom Source");
}
return source;



createFileInput 方法内 使用 ContinuousFileMonitoringFunction 对 inputFormat 进行处理,在其构造函数中,对 FileInputFormat<OUT&gt; format 进行了一些条件约束


Preconditions.checkArgument(
   format.getFilePaths().length == 1,
   "FileInputFormats with multiple paths are not supported yet.");



这里就将 FileInputFormat 限制为只能添加一个file path。




问题2: 在flink 1.10 版本情况是否有改善?(在 FileInputFormat.supportsMultiPaths 方法中我看到flink 2.0 中,所有的FileInputFormat 都会支持多路径)


/**
 * Override this method to supports multiple paths.
 * When this method will be removed, all FileInputFormats have to support multiple paths.
 *
 * @return True if the FileInputFormat supports multiple paths, false otherwise.
 *
 * @deprecated Will be removed for Flink 2.0.
 */
@Deprecated
public boolean supportsMultiPaths() {
   return false;
}