Flink checkpoint 并发问题

classic Classic list List threaded Threaded
17 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink checkpoint 并发问题

戴嘉诚
大家好:

    我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的

在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。


这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?


java.lang.Exception: Could not perform checkpoint 550 for operator
KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
(16/20).

         at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)

         at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)

         at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)

         at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)

         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)

         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)

         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)

         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

         at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.Exception: Could not complete snapshot 550 for
operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
写入redis库存 (16/20).

         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)

         at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)

         at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)

         at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)

         at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)

         at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)

         ... 8 more

Caused by: java.util.ConcurrentModificationException

         at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)

         at java.util.HashMap$EntryIterator.next(HashMap.java:1476)

         at java.util.HashMap$EntryIterator.next(HashMap.java:1474)

         at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)

         at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)

         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)

         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)

         at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)

         at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)

         at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)

         at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)

         at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)

         at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)

         at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)

         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)

         ... 13 more
Reply | Threaded
Open this post in threaded view
|

Re: Flink checkpoint 并发问题

athlon512@gmail.com
setMaxConcurrentCheckpoints 这个参数你设置过么?



[hidden email]
 
发件人: 戴嘉诚
发送时间: 2019-07-25 18:07
收件人: user-zh
主题: Flink checkpoint 并发问题
大家好:
 
    我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
 
在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
 
 
这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
 
 
java.lang.Exception: Could not perform checkpoint 550 for operator
KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
(16/20).
 
         at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
 
         at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
 
         at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
 
         at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
 
         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
 
         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
 
         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
 
         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
 
         at java.lang.Thread.run(Thread.java:748)
 
Caused by: java.lang.Exception: Could not complete snapshot 550 for
operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
写入redis库存 (16/20).
 
         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
 
         at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
 
         at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
 
         at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
 
         at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
 
         at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
 
         ... 8 more
 
Caused by: java.util.ConcurrentModificationException
 
         at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
 
         at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
 
         at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
 
         at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
 
         at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
 
         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
 
         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
 
         at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
 
         at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
 
         at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
 
         at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)
 
         at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
 
         at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
 
         at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
 
         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
 
         ... 13 more
Reply | Threaded
Open this post in threaded view
|

Re: Flink checkpoint 并发问题

athlon512@gmail.com
In reply to this post by 戴嘉诚
而且,我印象中mapstate操作不是线程安全的,需要你自己加锁



[hidden email]
 
发件人: 戴嘉诚
发送时间: 2019-07-25 18:07
收件人: user-zh
主题: Flink checkpoint 并发问题
大家好:
 
    我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
 
在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
 
 
这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
 
 
java.lang.Exception: Could not perform checkpoint 550 for operator
KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
(16/20).
 
         at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
 
         at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
 
         at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
 
         at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
 
         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
 
         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
 
         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
 
         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
 
         at java.lang.Thread.run(Thread.java:748)
 
Caused by: java.lang.Exception: Could not complete snapshot 550 for
operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
写入redis库存 (16/20).
 
         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
 
         at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
 
         at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
 
         at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
 
         at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
 
         at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
 
         ... 8 more
 
Caused by: java.util.ConcurrentModificationException
 
         at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
 
         at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
 
         at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
 
         at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
 
         at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
 
         at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
 
         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
 
         at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
 
         at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
 
         at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
 
         at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)
 
         at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
 
         at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
 
         at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
 
         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
 
         ... 13 more
Reply | Threaded
Open this post in threaded view
|

Re: Flink checkpoint 并发问题

戴嘉诚
In reply to this post by athlon512@gmail.com
你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了

[hidden email] <[hidden email]> 于2019年7月25日周四 下午6:20写道:

> setMaxConcurrentCheckpoints 这个参数你设置过么?
>
>
>
> [hidden email]
>
> 发件人: 戴嘉诚
> 发送时间: 2019-07-25 18:07
> 收件人: user-zh
> 主题: Flink checkpoint 并发问题
> 大家好:
>
>     我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
>
>
> 在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
>
>
>
> 这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
>
>
> java.lang.Exception: Could not perform checkpoint 550 for operator
> KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
> (16/20).
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
>
>          at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
>
>          at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
>
>          at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
>
>          at org.apache.flink.streaming.runtime.io
> .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
>
>          at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>
>          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>
>          at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.lang.Exception: Could not complete snapshot 550 for
> operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
> 写入redis库存 (16/20).
>
>          at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
>
>          ... 8 more
>
> Caused by: java.util.ConcurrentModificationException
>
>          at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
>
>          at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
>
>          at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
>
>          at
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
>
>          at
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
>
>          at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>
>          at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
>
>          at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
>
>          at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
>
>          at
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
>
>          at
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)
>
>          at
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
>
>          at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
>
>          at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
>
>          at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
>
>          ... 13 more
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink checkpoint 并发问题

athlon512@gmail.com
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html 
If heap state backend is used with synchronous snapshotting, the global iterator keeps a copy of all keys while iterating because of its specific implementation which does not support concurrent modifications. Enabling of this feature will increase memory consumption then. Asynchronous snapshotting does not have this problem.

我找到这段信息,感觉和你的操作类似




[hidden email]
 
发件人: 戴嘉诚
发送时间: 2019-07-25 18:24
收件人: user-zh
主题: Re: Flink checkpoint 并发问题
你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了
 
[hidden email] <[hidden email]> 于2019年7月25日周四 下午6:20写道:
 

> setMaxConcurrentCheckpoints 这个参数你设置过么?
>
>
>
> [hidden email]
>
> 发件人: 戴嘉诚
> 发送时间: 2019-07-25 18:07
> 收件人: user-zh
> 主题: Flink checkpoint 并发问题
> 大家好:
>
>     我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
>
>
> 在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
>
>
>
> 这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
>
>
> java.lang.Exception: Could not perform checkpoint 550 for operator
> KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
> (16/20).
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
>
>          at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
>
>          at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
>
>          at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
>
>          at org.apache.flink.streaming.runtime.io
> .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
>
>          at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>
>          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>
>          at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.lang.Exception: Could not complete snapshot 550 for
> operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
> 写入redis库存 (16/20).
>
>          at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
>
>          ... 8 more
>
> Caused by: java.util.ConcurrentModificationException
>
>          at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
>
>          at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
>
>          at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
>
>          at
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
>
>          at
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
>
>          at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>
>          at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
>
>          at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
>
>          at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
>
>          at
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
>
>          at
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)
>
>          at
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
>
>          at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
>
>          at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
>
>          at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
>
>          ... 13 more
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink checkpoint 并发问题

athlon512@gmail.com
In reply to this post by 戴嘉诚
其实你可以不用自己删除.使用TTL设置短一些时间,试试



[hidden email]
 
发件人: 戴嘉诚
发送时间: 2019-07-25 18:24
收件人: user-zh
主题: Re: Flink checkpoint 并发问题
你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了
 
[hidden email] <[hidden email]> 于2019年7月25日周四 下午6:20写道:
 

> setMaxConcurrentCheckpoints 这个参数你设置过么?
>
>
>
> [hidden email]
>
> 发件人: 戴嘉诚
> 发送时间: 2019-07-25 18:07
> 收件人: user-zh
> 主题: Flink checkpoint 并发问题
> 大家好:
>
>     我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
>
>
> 在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
>
>
>
> 这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
>
>
> java.lang.Exception: Could not perform checkpoint 550 for operator
> KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
> (16/20).
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
>
>          at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
>
>          at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
>
>          at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
>
>          at org.apache.flink.streaming.runtime.io
> .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
>
>          at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>
>          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>
>          at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.lang.Exception: Could not complete snapshot 550 for
> operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
> 写入redis库存 (16/20).
>
>          at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
>
>          ... 8 more
>
> Caused by: java.util.ConcurrentModificationException
>
>          at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
>
>          at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
>
>          at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
>
>          at
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
>
>          at
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
>
>          at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>
>          at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
>
>          at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
>
>          at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
>
>          at
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
>
>          at
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)
>
>          at
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
>
>          at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
>
>          at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
>
>          at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
>
>          ... 13 more
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink checkpoint 并发问题

戴嘉诚
这个应该不行,因为这个ttl是删除整个map状态,而我是需要最近半小时内的数据量,所以这个map状态,如果一直有数据写入,我就只需要获取最近的半小时,超过的就在map里面删除,这个ttl不能精确到map里面的具体的k-v


[hidden email] <[hidden email]> 于2019年7月25日周四 下午6:40写道:

> 其实你可以不用自己删除.使用TTL设置短一些时间,试试
>
>
>
> [hidden email]
>
> 发件人: 戴嘉诚
> 发送时间: 2019-07-25 18:24
> 收件人: user-zh
> 主题: Re: Flink checkpoint 并发问题
> 你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了
>
> [hidden email] <[hidden email]> 于2019年7月25日周四 下午6:20写道:
>
> > setMaxConcurrentCheckpoints 这个参数你设置过么?
> >
> >
> >
> > [hidden email]
> >
> > 发件人: 戴嘉诚
> > 发送时间: 2019-07-25 18:07
> > 收件人: user-zh
> > 主题: Flink checkpoint 并发问题
> > 大家好:
> >
> >     我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
> >
> >
> >
> 在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
> >
> >
> >
> >
> 这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
> >
> >
> > java.lang.Exception: Could not perform checkpoint 550 for operator
> > KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
> > (16/20).
> >
> >          at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
> >
> >          at org.apache.flink.streaming.runtime.io
> > .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> >
> >          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> >
> >          at java.lang.Thread.run(Thread.java:748)
> >
> > Caused by: java.lang.Exception: Could not complete snapshot 550 for
> > operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
> > 写入redis库存 (16/20).
> >
> >          at
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
> >
> >          ... 8 more
> >
> > Caused by: java.util.ConcurrentModificationException
> >
> >          at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> >
> >          at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> >
> >          at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
> >
> >          at
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> >
> >          at
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> >
> >          at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> >
> >          at
> >
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> >
> >          at
> >
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
> >
> >          at
> >
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)
> >
> >          at
> >
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
> >
> >          at
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
> >
> >          at
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
> >
> >          at
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
> >
> >          ... 13 more
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink checkpoint 并发问题

