关于从savepoint启动作业报错 migration for MapState currently isn't supported.

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

关于从savepoint启动作业报错 migration for MapState currently isn't supported.

claylin
从savepoint启动时候报了一下这个错误,用的1.8.1版本大家有遇到过吗,求解决方案
java.lang.RuntimeException: Error while getting state at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119) at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179) at com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.StateMigrationException: The new serializer for a MapState requires state migration in order for the job to proceed. However, migration for MapState currently isn't supported. at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) ... 9 more
Reply | Threaded
Open this post in threaded view
|

Re: 关于从savepoint启动作业报错 migration for MapState currently isn't supported.

Congxian Qiu
Hi
看上去是 MapState 的 migration 不支持导致的,可以尝试下 1.9,1.9 解决了 MapState 的 value schema
evolution[1]

[1] https://issues.apache.org/jira/browse/FLINK-11947
Best,
Congxian


claylin <[hidden email]> 于2019年11月14日周四 下午9:35写道:

> 从savepoint启动时候报了一下这个错误,用的1.8.1版本大家有遇到过吗,求解决方案
> java.lang.RuntimeException: Error while getting state   at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
>  at
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
>      at
> com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243)
>    at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>        at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
>    at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>  at java.lang.Thread.run(Thread.java:748) Caused by:
> org.apache.flink.util.StateMigrationException: The new serializer for a
> MapState requires state migration in order for the job to proceed. However,
> migration for MapState currently isn't supported.       at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543)
>     at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525)
>    at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
>  at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
>    at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
>  at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155)
> at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
>    at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
>      at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
>  at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
>    at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>  at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
>  ... 9 more
Reply | Threaded
Open this post in threaded view
|

回复: 关于从savepoint启动作业报错 migration for MapState currently isn't supported.

claylin
谢谢,我这边确实修改了状态的schema,现在试下看下




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Congxian Qiu"<[hidden email]&gt;;
发送时间:&nbsp;2019年11月15日(星期五) 上午10:07
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: 关于从savepoint启动作业报错 migration for MapState currently isn't supported.



Hi
看上去是 MapState 的 migration 不支持导致的,可以尝试下 1.9,1.9 解决了 MapState 的 value schema
evolution[1]

[1] https://issues.apache.org/jira/browse/FLINK-11947
Best,
Congxian


claylin <[hidden email]&gt; 于2019年11月14日周四 下午9:35写道:

&gt; 从savepoint启动时候报了一下这个错误,用的1.8.1版本大家有遇到过吗,求解决方案
&gt; java.lang.RuntimeException: Error while getting state&nbsp;&nbsp; at
&gt; org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
&gt;&nbsp; at
&gt; org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243)
&gt;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
&gt; at
&gt; org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
&gt;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
&gt;&nbsp;&nbsp; at
&gt; org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
&gt;&nbsp;&nbsp;&nbsp;&nbsp; at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
&gt;&nbsp; at java.lang.Thread.run(Thread.java:748) Caused by:
&gt; org.apache.flink.util.StateMigrationException: The new serializer for a
&gt; MapState requires state migration in order for the job to proceed. However,
&gt; migration for MapState currently isn't supported.&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543)
&gt;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525)
&gt;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
&gt;&nbsp; at
&gt; org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
&gt;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
&gt;&nbsp; at
&gt; org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155)
&gt; at
&gt; org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
&gt;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
&gt;&nbsp; at
&gt; org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
&gt;&nbsp;&nbsp;&nbsp; at
&gt; org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
&gt;&nbsp; at
&gt; org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
&gt;&nbsp; ... 9 more
Reply | Threaded
Open this post in threaded view
|

Re: 关于从savepoint启动作业报错 migration for MapState currently isn't supported.

shuwen zhou
成功了吗?
我这边报的是另外一个错误, org.apache.flink.util.StateMigrationException: The new state
serializer cannot be incompatible 使用的版本的是fink 1.9.0
具体操作是:
trigger savepoint后从savepoint读取就是这个错误
使用的是MapState[String,Void] 在scala代码


