Flink中readFile中如何只读取增量文件

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink中readFile中如何只读取增量文件

zhangjunjie1130@163.com
您好:
        现在Flink使用env.readFile的参数中,watchType只有两种参数:
            FileProcessingMode.PROCESS_CONTINUOUSLY - 当输入路径下有文件被修改,整个路径下内容将会被重新处理
            FileProcessingMode.PROCESS_ONCE - 只扫描一次,便退出。因此这种模式下输入数据只读取一次
         想问一下一种方法,只加载文件中的增量内容?



[hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: Flink中readFile中如何只读取增量文件

Caizhi Weng
Hi,

因为文件的修改不仅仅是 append,也有可能是 update 或者删除,所以 `ContinuousFileMonitoringFunction`
对有修改的文件是重新读取的。Flink 现在暂时还没有实现只处理文件新增内容的 source function,你可以参照
`ContinuousFileMonitoringFunction` 自行实现一个 source function。

[hidden email] <[hidden email]> 于2019年7月15日周一 下午10:02写道:

> 您好:
>         现在Flink使用env.readFile的参数中,watchType只有两种参数:
>             FileProcessingMode.PROCESS_CONTINUOUSLY -
> 当输入路径下有文件被修改,整个路径下内容将会被重新处理
>             FileProcessingMode.PROCESS_ONCE - 只扫描一次,便退出。因此这种模式下输入数据只读取一次
>          想问一下一种方法,只加载文件中的增量内容?
>
>
>
> [hidden email]
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink中readFile中如何只读取增量文件

zhangjunjie1130@163.com
您好:
      谢谢您的回答,是否有个demo可以参考呢? 如果没有,就像咨询一下这个工程量需要多长时间。因为刚入门,烦请给些建议。

谢谢!


[hidden email]
 
发件人: Caizhi Weng
发送时间: 2019-07-16 11:34
收件人: user-zh
抄送: zhangjunjie1130
主题: Re: Flink中readFile中如何只读取增量文件
Hi,

因为文件的修改不仅仅是 append,也有可能是 update 或者删除,所以 `ContinuousFileMonitoringFunction` 对有修改的文件是重新读取的。Flink 现在暂时还没有实现只处理文件新增内容的 source function,你可以参照 `ContinuousFileMonitoringFunction` 自行实现一个 source function。

[hidden email] <[hidden email]> 于2019年7月15日周一 下午10:02写道:
您好:
        现在Flink使用env.readFile的参数中,watchType只有两种参数:
            FileProcessingMode.PROCESS_CONTINUOUSLY - 当输入路径下有文件被修改,整个路径下内容将会被重新处理
            FileProcessingMode.PROCESS_ONCE - 只扫描一次,便退出。因此这种模式下输入数据只读取一次
         想问一下一种方法,只加载文件中的增量内容?



[hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: Flink中readFile中如何只读取增量文件

hepingtao-2
In reply to this post by Caizhi Weng
我也遇到类似的需求,需要增量读取日志文件内容,最终发现用一个已废弃Deprecated的方法 readFileStream 是可以实现的,代码如下:

val stream = env.readFileStream(inputPath, 10,
FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)

源码里说明可以用 readFile(FileInputFormat, String, FileProcessingMode, long)
方法替代,但事实上FileProcessingMode并没有对应的
watchType与PROCESS_ONLY_APPENDED的功能是一致的,readFileStream这个Source
Function在未来彻底删除后,如果又没有提供内置的替代方案,就只能自己想办法实现了。

所以我的问题是,既然之前有增量读取的方法,为什么要突然废弃掉又不提供替代方案呢?这类需求不合理吗?这让我很不理解,希望得到大家的解答,谢谢!



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Flink中readFile中如何只读取增量文件

Jark
Administrator
你试了  FileProcessingMode.PROCESS_CONTINUOUSLY 了么?

On Mon, 16 Nov 2020 at 09:23, hepingtao <[hidden email]>
wrote:

> 我也遇到类似的需求,需要增量读取日志文件内容,最终发现用一个已废弃Deprecated的方法 readFileStream 是可以实现的,代码如下:
>
> val stream = env.readFileStream(inputPath, 10,
> FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>
> 源码里说明可以用 readFile(FileInputFormat, String, FileProcessingMode, long)
> 方法替代,但事实上FileProcessingMode并没有对应的
> watchType与PROCESS_ONLY_APPENDED的功能是一致的,readFileStream这个Source
> Function在未来彻底删除后,如果又没有提供内置的替代方案,就只能自己想办法实现了。
>
> 所以我的问题是,既然之前有增量读取的方法,为什么要突然废弃掉又不提供替代方案呢?这类需求不合理吗?这让我很不理解,希望得到大家的解答,谢谢!
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

回复:Flink中readFile中如何只读取增量文件

hepingtao-2
试过了的。但 FileProcessingMode.PROCESS_CONTINUOUSLY 并不是增量读取,只要文件内容发生变化,就会重新全量读取,与之前的FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED 增量方式不一样,也没有提供增量读取的模式。按官网文档的说法,这是break the “exactly-once” semantics。

这个问题之前的邮件里有讨论过,见:http://apache-flink.147419.n8.nabble.com/Flink-readFile-tt142.html
官网文档也有说明:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sources

IMPORTANT NOTES:
If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a file is modified, its contents are re-processed entirely. This can break the “exactly-once” semantics, as appending data at the end of a file will lead to all its contents being re-processed.

val stream = env.readFile(textInputFormat, inputPath, FileProcessingMode.PROCESS_CONTINUOUSLY, 10)


------------------------------------------------------------------
发件人:Jark [via Apache Flink 中文用户邮件列表] <[hidden email]>
发送时间:2020年11月16日(星期一) 10:36
收件人:hepingtao <[hidden email]>
主 题:Re: Flink中readFile中如何只读取增量文件

 你试了  FileProcessingMode.PROCESS_CONTINUOUSLY 了么?

On Mon, 16 Nov 2020 at 09:23, hepingtao <[hidden email]>
wrote:

> 我也遇到类似的需求,需要增量读取日志文件内容,最终发现用一个已废弃Deprecated的方法 readFileStream 是可以实现的,代码如下:
>
> val stream = env.readFileStream(inputPath, 10,
> FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>
> 源码里说明可以用 readFile(FileInputFormat, String, FileProcessingMode, long)
> 方法替代,但事实上FileProcessingMode并没有对应的
> watchType与PROCESS_ONLY_APPENDED的功能是一致的,readFileStream这个Source
> Function在未来彻底删除后,如果又没有提供内置的替代方案,就只能自己想办法实现了。
>
> 所以我的问题是,既然之前有增量读取的方法,为什么要突然废弃掉又不提供替代方案呢?这类需求不合理吗?这让我很不理解,希望得到大家的解答,谢谢!
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



If you reply to this email, your message will be added to the discussion below: http://apache-flink.147419.n8.nabble.com/Flink-readFile-tp142p8633.html 
 To start a new topic under Apache Flink 中文用户邮件列表, email [hidden email]
 To unsubscribe from Apache Flink 中文用户邮件列表, click here.
NAML

Reply | Threaded
Open this post in threaded view
|

回复: Flink中readFile中如何只读取增量文件

hepingtao
In reply to this post by Jark
试过了的。但 FileProcessingMode.PROCESS_CONTINUOUSLY 并不是增量读取,只要文件内容发生变化,就会重新全量读取,与之前的FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED 增量方式不一样,也没有提供增量读取的模式。按官网文档的说法,这是break the “exactly-once” semantics。


这个问题之前的邮件里有讨论过,见:http://apache-flink.147419.n8.nabble.com/Flink-readFile-tt142.html
官网文档也有说明:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sources

IMPORTANT NOTES:

  1.  If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a file is modified, its contents are re-processed entirely. This can break the “exactly-once” semantics, as appending data at the end of a file will lead to all its contents being re-processed.


val stream = env.readFile(textInputFormat, inputPath, FileProcessingMode.PROCESS_CONTINUOUSLY, 10)

发件人: Jark Wu<mailto:[hidden email]>
发送时间: 2020年11月16日 10:36
收件人: user-zh<mailto:[hidden email]>
主题: Re: Flink中readFile中如何只读取增量文件

你试了  FileProcessingMode.PROCESS_CONTINUOUSLY 了么?

On Mon, 16 Nov 2020 at 09:23, hepingtao <[hidden email]>
wrote:

> 我也遇到类似的需求,需要增量读取日志文件内容,最终发现用一个已废弃Deprecated的方法 readFileStream 是可以实现的,代码如下:
>
> val stream = env.readFileStream(inputPath, 10,
> FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
>
> 源码里说明可以用 readFile(FileInputFormat, String, FileProcessingMode, long)
> 方法替代,但事实上FileProcessingMode并没有对应的
> watchType与PROCESS_ONLY_APPENDED的功能是一致的,readFileStream这个Source
> Function在未来彻底删除后,如果又没有提供内置的替代方案,就只能自己想办法实现了。
>
> 所以我的问题是,既然之前有增量读取的方法,为什么要突然废弃掉又不提供替代方案呢?这类需求不合理吗?这让我很不理解,希望得到大家的解答,谢谢!
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

Reply | Threaded
Open this post in threaded view
|

Re: Flink中readFile中如何只读取增量文件

Jark
Administrator
In reply to this post by hepingtao-2
可以去 JIRA 中开个 issue 反馈下这个功能。

On Mon, 16 Nov 2020 at 12:45, hectorhedev <[hidden email]>
wrote:

> 试过了的。但 FileProcessingMode.PROCESS_CONTINUOUSLY
> 并不是增量读取,只要文件内容发生变化,就会重新全量读取,与之前的FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED
> 增量方式不一样,也没有提供增量读取的模式。按官网文档的说法,这是break the “exactly-once” semantics。
>
> 这个问题之前的邮件里有讨论过,见:
> http://apache-flink.147419.n8.nabble.com/Flink-readFile-tt142.html
> 官网文档也有说明:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sources
>
> IMPORTANT NOTES:
> If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a
> file is modified, its contents are re-processed entirely. This can break
> the “exactly-once” semantics, as appending data at the end of a file will
> lead to all its contents being re-processed.
>
> val stream = env.readFile(textInputFormat, inputPath,
> FileProcessingMode.PROCESS_CONTINUOUSLY, 10)
>
>
> ------------------------------------------------------------------
> 发件人:Jark [via Apache Flink 中文用户邮件列表] <[hidden email]>
> 发送时间:2020年11月16日(星期一) 10:36
> 收件人:hepingtao <[hidden email]>
> 主 题:Re: Flink中readFile中如何只读取增量文件
>
>  你试了  FileProcessingMode.PROCESS_CONTINUOUSLY 了么?
>
> On Mon, 16 Nov 2020 at 09:23, hepingtao <[hidden email]>
> wrote:
>
> > 我也遇到类似的需求,需要增量读取日志文件内容,最终发现用一个已废弃Deprecated的方法 readFileStream
> 是可以实现的,代码如下:
> >
> > val stream = env.readFileStream(inputPath, 10,
> > FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)
> >
> > 源码里说明可以用 readFile(FileInputFormat, String, FileProcessingMode, long)
> > 方法替代,但事实上FileProcessingMode并没有对应的
> > watchType与PROCESS_ONLY_APPENDED的功能是一致的,readFileStream这个Source
> > Function在未来彻底删除后,如果又没有提供内置的替代方案,就只能自己想办法实现了。
> >
> > 所以我的问题是,既然之前有增量读取的方法,为什么要突然废弃掉又不提供替代方案呢?这类需求不合理吗?这让我很不理解,希望得到大家的解答,谢谢!
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
>
> If you reply to this email, your message will be added to the discussion
> below:
> http://apache-flink.147419.n8.nabble.com/Flink-readFile-tp142p8633.html
>  To start a new topic under Apache Flink 中文用户邮件列表, email
> [hidden email]
>  To unsubscribe from Apache Flink 中文用户邮件列表, click here.
> NAML
>
>