Hi:
从客户端收集日志信息,总会遇到一些异常的本地Timestamp,有些会超出正确日期很多天。这样的Timestamp会影响Watermark,请问大家是如何处理这样的日志呢? 我现在的想法是: 将日志的时间与Flink的时间相比较,如果超出一个阈值就过滤。但是这样似乎会造成处理的结果是不确定的。我想的改进方法是 ,使用IngestionTime和日志的Timestamp比较。但是我不太确定,是否能混合使用IngestionTime和eventTime呢? 求教,谢谢大家! |
Hi 瑞斌
如果你要使用Flink的IngestionTime的话,其实就不存在与Flink提供的EventTime混用的情况了,而source端的IngestionTime,拿的就是source端的系统时间,可以在source端后面记一个filter operator,将ingestion的time与message中的event time进行比较,超过一定阈值的可以丢弃掉不传到下游去。 祝好 唐云 ________________________________ From: 邢瑞斌 <[hidden email]> Sent: Wednesday, October 30, 2019 17:57 To: [hidden email] <[hidden email]> Subject: 如何过滤异常的timestamp? Hi: 从客户端收集日志信息,总会遇到一些异常的本地Timestamp,有些会超出正确日期很多天。这样的Timestamp会影响Watermark,请问大家是如何处理这样的日志呢? 我现在的想法是: 将日志的时间与Flink的时间相比较,如果超出一个阈值就过滤。但是这样似乎会造成处理的结果是不确定的。我想的改进方法是 ,使用IngestionTime和日志的Timestamp比较。但是我不太确定,是否能混合使用IngestionTime和eventTime呢? 求教,谢谢大家! |
Hi 唐云,
谢谢指点,我去试一试。我其实不太理解env中设置的TimeCharacteristic,之前理解的是,这个设置会作用于全局。如果将这个设为IngestionTime,后续的Operator可以再使用EventTime吗? Yun Tang <[hidden email]> 于2019年10月31日周四 上午2:26写道: > Hi 瑞斌 > > 如果你要使用Flink的IngestionTime的话,其实就不存在与Flink提供的EventTime混用的情况了,而source端的IngestionTime,拿的就是source端的系统时间,可以在source端后面记一个filter > operator,将ingestion的time与message中的event time进行比较,超过一定阈值的可以丢弃掉不传到下游去。 > > 祝好 > 唐云 > ________________________________ > From: 邢瑞斌 <[hidden email]> > Sent: Wednesday, October 30, 2019 17:57 > To: [hidden email] <[hidden email]> > Subject: 如何过滤异常的timestamp? > > Hi: > > > 从客户端收集日志信息,总会遇到一些异常的本地Timestamp,有些会超出正确日期很多天。这样的Timestamp会影响Watermark,请问大家是如何处理这样的日志呢? > > 我现在的想法是: > > 将日志的时间与Flink的时间相比较,如果超出一个阈值就过滤。但是这样似乎会造成处理的结果是不确定的。我想的改进方法是 > ,使用IngestionTime和日志的Timestamp比较。但是我不太确定,是否能混合使用IngestionTime和eventTime呢? > > 求教,谢谢大家! > |
Hi 瑞斌
后续的operator在使用window操作时,所依赖的时间戳都是这个ingestion time,如果你的message里面有"event time"语义的field,那么后续就可以拿在source端生成的ingestion time 与这个field所表征的时间进行比较。 On 10/31/19, 10:45 AM, "邢瑞斌" <[hidden email]> wrote: Hi 唐云, 谢谢指点,我去试一试。我其实不太理解env中设置的TimeCharacteristic,之前理解的是,这个设置会作用于全局。如果将这个设为IngestionTime,后续的Operator可以再使用EventTime吗? Yun Tang <[hidden email]> 于2019年10月31日周四 上午2:26写道: > Hi 瑞斌 > > 如果你要使用Flink的IngestionTime的话,其实就不存在与Flink提供的EventTime混用的情况了,而source端的IngestionTime,拿的就是source端的系统时间,可以在source端后面记一个filter > operator,将ingestion的time与message中的event time进行比较,超过一定阈值的可以丢弃掉不传到下游去。 > > 祝好 > 唐云 > ________________________________ > From: 邢瑞斌 <[hidden email]> > Sent: Wednesday, October 30, 2019 17:57 > To: [hidden email] <[hidden email]> > Subject: 如何过滤异常的timestamp? > > Hi: > > > 从客户端收集日志信息,总会遇到一些异常的本地Timestamp,有些会超出正确日期很多天。这样的Timestamp会影响Watermark,请问大家是如何处理这样的日志呢? > > 我现在的想法是: > > 将日志的时间与Flink的时间相比较,如果超出一个阈值就过滤。但是这样似乎会造成处理的结果是不确定的。我想的改进方法是 > ,使用IngestionTime和日志的Timestamp比较。但是我不太确定,是否能混合使用IngestionTime和eventTime呢? > > 求教,谢谢大家! > |
了解了,非常感谢!
Yun Tang <[hidden email]> 于2019年10月31日周四 下午3:10写道: > Hi 瑞斌 > > 后续的operator在使用window操作时,所依赖的时间戳都是这个ingestion time,如果你的message里面有"event > time"语义的field,那么后续就可以拿在source端生成的ingestion time 与这个field所表征的时间进行比较。 > > > > On 10/31/19, 10:45 AM, "邢瑞斌" <[hidden email]> wrote: > > Hi 唐云, > > > 谢谢指点,我去试一试。我其实不太理解env中设置的TimeCharacteristic,之前理解的是,这个设置会作用于全局。如果将这个设为IngestionTime,后续的Operator可以再使用EventTime吗? > > > Yun Tang <[hidden email]> 于2019年10月31日周四 上午2:26写道: > > > Hi 瑞斌 > > > > > 如果你要使用Flink的IngestionTime的话,其实就不存在与Flink提供的EventTime混用的情况了,而source端的IngestionTime,拿的就是source端的系统时间,可以在source端后面记一个filter > > operator,将ingestion的time与message中的event time进行比较,超过一定阈值的可以丢弃掉不传到下游去。 > > > > 祝好 > > 唐云 > > ________________________________ > > From: 邢瑞斌 <[hidden email]> > > Sent: Wednesday, October 30, 2019 17:57 > > To: [hidden email] <[hidden email]> > > Subject: 如何过滤异常的timestamp? > > > > Hi: > > > > > > > 从客户端收集日志信息,总会遇到一些异常的本地Timestamp,有些会超出正确日期很多天。这样的Timestamp会影响Watermark,请问大家是如何处理这样的日志呢? > > > > 我现在的想法是: > > > > 将日志的时间与Flink的时间相比较,如果超出一个阈值就过滤。但是这样似乎会造成处理的结果是不确定的。我想的改进方法是 > > > ,使用IngestionTime和日志的Timestamp比较。但是我不太确定,是否能混合使用IngestionTime和eventTime呢? > > > > 求教,谢谢大家! > > > > > |
Free forum by Nabble | Edit this page |