Flink state question

op
Hi everyone,
I need to deduplicate records and want to use a Bloom filter to save memory. The code is as follows:
.keyBy(_._1)
.process(new KeyedProcessFunction[String, (String, String), String]() {
  // One Bloom filter per key, kept in keyed state.
  var state: ValueState[BloomFilter[CharSequence]] = null

  override def open(parameters: Configuration): Unit = {
    val stateDesc = new ValueStateDescriptor("state",
      TypeInformation.of(new TypeHint[BloomFilter[CharSequence]]() {}))
    state = getRuntimeContext.getState(stateDesc)
  }

  override def processElement(
      value: (String, String),
      ctx: KeyedProcessFunction[String, (String, String), String]#Context,
      out: Collector[String]): Unit = {
    var filter = state.value
    if (filter == null) {
      // No filter is stored for the current key yet, so create one.
      println("null filter")
      filter = BloomFilter.create[CharSequence](Funnels.unencodedCharsFunnel, 10000, 0.0001)
    }
    // Emit the value only the first time it is seen, then persist the filter.
    if (!filter.mightContain(value._2)) {
      filter.put(value._2)
      state.update(filter)
      out.collect(value._2)
    }
  }
})
From the logs I can see that every time I restore from a savepoint, the Bloom filter in this state is null. Why is that?

Re: Flink state question

Congxian Qiu
Hi

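You could use the State Processor API [1] to look at what the state in the savepoint actually contains, which narrows the problem down first. If the state is already missing from the savepoint, then something went wrong when it was serialized into the savepoint; if the savepoint does contain it, then the error is in the restore.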

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html
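
The check suggested above could be sketched roughly as follows against the Flink 1.11 State Processor API. This is only an illustration under several assumptions that are not in the original post: the operator was given the uid "dedup-operator" (a hypothetical name; the posted code sets no uid), the savepoint path is passed as the first program argument, BloomFilter is Guava's com.google.common.hash.BloomFilter, and MemoryStateBackend is good enough for reading. FilterReader and InspectSavepoint are made-up names.

```scala
import com.google.common.hash.BloomFilter
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.state.api.Savepoint
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.util.Collector

// Hypothetical reader: emits one line per key saying whether a filter was stored.
class FilterReader extends KeyedStateReaderFunction[String, String] {
  @transient private var state: ValueState[BloomFilter[CharSequence]] = _

  override def open(parameters: Configuration): Unit = {
    // The name and type must match the descriptor used by the streaming job.
    val desc = new ValueStateDescriptor[BloomFilter[CharSequence]](
      "state",
      TypeInformation.of(new TypeHint[BloomFilter[CharSequence]]() {}))
    state = getRuntimeContext.getState(desc)
  }

  override def readKey(
      key: String,
      ctx: KeyedStateReaderFunction.Context,
      out: Collector[String]): Unit = {
    val filter = state.value()
    val status = if (filter == null) "no filter" else "filter present"
    out.collect(s"$key -> $status")
  }
}

object InspectSavepoint {
  def main(args: Array[String]): Unit = {
    // The State Processor API in 1.11 runs on the batch (DataSet) environment.
    val env = ExecutionEnvironment.getExecutionEnvironment
    // args(0): savepoint path (assumption).
    val savepoint = Savepoint.load(env, args(0), new MemoryStateBackend())
    // "dedup-operator" is an assumed uid; use the uid actually set on the operator.
    savepoint.readKeyedState("dedup-operator", new FilterReader).print()
  }
}
```

If keys that were processed before the savepoint show up with a filter, the write side is probably fine and the restore path is the place to look.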

Best,
Congxian
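
A further way to narrow down the serialization half of the question, without any savepoint, might be to push a filter through the serializer that Flink derives from the same TypeInformation the job uses. The sketch below assumes Guava's BloomFilter and the Flink 1.11 classes DataOutputSerializer and DataInputDeserializer; FilterRoundTrip is a made-up name, and whether the round trip succeeds for this type is exactly what the check is meant to find out.

```scala
import com.google.common.hash.{BloomFilter, Funnels}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.core.memory.{DataInputDeserializer, DataOutputSerializer}

object FilterRoundTrip {

  // Serializes and deserializes a filter with the serializer Flink creates
  // for the type information used in the state descriptor.
  def roundTrip(filter: BloomFilter[CharSequence]): BloomFilter[CharSequence] = {
    val typeInfo = TypeInformation.of(new TypeHint[BloomFilter[CharSequence]]() {})
    val serializer = typeInfo.createSerializer(new ExecutionConfig)

    val out = new DataOutputSerializer(1024)
    serializer.serialize(filter, out)

    val in = new DataInputDeserializer(out.getCopyOfBuffer)
    serializer.deserialize(in)
  }

  def main(args: Array[String]): Unit = {
    val filter = BloomFilter.create[CharSequence](Funnels.unencodedCharsFunnel(), 10000, 0.0001)
    filter.put("a")

    val restored = roundTrip(filter)
    // Report what survived the round trip; no particular result is assumed here.
    println(s"serializer: ${restored.getClass.getName}, contains 'a': ${restored.mightContain("a")}")
  }
}
```

An exception here, or a restored filter that no longer reports the inserted element, would point at serialization rather than at the restore.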


In reply to the post above, written by op <[hidden email]> on Thursday, July 16, 2020 at 7:16 PM.
