Subject: Re: flink interval join后按窗口聚组问题 (window grouping after a Flink interval join)

Benchao Li <[hidden email]> wrote on Sun, Jul 5, 2020 at 11:58 AM:

Back to your question: I think your observation is correct. A time interval join can indeed produce results like this. So if you use an event-time interval join and then chain an event-time window after it (or any other operator that uses event time, such as CEP), you will run into problems: a lot of records get dropped outright as late data.

元始(Bob Hu) <[hidden email]> wrote on Fri, Jul 3, 2020 at 3:29 PM:

Hello, I'd like to ask a question: after an interval join of two streams in Flink, is there a problem with doing a window group afterwards? Some left-join rows that find no match get dropped. For example, with the join condition select * from a,b where a.id=b.id and b.rowtime between a.rowtime and a.rowtime + INTERVAL '1' HOUR, the source code gives leftRelativeSize = 1 hour and rightRelativeSize = 0, and for the left stream cleanUpTime = rowTime + leftRelativeSize + (leftRelativeSize + rightRelativeSize) / 2 + allowedLateness + 1, so an unmatched left row is emitted 1.5 hours later (with the right side null). The watermark hold-back, however, is Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness, i.e. 1 hour. Doesn't that mean that by the time such a row is emitted, the watermark is already 0.5 hours ahead of the left row's rowtime? Then, when a later group by runs on the joined stream, these rows with a null right side get dropped.
Flink version: 1.10.0.

Here is a test program of mine:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class TimeBoundedJoin {

    // Periodic watermark assigner: watermark = max event time seen so far minus
    // finalMaxOutOfOrderness (in seconds). If no larger event time has arrived for
    // maxIdleTime seconds, fall back to wall-clock time so an idle stream's
    // watermark still advances; pass maxIdleTime = null to disable that fallback.
    public static AssignerWithPeriodicWatermarks<Row> getWatermark(Integer maxIdleTime, long finalMaxOutOfOrderness) {
        return new AssignerWithPeriodicWatermarks<Row>() {
            private long currentMaxTimestamp = 0;
            private long lastMaxTimestamp = 0;
            private long lastUpdateTime = 0;
            private boolean firstWatermark = true;

            @Override
            public Watermark getCurrentWatermark() {
                if (firstWatermark) {
                    lastUpdateTime = System.currentTimeMillis();
                    firstWatermark = false;
                }
                if (currentMaxTimestamp != lastMaxTimestamp) {
                    lastMaxTimestamp = currentMaxTimestamp;
                    lastUpdateTime = System.currentTimeMillis();
                }
                if (maxIdleTime != null && System.currentTimeMillis() - lastUpdateTime > maxIdleTime * 1000) {
                    return new Watermark(new Date().getTime() - finalMaxOutOfOrderness * 1000);
                }
                return new Watermark(currentMaxTimestamp - finalMaxOutOfOrderness * 1000);
            }

            @Override
            public long extractTimestamp(Row row, long previousElementTimestamp) {
                // Field 1 holds the event time, either as epoch millis or java.sql.Timestamp.
                Object value = row.getField(1);
                long timestamp;
                try {
                    timestamp = (long) value;
                } catch (Exception e) {
                    timestamp = ((Timestamp) value).getTime();
                }
                if (timestamp > currentMaxTimestamp) {
                    currentMaxTimestamp = timestamp;
                }
                return timestamp;
            }
        };
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        bsEnv.setParallelism(1);
        bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // Left stream order_info(order_id, order_time, fee). The "000" rows only
        // push the watermark forward; the join query filters them out.
        List<Row> list = new ArrayList<>();
        list.add(Row.of("001", new Timestamp(sdf.parse("2020-05-13 00:00:00").getTime()), 100));
        list.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 00:20:00").getTime()), 100));
        list.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 00:40:00").getTime()), 100));
        list.add(Row.of("002", new Timestamp(sdf.parse("2020-05-13 01:00:01").getTime()), 100));
        list.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime()), 100));
        list.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 02:30:00").getTime()), 100));
        list.add(Row.of("003", new Timestamp(sdf.parse("2020-05-13 02:00:02").getTime()), 100));
        list.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime()), 100));
        list.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 02:40:00").getTime()), 100));
        list.add(Row.of("004", new Timestamp(sdf.parse("2020-05-13 03:00:03").getTime()), 100));
        list.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 03:20:00").getTime()), 100));
        list.add(Row.of("000", new Timestamp(sdf.parse("2020-05-13 03:40:00").getTime()), 100));
        list.add(Row.of("005", new Timestamp(sdf.parse("2020-05-13 04:00:04").getTime()), 100));
        DataStream<Row> ds1 = bsEnv.addSource(new SourceFunction<Row>() {
            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                for (Row row : list) {
                    ctx.collect(row);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
            }
        });
        ds1 = ds1.assignTimestampsAndWatermarks(getWatermark(null, 0));
        ds1.getTransformation().setOutputType(new RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP, Types.INT));
        bsTableEnv.createTemporaryView("order_info", ds1, "order_id, order_time, fee, rowtime.rowtime");

        // Right stream pay(order_id, pay_time). The row for order "003" is commented
        // out on purpose, so "003" on the left side never finds a match.
        List<Row> list2 = new ArrayList<>();
        list2.add(Row.of("001", new Timestamp(sdf.parse("2020-05-13 01:00:00").getTime())));
        list2.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 01:20:00").getTime())));
        list2.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 01:30:00").getTime())));
        list2.add(Row.of("002", new Timestamp(sdf.parse("2020-05-13 02:00:00").getTime())));
        list2.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 02:20:00").getTime())));
        list2.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 02:40:00").getTime())));
        // list2.add(Row.of("003", new Timestamp(sdf.parse("2020-05-13 03:00:03").getTime())));
        list2.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 03:20:00").getTime())));
        list2.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 03:40:00").getTime())));
        list2.add(Row.of("004", new Timestamp(sdf.parse("2020-05-13 04:00:00").getTime())));
        list2.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 04:20:00").getTime())));
        list2.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 04:40:00").getTime())));
        list2.add(Row.of("005", new Timestamp(sdf.parse("2020-05-13 05:00:00").getTime())));
        list2.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 05:20:00").getTime())));
        list2.add(Row.of("111", new Timestamp(sdf.parse("2020-05-13 05:40:00").getTime())));
        DataStream<Row> ds2 = bsEnv.addSource(new SourceFunction<Row>() {
            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                for (Row row : list2) {
                    ctx.collect(row);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
            }
        });
        ds2 = ds2.assignTimestampsAndWatermarks(getWatermark(null, 0));
        ds2.getTransformation().setOutputType(new RowTypeInfo(Types.STRING, Types.SQL_TIMESTAMP));
        bsTableEnv.createTemporaryView("pay", ds2, "order_id, pay_time, rowtime.rowtime");

        Table joinTable = bsTableEnv.sqlQuery(
                "SELECT a.*, b.order_id FROM order_info a LEFT JOIN pay b"
                + " ON a.order_id = b.order_id"
                + " AND b.rowtime BETWEEN a.rowtime AND a.rowtime + INTERVAL '1' HOUR"
                + " WHERE a.order_id <> '000'");

        // Print every joined row together with the current watermark: unmatched
        // left rows come out with a rowtime well behind the watermark.
        bsTableEnv.toAppendStream(joinTable, Row.class).process(new ProcessFunction<Row, Object>() {
            @Override
            public void processElement(Row value, Context ctx, Collector<Object> out) throws Exception {
                SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                System.err.println("row:" + value + ",rowtime:" + value.getField(3)
                        + ",watermark:" + fmt.format(ctx.timerService().currentWatermark()));
            }
        });

        bsTableEnv.execute("job");
    }
}
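To make the 0.5-hour gap described above concrete, here is a back-of-the-envelope sketch (plain Java, no Flink API) that simply evaluates the two formulas quoted from the source for this query:

// Join condition: b.rowtime BETWEEN a.rowtime AND a.rowtime + INTERVAL '1' HOUR
long leftRelativeSize  = 3600_000L;  // 1 hour, in ms
long rightRelativeSize = 0L;
long allowedLateness   = 0L;

// Delay until an unmatched left row with event time T is cleaned up and emitted:
long cleanUpDelay = leftRelativeSize
        + (leftRelativeSize + rightRelativeSize) / 2
        + allowedLateness + 1;                      // 5_400_001 ms, about 1.5 h

// How far the join operator holds back the output watermark:
long watermarkDelay = Math.max(leftRelativeSize, rightRelativeSize)
        + allowedLateness;                          // 3_600_000 ms, exactly 1 h

// When the row is emitted, the input watermark sits at T + cleanUpDelay, so the
// output watermark sits at T + cleanUpDelay - watermarkDelay:
long gap = cleanUpDelay - watermarkDelay;           // 1_800_001 ms, about 0.5 h
// The emitted row's timestamp T is ~0.5 h behind the output watermark, which is
// why a downstream event-time window treats the row as late and drops it.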
元始(Bob Hu) <[hidden email]> wrote on Sun, Jul 5, 2020 at 8:48 PM:

Thanks for the explanation. This mechanism in Flink does feel a bit odd.
Benchao Li <[hidden email]> wrote on Mon, Jul 6, 2020 at 11:08 PM:

We were also somewhat surprised when we first noticed this behavior, but after thinking it over it seemed reasonable.

The time range of a two-stream join can be fairly large, say stream A joins stream B within [-10min, +10min]. A record arriving on stream A may then join records from several minutes ago, and by that point the watermark has already passed those records' event times.

My personal take is that this is a balance between producing join results promptly and emitting records whose event time is behind the watermark; the current default implementation chooses to produce results more promptly. The alternative would be to guarantee that the watermark never overtakes the data: either join results are delayed, or the watermark logic is changed so that the watermark always stays below the earliest event time that can still be joined.
Tianwang Li <[hidden email]> wrote on Sun, Aug 16, 2020 at 10:40 AM:

Expanding the discussion to a few specific scenarios.
Tianwang Li <[hidden email]> wrote on Sun, Aug 16, 2020 at 10:45 AM:

Expanding the discussion to a few specific scenarios.

1. The inner join case: is there any way to take the max of the two streams' rowtimes? How would you express that in SQL? For example:

SELECT if(left.rowtime > right.rowtime, left.rowtime, right.rowtime) as rowtime, ...

If that were supported, we could still run window aggregations, CEP, and similar computations downstream in this scenario.
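For what it's worth, the expression itself can be written with CASE WHEN in Flink SQL; a hypothetical sketch against the tables from the test code is below. The caveat, and the crux of this question, is that wrapping a rowtime in an expression materializes it into a plain TIMESTAMP, so max_rowtime is no longer a time attribute and cannot drive a downstream event-time window or CEP pattern directly:

// Hypothetical sketch only: picks the later of the two rowtimes per joined pair.
Table maxRowtime = bsTableEnv.sqlQuery(
        "SELECT CASE WHEN a.rowtime > b.rowtime THEN a.rowtime ELSE b.rowtime END AS max_rowtime,"
        + " a.order_id, a.fee"
        + " FROM order_info a, pay b"
        + " WHERE a.order_id = b.order_id"
        + " AND b.rowtime BETWEEN a.rowtime AND a.rowtime + INTERVAL '1' HOUR");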
赵一旦 <[hidden email]> wrote on Mon, Aug 17, 2020 at 11:52 AM:

I had a quick look. I've dealt with this problem in my own work, using the DataStream API. The difference is that I considered all the cases at job-design time and planned for these issues up front.

The watermark can be re-assigned. Beyond that, I also changed the interval-join operator implementation: by default, 1.10 only supports inner join, not left/right join. And after an inner join I use the larger of the two timestamps. This part gets complicated: for a left join the business usually prefers the left side's time, for a right join the right side's time; an outer join can only use the time of whichever side remains, and for an inner join it depends on the business.

For your problem, re-assigning the watermark is basically all you need.
Hi Tianwang, 一旦,
I think this scenario could actually be optimized in Flink SQL itself. I have created an issue for it [1]; discussion is welcome.

[1] https://issues.apache.org/jira/browse/FLINK-18996

赵一旦 <[hidden email]> wrote on Mon, Aug 17, 2020 at 11:52 AM:

> I took a quick look. I have run into this problem in my own workloads and handled it with the DataStream API, although I had already thought through all of these cases during the job design phase.
> The watermark can be reassigned. Beyond that, I also modified the interval join operator implementation: by default, 1.10 only supports inner join, not left/right join. After an inner join I use the larger of the two timestamps. This part is subtle: for a left join the business usually prefers the left side's time, and for a right join the right side's; an outer join can only use the time of whichever side remains, while for an inner join it depends on the business.
>
> For your problem, reassigning the watermark is really all that is needed.
>
> Tianwang Li <[hidden email]> wrote on Sun, Aug 16, 2020 at 10:45 AM:
>
> > Let me expand the discussion with a specific scenario.
> > 1. The inner join case. Is there any way to take the max of the two streams' rowtime? How could this be expressed in a SQL statement? For example:
> > SELECT if(left.rowtime > right.rowtime, left.rowtime, right.rowtime) AS rowtime, ...
> > If that were supported, we could still run window aggregations, CEP, and similar event-time computations downstream.
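A minimal sketch of how that max-of-two-rowtimes could be written today, using standard CASE WHEN instead of the hypothetical if(), and continuing from the bsTableEnv and the registered order_info/pay tables of the test code quoted earlier in this thread. One caveat, to the best of my understanding: once an expression is applied to a rowtime column, Flink materializes it into a plain TIMESTAMP, so max_rowtime below is no longer a time attribute and cannot drive a downstream event-time window directly; the watermark-reassignment sketch further below would still be needed.

        // Sketch only: table and column names come from the test code above.
        // max_rowtime is materialized to a plain TIMESTAMP, not a time attribute.
        Table maxRowtimeTable = bsTableEnv.sqlQuery(
                "SELECT a.order_id, a.fee, " +
                "  CASE WHEN a.rowtime > b.rowtime THEN a.rowtime " +
                "       ELSE b.rowtime END AS max_rowtime " +
                "FROM order_info a JOIN pay b " +
                "  ON a.order_id = b.order_id " +
                "  AND b.rowtime BETWEEN a.rowtime AND a.rowtime + INTERVAL '1' HOUR");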
> > Benchao Li <[hidden email]> wrote on Mon, Jul 6, 2020 at 11:08 PM:
> >
> > > We were also a bit surprised when we first noticed this behavior, but after thinking it through it does make sense.
> > >
> > > The time range of a dual-stream join can be fairly large, for example stream A joining stream B within [-10min, +10min]. In that case a record arriving on stream A may join records from several minutes earlier, whose event time the watermark has by then already passed.
> > >
> > > My personal take is that this is a balance between producing join results more promptly and emitting records whose event time lies behind the watermark; the current default implementation chooses promptness. An alternative implementation would guarantee that the watermark never overtakes the data's time, in which case the join results would be delayed; or the watermark logic could be changed so that the watermark always stays below the earliest event time that can still be joined.
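A minimal sketch of the watermark-reassignment workaround, written against the Flink 1.10 API and continuing from the joinTable of the test code quoted earlier in this thread. The 30-minute bound is an assumption taken from the analysis in the original question (unmatched left rows are emitted up to (leftRelativeSize + rightRelativeSize) / 2, i.e. 30 minutes for the 1-hour interval, behind the operator's watermark); a real job would size it to its own join interval.

        // Additional imports beyond those in the test code:
        // import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
        // import org.apache.flink.streaming.api.windowing.time.Time;

        DataStream<Row> joined = bsTableEnv.toAppendStream(joinTable, Row.class);

        // Reassign timestamps and watermarks on the join output. Unmatched left
        // rows can be emitted up to (leftRelativeSize + rightRelativeSize) / 2
        // (30 minutes for the 1-hour interval) behind the operator's watermark,
        // so the new watermark must be held back by at least that much.
        DataStream<Row> reassigned = joined.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(30)) {
                    @Override
                    public long extractTimestamp(Row row) {
                        // Field 3 is a.rowtime in "SELECT a.*, b.order_id ...".
                        return ((java.sql.Timestamp) row.getField(3)).getTime();
                    }
                });

        // "reassigned" can now feed an event-time window or CEP pattern, or be
        // registered as a new table, without these rows being dropped as late.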
> > > 元始(Bob Hu) <[hidden email]> wrote on Sun, Jul 5, 2020 at 8:48 PM:
> > >
> > > > Thank you for the explanation. This mechanism in Flink does feel a bit odd.

--
Best,
Benchao Li