athlon512@gmail.com
那你用window和evictor 不可以吗?
类似这样,因为我理解你的业务需求可以用这个来实现
在flink源码中org.apache.flink.streaming.examples.windowing.TopSpeedWindowing


 
发件人: [hidden email]
发送时间: 2019-07-25 18:45
收件人: [hidden email]
主题: Re: Re: Flink checkpoint 并发问题
这个应该不行,因为这个ttl是删除整个map状态,而我是需要最近半小时内的数据量,所以这个map状态,如果一直有数据写入,我就只需要获取最近的半小时,超过的就在map里面删除,这个ttl不能精确到map里面的具体的k-v
 
[hidden email] <[hidden email]> 于2019年7月25日周四 下午6:40写道:
 
> 其实你可以不用自己删除.使用TTL设置短一些时间,试试
>
>
>
>
> 发件人: 戴嘉诚
> 发送时间: 2019-07-25 18:24
> 收件人: user-zh
> 主题: Re: Flink checkpoint 并发问题
> 你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了
>
> [hidden email] <[hidden email]> 于2019年7月25日周四 下午6:20写道:
>
> > setMaxConcurrentCheckpoints 这个参数你设置过么?
> >
> >
> >
> >
> > 发件人: 戴嘉诚
> > 发送时间: 2019-07-25 18:07
> > 收件人: user-zh
> > 主题: Flink checkpoint 并发问题
> > 大家好:
> >
> >     我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
> >
> >
> >
> 在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
> >
> >
> >
> >
> 这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
> >
> >
> > java.lang.Exception: Could not perform checkpoint 550 for operator
> > KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
> > (16/20).
> >
> >          at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
> >
> >          at org.apache.flink.streaming.runtime.io
> > .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> >
> >          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> >
> >          at java.lang.Thread.run(Thread.java:748)
> >
> > Caused by: java.lang.Exception: Could not complete snapshot 550 for
> > operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
> > 写入redis库存 (16/20).
> >
> >          at
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
> >
> >          ... 8 more
> >
> > Caused by: java.util.ConcurrentModificationException
> >
> >          at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> >
> >          at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> >
> >          at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
> >
> >          at
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> >
> >          at
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> >
> >          at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> >
> >          at
> >
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
> >
> >          at
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> >
> >          at
> >
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
> >
> >          at
> >
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)
> >
> >          at
> >
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
> >
> >          at
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
> >
> >          at
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
> >
> >          at
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
> >
> >          ... 13 more
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink checkpoint 并发问题

戴嘉诚
这个用window不符合这个场景,因为是取每时每刻的最近半小时,而window只能是固定时间区间,例如window只能统计6点半至7点,
7点之7点半,而我的场景是需要6点到6点半,6点02分到6点32分,是根据数据的时间来确定区间,达到统计区间内的总数

[hidden email] <[hidden email]>于2019年7月25日 周四18:50写道:

> 那你用window和evictor 不可以吗?
> 类似这样,因为我理解你的业务需求可以用这个来实现
> 在flink源码中org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
>
> ------------------------------
> [hidden email]
>
>
> *发件人:* 戴嘉诚 <[hidden email]>
> *发送时间:* 2019-07-25 18:45
> *收件人:* user-zh <[hidden email]>
> *主题:* Re: Re: Flink checkpoint 并发问题
>
>
> 这个应该不行,因为这个ttl是删除整个map状态,而我是需要最近半小时内的数据量,所以这个map状态,如果一直有数据写入,我就只需要获取最近的半小时,超过的就在map里面删除,这个ttl不能精确到map里面的具体的k-v
> 对
>
> [hidden email] <[hidden email]> 于2019年7月25日周四 下午6:40写道:
>
> > 其实你可以不用自己删除.使用TTL设置短一些时间,试试
> >
> >
> >
> > [hidden email]
> >
> > 发件人: 戴嘉诚
> > 发送时间: 2019-07-25 18:24
> > 收件人: user-zh
> > 主题: Re: Flink checkpoint 并发问题
> > 你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了
> >
> > [hidden email] <[hidden email]> 于2019年7月25日周四 下午6:20写道:
> >
> > > setMaxConcurrentCheckpoints 这个参数你设置过么?
> > >
> > >
> > >
> > > [hidden email]
> > >
> > > 发件人: 戴嘉诚
> > > 发送时间: 2019-07-25 18:07
> > > 收件人: user-zh
> > > 主题: Flink checkpoint 并发问题
> > > 大家好:
> > >
> > >     我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
> > >
> > >
> > >
> >
> 在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
> > >
> > >
> > >
> > >
> >
> 这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
> > >
> > >
> > > java.lang.Exception: Could not perform checkpoint 550 for operator
> > > KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
> > > (16/20).
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
> > >
> > >          at org.apache.flink.streaming.runtime.io
> > > .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> > >
> > >          at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > >
> > >          at java.lang.Thread.run(Thread.java:748)
> > >
> > > Caused by: java.lang.Exception: Could not complete snapshot 550 for
> > > operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
> > > 写入redis库存 (16/20).
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
> > >
> > >          ... 8 more
> > >
> > > Caused by: java.util.ConcurrentModificationException
> > >
> > >          at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> > >
> > >          at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> > >
> > >          at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
> > >
> > >          at
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> > >
> > >          at
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> > >
> > >          at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> > >
> > >          at
> > >
> >
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> > >
> > >          at
> > >
> >
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
> > >
> > >          at
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)
> > >
> > >          at
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
> > >
> > >          at
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
> > >
> > >          at
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
> > >
> > >          ... 13 more
> > >
> >
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink checkpoint 并发问题

Yun Tang
Hi  all

你们讨论的已经越来越偏了,出问题的是operator state backend,实际上与RocksDB没有关系,与MaxConcurrentCheckpoints 也不应该有关系。

To 戴嘉诚
你使用的Flink版本是什么?这个operator 里面的operator state descriptor是什么?

祝好
唐云
________________________________
From: 戴嘉诚 <[hidden email]>
Sent: Thursday, July 25, 2019 19:04
To: [hidden email] <[hidden email]>
Subject: Re: Re: Flink checkpoint 并发问题

这个用window不符合这个场景,因为是取每时每刻的最近半小时,而window只能是固定时间区间,例如window只能统计6点半至7点,
7点之7点半,而我的场景是需要6点到6点半,6点02分到6点32分,是根据数据的时间来确定区间,达到统计区间内的总数

[hidden email] <[hidden email]>于2019年7月25日 周四18:50写道:

> 那你用window和evictor 不可以吗?
> 类似这样,因为我理解你的业务需求可以用这个来实现
> 在flink源码中org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
>
> ------------------------------
> [hidden email]
>
>
> *发件人:* 戴嘉诚 <[hidden email]>
> *发送时间:* 2019-07-25 18:45
> *收件人:* user-zh <[hidden email]>
> *主题:* Re: Re: Flink checkpoint 并发问题
>
>
> 这个应该不行,因为这个ttl是删除整个map状态,而我是需要最近半小时内的数据量,所以这个map状态,如果一直有数据写入,我就只需要获取最近的半小时,超过的就在map里面删除,这个ttl不能精确到map里面的具体的k-v
> 对
>
> [hidden email] <[hidden email]> 于2019年7月25日周四 下午6:40写道:
>
> > 其实你可以不用自己删除.使用TTL设置短一些时间,试试
> >
> >
> >
> > [hidden email]
> >
> > 发件人: 戴嘉诚
> > 发送时间: 2019-07-25 18:24
> > 收件人: user-zh
> > 主题: Re: Flink checkpoint 并发问题
> > 你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了
> >
> > [hidden email] <[hidden email]> 于2019年7月25日周四 下午6:20写道:
> >
> > > setMaxConcurrentCheckpoints 这个参数你设置过么?
> > >
> > >
> > >
> > > [hidden email]
> > >
> > > 发件人: 戴嘉诚
> > > 发送时间: 2019-07-25 18:07
> > > 收件人: user-zh
> > > 主题: Flink checkpoint 并发问题
> > > 大家好:
> > >
> > >     我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
> > >
> > >
> > >
> >
> 在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
> > >
> > >
> > >
> > >
> >
> 这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
> > >
> > >
> > > java.lang.Exception: Could not perform checkpoint 550 for operator
> > > KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
> > > (16/20).
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
> > >
> > >          at org.apache.flink.streaming.runtime.io
> > > .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> > >
> > >          at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > >
> > >          at java.lang.Thread.run(Thread.java:748)
> > >
> > > Caused by: java.lang.Exception: Could not complete snapshot 550 for
> > > operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
> > > 写入redis库存 (16/20).
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
> > >
> > >          ... 8 more
> > >
> > > Caused by: java.util.ConcurrentModificationException
> > >
> > >          at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> > >
> > >          at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> > >
> > >          at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
> > >
> > >          at
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> > >
> > >          at
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> > >
> > >          at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> > >
> > >          at
> > >
> >
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> > >
> > >          at
> > >
> >
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
> > >
> > >          at
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)
> > >
> > >          at
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
> > >
> > >          at
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
> > >
> > >          at
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
> > >
> > >          at
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
> > >
> > >          ... 13 more
> > >
> >
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink checkpoint 并发问题

戴嘉诚
hi
    你好,我用的flink是1.8,但是是根据hadoop 2.7.3.2.6.5.0-292  自行打包的,  operator state
descriptor是使用MapStateDescriptor<String, Long>,
谢谢!

Yun Tang <[hidden email]> 于2019年7月25日周四 下午7:10写道:

