怎样实现超过一定时间没有收到消息就发出报警的功能?

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

怎样实现超过一定时间没有收到消息就发出报警的功能?

Lei Wang
有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。

比如
robot1   2020-11-11 12:00:00     msginfo
之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现  2020-11-11 12:10:00 就发出报警呢?

flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢?

我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。

这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。
我们必须 按 robotId 做 keyBy

求大神指教。

谢谢,
王磊
Reply | Threaded
Open this post in threaded view
|

Re:怎样实现超过一定时间没有收到消息就发出报警的功能?

hailongwang
Hi Lei,
我理解这篇文章少介绍了 keyby 的逻辑。
可以keyby(robotId),然后在 processFunction 里面使用 ValueState 存储最近一次 robot 的到达时间,
同时注册一个 20min 的timer来触发检测,在检测时候,取出 ValueState 的值都是同一个 robotId的。


Best,
hailong




在 2020-11-11 12:54:22,"Lei Wang" <[hidden email]> 写道:

>有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。
>
>比如
>robot1   2020-11-11 12:00:00     msginfo
>之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现  2020-11-11 12:10:00 就发出报警呢?
>
>flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢?
>
>我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。
>
>这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。
>我们必须 按 robotId 做 keyBy
>
>求大神指教。
>
>谢谢,
>王磊
Reply | Threaded
Open this post in threaded view
|

Re: 怎样实现超过一定时间没有收到消息就发出报警的功能?

Danny Chan-2
In reply to this post by Lei Wang
感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑

Lei Wang <[hidden email]> 于2020年11月11日周三 下午2:03写道:

> 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。
>
> 比如
> robot1   2020-11-11 12:00:00     msginfo
> 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现  2020-11-11 12:10:00 就发出报警呢?
>
> flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢?
>
> 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。
>
> 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。
> 我们必须 按 robotId 做 keyBy
>
> 求大神指教。
>
> 谢谢,
> 王磊
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: 怎样实现超过一定时间没有收到消息就发出报警的功能?

hailongwang



这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。
因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。




在 2020-11-12 14:34:59,"Danny Chan" <[hidden email]> 写道:

>感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑
>
>Lei Wang <[hidden email]> 于2020年11月11日周三 下午2:03写道:
>
>> 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。
>>
>> 比如
>> robot1   2020-11-11 12:00:00     msginfo
>> 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现  2020-11-11 12:10:00 就发出报警呢?
>>
>> flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢?
>>
>> 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。
>>
>> 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。
>> 我们必须 按 robotId 做 keyBy
>>
>> 求大神指教。
>>
>> 谢谢,
>> 王磊
>>
Reply | Threaded
Open this post in threaded view
|

Re: Re: 怎样实现超过一定时间没有收到消息就发出报警的功能?

zhisheng
hi

可以看看 Timer 的机制,能不能解决你的问题

Best zhisheng

hailongwang <[hidden email]> 于2020年11月12日周四 下午5:25写道:

>
>
>
> 这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。
> 因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。
>
>
>
>
> 在 2020-11-12 14:34:59,"Danny Chan" <[hidden email]> 写道:
> >感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑
> >
> >Lei Wang <[hidden email]> 于2020年11月11日周三 下午2:03写道:
> >
> >> 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。
> >>
> >> 比如
> >> robot1   2020-11-11 12:00:00     msginfo
> >> 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现  2020-11-11 12:10:00
> 就发出报警呢?
> >>
> >> flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢?
> >>
> >> 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。
> >>
> >> 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。
> >> 我们必须 按 robotId 做 keyBy
> >>
> >> 求大神指教。
> >>
> >> 谢谢,
> >> 王磊
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: 怎样实现超过一定时间没有收到消息就发出报警的功能?

Lei Wang
In reply to this post by hailongwang
用 session windown 确实能满足功能:

robotIdKeyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).reduce((x,
y) -> y);

按照这种写法, 我理解 window state 中只保存了最近的一条记录。


正常情况下 robot 都是会上报日志的,也就是说我这个 window 正常情况下会一直被保存下去。我不清楚会不会有性能影响。



On Thu, Nov 12, 2020 at 5:25 PM hailongwang <[hidden email]> wrote:

>
>
>
> 这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。
> 因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。
>
>
>
>
> 在 2020-11-12 14:34:59,"Danny Chan" <[hidden email]> 写道:
> >感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑
> >
> >Lei Wang <[hidden email]> 于2020年11月11日周三 下午2:03写道:
> >
> >> 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。
> >>
> >> 比如
> >> robot1   2020-11-11 12:00:00     msginfo
> >> 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现  2020-11-11 12:10:00
> 就发出报警呢?
> >>
> >> flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢?
> >>
> >> 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。
> >>
> >> 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。
> >> 我们必须 按 robotId 做 keyBy
> >>
> >> 求大神指教。
> >>
> >> 谢谢,
> >> 王磊
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: 怎样实现超过一定时间没有收到消息就发出报警的功能?

Danny Chan-2
如果 gap 溢出只是少数 record 满足,用 window 性能确实不好,可以考虑用传统的 kv 记录状态

Lei Wang <[hidden email]> 于2020年11月12日周四 下午9:17写道:

> 用 session windown 确实能满足功能:
>
>
> robotIdKeyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).reduce((x,
> y) -> y);
>
> 按照这种写法, 我理解 window state 中只保存了最近的一条记录。
>
>
> 正常情况下 robot 都是会上报日志的,也就是说我这个 window 正常情况下会一直被保存下去。我不清楚会不会有性能影响。
>
>
>
> On Thu, Nov 12, 2020 at 5:25 PM hailongwang <[hidden email]> wrote:
>
> >
> >
> >
> > 这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。
> > 因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。
> >
> >
> >
> >
> > 在 2020-11-12 14:34:59,"Danny Chan" <[hidden email]> 写道:
> > >感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑
> > >
> > >Lei Wang <[hidden email]> 于2020年11月11日周三 下午2:03写道:
> > >
> > >> 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。
> > >>
> > >> 比如
> > >> robot1   2020-11-11 12:00:00     msginfo
> > >> 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现  2020-11-11 12:10:00
> > 就发出报警呢?
> > >>
> > >> flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢?
> > >>
> > >> 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。
> > >>
> > >> 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。
> > >> 我们必须 按 robotId 做 keyBy
> > >>
> > >> 求大神指教。
> > >>
> > >> 谢谢,
> > >> 王磊
> > >>
> >
>