关于flink窗口是否正确关闭的问题

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

关于flink窗口是否正确关闭的问题

zhiyezou
各位大佬好:
  最近在使用flink stream api处理数据,逻辑是非常简单的ETL操作
  我自己定义的一个1分钟的tumble窗口,watermark是10s,当处在流量高峰时段时,发现下游出现了数据丢失的问题。
  举个例子:我上游topic 5000/s,下游接受数据的topic只有4000/s
  在流量低谷时就没有这个问题,而且我把窗口去掉后也没有这个问题,是否是窗口被提前关闭了呢?导致我下游的processfunction还没处理完?
  ps:我加大了并行度还是不行
Reply | Threaded
Open this post in threaded view
|

Re: 关于flink窗口是否正确关闭的问题

jingjing bai
窗口不会提前关闭,请查看下metircs是否有数据丢弃,


1530130567 <[hidden email]> 于2019年12月24日周二 下午8:46写道:

> 各位大佬好:
> &nbsp; 最近在使用flink stream api处理数据,逻辑是非常简单的ETL操作
> &nbsp; 我自己定义的一个1分钟的tumble窗口,watermark是10s,当处在流量高峰时段时,发现下游出现了数据丢失的问题。
> &nbsp; 举个例子:我上游topic 5000/s,下游接受数据的topic只有4000/s
> &nbsp;
> 在流量低谷时就没有这个问题,而且我把窗口去掉后也没有这个问题,是否是窗口被提前关闭了呢?导致我下游的processfunction还没处理完?
> &nbsp; ps:我加大了并行度还是不行
Reply | Threaded
Open this post in threaded view
|

回复: 关于flink窗口是否正确关闭的问题

zhiyezou
大佬好:
我昨天看了一下metric,确实是recordsIn&gt;recordsOut
代码里就是用了一个window然后配processfunction,也没有任何的filter操作。
代码如下:
.window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.minutes(1)))
.process(new ProcessWindowFunction<Row, Row, Integer, TimeWindow&gt;() {
    @Override
    public void process(Integer integer, Context context, Iterable<Row&gt; elements, Collector<Row&gt; out) {
        for (Row element : elements) {
                out.collect(element);
        }
    }
})




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"jingjing bai"<[hidden email]&gt;;
发送时间:&nbsp;2019年12月24日(星期二) 晚上9:18
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: 关于flink窗口是否正确关闭的问题



窗口不会提前关闭,请查看下metircs是否有数据丢弃,


1530130567 <[hidden email]&gt; 于2019年12月24日周二 下午8:46写道:

&gt; 各位大佬好:
&gt; &amp;nbsp; 最近在使用flink stream api处理数据,逻辑是非常简单的ETL操作
&gt; &amp;nbsp; 我自己定义的一个1分钟的tumble窗口,watermark是10s,当处在流量高峰时段时,发现下游出现了数据丢失的问题。
&gt; &amp;nbsp; 举个例子:我上游topic 5000/s,下游接受数据的topic只有4000/s
&gt; &amp;nbsp;
&gt; 在流量低谷时就没有这个问题,而且我把窗口去掉后也没有这个问题,是否是窗口被提前关闭了呢?导致我下游的processfunction还没处理完?
&gt; &amp;nbsp; ps:我加大了并行度还是不行
Reply | Threaded
Open this post in threaded view
|

Re: 关于flink窗口是否正确关闭的问题

Jary Zhen
使用基于EventTime 的 watermark处理数据通常会碰到两这么两种情况:
1.  数据因为乱序,迟到严重,会被丢弃,这个可以查看Side Out API [1]
2.
 数据产生的事件时间比当前系统时间大,我称之为“超自然数据”,比如当前系统时间是10:37:55,但数据产生的事件时间可能是10:38:55,那么一旦有这类数据到达,将会使窗口提前触发计算,导致正常数据被当做迟到数据,因而被丢弃,这个处理方式是在assignWaterMark
之前过滤掉。
3. 建议: 如果是简单的ETL,尽量不要用EventTime 来处理数据

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/side_output.html

On Wed, 25 Dec 2019 at 09:57, 1530130567 <[hidden email]> wrote:

> 大佬好:
> 我昨天看了一下metric,确实是recordsIn&gt;recordsOut
> 代码里就是用了一个window然后配processfunction,也没有任何的filter操作。
> 代码如下:
>
> .window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.minutes(1)))
> .process(new ProcessWindowFunction<Row, Row, Integer, TimeWindow&gt;() {
>     @Override
>     public void process(Integer integer, Context context, Iterable<Row&gt;
> elements, Collector<Row&gt; out) {
>         for (Row element : elements) {
>                 out.collect(element);
>         }
>     }
> })
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"jingjing bai"<[hidden email]&gt;;
> 发送时间:&nbsp;2019年12月24日(星期二) 晚上9:18
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;Re: 关于flink窗口是否正确关闭的问题
>
>
>
> 窗口不会提前关闭,请查看下metircs是否有数据丢弃,
>
>
> 1530130567 <[hidden email]&gt; 于2019年12月24日周二 下午8:46写道:
>
> &gt; 各位大佬好:
> &gt; &amp;nbsp; 最近在使用flink stream api处理数据,逻辑是非常简单的ETL操作
> &gt; &amp;nbsp;
> 我自己定义的一个1分钟的tumble窗口,watermark是10s,当处在流量高峰时段时,发现下游出现了数据丢失的问题。
> &gt; &amp;nbsp; 举个例子:我上游topic 5000/s,下游接受数据的topic只有4000/s
> &gt; &amp;nbsp;
> &gt;
> 在流量低谷时就没有这个问题,而且我把窗口去掉后也没有这个问题,是否是窗口被提前关闭了呢?导致我下游的processfunction还没处理完?
> &gt; &amp;nbsp; ps:我加大了并行度还是不行