您好:
现在Flink使用env.readFile的参数中,watchType只有两种参数: FileProcessingMode.PROCESS_CONTINUOUSLY - 当输入路径下有文件被修改,整个路径下内容将会被重新处理 FileProcessingMode.PROCESS_ONCE - 只扫描一次,便退出。因此这种模式下输入数据只读取一次 想问一下一种方法,只加载文件中的增量内容? [hidden email] |
Hi,
因为文件的修改不仅仅是 append,也有可能是 update 或者删除,所以 `ContinuousFileMonitoringFunction` 对有修改的文件是重新读取的。Flink 现在暂时还没有实现只处理文件新增内容的 source function,你可以参照 `ContinuousFileMonitoringFunction` 自行实现一个 source function。 [hidden email] <[hidden email]> 于2019年7月15日周一 下午10:02写道: > 您好: > 现在Flink使用env.readFile的参数中,watchType只有两种参数: > FileProcessingMode.PROCESS_CONTINUOUSLY - > 当输入路径下有文件被修改,整个路径下内容将会被重新处理 > FileProcessingMode.PROCESS_ONCE - 只扫描一次,便退出。因此这种模式下输入数据只读取一次 > 想问一下一种方法,只加载文件中的增量内容? > > > > [hidden email] > |
您好:
谢谢您的回答,是否有个demo可以参考呢? 如果没有,就像咨询一下这个工程量需要多长时间。因为刚入门,烦请给些建议。 谢谢! [hidden email] 发件人: Caizhi Weng 发送时间: 2019-07-16 11:34 收件人: user-zh 抄送: zhangjunjie1130 主题: Re: Flink中readFile中如何只读取增量文件 Hi, 因为文件的修改不仅仅是 append,也有可能是 update 或者删除,所以 `ContinuousFileMonitoringFunction` 对有修改的文件是重新读取的。Flink 现在暂时还没有实现只处理文件新增内容的 source function,你可以参照 `ContinuousFileMonitoringFunction` 自行实现一个 source function。 [hidden email] <[hidden email]> 于2019年7月15日周一 下午10:02写道: 您好: 现在Flink使用env.readFile的参数中,watchType只有两种参数: FileProcessingMode.PROCESS_CONTINUOUSLY - 当输入路径下有文件被修改,整个路径下内容将会被重新处理 FileProcessingMode.PROCESS_ONCE - 只扫描一次,便退出。因此这种模式下输入数据只读取一次 想问一下一种方法,只加载文件中的增量内容? [hidden email] |
In reply to this post by Caizhi Weng
我也遇到类似的需求,需要增量读取日志文件内容,最终发现用一个已废弃Deprecated的方法 readFileStream 是可以实现的,代码如下:
val stream = env.readFileStream(inputPath, 10, FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED) 源码里说明可以用 readFile(FileInputFormat, String, FileProcessingMode, long) 方法替代,但事实上FileProcessingMode并没有对应的 watchType与PROCESS_ONLY_APPENDED的功能是一致的,readFileStream这个Source Function在未来彻底删除后,如果又没有提供内置的替代方案,就只能自己想办法实现了。 所以我的问题是,既然之前有增量读取的方法,为什么要突然废弃掉又不提供替代方案呢?这类需求不合理吗?这让我很不理解,希望得到大家的解答,谢谢! -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Administrator
|
你试了 FileProcessingMode.PROCESS_CONTINUOUSLY 了么?
On Mon, 16 Nov 2020 at 09:23, hepingtao <[hidden email]> wrote: > 我也遇到类似的需求,需要增量读取日志文件内容,最终发现用一个已废弃Deprecated的方法 readFileStream 是可以实现的,代码如下: > > val stream = env.readFileStream(inputPath, 10, > FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED) > > 源码里说明可以用 readFile(FileInputFormat, String, FileProcessingMode, long) > 方法替代,但事实上FileProcessingMode并没有对应的 > watchType与PROCESS_ONLY_APPENDED的功能是一致的,readFileStream这个Source > Function在未来彻底删除后,如果又没有提供内置的替代方案,就只能自己想办法实现了。 > > 所以我的问题是,既然之前有增量读取的方法,为什么要突然废弃掉又不提供替代方案呢?这类需求不合理吗?这让我很不理解,希望得到大家的解答,谢谢! > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
试过了的。但 FileProcessingMode.PROCESS_CONTINUOUSLY 并不是增量读取,只要文件内容发生变化,就会重新全量读取,与之前的FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED 增量方式不一样,也没有提供增量读取的模式。按官网文档的说法,这是break the “exactly-once” semantics。
这个问题之前的邮件里有讨论过,见:http://apache-flink.147419.n8.nabble.com/Flink-readFile-tt142.html 官网文档也有说明:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sources IMPORTANT NOTES: If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a file is modified, its contents are re-processed entirely. This can break the “exactly-once” semantics, as appending data at the end of a file will lead to all its contents being re-processed. val stream = env.readFile(textInputFormat, inputPath, FileProcessingMode.PROCESS_CONTINUOUSLY, 10) ------------------------------------------------------------------ 发件人:Jark [via Apache Flink 中文用户邮件列表] <[hidden email]> 发送时间:2020年11月16日(星期一) 10:36 收件人:hepingtao <[hidden email]> 主 题:Re: Flink中readFile中如何只读取增量文件 你试了 FileProcessingMode.PROCESS_CONTINUOUSLY 了么? On Mon, 16 Nov 2020 at 09:23, hepingtao <[hidden email]> wrote: > 我也遇到类似的需求,需要增量读取日志文件内容,最终发现用一个已废弃Deprecated的方法 readFileStream 是可以实现的,代码如下: > > val stream = env.readFileStream(inputPath, 10, > FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED) > > 源码里说明可以用 readFile(FileInputFormat, String, FileProcessingMode, long) > 方法替代,但事实上FileProcessingMode并没有对应的 > watchType与PROCESS_ONLY_APPENDED的功能是一致的,readFileStream这个Source > Function在未来彻底删除后,如果又没有提供内置的替代方案,就只能自己想办法实现了。 > > 所以我的问题是,既然之前有增量读取的方法,为什么要突然废弃掉又不提供替代方案呢?这类需求不合理吗?这让我很不理解,希望得到大家的解答,谢谢! > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ If you reply to this email, your message will be added to the discussion below: http://apache-flink.147419.n8.nabble.com/Flink-readFile-tp142p8633.html To start a new topic under Apache Flink 中文用户邮件列表, email [hidden email] To unsubscribe from Apache Flink 中文用户邮件列表, click here. NAML |
In reply to this post by Jark
试过了的。但 FileProcessingMode.PROCESS_CONTINUOUSLY 并不是增量读取,只要文件内容发生变化,就会重新全量读取,与之前的FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED 增量方式不一样,也没有提供增量读取的模式。按官网文档的说法,这是break the “exactly-once” semantics。
这个问题之前的邮件里有讨论过,见:http://apache-flink.147419.n8.nabble.com/Flink-readFile-tt142.html 官网文档也有说明:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sources IMPORTANT NOTES: 1. If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a file is modified, its contents are re-processed entirely. This can break the “exactly-once” semantics, as appending data at the end of a file will lead to all its contents being re-processed. val stream = env.readFile(textInputFormat, inputPath, FileProcessingMode.PROCESS_CONTINUOUSLY, 10) 发件人: Jark Wu<mailto:[hidden email]> 发送时间: 2020年11月16日 10:36 收件人: user-zh<mailto:[hidden email]> 主题: Re: Flink中readFile中如何只读取增量文件 你试了 FileProcessingMode.PROCESS_CONTINUOUSLY 了么? On Mon, 16 Nov 2020 at 09:23, hepingtao <[hidden email]> wrote: > 我也遇到类似的需求,需要增量读取日志文件内容,最终发现用一个已废弃Deprecated的方法 readFileStream 是可以实现的,代码如下: > > val stream = env.readFileStream(inputPath, 10, > FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED) > > 源码里说明可以用 readFile(FileInputFormat, String, FileProcessingMode, long) > 方法替代,但事实上FileProcessingMode并没有对应的 > watchType与PROCESS_ONLY_APPENDED的功能是一致的,readFileStream这个Source > Function在未来彻底删除后,如果又没有提供内置的替代方案,就只能自己想办法实现了。 > > 所以我的问题是,既然之前有增量读取的方法,为什么要突然废弃掉又不提供替代方案呢?这类需求不合理吗?这让我很不理解,希望得到大家的解答,谢谢! > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
Administrator
|
In reply to this post by hepingtao-2
可以去 JIRA 中开个 issue 反馈下这个功能。
On Mon, 16 Nov 2020 at 12:45, hectorhedev <[hidden email]> wrote: > 试过了的。但 FileProcessingMode.PROCESS_CONTINUOUSLY > 并不是增量读取,只要文件内容发生变化,就会重新全量读取,与之前的FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED > 增量方式不一样,也没有提供增量读取的模式。按官网文档的说法,这是break the “exactly-once” semantics。 > > 这个问题之前的邮件里有讨论过,见: > http://apache-flink.147419.n8.nabble.com/Flink-readFile-tt142.html > 官网文档也有说明: > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sources > > IMPORTANT NOTES: > If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a > file is modified, its contents are re-processed entirely. This can break > the “exactly-once” semantics, as appending data at the end of a file will > lead to all its contents being re-processed. > > val stream = env.readFile(textInputFormat, inputPath, > FileProcessingMode.PROCESS_CONTINUOUSLY, 10) > > > ------------------------------------------------------------------ > 发件人:Jark [via Apache Flink 中文用户邮件列表] <[hidden email]> > 发送时间:2020年11月16日(星期一) 10:36 > 收件人:hepingtao <[hidden email]> > 主 题:Re: Flink中readFile中如何只读取增量文件 > > 你试了 FileProcessingMode.PROCESS_CONTINUOUSLY 了么? > > On Mon, 16 Nov 2020 at 09:23, hepingtao <[hidden email]> > wrote: > > > 我也遇到类似的需求,需要增量读取日志文件内容,最终发现用一个已废弃Deprecated的方法 readFileStream > 是可以实现的,代码如下: > > > > val stream = env.readFileStream(inputPath, 10, > > FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED) > > > > 源码里说明可以用 readFile(FileInputFormat, String, FileProcessingMode, long) > > 方法替代,但事实上FileProcessingMode并没有对应的 > > watchType与PROCESS_ONLY_APPENDED的功能是一致的,readFileStream这个Source > > Function在未来彻底删除后,如果又没有提供内置的替代方案,就只能自己想办法实现了。 > > > > 所以我的问题是,既然之前有增量读取的方法,为什么要突然废弃掉又不提供替代方案呢?这类需求不合理吗?这让我很不理解,希望得到大家的解答,谢谢! > > > > > > > > -- > > Sent from: http://apache-flink.147419.n8.nabble.com/ > > > > If you reply to this email, your message will be added to the discussion > below: > http://apache-flink.147419.n8.nabble.com/Flink-readFile-tp142p8633.html > To start a new topic under Apache Flink 中文用户邮件列表, email > [hidden email] > To unsubscribe from Apache Flink 中文用户邮件列表, click here. > NAML > > |
Free forum by Nabble | Edit this page |