有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。
比如 robot1 2020-11-11 12:00:00 msginfo 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现 2020-11-11 12:10:00 就发出报警呢? flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢? 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。 我们必须 按 robotId 做 keyBy 求大神指教。 谢谢, 王磊 |
Hi Lei,
我理解这篇文章少介绍了 keyby 的逻辑。 可以keyby(robotId),然后在 processFunction 里面使用 ValueState 存储最近一次 robot 的到达时间, 同时注册一个 20min 的timer来触发检测,在检测时候,取出 ValueState 的值都是同一个 robotId的。 Best, hailong 在 2020-11-11 12:54:22,"Lei Wang" <[hidden email]> 写道: >有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。 > >比如 >robot1 2020-11-11 12:00:00 msginfo >之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现 2020-11-11 12:10:00 就发出报警呢? > >flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢? > >我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。 > >这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。 >我们必须 按 robotId 做 keyBy > >求大神指教。 > >谢谢, >王磊 |
In reply to this post by Lei Wang
感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑
Lei Wang <[hidden email]> 于2020年11月11日周三 下午2:03写道: > 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。 > > 比如 > robot1 2020-11-11 12:00:00 msginfo > 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现 2020-11-11 12:10:00 就发出报警呢? > > flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢? > > 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。 > > 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。 > 我们必须 按 robotId 做 keyBy > > 求大神指教。 > > 谢谢, > 王磊 > |
这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。 因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。 在 2020-11-12 14:34:59,"Danny Chan" <[hidden email]> 写道: >感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑 > >Lei Wang <[hidden email]> 于2020年11月11日周三 下午2:03写道: > >> 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。 >> >> 比如 >> robot1 2020-11-11 12:00:00 msginfo >> 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现 2020-11-11 12:10:00 就发出报警呢? >> >> flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢? >> >> 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。 >> >> 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。 >> 我们必须 按 robotId 做 keyBy >> >> 求大神指教。 >> >> 谢谢, >> 王磊 >> |
hi
可以看看 Timer 的机制,能不能解决你的问题 Best zhisheng hailongwang <[hidden email]> 于2020年11月12日周四 下午5:25写道: > > > > 这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。 > 因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。 > > > > > 在 2020-11-12 14:34:59,"Danny Chan" <[hidden email]> 写道: > >感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑 > > > >Lei Wang <[hidden email]> 于2020年11月11日周三 下午2:03写道: > > > >> 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。 > >> > >> 比如 > >> robot1 2020-11-11 12:00:00 msginfo > >> 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现 2020-11-11 12:10:00 > 就发出报警呢? > >> > >> flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢? > >> > >> 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。 > >> > >> 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。 > >> 我们必须 按 robotId 做 keyBy > >> > >> 求大神指教。 > >> > >> 谢谢, > >> 王磊 > >> > |
In reply to this post by hailongwang
用 session windown 确实能满足功能:
robotIdKeyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).reduce((x, y) -> y); 按照这种写法, 我理解 window state 中只保存了最近的一条记录。 正常情况下 robot 都是会上报日志的,也就是说我这个 window 正常情况下会一直被保存下去。我不清楚会不会有性能影响。 On Thu, Nov 12, 2020 at 5:25 PM hailongwang <[hidden email]> wrote: > > > > 这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。 > 因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。 > > > > > 在 2020-11-12 14:34:59,"Danny Chan" <[hidden email]> 写道: > >感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑 > > > >Lei Wang <[hidden email]> 于2020年11月11日周三 下午2:03写道: > > > >> 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。 > >> > >> 比如 > >> robot1 2020-11-11 12:00:00 msginfo > >> 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现 2020-11-11 12:10:00 > 就发出报警呢? > >> > >> flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢? > >> > >> 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。 > >> > >> 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。 > >> 我们必须 按 robotId 做 keyBy > >> > >> 求大神指教。 > >> > >> 谢谢, > >> 王磊 > >> > |
如果 gap 溢出只是少数 record 满足,用 window 性能确实不好,可以考虑用传统的 kv 记录状态
Lei Wang <[hidden email]> 于2020年11月12日周四 下午9:17写道: > 用 session windown 确实能满足功能: > > > robotIdKeyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).reduce((x, > y) -> y); > > 按照这种写法, 我理解 window state 中只保存了最近的一条记录。 > > > 正常情况下 robot 都是会上报日志的,也就是说我这个 window 正常情况下会一直被保存下去。我不清楚会不会有性能影响。 > > > > On Thu, Nov 12, 2020 at 5:25 PM hailongwang <[hidden email]> wrote: > > > > > > > > > 这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。 > > 因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。 > > > > > > > > > > 在 2020-11-12 14:34:59,"Danny Chan" <[hidden email]> 写道: > > >感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑 > > > > > >Lei Wang <[hidden email]> 于2020年11月11日周三 下午2:03写道: > > > > > >> 有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。 > > >> > > >> 比如 > > >> robot1 2020-11-11 12:00:00 msginfo > > >> 之后 20 mins 一直没有收到 robot1 的消息,怎样才才能在 flink 中实现 2020-11-11 12:10:00 > > 就发出报警呢? > > >> > > >> flink 是消息驱动的,没有收到消息就不会触发操作,怎样在没有收到后续消息的条件下触发操作呢? > > >> > > >> 我试验了下 https://juejin.im/post/6844904193052901384 的例子,不满足我的应用场景。 > > >> > > >> 这个例子相当于所用订单共用一个 timeService, 每一次遍历一下所有的订单。 > > >> 我们必须 按 robotId 做 keyBy > > >> > > >> 求大神指教。 > > >> > > >> 谢谢, > > >> 王磊 > > >> > > > |
Free forum by Nabble | Edit this page |