请教Flink SQL watermark遇到未来时间的处理问题

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

请教Flink SQL watermark遇到未来时间的处理问题

zhengzhongni
各位Flink社区大佬,
您好!
我使用Flink SQL (Flink 1.8.0)进行一些聚合计算,消费的是Kafka数据,使用的是EventTime,但是有时候,偶然会出现rowtime字段来了一条未来时间的数据(可能是上送的数据时区导致),这样Watermark会直接推到了未来某个时间点,导致这笔错误数据到达后的数据,到未来时间点之间的数据会被丢弃。

这个问题根本确实是业务方面的问题,但是我们还是希望有一些方案应对这种异常情况。

目前,我们这边处理的方法是:
1.在进入聚合任务之前进行过滤操作,新增一个过滤的任务(使用的是ProcessTime),将这条错误的数据直接丢弃(或者输出到其他topic),将结果发送中间的kafka topic,聚合任务再消费中间的kafka topic。

我想请教的是:
1.不知各位是否有遇到过同样的问题,有没有更好的处理方式 ? 新加一个任务虽然能够暂时解决,但是可能会导致延迟增加,也增加了出错的几率。
2.不知是否有方法在一个任务中完成下面的两步操作:
1) tEnv.registerTable(operatorTable,tEnv.sqlQuery(select * from KafkaSource where $field1 >$value));// 这一步来一条处理一条,进行数据的过滤
2)select sum(field2) from operatorTable  group by TUMBLE(rowtime,INTERVAL '5' SECOND),field2 //这一步使用rowtime聚合输出
这种方法目前存在的问题是:在定义KafkaSource 时,需要指定rowtime(构建kafka连接器的时候需要指定),一旦有错误数据进来,还没有执行到第2)步,watermark貌似就已经受到了影响。

我连接Kafka的代码大概如下:

tEnv.connect( new Kafka()

         .topic(topic)

         .version(version)

         .startFromLatest()

         .properties(prop))

        .withFormat(new Json()

         .failOnMissingField(false)

         .deriveSchema())

        .withSchema(new Schema()

         .schema(tableSchema.getTableSchema())

         .rowtime(new Rowtime()

         .timestampsFromField(rowTimeField)

         .watermarksPeriodicBounded(delay)))

        .inAppendMode()

        .registerTableSink("KafkaSource");

以上,期待您帮忙解答,非常感谢~~


顺利
—————————
Johnny Zheng

Reply | Threaded
Open this post in threaded view
|

Re: 请教Flink SQL watermark遇到未来时间的处理问题

zhisheng
hi,仲尼:
  通常这种时间超前的数据是由于你机器的时间有问题(未对齐),然后采集上来的数据使用的那个时间可能就会比当前时间超前了(大了),你可以有下面解决方法:

1、在 Flink 从 Kafka 中消费数据后就进行 filter
部分这种数据(可以获取到时间后和当前时间相比一下,如果超前或者超前多久就把这条数据丢掉,之前我自己项目也有遇到过这种数据问题,设置的超前 5
分钟以上的数据就丢失),就不让进入后面生成水印,这样就不会导致因为水印过大而导致你后面的问题
2、在生成水印的地方做判断,如果采集上来的数据的时间远大于当前时间(比如超过 5
分钟)那么就不把当前水印的时间设置为数据的时间,而是用当前系统的时间代替





Best!
From zhisheng

郑 仲尼 <[hidden email]> 于2019年7月24日周三 下午3:44写道:

> 各位Flink社区大佬,
> 您好!
> 我使用Flink SQL (Flink
> 1.8.0)进行一些聚合计算,消费的是Kafka数据,使用的是EventTime,但是有时候,偶然会出现rowtime字段来了一条未来时间的数据(可能是上送的数据时区导致),这样Watermark会直接推到了未来某个时间点,导致这笔错误数据到达后的数据,到未来时间点之间的数据会被丢弃。
>
> 这个问题根本确实是业务方面的问题,但是我们还是希望有一些方案应对这种异常情况。
>
> 目前,我们这边处理的方法是:
> 1.在进入聚合任务之前进行过滤操作,新增一个过滤的任务(使用的是ProcessTime),将这条错误的数据直接丢弃(或者输出到其他topic),将结果发送中间的kafka
> topic,聚合任务再消费中间的kafka topic。
>
> 我想请教的是:
> 1.不知各位是否有遇到过同样的问题,有没有更好的处理方式 ? 新加一个任务虽然能够暂时解决,但是可能会导致延迟增加,也增加了出错的几率。
> 2.不知是否有方法在一个任务中完成下面的两步操作:
> 1) tEnv.registerTable(operatorTable,tEnv.sqlQuery(select * from
> KafkaSource where $field1 >$value));// 这一步来一条处理一条,进行数据的过滤
> 2)select sum(field2) from operatorTable  group by TUMBLE(rowtime,INTERVAL
> '5' SECOND),field2 //这一步使用rowtime聚合输出
> 这种方法目前存在的问题是:在定义KafkaSource
> 时,需要指定rowtime(构建kafka连接器的时候需要指定),一旦有错误数据进来,还没有执行到第2)步,watermark貌似就已经受到了影响。
>
> 我连接Kafka的代码大概如下:
>
> tEnv.connect( new Kafka()
>
>          .topic(topic)
>
>          .version(version)
>
>          .startFromLatest()
>
>          .properties(prop))
>
>         .withFormat(new Json()
>
>          .failOnMissingField(false)
>
>          .deriveSchema())
>
>         .withSchema(new Schema()
>
>          .schema(tableSchema.getTableSchema())
>
>          .rowtime(new Rowtime()
>
>          .timestampsFromField(rowTimeField)
>
>          .watermarksPeriodicBounded(delay)))
>
>         .inAppendMode()
>
>         .registerTableSink("KafkaSource");
>
> 以上,期待您帮忙解答,非常感谢~~
>
> 祝
> 顺利
> —————————
> Johnny Zheng
>
>
Reply | Threaded
Open this post in threaded view
|