... 25 more
 Caused by: java.io.IOException: Failed to open user defined function
at
org.apache.flink.state.api.input.KeyedStateInputFormat.getKeyIterator(KeyedStateInputFormat.java:210)
at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:185)
at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
at
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
 Caused by: java.lang.RuntimeException: Error while getting state
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
at
org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:243)
at tv.freewheel.reporting.dip.ReaderFunction.open(SinkerReadState.scala:49)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.state.api.input.KeyedStateInputFormat.getKeyIterator(KeyedStateInputFormat.java:206)
... 6 more
 Caused by: org.apache.flink.util.StateMigrationException: The new state
serializer cannot be incompatible.
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
at
org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)

On Fri, 15 Nov 2019 at 10:10, claylin <[hidden email]> wrote:

> 谢谢,我这边确实修改了状态的schema,现在试下看下
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Congxian Qiu"<[hidden email]&gt;;
> 发送时间:&nbsp;2019年11月15日(星期五) 上午10:07
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;Re: 关于从savepoint启动作业报错 migration for MapState currently isn't
> supported.
>
>
>
> Hi
> 看上去是 MapState 的 migration 不支持导致的,可以尝试下 1.9,1.9 解决了 MapState 的 value schema
> evolution[1]
>
> [1] https://issues.apache.org/jira/browse/FLINK-11947
> Best,
> Congxian
>
>
> claylin <[hidden email]&gt; 于2019年11月14日周四 下午9:35写道:
>
> &gt; 从savepoint启动时候报了一下这个错误,用的1.8.1版本大家有遇到过吗,求解决方案
> &gt; java.lang.RuntimeException: Error while getting state&nbsp;&nbsp; at
> &gt;
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
> &gt;&nbsp; at
> &gt;
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> &gt;
> com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243)
> &gt;&nbsp;&nbsp;&nbsp; at
> &gt;
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> &gt;
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> &gt; at
> &gt;
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
> &gt;&nbsp;&nbsp;&nbsp; at
> &gt;
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> &gt;&nbsp;&nbsp; at
> &gt;
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> &gt;&nbsp; at java.lang.Thread.run(Thread.java:748) Caused by:
> &gt; org.apache.flink.util.StateMigrationException: The new serializer for
> a
> &gt; MapState requires state migration in order for the job to proceed.
> However,
> &gt; migration for MapState currently isn't
> supported.&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> &gt;
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; at
> &gt;
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525)
> &gt;&nbsp;&nbsp;&nbsp; at
> &gt;
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
> &gt;&nbsp; at
> &gt;
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
> &gt;&nbsp;&nbsp;&nbsp; at
> &gt;
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
> &gt;&nbsp; at
> &gt;
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155)
> &gt; at
> &gt;
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
> &gt;&nbsp;&nbsp;&nbsp; at
> &gt;
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> &gt;
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
> &gt;&nbsp; at
> &gt;
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
> &gt;&nbsp;&nbsp;&nbsp; at
> &gt;
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> &gt;&nbsp; at
> &gt;
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
> &gt;&nbsp; ... 9 more



--
Best Wishes,
Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>
Reply | Threaded
Open this post in threaded view
|

回复: 关于从savepoint启动作业报错 migration for MapState currently isn't supported.

claylin
我的那个问题后面使用1.9.1版本解决了。你这个错误和我那个有点不一样,看下是不是还改了其他东西,看这个错误
&nbsp;Caused by: org.apache.flink.util.StateMigrationException: The new state
serializer cannot be incompatible.
是不是改了序列化类,和之前的不一样导致不兼容问题


------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"shuwen zhou"<[hidden email]&gt;;
发送时间:&nbsp;2019年11月20日(星期三) 中午11:18
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: 关于从savepoint启动作业报错 migration for MapState currently isn't supported.



