Hi
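You can try using the State Processor API [1] to inspect the state stored in the savepoint and narrow down the problem first. If the state is already missing from the savepoint, something went wrong when it was serialized into the savepoint; if the savepoint does contain it, something went wrong during restore.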
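As a rough illustration of that first step, here is a minimal sketch that reads the keyed state back out of a savepoint with the State Processor API and prints, for each key, whether a BloomFilter was stored. It rests on assumptions that are not in the job quoted below: the `process` operator was given a `uid` before the savepoint was taken (the name `"dedup-uid"` is hypothetical), because `readKeyedState` locates the operator by uid; `BloomFilter` is Guava's `com.google.common.hash.BloomFilter`; the `flink-state-processor-api` dependency is on the classpath; and `MemoryStateBackend` can read the savepoint (in 1.11, use the same kind of backend the job ran with). The descriptor name and type must match the job's `ValueStateDescriptor` exactly.

```scala
import com.google.common.hash.BloomFilter // assumption: Guava's BloomFilter, as in the job
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.state.api.Savepoint
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.util.Collector

// Emits one line per key saying whether a BloomFilter was stored for it.
class BloomFilterStateReader extends KeyedStateReaderFunction[String, String] {
  @transient private var state: ValueState[BloomFilter[CharSequence]] = _

  override def open(parameters: Configuration): Unit = {
    // Same name ("state") and same TypeInformation as the job's descriptor.
    val stateDesc = new ValueStateDescriptor("state",
      TypeInformation.of(new TypeHint[BloomFilter[CharSequence]]() {}))
    state = getRuntimeContext.getState(stateDesc)
  }

  override def readKey(key: String,
                       ctx: KeyedStateReaderFunction.Context,
                       out: Collector[String]): Unit = {
    val status = if (state.value() == null) "null" else "present"
    out.collect(s"$key -> $status")
  }
}

object InspectSavepoint {
  def main(args: Array[String]): Unit = {
    val savepointPath = args(0) // savepoint directory, passed on the command line
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Assumption: the job used a heap backend; otherwise pass the matching backend.
    val savepoint = Savepoint.load(env, savepointPath, new MemoryStateBackend())
    savepoint
      // Hypothetical uid: the job must have set the same uid on its process operator.
      .readKeyedState("dedup-uid", new BloomFilterStateReader)
      .print() // DataSet.print() runs the batch job and prints each line
  }
}
```

If keys come back as `present`, the filters did make it into the savepoint and the restore path is the place to look; if nothing comes back for the operator, the state was most likely never written.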
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html

Best,
Congxian
op <[hidden email]> wrote on Thursday, July 16, 2020 at 7:16 PM:
> Hi all,
> I need to deduplicate records, and to save memory I'm using a BloomFilter. The code is as follows:
> .keyBy(_._1).process(new KeyedProcessFunction[String, (String, String), String]() {
>   var state: ValueState[BloomFilter[CharSequence]] = null
>
>   override def open(parameters: Configuration): Unit = {
>     val stateDesc = new ValueStateDescriptor("state",
>       TypeInformation.of(new TypeHint[BloomFilter[CharSequence]]() {}))
>     state = getRuntimeContext.getState(stateDesc)
>   }
>
>   override def processElement(value: (String, String),
>       ctx: KeyedProcessFunction[String, (String, String), String]#Context,
>       out: Collector[String]): Unit = {
>     var filter = state.value
>     if (filter == null) {
>       println("null filter")
>       filter = BloomFilter.create[CharSequence](Funnels.unencodedCharsFunnel, 10000, 0.0001)
>     }
>     //val contains = filter.mightContain(value._2)
>     if (!filter.mightContain(value._2)) {
>       filter.put(value._2)
>       state.update(filter)
>       out.collect(value._2)
>     }
>   }
> })
> From the logs I can see that every time I restore from a savepoint, the BloomFilter in this state is null. Why is that?