> Hi  all
>
> 你们讨论的已经越来越偏了,出问题的是operator state
> backend,实际上与RocksDB没有关系,与MaxConcurrentCheckpoints 也不应该有关系。
>
> To 戴嘉诚
> 你使用的Flink版本是什么?这个operator 里面的operator state descriptor是什么?
>
> 祝好
> 唐云
> ________________________________
> From: 戴嘉诚 <[hidden email]>
> Sent: Thursday, July 25, 2019 19:04
> To: [hidden email] <[hidden email]>
> Subject: Re: Re: Flink checkpoint 并发问题
>
> 这个用window不符合这个场景,因为是取每时每刻的最近半小时,而window只能是固定时间区间,例如window只能统计6点半至7点,
> 7点之7点半,而我的场景是需要6点到6点半,6点02分到6点32分,是根据数据的时间来确定区间,达到统计区间内的总数
>
> [hidden email] <[hidden email]>于2019年7月25日 周四18:50写道:
>
> > 那你用window和evictor 不可以吗?
> > 类似这样,因为我理解你的业务需求可以用这个来实现
> > 在flink源码中org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
> >
> > ------------------------------
> > [hidden email]
> >
> >
> > *发件人:* 戴嘉诚 <[hidden email]>
> > *发送时间:* 2019-07-25 18:45
> > *收件人:* user-zh <[hidden email]>
> > *主题:* Re: Re: Flink checkpoint 并发问题
> >
> >
> >
> 这个应该不行,因为这个ttl是删除整个map状态,而我是需要最近半小时内的数据量,所以这个map状态,如果一直有数据写入,我就只需要获取最近的半小时,超过的就在map里面删除,这个ttl不能精确到map里面的具体的k-v
> > 对
> >
> > [hidden email] <[hidden email]> 于2019年7月25日周四 下午6:40写道:
> >
> > > 其实你可以不用自己删除.使用TTL设置短一些时间,试试
> > >
> > >
> > >
> > > [hidden email]
> > >
> > > 发件人: 戴嘉诚
> > > 发送时间: 2019-07-25 18:24
> > > 收件人: user-zh
> > > 主题: Re: Flink checkpoint 并发问题
> > > 你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了
> > >
> > > [hidden email] <[hidden email]> 于2019年7月25日周四 下午6:20写道:
> > >
> > > > setMaxConcurrentCheckpoints 这个参数你设置过么?
> > > >
> > > >
> > > >
> > > > [hidden email]
> > > >
> > > > 发件人: 戴嘉诚
> > > > 发送时间: 2019-07-25 18:07
> > > > 收件人: user-zh
> > > > 主题: Flink checkpoint 并发问题
> > > > 大家好:
> > > >
> > > >     我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
> > > >
> > > >
> > > >
> > >
> >
> 在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
> > > >
> > > >
> > > >
> > > >
> > >
> >
> 这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
> > > >
> > > >
> > > > java.lang.Exception: Could not perform checkpoint 550 for operator
> > > > KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
> > > > (16/20).
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
> > > >
> > > >          at org.apache.flink.streaming.runtime.io
> > > > .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> > > >
> > > >          at
> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > > >
> > > >          at java.lang.Thread.run(Thread.java:748)
> > > >
> > > > Caused by: java.lang.Exception: Could not complete snapshot 550 for
> > > > operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
> > > > 写入redis库存 (16/20).
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
> > > >
> > > >          ... 8 more
> > > >
> > > > Caused by: java.util.ConcurrentModificationException
> > > >
> > > >          at
> java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> > > >
> > > >          at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> > > >
> > > >          at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
> > > >
> > > >          at
> > > >
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> > > >
> > > >          at
> > > >
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> > > >
> > > >          at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
> > > >
> > > >          ... 13 more
> > > >
> > >
> >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink checkpoint 并发问题

athlon512@gmail.com
In reply to this post by Yun Tang
有请大佬答疑解惑😀😀😀

Yun Tang <[hidden email]> 于2019年7月25日周四 下午7:10写道:

> Hi  all
>
> 你们讨论的已经越来越偏了,出问题的是operator state
> backend,实际上与RocksDB没有关系,与MaxConcurrentCheckpoints 也不应该有关系。
>
> To 戴嘉诚
> 你使用的Flink版本是什么?这个operator 里面的operator state descriptor是什么?
>
> 祝好
> 唐云
> ________________________________
> From: 戴嘉诚 <[hidden email]>
> Sent: Thursday, July 25, 2019 19:04
> To: [hidden email] <[hidden email]>
> Subject: Re: Re: Flink checkpoint 并发问题
>
> 这个用window不符合这个场景,因为是取每时每刻的最近半小时,而window只能是固定时间区间,例如window只能统计6点半至7点,
> 7点之7点半,而我的场景是需要6点到6点半,6点02分到6点32分,是根据数据的时间来确定区间,达到统计区间内的总数
>
> [hidden email] <[hidden email]>于2019年7月25日 周四18:50写道:
>
> > 那你用window和evictor 不可以吗?
> > 类似这样,因为我理解你的业务需求可以用这个来实现
> > 在flink源码中org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
> >
> > ------------------------------
> > [hidden email]
> >
> >
> > *发件人:* 戴嘉诚 <[hidden email]>
> > *发送时间:* 2019-07-25 18:45
> > *收件人:* user-zh <[hidden email]>
> > *主题:* Re: Re: Flink checkpoint 并发问题
> >
> >
> >
> 这个应该不行,因为这个ttl是删除整个map状态,而我是需要最近半小时内的数据量,所以这个map状态,如果一直有数据写入,我就只需要获取最近的半小时,超过的就在map里面删除,这个ttl不能精确到map里面的具体的k-v
> > 对
> >
> > [hidden email] <[hidden email]> 于2019年7月25日周四 下午6:40写道:
> >
> > > 其实你可以不用自己删除.使用TTL设置短一些时间,试试
> > >
> > >
> > >
> > > [hidden email]
> > >
> > > 发件人: 戴嘉诚
> > > 发送时间: 2019-07-25 18:24
> > > 收件人: user-zh
> > > 主题: Re: Flink checkpoint 并发问题
> > > 你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了
> > >
> > > [hidden email] <[hidden email]> 于2019年7月25日周四 下午6:20写道:
> > >
> > > > setMaxConcurrentCheckpoints 这个参数你设置过么?
> > > >
> > > >
> > > >
> > > > [hidden email]
> > > >
> > > > 发件人: 戴嘉诚
> > > > 发送时间: 2019-07-25 18:07
> > > > 收件人: user-zh
> > > > 主题: Flink checkpoint 并发问题
> > > > 大家好:
> > > >
> > > >     我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
> > > >
> > > >
> > > >
> > >
> >
> 在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
> > > >
> > > >
> > > >
> > > >
> > >
> >
> 这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
> > > >
> > > >
> > > > java.lang.Exception: Could not perform checkpoint 550 for operator
> > > > KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
> > > > (16/20).
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
> > > >
> > > >          at org.apache.flink.streaming.runtime.io
> > > > .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> > > >
> > > >          at
> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > > >
> > > >          at java.lang.Thread.run(Thread.java:748)
> > > >
> > > > Caused by: java.lang.Exception: Could not complete snapshot 550 for
> > > > operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
> > > > 写入redis库存 (16/20).
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
> > > >
> > > >          ... 8 more
> > > >
> > > > Caused by: java.util.ConcurrentModificationException
> > > >
> > > >          at
> java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> > > >
> > > >          at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> > > >
> > > >          at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
> > > >
> > > >          at
> > > >
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> > > >
> > > >          at
> > > >
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> > > >
> > > >          at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
> > > >
> > > >          ... 13 more
> > > >
> > >
> >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink checkpoint 并发问题

Yun Tang
In reply to this post by 戴嘉诚
Hi 戴嘉诚

从异常栈上看,DefaultOperatorStateBackendSnapshotStrategy 88行左右的代码[1]是对list state进行deepcopy,但是你告诉我的state descriptor却是MapStateDescriptor<String, Long>,这二者之间明显是对不上的,还请再校验一下你的代码。异常栈出现在checkpoint同步阶段的内容拷贝时的ConcurrentModification,这个确实是很奇怪的,所以还需要麻烦回答一下下面几个问题:

  *   这个问题是必现的么?作业failover或者重新提交之后也会出现么?
  *   由于operator state的snapshot/restore需要由用户提供,所以如果你继承了ListCheckpointed接口,请提供一下snapshotState和restoreState接口的实现(如果是继承了CheckpointedFunction接口,请提供一下snapshotState和initializeState的实现)。另外,这个operator state的申明以及相关的使用地方也最好提供一下。

[1] https://github.com/apache/flink/blob/480875f045a9777877ed1a90f9e0c6e01b7e03c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java#L88

祝好
唐云
________________________________
From: 戴嘉诚 <[hidden email]>
Sent: Thursday, July 25, 2019 19:26
To: user-zh <[hidden email]>
Subject: Re: Re: Flink checkpoint 并发问题

hi
    你好,我用的flink是1.8,但是是根据hadoop 2.7.3.2.6.5.0-292  自行打包的,  operator state
descriptor是使用MapStateDescriptor<String, Long>,
谢谢!

Yun Tang <[hidden email]> 于2019年7月25日周四 下午7:10写道:

> Hi  all
>
> 你们讨论的已经越来越偏了,出问题的是operator state
> backend,实际上与RocksDB没有关系,与MaxConcurrentCheckpoints 也不应该有关系。
>
> To 戴嘉诚
> 你使用的Flink版本是什么?这个operator 里面的operator state descriptor是什么?
>
> 祝好
> 唐云
> ________________________________
> From: 戴嘉诚 <[hidden email]>
> Sent: Thursday, July 25, 2019 19:04
> To: [hidden email] <[hidden email]>
> Subject: Re: Re: Flink checkpoint 并发问题
>
> 这个用window不符合这个场景,因为是取每时每刻的最近半小时,而window只能是固定时间区间,例如window只能统计6点半至7点,
> 7点之7点半,而我的场景是需要6点到6点半,6点02分到6点32分,是根据数据的时间来确定区间,达到统计区间内的总数
>
> [hidden email] <[hidden email]>于2019年7月25日 周四18:50写道:
>
> > 那你用window和evictor 不可以吗?
> > 类似这样,因为我理解你的业务需求可以用这个来实现
> > 在flink源码中org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
> >
> > ------------------------------
> > [hidden email]
> >
> >
> > *发件人:* 戴嘉诚 <[hidden email]>
> > *发送时间:* 2019-07-25 18:45
> > *收件人:* user-zh <[hidden email]>
> > *主题:* Re: Re: Flink checkpoint 并发问题
> >
> >
> >
> 这个应该不行,因为这个ttl是删除整个map状态,而我是需要最近半小时内的数据量,所以这个map状态,如果一直有数据写入,我就只需要获取最近的半小时,超过的就在map里面删除,这个ttl不能精确到map里面的具体的k-v
> > 对
> >
> > [hidden email] <[hidden email]> 于2019年7月25日周四 下午6:40写道:
> >
> > > 其实你可以不用自己删除.使用TTL设置短一些时间,试试
> > >
> > >
> > >
> > > [hidden email]
> > >
> > > 发件人: 戴嘉诚
> > > 发送时间: 2019-07-25 18:24
> > > 收件人: user-zh
> > > 主题: Re: Flink checkpoint 并发问题
> > > 你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了
> > >
> > > [hidden email] <[hidden email]> 于2019年7月25日周四 下午6:20写道:
> > >
> > > > setMaxConcurrentCheckpoints 这个参数你设置过么?
> > > >
> > > >
> > > >
> > > > [hidden email]
> > > >
> > > > 发件人: 戴嘉诚
> > > > 发送时间: 2019-07-25 18:07
> > > > 收件人: user-zh
> > > > 主题: Flink checkpoint 并发问题
> > > > 大家好:
> > > >
> > > >     我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
> > > >
> > > >
> > > >
> > >
> >
> 在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
> > > >
> > > >
> > > >
> > > >
> > >
> >
> 这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
> > > >
> > > >
> > > > java.lang.Exception: Could not perform checkpoint 550 for operator
> > > > KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
> > > > (16/20).
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
> > > >
> > > >          at org.apache.flink.streaming.runtime.io
> > > > .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> > > >
> > > >          at
> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > > >
> > > >          at java.lang.Thread.run(Thread.java:748)
> > > >
> > > > Caused by: java.lang.Exception: Could not complete snapshot 550 for
> > > > operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
> > > > 写入redis库存 (16/20).
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
> > > >
> > > >          ... 8 more
> > > >
> > > > Caused by: java.util.ConcurrentModificationException
> > > >
> > > >          at
> java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> > > >
> > > >          at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> > > >
> > > >          at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
> > > >
> > > >          at
> > > >
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> > > >
> > > >          at
> > > >
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> > > >
> > > >          at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
> > > >
> > > >          at
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
> > > >
> > > >          ... 13 more
> > > >
> > >
> >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink checkpoint 并发问题

戴嘉诚
Hi 唐云

这个问题在这个job上是必现的,即使是failvoer后自动重启,也会出现,但是在同一个处理逻辑,在其他的场景下是没有出现过,而这其中的区别是,这个job是数据量比较大,将近一分钟80万左右。