成功了吗?
我这边报的是另外一个错误, org.apache.flink.util.StateMigrationException: The new state
serializer cannot be incompatible 使用的版本的是fink 1.9.0
具体操作是:
trigger savepoint后从savepoint读取就是这个错误
使用的是MapState[String,Void] 在scala代码


... 25 more
&nbsp;Caused by: java.io.IOException: Failed to open user defined function
at
org.apache.flink.state.api.input.KeyedStateInputFormat.getKeyIterator(KeyedStateInputFormat.java:210)
at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:185)
at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
at
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
&nbsp;Caused by: java.lang.RuntimeException: Error while getting state
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
at
org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:243)
at tv.freewheel.reporting.dip.ReaderFunction.open(SinkerReadState.scala:49)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.state.api.input.KeyedStateInputFormat.getKeyIterator(KeyedStateInputFormat.java:206)
... 6 more
&nbsp;Caused by: org.apache.flink.util.StateMigrationException: The new state
serializer cannot be incompatible.
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
at
org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)

On Fri, 15 Nov 2019 at 10:10, claylin <[hidden email]&gt; wrote:

&gt; 谢谢,我这边确实修改了状态的schema,现在试下看下
&gt;
&gt;
&gt;
&gt;
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人:&amp;nbsp;"Congxian Qiu"<[hidden email]&amp;gt;;
&gt; 发送时间:&amp;nbsp;2019年11月15日(星期五) 上午10:07
&gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
&gt;
&gt; 主题:&amp;nbsp;Re: 关于从savepoint启动作业报错 migration for MapState currently isn't
&gt; supported.
&gt;
&gt;
&gt;
&gt; Hi
&gt; 看上去是 MapState 的 migration 不支持导致的,可以尝试下 1.9,1.9 解决了 MapState 的 value schema
&gt; evolution[1]
&gt;
&gt; [1] https://issues.apache.org/jira/browse/FLINK-11947
&gt; Best,
&gt; Congxian
&gt;
&gt;
&gt; claylin <[hidden email]&amp;gt; 于2019年11月14日周四 下午9:35写道:
&gt;
&gt; &amp;gt; 从savepoint启动时候报了一下这个错误,用的1.8.1版本大家有遇到过吗,求解决方案
&gt; &amp;gt; java.lang.RuntimeException: Error while getting state&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt;
&gt; org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
&gt; &amp;gt;&amp;nbsp; at
&gt; &amp;gt;
&gt; org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt;
&gt; com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt;
&gt; org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt;
&gt; org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
&gt; &amp;gt; at
&gt; &amp;gt;
&gt; org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt;
&gt; org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt;
&gt; org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
&gt; &amp;gt;&amp;nbsp; at java.lang.Thread.run(Thread.java:748) Caused by:
&gt; &amp;gt; org.apache.flink.util.StateMigrationException: The new serializer for
&gt; a
&gt; &amp;gt; MapState requires state migration in order for the job to proceed.
&gt; However,
&gt; &amp;gt; migration for MapState currently isn't
&gt; supported.&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt;
&gt; org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt;
&gt; org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt;
&gt; org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
&gt; &amp;gt;&amp;nbsp; at
&gt; &amp;gt;
&gt; org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt;
&gt; org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
&gt; &amp;gt;&amp;nbsp; at
&gt; &amp;gt;
&gt; org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155)
&gt; &amp;gt; at
&gt; &amp;gt;
&gt; org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt;
&gt; org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt;
&gt; org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
&gt; &amp;gt;&amp;nbsp; at
&gt; &amp;gt;
&gt; org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
&gt; &amp;gt;
&gt; org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
&gt; &amp;gt;&amp;nbsp; at
&gt; &amp;gt;
&gt; org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
&gt; &amp;gt;&amp;nbsp; ... 9 more



--
Best Wishes,
Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/&gt;
Reply | Threaded
Open this post in threaded view
|

Re: 关于从savepoint启动作业报错 migration for MapState currently isn't supported.

