例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, 无
那么我期望在以下时间点(实际查询可能在任意时间点)获取到的结果数为 12:01 (0), 12:03:(1), 12:14 (2), 12:16(3), 12:30 (4), 12:35 (4), 12:41 (5), 12:46 (4), 13:16 (0) 即每个元素进来就会设一个30分钟过期时间,窗口状态是当前还未过期元素集合。 用flink stream api和flink sql分别要如何实现?如果用timerService的话过多元素ttl会不会造成性能问题? |
首先通过一个自定义表函数(table function),将每条输入的消息变成3条消息,假设输入消息的时间为ts,产出的三条消息时间分别为:(ts-1, 0), (ts+1, 1), (ts+31, 0),
然后再用30分钟的range over窗口对前面的数值部分(0或者1)求SUM 袁尤军 -----邮件原件----- 发件人: 陈帅 <[hidden email]> 发送时间: Wednesday, December 11, 2019 9:31 PM 收件人: [hidden email] 主题: flink持续查询过去30分钟登录网站的人数 例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, 无 那么我期望在以下时间点(实际查询可能在任意时间点)获取到的结果数为 12:01 (0), 12:03:(1), 12:14 (2), 12:16(3), 12:30 (4), 12:35 (4), 12:41 (5), 12:46 (4), 13:16 (0) 即每个元素进来就会设一个30分钟过期时间,窗口状态是当前还未过期元素集合。 用flink stream api和flink sql分别要如何实现?如果用timerService的话过多元素ttl会不会造成性能问题? |
感谢你的回复,不过我仍有个疑问,这里的ts是事件时间event time,如何能够按处理时间processing
time推移而不依赖后续输入消息来自动修改SUM值? 例如输入select查询后,这时进来了一条用户登录消息,后面就一直没有别的用户登录消息进来,这时看到的行为应该是统计登录人数为1,而过了30分钟后这个统计登录人数自动变为0. Yuan,Youjun <[hidden email]> 于2019年12月12日周四 上午9:28写道: > 首先通过一个自定义表函数(table > function),将每条输入的消息变成3条消息,假设输入消息的时间为ts,产出的三条消息时间分别为:(ts-1, 0), (ts+1, 1), > (ts+31, 0), > 然后再用30分钟的range over窗口对前面的数值部分(0或者1)求SUM > > 袁尤军 > > -----邮件原件----- > 发件人: 陈帅 <[hidden email]> > 发送时间: Wednesday, December 11, 2019 9:31 PM > 收件人: [hidden email] > 主题: flink持续查询过去30分钟登录网站的人数 > > 例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, 无 > > 那么我期望在以下时间点(实际查询可能在任意时间点)获取到的结果数为 > 12:01 (0), 12:03:(1), 12:14 (2), 12:16(3), 12:30 (4), 12:35 (4), 12:41 > (5), 12:46 (4), 13:16 (0) > > 即每个元素进来就会设一个30分钟过期时间,窗口状态是当前还未过期元素集合。 > > 用flink stream api和flink sql分别要如何实现?如果用timerService的话过多元素ttl会不会造成性能问题? > |
In reply to this post by Yuan,Youjun
附上我写的flink程序,输入Jack,输出一直是
Jack,0 Jack,1 Jack,1 而不是过了30秒后自动变回 Jack,0 麻烦看下是哪里的问题?谢谢! import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.Types; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row; import java.sql.Timestamp; import java.time.ZoneId; import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; public class CustomWindowExample4 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); env.setParallelism(1); DataStream<Tuple2<String, Timestamp>> callStream = env.socketTextStream("localhost", 9999).map(new MapFunction<String, Tuple2<String, Timestamp>>() { @Override public Tuple2<String, Timestamp> map(String value) throws Exception { return Tuple2.of(value, new Timestamp(System.currentTimeMillis())); } }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Timestamp>>(Time.seconds(5)) { @Override public long extractTimestamp(Tuple2<String, Timestamp> element) { return element.f1.getTime(); } }); StreamTableEnvironment stblEnv = StreamTableEnvironment.create(env); stblEnv.registerDataStream("tbl", callStream, "phone, ts"); stblEnv.registerFunction("udtf", new MyTableFunction()); Table genTable = stblEnv.sqlQuery("select phone1, flag1, ts1 from tbl, LATERAL TABLE(udtf(phone, ts)) as t(phone1, ts1, flag1)"); DataStream<Tuple3<String, Integer, Timestamp>> genStream = stblEnv.toAppendStream(genTable, Row.class) .map(new MapFunction<Row, Tuple3<String, Integer, Timestamp>>() { @Override public Tuple3<String, Integer, Timestamp> map(Row row) throws Exception { return Tuple3.of((String)row.getField(0), (Integer)row.getField(1), (Timestamp)row.getField(2)); } }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Integer, Timestamp>>(Time.seconds(5)) { @Override public long extractTimestamp(Tuple3<String, Integer, Timestamp> element) { return element.f2.getTime(); } }); stblEnv.registerDataStream("tbl1", genStream, "phone1, flag1, ts1, pt.proctime"); String query = "SELECT phone1, SUM(flag1) OVER (PARTITION BY phone1 ORDER BY pt RANGE BETWEEN INTERVAL '30' SECOND preceding AND CURRENT ROW) AS last30SecondsCount FROM tbl1"; Table resultTable = stblEnv.sqlQuery(query); stblEnv.toAppendStream(resultTable, Row.class).print(); env.execute(); } public static class MyTableFunction extends TableFunction<Row> { public void eval(String phone, Timestamp t) { Timestamp t0 = addSeconds(t, -1); Timestamp t1 = addSeconds(t, 1); Timestamp t2 = addSeconds(t, 31); collect(createRow(phone, t0, 0)); collect(createRow(phone, t1, 1)); collect(createRow(phone, t2, 0)); } private Row createRow(String phone, Timestamp ts, Integer flag) { Row row = new Row(3); row.setField(0, phone); row.setField(1, ts); row.setField(2, flag); return row; } @Override public TypeInformation<Row> getResultType() { return Types.ROW(Types.STRING(), Types.SQL_TIMESTAMP(), Types.INT()); } private Timestamp addSeconds(Timestamp oldTime, long seconds) { ZonedDateTime zonedDateTime = oldTime.toInstant().atZone(ZoneId.systemDefault()); Timestamp newTime = Timestamp.from(zonedDateTime.plus(seconds, ChronoUnit.SECONDS).toInstant()); return newTime; } } } Yuan,Youjun <[hidden email]> 于2019年12月12日周四 上午9:28写道: > 首先通过一个自定义表函数(table > function),将每条输入的消息变成3条消息,假设输入消息的时间为ts,产出的三条消息时间分别为:(ts-1, 0), (ts+1, 1), > (ts+31, 0), > 然后再用30分钟的range over窗口对前面的数值部分(0或者1)求SUM > > 袁尤军 > > -----邮件原件----- > 发件人: 陈帅 <[hidden email]> > 发送时间: Wednesday, December 11, 2019 9:31 PM > 收件人: [hidden email] > 主题: flink持续查询过去30分钟登录网站的人数 > > 例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, 无 > > 那么我期望在以下时间点(实际查询可能在任意时间点)获取到的结果数为 > 12:01 (0), 12:03:(1), 12:14 (2), 12:16(3), 12:30 (4), 12:35 (4), 12:41 > (5), 12:46 (4), 13:16 (0) > > 即每个元素进来就会设一个30分钟过期时间,窗口状态是当前还未过期元素集合。 > > 用flink stream api和flink sql分别要如何实现?如果用timerService的话过多元素ttl会不会造成性能问题? > |
Free forum by Nabble | Edit this page |