Flink savepoint migration problem

Flink savepoint migration problem

赵 建云
Hi everyone, after upgrading Flink I ran into a problem where a savepoint is not restored correctly.

The upgrade is from 1.9.3 to 1.11.0 or 1.11.3.
The connector in use is the Pulsar connector. The checkpoint state of the source is declared as follows:

oldUnionOffsetStates = stateStore.getUnionListState(
        new ListStateDescriptor<>(
                OFFSETS_STATE_NAME,
                TypeInformation.of(new TypeHint<Tuple2<String, MessageId>>() {
                })));
oldUnionSubscriptionNameStates =
        stateStore.getUnionListState(
                new ListStateDescriptor<>(
                        OFFSETS_STATE_NAME + "_subName",
                        TypeInformation.of(new TypeHint<String>() {
                        })));
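Locally, I saved the 1.9.3 state with the bin/flink savepoint command, stopped the Flink 1.9.3 cluster, started a Flink 1.11.3 cluster, and restored the job with bin/flink run -s.
After the job starts, it fails with the following error: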


2021-03-11 10:02:25
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.io.EOFException: No more bytes left.
    at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)
    at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
    at com.esotericsoftware.kryo.io.Input.readInt(Input.java:350)
    at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeIntField.read(UnsafeCacheFields.java:46)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:191)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:165)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    ... 15 more

Could anyone suggest a way to troubleshoot this, or a solution?


Jianyun8023
2021-3-11

Re: Flink savepoint migration problem

Kezhu Wang
Is the new cluster running the updated Pulsar connector? I looked at the pulsar-flink code, and that update breaks state compatibility.

+        unionOffsetStates = stateStore.getUnionListState(
+                new ListStateDescriptor<>(
+                        OFFSETS_STATE_NAME,
+                        TypeInformation.of(new TypeHint<Tuple3<String, MessageId, String>>() {
+                        })));

Possible workarounds?
1. Try rewriting the state with the state-processor-api?
2. Can the 1.9 pulsar-flink connector run on Flink 1.11?

It looks like there is a further incompatible change later on:

                 new ListStateDescriptor<>(
                         OFFSETS_STATE_NAME,
-                        TypeInformation.of(new TypeHint<Tuple3<String, MessageId, String>>() {
+                        TypeInformation.of(new TypeHint<Tuple3<TopicRange, MessageId, String>>() {
                         })));


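For an incompatible state change where schema migration is hard to do, you could try the approach used by StatefulSinkWriterOperator: read both the old and the new state, and write only the new state.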
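
A minimal sketch of the "read both old and new state, write only the new state" idea, in plain Java without Flink on the classpath. OldOffset and NewOffset are hypothetical stand-ins for the Tuple2<String, MessageId> and Tuple3<String, MessageId, String> entries, message IDs are shown as plain strings, and applying the first stored subscription name to every old entry is an assumption made for illustration. The approach can only help once the old state can actually be deserialized.

```java
import java.util.ArrayList;
import java.util.List;

final class DualStateMigrationSketch {

    /** Hypothetical stand-in for an old Tuple2<String, MessageId> entry. */
    static final class OldOffset {
        final String topic;
        final String messageId;

        OldOffset(String topic, String messageId) {
            this.topic = topic;
            this.messageId = messageId;
        }
    }

    /** Hypothetical stand-in for a new Tuple3<String, MessageId, String> entry. */
    static final class NewOffset {
        final String topic;
        final String messageId;
        final String subscriptionName;

        NewOffset(String topic, String messageId, String subscriptionName) {
            this.topic = topic;
            this.messageId = messageId;
            this.subscriptionName = subscriptionName;
        }
    }

    /**
     * Builds the offsets to resume from. Entries already in the new layout are kept,
     * entries in the old layout are converted. Only the returned list would be written
     * at the next checkpoint, so the old layout disappears after one cycle.
     */
    static List<NewOffset> restore(List<OldOffset> oldOffsets,
                                   List<String> oldSubscriptionNames,
                                   List<NewOffset> newOffsets) {
        List<NewOffset> merged = new ArrayList<>(newOffsets);
        // Assumption: the old layout stored a single subscription name for the whole job.
        String subscription = oldSubscriptionNames.isEmpty() ? "" : oldSubscriptionNames.get(0);
        for (OldOffset old : oldOffsets) {
            merged.add(new NewOffset(old.topic, old.messageId, subscription));
        }
        return merged;
    }

    public static void main(String[] args) {
        List<OldOffset> oldOffsets = new ArrayList<>();
        oldOffsets.add(new OldOffset("persistent://public/default/t1", "5:12:-1"));
        List<String> subscriptionNames = new ArrayList<>();
        subscriptionNames.add("flink-sub");

        for (NewOffset o : restore(oldOffsets, subscriptionNames, new ArrayList<>())) {
            System.out.println(o.topic + " @ " + o.messageId + " / " + o.subscriptionName);
        }
    }
}
```

In a real source the two old lists and the new list would presumably come from separately named state descriptors registered during state initialization, and only the new descriptor would be written when snapshotting.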

It may be worth waiting for someone from StreamNative to confirm.


Re: Flink savepoint migration problem

赵 建云
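I am the one maintaining the pulsar-flink connector now, and yes, there was an incompatible upgrade; it is a rather painful change. I tried migrating the old state into the new fields, and that fails with this error. I also modified the 1.11 code so that the state uses the old version's data structure, and I get the same error. I will look into how to use the StatefulSinkWriterOperator approach you mentioned.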


Re: Flink savepoint migration problem

赵 建云
In reply to this post by Kezhu Wang
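Hello, I rewrote the migration following StatefulSinkWriterOperator and tested it, but I still hit the error above. The error seems to occur before the source's initializeState is called. Is it possible that Flink already deserializes the state while reading the savepoint file and fails during that step? Is there a way to pin this down?

Thanks~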



Re: Flink savepoint migration problem

Kezhu Wang
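> Is it possible that Flink already deserializes the state while reading the savepoint file and fails during that step?

Yes, that is what happens: the checkpoint also snapshots the serializer.

Looking at the stack again, the failure should be in deserializing `MessageId`. Check whether the Pulsar version also changed; if so, did the fields of some `MessageId` implementation change between the two versions? I think you should be using `MessageId.toByteArray`.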
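
To illustrate why a changed field layout can surface as "No more bytes left", here is a small self-contained sketch in plain Java. It does not use Kryo, Flink, or Pulsar: the two-long layout and the two-long-plus-int layout are invented for the demonstration, and ByteBuffer's BufferUnderflowException plays the role of the EOFException in the stack trace. The second half shows the alternative suggested above: store an opaque byte array (for Pulsar, the output of MessageId.toByteArray(), decoded again with MessageId.fromByteArray) behind a length prefix, so that the stored bytes no longer depend on the class's fields.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.Arrays;

final class FieldLayoutVsByteArraySketch {

    /** Writes an id field by field using the OLD (invented) layout: two longs. */
    static byte[] writeOldFieldLayout(long ledgerId, long entryId) {
        return ByteBuffer.allocate(2 * Long.BYTES).putLong(ledgerId).putLong(entryId).array();
    }

    /** Reads field by field using the NEW (invented) layout, which expects one extra int. */
    static long[] readNewFieldLayout(byte[] data) {
        ByteBuffer in = ByteBuffer.wrap(data);
        long ledgerId = in.getLong();
        long entryId = in.getLong();
        int partitionIndex = in.getInt(); // old bytes end here, so this read underflows
        return new long[] {ledgerId, entryId, partitionIndex};
    }

    /** Stores an opaque payload behind a length prefix; the payload's inner format is not inspected. */
    static byte[] writeOpaque(byte[] payload) {
        return ByteBuffer.allocate(Integer.BYTES + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Reads back exactly the bytes that writeOpaque stored. */
    static byte[] readOpaque(byte[] data) {
        ByteBuffer in = ByteBuffer.wrap(data);
        byte[] payload = new byte[in.getInt()];
        in.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] oldBytes = writeOldFieldLayout(5L, 12L);
        try {
            readNewFieldLayout(oldBytes);
            System.out.println("unexpected: new layout read old bytes");
        } catch (BufferUnderflowException e) {
            System.out.println("new field layout cannot read old bytes: " + e);
        }

        byte[] payload = {1, 2, 3};
        byte[] restored = readOpaque(writeOpaque(payload));
        System.out.println("opaque round trip intact: " + Arrays.equals(payload, restored));
    }
}
```

With the opaque form, decoding is left to the owner of the format, so a change in the class's fields does not by itself change how the state layer reads the entry.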



Re: Flink savepoint migration problem

allanqinjy
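Jianyun,
    I also ran into a failure when starting a job from a savepoint. It happened after we upgraded the Pulsar client from 2.2 to 2.5.2, when I started the job with -s. The job was not very important and I had other tasks at hand, so I did not follow up on it. Check whether something changed in the Pulsar source.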


| |
allanqinjy
|
|
[hidden email]
|
签名由网易邮箱大师定制


在2021年03月11日 22:43,Kezhu Wang<[hidden email]> 写道:
有没有可能flink在读取savepoint文件时,就做了反序列化,在这个过程中失败了。

确实是这样的,checkpoint 把 serializer 也 snapshot 了。

重新看了下 stack,应该是 deserialize `MessageId` 的时候出错的。你可以看下,pulsar
的版本是不是也有变动?有的话,这两个版本之间的某个 `MessageId` 实现是不是有字段变动?感觉你们应该用
`MessageId.toByteArray`。


On March 11, 2021 at 20:26:15, 赵 建云 ([hidden email]) wrote:

你好,我参考StatefulSinkWriterOperator重写了迁移方法,进行测试,还是遇到了上面的错误。 这个错误似乎发生在source的
initializeState之前。有没有可能flink在读取savepoint文件时,就做了反序列化,在这个过程中失败了。请问有什么办法能帮忙定位吗?

感谢~


2021年3月11日 上午11:36,Kezhu Wang <[hidden email]> 写道:

新的集群使用的是更新之后的 pulsar connector ?我看了下 pulsar-flink 的代码,这个更新对 state 是破坏性的。

+        unionOffsetStates = stateStore.getUnionListState(
+                new ListStateDescriptor<>(
+                        OFFSETS_STATE_NAME,
+                        TypeInformation.of(new TypeHint<Tuple3<String,
MessageId, String>>() {
+                        })));

解决方法 :?
1. 尝试通过 state-processor-api 重写下 state ?
2. 1.9 的 pulsar-flink connector 可以跑在 1.11 的 flink 上吗?

感觉后面还有不兼容的更新

new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
-                        TypeInformation.of(new TypeHint<Tuple3<String,
MessageId, String>>() {
+                        TypeInformation.of(new TypeHint<Tuple3<TopicRange,
MessageId, String>>() {
})));


不兼容的 state 更新,如果不好做 schema migration,可以试试 StatefulSinkWriterOperator
的方法:同时读取新旧 state + 只写新 state。

可以等 streamnative 的人确认下。

On March 11, 2021 at 10:43:53, 赵 建云 ([hidden email]) wrote:

社区各位大佬,我遇到了一个Flink版本升级后,savepoint不会正确恢复问题。

版本从1.9.3 升级到1.11.0或1.11.3.
连接器使用的pulsar的连接器。在Source的checkpoint数据结构如下

oldUnionOffsetStates = stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
TypeInformation.of(new TypeHint<Tuple2<String, MessageId>>() {
})));
oldUnionSubscriptionNameStates =
stateStore.getUnionListState(
new ListStateDescriptor<>(
OFFSETS_STATE_NAME + "_subName",
TypeInformation.of(new TypeHint<String>() {
})));
我在本地通过bin/flink savepoint 命令保存下1.9.3的state,然后停止flink
1.9.3的集群,启动1.11.3的flink集群,使用bin/flink run -s 参数恢复任务。
任务在启动后,会遇到下面的错误


2021-03-11 10:02:25
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)

at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator
state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from
any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
at org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.io.EOFException: No more bytes left.
at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)
at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
at com.esotericsoftware.kryo.io.Input.readInt(Input.java:350)
at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeIntField.read(UnsafeCacheFields.java:46)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:191)
at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:165)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
... 15 more

Could anyone suggest a way to troubleshoot this, or a solution?


Jianyun8023
2021-3-11

Re: Flink savepoint migration problem

赵 建云
In reply to this post by Kezhu Wang
Confirmed: the Pulsar `MessageId` implementation class gained a new internal field, which caused Flink's deserialization to fail. See the issue for details: https://github.com/streamnative/pulsar-flink/issues/256
I will upgrade the checkpoint format in the Pulsar connector for Flink 1.9 so that `MessageId` is serialized with a serializer based on `MessageId.toByteArray`.
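
A minimal sketch of that idea, using only the JDK: each ID is stored as a length-prefixed opaque byte array (standing in for what `MessageId.toByteArray` returns), so the checkpointed bytes no longer depend on the field layout of the implementing class. `OpaqueIdCodec` and its methods are hypothetical names for illustration and are not part of Flink or pulsar-flink; a real connector would presumably wrap this logic in a Flink `TypeSerializer`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: stores IDs as length-prefixed opaque byte arrays. */
public class OpaqueIdCodec {

    /** Writes [count] followed by [length][bytes] for each ID. */
    public static byte[] encode(List<byte[]> ids) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(ids.size());
            for (byte[] id : ids) {
                // The bytes stand in for the result of MessageId.toByteArray().
                out.writeInt(id.length);
                out.write(id);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Reads back exactly what encode() wrote, without knowing the ID's fields. */
    public static List<byte[]> decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            List<byte[]> ids = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                byte[] id = new byte[in.readInt()];
                in.readFully(id);
                ids.add(id);
            }
            return ids;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With this layout, turning the bytes back into an ID object is left to the ID class itself, so a field added in a later client version does not change how the state is read.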
Thank you very much for your help.

Jianyun8023
2021-03-12

On March 11, 2021 at 10:43 PM, Kezhu Wang <[hidden email]> wrote:

> Is it possible that Flink already deserializes the state while reading the savepoint file, and that this is where it fails?

That is indeed the case: the checkpoint also snapshots the serializer.

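I took another look at the stack trace; the failure should be happening while deserializing `MessageId`. Can you check whether the Pulsar version changed as well? If it did, did the fields of some `MessageId` implementation change between the two versions? I think you should be using `MessageId.toByteArray`.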
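
The failure mode can be reproduced without Flink or Kryo. The sketch below is a simplified stand-in, not Kryo's actual wire format: a hypothetical ID is written field by field with two fields, then read back by a newer reader that expects a third field, and the read runs past the end of the data. `FieldLayoutDemo` and the field names are assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical demo of a field-by-field format breaking when a field is added. */
public class FieldLayoutDemo {

    /** Writer for the old class layout: two long fields, one after another. */
    public static byte[] writeOldLayout(long first, long second) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeLong(first);
            out.writeLong(second);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Reader for the new class layout, which gained an int field. Returns true on EOF. */
    public static boolean newLayoutReadHitsEof(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.readLong();
            in.readLong();
            in.readInt(); // the added field was never written by the old layout
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `newLayoutReadHitsEof(writeOldLayout(7L, 9L))` takes the `EOFException` branch, which is the same kind of error as the `No more bytes left` in the trace.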



On March 11, 2021 at 20:26:15, 赵 建云 ([hidden email]) wrote:

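Hello, I rewrote the migration logic following StatefulSinkWriterOperator and tested it, but I still hit the error above. The error seems to occur before the source's initializeState is called. Is it possible that Flink already deserializes the state while reading the savepoint file, and that this is where it fails? Is there any way to pin this down?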

Thanks!


On March 11, 2021 at 11:36 AM, Kezhu Wang <[hidden email]> wrote:

Is the new cluster using the updated Pulsar connector? I looked at the pulsar-flink code, and this update is a breaking change for state.

+        unionOffsetStates = stateStore.getUnionListState(
+                new ListStateDescriptor<>(
+                        OFFSETS_STATE_NAME,
+                        TypeInformation.of(new TypeHint<Tuple3<String, MessageId, String>>() {
+                        })));

Possible solutions?
1. Try rewriting the state with the State Processor API?
2. Can the 1.9 pulsar-flink connector run on Flink 1.11?

It looks like there is another incompatible update after that one:

                 new ListStateDescriptor<>(
                         OFFSETS_STATE_NAME,
-                        TypeInformation.of(new TypeHint<Tuple3<String, MessageId, String>>() {
+                        TypeInformation.of(new TypeHint<Tuple3<TopicRange, MessageId, String>>() {
                         })));


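For incompatible state updates where schema migration is hard to do, you could try the approach used by StatefulSinkWriterOperator: read both the old and the new state, and write only the new state.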
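
The "read old and new, write only new" pattern can be sketched independently of Flink's state API. In the sketch below, the old state is modelled as two lists (topic/ID pairs and subscription names, mirroring the two union states of the 1.9 connector) and the new state as a list of triples. `OffsetStateMigration`, `OldOffset`, and `NewOffset` are hypothetical names, IDs are kept as opaque byte arrays, and pairing every old offset with the first restored subscription name is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of merging old-format state into the new format on restore. */
public class OffsetStateMigration {

    /** Old entry shape: (topic, serialized message ID). */
    public static final class OldOffset {
        public final String topic;
        public final byte[] messageId;

        public OldOffset(String topic, byte[] messageId) {
            this.topic = topic;
            this.messageId = messageId;
        }
    }

    /** New entry shape: (topic, serialized message ID, subscription name). */
    public static final class NewOffset {
        public final String topic;
        public final byte[] messageId;
        public final String subscriptionName;

        public NewOffset(String topic, byte[] messageId, String subscriptionName) {
            this.topic = topic;
            this.messageId = messageId;
            this.subscriptionName = subscriptionName;
        }
    }

    /**
     * Merges restored old-format and new-format entries into the new format.
     * Only the returned list would be written at the next checkpoint.
     */
    public static List<NewOffset> restore(
            List<OldOffset> oldOffsets,
            List<String> oldSubscriptionNames,
            List<NewOffset> newOffsets) {
        List<NewOffset> merged = new ArrayList<>(newOffsets);
        // Assumption: the job has a single subscription name, so use the first one restored.
        String subscription = oldSubscriptionNames.isEmpty() ? null : oldSubscriptionNames.get(0);
        for (OldOffset old : oldOffsets) {
            merged.add(new NewOffset(old.topic, old.messageId, subscription));
        }
        return merged;
    }
}
```

This pattern only helps if the old entries can still be deserialized when the state is restored.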

You may want to wait for someone from StreamNative to confirm.

On March 11, 2021 at 10:43:53, 赵 建云 ([hidden email]) wrote:

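Hi everyone, I ran into a problem where a savepoint does not restore correctly after a Flink version upgrade.

The upgrade is from 1.9.3 to 1.11.0 or 1.11.3.
The connector in use is the Pulsar connector. The checkpoint data structure in the source is as follows: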

oldUnionOffsetStates = stateStore.getUnionListState(
        new ListStateDescriptor<>(
                OFFSETS_STATE_NAME,
                TypeInformation.of(new TypeHint<Tuple2<String, MessageId>>() {
                })));
oldUnionSubscriptionNameStates =
        stateStore.getUnionListState(
                new ListStateDescriptor<>(
                        OFFSETS_STATE_NAME + "_subName",
                        TypeInformation.of(new TypeHint<String>() {
                        })));
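I saved the 1.9.3 state locally with the bin/flink savepoint command, then stopped the Flink 1.9.3 cluster, started a Flink 1.11.3 cluster, and restored the job with bin/flink run -s.
After the job starts, it hits the following error: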


2021-03-11 10:02:25
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
at org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.io.EOFException: No more bytes left.
at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)
at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
at com.esotericsoftware.kryo.io.Input.readInt(Input.java:350)
at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeIntField.read(UnsafeCacheFields.java:46)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:191)
at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:165)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
... 15 more

Could anyone suggest a way to troubleshoot this, or a solution?


Jianyun8023
2021-3-11