shuwen zhou
是在哪里指定序列化类的呢?我没有显示的指定序列化方法,前后用的版本都是flink 1.9.0,代码也是一样的

On Wed, 20 Nov 2019 at 11:25, claylin <[hidden email]> wrote:

> 我的那个问题后面使用1.9.1版本解决了。你这个错误和我那个有点不一样,看下是不是还改了其他东西,看这个错误
> &nbsp;Caused by: org.apache.flink.util.StateMigrationException: The new
> state
> serializer cannot be incompatible.
> 是不是改了序列化类,和之前的不一样导致不兼容问题
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"shuwen zhou"<[hidden email]&gt;;
> 发送时间:&nbsp;2019年11月20日(星期三) 中午11:18
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;Re: 关于从savepoint启动作业报错 migration for MapState currently isn't
> supported.
>
>
>
> 成功了吗?
> 我这边报的是另外一个错误, org.apache.flink.util.StateMigrationException: The new state
> serializer cannot be incompatible 使用的版本的是fink 1.9.0
> 具体操作是:
> trigger savepoint后从savepoint读取就是这个错误
> 使用的是MapState[String,Void] 在scala代码
>
>
> ... 25 more
> &nbsp;Caused by: java.io.IOException: Failed to open user defined function
> at
>
> org.apache.flink.state.api.input.KeyedStateInputFormat.getKeyIterator(KeyedStateInputFormat.java:210)
> at
>
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:185)
> at
>
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
> at
>
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> &nbsp;Caused by: java.lang.RuntimeException: Error while getting state
> at
>
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
> at
>
> org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:243)
> at tv.freewheel.reporting.dip.ReaderFunction.open(SinkerReadState.scala:49)
> at
>
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
>
> org.apache.flink.state.api.input.KeyedStateInputFormat.getKeyIterator(KeyedStateInputFormat.java:206)
> ... 6 more
> &nbsp;Caused by: org.apache.flink.util.StateMigrationException: The new
> state
> serializer cannot be incompatible.
> at
>
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534)
> at
>
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
> at
>
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
> at
>
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> at
>
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
> at
>
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> at
>
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
> at
>
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> at
>
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
>
> On Fri, 15 Nov 2019 at 10:10, claylin <[hidden email]&gt; wrote:
>
> &gt; 谢谢,我这边确实修改了状态的schema,现在试下看下
> &gt;
> &gt;
> &gt;
> &gt;
> &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> &gt; 发件人:&amp;nbsp;"Congxian Qiu"<[hidden email]&amp;gt;;
> &gt; 发送时间:&amp;nbsp;2019年11月15日(星期五) 上午10:07
> &gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
> &gt;
> &gt; 主题:&amp;nbsp;Re: 关于从savepoint启动作业报错 migration for MapState currently
> isn't
> &gt; supported.
> &gt;
> &gt;
> &gt;
> &gt; Hi
> &gt; 看上去是 MapState 的 migration 不支持导致的,可以尝试下 1.9,1.9 解决了 MapState 的 value
> schema
> &gt; evolution[1]
> &gt;
> &gt; [1] https://issues.apache.org/jira/browse/FLINK-11947
> &gt; Best,
> &gt; Congxian
> &gt;
> &gt;
> &gt; claylin <[hidden email]&amp;gt; 于2019年11月14日周四 下午9:35写道:
> &gt;
> &gt; &amp;gt; 从savepoint启动时候报了一下这个错误,用的1.8.1版本大家有遇到过吗,求解决方案
> &gt; &amp;gt; java.lang.RuntimeException: Error while getting
> state&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
> &gt; &amp;gt;&amp;nbsp; at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt;
> &gt;
> com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243)
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> &gt;
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> &gt; &amp;gt; at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> &gt; &amp;gt;&amp;nbsp; at java.lang.Thread.run(Thread.java:748) Caused by:
> &gt; &amp;gt; org.apache.flink.util.StateMigrationException: The new
> serializer for
> &gt; a
> &gt; &amp;gt; MapState requires state migration in order for the job to
> proceed.
> &gt; However,
> &gt; &amp;gt; migration for MapState currently isn't
> &gt;
> supported.&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543)
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525)
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
> &gt; &amp;gt;&amp;nbsp; at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
> &gt; &amp;gt;&amp;nbsp; at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155)
> &gt; &amp;gt; at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
> &gt; &amp;gt;&amp;nbsp; at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
> &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> &gt; &amp;gt;&amp;nbsp; at
> &gt; &amp;gt;
> &gt;
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
> &gt; &amp;gt;&amp;nbsp; ... 9 more
>
>
>
> --
> Best Wishes,
> Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/&gt;