这个其中的ProcessElement大致的代码是这样的:
.process(new KeyedProcessFunction<String, Map<String, Object>, Map<String,
Object>>() {
          private static final long serialVersionUID = 5245290869789704294L;

          private MapState<String, Long> accumulateStateMap;
          Map<String, Object> resultMap = new HashMap<>();
          private transient Long hourClear = 24L;


          @Override
          public void open(Configuration parameters) throws Exception {
            MapStateDescriptor<String, Long> accumulateState = new
MapStateDescriptor<>("accumulateState", StringSerializer.INSTANCE,
LongSerializer.INSTANCE);
            accumulateStateMap =
getRuntimeContext().getMapState(accumulateState);
          }

          @Override
          public void processElement(Map<String, Object> value, Context ctx,
              Collector<Map<String, Object>> out) throws Exception {
            logger.info("来数据了:{}", value);
            realData.increment();
            resultMap.clear();
            String valueFieldValue =
String.valueOf(value.get(getLastName(valueFieldName)));
            Long timeFieldValue =
Long.parseLong(String.valueOf(value.get(timeFieldName)));
            //写到state中
            //判断,state是否存在fieldValue,  如果fieldValue
存在,再判断state的时间是否小于time(用于判断乱序时间
            if (!accumulateStateMap.contains(valueFieldValue) ||
accumulateStateMap.get(valueFieldValue) < timeFieldValue) {
              accumulateStateMap.put(valueFieldValue, timeFieldValue);
            }
            //判断配置是否已经刷进来了
            if (value.containsKey("config")) {
              Map<String, String> config = (Map<String, String>)
value.get("config");
              Integer configCount =
Integer.parseInt(config.get(countFieldName));
              Long configTime =
Long.parseLong(config.get(timeRangeFieldName)) * 1000;
              //在配置时间范围前
              long lastTimeStamp = timeFieldValue - configTime;
              //状态里面有多少个值
              int stateSize = 0;
              //遍历state, 删除过时的时间
              Iterator<Entry<String, Long>> iterator =
accumulateStateMap.iterator();
              while (iterator.hasNext()) {
                ++stateSize;
                Entry<String, Long> next = iterator.next();
                if (lastTimeStamp >= next.getValue()) {
                  iterator.remove();
                  --stateSize;
                }
              }
              //state的值的数量大于阈值
              if (stateSize >= configCount) {
                resultMap.put("id", config.get("id"));
                resultMap.put("config_id", config.get("_id"));
                resultMap.put("config_version", config.get("_version"));
                resultMap.put("config_score", config.get("score"));
                resultMap.put("config_ttl", config.get("ttl"));
                resultMap.put("startTime", lastTimeStamp);
                resultMap.put("endTime", timeFieldValue);
                resultMap.put("key", ctx.getCurrentKey());
                resultMap.put("value", valueFieldValue);
                resultMap.put("count", stateSize);
                out.collect(resultMap);
              }
              logger.info("当前key为:{}, 聚集数量为:{}", ctx.getCurrentKey(),
stateSize);
              //根据配置时间乘以2,错信息范围加上来注册指定清理state的时间
              hourClear = (configTime * 2  + EXPIRATION_TIME) / 3600000;
              LocalDateTime localDateTime =
Instant.ofEpochMilli(timeFieldValue).atZone(ZoneId.systemDefault()).toLocalDateTime();
              localDateTime =
localDateTime.withMinute(0).withSecond(0).withNano(0).plusHours(hourClear);
              long timeClean =
localDateTime.toInstant(OffsetDateTime.now().getOffset()).toEpochMilli();
              timeTimerGauge = String.valueOf(timeClean);
              //注册过期时间
              ctx.timerService().registerEventTimeTimer(timeClean);
            }
          }

          @Override
          public void onTimer(long timestamp, OnTimerContext ctx,
              Collector<Map<String, Object>> out) throws Exception {
            //减去时间,防止删除中间要累积的数据
            Long lasttime = timestamp - (hourClear * 3600000);
            //删除过期时间
            int stateSize = 0;
            int removeState = 0;
            Iterator<Entry<String, Long>> iterator =
accumulateStateMap.iterator();
            while (iterator.hasNext()) {
              ++stateSize;
              Entry<String, Long> next = iterator.next();
              if (lasttime >= next.getValue()) {
                iterator.remove();
                --stateSize;
                ++removeState;
              }
            }
            if (stateSize == 0) {
              accumulateStateMap.clear();
            }
            //把这个定时器删除掉
            ctx.timerService().deleteEventTimeTimer(timestamp);
          }
        })

Yun Tang <[hidden email]> 于2019年7月25日周四 下午8:39写道:

> Hi 戴嘉诚
>
> 从异常栈上看,DefaultOperatorStateBackendSnapshotStrategy 88行左右的代码[1]是对list
> state进行deepcopy,但是你告诉我的state descriptor却是MapStateDescriptor<String,
> Long>,这二者之间明显是对不上的,还请再校验一下你的代码。异常栈出现在checkpoint同步阶段的内容拷贝时的ConcurrentModification,这个确实是很奇怪的,所以还需要麻烦回答一下下面几个问题:
>
>   *   这个问题是必现的么?作业failover或者重新提交之后也会出现么?
>   *   由于operator
> state的snapshot/restore需要由用户提供,所以如果你继承了ListCheckpointed接口,请提供一下snapshotState和restoreState接口的实现(如果是继承了CheckpointedFunction接口,请提供一下snapshotState和initializeState的实现)。另外,这个operator
> state的申明以及相关的使用地方也最好提供一下。
>
> [1]
> https://github.com/apache/flink/blob/480875f045a9777877ed1a90f9e0c6e01b7e03c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java#L88
>
> 祝好
> 唐云
> ________________________________
> From: 戴嘉诚 <[hidden email]>
> Sent: Thursday, July 25, 2019 19:26
> To: user-zh <[hidden email]>
> Subject: Re: Re: Flink checkpoint 并发问题
>
> hi
>     你好,我用的flink是1.8,但是是根据hadoop 2.7.3.2.6.5.0-292  自行打包的,  operator state
> descriptor是使用MapStateDescriptor<String, Long>,
> 谢谢!
>
> Yun Tang <[hidden email]> 于2019年7月25日周四 下午7:10写道:
>
> > Hi  all
> >
> > 你们讨论的已经越来越偏了,出问题的是operator state
> > backend,实际上与RocksDB没有关系,与MaxConcurrentCheckpoints 也不应该有关系。
> >
> > To 戴嘉诚
> > 你使用的Flink版本是什么?这个operator 里面的operator state descriptor是什么?
> >
> > 祝好
> > 唐云
> > ________________________________
> > From: 戴嘉诚 <[hidden email]>
> > Sent: Thursday, July 25, 2019 19:04
> > To: [hidden email] <[hidden email]>
> > Subject: Re: Re: Flink checkpoint 并发问题
> >
> > 这个用window不符合这个场景,因为是取每时每刻的最近半小时,而window只能是固定时间区间,例如window只能统计6点半至7点,
> > 7点之7点半,而我的场景是需要6点到6点半,6点02分到6点32分,是根据数据的时间来确定区间,达到统计区间内的总数
> >
> > [hidden email] <[hidden email]>于2019年7月25日 周四18:50写道:
> >
> > > 那你用window和evictor 不可以吗?
> > > 类似这样,因为我理解你的业务需求可以用这个来实现
> > >
> 在flink源码中org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
> > >
> > > ------------------------------
> > > [hidden email]
> > >
> > >
> > > *发件人:* 戴嘉诚 <[hidden email]>
> > > *发送时间:* 2019-07-25 18:45
> > > *收件人:* user-zh <[hidden email]>
> > > *主题:* Re: Re: Flink checkpoint 并发问题
> > >
> > >
> > >
> >
> 这个应该不行,因为这个ttl是删除整个map状态,而我是需要最近半小时内的数据量,所以这个map状态,如果一直有数据写入,我就只需要获取最近的半小时,超过的就在map里面删除,这个ttl不能精确到map里面的具体的k-v
> > > 对
> > >
> > > [hidden email] <[hidden email]> 于2019年7月25日周四 下午6:40写道:
> > >
> > > > 其实你可以不用自己删除.使用TTL设置短一些时间,试试
> > > >
> > > >
> > > >
> > > > [hidden email]
> > > >
> > > > 发件人: 戴嘉诚
> > > > 发送时间: 2019-07-25 18:24
> > > > 收件人: user-zh
> > > > 主题: Re: Flink checkpoint 并发问题
> > > > 你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了
> > > >
> > > > [hidden email] <[hidden email]> 于2019年7月25日周四 下午6:20写道:
> > > >
> > > > > setMaxConcurrentCheckpoints 这个参数你设置过么?
> > > > >
> > > > >
> > > > >
> > > > > [hidden email]
> > > > >
> > > > > 发件人: 戴嘉诚
> > > > > 发送时间: 2019-07-25 18:07
> > > > > 收件人: user-zh
> > > > > 主题: Flink checkpoint 并发问题
> > > > > 大家好:
> > > > >
> > > > >     我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> 在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
> > > > >
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> 这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
> > > > >
> > > > >
> > > > > java.lang.Exception: Could not perform checkpoint 550 for operator
> > > > > KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
> > > > > (16/20).
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
> > > > >
> > > > >          at org.apache.flink.streaming.runtime.io
> > > > > .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> > > > >
> > > > >          at
> > > org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > > > >
> > > > >          at java.lang.Thread.run(Thread.java:748)
> > > > >
> > > > > Caused by: java.lang.Exception: Could not complete snapshot 550 for
> > > > > operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
> > > > > 写入redis库存 (16/20).
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
> > > > >
> > > > >          ... 8 more
> > > > >
> > > > > Caused by: java.util.ConcurrentModificationException
> > > > >
> > > > >          at
> > java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> > > > >
> > > > >          at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> > > > >
> > > > >          at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> > > > >
> > > > >          at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
> > > > >
> > > > >          ... 13 more
> > > > >
> > > >
> > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink checkpoint 并发问题

Yun Tang
Hi

你给的代码跟你的异常栈其实还是对不上,前文已经说了,出问题的是operator state,但是你的代码都是keyed state相关的代码。不过从你出问题的operator name "KeyedProcess -> async wait operator -> Flat Map -> Sink", 以及异常栈中的StreamElementSerializer使用和一致性问题的表象,我推测应该是应该是AsyncWaitOperator中的operator state "_async_wait_operator_state_"相关。最近fix的 https://issues.apache.org/jira/browse/FLINK-13063 问题,正是暂时解决一致性问题,可以考虑cherry pick相关的fix重新部署你们的Flink作业,观察该问题是否还会复现。

祝好
唐云
________________________________
From: 戴嘉诚 <[hidden email]>
Sent: Thursday, July 25, 2019 21:07
To: user-zh <[hidden email]>
Subject: Re: Re: Flink checkpoint 并发问题

Hi 唐云

这个问题在这个job上是必现的,即使是failvoer后自动重启,也会出现,但是在同一个处理逻辑,在其他的场景下是没有出现过,而这其中的区别是,这个job是数据量比较大,将近一分钟80万左右。

这个其中的ProcessElement大致的代码是这样的:
.process(new KeyedProcessFunction<String, Map<String, Object>, Map<String,
Object>>() {
          private static final long serialVersionUID = 5245290869789704294L;

          private MapState<String, Long> accumulateStateMap;
          Map<String, Object> resultMap = new HashMap<>();
          private transient Long hourClear = 24L;


          @Override
          public void open(Configuration parameters) throws Exception {
            MapStateDescriptor<String, Long> accumulateState = new
MapStateDescriptor<>("accumulateState", StringSerializer.INSTANCE,
LongSerializer.INSTANCE);
            accumulateStateMap =
getRuntimeContext().getMapState(accumulateState);
          }

          @Override
          public void processElement(Map<String, Object> value, Context ctx,
              Collector<Map<String, Object>> out) throws Exception {
            logger.info("来数据了:{}", value);
            realData.increment();
            resultMap.clear();
            String valueFieldValue =
String.valueOf(value.get(getLastName(valueFieldName)));
            Long timeFieldValue =
Long.parseLong(String.valueOf(value.get(timeFieldName)));
            //写到state中
            //判断,state是否存在fieldValue,  如果fieldValue
存在,再判断state的时间是否小于time(用于判断乱序时间
            if (!accumulateStateMap.contains(valueFieldValue) ||
accumulateStateMap.get(valueFieldValue) < timeFieldValue) {
              accumulateStateMap.put(valueFieldValue, timeFieldValue);
            }
            //判断配置是否已经刷进来了
            if (value.containsKey("config")) {
              Map<String, String> config = (Map<String, String>)
value.get("config");
              Integer configCount =
Integer.parseInt(config.get(countFieldName));
              Long configTime =
Long.parseLong(config.get(timeRangeFieldName)) * 1000;
              //在配置时间范围前
              long lastTimeStamp = timeFieldValue - configTime;
              //状态里面有多少个值
              int stateSize = 0;
              //遍历state, 删除过时的时间
              Iterator<Entry<String, Long>> iterator =
accumulateStateMap.iterator();
              while (iterator.hasNext()) {
                ++stateSize;
                Entry<String, Long> next = iterator.next();
                if (lastTimeStamp >= next.getValue()) {
                  iterator.remove();
                  --stateSize;
                }
              }
              //state的值的数量大于阈值
              if (stateSize >= configCount) {
                resultMap.put("id", config.get("id"));
                resultMap.put("config_id", config.get("_id"));
                resultMap.put("config_version", config.get("_version"));
                resultMap.put("config_score", config.get("score"));
                resultMap.put("config_ttl", config.get("ttl"));
                resultMap.put("startTime", lastTimeStamp);
                resultMap.put("endTime", timeFieldValue);
                resultMap.put("key", ctx.getCurrentKey());
                resultMap.put("value", valueFieldValue);
                resultMap.put("count", stateSize);
                out.collect(resultMap);
              }
              logger.info("当前key为:{}, 聚集数量为:{}", ctx.getCurrentKey(),
stateSize);
              //根据配置时间乘以2,错信息范围加上来注册指定清理state的时间
              hourClear = (configTime * 2  + EXPIRATION_TIME) / 3600000;
              LocalDateTime localDateTime =
Instant.ofEpochMilli(timeFieldValue).atZone(ZoneId.systemDefault()).toLocalDateTime();
              localDateTime =
localDateTime.withMinute(0).withSecond(0).withNano(0).plusHours(hourClear);
              long timeClean =
localDateTime.toInstant(OffsetDateTime.now().getOffset()).toEpochMilli();
              timeTimerGauge = String.valueOf(timeClean);
              //注册过期时间
              ctx.timerService().registerEventTimeTimer(timeClean);
            }
          }

          @Override
          public void onTimer(long timestamp, OnTimerContext ctx,
              Collector<Map<String, Object>> out) throws Exception {
            //减去时间,防止删除中间要累积的数据
            Long lasttime = timestamp - (hourClear * 3600000);
            //删除过期时间
            int stateSize = 0;
            int removeState = 0;
            Iterator<Entry<String, Long>> iterator =
accumulateStateMap.iterator();
            while (iterator.hasNext()) {
              ++stateSize;
              Entry<String, Long> next = iterator.next();
              if (lasttime >= next.getValue()) {
                iterator.remove();
                --stateSize;
                ++removeState;
              }
            }
            if (stateSize == 0) {
              accumulateStateMap.clear();
            }
            //把这个定时器删除掉
            ctx.timerService().deleteEventTimeTimer(timestamp);
          }
        })

Yun Tang <[hidden email]> 于2019年7月25日周四 下午8:39写道:

> Hi 戴嘉诚
>
> 从异常栈上看,DefaultOperatorStateBackendSnapshotStrategy 88行左右的代码[1]是对list
> state进行deepcopy,但是你告诉我的state descriptor却是MapStateDescriptor<String,
> Long>,这二者之间明显是对不上的,还请再校验一下你的代码。异常栈出现在checkpoint同步阶段的内容拷贝时的ConcurrentModification,这个确实是很奇怪的,所以还需要麻烦回答一下下面几个问题:
>
>   *   这个问题是必现的么?作业failover或者重新提交之后也会出现么?
>   *   由于operator
> state的snapshot/restore需要由用户提供,所以如果你继承了ListCheckpointed接口,请提供一下snapshotState和restoreState接口的实现(如果是继承了CheckpointedFunction接口,请提供一下snapshotState和initializeState的实现)。另外,这个operator
> state的申明以及相关的使用地方也最好提供一下。
>
> [1]
> https://github.com/apache/flink/blob/480875f045a9777877ed1a90f9e0c6e01b7e03c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java#L88
>
> 祝好
> 唐云
> ________________________________
> From: 戴嘉诚 <[hidden email]>
> Sent: Thursday, July 25, 2019 19:26
> To: user-zh <[hidden email]>
> Subject: Re: Re: Flink checkpoint 并发问题
>
> hi
>     你好,我用的flink是1.8,但是是根据hadoop 2.7.3.2.6.5.0-292  自行打包的,  operator state
> descriptor是使用MapStateDescriptor<String, Long>,
> 谢谢!
>
> Yun Tang <[hidden email]> 于2019年7月25日周四 下午7:10写道:
>
> > Hi  all
> >
> > 你们讨论的已经越来越偏了,出问题的是operator state
> > backend,实际上与RocksDB没有关系,与MaxConcurrentCheckpoints 也不应该有关系。
> >
> > To 戴嘉诚
> > 你使用的Flink版本是什么?这个operator 里面的operator state descriptor是什么?
> >
> > 祝好
> > 唐云
> > ________________________________
> > From: 戴嘉诚 <[hidden email]>
> > Sent: Thursday, July 25, 2019 19:04
> > To: [hidden email] <[hidden email]>
> > Subject: Re: Re: Flink checkpoint 并发问题
> >
> > 这个用window不符合这个场景,因为是取每时每刻的最近半小时,而window只能是固定时间区间,例如window只能统计6点半至7点,
> > 7点之7点半,而我的场景是需要6点到6点半,6点02分到6点32分,是根据数据的时间来确定区间,达到统计区间内的总数
> >
> > [hidden email] <[hidden email]>于2019年7月25日 周四18:50写道:
> >
> > > 那你用window和evictor 不可以吗?
> > > 类似这样,因为我理解你的业务需求可以用这个来实现
> > >
> 在flink源码中org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
> > >
> > > ------------------------------
> > > [hidden email]
> > >
> > >
> > > *发件人:* 戴嘉诚 <[hidden email]>
> > > *发送时间:* 2019-07-25 18:45
> > > *收件人:* user-zh <[hidden email]>
> > > *主题:* Re: Re: Flink checkpoint 并发问题
> > >
> > >
> > >
> >
> 这个应该不行,因为这个ttl是删除整个map状态,而我是需要最近半小时内的数据量,所以这个map状态,如果一直有数据写入,我就只需要获取最近的半小时,超过的就在map里面删除,这个ttl不能精确到map里面的具体的k-v
> > > 对
> > >
> > > [hidden email] <[hidden email]> 于2019年7月25日周四 下午6:40写道:
> > >
> > > > 其实你可以不用自己删除.使用TTL设置短一些时间,试试
> > > >
> > > >
> > > >
> > > > [hidden email]
> > > >
> > > > 发件人: 戴嘉诚
> > > > 发送时间: 2019-07-25 18:24
> > > > 收件人: user-zh
> > > > 主题: Re: Flink checkpoint 并发问题
> > > > 你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了
> > > >
> > > > [hidden email] <[hidden email]> 于2019年7月25日周四 下午6:20写道:
> > > >
> > > > > setMaxConcurrentCheckpoints 这个参数你设置过么?
> > > > >
> > > > >
> > > > >
> > > > > [hidden email]
> > > > >
> > > > > 发件人: 戴嘉诚
> > > > > 发送时间: 2019-07-25 18:07
> > > > > 收件人: user-zh
> > > > > 主题: Flink checkpoint 并发问题
> > > > > 大家好:
> > > > >
> > > > >     我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> 在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
> > > > >
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> 这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
> > > > >
> > > > >
> > > > > java.lang.Exception: Could not perform checkpoint 550 for operator
> > > > > KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
> > > > > (16/20).
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
> > > > >
> > > > >          at org.apache.flink.streaming.runtime.io
> > > > > .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> > > > >
> > > > >          at
> > > org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > > > >
> > > > >          at java.lang.Thread.run(Thread.java:748)
> > > > >
> > > > > Caused by: java.lang.Exception: Could not complete snapshot 550 for
> > > > > operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
> > > > > 写入redis库存 (16/20).
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
> > > > >
> > > > >          ... 8 more
> > > > >
> > > > > Caused by: java.util.ConcurrentModificationException
> > > > >
> > > > >          at
> > java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> > > > >
> > > > >          at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> > > > >
> > > > >          at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> > > > >
> > > > >          at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
> > > > >
> > > > >          at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
> > > > >
> > > > >          ... 13 more
> > > > >
> > > >
> > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink checkpoint 并发问题

戴嘉诚
嗯嗯!感谢

Yun Tang <[hidden email]> 于2019年7月25日周四 下午9:37写道:

> Hi
>
> 你给的代码跟你的异常栈其实还是对不上,前文已经说了,出问题的是operator state,但是你的代码都是keyed
> state相关的代码。不过从你出问题的operator name "KeyedProcess -> async wait operator ->
> Flat Map -> Sink",
> 以及异常栈中的StreamElementSerializer使用和一致性问题的表象,我推测应该是应该是AsyncWaitOperator中的operator
> state "_async_wait_operator_state_"相关。最近fix的
> https://issues.apache.org/jira/browse/FLINK-13063
> 问题,正是暂时解决一致性问题,可以考虑cherry pick相关的fix重新部署你们的Flink作业,观察该问题是否还会复现。
>
> 祝好
> 唐云
> ________________________________
> From: 戴嘉诚 <[hidden email]>
> Sent: Thursday, July 25, 2019 21:07
> To: user-zh <[hidden email]>
> Subject: Re: Re: Flink checkpoint 并发问题
>
> Hi 唐云
>
>
> 这个问题在这个job上是必现的,即使是failvoer后自动重启,也会出现,但是在同一个处理逻辑,在其他的场景下是没有出现过,而这其中的区别是,这个job是数据量比较大,将近一分钟80万左右。
>
> 这个其中的ProcessElement大致的代码是这样的:
> .process(new KeyedProcessFunction<String, Map<String, Object>, Map<String,
> Object>>() {
>           private static final long serialVersionUID =
> 5245290869789704294L;
>
>           private MapState<String, Long> accumulateStateMap;
>           Map<String, Object> resultMap = new HashMap<>();
>           private transient Long hourClear = 24L;
>
>
>           @Override
>           public void open(Configuration parameters) throws Exception {
>             MapStateDescriptor<String, Long> accumulateState = new
> MapStateDescriptor<>("accumulateState", StringSerializer.INSTANCE,
> LongSerializer.INSTANCE);
>             accumulateStateMap =
> getRuntimeContext().getMapState(accumulateState);
>           }
>
>           @Override
>           public void processElement(Map<String, Object> value, Context
> ctx,
>               Collector<Map<String, Object>> out) throws Exception {
>             logger.info("来数据了:{}", value);
>             realData.increment();
>             resultMap.clear();
>             String valueFieldValue =
> String.valueOf(value.get(getLastName(valueFieldName)));
>             Long timeFieldValue =
> Long.parseLong(String.valueOf(value.get(timeFieldName)));
>             //写到state中
>             //判断,state是否存在fieldValue,  如果fieldValue
> 存在,再判断state的时间是否小于time(用于判断乱序时间
>             if (!accumulateStateMap.contains(valueFieldValue) ||
> accumulateStateMap.get(valueFieldValue) < timeFieldValue) {
>               accumulateStateMap.put(valueFieldValue, timeFieldValue);
>             }
>             //判断配置是否已经刷进来了
>             if (value.containsKey("config")) {
>               Map<String, String> config = (Map<String, String>)
> value.get("config");
>               Integer configCount =
> Integer.parseInt(config.get(countFieldName));
>               Long configTime =
> Long.parseLong(config.get(timeRangeFieldName)) * 1000;
>               //在配置时间范围前
>               long lastTimeStamp = timeFieldValue - configTime;
>               //状态里面有多少个值
>               int stateSize = 0;
>               //遍历state, 删除过时的时间
>               Iterator<Entry<String, Long>> iterator =
> accumulateStateMap.iterator();
>               while (iterator.hasNext()) {
>                 ++stateSize;
>                 Entry<String, Long> next = iterator.next();
>                 if (lastTimeStamp >= next.getValue()) {
>                   iterator.remove();
>                   --stateSize;
>                 }
>               }
>               //state的值的数量大于阈值
>               if (stateSize >= configCount) {
>                 resultMap.put("id", config.get("id"));
>                 resultMap.put("config_id", config.get("_id"));
>                 resultMap.put("config_version", config.get("_version"));
>                 resultMap.put("config_score", config.get("score"));
>                 resultMap.put("config_ttl", config.get("ttl"));
>                 resultMap.put("startTime", lastTimeStamp);
>                 resultMap.put("endTime", timeFieldValue);
>                 resultMap.put("key", ctx.getCurrentKey());
>                 resultMap.put("value", valueFieldValue);
>                 resultMap.put("count", stateSize);
>                 out.collect(resultMap);
>               }
>               logger.info("当前key为:{}, 聚集数量为:{}", ctx.getCurrentKey(),
> stateSize);
>               //根据配置时间乘以2,错信息范围加上来注册指定清理state的时间
>               hourClear = (configTime * 2  + EXPIRATION_TIME) / 3600000;
>               LocalDateTime localDateTime =
>
> Instant.ofEpochMilli(timeFieldValue).atZone(ZoneId.systemDefault()).toLocalDateTime();
>               localDateTime =
> localDateTime.withMinute(0).withSecond(0).withNano(0).plusHours(hourClear);
>               long timeClean =
> localDateTime.toInstant(OffsetDateTime.now().getOffset()).toEpochMilli();
>               timeTimerGauge = String.valueOf(timeClean);
>               //注册过期时间
>               ctx.timerService().registerEventTimeTimer(timeClean);
>             }
>           }
>
>           @Override
>           public void onTimer(long timestamp, OnTimerContext ctx,
>               Collector<Map<String, Object>> out) throws Exception {
>             //减去时间,防止删除中间要累积的数据
>             Long lasttime = timestamp - (hourClear * 3600000);
>             //删除过期时间
>             int stateSize = 0;
>             int removeState = 0;
>             Iterator<Entry<String, Long>> iterator =
> accumulateStateMap.iterator();
>             while (iterator.hasNext()) {
>               ++stateSize;
>               Entry<String, Long> next = iterator.next();
>               if (lasttime >= next.getValue()) {
>                 iterator.remove();
>                 --stateSize;
>                 ++removeState;
>               }
>             }
>             if (stateSize == 0) {
>               accumulateStateMap.clear();
>             }
>             //把这个定时器删除掉
>             ctx.timerService().deleteEventTimeTimer(timestamp);
>           }
>         })
>
> Yun Tang <[hidden email]> 于2019年7月25日周四 下午8:39写道:
>
> > Hi 戴嘉诚
> >
> > 从异常栈上看,DefaultOperatorStateBackendSnapshotStrategy 88行左右的代码[1]是对list
> > state进行deepcopy,但是你告诉我的state descriptor却是MapStateDescriptor<String,
> >
> Long>,这二者之间明显是对不上的,还请再校验一下你的代码。异常栈出现在checkpoint同步阶段的内容拷贝时的ConcurrentModification,这个确实是很奇怪的,所以还需要麻烦回答一下下面几个问题:
> >
> >   *   这个问题是必现的么?作业failover或者重新提交之后也会出现么?
> >   *   由于operator
> >
> state的snapshot/restore需要由用户提供,所以如果你继承了ListCheckpointed接口,请提供一下snapshotState和restoreState接口的实现(如果是继承了CheckpointedFunction接口,请提供一下snapshotState和initializeState的实现)。另外,这个operator
> > state的申明以及相关的使用地方也最好提供一下。
> >
> > [1]
> >
> https://github.com/apache/flink/blob/480875f045a9777877ed1a90f9e0c6e01b7e03c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java#L88
> >
> > 祝好
> > 唐云
> > ________________________________
> > From: 戴嘉诚 <[hidden email]>
> > Sent: Thursday, July 25, 2019 19:26
> > To: user-zh <[hidden email]>
> > Subject: Re: Re: Flink checkpoint 并发问题
> >
> > hi
> >     你好,我用的flink是1.8,但是是根据hadoop 2.7.3.2.6.5.0-292  自行打包的,  operator state
> > descriptor是使用MapStateDescriptor<String, Long>,
> > 谢谢!
> >
> > Yun Tang <[hidden email]> 于2019年7月25日周四 下午7:10写道:
> >
> > > Hi  all
> > >
> > > 你们讨论的已经越来越偏了,出问题的是operator state
> > > backend,实际上与RocksDB没有关系,与MaxConcurrentCheckpoints 也不应该有关系。
> > >
> > > To 戴嘉诚
> > > 你使用的Flink版本是什么?这个operator 里面的operator state descriptor是什么?
> > >
> > > 祝好
> > > 唐云
> > > ________________________________
> > > From: 戴嘉诚 <[hidden email]>
> > > Sent: Thursday, July 25, 2019 19:04
> > > To: [hidden email] <[hidden email]>
> > > Subject: Re: Re: Flink checkpoint 并发问题
> > >
> > > 这个用window不符合这个场景,因为是取每时每刻的最近半小时,而window只能是固定时间区间,例如window只能统计6点半至7点,
> > > 7点之7点半,而我的场景是需要6点到6点半,6点02分到6点32分,是根据数据的时间来确定区间,达到统计区间内的总数
> > >
> > > [hidden email] <[hidden email]>于2019年7月25日 周四18:50写道:
> > >
> > > > 那你用window和evictor 不可以吗?
> > > > 类似这样,因为我理解你的业务需求可以用这个来实现
> > > >
> > 在flink源码中org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
> > > >
> > > > ------------------------------
> > > > [hidden email]
> > > >
> > > >
> > > > *发件人:* 戴嘉诚 <[hidden email]>
> > > > *发送时间:* 2019-07-25 18:45
> > > > *收件人:* user-zh <[hidden email]>
> > > > *主题:* Re: Re: Flink checkpoint 并发问题
> > > >
> > > >
> > > >
> > >
> >
> 这个应该不行,因为这个ttl是删除整个map状态,而我是需要最近半小时内的数据量,所以这个map状态,如果一直有数据写入,我就只需要获取最近的半小时,超过的就在map里面删除,这个ttl不能精确到map里面的具体的k-v
> > > > 对
> > > >
> > > > [hidden email] <[hidden email]> 于2019年7月25日周四 下午6:40写道:
> > > >
> > > > > 其实你可以不用自己删除.使用TTL设置短一些时间,试试
> > > > >
> > > > >
> > > > >
> > > > > [hidden email]
> > > > >
> > > > > 发件人: 戴嘉诚
> > > > > 发送时间: 2019-07-25 18:24
> > > > > 收件人: user-zh
> > > > > 主题: Re: Flink checkpoint 并发问题
> > > > > 你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了
> > > > >
> > > > > [hidden email] <[hidden email]> 于2019年7月25日周四 下午6:20写道:
> > > > >
> > > > > > setMaxConcurrentCheckpoints 这个参数你设置过么?
> > > > > >
> > > > > >
> > > > > >
> > > > > > [hidden email]
> > > > > >
> > > > > > 发件人: 戴嘉诚
> > > > > > 发送时间: 2019-07-25 18:07
> > > > > > 收件人: user-zh
> > > > > > 主题: Flink checkpoint 并发问题
> > > > > > 大家好:
> > > > > >
> > > > > >     我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
> > > > > >
> > > > > >
> > > > > > java.lang.Exception: Could not perform checkpoint 550 for
> operator
> > > > > > KeyedProcess -> async wait operator -> Flat Map -> Sink:
> 写入redis库存
> > > > > > (16/20).
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
> > > > > >
> > > > > >          at org.apache.flink.streaming.runtime.io
> > > > > > .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> > > > > >
> > > > > >          at
> > > > org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > > > > >
> > > > > >          at java.lang.Thread.run(Thread.java:748)
> > > > > >
> > > > > > Caused by: java.lang.Exception: Could not complete snapshot 550
> for
> > > > > > operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
> > > > > > 写入redis库存 (16/20).
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
> > > > > >
> > > > > >          ... 8 more
> > > > > >
> > > > > > Caused by: java.util.ConcurrentModificationException
> > > > > >
> > > > > >          at
> > > java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> > > > > >
> > > > > >          at
> java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> > > > > >
> > > > > >          at
> java.util.HashMap$EntryIterator.next(HashMap.java:1474)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> > > > > >
> > > > > >          at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
> > > > > >
> > > > > >          at
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
> > > > > >
> > > > > >          ... 13 more
> > > > > >
> > > > >
> > > >
> > > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink checkpoint 并发问题

LakeShen
Hi 嘉诚,
我觉得是否可以将value的  Map 换成线程安全的Map,可以尝试一下。
  public void processElement(Map<String, Object> value, Context ctx,
              Collector<Map<String, Object>> out) throws Exception {}
沈磊,
祝好

戴嘉诚 <[hidden email]> 于2019年7月25日周四 下午10:40写道:

> 嗯嗯!感谢
>
> Yun Tang <[hidden email]> 于2019年7月25日周四 下午9:37写道:
>
> > Hi
> >
> > 你给的代码跟你的异常栈其实还是对不上,前文已经说了,出问题的是operator state,但是你的代码都是keyed
> > state相关的代码。不过从你出问题的operator name "KeyedProcess -> async wait operator ->
> > Flat Map -> Sink",
> >
> 以及异常栈中的StreamElementSerializer使用和一致性问题的表象,我推测应该是应该是AsyncWaitOperator中的operator
> > state "_async_wait_operator_state_"相关。最近fix的
> > https://issues.apache.org/jira/browse/FLINK-13063
> > 问题,正是暂时解决一致性问题,可以考虑cherry pick相关的fix重新部署你们的Flink作业,观察该问题是否还会复现。
> >
> > 祝好
> > 唐云
> > ________________________________
> > From: 戴嘉诚 <[hidden email]>
> > Sent: Thursday, July 25, 2019 21:07
> > To: user-zh <[hidden email]>
> > Subject: Re: Re: Flink checkpoint 并发问题
> >
> > Hi 唐云
> >
> >
> >
> 这个问题在这个job上是必现的,即使是failvoer后自动重启,也会出现,但是在同一个处理逻辑,在其他的场景下是没有出现过,而这其中的区别是,这个job是数据量比较大,将近一分钟80万左右。
> >
> > 这个其中的ProcessElement大致的代码是这样的:
> > .process(new KeyedProcessFunction<String, Map<String, Object>,
> Map<String,
> > Object>>() {
> >           private static final long serialVersionUID =
> > 5245290869789704294L;
> >
> >           private MapState<String, Long> accumulateStateMap;
> >           Map<String, Object> resultMap = new HashMap<>();
> >           private transient Long hourClear = 24L;
> >
> >
> >           @Override
> >           public void open(Configuration parameters) throws Exception {
> >             MapStateDescriptor<String, Long> accumulateState = new
> > MapStateDescriptor<>("accumulateState", StringSerializer.INSTANCE,
> > LongSerializer.INSTANCE);
> >             accumulateStateMap =
> > getRuntimeContext().getMapState(accumulateState);
> >           }
> >
> >           @Override
> >           public void processElement(Map<String, Object> value, Context
> > ctx,
> >               Collector<Map<String, Object>> out) throws Exception {
> >             logger.info("来数据了:{}", value);
> >             realData.increment();
> >             resultMap.clear();
> >             String valueFieldValue =
> > String.valueOf(value.get(getLastName(valueFieldName)));
> >             Long timeFieldValue =
> > Long.parseLong(String.valueOf(value.get(timeFieldName)));
> >             //写到state中
> >             //判断,state是否存在fieldValue,  如果fieldValue
> > 存在,再判断state的时间是否小于time(用于判断乱序时间
> >             if (!accumulateStateMap.contains(valueFieldValue) ||
> > accumulateStateMap.get(valueFieldValue) < timeFieldValue) {
> >               accumulateStateMap.put(valueFieldValue, timeFieldValue);
> >             }
> >             //判断配置是否已经刷进来了
> >             if (value.containsKey("config")) {
> >               Map<String, String> config = (Map<String, String>)
> > value.get("config");
> >               Integer configCount =
> > Integer.parseInt(config.get(countFieldName));
> >               Long configTime =
> > Long.parseLong(config.get(timeRangeFieldName)) * 1000;
> >               //在配置时间范围前
> >               long lastTimeStamp = timeFieldValue - configTime;
> >               //状态里面有多少个值
> >               int stateSize = 0;
> >               //遍历state, 删除过时的时间
> >               Iterator<Entry<String, Long>> iterator =
> > accumulateStateMap.iterator();
> >               while (iterator.hasNext()) {
> >                 ++stateSize;
> >                 Entry<String, Long> next = iterator.next();
> >                 if (lastTimeStamp >= next.getValue()) {
> >                   iterator.remove();
> >                   --stateSize;
> >                 }
> >               }
> >               //state的值的数量大于阈值
> >               if (stateSize >= configCount) {
> >                 resultMap.put("id", config.get("id"));
> >                 resultMap.put("config_id", config.get("_id"));
> >                 resultMap.put("config_version", config.get("_version"));
> >                 resultMap.put("config_score", config.get("score"));
> >                 resultMap.put("config_ttl", config.get("ttl"));
> >                 resultMap.put("startTime", lastTimeStamp);
> >                 resultMap.put("endTime", timeFieldValue);
> >                 resultMap.put("key", ctx.getCurrentKey());
> >                 resultMap.put("value", valueFieldValue);
> >                 resultMap.put("count", stateSize);
> >                 out.collect(resultMap);
> >               }
> >               logger.info("当前key为:{}, 聚集数量为:{}", ctx.getCurrentKey(),
> > stateSize);
> >               //根据配置时间乘以2,错信息范围加上来注册指定清理state的时间
> >               hourClear = (configTime * 2  + EXPIRATION_TIME) / 3600000;
> >               LocalDateTime localDateTime =
> >
> >
> Instant.ofEpochMilli(timeFieldValue).atZone(ZoneId.systemDefault()).toLocalDateTime();
> >               localDateTime =
> >
> localDateTime.withMinute(0).withSecond(0).withNano(0).plusHours(hourClear);
> >               long timeClean =
> > localDateTime.toInstant(OffsetDateTime.now().getOffset()).toEpochMilli();
> >               timeTimerGauge = String.valueOf(timeClean);
> >               //注册过期时间
> >               ctx.timerService().registerEventTimeTimer(timeClean);
> >             }
> >           }
> >
> >           @Override
> >           public void onTimer(long timestamp, OnTimerContext ctx,
> >               Collector<Map<String, Object>> out) throws Exception {
> >             //减去时间,防止删除中间要累积的数据
> >             Long lasttime = timestamp - (hourClear * 3600000);
> >             //删除过期时间
> >             int stateSize = 0;
> >             int removeState = 0;
> >             Iterator<Entry<String, Long>> iterator =
> > accumulateStateMap.iterator();
> >             while (iterator.hasNext()) {
> >               ++stateSize;
> >               Entry<String, Long> next = iterator.next();
> >               if (lasttime >= next.getValue()) {
> >                 iterator.remove();
> >                 --stateSize;
> >                 ++removeState;
> >               }
> >             }
> >             if (stateSize == 0) {
> >               accumulateStateMap.clear();
> >             }
> >             //把这个定时器删除掉
> >             ctx.timerService().deleteEventTimeTimer(timestamp);
> >           }
> >         })
> >
> > Yun Tang <[hidden email]> 于2019年7月25日周四 下午8:39写道:
> >
> > > Hi 戴嘉诚
> > >
> > > 从异常栈上看,DefaultOperatorStateBackendSnapshotStrategy 88行左右的代码[1]是对list
> > > state进行deepcopy,但是你告诉我的state descriptor却是MapStateDescriptor<String,
> > >
> >
> Long>,这二者之间明显是对不上的,还请再校验一下你的代码。异常栈出现在checkpoint同步阶段的内容拷贝时的ConcurrentModification,这个确实是很奇怪的,所以还需要麻烦回答一下下面几个问题:
> > >
> > >   *   这个问题是必现的么?作业failover或者重新提交之后也会出现么?
> > >   *   由于operator
> > >
> >
> state的snapshot/restore需要由用户提供,所以如果你继承了ListCheckpointed接口,请提供一下snapshotState和restoreState接口的实现(如果是继承了CheckpointedFunction接口,请提供一下snapshotState和initializeState的实现)。另外,这个operator
> > > state的申明以及相关的使用地方也最好提供一下。
> > >
> > > [1]
> > >
> >
> https://github.com/apache/flink/blob/480875f045a9777877ed1a90f9e0c6e01b7e03c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java#L88
> > >
> > > 祝好
> > > 唐云
> > > ________________________________
> > > From: 戴嘉诚 <[hidden email]>
> > > Sent: Thursday, July 25, 2019 19:26
> > > To: user-zh <[hidden email]>
> > > Subject: Re: Re: Flink checkpoint 并发问题
> > >
> > > hi
> > >     你好,我用的flink是1.8,但是是根据hadoop 2.7.3.2.6.5.0-292  自行打包的,  operator
> state
> > > descriptor是使用MapStateDescriptor<String, Long>,
> > > 谢谢!
> > >
> > > Yun Tang <[hidden email]> 于2019年7月25日周四 下午7:10写道:
> > >
> > > > Hi  all
> > > >
> > > > 你们讨论的已经越来越偏了,出问题的是operator state
> > > > backend,实际上与RocksDB没有关系,与MaxConcurrentCheckpoints 也不应该有关系。
> > > >
> > > > To 戴嘉诚
> > > > 你使用的Flink版本是什么?这个operator 里面的operator state descriptor是什么?
> > > >
> > > > 祝好
> > > > 唐云
> > > > ________________________________
> > > > From: 戴嘉诚 <[hidden email]>
> > > > Sent: Thursday, July 25, 2019 19:04
> > > > To: [hidden email] <[hidden email]>
> > > > Subject: Re: Re: Flink checkpoint 并发问题
> > > >
> > > > 这个用window不符合这个场景,因为是取每时每刻的最近半小时,而window只能是固定时间区间,例如window只能统计6点半至7点,
> > > > 7点之7点半,而我的场景是需要6点到6点半,6点02分到6点32分,是根据数据的时间来确定区间,达到统计区间内的总数
> > > >
> > > > [hidden email] <[hidden email]>于2019年7月25日 周四18:50写道:
> > > >
> > > > > 那你用window和evictor 不可以吗?
> > > > > 类似这样,因为我理解你的业务需求可以用这个来实现
> > > > >
> > >
> 在flink源码中org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
> > > > >
> > > > > ------------------------------
> > > > > [hidden email]
> > > > >
> > > > >
> > > > > *发件人:* 戴嘉诚 <[hidden email]>
> > > > > *发送时间:* 2019-07-25 18:45
> > > > > *收件人:* user-zh <[hidden email]>
> > > > > *主题:* Re: Re: Flink checkpoint 并发问题
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> 这个应该不行,因为这个ttl是删除整个map状态,而我是需要最近半小时内的数据量,所以这个map状态,如果一直有数据写入,我就只需要获取最近的半小时,超过的就在map里面删除,这个ttl不能精确到map里面的具体的k-v
> > > > > 对
> > > > >
> > > > > [hidden email] <[hidden email]> 于2019年7月25日周四 下午6:40写道:
> > > > >
> > > > > > 其实你可以不用自己删除.使用TTL设置短一些时间,试试
> > > > > >
> > > > > >
> > > > > >
> > > > > > [hidden email]
> > > > > >
> > > > > > 发件人: 戴嘉诚
> > > > > > 发送时间: 2019-07-25 18:24
> > > > > > 收件人: user-zh
> > > > > > 主题: Re: Flink checkpoint 并发问题
> > > > > > 你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了
> > > > > >
> > > > > > [hidden email] <[hidden email]> 于2019年7月25日周四
> 下午6:20写道:
> > > > > >
> > > > > > > setMaxConcurrentCheckpoints 这个参数你设置过么?
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > [hidden email]
> > > > > > >
> > > > > > > 发件人: 戴嘉诚
> > > > > > > 发送时间: 2019-07-25 18:07
> > > > > > > 收件人: user-zh
> > > > > > > 主题: Flink checkpoint 并发问题
> > > > > > > 大家好:
> > > > > > >
> > > > > > >     我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
> > > > > > >
> > > > > > >
> > > > > > > java.lang.Exception: Could not perform checkpoint 550 for
> > operator
> > > > > > > KeyedProcess -> async wait operator -> Flat Map -> Sink:
> > 写入redis库存
> > > > > > > (16/20).
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
> > > > > > >
> > > > > > >          at org.apache.flink.streaming.runtime.io
> > > > > > >
> .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> > > > > > >
> > > > > > >          at
> > > > > org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > > > > > >
> > > > > > >          at java.lang.Thread.run(Thread.java:748)
> > > > > > >
> > > > > > > Caused by: java.lang.Exception: Could not complete snapshot 550
> > for
> > > > > > > operator KeyedProcess -> async wait operator -> Flat Map ->
> Sink:
> > > > > > > 写入redis库存 (16/20).
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
> > > > > > >
> > > > > > >          ... 8 more
> > > > > > >
> > > > > > > Caused by: java.util.ConcurrentModificationException
> > > > > > >
> > > > > > >          at
> > > > java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> > > > > > >
> > > > > > >          at
> > java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> > > > > > >
> > > > > > >          at
> > java.util.HashMap$EntryIterator.next(HashMap.java:1474)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> > > > > > >
> > > > > > >          at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
> > > > > > >
> > > > > > >          at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
> > > > > > >
> > > > > > >          ... 13 more
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>