从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

Liu Rising
版本: 1.9

问题:
当从savepoint或者checkpoint恢复flink job时,发现部分keyedState中的数据丢失。
这里我们使用的是ListState,里面存储的是ObjectNode(Jackson DataBinding)类型的对象。

查log发现部分key的 listState.get() 返回空的iterator。
然而使用State Process API确认State的内容时, 发现上述这些key对应的数据是存在于State中的。

求问各位大佬这种情况是怎么回事?应该如何排查
谢谢






--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

Congxian Qiu
Hi
   按理说 checkpoint/savepoint 有的数据,正常恢复后是可以读取到的。
   1 正常从 checkpoint/savepoint 恢复了吗?
   2 获取 state 的时候,key 是同一个 key 吗?
Best,
Congxian


Liu Rising <[hidden email]> 于2020年9月3日周四 上午9:28写道:

> 版本: 1.9
>
> 问题:
> 当从savepoint或者checkpoint恢复flink job时,发现部分keyedState中的数据丢失。
> 这里我们使用的是ListState,里面存储的是ObjectNode(Jackson DataBinding)类型的对象。
>
> 查log发现部分key的 listState.get() 返回空的iterator。
> 然而使用State Process API确认State的内容时, 发现上述这些key对应的数据是存在于State中的。
>
> 求问各位大佬这种情况是怎么回事?应该如何排查
> 谢谢
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

Liu Rising
Hi

找到原因了。

问题在于在定义ListState时使用了transient关键字,如下。
 private transient ListState<ObjectNode> state;

去掉了transient之后,问题解决。
虽然不太清粗为何transient会造成这种情况。



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

Yun Tang
Hi

我觉得这个不是root cause,实际上 transient ListState 是一种正确的用法,因为state应该是在函数open方法里面进行初始化,所以transient 修饰即可。

麻烦把这个list state的初始化以及使用方法的代码都贴出来吧。

祝好
唐云
________________________________
From: Liu Rising <[hidden email]>
Sent: Thursday, September 3, 2020 12:26
To: [hidden email] <[hidden email]>
Subject: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

Hi

找到原因了。

问题在于在定义ListState时使用了transient关键字,如下。
 private transient ListState<ObjectNode> state;

去掉了transient之后,问题解决。
虽然不太清粗为何transient会造成这种情况。



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

Liu Rising
Hi 唐云

以下是state定义以及初始化的code