--
Best Wishes,
Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>
Reply | Threaded
Open this post in threaded view
|

Re: 关于从savepoint启动作业报错 migration for MapState currently isn't supported.

Congxian Qiu
Hi  shuwen

你出现这个问题前后有修改代码吗?如果没有修改代码的话,能否提供一个可以复现的 demo 呢~

Best,
Congxian


shuwen zhou <[hidden email]> 于2019年11月20日周三 下午5:20写道:

> 是在哪里指定序列化类的呢?我没有显示的指定序列化方法,前后用的版本都是flink 1.9.0,代码也是一样的
>
> On Wed, 20 Nov 2019 at 11:25, claylin <[hidden email]> wrote:
>
> > 我的那个问题后面使用1.9.1版本解决了。你这个错误和我那个有点不一样,看下是不是还改了其他东西,看这个错误
> > &nbsp;Caused by: org.apache.flink.util.StateMigrationException: The new
> > state
> > serializer cannot be incompatible.
> > 是不是改了序列化类,和之前的不一样导致不兼容问题
> >
> >
> > ------------------&nbsp;原始邮件&nbsp;------------------
> > 发件人:&nbsp;"shuwen zhou"<[hidden email]&gt;;
> > 发送时间:&nbsp;2019年11月20日(星期三) 中午11:18
> > 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
> >
> > 主题:&nbsp;Re: 关于从savepoint启动作业报错 migration for MapState currently isn't
> > supported.
> >
> >
> >
> > 成功了吗?
> > 我这边报的是另外一个错误, org.apache.flink.util.StateMigrationException: The new
> state
> > serializer cannot be incompatible 使用的版本的是fink 1.9.0
> > 具体操作是:
> > trigger savepoint后从savepoint读取就是这个错误
> > 使用的是MapState[String,Void] 在scala代码
> >
> >
> > ... 25 more
> > &nbsp;Caused by: java.io.IOException: Failed to open user defined
> function
> > at
> >
> >
> org.apache.flink.state.api.input.KeyedStateInputFormat.getKeyIterator(KeyedStateInputFormat.java:210)
> > at
> >
> >
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:185)
> > at
> >
> >
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
> > at
> >
> >
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> > at java.lang.Thread.run(Thread.java:748)
> > &nbsp;Caused by: java.lang.RuntimeException: Error while getting state
> > at
> >
> >
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
> > at
> >
> >
> org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:243)
> > at
> tv.freewheel.reporting.dip.ReaderFunction.open(SinkerReadState.scala:49)
> > at
> >
> >
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> > at
> >
> >
> org.apache.flink.state.api.input.KeyedStateInputFormat.getKeyIterator(KeyedStateInputFormat.java:206)
> > ... 6 more
> > &nbsp;Caused by: org.apache.flink.util.StateMigrationException: The new
> > state
> > serializer cannot be incompatible.
> > at
> >
> >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534)
> > at
> >
> >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
> > at
> >
> >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
> > at
> >
> >
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> > at
> >
> >
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
> > at
> >
> >
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> > at
> >
> >
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
> > at
> >
> >
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> > at
> >
> >
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
> >
> > On Fri, 15 Nov 2019 at 10:10, claylin <[hidden email]&gt; wrote:
> >
> > &gt; 谢谢,我这边确实修改了状态的schema,现在试下看下
> > &gt;
> > &gt;
> > &gt;
> > &gt;
> > &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> > &gt; 发件人:&amp;nbsp;"Congxian Qiu"<[hidden email]&amp;gt;;
> > &gt; 发送时间:&amp;nbsp;2019年11月15日(星期五) 上午10:07
> > &gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
> > &gt;
> > &gt; 主题:&amp;nbsp;Re: 关于从savepoint启动作业报错 migration for MapState currently
> > isn't
> > &gt; supported.
> > &gt;
> > &gt;
> > &gt;
> > &gt; Hi
> > &gt; 看上去是 MapState 的 migration 不支持导致的,可以尝试下 1.9,1.9 解决了 MapState 的 value
> > schema
> > &gt; evolution[1]
> > &gt;
> > &gt; [1] https://issues.apache.org/jira/browse/FLINK-11947
> > &gt; Best,
> > &gt; Congxian
> > &gt;
> > &gt;
> > &gt; claylin <[hidden email]&amp;gt; 于2019年11月14日周四 下午9:35写道:
> > &gt;
> > &gt; &amp;gt; 从savepoint启动时候报了一下这个错误,用的1.8.1版本大家有遇到过吗,求解决方案
> > &gt; &amp;gt; java.lang.RuntimeException: Error while getting
> > state&amp;nbsp;&amp;nbsp; at
> > &gt; &amp;gt;
> > &gt;
> >
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
> > &gt; &amp;gt;&amp;nbsp; at
> > &gt; &amp;gt;
> > &gt;
> >
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
> > &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> > &gt; &amp;gt;
> > &gt;
> >
> com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243)
> > &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> > &gt; &amp;gt;
> > &gt;
> >
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> > &gt;
> >
> &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> > at
> > &gt; &amp;gt;
> > &gt;
> >
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> > &gt; &amp;gt; at
> > &gt; &amp;gt;
> > &gt;
> >
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
> > &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> > &gt; &amp;gt;
> > &gt;
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> > &gt; &amp;gt;&amp;nbsp;&amp;nbsp; at
> > &gt; &amp;gt;
> > &gt;
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> > &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> > &gt; org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > &gt; &amp;gt;&amp;nbsp; at java.lang.Thread.run(Thread.java:748) Caused
> by:
> > &gt; &amp;gt; org.apache.flink.util.StateMigrationException: The new
> > serializer for
> > &gt; a
> > &gt; &amp;gt; MapState requires state migration in order for the job to
> > proceed.
> > &gt; However,
> > &gt; &amp;gt; migration for MapState currently isn't
> > &gt;
> > supported.&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> > &gt; &amp;gt;
> > &gt;
> >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543)
> > &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> > &gt; &amp;gt;
> > &gt;
> >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525)
> > &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> > &gt; &amp;gt;
> > &gt;
> >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
> > &gt; &amp;gt;&amp;nbsp; at
> > &gt; &amp;gt;
> > &gt;
> >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
> > &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> > &gt; &amp;gt;
> > &gt;
> >
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
> > &gt; &amp;gt;&amp;nbsp; at
> > &gt; &amp;gt;
> > &gt;
> >
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155)
> > &gt; &amp;gt; at
> > &gt; &amp;gt;
> > &gt;
> >
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
> > &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> > &gt; &amp;gt;
> > &gt;
> >
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
> > &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> > &gt; &amp;gt;
> > &gt;
> >
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
> > &gt; &amp;gt;&amp;nbsp; at
> > &gt; &amp;gt;
> > &gt;
> >
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
> > &gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; at
> > &gt; &amp;gt;
> > &gt;
> >
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> > &gt; &amp;gt;&amp;nbsp; at
> > &gt; &amp;gt;
> > &gt;
> >
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
> > &gt; &amp;gt;&amp;nbsp; ... 9 more
> >
> >
> >
> > --
> > Best Wishes,
> > Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/&gt;
>
>
>
> --
> Best Wishes,
> Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/>
>