Re: 请教Flink SQL watermark遇到未来时间的处理问题

zhengzhongni
In reply to this post by zhengzhongni
hi,智笙:


    感谢提供解决思路,目前我这边还尝试了几种可行的方案:

1.在kafka反序列化的时候,判断kafka中日期字段的值,如果超过当前时间太多,则丢弃,或者重置为当前时间(重置其实可能导致正常数据丢失)。

2.自定义一个watermark,当时间大于当前时间太多的时候,不更新当前的watermark,这样在watermark达到这条未来时间的时间点后,也会将这条数据纳入窗口计算,这种其实是比较理想的。但是这种没有完全的测试,感觉数据会一直存放在内存中,不知道会不会引起其他问题。

在编写自定义watermark的时候,发现只能使用scala写,使用java实现的话,没有数据输出,debug看java 实现的代码生成的watermark也正确。因为对scala调用java不熟,不打算深究了。


    下面是关键的实现代码,希望对有类似问题的人有所帮助:

构建Schema时指定rowtime:

Rowtime rowtime = new Rowtime() .timestampsFromField(rowTimeField) .watermarksFromStrategy(new BoundedOutOfOrderTimestamps(delay,futrueTimeLimit));


下面是scala版本的自定义watermark生成策略,仿的Flink自带的BoundedOutOfOrderTimestamps:


import org.apache.flink.streaming.api.watermark.Watermark

import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner


final class BoundedOutOfOrderTimestamps(val delay: Long, val futrueTimeLimit: Long) extends PeriodicWatermarkAssigner {

  var maxTimestamp: Long = Long.MinValue + delay

  override def nextTimestamp(timestamp: Long): Unit = {

    //需考虑时区,28800000为8小时,按需修改

    var currentTimeLimit: Long = System.currentTimeMillis() + 28800000 + futrueTimeLimit;

    if (timestamp > currentTimeLimit) {

    //这里不更新maxTimestamp即不更新返回的watermark

      println("未来时间:timestamp=" + timestamp + ",maxTimestamp=" + maxTimestamp + ",scalacurrentTimeLimit=" + currentTimeLimit);

    } else {

      if (timestamp > maxTimestamp) {

        maxTimestamp = timestamp

      }

    }

  }

  override def getWatermark: Watermark = new Watermark(maxTimestamp - delay)

  override def equals(other: Any): Boolean = other match {

    case that: BoundedOutOfOrderTimestamps =>

      delay == that.delay

    case _ => false

  }

  override def hashCode(): Int = {

    delay.hashCode()

  }

}

 原始邮件
发件人: zhisheng<[hidden email]>
收件人: user-zh<[hidden email]>
发送时间: 2019年7月24日(周三) 23:45
主题: Re: 请教Flink SQL watermark遇到未来时间的处理问题


hi,仲尼:
  通常这种时间超前的数据是由于你机器的时间有问题(未对齐),然后采集上来的数据使用的那个时间可能就会比当前时间超前了(大了),你可以有下面解决方法:

1、在 Flink 从 Kafka 中消费数据后就进行 filter
部分这种数据(可以获取到时间后和当前时间相比一下,如果超前或者超前多久就把这条数据丢掉,之前我自己项目也有遇到过这种数据问题,设置的超前 5
分钟以上的数据就丢失),就不让进入后面生成水印,这样就不会导致因为水印过大而导致你后面的问题
2、在生成水印的地方做判断,如果采集上来的数据的时间远大于当前时间(比如超过 5
分钟)那么就不把当前水印的时间设置为数据的时间,而是用当前系统的时间代替





Best!
From zhisheng

