Hello, I defined a MapState while working with a broadcast stream and put data into it in my processing logic, but I can never get its value to update. Any help would be appreciated.

Definition:
    private val carEfenceState: MapState[String, Boolean] = new MapStateDescriptor[String, Boolean]("carEfenceState", classOf[String], classOf[Boolean])
Write:
    carEfenceState.put(mapKey, true)
Read:
    carEfenceState.get(mapKey)

The value read back is always false.

Thanks in advance!

-- Sent from: http://apache-flink.147419.n8.nabble.com/
Could you post the complete code?
------------------ Original message ------------------
From: chaos <[hidden email]>
Sent: March 10, 2021, 12:51
To: user-zh <[hidden email]>
Subject: Re: MapState cannot be updated
Hello,

The main code is as follows:

    class getRule extends KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule], KafkaStreamSource] {

      private var carEfenceState: MapState[String, Boolean] = _

      override def open(parameters: Configuration): Unit = {
        carEfenceState = getRuntimeContext.getMapState(
          new MapStateDescriptor[String, Boolean]("carEfenceState", classOf[String], classOf[Boolean]))
      }

      override def processBroadcastElement(
          in2: List[Rule],
          context: KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule], KafkaStreamSource]#Context,
          collector: Collector[KafkaStreamSource]): Unit = {
        context.getBroadcastState(ruleStateDescriptor).put("rules", in2)
      }

      override def processElement(
          kafkaSource: KafkaStreamSource,
          readOnlyContext: KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule], KafkaStreamSource]#ReadOnlyContext,
          collector: Collector[KafkaStreamSource]): Unit = {
        val ruleIterator = readOnlyContext.getBroadcastState(ruleStateDescriptor).immutableEntries().iterator()
        while (ruleIterator.hasNext) {
          val ruleMap: Map.Entry[String, List[Rule]] = ruleIterator.next()
          val ruleList: List[Rule] = ruleMap.getValue
          for (rule <- ruleList) {
            val mapKey = kafkaSource.vno + rule.id
            val tempState = carEfenceState.get(mapKey)
            val currentState = if (tempState != null) tempState else false
            // business logic
            if (!currentState) {
              ...
              carEfenceState.put(mapKey, true)
              ...
            } else if (currentState) {
              ...
              carEfenceState.remove(mapKey)
              ...
            }
          }
        }
      }
    }
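A side note on the `tempState != null` check in the code above. This is a minimal sketch, assuming a plain `java.util.HashMap` as a stand-in for `MapState[String, Boolean]` (it is not Flink state, and `MissingBooleanDemo`, `read`, and `isPresent` are illustrative names). In Scala, when a Java method returns `null` where a Scala `Boolean` is expected, the value is unboxed to `false`, so a missing entry and a stored `false` look the same to the caller. This is not necessarily the cause of the behaviour reported in this thread, but it may be worth ruling out.

```scala
import java.util.{HashMap => JHashMap}

object MissingBooleanDemo {
  // Stand-in for MapState[String, Boolean]; a plain Java map, not Flink state.
  val state = new JHashMap[String, Boolean]()

  // For an absent key the Java map returns null, which Scala unboxes to false.
  def read(key: String): Boolean = state.get(key)

  // Presence check that does not depend on the stored value.
  def isPresent(key: String): Boolean = state.containsKey(key)

  def main(args: Array[String]): Unit = {
    println(read("car1-rule1"))      // key was never written
    state.put("car1-rule1", true)
    println(read("car1-rule1"))      // key written with true
    println(isPresent("car1-rule1")) // explicit presence check
  }
}
```

Flink's `MapState` offers `contains(key)` for the same purpose, which is probably a clearer way to test for presence than comparing the result of `get` with `null`.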
    /**
     * A read-only view of the {@link BroadcastState}.
     *
     * <p>Although read-only, the user code should not modify the value returned by the {@link
     * #get(Object)} or the entries of the immutable iterator returned by the {@link
     * #immutableEntries()}, as this can lead to inconsistent states. The reason for this is that we do
     * not create extra copies of the elements for performance reasons.
     *
     * @param <K> The key type of the elements in the {@link ReadOnlyBroadcastState}.
     * @param <V> The value type of the elements in the {@link ReadOnlyBroadcastState}.
     */

This is the description of ReadOnlyBroadcastState in the source code. I hope it helps.

smq

From: chaos
Sent: March 10, 2021, 14:29
To: [hidden email]
Subject: Re: Re: MapState cannot be updated
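To illustrate the Javadoc's warning, here is a minimal sketch under stated assumptions: a `java.util.HashMap` plays the role of a store that hands out references rather than copies. `SharedValueDemo`, `addInPlace`, and `addToCopy` are hypothetical names and not part of Flink. The point is only that changing a returned object in place also changes what the store holds, whereas working on a copy does not.

```scala
import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList}

object SharedValueDemo {
  // Stand-in for a store that returns references, not copies.
  val store = new JHashMap[String, JList[String]]()

  // Unsafe: mutates the very object held by the store.
  def addInPlace(key: String, item: String): Unit = {
    store.get(key).add(item)
    ()
  }

  // Safer: copy the stored list first, then modify only the copy.
  def addToCopy(key: String, item: String): JList[String] = {
    val copy = new JArrayList[String](store.get(key))
    copy.add(item)
    copy
  }

  def main(args: Array[String]): Unit = {
    val rules = new JArrayList[String]()
    rules.add("rule-1")
    store.put("rules", rules)

    val copy = addToCopy("rules", "rule-2")
    println(store.get("rules").size) // stored list is untouched by the copy
    println(copy.size)

    addInPlace("rules", "rule-3")
    println(store.get("rules").size) // stored list was changed in place
  }
}
```

With an immutable Scala `List` as the broadcast value, as in the code posted in this thread, in-place mutation of the list itself is not possible, so the warning mainly concerns mutable value types.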
In reply to this post by chaos
Although you changed the order in which the methods are implemented, the program still executes processElement() first and processBroadcastElement() afterwards.
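Since a data element can reach the function before any rule has been broadcast, one common way to cope is to hold such elements back until rules are available. The following is a plain-Scala model of that idea, not Flink code: `RuleAwareProcessor`, `onRules`, and `onEvent` are hypothetical names that merely play the roles of processBroadcastElement() and processElement(), and events and rules are simplified to strings.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of a two-input function; no Flink types are involved.
final class RuleAwareProcessor {
  private var rules: Option[List[String]] = None
  private val pending = ArrayBuffer.empty[String]
  val output = ArrayBuffer.empty[String]

  // Plays the role of processBroadcastElement: store rules, then flush the buffer.
  def onRules(newRules: List[String]): Unit = {
    rules = Some(newRules)
    pending.foreach(event => emit(event, newRules))
    pending.clear()
  }

  // Plays the role of processElement: buffer the event if no rules have arrived yet.
  def onEvent(event: String): Unit = rules match {
    case Some(rs) => emit(event, rs)
    case None     => pending += event
  }

  // Pair the event with every known rule.
  private def emit(event: String, rs: List[String]): Unit =
    rs.foreach(rule => output += s"$event/$rule")
}
```

In an actual Flink job the buffer would most likely need to live in managed state rather than in a plain field, so that it survives failures and is scoped per key.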
Thanks for the replies; the problem is solved.
Solution: following an example from the official documentation, I moved the state lookup inside processElement.

    private val eFenceMapStateDesc = new MapStateDescriptor[String, Boolean]("carEfenceState", classOf[String], classOf[Boolean])
    private val DbIdMapStateDesc = new MapStateDescriptor[String, Long]("eFenceCarDbIdState", classOf[String], classOf[Long])

    override def processElement(...) {
      val eFenceMapState = getRuntimeContext.getMapState(eFenceMapStateDesc)
      val dbIdMapState = getRuntimeContext.getMapState(DbIdMapStateDesc)
      ....
    }