版本: 1.9
问题: 当从savepoint或者checkpoint恢复flink job时,发现部分keyedState中的数据丢失。 这里我们使用的是ListState,里面存储的是ObjectNode(Jackson DataBinding)类型的对象。 查log发现部分key的 listState.get() 返回空的iterator。 然而使用State Process API确认State的内容时, 发现上述这些key对应的数据是存在于State中的。 求问各位大佬这种情况是怎么回事?应该如何排查 谢谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Hi
按理说 checkpoint/savepoint 有的数据,正常恢复后是可以读取到的。 1 正常从 checkpoint/savepoint 恢复了吗? 2 获取 state 的时候,key 是同一个 key 吗? Best, Congxian Liu Rising <[hidden email]> 于2020年9月3日周四 上午9:28写道: > 版本: 1.9 > > 问题: > 当从savepoint或者checkpoint恢复flink job时,发现部分keyedState中的数据丢失。 > 这里我们使用的是ListState,里面存储的是ObjectNode(Jackson DataBinding)类型的对象。 > > 查log发现部分key的 listState.get() 返回空的iterator。 > 然而使用State Process API确认State的内容时, 发现上述这些key对应的数据是存在于State中的。 > > 求问各位大佬这种情况是怎么回事?应该如何排查 > 谢谢 > > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ |
Hi
找到原因了。 问题在于在定义ListState时使用了transient关键字,如下。 private transient ListState<ObjectNode> state; 去掉了transient之后,问题解决。 虽然不太清粗为何transient会造成这种情况。 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Hi
我觉得这个不是root cause,实际上 transient ListState 是一种正确的用法,因为state应该是在函数open方法里面进行初始化,所以transient 修饰即可。 麻烦把这个list state的初始化以及使用方法的代码都贴出来吧。 祝好 唐云 ________________________________ From: Liu Rising <[hidden email]> Sent: Thursday, September 3, 2020 12:26 To: [hidden email] <[hidden email]> Subject: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取 Hi 找到原因了。 问题在于在定义ListState时使用了transient关键字,如下。 private transient ListState<ObjectNode> state; 去掉了transient之后,问题解决。 虽然不太清粗为何transient会造成这种情况。 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Hi 唐云
以下是state定义以及初始化的code public class FlinkKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, ObjectNode>, Tuple2<String, JsonNode>> { private static final Logger LOG = LoggerFactory.getLogger(FlinkKeyedProcessFunction.class); ... private final ParameterTool params; private transient ListState<ObjectNode> unmatchedProbesState; ... FlinkKeyedProcessFunction(ParameterTool params) { this.params = params; } @Override public void open(Configuration parameters) { ListStateDescriptor<ObjectNode> descriptor = new ListStateDescriptor<>( "unmatchedProbes", TypeInformation.of(ObjectNode.class) ); unmatchedProbesState = getRuntimeContext().getListState(descriptor); 以下是往state里add内容的部分 ... List<ObjectNode> unmatchedProbes = mapMatching.getUnMatchedProbes(id); unmatchedProbesState.clear(); if (unmatchedProbes.size() > 0) { try { unmatchedProbesState.addAll(unmatchedProbes); } catch (Exception e) { LOG.warn("Continue processing although failed to add unmatchedProbes to ListState. ID: " + id, e); } } ... 以下是从state读取的code for (ObjectNode unmatchedProbe : unmatchedProbesState.get()) { LOG.info("Processing unmatched probe: " + unmatchedProbe); matchedValues.addAll(mapMatching.matchLocation(id, unmatchedProbe)); } 之前的问题是,从state读取时对于一部分key,unmatchedProbesState.get()返回空值。(实际有值) 去掉定义state那里的transient之后,上述问题不再出现。 谢谢。 Rising -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Hi
首先,transient ListState 这种用法绝对是没有问题的,你可以在Flink源码中找到大量的使用例子 [1] 可以排查的思路 1. 你的state是否开启了TTL呢 2. 能否在写入的时候,进行打印,然后在get会空iterator时进行对比,确认之前是写入成功的。 3. 目前使用的state backend是什么,更换一种state backend,问题还能复现么 [1] https://github.com/apache/flink/blob/25b54a5e261d8dbc8133d0fdc9cae2e653af1ea7/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java#L158 祝好 唐云 ________________________________ From: Liu Rising <[hidden email]> Sent: Sunday, September 6, 2020 17:45 To: [hidden email] <[hidden email]> Subject: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取 Hi 唐云 以下是state定义以及初始化的code public class FlinkKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, ObjectNode>, Tuple2<String, JsonNode>> { private static final Logger LOG = LoggerFactory.getLogger(FlinkKeyedProcessFunction.class); ... private final ParameterTool params; private transient ListState<ObjectNode> unmatchedProbesState; ... FlinkKeyedProcessFunction(ParameterTool params) { this.params = params; } @Override public void open(Configuration parameters) { ListStateDescriptor<ObjectNode> descriptor = new ListStateDescriptor<>( "unmatchedProbes", TypeInformation.of(ObjectNode.class) ); unmatchedProbesState = getRuntimeContext().getListState(descriptor); 以下是往state里add内容的部分 ... List<ObjectNode> unmatchedProbes = mapMatching.getUnMatchedProbes(id); unmatchedProbesState.clear(); if (unmatchedProbes.size() > 0) { try { unmatchedProbesState.addAll(unmatchedProbes); } catch (Exception e) { LOG.warn("Continue processing although failed to add unmatchedProbes to ListState. ID: " + id, e); } } ... 以下是从state读取的code for (ObjectNode unmatchedProbe : unmatchedProbesState.get()) { LOG.info("Processing unmatched probe: " + unmatchedProbe); matchedValues.addAll(mapMatching.matchLocation(id, unmatchedProbe)); } 之前的问题是,从state读取时对于一部分key,unmatchedProbesState.get()返回空值。(实际有值) 去掉定义state那里的transient之后,上述问题不再出现。 谢谢。 Rising -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Transient 都不参与序列化了,怎么可能从checkopont里恢复?
-----邮件原件----- 发件人: Yun Tang [mailto:[hidden email]] 发送时间: 2020年9月7日 星期一 12:50 收件人: [hidden email] 主题: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取 Hi 首先,transient ListState 这种用法绝对是没有问题的,你可以在Flink源码中找到大量的使用例子 [1] 可以排查的思路 1. 你的state是否开启了TTL呢 2. 能否在写入的时候,进行打印,然后在get会空iterator时进行对比,确认之前是写入成功的。 3. 目前使用的state backend是什么,更换一种state backend,问题还能复现么 [1] https://github.com/apache/flink/blob/25b54a5e261d8dbc8133d0fdc9cae2e653af1ea7/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java#L158 祝好 唐云 ________________________________ From: Liu Rising <[hidden email]> Sent: Sunday, September 6, 2020 17:45 To: [hidden email] <[hidden email]> Subject: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取 Hi 唐云 以下是state定义以及初始化的code public class FlinkKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, ObjectNode>, Tuple2<String, JsonNode>> { private static final Logger LOG = LoggerFactory.getLogger(FlinkKeyedProcessFunction.class); ... private final ParameterTool params; private transient ListState<ObjectNode> unmatchedProbesState; ... FlinkKeyedProcessFunction(ParameterTool params) { this.params = params; } @Override public void open(Configuration parameters) { ListStateDescriptor<ObjectNode> descriptor = new ListStateDescriptor<>( "unmatchedProbes", TypeInformation.of(ObjectNode.class) ); unmatchedProbesState = getRuntimeContext().getListState(descriptor); 以下是往state里add内容的部分 ... List<ObjectNode> unmatchedProbes = mapMatching.getUnMatchedProbes(id); unmatchedProbesState.clear(); if (unmatchedProbes.size() > 0) { try { unmatchedProbesState.addAll(unmatchedProbes); } catch (Exception e) { LOG.warn("Continue processing although failed to add unmatchedProbes to ListState. ID: " + id, e); } } ... 以下是从state读取的code for (ObjectNode unmatchedProbe : unmatchedProbesState.get()) { LOG.info("Processing unmatched probe: " + unmatchedProbe); matchedValues.addAll(mapMatching.matchLocation(id, unmatchedProbe)); } 之前的问题是,从state读取时对于一部分key,unmatchedProbesState.get()返回空值。(实际有值) 去掉定义state那里的transient之后,上述问题不再出现。 谢谢。 Rising -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
flink用的自己的序列化机制。从chk恢复的时候,在open方法里会进行状态数据的注入。
按我的理解,该transient标记有没有都可以从chk恢复,但一般加入transient可以明确只有open方法中的数据注入这一种方法。 至于不加上transient是否可能产生其他影响,就不太清楚了。 范超 <[hidden email]> 于2020年9月10日周四 上午9:35写道: > Transient 都不参与序列化了,怎么可能从checkopont里恢复? > > -----邮件原件----- > 发件人: Yun Tang [mailto:[hidden email]] > 发送时间: 2020年9月7日 星期一 12:50 > 收件人: [hidden email] > 主题: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取 > > Hi > > 首先,transient ListState 这种用法绝对是没有问题的,你可以在Flink源码中找到大量的使用例子 [1] > > 可以排查的思路 > > 1. 你的state是否开启了TTL呢 > 2. 能否在写入的时候,进行打印,然后在get会空iterator时进行对比,确认之前是写入成功的。 > 3. 目前使用的state backend是什么,更换一种state backend,问题还能复现么 > > [1] > https://github.com/apache/flink/blob/25b54a5e261d8dbc8133d0fdc9cae2e653af1ea7/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java#L158 > > 祝好 > 唐云 > ________________________________ > From: Liu Rising <[hidden email]> > Sent: Sunday, September 6, 2020 17:45 > To: [hidden email] <[hidden email]> > Subject: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取 > > Hi 唐云 > > 以下是state定义以及初始化的code > > public class FlinkKeyedProcessFunction extends > KeyedProcessFunction<String, Tuple2<String, ObjectNode>, Tuple2<String, > JsonNode>> { > > private static final Logger LOG = > LoggerFactory.getLogger(FlinkKeyedProcessFunction.class); > > ... > > private final ParameterTool params; > private transient ListState<ObjectNode> unmatchedProbesState; > > ... > > FlinkKeyedProcessFunction(ParameterTool params) { > this.params = params; > } > > @Override > public void open(Configuration parameters) { > > ListStateDescriptor<ObjectNode> descriptor = new > ListStateDescriptor<>( > "unmatchedProbes", TypeInformation.of(ObjectNode.class) > ); > unmatchedProbesState = > getRuntimeContext().getListState(descriptor); > > 以下是往state里add内容的部分 > ... > > List<ObjectNode> unmatchedProbes = > mapMatching.getUnMatchedProbes(id); > unmatchedProbesState.clear(); > > if (unmatchedProbes.size() > 0) { > try { > unmatchedProbesState.addAll(unmatchedProbes); > } catch (Exception e) { > LOG.warn("Continue processing although failed to add > unmatchedProbes to ListState. ID: " + id, e); > } > } > > ... > > 以下是从state读取的code > > for (ObjectNode unmatchedProbe : > unmatchedProbesState.get()) { > LOG.info("Processing unmatched probe: " + > unmatchedProbe); > matchedValues.addAll(mapMatching.matchLocation(id, > unmatchedProbe)); > } > > > 之前的问题是,从state读取时对于一部分key,unmatchedProbesState.get()返回空值。(实际有值) > 去掉定义state那里的transient之后,上述问题不再出现。 > > 谢谢。 > Rising > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > |
Free forum by Nabble | Edit this page |