I want to look at the backpressure metrics. I sent 1,000,000 records into Kafka, but I don't see the source consuming any data. Did I set the simulation up wrong?
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(2000L, CheckpointingMode.EXACTLY_ONCE);
    env.setStateBackend(new MemoryStateBackend());
    env.setParallelism(4);

    Properties properties = getLocal();   // user-defined helper returning the Kafka connection Properties
    properties.setProperty("group.id", "test");
    FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("testOrderTopic", new SimpleStringSchema(), properties);
    DataStream<String> stream = env.addSource(consumer);

    stream.map(new MapFunction<String, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<Integer, Integer> map(String s) throws Exception {
            Thread.sleep(1000 * 60 * 60 * 60);   // ~60 hours per record: blocks the map to force backpressure
            return new Tuple2(1, 1);
        }
    }).keyBy(0).sum(0);

    stream.print();
    //stream.map();
    env.execute();
}
What exactly do you mean by "the source is not consuming data" — the task's numOfRecordsIn metric in the web UI?
Yes, exactly. I can't see any data coming in on the UI. The data should at least reach the source operator, since the sleep is only in the map function, but I don't see any backpressure either. I keep producing data and it's well over 1,000,000 records by now.
The source operator doesn't seem to have an In metric, only an Out metric.
By default the source and map operators are chained into one task by operator chaining. Disable operator chaining so that the map operator becomes a separate task, and then you can observe both In and Out metrics on the map operator. For backpressure, I suggest looking at the isBackpressured metric; as far as I remember it is reported per operator, so you can see the state of each operator.
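For illustration, a rough, untested sketch of the change suggested above, plugged into the main() from the question (env and stream are the variables defined there; the shorter sleep is just an assumption to make the bottleneck show up sooner):

// Option 1: disable chaining for the whole job
env.disableOperatorChaining();

// Option 2: only break the chain at the map operator, so the source and the map
// run as separate tasks and the map task exposes its own In/Out record counts
stream.map(new MapFunction<String, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(String s) throws Exception {
                Thread.sleep(60_000);          // slow consumer -> backpressure on the source
                return new Tuple2<>(1, 1);
            }
        })
        .disableChaining()                     // map becomes its own task in the web UI
        .keyBy(0)
        .sum(0);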
OK, thanks, I'll give it a try.
We need a real-time alerting feature, and after some research Flink looks like a good fit, but there are a few things I haven't figured out. Any guidance would be much appreciated, thanks!
The alerting consists of two parts:
1. Alert rule configuration. The rules are stored in MySQL as JSON, for example:
   {"times":5}         -- alert when an event occurs more than 5 times;
   {"temperature": 80} -- alert when the temperature exceeds 80.
2. Alert implementation.
   1) The reported data is written to Kafka.
   2) Flink reads the data from Kafka, aggregates it with tumbling windows, and produces an alert when a rule is satisfied.

The problems I'm running into:
1. When a rule changes, how can the change take effect promptly?
2. If this is implemented with Flink CEP, can multiple rules be active at the same time within a single job on the same source?
3. Is there a best practice for this kind of feature?

I'd appreciate an answer, thanks!
You can use broadcast state. I wrote an article about this that you can use as a reference; just replace the source in the article with one that reads the configuration from MySQL every few seconds.
https://blog.csdn.net/zhangjun5965/article/details/106573528
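To make the suggestion concrete, here is a rough, untested sketch of that broadcast pattern. The rule-polling source, loadRulesFromMysql(), matches() and the socket source used for the main stream are placeholders; in the real job the main stream would be the Kafka source and the rule evaluation would implement the JSON rules above.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class RuleBroadcastSketch {

    // broadcast state: rule key -> rule JSON, e.g. "current" -> "{\"times\":5}"
    static final MapStateDescriptor<String, String> RULES =
            new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // placeholder main stream; in the real job this is the Kafka source
        DataStream<String> events = env.socketTextStream("localhost", 9999);

        // side stream that polls MySQL every few seconds and emits the latest rules
        DataStream<String> rules = env.addSource(new SourceFunction<String>() {
            private volatile boolean running = true;
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (running) {
                    ctx.collect(loadRulesFromMysql());   // placeholder for the JDBC query
                    Thread.sleep(5000);
                }
            }
            @Override
            public void cancel() { running = false; }
        });

        BroadcastStream<String> ruleBroadcast = rules.broadcast(RULES);

        events.connect(ruleBroadcast)
              .process(new BroadcastProcessFunction<String, String, String>() {
                  @Override
                  public void processElement(String event, ReadOnlyContext ctx,
                                             Collector<String> out) throws Exception {
                      String rule = ctx.getBroadcastState(RULES).get("current");
                      // evaluate the event against the latest rule and emit alerts
                      if (rule != null && matches(event, rule)) {
                          out.collect("ALERT: " + event);
                      }
                  }
                  @Override
                  public void processBroadcastElement(String rule, Context ctx,
                                                      Collector<String> out) throws Exception {
                      ctx.getBroadcastState(RULES).put("current", rule);
                  }
              })
              .print();

        env.execute("rule-broadcast-sketch");
    }

    // placeholders for the real MySQL lookup and rule evaluation
    static String loadRulesFromMysql() { return "{\"times\":5}"; }
    static boolean matches(String event, String rule) { return false; }
}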
About CEP:
CEP has a GroupPattern construct that lets one stream be matched against multiple rules, but one rough edge is that the grouped rules share a single within() timeout; you can work around this by modifying the source code to add a wait operator. At the moment CEP does not support changing rules dynamically — changing a rule requires restarting the application — although you can also modify the CEP source code to implement dynamic CEP. For reference, Hellobike (哈啰出行) published a dynamic CEP extension built on Flink 1.7: https://developer.aliyun.com/article/738451
Hi,
This is exactly the kind of scenario CEP is designed for: combining conditions such as "more than N occurrences" and "value above a threshold" to detect an event.
1. If a rule changes, just restart the job; when the rules themselves have changed, restarting the job is acceptable.
2. CEP supports composing rules and time windows.
3. For best practices, the introduction in the official documentation is a good fit: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/cep.html
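For reference, a small, untested sketch of what one such rule could look like with the CEP API. SensorEvent and the thresholds (80 degrees, 5 occurrences, 10 minutes) are stand-ins for the JSON rules described earlier, and in a real job the input stream would normally be keyed by device before applying the pattern.

import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class OverheatCepSketch {

    // placeholder event type standing in for the data reported to Kafka
    public static class SensorEvent {
        public String deviceId;
        public double temperature;
    }

    public static DataStream<String> detectOverheat(DataStream<SensorEvent> input) {
        // "temperature > 80 occurring at least 5 times within 10 minutes"
        Pattern<SensorEvent, ?> overheat = Pattern.<SensorEvent>begin("overheat")
                .where(new SimpleCondition<SensorEvent>() {
                    @Override
                    public boolean filter(SensorEvent e) {
                        return e.temperature > 80;     // {"temperature": 80}
                    }
                })
                .timesOrMore(5)                        // {"times": 5}
                .within(Time.minutes(10));             // one shared within() per pattern

        PatternStream<SensorEvent> matches = CEP.pattern(input, overheat);

        return matches.select(new PatternSelectFunction<SensorEvent, String>() {
            @Override
            public String select(Map<String, List<SensorEvent>> pattern) {
                List<SensorEvent> hits = pattern.get("overheat");
                return "ALERT: " + hits.get(0).deviceId + " exceeded 80 " + hits.size() + " times";
            }
        });
    }
}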
Hi all, I'm using Flink SQL and want to aggregate with an hourly tumble window. The problem is that when the top of the hour is reached, the computation is not triggered. Please take a look!
// assign the event-time field and generate watermarks
DataStream<Tuple4<String,String,String,Long>> withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Tuple4<String,String,String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            //.<Tuple4<String,String,String,Long>>forMonotonousTimestamps()
            .withIdleness(Duration.ofSeconds(10))   // generate a watermark even when there is no data
            .withTimestampAssigner((event, timestamp) -> event.f3));

StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
tenv.registerDataStream(
        "log",
        withTimestampsAndWatermarksDS,
        "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");

String sql = "select appid,eventid,cnt," +
        "(starttime + interval '8' hour ) as stime," +
        "(endtime + interval '8' hour ) as etime " +
        "from (select appid,eventid,count(*) as cnt," +
        "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," +
        "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " +
        "from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')";  // hoping the window closes at the end of each full hour

Table table = tenv.sqlQuery(sql);
DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);

The output is:
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
I expected the window to fire and close when 2020-09-01 18:00:00.0 was reached, but it did not.
(400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39 -- only after this record arrived did the window fire:
ResultHour{appid=400030024,eventid=others,cnt=4, stime=2020-09-01 17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 //2020/9/2 15:23:35}
Where did I go wrong? Many thanks!
Which Flink version is this, and can you paste the full code?
Thanks for the reply! It's Flink 1.11.1. Here is the code:

package com.ubtechinc.dataplatform.flink.etl.exception.monitor;
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.commons.collections.map.HashedMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.ubtechinc.dataplatform.flink.util.AES256;
import com.ubtechinc.dataplatform.flink.util.ConstantStr;
import com.ubtechinc.dataplatform.flink.util.MailUtils;
import com.ubtechinc.dataplatform.flink.util.SmsUtil;
import com.ubtechinc.dataplatform.flink.util.YearMonthDayBucketAssigner;

import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import com.mysql.jdbc.Connection;
import java.sql.Timestamp;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Dynamic configuration update via broadcast state
 */
public class ExceptionAlertHour4 {

    private static final Logger LOG = LoggerFactory.getLogger(ExceptionAlertHour4.class);

    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(parameterTool);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //System.out.println(parameterTool.get("envFlag"));
        Properties properties = new Properties();
        String hdfs = null;
        long maxTime = 0;
        long maxSize = 0;
        long inActive = 0;
        DataStream<String> ds = null;

        if (ConstantStr.ENV_FLAG_TEST.equals(parameterTool.get("envFlag"))) {
            hdfs = ConstantStr.HDFS_IP_PORT_TEST;
            maxTime = ConstantStr.ROLLINGPOLICY_MAXTIME_TEST;
            maxSize = ConstantStr.ROLLINGPOLICY_MAXSIZE_TEST;
            inActive = ConstantStr.ROLLINGPOLICY_INACTIVE_TEST;
            properties.setProperty("bootstrap.servers", ConstantStr.KAFKA_IP_PORT_TEST);
            properties.setProperty("zookeeper.connect", ConstantStr.ZOOKEEPER_IP_PORT_TEST);
            properties.setProperty("group.id", "etl.exception.monitor.ExceptionAlertHour4-001");
            Map<KafkaTopicPartition, Long> offsets = new HashedMap();
            offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 0), 2800L);
            offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 1), 2700L);
            offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 2), 3300L);
            ds = env.addSource(new FlinkKafkaConsumer<>(ConstantStr.KAFKA_TOPIC_EXCEPTION,
                    new SimpleStringSchema(), properties).setStartFromSpecificOffsets(offsets));
        } else if (ConstantStr.ENV_FLAG_PRODUCT.equals(parameterTool.get("envFlag"))) {
            hdfs = ConstantStr.HDFS_IP_PORT_PRODUCT;
            maxTime = ConstantStr.ROLLINGPOLICY_MAXTIME_PRODUCT;
            maxSize = ConstantStr.ROLLINGPOLICY_MAXSIZE_PRODUCT;
            inActive = ConstantStr.ROLLINGPOLICY_INACTIVE_PRODUCT;
            properties.setProperty("bootstrap.servers", ConstantStr.KAFKA_IP_PORT_PRODUCT);
            properties.setProperty("zookeeper.connect", ConstantStr.ZOOKEEPER_IP_PORT_PRODUCT);
            properties.setProperty("group.id", "etl.exception.monitor.ExceptionAlertHour-001");
            properties.setProperty("auto.offset.reset", "earliest");
            ds = env.addSource(new FlinkKafkaConsumer<>(ConstantStr.KAFKA_TOPIC_EXCEPTION,
                    new SimpleStringSchema(), properties));
        } else {
            System.exit(-1);
        }

        // transform: parse each JSON record into (appId, bugId, bugType, happenedAt)
        SingleOutputStreamOperator<Tuple4<String, String, String, Long>> singleDS =
            ds.flatMap(new FlatMapFunction<String, Tuple4<String, String, String, Long>>() {

                @Override
                public void flatMap(String value, Collector<Tuple4<String, String, String, Long>> out) {
                    //System.out.println("Kafka2Hdfs-in:" + value);
                    String newStr = value.replaceAll(
                            "\\\\*\\r|\\\\+r|\\\\*\\n|\\\\+n|\\\\*\\t|\\\\+t", "<brbr>");
                    //System.out.println("Kafka2Hdfs-newStr:" + newStr);
                    try {
                        // parse the JSON record
                        JSONObject record = JSON.parseObject(newStr, Feature.OrderedField);
                        // body_data is a JSON array holding a single element
                        JSONArray bodyDataArray = record.getJSONArray("body_data");
                        for (int i = 0; i < bodyDataArray.size(); i++) {
                            JSONObject bodyDataObj = bodyDataArray.getJSONObject(i);
                            if (bodyDataObj != null) {
                                Tuple4 log = Tuple4.of(
                                        record.getString("HW-AppId"),
                                        bodyDataObj.getString("HW-bugId"),
                                        bodyDataObj.getString("HW-bugType"),
                                        Long.valueOf(bodyDataObj.getString("HW-happenedAt")));
                                out.collect(log);
                            }
                        }
                    } catch (Exception e) {
                        System.out.println(e.getMessage());
                    }
                }
            });
        singleDS.print();

        // assign the event-time field and generate watermarks
        DataStream<Tuple4<String, String, String, Long>> withTimestampsAndWatermarksDS =
            singleDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<Tuple4<String, String, String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    //.<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
                    .withIdleness(Duration.ofSeconds(10))
                    .withTimestampAssigner((event, timestamp) -> event.f3));

        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        tenv.registerDataStream(
                "log",
                withTimestampsAndWatermarksDS,
                "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");

        String sql = "select appid,eventid,cnt," +
                "(starttime + interval '8' hour ) as stime," +
                "(endtime + interval '8' hour ) as etime " +
                "from (select appid,eventid,count(*) as cnt," +
                "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," +
                "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " +
                "from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')";

        Table table = tenv.sqlQuery(sql);
        DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);
        dataStream.print();

        env.execute("etl.exception.monitor.ExceptionAlertHour");
    }

    public static class Result {
        private String appid;
        private String eventid;
        private long cnt;
        private Timestamp stime;
        private Timestamp etime;

        public String getAppid() { return appid; }
        public void setAppid(String appid) { this.appid = appid; }
        public String getEventid() { return eventid; }
        public void setEventid(String eventid) { this.eventid = eventid; }
        public long getCnt() { return cnt; }
        public void setCnt(long cnt) { this.cnt = cnt; }
        public Timestamp getStime() { return stime; }
        public void setStime(Timestamp stime) { this.stime = stime; }
        public Timestamp getEtime() { return etime; }
        public void setEtime(Timestamp etime) { this.etime = etime; }

        @Override
        public String toString() {
            return "ResultHour{" +
                    "appid=" + appid +
                    ",eventid=" + eventid +
                    ",cnt=" + cnt +
                    ", stime=" + stime +
                    ", etime=" + etime +
                    ", SystemTime=" + System.currentTimeMillis() +
                    '}';
        }
    }
}

Samuel Qiu | UBTECH Robotics | www.ubtrobot.com
Nothing is wrong here. Time windows are left-closed and right-open. Going by the getWindowStartWithOffset method of org.apache.flink.streaming.api.windowing.windows.TimeWindow, your window is the 17:00-18:00 window, but it will not fire exactly at 2020-09-01 18:00:00.0: because the window is left-closed and right-open, it takes an event time greater than 2020-09-01 18:00:00.0, for example 2020-09-01 18:00:00.001, to fire it. Add your 5-second watermark delay on top of that and it should fire at around 2020-09-01 18:00:05.001.
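Purely as an illustration of the window assignment described above, a tiny, untested sketch using the same TimeWindow utility (the timestamp is the one from the output in this thread):

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class WindowStartCheck {
    public static void main(String[] args) {
        long oneHour = 60 * 60 * 1000L;
        long eventTs = 1598951712000L;   // 2020-09-01 17:15:12 (UTC+8), taken from the output above
        long start = TimeWindow.getWindowStartWithOffset(eventTs, 0, oneHour);
        // prints 1598950800000 .. 1598954400000, i.e. the [17:00, 18:00) window;
        // the window only fires once the watermark passes the window end, so with a
        // 5s bounded-out-of-orderness strategy an event time of roughly 18:00:05 is needed
        System.out.println(start + " .. " + (start + oneHour));
    }
}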
Hello, thanks for the reply! In Flink 1.11.1 this window just doesn't close; it only fires once a record belonging to the next time window arrives.
Some additional information about the environment.

It looks similar to the following issue: while testing Flink SQL on 1.11 I found that when consuming Kafka with the streaming API, using event time, converting the stream to a table and running a SQL aggregation, the Flink web UI shows "No Watermark" and the aggregation never triggers as long as the Kafka topic has multiple partitions; with a single-partition topic the computation triggers normally and the watermarks display correctly.

I'm not sure whether this is caused by the Kafka topic having multiple partitions?
If you have multiple partitions and some of them carry no data, then this problem does indeed exist. To handle this situation, have a look at idle sources [1].

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#dealing-with-idle-sources

Best,
Benchao Li
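To make [1] concrete for the job in this thread, a rough, untested sketch follows. It assumes the consumer/topic/properties from the code posted earlier, extractHappenedAt() is a placeholder for parsing the event timestamp out of the raw JSON string, and it relies on the Flink 1.11 option of handing the WatermarkStrategy directly to the Kafka consumer so that watermarks (and idleness) are tracked per Kafka partition.

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
        ConstantStr.KAFKA_TOPIC_EXCEPTION, new SimpleStringSchema(), properties);

// Generate watermarks inside the source, per Kafka partition, and mark partitions
// that stop delivering records as idle after 10 seconds so they no longer hold
// back the overall watermark.
consumer.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withIdleness(Duration.ofSeconds(10))
                .withTimestampAssigner((record, ts) -> extractHappenedAt(record)));  // placeholder parser

DataStream<String> ds = env.addSource(consumer);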
确实是这样,比如有你多个partition,但是只有1个partition里面有数据,wm就不会执行计算。需要保证多个parititon数据都有数据。
举个例子,我这里在做测试的时候,1个topic10个partition,由于测试环境,按照key hash,数据只进入1个partition,就不会触发计算。后来让这个key轮询到10个parition都有数据,就可以触发计算了。 在 2020/9/4 13:14, Benchao Li 写道: > 如果你有多个partition,并且有一些partition没有数据,这个时候的确是会存在这个问题。 > 要处理这种情况,可以了解下idle source[1] > > [1] > https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#dealing-with-idle-sources > > [hidden email] <[hidden email]> 于2020年9月3日周四 下午3:41写道: > >> 补充一下环境信息: >> >> 有点类似以下问题: >> 在1.11版本测试flink sql时发现一个问题,用streaming api >> 消费kafka,使用eventtime,再把stream转table,进行sql聚合,发现当kafka topic是多个分区时,flink webui >> watermarks 显示No Watermark,聚合计算也迟迟不触发计算,但当kafka >> topic只有一个分区时却能这个正常触发计算,watermarks也显示正常。 >> >> 不确定是否是因为kafka多分区引起的? >> >> >> >> 发件人: [hidden email] >> 发送时间: 2020-09-03 09:23 >> 收件人: user-zh >> 主题: 回复: 回复:请指教一个关于时间窗的问题,非常感谢! >> 谢谢回复! >> >> 是Flink1.11.1的版本 >> >> 以下是代码: >> package com.ubtechinc.dataplatform.flink.etl.exception.monitor;/* >> * Licensed to the Apache Software Foundation (ASF) under one >> * or more contributor license agreements. See the NOTICE file >> * distributed with this work for additional information >> * regarding copyright ownership. The ASF licenses this file >> * to you under the Apache License, Version 2.0 (the >> * "License"); you may not use this file except in compliance >> * with the License. You may obtain a copy of the License at >> * >> * http://www.apache.org/licenses/LICENSE-2.0 >> * >> * Unless required by applicable law or agreed to in writing, software >> * distributed under the License is distributed on an "AS IS" BASIS, >> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >> * See the License for the specific language governing permissions and >> * limitations under the License. >> */ >> >> import org.apache.commons.collections.map.HashedMap; >> import org.apache.flink.api.common.eventtime.WatermarkStrategy; >> import org.apache.flink.api.common.functions.FlatMapFunction; >> import org.apache.flink.api.common.serialization.SimpleStringEncoder; >> import org.apache.flink.api.common.serialization.SimpleStringSchema; >> import org.apache.flink.api.common.state.BroadcastState; >> import org.apache.flink.api.common.state.MapStateDescriptor; >> import org.apache.flink.api.common.state.ReadOnlyBroadcastState; >> import org.apache.flink.api.common.typeinfo.BasicTypeInfo; >> import org.apache.flink.api.common.typeinfo.TypeInformation; >> import org.apache.flink.api.java.tuple.Tuple4; >> import org.apache.flink.api.java.utils.ParameterTool; >> import org.apache.flink.configuration.Configuration; >> import org.apache.flink.core.fs.Path; >> import org.apache.flink.runtime.state.StateBackend; >> import org.apache.flink.runtime.state.filesystem.FsStateBackend; >> import org.apache.flink.streaming.api.CheckpointingMode; >> import org.apache.flink.streaming.api.TimeCharacteristic; >> import org.apache.flink.streaming.api.datastream.BroadcastStream; >> import org.apache.flink.streaming.api.datastream.DataStream; >> import org.apache.flink.streaming.api.datastream.DataStreamSource; >> import >> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; >> import >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >> import >> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup; >> import org.apache.flink.streaming.api.functions.co >> .BroadcastProcessFunction; >> import >> org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; >> import >> 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; >> import >> org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; >> import org.apache.flink.streaming.api.functions.source.RichSourceFunction; >> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; >> import >> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; >> import org.apache.flink.table.api.Table; >> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; >> import org.apache.flink.util.Collector; >> import org.slf4j.Logger; >> import org.slf4j.LoggerFactory; >> >> import com.alibaba.fastjson.JSON; >> import com.alibaba.fastjson.JSONArray; >> import com.alibaba.fastjson.JSONObject; >> import com.alibaba.fastjson.parser.Feature; >> import com.ubtechinc.dataplatform.flink.util.AES256; >> import com.ubtechinc.dataplatform.flink.util.ConstantStr; >> import com.ubtechinc.dataplatform.flink.util.MailUtils; >> import com.ubtechinc.dataplatform.flink.util.SmsUtil; >> import com.ubtechinc.dataplatform.flink.util.YearMonthDayBucketAssigner; >> >> import java.sql.DriverManager; >> import java.sql.PreparedStatement; >> import java.sql.ResultSet; >> import com.mysql.jdbc.Connection; >> >> import java.sql.Timestamp; >> import java.text.MessageFormat; >> import java.time.Duration; >> import java.util.ArrayList; >> import java.util.HashMap; >> import java.util.List; >> import java.util.Map; >> import java.util.Properties; >> >> /** >> * 使用广播实现动态的配置更新 >> */ >> public class ExceptionAlertHour4{ >> >> private static final Logger LOG = >> LoggerFactory.getLogger(ExceptionAlertHour4.class); >> >> public static void main(String[] args) throws Exception{ >> ParameterTool parameterTool = ParameterTool.fromArgs(args); >> final StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> env.getConfig().setGlobalJobParameters(parameterTool); >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >> //System.out.println(parameterTool.get("envFlag")); >> Properties properties = new Properties(); >> String hdfs = null; >> long maxTime = 0; >> long maxSize = 0; >> long inActive = 0; >> DataStream<String> ds =null; >> >> if (ConstantStr.ENV_FLAG_TEST.equals(parameterTool.get("envFlag"))) { >> hdfs = ConstantStr.HDFS_IP_PORT_TEST; >> maxTime = ConstantStr.ROLLINGPOLICY_MAXTIME_TEST; >> maxSize = ConstantStr.ROLLINGPOLICY_MAXSIZE_TEST; >> inActive = ConstantStr.ROLLINGPOLICY_INACTIVE_TEST; >> properties.setProperty("bootstrap.servers", >> ConstantStr.KAFKA_IP_PORT_TEST); >> properties.setProperty("zookeeper.connect", >> ConstantStr.ZOOKEEPER_IP_PORT_TEST); >> properties.setProperty("group.id", >> "etl.exception.monitor.ExceptionAlertHour4-001"); >> Map<KafkaTopicPartition, Long> offsets = new HashedMap(); >> offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 0), >> 2800L); >> offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 1), >> 2700L); >> offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 2), >> 3300L); >> ds = env.addSource(new >> FlinkKafkaConsumer<>(ConstantStr.KAFKA_TOPIC_EXCEPTION, new >> SimpleStringSchema(), properties).setStartFromSpecificOffsets(offsets)); >> } else if >> (ConstantStr.ENV_FLAG_PRODUCT.equals(parameterTool.get("envFlag"))) { >> hdfs = ConstantStr.HDFS_IP_PORT_PRODUCT; >> maxTime = ConstantStr.ROLLINGPOLICY_MAXTIME_PRODUCT; >> maxSize = ConstantStr.ROLLINGPOLICY_MAXSIZE_PRODUCT; >> inActive = 
ConstantStr.ROLLINGPOLICY_INACTIVE_PRODUCT; >> properties.setProperty("bootstrap.servers", >> ConstantStr.KAFKA_IP_PORT_PRODUCT); >> properties.setProperty("zookeeper.connect", >> ConstantStr.ZOOKEEPER_IP_PORT_PRODUCT); >> properties.setProperty("group.id", >> "etl.exception.monitor.ExceptionAlertHour-001"); >> properties.setProperty("auto.offset.reset", "earliest"); >> ds = env.addSource(new >> FlinkKafkaConsumer<>(ConstantStr.KAFKA_TOPIC_EXCEPTION, new >> SimpleStringSchema(), properties)); >> } else { >> System.exit(-1); >> } >> // transform >> SingleOutputStreamOperator<Tuple4<String,String,String,Long>> singleDS = >> ds.flatMap(new FlatMapFunction<String, Tuple4<String,String,String,Long>>() >> { >> >> @Override >> public void flatMap(String value, >> Collector<Tuple4<String,String,String,Long>> out) { >> >> //System.out.println("Kafka2Hdfs-in:" + value); >> String newStr = >> value.replaceAll("\\\\*\\r|\\\\+r|\\\\*\\n|\\\\+n|\\\\*\\t|\\\\+t", >> "<brbr>"); >> //System.out.println("Kafka2Hdfs-newStr:" + newStr); >> >> try { >> // 解析JSON数据 >> JSONObject record = JSON.parseObject(newStr, Feature.OrderedField); >> >> // 获取最新的字段值 >> JSONArray bodyDataArray = record.getJSONArray("body_data"); >> // 遍历,字段值的JSON数组,只有一个元素 >> for (int i = 0; i < bodyDataArray.size(); i++) { >> // 获取到JSON数组的第i个元素 >> JSONObject bodyDataObj = bodyDataArray.getJSONObject(i); >> >> if (bodyDataObj != null) { >> Tuple4 log = Tuple4.of( >> record.getString("HW-AppId"), >> bodyDataObj.getString("HW-bugId"), >> bodyDataObj.getString("HW-bugType"), >> Long.valueOf(bodyDataObj.getString("HW-happenedAt")) >> ); >> out.collect(log); >> } >> } >> >> } catch (Exception e) { >> System.out.println(e.getMessage()); >> } >> } >> >> }); >> singleDS.print(); >> //指定eventtime字段及生成watermark >> DataStream<Tuple4<String,String,String,Long>> >> withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks( >> WatermarkStrategy >> >> .<Tuple4<String,String,String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5)) >> //.<Tuple4<String,String,String,Long>>forMonotonousTimestamps() >> .withIdleness(Duration.ofSeconds(10)) >> .withTimestampAssigner((event, timestamp)->event.f3)); >> StreamTableEnvironment tenv = StreamTableEnvironment.create(env); >> tenv.registerDataStream( >> "log", >> withTimestampsAndWatermarksDS, >> "appid,bugid,eventid,rowtime.rowtime,proctime.proctime"); >> >> >> String sql = "select appid,eventid,cnt," + >> "(starttime + interval '8' hour ) as stime," + >> "(endtime + interval '8' hour ) as etime " + >> "from (select appid,eventid,count(*) as cnt," + >> "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," + >> "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " + >> "from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' >> HOUR),TIME '00:00:00')"; >> >> >> Table table = tenv.sqlQuery(sql); >> DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class); >> dataStream.print(); >> >> env.execute("etl.exception.monitor.ExceptionAlertHour"); >> } >> >> >> >> public static class Result{ >> private String appid; >> private String eventid; >> private long cnt; >> private Timestamp stime; >> private Timestamp etime; >> public String getAppid() { >> return appid; >> } >> >> public void setAppid(String appid) { >> this.appid = appid; >> } >> >> public String getEventid() { >> return eventid; >> } >> >> public void setEventid(String eventid) { >> this.eventid = eventid; >> } >> >> public long getCnt() { >> return cnt; >> } >> >> public void setCnt(long cnt) { >> this.cnt = cnt; >> } >> >> >> 
public Timestamp getStime(){
>> return stime;
>> }
>>
>> public void setStime(Timestamp stime){
>> this.stime = stime;
>> }
>>
>> public Timestamp getEtime(){
>> return etime;
>> }
>>
>> public void setEtime(Timestamp etime){
>> this.etime = etime;
>> }
>>
>> @Override
>> public String toString(){
>> return "ResultHour{" +
>> "appid=" + appid +
>> ",eventid=" + eventid +
>> ",cnt=" + cnt +
>> ", stime=" + stime +
>> ", etime=" + etime +
>> ", SystemTime=" + System.currentTimeMillis() +
>> '}';
>> }
>> }
>>
>> }
>>
>>
>> From: jacky-cui
>> Sent: 2020-09-02 18:58
>> To: user-zh
>> Subject: Re: Please advise on a time-window question, many thanks!
>> Which Flink version is this? Could you paste the complete code?
>>
>>
>> ------------------ Original message ------------------
>> From: "user-zh" <[hidden email]>;
>> Sent: Wednesday, 2 September 2020, 15:20
>> To: "user-zh"<[hidden email]>;
>>
>> Subject: Please advise on a time-window question, many thanks!
>>
>>
>>
>> Hi experts: using Flink SQL, I want to aggregate with an hourly tumbling window. The problem I am running into is that the window is not triggered at the top of the hour. Please take a look!
>>
>> // assign the event-time field and generate watermarks
>> DataStream<Tuple4<String,String,String,Long>> withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks(
>> WatermarkStrategy
>> .<Tuple4<String,String,String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
>> //.<Tuple4<String,String,String,Long>>forMonotonousTimestamps()
>> .withIdleness(Duration.ofSeconds(10)) // generate watermarks even when no data arrives
>> .withTimestampAssigner((event, timestamp)->event.f3));
>>
>> StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
>> tenv.registerDataStream(
>> "log",
>> withTimestampsAndWatermarksDS,
>> "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");
>>
>> String sql = "select appid,eventid,cnt," +
>> "(starttime + interval '8' hour ) as stime," +
>> "(endtime + interval '8' hour ) as etime " +
>> "from (select appid,eventid,count(*) as cnt," +
>> "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," +
>> "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " +
>> "from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')"; // expect the window to close and fire at the end of each full hour
>>
>> Table table = tenv.sqlQuery(sql);
>> DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);
>>
>> The output is:
>> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
>> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
>> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
>> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
>> I expected the window to close and fire when 2020-09-01 18:00:00.0 was reached, but it did not.
>> (400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39
>> The window only fired after this record came in:
>> ResultHour{appid=400030024,eventid=others,cnt=4, stime=2020-09-01 17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 //2020/9/2 15:23:35}
>> Where did it go wrong? Many thanks!
>>
|
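To reproduce the fix described above in a test, one way is to spread the test records explicitly across every partition when producing them, so that each partition carries data and each per-partition watermark can advance. This is only a rough sketch; the broker address, topic name and partition count are placeholders, not taken from this thread:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FillAllPartitions {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            int partitions = 10;                 // partition count of the test topic
            for (int i = 0; i < 1000; i++) {
                int partition = i % partitions;  // explicit round-robin over all partitions
                // placeholder payload; the real test would send the JSON the job expects
                producer.send(new ProducerRecord<>("testTopic", partition, null, "test-" + i));
            }
            producer.flush();
        }
    }
}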
Flink 1.11 already added a new configuration option that avoids the problem where, due to data skew, one partition receives no data and the computation is never triggered.
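A minimal sketch of how such an option would be set, assuming the option meant here is table.exec.source.idle-timeout (please verify the name, and whether it applies to a table converted from a DataStream, against the Flink 1.11 documentation; on the DataStream side, WatermarkStrategy.withIdleness plays the same role):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class IdleSourceTimeoutSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // A source partition that emits nothing for 10 seconds is marked idle,
        // so it no longer holds back the watermark of downstream window operators.
        tenv.getConfig().getConfiguration()
                .setString("table.exec.source.idle-timeout", "10 s");
    }
}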
superainbower
[hidden email]

On 2020-09-04 at 15:11, taochanglian <[hidden email]> wrote:
That is indeed the case. If you have multiple partitions but only one partition actually contains data, the watermark will not advance and the computation will not run; you need to make sure every partition has data.
For example, when I was testing this I had 1 topic with 10 partitions; because the data was hashed by key it only went into 1 partition, and the computation was never triggered. After letting that key round-robin so that all 10 partitions had data, the computation fired.
Could you share which configuration option that is? Is there any related documentation?
superainbower <[hidden email]> wrote on Fri, 4 Sep 2020 at 17:24:
> Flink 1.11 already added a new configuration option that avoids the problem where, due to data skew, one partition receives no data and the computation is never triggered.
>
> superainbower
> [hidden email]
In reply to this post by samuel.qiu@ubtrobot.com
Hello!
You can do this with Flink + Drools; Drools can update the rules in real time.

2020-9-4
李军
[hidden email]

On 6 Aug 2020 at 10:26, [hidden email] <[hidden email]> wrote:
We need a real-time alerting feature. After some research, Flink looks like a good fit, but there are a few points I have not figured out. Please advise, thanks!

The alerting has two parts:
1. Alert rule configuration. The rules are stored in MySQL as JSON:
{"times":5} --- raise an alert when an event occurs more than 5 times;
{"temperature": 80} --- raise an alert when the temperature exceeds 80.
2. Alert evaluation:
1) The reported data is written to Kafka.
2) Flink reads the data from Kafka and evaluates it with tumbling windows; when a rule is matched, an alert is produced.

The questions I am running into:
1. When a rule changes, how can it take effect promptly?
2. If this is implemented with Flink CEP, can multiple rules be active at the same time for the same data source within one job?
3. Is there a best practice for this kind of feature?
Hope someone can help, thanks!
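On question 1, one commonly used pattern (with or without Drools) is to feed rule changes in as a second stream and broadcast them, so an updated rule takes effect without restarting the job. The sketch below is only an illustration of that pattern; the socket sources, the comma-separated record format and the class name are made-up placeholders, and a real job would read events from Kafka and rule changes from MySQL polling or a rule-change topic instead:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DynamicRuleAlertSketch {
    // Broadcast state: rule name -> threshold (e.g. "times" -> 5, "temperature" -> 80)
    private static final MapStateDescriptor<String, Integer> RULES =
            new MapStateDescriptor<>("rules", Types.STRING, Types.INT);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // events: e.g. "temperature,85"; ruleUpdates: e.g. "temperature,80"
        DataStream<String> events = env.socketTextStream("localhost", 9000);
        DataStream<String> ruleUpdates = env.socketTextStream("localhost", 9001);

        BroadcastStream<String> ruleBroadcast = ruleUpdates.broadcast(RULES);

        events.connect(ruleBroadcast)
                .process(new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String value, ReadOnlyContext ctx,
                                               Collector<String> out) throws Exception {
                        String[] parts = value.split(",");
                        Integer threshold = ctx.getBroadcastState(RULES).get(parts[0]);
                        // Alert when the current value exceeds the latest broadcast threshold
                        if (threshold != null && Integer.parseInt(parts[1]) > threshold) {
                            out.collect("ALERT " + parts[0] + ": " + parts[1] + " > " + threshold);
                        }
                    }

                    @Override
                    public void processBroadcastElement(String rule, Context ctx,
                                                        Collector<String> out) throws Exception {
                        // A rule change is written to broadcast state and takes effect immediately
                        String[] parts = rule.split(",");
                        ctx.getBroadcastState(RULES).put(parts[0], Integer.parseInt(parts[1]));
                    }
                })
                .print();

        env.execute("dynamic-rule-alert-sketch");
    }
}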