flink-1.11 simulating backpressure

kcz
I want to look at the backpressure metrics, so I sent 1,000,000 records to Kafka, but I don't see the source consuming any data. Did I get the simulation wrong somewhere?
public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(2000L, CheckpointingMode.EXACTLY_ONCE);
    env.setStateBackend(new MemoryStateBackend());
    env.setParallelism(4);
    Properties properties = getLocal();
    properties.setProperty("group.id", "test");
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("testOrderTopic", new SimpleStringSchema(), properties);
    DataStream<String> stream = env
            .addSource(consumer);
    stream.map(new MapFunction<String, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<Integer, Integer> map(String s) throws Exception {
            Thread.sleep(1000 * 60 * 60 * 60); // block each record to make the map operator slow
            return new Tuple2<>(1, 1);
        }
    }).keyBy(0).sum(0);
    stream.print();
    //stream.map();
    env.execute();

}

Re: flink-1.11 simulating backpressure

shizk233
What exactly do you mean by "the source shows no consumption"? The task's numOfRecordsIn in the web UI?

Re: flink-1.11 simulating backpressure

kcz
Yes, that's it. The UI shows no data coming in, but records should at least enter the source operator, since only my map does the sleep. I don't see any backpressure either, and I've kept producing data, over 1,000,000 records by now.

Re: flink-1.11 simulating backpressure

shizk233
The source operator seems to have no In metric, only an Out metric. By default the source and map operators are chained into one task; disable operator chaining so the map operator runs as its own task, and you will be able to observe In and Out on the map operator.
For backpressure, take a look at the isBackPressured metric; as I recall it is reported at the operator level, so you can see the state of each operator.
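
A minimal sketch of both ways to break the chain (the fromElements input and job name are stand-ins for the thread's Kafka setup, not from the original code):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Option 1: disable chaining globally; every operator becomes its own task.
        env.disableOperatorChaining();

        env.fromElements("a", "b", "c")
                .map((MapFunction<String, String>) s -> s)
                // Option 2: break the chain at this operator only.
                .disableChaining()
                .print();

        env.execute("chaining-demo");
    }
}

With the chain broken, the map runs as a separate task and reports its own numRecordsIn/numRecordsOut in the web UI.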

Re: flink-1.11 simulating backpressure

kcz
Thanks, I'll give that a try.

Question: implementing real-time alerting with Flink

samuel.qiu@ubtrobot.com
We need a real-time alerting feature, and after some research Flink looks like a good fit, but a few things are still unclear. Advice from the experts would be appreciated, thanks!

The alerting has two parts:
1. Rule configuration: rules are stored in MySQL as JSON, e.g.
    {"times":5}  -- alert when an event occurs more than 5 times;
    {"temperature": 80} -- alert when the temperature exceeds 80.
2. Alert evaluation:
    1) Reported data is written to Kafka.
    2) Flink reads from Kafka and aggregates over tumbling windows; when a rule matches, an alert is produced.

Current questions:
1. When a rule changes, how can it take effect promptly?
2. If implemented with Flink CEP, can multiple rules be active at the same time on the same source within one job?
3. Is there a best practice for this?

Hoping someone can help, thanks!

Re: Question: implementing real-time alerting with Flink

Jun Zhang
You can use broadcast state. I wrote an article on this that you can use as a reference; just swap the source for one that polls the MySQL config every few seconds.

https://blog.csdn.net/zhangjun5965/article/details/106573528
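
A minimal sketch of that pattern (not the article's code; the single "temperature" rule key and the plain-string rule stream are assumptions):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class RuleBroadcastSketch {

    // one descriptor shared by the broadcast side and the process function
    static final MapStateDescriptor<String, String> RULES =
            new MapStateDescriptor<>("rules",
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    public static DataStream<String> wire(DataStream<String> events, DataStream<String> ruleJson) {
        // ruleJson would come from a source that polls MySQL every few seconds
        BroadcastStream<String> rules = ruleJson.broadcast(RULES);
        return events
                .connect(rules)
                .process(new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // read the current rule and evaluate the event against it
                        String rule = ctx.getBroadcastState(RULES).get("temperature");
                        if (rule != null /* && the parsed event value exceeds the rule's threshold */) {
                            out.collect("ALERT: " + event);
                        }
                    }

                    @Override
                    public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception {
                        // a rule update arrived from the polling source; store it for all subtasks
                        ctx.getBroadcastState(RULES).put("temperature", rule);
                    }
                });
    }
}

New rules take effect as soon as the polling source emits them, without restarting the job.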

Re: Question: implementing real-time alerting with Flink

akira
About CEP: CEP has a GroupPattern mechanism that can match multiple rules against a single stream, but one rough edge is that all of the grouped rules share the same within(); you can work around that by modifying the CEP source to add a wait operator.
At the moment CEP does not support changing rules dynamically; a rule change requires restarting the application. You can likewise modify the CEP source to implement dynamic CEP.
For reference, Hellobike's dynamic CEP extension, implemented on Flink 1.7: https://developer.aliyun.com/article/738451
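
A sketch of the simpler route (an assumed Double-valued stream and made-up thresholds): without GroupPattern or source changes, several independent CEP.pattern calls over the same stream all run inside one job, and each rule keeps its own within():

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class MultiRuleSketch {
    public static void wire(DataStream<Double> events) {
        // rule 1: five events within ten minutes
        Pattern<Double, ?> countRule = Pattern.<Double>begin("any")
                .where(new SimpleCondition<Double>() {
                    @Override
                    public boolean filter(Double v) {
                        return true;
                    }
                })
                .times(5)
                .within(Time.minutes(10));

        // rule 2: a single reading above 80
        Pattern<Double, ?> tempRule = Pattern.<Double>begin("hot")
                .where(new SimpleCondition<Double>() {
                    @Override
                    public boolean filter(Double v) {
                        return v > 80;
                    }
                });

        // both rules are active concurrently on the same source within one job
        PatternStream<Double> countMatches = CEP.pattern(events, countRule);
        PatternStream<Double> tempMatches = CEP.pattern(events, tempRule);
        // call select()/process() on each PatternStream to emit the alerts
    }
}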

Re: Question: implementing real-time alerting with Flink

hechuan
In reply to this post by samuel.qiu@ubtrobot.com
Hi,
This is exactly the CEP use case: "more than N times" and "above a threshold" conditions combine into a detected event.
1. If a rule changes, just restart the job; the rule itself changed, so a restart does no harm.
2. CEP supports rule composition and time windows.
3. For best practice, the official documentation is a good fit:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/cep.html
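
A minimal sketch of the question's two JSON rules as one CEP pattern (the Double-valued stream and all names are assumptions; the 80 and 5 thresholds come from the question's {"temperature": 80} and {"times":5} rules):

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.List;
import java.util.Map;

public class AlertPatternSketch {

    public static DataStream<String> alerts(DataStream<Double> temperatures) {
        Pattern<Double, ?> pattern = Pattern.<Double>begin("hot")
                .where(new SimpleCondition<Double>() {
                    @Override
                    public boolean filter(Double t) {
                        return t > 80; // the {"temperature": 80} rule
                    }
                })
                .times(5)               // the {"times":5} rule
                .within(Time.hours(1)); // evaluation window

        return CEP.pattern(temperatures, pattern)
                .select(new PatternSelectFunction<Double, String>() {
                    @Override
                    public String select(Map<String, List<Double>> match) {
                        return "ALERT: temperature exceeded 80 five times: " + match.get("hot");
                    }
                });
    }
}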

A question about time windows, many thanks!

samuel.qiu@ubtrobot.com
In reply to this post by samuel.qiu@ubtrobot.com
Hi all: using Flink SQL, I want to aggregate over an hourly tumbling window, but the window does not fire at the top of the hour. Please take a look!
// assign the event-time field and generate watermarks
DataStream<Tuple4<String, String, String, Long>> withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<Tuple4<String, String, String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                //.<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
                .withIdleness(Duration.ofSeconds(10))   // keep generating watermarks even when no data arrives
                .withTimestampAssigner((event, timestamp) -> event.f3));

StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
tenv.registerDataStream(
        "log",
        withTimestampsAndWatermarksDS,
        "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");

String sql = "select appid,eventid,cnt," +
        "(starttime + interval '8' hour ) as stime," +
        "(endtime + interval '8' hour ) as etime " +
        "from (select appid,eventid,count(*) as cnt," +
        "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," +
        "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " +
        "from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')";    // hoping the window closes at the top of the hour

Table table = tenv.sqlQuery(sql);
DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);

The output is:
(400030024,123123123123,others,1598951712000) // 2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) // 2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) // 2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) // 2020/9/1 17:15:12; I expected the window to close and fire when 2020-09-01 18:00:00.0 ended, but it did not.
(400030024,123123123123,others,1599030999000) // 2020/9/2 15:16:39; the window only fired after this record arrived:
ResultHour{appid=400030024,eventid=others,cnt=4, stime=2020-09-01 17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 // 2020/9/2 15:23:35}
Where did this go wrong? Many thanks!

Re: A question about time windows, many thanks!

jacky-cui
Which Flink version is this? Could you paste the full code?

Re: A question about time windows, many thanks!

samuel.qiu@ubtrobot.com
Thanks for the reply!

It's Flink 1.11.1.

Here is the code:
package com.ubtechinc.dataplatform.flink.etl.exception.monitor;
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.commons.collections.map.HashedMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.ubtechinc.dataplatform.flink.util.AES256;
import com.ubtechinc.dataplatform.flink.util.ConstantStr;
import com.ubtechinc.dataplatform.flink.util.MailUtils;
import com.ubtechinc.dataplatform.flink.util.SmsUtil;
import com.ubtechinc.dataplatform.flink.util.YearMonthDayBucketAssigner;

import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import com.mysql.jdbc.Connection;

import java.sql.Timestamp;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Dynamic configuration updates via broadcast state.
 */
public class ExceptionAlertHour4 {

    private static final Logger LOG = LoggerFactory.getLogger(ExceptionAlertHour4.class);

    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(parameterTool);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //System.out.println(parameterTool.get("envFlag"));
        Properties properties = new Properties();
        String hdfs = null;
        long maxTime = 0;
        long maxSize = 0;
        long inActive = 0;
        DataStream<String> ds = null;

        if (ConstantStr.ENV_FLAG_TEST.equals(parameterTool.get("envFlag"))) {
            hdfs = ConstantStr.HDFS_IP_PORT_TEST;
            maxTime = ConstantStr.ROLLINGPOLICY_MAXTIME_TEST;
            maxSize = ConstantStr.ROLLINGPOLICY_MAXSIZE_TEST;
            inActive = ConstantStr.ROLLINGPOLICY_INACTIVE_TEST;
            properties.setProperty("bootstrap.servers", ConstantStr.KAFKA_IP_PORT_TEST);
            properties.setProperty("zookeeper.connect", ConstantStr.ZOOKEEPER_IP_PORT_TEST);
            properties.setProperty("group.id", "etl.exception.monitor.ExceptionAlertHour4-001");
            Map<KafkaTopicPartition, Long> offsets = new HashedMap();
            offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 0), 2800L);
            offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 1), 2700L);
            offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 2), 3300L);
            ds = env.addSource(new FlinkKafkaConsumer<>(ConstantStr.KAFKA_TOPIC_EXCEPTION, new SimpleStringSchema(), properties).setStartFromSpecificOffsets(offsets));
        } else if (ConstantStr.ENV_FLAG_PRODUCT.equals(parameterTool.get("envFlag"))) {
            hdfs = ConstantStr.HDFS_IP_PORT_PRODUCT;
            maxTime = ConstantStr.ROLLINGPOLICY_MAXTIME_PRODUCT;
            maxSize = ConstantStr.ROLLINGPOLICY_MAXSIZE_PRODUCT;
            inActive = ConstantStr.ROLLINGPOLICY_INACTIVE_PRODUCT;
            properties.setProperty("bootstrap.servers", ConstantStr.KAFKA_IP_PORT_PRODUCT);
            properties.setProperty("zookeeper.connect", ConstantStr.ZOOKEEPER_IP_PORT_PRODUCT);
            properties.setProperty("group.id", "etl.exception.monitor.ExceptionAlertHour-001");
            properties.setProperty("auto.offset.reset", "earliest");
            ds = env.addSource(new FlinkKafkaConsumer<>(ConstantStr.KAFKA_TOPIC_EXCEPTION, new SimpleStringSchema(), properties));
        } else {
            System.exit(-1);
        }

        // transform
        SingleOutputStreamOperator<Tuple4<String, String, String, Long>> singleDS = ds.flatMap(new FlatMapFunction<String, Tuple4<String, String, String, Long>>() {

            @Override
            public void flatMap(String value, Collector<Tuple4<String, String, String, Long>> out) {

                //System.out.println("Kafka2Hdfs-in:" + value);
                String newStr = value.replaceAll("\\\\*\\r|\\\\+r|\\\\*\\n|\\\\+n|\\\\*\\t|\\\\+t", "<brbr>");
                //System.out.println("Kafka2Hdfs-newStr:" + newStr);

                try {
                    // parse the JSON record
                    JSONObject record = JSON.parseObject(newStr, Feature.OrderedField);

                    // read the latest field values
                    JSONArray bodyDataArray = record.getJSONArray("body_data");
                    // iterate over the JSON array of field values (it only has one element)
                    for (int i = 0; i < bodyDataArray.size(); i++) {
                        // the i-th element of the JSON array
                        JSONObject bodyDataObj = bodyDataArray.getJSONObject(i);

                        if (bodyDataObj != null) {
                            Tuple4<String, String, String, Long> log = Tuple4.of(
                                    record.getString("HW-AppId"),
                                    bodyDataObj.getString("HW-bugId"),
                                    bodyDataObj.getString("HW-bugType"),
                                    Long.valueOf(bodyDataObj.getString("HW-happenedAt"))
                            );
                            out.collect(log);
                        }
                    }

                } catch (Exception e) {
                    System.out.println(e.getMessage());
                }
            }

        });
        singleDS.print();

        // assign the event-time field and generate watermarks
        DataStream<Tuple4<String, String, String, Long>> withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple4<String, String, String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        //.<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
                        .withIdleness(Duration.ofSeconds(10))
                        .withTimestampAssigner((event, timestamp) -> event.f3));

        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        tenv.registerDataStream(
                "log",
                withTimestampsAndWatermarksDS,
                "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");

        String sql = "select appid,eventid,cnt," +
                "(starttime + interval '8' hour ) as stime," +
                "(endtime + interval '8' hour ) as etime " +
                "from (select appid,eventid,count(*) as cnt," +
                "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," +
                "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " +
                "from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')";

        Table table = tenv.sqlQuery(sql);
        DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);
        dataStream.print();

        env.execute("etl.exception.monitor.ExceptionAlertHour");
    }

    public static class Result {
        private String appid;
        private String eventid;
        private long cnt;
        private Timestamp stime;
        private Timestamp etime;

        public String getAppid() {
            return appid;
        }

        public void setAppid(String appid) {
            this.appid = appid;
        }

        public String getEventid() {
            return eventid;
        }

        public void setEventid(String eventid) {
            this.eventid = eventid;
        }

        public long getCnt() {
            return cnt;
        }

        public void setCnt(long cnt) {
            this.cnt = cnt;
        }

        public Timestamp getStime() {
            return stime;
        }

        public void setStime(Timestamp stime) {
            this.stime = stime;
        }

        public Timestamp getEtime() {
            return etime;
        }

        public void setEtime(Timestamp etime) {
            this.etime = etime;
        }

        @Override
        public String toString() {
            return "ResultHour{" +
                    "appid=" + appid +
                    ",eventid=" + eventid +
                    ",cnt=" + cnt +
                    ", stime=" + stime +
                    ", etime=" + etime +
                    ", SystemTime=" + System.currentTimeMillis() +
                    '}';
        }
    }

}

Re: A question about time windows, many thanks!

china_tao
In reply to this post by samuel.qiu@ubtrobot.com
Nothing is actually wrong. Time windows are left-closed, right-open. Going by the definition of getWindowStartWithOffset in org.apache.flink.streaming.api.windowing.windows.TimeWindow, yours is the 17:00-18:00 window, but it is not fired exactly at 2020-09-01 18:00:00.0; because the window is right-open, it takes a timestamp past 2020-09-01 18:00:00.0, for example 2020-09-01 18:00:00.001, to move event time beyond the window. Add your 5-second watermark delay on top of that, and it should fire at about 2020-09-01 18:00:05.001.
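
A small sketch of that arithmetic using the real TimeWindow utility and the thread's first event timestamp (the surrounding class and comments are assumptions):

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class WindowBoundsDemo {
    public static void main(String[] args) {
        long event = 1598951712000L;            // 2020-09-01 17:15:12 (UTC+8)
        long size = 60 * 60 * 1000L;            // 1-hour tumbling window
        long start = TimeWindow.getWindowStartWithOffset(event, 0, size);
        long end = start + size;
        // prints [1598950800000, 1598954400000), i.e. 17:00 to 18:00 UTC+8
        System.out.println("window = [" + start + ", " + end + ")");
        // The window fires once the watermark reaches end - 1 ms; with the
        // thread's 5 s bounded out-of-orderness, that needs an event
        // timestamped around 18:00:05 or later.
    }
}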

Re: A question about time windows, many thanks!

samuel.qiu@ubtrobot.com
Hi, thanks for the reply! In Flink 1.11.1 this window simply will not close; it only fires once a record for the next window arrives.

Re: A question about time windows, many thanks!

samuel.qiu@ubtrobot.com
In reply to this post by samuel.qiu@ubtrobot.com
Some extra environment information. This looks similar to the following issue: testing Flink SQL on 1.11, consuming Kafka with the streaming API using event time, converting the stream to a table, and aggregating in SQL. When the Kafka topic has multiple partitions, the web UI shows "No Watermark" and the aggregation never fires; when the topic has a single partition, it triggers normally and the watermarks display fine.

Could this be caused by the Kafka topic having multiple partitions?

Re: A question about time windows, many thanks!

Benchao Li
If you have multiple partitions and some of them carry no data, this problem does indeed occur. To handle it, read up on idle sources [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
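
A minimal sketch of one way to wire that up in 1.11 (broker address, group id, and topic are placeholders): attach the watermark strategy to the Kafka consumer itself rather than to the resulting stream, so watermarks are generated per partition and withIdleness can mark empty partitions idle instead of holding the overall watermark back.

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PerPartitionWatermarkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        properties.setProperty("group.id", "watermark-demo");          // placeholder

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);

        // Attach the strategy to the consumer, not to the stream: watermarks
        // are then tracked per Kafka partition, and withIdleness marks
        // partitions without traffic as idle. With no explicit timestamp
        // assigner, the Kafka record timestamp is used.
        consumer.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withIdleness(Duration.ofSeconds(10)));

        DataStream<String> ds = env.addSource(consumer);
        ds.print();
        env.execute("per-partition-watermarks");
    }
}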

[hidden email] <[hidden email]> 于2020年9月3日周四 下午3:41写道:

> 补充一下环境信息:
>
> 有点类似以下问题:
> 在1.11版本测试flink sql时发现一个问题,用streaming api
> 消费kafka,使用eventtime,再把stream转table,进行sql聚合,发现当kafka topic是多个分区时,flink webui
> watermarks 显示No Watermark,聚合计算也迟迟不触发计算,但当kafka
> topic只有一个分区时却能这个正常触发计算,watermarks也显示正常。
>
> 不确定是否是因为kafka多分区引起的?
>
>
>
> 发件人: [hidden email]
> 发送时间: 2020-09-03 09:23
> 收件人: user-zh
> 主题: 回复: 回复:请指教一个关于时间窗的问题,非常感谢!
> 谢谢回复!
>
> 是Flink1.11.1的版本
>
> 以下是代码:
> package com.ubtechinc.dataplatform.flink.etl.exception.monitor;/*
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership.  The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
>  *
>  *     http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
>
> import org.apache.commons.collections.map.HashedMap;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.serialization.SimpleStringEncoder;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.api.common.state.BroadcastState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple4;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.BroadcastStream;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
> import org.apache.flink.streaming.api.functions.co
> .BroadcastProcessFunction;
> import
> org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
> import
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
> import
> org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.util.Collector;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import com.alibaba.fastjson.JSON;
> import com.alibaba.fastjson.JSONArray;
> import com.alibaba.fastjson.JSONObject;
> import com.alibaba.fastjson.parser.Feature;
> import com.ubtechinc.dataplatform.flink.util.AES256;
> import com.ubtechinc.dataplatform.flink.util.ConstantStr;
> import com.ubtechinc.dataplatform.flink.util.MailUtils;
> import com.ubtechinc.dataplatform.flink.util.SmsUtil;
> import com.ubtechinc.dataplatform.flink.util.YearMonthDayBucketAssigner;
>
> import java.sql.DriverManager;
> import java.sql.PreparedStatement;
> import java.sql.ResultSet;
> import com.mysql.jdbc.Connection;
>
> import java.sql.Timestamp;
> import java.text.MessageFormat;
> import java.time.Duration;
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
>
> /**
>  * 使用广播实现动态的配置更新
>  */
> public class ExceptionAlertHour4{
>
> private static final Logger LOG =
> LoggerFactory.getLogger(ExceptionAlertHour4.class);
>
> public static void main(String[] args) throws Exception{
> ParameterTool parameterTool = ParameterTool.fromArgs(args);
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setGlobalJobParameters(parameterTool);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> //System.out.println(parameterTool.get("envFlag"));
> Properties properties = new Properties();
> String hdfs = null;
> long maxTime = 0;
> long maxSize = 0;
> long inActive = 0;
> DataStream<String> ds =null;
>
> if (ConstantStr.ENV_FLAG_TEST.equals(parameterTool.get("envFlag"))) {
> hdfs = ConstantStr.HDFS_IP_PORT_TEST;
> maxTime = ConstantStr.ROLLINGPOLICY_MAXTIME_TEST;
> maxSize = ConstantStr.ROLLINGPOLICY_MAXSIZE_TEST;
> inActive = ConstantStr.ROLLINGPOLICY_INACTIVE_TEST;
> properties.setProperty("bootstrap.servers",
> ConstantStr.KAFKA_IP_PORT_TEST);
> properties.setProperty("zookeeper.connect",
> ConstantStr.ZOOKEEPER_IP_PORT_TEST);
> properties.setProperty("group.id",
> "etl.exception.monitor.ExceptionAlertHour4-001");
> Map<KafkaTopicPartition, Long> offsets = new HashedMap();
> offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 0),
> 2800L);
> offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 1),
> 2700L);
> offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 2),
> 3300L);
> ds = env.addSource(new
> FlinkKafkaConsumer<>(ConstantStr.KAFKA_TOPIC_EXCEPTION, new
> SimpleStringSchema(), properties).setStartFromSpecificOffsets(offsets));
> } else if
> (ConstantStr.ENV_FLAG_PRODUCT.equals(parameterTool.get("envFlag"))) {
> hdfs = ConstantStr.HDFS_IP_PORT_PRODUCT;
> maxTime = ConstantStr.ROLLINGPOLICY_MAXTIME_PRODUCT;
> maxSize = ConstantStr.ROLLINGPOLICY_MAXSIZE_PRODUCT;
> inActive = ConstantStr.ROLLINGPOLICY_INACTIVE_PRODUCT;
> properties.setProperty("bootstrap.servers",
> ConstantStr.KAFKA_IP_PORT_PRODUCT);
> properties.setProperty("zookeeper.connect",
> ConstantStr.ZOOKEEPER_IP_PORT_PRODUCT);
> properties.setProperty("group.id",
> "etl.exception.monitor.ExceptionAlertHour-001");
> properties.setProperty("auto.offset.reset", "earliest");
> ds = env.addSource(new
> FlinkKafkaConsumer<>(ConstantStr.KAFKA_TOPIC_EXCEPTION, new
> SimpleStringSchema(), properties));
> } else {
> System.exit(-1);
> }
> // transform
> SingleOutputStreamOperator<Tuple4<String,String,String,Long>> singleDS =
> ds.flatMap(new FlatMapFunction<String, Tuple4<String,String,String,Long>>()
> {
>
> @Override
> public void flatMap(String value,
> Collector<Tuple4<String,String,String,Long>> out) {
>
> //System.out.println("Kafka2Hdfs-in:" + value);
> String newStr =
> value.replaceAll("\\\\*\\r|\\\\+r|\\\\*\\n|\\\\+n|\\\\*\\t|\\\\+t",
> "<brbr>");
> //System.out.println("Kafka2Hdfs-newStr:" + newStr);
>
> try {
> // parse the JSON payload
> JSONObject record = JSON.parseObject(newStr, Feature.OrderedField);
>
> // get the latest field values
> JSONArray bodyDataArray = record.getJSONArray("body_data");
> // iterate over the body_data JSON array (it only ever holds one element)
> for (int i = 0; i < bodyDataArray.size(); i++) {
> // fetch the i-th element of the JSON array
> JSONObject bodyDataObj = bodyDataArray.getJSONObject(i);
>
> if (bodyDataObj != null) {
> Tuple4 log = Tuple4.of(
> record.getString("HW-AppId"),
> bodyDataObj.getString("HW-bugId"),
> bodyDataObj.getString("HW-bugType"),
> Long.valueOf(bodyDataObj.getString("HW-happenedAt"))
> );
> out.collect(log);
> }
> }
>
> } catch (Exception e) {
> System.out.println(e.getMessage());
> }
> }
>
> });
> singleDS.print();
> // assign the event-time field and generate watermarks
> DataStream<Tuple4<String,String,String,Long>>
> withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks(
> WatermarkStrategy
>
> .<Tuple4<String,String,String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
> //.<Tuple4<String,String,String,Long>>forMonotonousTimestamps()
> .withIdleness(Duration.ofSeconds(10))
> .withTimestampAssigner((event, timestamp)->event.f3));
> StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> tenv.registerDataStream(
> "log",
> withTimestampsAndWatermarksDS,
> "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");
>
>
> String sql = "select  appid,eventid,cnt," +
>             "(starttime + interval '8' hour ) as stime," +
>             "(endtime + interval '8' hour ) as etime  " +
>             "from (select appid,eventid,count(*) as cnt," +
>             "TUMBLE_START(rowtime,INTERVAL '1' HOUR)  as starttime," +
>             "TUMBLE_END(rowtime,INTERVAL '1' HOUR)  as endtime  " +
>             "from log  group by appid,eventid,TUMBLE(rowtime,INTERVAL '1'
> HOUR),TIME '00:00:00')";
>
>
> Table table = tenv.sqlQuery(sql);
> DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);
> dataStream.print();
>
> env.execute("etl.exception.monitor.ExceptionAlertHour");
> }
>
>
>
> public static class Result{
> private String appid;
> private String eventid;
> private long cnt;
> private Timestamp stime;
> private Timestamp etime;
> public String getAppid() {
> return appid;
> }
>
> public void setAppid(String appid) {
> this.appid = appid;
> }
>
> public String getEventid() {
> return eventid;
> }
>
> public void setEventid(String eventid) {
> this.eventid = eventid;
> }
>
> public long getCnt() {
> return cnt;
> }
>
> public void setCnt(long cnt) {
> this.cnt = cnt;
> }
>
>
> public Timestamp getStime(){
> return stime;
> }
>
> public void setStime(Timestamp stime){
> this.stime = stime;
> }
>
> public Timestamp getEtime(){
> return etime;
> }
>
> public void setEtime(Timestamp etime){
> this.etime = etime;
> }
>
> @Override
> public String toString(){
> return "ResultHour{" +
>       "appid=" + appid +
>       ",eventid=" + eventid +
>       ",cnt=" + cnt +
>       ", stime=" + stime +
>       ", etime=" + etime +
>       ", SystemTime=" + System.currentTimeMillis() +
>       '}';
> }
> }
>
> }
>
>
> From: jacky-cui
> Sent: 2020-09-02 18:58
> To: user-zh
> Subject: Re: Please advise on a time-window question, many thanks!
> Which Flink version is this? Could you post the full code?
>
>
> ------------------ Original Message ------------------
> From: "user-zh" <[hidden email]>;
> Sent: Wednesday, Sep 2, 2020, 3:20 PM
> To: "user-zh" <[hidden email]>;
>
> Subject: Please advise on a time-window question, many thanks!
>
>
>
> Hi experts: with Flink SQL I want to aggregate over an hourly tumbling window, but the window is never triggered on the hour. Please take a look!
>
> // assign the event-time field and generate watermarks
> DataStream<Tuple4<String,String,String,Long>>
> withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks(
> WatermarkStrategy
>
> .<Tuple4<String,String,String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
> //.<Tuple4<String,String,String,Long>>forMonotonousTimestamps()
> .withIdleness(Duration.ofSeconds(10))   // generate watermarks even when no data arrives
> .withTimestampAssigner((event, timestamp) -> event.f3));
>
> StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> tenv.registerDataStream(
> "log",
> withTimestampsAndWatermarksDS,
> "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");
>
> String sql = "select appid,eventid,cnt," +
>             "(starttime + interval '8' hour ) as stime," +
>             "(endtime + interval '8' hour ) as etime  " +
>             "from (select appid,eventid,count(*) as cnt," +
>             "TUMBLE_START(rowtime,INTERVAL '1' HOUR)  as starttime," +
>             "TUMBLE_END(rowtime,INTERVAL '1' HOUR)  as endtime  " +
>             "from log  group by appid,eventid,TUMBLE(rowtime,INTERVAL '1'
> HOUR),TIME '00:00:00')";    // expecting the window to close at the top of each hour
>
> Table table = tenv.sqlQuery(sql);
> DataStream<Result> dataStream = tenv.toAppendStream(table,
> Result.class);
>
> The output is:
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> I expected the window to fire when 2020-09-01 18:00:00.0 was reached, but it did not.
> (400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39
> Only after this record arrived did the window fire:
> ResultHour{appid=400030024,eventid=others,cnt=4, stime=2020-09-01
> 17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481
> //2020/9/2 15:23:35}
> Where did this go wrong? Many thanks!


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: Please advise on a time-window question, many thanks!

china_tao
That is exactly it: if you have multiple partitions but only one of them receives data, the watermark will not advance and the computation will not fire. You need to make sure every partition has data.

For example, in one of my tests a topic had 10 partitions; in the test environment, key hashing sent all the data into a single partition and the computation never fired. Once the keys were spread round-robin so that all 10 partitions received data, the window fired as expected.
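
A minimal sketch, assuming Flink 1.11, of one way to keep an empty partition from blocking progress (topic name and properties are placeholders, not from this thread): attach the WatermarkStrategy to the Kafka consumer itself, so watermarks are generated per Kafka partition and a partition without traffic can be flagged idle instead of holding the watermark back.

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("some-topic", new SimpleStringSchema(), properties); // hypothetical topic
// Without withTimestampAssigner(...), the Kafka record timestamp is used.
consumer.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withIdleness(Duration.ofSeconds(10))); // idle partitions stop holding back the watermark
DataStream<String> ds = env.addSource(consumer);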

On 2020/9/4 13:14, Benchao Li wrote:

> If you have multiple partitions and some of them carry no data, this problem does indeed occur.
> To handle that case, have a look at idle sources [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
>
> [hidden email] <[hidden email]> wrote on Thu, Sep 3, 2020 at 3:41 PM:
>
>> [earlier message and full code quoted verbatim above; trimmed]
>

Reply | Threaded
Open this post in threaded view
|

Re: Please advise on a time-window question, many thanks!

superainbower
Version 1.11 has added a new configuration option that avoids the problem where data skew leaves a partition without data and the computation never triggers.

superainbower
[hidden email]
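
The option being referred to is presumably table.exec.source.idle-timeout, the SQL-level counterpart of withIdleness; treat that name as an assumption and verify it against the 1.11 configuration docs. A minimal sketch of setting it:

// Mark a table source partition idle after 10 s without data, so it no
// longer holds back the watermark (assumed option name, not confirmed here).
tenv.getConfig().getConfiguration()
        .setString("table.exec.source.idle-timeout", "10 s");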


On Sep 4, 2020 at 15:11, taochanglian <[hidden email]> wrote:
[earlier messages quoted verbatim above; trimmed]


Reply | Threaded
Open this post in threaded view
|

Re: Please advise on a time-window question, many thanks!

zilong xiao
Could you share which configuration option that is? Is there any documentation for it?

superainbower <[hidden email]> wrote on Fri, Sep 4, 2020 at 5:24 PM:

> Version 1.11 has added a new configuration option that avoids the problem where data skew leaves a partition without data and the computation never triggers.
>
> [rest of the quoted thread trimmed]
Reply | Threaded
Open this post in threaded view
|

Re: Seeking advice: implementing real-time alerting with Flink

李军
In reply to this post by samuel.qiu@ubtrobot.com
Hello!

You can do this with Flink + Drools; Drools supports updating rules in real time.

2020-9-4
李军
[hidden email]
On Aug 6, 2020 at 10:26, [hidden email] <[hidden email]> wrote:
We need a real-time alerting feature; after some research, Flink looks like a good fit, but a few points are still unclear. Any guidance would be much appreciated, thanks!

Alerting breaks down into two parts:
1. Rule configuration. The rules are stored in MySQL as JSON:
{"times":5}  --- raise an alert once an event occurs more than 5 times;
{"temperature": 80} --- raise an alert once the temperature exceeds 80.
2. Alert execution:
1) The reported data is written to Kafka.
2) Flink reads from Kafka and aggregates over tumbling windows; when a rule is satisfied, an alert is produced.

The open questions are:
1. When a rule changes, how can it take effect promptly?
2. If this is implemented with Flink CEP, can multiple rules be active at the same time on the same source within one job?
3. Is there a best practice for this feature?

Hoping someone can answer, thanks!
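
For question 1, a minimal sketch of the broadcast state pattern (an illustration, not the Drools route; ruleStream, eventStream and the "temperature" key are placeholder names): rule updates flow in as a second stream and are broadcast to every parallel instance, so a changed rule takes effect without restarting the job.

// Descriptor for the broadcast state that holds the current rules.
MapStateDescriptor<String, String> ruleDescriptor = new MapStateDescriptor<>(
        "rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

// ruleStream: e.g. a custom source polling the MySQL rule table (placeholder).
BroadcastStream<String> ruleBroadcast = ruleStream.broadcast(ruleDescriptor);

eventStream
        .connect(ruleBroadcast)
        .process(new BroadcastProcessFunction<String, String, String>() {
            @Override
            public void processElement(String event, ReadOnlyContext ctx,
                    Collector<String> out) throws Exception {
                // Evaluate the event against the latest rule JSON, e.g. {"temperature": 80}.
                String ruleJson = ctx.getBroadcastState(ruleDescriptor).get("temperature");
                if (ruleJson != null /* && the event violates the threshold */) {
                    out.collect("ALERT: " + event);
                }
            }

            @Override
            public void processBroadcastElement(String ruleJson, Context ctx,
                    Collector<String> out) throws Exception {
                // An updated rule lands here and applies to all subsequent events.
                ctx.getBroadcastState(ruleDescriptor).put("temperature", ruleJson);
            }
        });

For question 2, several CEP patterns can be applied to the same input stream within one job, but swapping a compiled CEP pattern at runtime is not supported out of the box in 1.11; the broadcast pattern above, or an embedded rule engine such as Drools as suggested, is the usual workaround.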


