各位Flink社区大佬,
您好! 我使用Flink SQL (Flink 1.8.0)进行一些聚合计算,消费的是Kafka数据,使用的是EventTime,但是有时候,偶然会出现rowtime字段来了一条未来时间的数据(可能是上送的数据时区导致),这样Watermark会直接推到了未来某个时间点,导致这笔错误数据到达后的数据,到未来时间点之间的数据会被丢弃。 这个问题根本确实是业务方面的问题,但是我们还是希望有一些方案应对这种异常情况。 目前,我们这边处理的方法是: 1.在进入聚合任务之前进行过滤操作,新增一个过滤的任务(使用的是ProcessTime),将这条错误的数据直接丢弃(或者输出到其他topic),将结果发送中间的kafka topic,聚合任务再消费中间的kafka topic。 我想请教的是: 1.不知各位是否有遇到过同样的问题,有没有更好的处理方式 ? 新加一个任务虽然能够暂时解决,但是可能会导致延迟增加,也增加了出错的几率。 2.不知是否有方法在一个任务中完成下面的两步操作: 1) tEnv.registerTable(operatorTable,tEnv.sqlQuery(select * from KafkaSource where $field1 >$value));// 这一步来一条处理一条,进行数据的过滤 2)select sum(field2) from operatorTable group by TUMBLE(rowtime,INTERVAL '5' SECOND),field2 //这一步使用rowtime聚合输出 这种方法目前存在的问题是:在定义KafkaSource 时,需要指定rowtime(构建kafka连接器的时候需要指定),一旦有错误数据进来,还没有执行到第2)步,watermark貌似就已经受到了影响。 我连接Kafka的代码大概如下: tEnv.connect( new Kafka() .topic(topic) .version(version) .startFromLatest() .properties(prop)) .withFormat(new Json() .failOnMissingField(false) .deriveSchema()) .withSchema(new Schema() .schema(tableSchema.getTableSchema()) .rowtime(new Rowtime() .timestampsFromField(rowTimeField) .watermarksPeriodicBounded(delay))) .inAppendMode() .registerTableSink("KafkaSource"); 以上,期待您帮忙解答,非常感谢~~ 祝 顺利 ————————— Johnny Zheng |
hi,仲尼:
通常这种时间超前的数据是由于你机器的时间有问题(未对齐),然后采集上来的数据使用的那个时间可能就会比当前时间超前了(大了),你可以有下面解决方法: 1、在 Flink 从 Kafka 中消费数据后就进行 filter 部分这种数据(可以获取到时间后和当前时间相比一下,如果超前或者超前多久就把这条数据丢掉,之前我自己项目也有遇到过这种数据问题,设置的超前 5 分钟以上的数据就丢失),就不让进入后面生成水印,这样就不会导致因为水印过大而导致你后面的问题 2、在生成水印的地方做判断,如果采集上来的数据的时间远大于当前时间(比如超过 5 分钟)那么就不把当前水印的时间设置为数据的时间,而是用当前系统的时间代替 Best! From zhisheng 郑 仲尼 <[hidden email]> 于2019年7月24日周三 下午3:44写道: > 各位Flink社区大佬, > 您好! > 我使用Flink SQL (Flink > 1.8.0)进行一些聚合计算,消费的是Kafka数据,使用的是EventTime,但是有时候,偶然会出现rowtime字段来了一条未来时间的数据(可能是上送的数据时区导致),这样Watermark会直接推到了未来某个时间点,导致这笔错误数据到达后的数据,到未来时间点之间的数据会被丢弃。 > > 这个问题根本确实是业务方面的问题,但是我们还是希望有一些方案应对这种异常情况。 > > 目前,我们这边处理的方法是: > 1.在进入聚合任务之前进行过滤操作,新增一个过滤的任务(使用的是ProcessTime),将这条错误的数据直接丢弃(或者输出到其他topic),将结果发送中间的kafka > topic,聚合任务再消费中间的kafka topic。 > > 我想请教的是: > 1.不知各位是否有遇到过同样的问题,有没有更好的处理方式 ? 新加一个任务虽然能够暂时解决,但是可能会导致延迟增加,也增加了出错的几率。 > 2.不知是否有方法在一个任务中完成下面的两步操作: > 1) tEnv.registerTable(operatorTable,tEnv.sqlQuery(select * from > KafkaSource where $field1 >$value));// 这一步来一条处理一条,进行数据的过滤 > 2)select sum(field2) from operatorTable group by TUMBLE(rowtime,INTERVAL > '5' SECOND),field2 //这一步使用rowtime聚合输出 > 这种方法目前存在的问题是:在定义KafkaSource > 时,需要指定rowtime(构建kafka连接器的时候需要指定),一旦有错误数据进来,还没有执行到第2)步,watermark貌似就已经受到了影响。 > > 我连接Kafka的代码大概如下: > > tEnv.connect( new Kafka() > > .topic(topic) > > .version(version) > > .startFromLatest() > > .properties(prop)) > > .withFormat(new Json() > > .failOnMissingField(false) > > .deriveSchema()) > > .withSchema(new Schema() > > .schema(tableSchema.getTableSchema()) > > .rowtime(new Rowtime() > > .timestampsFromField(rowTimeField) > > .watermarksPeriodicBounded(delay))) > > .inAppendMode() > > .registerTableSink("KafkaSource"); > > 以上,期待您帮忙解答,非常感谢~~ > > 祝 > 顺利 > ————————— > Johnny Zheng > > |
In reply to this post by zhengzhongni
hi,智笙:
感谢提供解决思路,目前我这边还尝试了几种可行的方案: 1.在kafka反序列化的时候,判断kafka中日期字段的值,如果超过当前时间太多,则丢弃,或者重置为当前时间(重置其实可能导致正常数据丢失)。 2.自定义一个watermark,当时间大于当前时间太多的时候,不更新当前的watermark,这样在watermark达到这条未来时间的时间点后,也会将这条数据纳入窗口计算,这种其实是比较理想的。但是这种没有完全的测试,感觉数据会一直存放在内存中,不知道会不会引起其他问题。 在编写自定义watermark的时候,发现只能使用scala写,使用java实现的话,没有数据输出,debug看java 实现的代码生成的watermark也正确。因为对scala调用java不熟,不打算深究了。 下面是关键的实现代码,希望对有类似问题的人有所帮助: 构建Schema时指定rowtime: Rowtime rowtime = new Rowtime() .timestampsFromField(rowTimeField) .watermarksFromStrategy(new BoundedOutOfOrderTimestamps(delay,futrueTimeLimit)); 下面是scala版本的自定义watermark生成策略,仿的Flink自带的BoundedOutOfOrderTimestamps: import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner final class BoundedOutOfOrderTimestamps(val delay: Long, val futrueTimeLimit: Long) extends PeriodicWatermarkAssigner { var maxTimestamp: Long = Long.MinValue + delay override def nextTimestamp(timestamp: Long): Unit = { //需考虑时区,28800000为8小时,按需修改 var currentTimeLimit: Long = System.currentTimeMillis() + 28800000 + futrueTimeLimit; if (timestamp > currentTimeLimit) { //这里不更新maxTimestamp即不更新返回的watermark println("未来时间:timestamp=" + timestamp + ",maxTimestamp=" + maxTimestamp + ",scalacurrentTimeLimit=" + currentTimeLimit); } else { if (timestamp > maxTimestamp) { maxTimestamp = timestamp } } } override def getWatermark: Watermark = new Watermark(maxTimestamp - delay) override def equals(other: Any): Boolean = other match { case that: BoundedOutOfOrderTimestamps => delay == that.delay case _ => false } override def hashCode(): Int = { delay.hashCode() } } 原始邮件 发件人: zhisheng<[hidden email]> 收件人: user-zh<[hidden email]> 发送时间: 2019年7月24日(周三) 23:45 主题: Re: 请教Flink SQL watermark遇到未来时间的处理问题 hi,仲尼: 通常这种时间超前的数据是由于你机器的时间有问题(未对齐),然后采集上来的数据使用的那个时间可能就会比当前时间超前了(大了),你可以有下面解决方法: 1、在 Flink 从 Kafka 中消费数据后就进行 filter 部分这种数据(可以获取到时间后和当前时间相比一下,如果超前或者超前多久就把这条数据丢掉,之前我自己项目也有遇到过这种数据问题,设置的超前 5 分钟以上的数据就丢失),就不让进入后面生成水印,这样就不会导致因为水印过大而导致你后面的问题 2、在生成水印的地方做判断,如果采集上来的数据的时间远大于当前时间(比如超过 5 分钟)那么就不把当前水印的时间设置为数据的时间,而是用当前系统的时间代替 Best! From zhisheng 郑 仲尼 <[hidden email]<mailto:[hidden email]>> 于2019年7月24日周三 下午3:44写道: > 各位Flink社区大佬, > 您好! > 我使用Flink SQL (Flink > 1.8.0)进行一些聚合计算,消费的是Kafka数据,使用的是EventTime,但是有时候,偶然会出现rowtime字段来了一条未来时间的数据(可能是上送的数据时区导致),这样Watermark会直接推到了未来某个时间点,导致这笔错误数据到达后的数据,到未来时间点之间的数据会被丢弃。 > > 这个问题根本确实是业务方面的问题,但是我们还是希望有一些方案应对这种异常情况。 > > 目前,我们这边处理的方法是: > 1.在进入聚合任务之前进行过滤操作,新增一个过滤的任务(使用的是ProcessTime),将这条错误的数据直接丢弃(或者输出到其他topic),将结果发送中间的kafka > topic,聚合任务再消费中间的kafka topic。 > > 我想请教的是: > 1.不知各位是否有遇到过同样的问题,有没有更好的处理方式 ? 新加一个任务虽然能够暂时解决,但是可能会导致延迟增加,也增加了出错的几率。 > 2.不知是否有方法在一个任务中完成下面的两步操作: > 1) tEnv.registerTable(operatorTable,tEnv.sqlQuery(select * from > KafkaSource where $field1 >$value));// 这一步来一条处理一条,进行数据的过滤 > 2)select sum(field2) from operatorTable group by TUMBLE(rowtime,INTERVAL > '5' SECOND),field2 //这一步使用rowtime聚合输出 > 这种方法目前存在的问题是:在定义KafkaSource > 时,需要指定rowtime(构建kafka连接器的时候需要指定),一旦有错误数据进来,还没有执行到第2)步,watermark貌似就已经受到了影响。 > > 我连接Kafka的代码大概如下: > > tEnv.connect( new Kafka() > > .topic(topic) > > .version(version) > > .startFromLatest() > > .properties(prop)) > > .withFormat(new Json() > > .failOnMissingField(false) > > .deriveSchema()) > > .withSchema(new Schema() > > .schema(tableSchema.getTableSchema()) > > .rowtime(new Rowtime() > > .timestampsFromField(rowTimeField) > > .watermarksPeriodicBounded(delay))) > > .inAppendMode() > > .registerTableSink("KafkaSource"); > > 以上,期待您帮忙解答,非常感谢~~ > > 祝 > 顺利 > ————————— > Johnny Zheng > > |
感谢你
郑 仲尼 <[hidden email]> 于 2019年7月31日周三 下午4:09写道: > hi,智笙: > > > 感谢提供解决思路,目前我这边还尝试了几种可行的方案: > > 1.在kafka反序列化的时候,判断kafka中日期字段的值,如果超过当前时间太多,则丢弃,或者重置为当前时间(重置其实可能导致正常数据丢失)。 > > > 2.自定义一个watermark,当时间大于当前时间太多的时候,不更新当前的watermark,这样在watermark达到这条未来时间的时间点后,也会将这条数据纳入窗口计算,这种其实是比较理想的。但是这种没有完全的测试,感觉数据会一直存放在内存中,不知道会不会引起其他问题。 > > 在编写自定义watermark的时候,发现只能使用scala写,使用java实现的话,没有数据输出,debug看java > 实现的代码生成的watermark也正确。因为对scala调用java不熟,不打算深究了。 > > > 下面是关键的实现代码,希望对有类似问题的人有所帮助: > > 构建Schema时指定rowtime: > > Rowtime rowtime = new Rowtime() .timestampsFromField(rowTimeField) > .watermarksFromStrategy(new > BoundedOutOfOrderTimestamps(delay,futrueTimeLimit)); > > > 下面是scala版本的自定义watermark生成策略,仿的Flink自带的BoundedOutOfOrderTimestamps: > > > import org.apache.flink.streaming.api.watermark.Watermark > > import > org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner > > > final class BoundedOutOfOrderTimestamps(val delay: Long, val > futrueTimeLimit: Long) extends PeriodicWatermarkAssigner { > > var maxTimestamp: Long = Long.MinValue + delay > > override def nextTimestamp(timestamp: Long): Unit = { > > //需考虑时区,28800000为8小时,按需修改 > > var currentTimeLimit: Long = System.currentTimeMillis() + 28800000 + > futrueTimeLimit; > > if (timestamp > currentTimeLimit) { > > //这里不更新maxTimestamp即不更新返回的watermark > > println("未来时间:timestamp=" + timestamp + ",maxTimestamp=" + > maxTimestamp + ",scalacurrentTimeLimit=" + currentTimeLimit); > > } else { > > if (timestamp > maxTimestamp) { > > maxTimestamp = timestamp > > } > > } > > } > > override def getWatermark: Watermark = new Watermark(maxTimestamp - > delay) > > override def equals(other: Any): Boolean = other match { > > case that: BoundedOutOfOrderTimestamps => > > delay == that.delay > > case _ => false > > } > > override def hashCode(): Int = { > > delay.hashCode() > > } > > } > > 原始邮件 > 发件人: zhisheng<[hidden email]> > 收件人: user-zh<[hidden email]> > 发送时间: 2019年7月24日(周三) 23:45 > 主题: Re: 请教Flink SQL watermark遇到未来时间的处理问题 > > > hi,仲尼: > 通常这种时间超前的数据是由于你机器的时间有问题(未对齐),然后采集上来的数据使用的那个时间可能就会比当前时间超前了(大了),你可以有下面解决方法: > > 1、在 Flink 从 Kafka 中消费数据后就进行 filter > 部分这种数据(可以获取到时间后和当前时间相比一下,如果超前或者超前多久就把这条数据丢掉,之前我自己项目也有遇到过这种数据问题,设置的超前 5 > 分钟以上的数据就丢失),就不让进入后面生成水印,这样就不会导致因为水印过大而导致你后面的问题 > 2、在生成水印的地方做判断,如果采集上来的数据的时间远大于当前时间(比如超过 5 > 分钟)那么就不把当前水印的时间设置为数据的时间,而是用当前系统的时间代替 > > > > > > Best! > From zhisheng > > 郑 仲尼 <[hidden email]<mailto:[hidden email]>> > 于2019年7月24日周三 下午3:44写道: > > > 各位Flink社区大佬, > > 您好! > > 我使用Flink SQL (Flink > > > 1.8.0)进行一些聚合计算,消费的是Kafka数据,使用的是EventTime,但是有时候,偶然会出现rowtime字段来了一条未来时间的数据(可能是上送的数据时区导致),这样Watermark会直接推到了未来某个时间点,导致这笔错误数据到达后的数据,到未来时间点之间的数据会被丢弃。 > > > > 这个问题根本确实是业务方面的问题,但是我们还是希望有一些方案应对这种异常情况。 > > > > 目前,我们这边处理的方法是: > > > 1.在进入聚合任务之前进行过滤操作,新增一个过滤的任务(使用的是ProcessTime),将这条错误的数据直接丢弃(或者输出到其他topic),将结果发送中间的kafka > > topic,聚合任务再消费中间的kafka topic。 > > > > 我想请教的是: > > 1.不知各位是否有遇到过同样的问题,有没有更好的处理方式 ? 新加一个任务虽然能够暂时解决,但是可能会导致延迟增加,也增加了出错的几率。 > > 2.不知是否有方法在一个任务中完成下面的两步操作: > > 1) tEnv.registerTable(operatorTable,tEnv.sqlQuery(select * from > > KafkaSource where $field1 >$value));// 这一步来一条处理一条,进行数据的过滤 > > 2)select sum(field2) from operatorTable group by TUMBLE(rowtime,INTERVAL > > '5' SECOND),field2 //这一步使用rowtime聚合输出 > > 这种方法目前存在的问题是:在定义KafkaSource > > > 时,需要指定rowtime(构建kafka连接器的时候需要指定),一旦有错误数据进来,还没有执行到第2)步,watermark貌似就已经受到了影响。 > > > > 我连接Kafka的代码大概如下: > > > > tEnv.connect( new Kafka() > > > > .topic(topic) > > > > .version(version) > > > > .startFromLatest() > > > > .properties(prop)) > > > > .withFormat(new Json() > > > > .failOnMissingField(false) > > > > .deriveSchema()) > > > > .withSchema(new Schema() > > > > .schema(tableSchema.getTableSchema()) > > > > .rowtime(new Rowtime() > > > > .timestampsFromField(rowTimeField) > > > > .watermarksPeriodicBounded(delay))) > > > > .inAppendMode() > > > > .registerTableSink("KafkaSource"); > > > > 以上,期待您帮忙解答,非常感谢~~ > > > > 祝 > > 顺利 > > ————————— > > Johnny Zheng > > > > > > |
Free forum by Nabble | Edit this page |