郑 仲尼 <[hidden email]<mailto:[hidden email]>> 于2019年7月24日周三 下午3:44写道:

> 各位Flink社区大佬,
> 您好!
> 我使用Flink SQL (Flink
> 1.8.0)进行一些聚合计算,消费的是Kafka数据,使用的是EventTime,但是有时候,偶然会出现rowtime字段来了一条未来时间的数据(可能是上送的数据时区导致),这样Watermark会直接推到了未来某个时间点,导致这笔错误数据到达后的数据,到未来时间点之间的数据会被丢弃。
>
> 这个问题根本确实是业务方面的问题,但是我们还是希望有一些方案应对这种异常情况。
>
> 目前,我们这边处理的方法是:
> 1.在进入聚合任务之前进行过滤操作,新增一个过滤的任务(使用的是ProcessTime),将这条错误的数据直接丢弃(或者输出到其他topic),将结果发送中间的kafka
> topic,聚合任务再消费中间的kafka topic。
>
> 我想请教的是:
> 1.不知各位是否有遇到过同样的问题,有没有更好的处理方式 ? 新加一个任务虽然能够暂时解决,但是可能会导致延迟增加,也增加了出错的几率。
> 2.不知是否有方法在一个任务中完成下面的两步操作:
> 1) tEnv.registerTable(operatorTable,tEnv.sqlQuery(select * from
> KafkaSource where $field1 >$value));// 这一步来一条处理一条,进行数据的过滤
> 2)select sum(field2) from operatorTable  group by TUMBLE(rowtime,INTERVAL
> '5' SECOND),field2 //这一步使用rowtime聚合输出
> 这种方法目前存在的问题是:在定义KafkaSource
> 时,需要指定rowtime(构建kafka连接器的时候需要指定),一旦有错误数据进来,还没有执行到第2)步,watermark貌似就已经受到了影响。
>
> 我连接Kafka的代码大概如下:
>
> tEnv.connect( new Kafka()
>
>          .topic(topic)
>
>          .version(version)
>
>          .startFromLatest()
>
>          .properties(prop))
>
>         .withFormat(new Json()
>
>          .failOnMissingField(false)
>
>          .deriveSchema())
>
>         .withSchema(new Schema()
>
>          .schema(tableSchema.getTableSchema())
>
>          .rowtime(new Rowtime()
>
>          .timestampsFromField(rowTimeField)
>
>          .watermarksPeriodicBounded(delay)))
>
>         .inAppendMode()
>
>         .registerTableSink("KafkaSource");
>
> 以上,期待您帮忙解答,非常感谢~~
>
> 祝
> 顺利
> —————————
> Johnny Zheng
>
>

Reply | Threaded
Open this post in threaded view
|

Re: 请教Flink SQL watermark遇到未来时间的处理问题

zhisheng
感谢你

郑 仲尼 <[hidden email]> 于 2019年7月31日周三 下午4:09写道:

> hi,智笙:
>
>
>     感谢提供解决思路,目前我这边还尝试了几种可行的方案:
>
> 1.在kafka反序列化的时候,判断kafka中日期字段的值,如果超过当前时间太多,则丢弃,或者重置为当前时间(重置其实可能导致正常数据丢失)。
>
>
> 2.自定义一个watermark,当时间大于当前时间太多的时候,不更新当前的watermark,这样在watermark达到这条未来时间的时间点后,也会将这条数据纳入窗口计算,这种其实是比较理想的。但是这种没有完全的测试,感觉数据会一直存放在内存中,不知道会不会引起其他问题。
>
> 在编写自定义watermark的时候,发现只能使用scala写,使用java实现的话,没有数据输出,debug看java
> 实现的代码生成的watermark也正确。因为对scala调用java不熟,不打算深究了。
>
>
>     下面是关键的实现代码,希望对有类似问题的人有所帮助:
>
> 构建Schema时指定rowtime:
>
> Rowtime rowtime = new Rowtime() .timestampsFromField(rowTimeField)
> .watermarksFromStrategy(new
> BoundedOutOfOrderTimestamps(delay,futrueTimeLimit));
>
>
> 下面是scala版本的自定义watermark生成策略,仿的Flink自带的BoundedOutOfOrderTimestamps:
>
>
> import org.apache.flink.streaming.api.watermark.Watermark
>
> import
> org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner
>
>
> final class BoundedOutOfOrderTimestamps(val delay: Long, val
> futrueTimeLimit: Long) extends PeriodicWatermarkAssigner {
>
>   var maxTimestamp: Long = Long.MinValue + delay
>
>   override def nextTimestamp(timestamp: Long): Unit = {
>
>     //需考虑时区,28800000为8小时,按需修改
>
>     var currentTimeLimit: Long = System.currentTimeMillis() + 28800000 +
> futrueTimeLimit;
>
>     if (timestamp > currentTimeLimit) {
>
>     //这里不更新maxTimestamp即不更新返回的watermark
>
>       println("未来时间:timestamp=" + timestamp + ",maxTimestamp=" +
> maxTimestamp + ",scalacurrentTimeLimit=" + currentTimeLimit);
>
>     } else {
>
>       if (timestamp > maxTimestamp) {
>
>         maxTimestamp = timestamp
>
>       }
>
>     }
>
>   }
>
>   override def getWatermark: Watermark = new Watermark(maxTimestamp -
> delay)
>
>   override def equals(other: Any): Boolean = other match {
>
>     case that: BoundedOutOfOrderTimestamps =>
>
>       delay == that.delay
>
>     case _ => false
>
>   }
>
>   override def hashCode(): Int = {
>
>     delay.hashCode()
>
>   }
>
> }
>
>  原始邮件
> 发件人: zhisheng<[hidden email]>
> 收件人: user-zh<[hidden email]>
> 发送时间: 2019年7月24日(周三) 23:45
> 主题: Re: 请教Flink SQL watermark遇到未来时间的处理问题
>
>
> hi,仲尼:
>   通常这种时间超前的数据是由于你机器的时间有问题(未对齐),然后采集上来的数据使用的那个时间可能就会比当前时间超前了(大了),你可以有下面解决方法:
>
> 1、在 Flink 从 Kafka 中消费数据后就进行 filter
> 部分这种数据(可以获取到时间后和当前时间相比一下,如果超前或者超前多久就把这条数据丢掉,之前我自己项目也有遇到过这种数据问题,设置的超前 5
> 分钟以上的数据就丢失),就不让进入后面生成水印,这样就不会导致因为水印过大而导致你后面的问题
> 2、在生成水印的地方做判断,如果采集上来的数据的时间远大于当前时间(比如超过 5
> 分钟)那么就不把当前水印的时间设置为数据的时间,而是用当前系统的时间代替
>
>
>
>
>
> Best!
> From zhisheng
>
> 郑 仲尼 <[hidden email]<mailto:[hidden email]>>
> 于2019年7月24日周三 下午3:44写道:
>
> > 各位Flink社区大佬,
> > 您好!
> > 我使用Flink SQL (Flink
> >
> 1.8.0)进行一些聚合计算,消费的是Kafka数据,使用的是EventTime,但是有时候,偶然会出现rowtime字段来了一条未来时间的数据(可能是上送的数据时区导致),这样Watermark会直接推到了未来某个时间点,导致这笔错误数据到达后的数据,到未来时间点之间的数据会被丢弃。
> >
> > 这个问题根本确实是业务方面的问题,但是我们还是希望有一些方案应对这种异常情况。
> >
> > 目前,我们这边处理的方法是:
> >
> 1.在进入聚合任务之前进行过滤操作,新增一个过滤的任务(使用的是ProcessTime),将这条错误的数据直接丢弃(或者输出到其他topic),将结果发送中间的kafka
> > topic,聚合任务再消费中间的kafka topic。
> >
> > 我想请教的是:
> > 1.不知各位是否有遇到过同样的问题,有没有更好的处理方式 ? 新加一个任务虽然能够暂时解决,但是可能会导致延迟增加,也增加了出错的几率。
> > 2.不知是否有方法在一个任务中完成下面的两步操作:
> > 1) tEnv.registerTable(operatorTable,tEnv.sqlQuery(select * from
> > KafkaSource where $field1 >$value));// 这一步来一条处理一条,进行数据的过滤
> > 2)select sum(field2) from operatorTable  group by TUMBLE(rowtime,INTERVAL
> > '5' SECOND),field2 //这一步使用rowtime聚合输出
> > 这种方法目前存在的问题是:在定义KafkaSource
> >
> 时,需要指定rowtime(构建kafka连接器的时候需要指定),一旦有错误数据进来,还没有执行到第2)步,watermark貌似就已经受到了影响。
> >
> > 我连接Kafka的代码大概如下:
> >
> > tEnv.connect( new Kafka()
> >
> >          .topic(topic)
> >
> >          .version(version)
> >
> >          .startFromLatest()
> >
> >          .properties(prop))
> >
> >         .withFormat(new Json()
> >
> >          .failOnMissingField(false)
> >
> >          .deriveSchema())
> >
> >         .withSchema(new Schema()
> >
> >          .schema(tableSchema.getTableSchema())
> >
> >          .rowtime(new Rowtime()
> >
> >          .timestampsFromField(rowTimeField)
> >
> >          .watermarksPeriodicBounded(delay)))
> >
> >         .inAppendMode()
> >
> >         .registerTableSink("KafkaSource");
> >
> > 以上,期待您帮忙解答,非常感谢~~
> >
> > 祝
> > 顺利
> > —————————
> > Johnny Zheng
> >
> >
>
>