public class FlinkKeyedProcessFunction extends KeyedProcessFunction<String,
Tuple2&lt;String, ObjectNode>, Tuple2<String, JsonNode>> {

    private static final Logger LOG =
LoggerFactory.getLogger(FlinkKeyedProcessFunction.class);

  ...

    private final ParameterTool params;
    private transient ListState<ObjectNode> unmatchedProbesState;

    ...

    FlinkKeyedProcessFunction(ParameterTool params) {
        this.params = params;
    }

    @Override
    public void open(Configuration parameters) {

        ListStateDescriptor<ObjectNode> descriptor = new
ListStateDescriptor<>(
                "unmatchedProbes", TypeInformation.of(ObjectNode.class)
        );
        unmatchedProbesState = getRuntimeContext().getListState(descriptor);

以下是往state里add内容的部分
   ...

        List<ObjectNode> unmatchedProbes =
mapMatching.getUnMatchedProbes(id);
        unmatchedProbesState.clear();

        if (unmatchedProbes.size() > 0) {
            try {
                unmatchedProbesState.addAll(unmatchedProbes);
            } catch (Exception e) {
                LOG.warn("Continue processing although failed to add
unmatchedProbes to ListState. ID: " + id, e);
            }
        }

       ...

以下是从state读取的code

                    for (ObjectNode unmatchedProbe :
unmatchedProbesState.get()) {
                        LOG.info("Processing unmatched probe: " +
unmatchedProbe);
                        matchedValues.addAll(mapMatching.matchLocation(id,
unmatchedProbe));
                    }


之前的问题是,从state读取时对于一部分key,unmatchedProbesState.get()返回空值。(实际有值)
去掉定义state那里的transient之后,上述问题不再出现。

谢谢。
Rising




--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

Yun Tang
Hi

首先,transient ListState 这种用法绝对是没有问题的,你可以在Flink源码中找到大量的使用例子 [1]

可以排查的思路

  1.  你的state是否开启了TTL呢
  2.  能否在写入的时候,进行打印,然后在get会空iterator时进行对比,确认之前是写入成功的。
  3.  目前使用的state backend是什么,更换一种state backend,问题还能复现么

[1] https://github.com/apache/flink/blob/25b54a5e261d8dbc8133d0fdc9cae2e653af1ea7/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java#L158

祝好
唐云
________________________________
From: Liu Rising <[hidden email]>
Sent: Sunday, September 6, 2020 17:45
To: [hidden email] <[hidden email]>
Subject: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

Hi 唐云

以下是state定义以及初始化的code

public class FlinkKeyedProcessFunction extends KeyedProcessFunction<String,
Tuple2&lt;String, ObjectNode>, Tuple2<String, JsonNode>> {

    private static final Logger LOG =
LoggerFactory.getLogger(FlinkKeyedProcessFunction.class);

  ...

    private final ParameterTool params;
    private transient ListState<ObjectNode> unmatchedProbesState;

    ...

    FlinkKeyedProcessFunction(ParameterTool params) {
        this.params = params;
    }

    @Override
    public void open(Configuration parameters) {

        ListStateDescriptor<ObjectNode> descriptor = new
ListStateDescriptor<>(
                "unmatchedProbes", TypeInformation.of(ObjectNode.class)
        );
        unmatchedProbesState = getRuntimeContext().getListState(descriptor);

以下是往state里add内容的部分
   ...

        List<ObjectNode> unmatchedProbes =
mapMatching.getUnMatchedProbes(id);
        unmatchedProbesState.clear();

        if (unmatchedProbes.size() > 0) {
            try {
                unmatchedProbesState.addAll(unmatchedProbes);
            } catch (Exception e) {
                LOG.warn("Continue processing although failed to add
unmatchedProbes to ListState. ID: " + id, e);
            }
        }

       ...

以下是从state读取的code

                    for (ObjectNode unmatchedProbe :
unmatchedProbesState.get()) {
                        LOG.info("Processing unmatched probe: " +
unmatchedProbe);
                        matchedValues.addAll(mapMatching.matchLocation(id,
unmatchedProbe));
                    }


之前的问题是,从state读取时对于一部分key,unmatchedProbesState.get()返回空值。(实际有值)
去掉定义state那里的transient之后,上述问题不再出现。

谢谢。
Rising




--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

答复: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

范超
Transient 都不参与序列化了,怎么可能从checkopont里恢复?

-----邮件原件-----
发件人: Yun Tang [mailto:[hidden email]]
发送时间: 2020年9月7日 星期一 12:50
收件人: [hidden email]
主题: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

Hi

首先,transient ListState 这种用法绝对是没有问题的,你可以在Flink源码中找到大量的使用例子 [1]

可以排查的思路

  1.  你的state是否开启了TTL呢
  2.  能否在写入的时候,进行打印,然后在get会空iterator时进行对比,确认之前是写入成功的。
  3.  目前使用的state backend是什么,更换一种state backend,问题还能复现么

[1] https://github.com/apache/flink/blob/25b54a5e261d8dbc8133d0fdc9cae2e653af1ea7/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java#L158

祝好
唐云
________________________________
From: Liu Rising <[hidden email]>
Sent: Sunday, September 6, 2020 17:45
To: [hidden email] <[hidden email]>
Subject: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

Hi 唐云

以下是state定义以及初始化的code

public class FlinkKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2&lt;String, ObjectNode>, Tuple2<String, JsonNode>> {

    private static final Logger LOG =
LoggerFactory.getLogger(FlinkKeyedProcessFunction.class);

  ...

    private final ParameterTool params;
    private transient ListState<ObjectNode> unmatchedProbesState;

    ...

    FlinkKeyedProcessFunction(ParameterTool params) {
        this.params = params;
    }

    @Override
    public void open(Configuration parameters) {

        ListStateDescriptor<ObjectNode> descriptor = new ListStateDescriptor<>(
                "unmatchedProbes", TypeInformation.of(ObjectNode.class)
        );
        unmatchedProbesState = getRuntimeContext().getListState(descriptor);

以下是往state里add内容的部分
   ...

        List<ObjectNode> unmatchedProbes = mapMatching.getUnMatchedProbes(id);
        unmatchedProbesState.clear();

        if (unmatchedProbes.size() > 0) {
            try {
                unmatchedProbesState.addAll(unmatchedProbes);
            } catch (Exception e) {
                LOG.warn("Continue processing although failed to add unmatchedProbes to ListState. ID: " + id, e);
            }
        }

       ...

以下是从state读取的code

                    for (ObjectNode unmatchedProbe :
unmatchedProbesState.get()) {
                        LOG.info("Processing unmatched probe: " + unmatchedProbe);
                        matchedValues.addAll(mapMatching.matchLocation(id,
unmatchedProbe));
                    }


之前的问题是,从state读取时对于一部分key,unmatchedProbesState.get()返回空值。(实际有值)
去掉定义state那里的transient之后,上述问题不再出现。

谢谢。
Rising




--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

shizk233
flink用的自己的序列化机制。从chk恢复的时候,在open方法里会进行状态数据的注入。

按我的理解,该transient标记有没有都可以从chk恢复,但一般加入transient可以明确只有open方法中的数据注入这一种方法。
至于不加上transient是否可能产生其他影响,就不太清楚了。

范超 <[hidden email]> 于2020年9月10日周四 上午9:35写道:

> Transient 都不参与序列化了,怎么可能从checkopont里恢复?
>
> -----邮件原件-----
> 发件人: Yun Tang [mailto:[hidden email]]
> 发送时间: 2020年9月7日 星期一 12:50
> 收件人: [hidden email]
> 主题: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取
>
> Hi
>
> 首先,transient ListState 这种用法绝对是没有问题的,你可以在Flink源码中找到大量的使用例子 [1]
>
> 可以排查的思路
>
>   1.  你的state是否开启了TTL呢
>   2.  能否在写入的时候,进行打印,然后在get会空iterator时进行对比,确认之前是写入成功的。
>   3.  目前使用的state backend是什么,更换一种state backend,问题还能复现么
>
> [1]
> https://github.com/apache/flink/blob/25b54a5e261d8dbc8133d0fdc9cae2e653af1ea7/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java#L158
>
> 祝好
> 唐云
> ________________________________
> From: Liu Rising <[hidden email]>
> Sent: Sunday, September 6, 2020 17:45
> To: [hidden email] <[hidden email]>
> Subject: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取
>
> Hi 唐云
>
> 以下是state定义以及初始化的code
>
> public class FlinkKeyedProcessFunction extends
> KeyedProcessFunction<String, Tuple2&lt;String, ObjectNode>, Tuple2<String,
> JsonNode>> {
>
>     private static final Logger LOG =
> LoggerFactory.getLogger(FlinkKeyedProcessFunction.class);
>
> ...
>
>     private final ParameterTool params;
>     private transient ListState<ObjectNode> unmatchedProbesState;
>
>     ...
>
>     FlinkKeyedProcessFunction(ParameterTool params) {
>         this.params = params;
>     }
>
>     @Override
>     public void open(Configuration parameters) {
>
>         ListStateDescriptor<ObjectNode> descriptor = new
> ListStateDescriptor<>(
>                 "unmatchedProbes", TypeInformation.of(ObjectNode.class)
>         );
>         unmatchedProbesState =
> getRuntimeContext().getListState(descriptor);
>
> 以下是往state里add内容的部分
> ...
>
>         List<ObjectNode> unmatchedProbes =
> mapMatching.getUnMatchedProbes(id);
>         unmatchedProbesState.clear();
>
>         if (unmatchedProbes.size() > 0) {
>             try {
>                 unmatchedProbesState.addAll(unmatchedProbes);
>             } catch (Exception e) {
>                 LOG.warn("Continue processing although failed to add
> unmatchedProbes to ListState. ID: " + id, e);
>             }
>         }
>
>        ...
>
> 以下是从state读取的code
>
>                     for (ObjectNode unmatchedProbe :
> unmatchedProbesState.get()) {
>                         LOG.info("Processing unmatched probe: " +
> unmatchedProbe);
>                         matchedValues.addAll(mapMatching.matchLocation(id,
> unmatchedProbe));
>                     }
>
>
> 之前的问题是,从state读取时对于一部分key,unmatchedProbesState.get()返回空值。(实际有值)
> 去掉定义state那里的transient之后,上述问题不再出现。
>
> 谢谢。
> Rising
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>