从savepoint启动时候报了一下这个错误,用的1.8.1版本大家有遇到过吗,求解决方案
java.lang.RuntimeException: Error while getting state at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119) at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179) at com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.StateMigrationException: The new serializer for a MapState requires state migration in order for the job to proceed. However, migration for MapState currently isn't supported. at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) ... 9 more |
Hi
看上去是 MapState 的 migration 不支持导致的,可以尝试下 1.9,1.9 解决了 MapState 的 value schema evolution[1] [1] https://issues.apache.org/jira/browse/FLINK-11947 Best, Congxian claylin <[hidden email]> 于2019年11月14日周四 下午9:35写道: > 从savepoint启动时候报了一下这个错误,用的1.8.1版本大家有遇到过吗,求解决方案 > java.lang.RuntimeException: Error while getting state at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119) > at > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179) > at > com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) Caused by: > org.apache.flink.util.StateMigrationException: The new serializer for a > MapState requires state migration in order for the job to proceed. However, > migration for MapState currently isn't supported. at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) > ... 9 more |
谢谢,我这边确实修改了状态的schema,现在试下看下
------------------ 原始邮件 ------------------ 发件人: "Congxian Qiu"<[hidden email]>; 发送时间: 2019年11月15日(星期五) 上午10:07 收件人: "user-zh"<[hidden email]>; 主题: Re: 关于从savepoint启动作业报错 migration for MapState currently isn't supported. Hi 看上去是 MapState 的 migration 不支持导致的,可以尝试下 1.9,1.9 解决了 MapState 的 value schema evolution[1] [1] https://issues.apache.org/jira/browse/FLINK-11947 Best, Congxian claylin <[hidden email]> 于2019年11月14日周四 下午9:35写道: > 从savepoint启动时候报了一下这个错误,用的1.8.1版本大家有遇到过吗,求解决方案 > java.lang.RuntimeException: Error while getting state at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119) > at > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179) > at > com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) Caused by: > org.apache.flink.util.StateMigrationException: The new serializer for a > MapState requires state migration in order for the job to proceed. However, > migration for MapState currently isn't supported. at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) > ... 9 more |
成功了吗?
我这边报的是另外一个错误, org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible 使用的版本的是fink 1.9.0 具体操作是: trigger savepoint后从savepoint读取就是这个错误 使用的是MapState[String,Void] 在scala代码 ... 25 more Caused by: java.io.IOException: Failed to open user defined function at org.apache.flink.state.api.input.KeyedStateInputFormat.getKeyIterator(KeyedStateInputFormat.java:210) at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:185) at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79) at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Error while getting state at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119) at org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:243) at tv.freewheel.reporting.dip.ReaderFunction.open(SinkerReadState.scala:49) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.state.api.input.KeyedStateInputFormat.getKeyIterator(KeyedStateInputFormat.java:206) ... 6 more Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible. at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643) at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) On Fri, 15 Nov 2019 at 10:10, claylin <[hidden email]> wrote: > 谢谢,我这边确实修改了状态的schema,现在试下看下 > > > > > ------------------ 原始邮件 ------------------ > 发件人: "Congxian Qiu"<[hidden email]>; > 发送时间: 2019年11月15日(星期五) 上午10:07 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: 关于从savepoint启动作业报错 migration for MapState currently isn't > supported. > > > > Hi > 看上去是 MapState 的 migration 不支持导致的,可以尝试下 1.9,1.9 解决了 MapState 的 value schema > evolution[1] > > [1] https://issues.apache.org/jira/browse/FLINK-11947 > Best, > Congxian > > > claylin <[hidden email]> 于2019年11月14日周四 下午9:35写道: > > > 从savepoint启动时候报了一下这个错误,用的1.8.1版本大家有遇到过吗,求解决方案 > > java.lang.RuntimeException: Error while getting state at > > > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119) > > at > > > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179) > > at > > > com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243) > > at > > > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > > at > > > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > > at > > > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) > > at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > > at java.lang.Thread.run(Thread.java:748) Caused by: > > org.apache.flink.util.StateMigrationException: The new serializer for > a > > MapState requires state migration in order for the job to proceed. > However, > > migration for MapState currently isn't > supported. at > > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543) > > at > > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525) > > at > > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475) > > at > > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613) > > at > > > org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197) > > at > > > org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155) > > at > > > org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126) > > at > > > org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71) > > at > > > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286) > > at > > > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335) > > at > > > org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) > > at > > > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) > > ... 9 more -- Best Wishes, Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/> |
我的那个问题后面使用1.9.1版本解决了。你这个错误和我那个有点不一样,看下是不是还改了其他东西,看这个错误
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible. 是不是改了序列化类,和之前的不一样导致不兼容问题 ------------------ 原始邮件 ------------------ 发件人: "shuwen zhou"<[hidden email]>; 发送时间: 2019年11月20日(星期三) 中午11:18 收件人: "user-zh"<[hidden email]>; 主题: Re: 关于从savepoint启动作业报错 migration for MapState currently isn't supported. 成功了吗? 我这边报的是另外一个错误, org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible 使用的版本的是fink 1.9.0 具体操作是: trigger savepoint后从savepoint读取就是这个错误 使用的是MapState[String,Void] 在scala代码 ... 25 more Caused by: java.io.IOException: Failed to open user defined function at org.apache.flink.state.api.input.KeyedStateInputFormat.getKeyIterator(KeyedStateInputFormat.java:210) at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:185) at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79) at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Error while getting state at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119) at org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:243) at tv.freewheel.reporting.dip.ReaderFunction.open(SinkerReadState.scala:49) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.state.api.input.KeyedStateInputFormat.getKeyIterator(KeyedStateInputFormat.java:206) ... 6 more Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible. at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643) at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) On Fri, 15 Nov 2019 at 10:10, claylin <[hidden email]> wrote: > 谢谢,我这边确实修改了状态的schema,现在试下看下 > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人:&nbsp;"Congxian Qiu"<[hidden email]&gt;; > 发送时间:&nbsp;2019年11月15日(星期五) 上午10:07 > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > 主题:&nbsp;Re: 关于从savepoint启动作业报错 migration for MapState currently isn't > supported. > > > > Hi > 看上去是 MapState 的 migration 不支持导致的,可以尝试下 1.9,1.9 解决了 MapState 的 value schema > evolution[1] > > [1] https://issues.apache.org/jira/browse/FLINK-11947 > Best, > Congxian > > > claylin <[hidden email]&gt; 于2019年11月14日周四 下午9:35写道: > > &gt; 从savepoint启动时候报了一下这个错误,用的1.8.1版本大家有遇到过吗,求解决方案 > &gt; java.lang.RuntimeException: Error while getting state&nbsp;&nbsp; at > &gt; > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119) > &gt;&nbsp; at > &gt; > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179) > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > &gt; > com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243) > &gt;&nbsp;&nbsp;&nbsp; at > &gt; > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > &gt; > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > &gt; at > &gt; > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) > &gt;&nbsp;&nbsp;&nbsp; at > &gt; > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) > &gt;&nbsp;&nbsp; at > &gt; > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) > &gt;&nbsp;&nbsp;&nbsp;&nbsp; at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > &gt;&nbsp; at java.lang.Thread.run(Thread.java:748) Caused by: > &gt; org.apache.flink.util.StateMigrationException: The new serializer for > a > &gt; MapState requires state migration in order for the job to proceed. > However, > &gt; migration for MapState currently isn't > supported.&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > &gt; > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543) > &gt;&nbsp;&nbsp;&nbsp;&nbsp; at > &gt; > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525) > &gt;&nbsp;&nbsp;&nbsp; at > &gt; > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475) > &gt;&nbsp; at > &gt; > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613) > &gt;&nbsp;&nbsp;&nbsp; at > &gt; > org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197) > &gt;&nbsp; at > &gt; > org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155) > &gt; at > &gt; > org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126) > &gt;&nbsp;&nbsp;&nbsp; at > &gt; > org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71) > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > &gt; > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286) > &gt;&nbsp; at > &gt; > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335) > &gt;&nbsp;&nbsp;&nbsp; at > &gt; > org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) > &gt;&nbsp; at > &gt; > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) > &gt;&nbsp; ... 9 more -- Best Wishes, Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/> |
是在哪里指定序列化类的呢?我没有显示的指定序列化方法,前后用的版本都是flink 1.9.0,代码也是一样的
On Wed, 20 Nov 2019 at 11:25, claylin <[hidden email]> wrote: > 我的那个问题后面使用1.9.1版本解决了。你这个错误和我那个有点不一样,看下是不是还改了其他东西,看这个错误 > Caused by: org.apache.flink.util.StateMigrationException: The new > state > serializer cannot be incompatible. > 是不是改了序列化类,和之前的不一样导致不兼容问题 > > > ------------------ 原始邮件 ------------------ > 发件人: "shuwen zhou"<[hidden email]>; > 发送时间: 2019年11月20日(星期三) 中午11:18 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: 关于从savepoint启动作业报错 migration for MapState currently isn't > supported. > > > > 成功了吗? > 我这边报的是另外一个错误, org.apache.flink.util.StateMigrationException: The new state > serializer cannot be incompatible 使用的版本的是fink 1.9.0 > 具体操作是: > trigger savepoint后从savepoint读取就是这个错误 > 使用的是MapState[String,Void] 在scala代码 > > > ... 25 more > Caused by: java.io.IOException: Failed to open user defined function > at > > org.apache.flink.state.api.input.KeyedStateInputFormat.getKeyIterator(KeyedStateInputFormat.java:210) > at > > org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:185) > at > > org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79) > at > > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.RuntimeException: Error while getting state > at > > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119) > at > > org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:243) > at tv.freewheel.reporting.dip.ReaderFunction.open(SinkerReadState.scala:49) > at > > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > > org.apache.flink.state.api.input.KeyedStateInputFormat.getKeyIterator(KeyedStateInputFormat.java:206) > ... 6 more > Caused by: org.apache.flink.util.StateMigrationException: The new > state > serializer cannot be incompatible. > at > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534) > at > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482) > at > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643) > at > > org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47) > at > > org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72) > at > > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279) > at > > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328) > at > > org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) > at > > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) > > On Fri, 15 Nov 2019 at 10:10, claylin <[hidden email]> wrote: > > > 谢谢,我这边确实修改了状态的schema,现在试下看下 > > > > > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > 发件人:&nbsp;"Congxian Qiu"<[hidden email]&gt;; > > 发送时间:&nbsp;2019年11月15日(星期五) 上午10:07 > > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > > > 主题:&nbsp;Re: 关于从savepoint启动作业报错 migration for MapState currently > isn't > > supported. > > > > > > > > Hi > > 看上去是 MapState 的 migration 不支持导致的,可以尝试下 1.9,1.9 解决了 MapState 的 value > schema > > evolution[1] > > > > [1] https://issues.apache.org/jira/browse/FLINK-11947 > > Best, > > Congxian > > > > > > claylin <[hidden email]&gt; 于2019年11月14日周四 下午9:35写道: > > > > &gt; 从savepoint启动时候报了一下这个错误,用的1.8.1版本大家有遇到过吗,求解决方案 > > &gt; java.lang.RuntimeException: Error while getting > state&nbsp;&nbsp; at > > &gt; > > > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119) > > &gt;&nbsp; at > > &gt; > > > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179) > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > > &gt; > > > com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243) > > &gt;&nbsp;&nbsp;&nbsp; at > > &gt; > > > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > at > > &gt; > > > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > > &gt; at > > &gt; > > > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) > > &gt;&nbsp;&nbsp;&nbsp; at > > &gt; > > > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) > > &gt;&nbsp;&nbsp; at > > &gt; > > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; at > > org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > > &gt;&nbsp; at java.lang.Thread.run(Thread.java:748) Caused by: > > &gt; org.apache.flink.util.StateMigrationException: The new > serializer for > > a > > &gt; MapState requires state migration in order for the job to > proceed. > > However, > > &gt; migration for MapState currently isn't > > > supported.&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > > &gt; > > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543) > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; at > > &gt; > > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525) > > &gt;&nbsp;&nbsp;&nbsp; at > > &gt; > > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475) > > &gt;&nbsp; at > > &gt; > > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613) > > &gt;&nbsp;&nbsp;&nbsp; at > > &gt; > > > org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197) > > &gt;&nbsp; at > > &gt; > > > org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155) > > &gt; at > > &gt; > > > org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126) > > &gt;&nbsp;&nbsp;&nbsp; at > > &gt; > > > org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71) > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > > &gt; > > > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286) > > &gt;&nbsp; at > > &gt; > > > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335) > > &gt;&nbsp;&nbsp;&nbsp; at > > &gt; > > > org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) > > &gt;&nbsp; at > > &gt; > > > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) > > &gt;&nbsp; ... 9 more > > > > -- > Best Wishes, > Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/> -- Best Wishes, Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/> |
Hi shuwen
你出现这个问题前后有修改代码吗?如果没有修改代码的话,能否提供一个可以复现的 demo 呢~ Best, Congxian shuwen zhou <[hidden email]> 于2019年11月20日周三 下午5:20写道: > 是在哪里指定序列化类的呢?我没有显示的指定序列化方法,前后用的版本都是flink 1.9.0,代码也是一样的 > > On Wed, 20 Nov 2019 at 11:25, claylin <[hidden email]> wrote: > > > 我的那个问题后面使用1.9.1版本解决了。你这个错误和我那个有点不一样,看下是不是还改了其他东西,看这个错误 > > Caused by: org.apache.flink.util.StateMigrationException: The new > > state > > serializer cannot be incompatible. > > 是不是改了序列化类,和之前的不一样导致不兼容问题 > > > > > > ------------------ 原始邮件 ------------------ > > 发件人: "shuwen zhou"<[hidden email]>; > > 发送时间: 2019年11月20日(星期三) 中午11:18 > > 收件人: "user-zh"<[hidden email]>; > > > > 主题: Re: 关于从savepoint启动作业报错 migration for MapState currently isn't > > supported. > > > > > > > > 成功了吗? > > 我这边报的是另外一个错误, org.apache.flink.util.StateMigrationException: The new > state > > serializer cannot be incompatible 使用的版本的是fink 1.9.0 > > 具体操作是: > > trigger savepoint后从savepoint读取就是这个错误 > > 使用的是MapState[String,Void] 在scala代码 > > > > > > ... 25 more > > Caused by: java.io.IOException: Failed to open user defined > function > > at > > > > > org.apache.flink.state.api.input.KeyedStateInputFormat.getKeyIterator(KeyedStateInputFormat.java:210) > > at > > > > > org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:185) > > at > > > > > org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79) > > at > > > > > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173) > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) > > at java.lang.Thread.run(Thread.java:748) > > Caused by: java.lang.RuntimeException: Error while getting state > > at > > > > > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119) > > at > > > > > org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:243) > > at > tv.freewheel.reporting.dip.ReaderFunction.open(SinkerReadState.scala:49) > > at > > > > > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > > at > > > > > org.apache.flink.state.api.input.KeyedStateInputFormat.getKeyIterator(KeyedStateInputFormat.java:206) > > ... 6 more > > Caused by: org.apache.flink.util.StateMigrationException: The new > > state > > serializer cannot be incompatible. > > at > > > > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534) > > at > > > > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482) > > at > > > > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643) > > at > > > > > org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47) > > at > > > > > org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72) > > at > > > > > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279) > > at > > > > > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328) > > at > > > > > org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) > > at > > > > > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) > > > > On Fri, 15 Nov 2019 at 10:10, claylin <[hidden email]> wrote: > > > > > 谢谢,我这边确实修改了状态的schema,现在试下看下 > > > > > > > > > > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > > 发件人:&nbsp;"Congxian Qiu"<[hidden email]&gt;; > > > 发送时间:&nbsp;2019年11月15日(星期五) 上午10:07 > > > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > > > > > 主题:&nbsp;Re: 关于从savepoint启动作业报错 migration for MapState currently > > isn't > > > supported. > > > > > > > > > > > > Hi > > > 看上去是 MapState 的 migration 不支持导致的,可以尝试下 1.9,1.9 解决了 MapState 的 value > > schema > > > evolution[1] > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-11947 > > > Best, > > > Congxian > > > > > > > > > claylin <[hidden email]&gt; 于2019年11月14日周四 下午9:35写道: > > > > > > &gt; 从savepoint启动时候报了一下这个错误,用的1.8.1版本大家有遇到过吗,求解决方案 > > > &gt; java.lang.RuntimeException: Error while getting > > state&nbsp;&nbsp; at > > > &gt; > > > > > > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119) > > > &gt;&nbsp; at > > > &gt; > > > > > > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179) > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > > > &gt; > > > > > > com.yy.kafka.template.job.PushServerRspStatisticsVer3$DistinctProcessFunction.open(PushServerRspStatisticsVer3.java:243) > > > &gt;&nbsp;&nbsp;&nbsp; at > > > &gt; > > > > > > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > > > > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > > at > > > &gt; > > > > > > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > > > &gt; at > > > &gt; > > > > > > org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57) > > > &gt;&nbsp;&nbsp;&nbsp; at > > > &gt; > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) > > > &gt;&nbsp;&nbsp; at > > > &gt; > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; at > > > org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > > > &gt;&nbsp; at java.lang.Thread.run(Thread.java:748) Caused > by: > > > &gt; org.apache.flink.util.StateMigrationException: The new > > serializer for > > > a > > > &gt; MapState requires state migration in order for the job to > > proceed. > > > However, > > > &gt; migration for MapState currently isn't > > > > > supported.&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > > > &gt; > > > > > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:543) > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp; at > > > &gt; > > > > > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:525) > > > &gt;&nbsp;&nbsp;&nbsp; at > > > &gt; > > > > > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475) > > > &gt;&nbsp; at > > > &gt; > > > > > > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613) > > > &gt;&nbsp;&nbsp;&nbsp; at > > > &gt; > > > > > > org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197) > > > &gt;&nbsp; at > > > &gt; > > > > > > org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155) > > > &gt; at > > > &gt; > > > > > > org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126) > > > &gt;&nbsp;&nbsp;&nbsp; at > > > &gt; > > > > > > org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71) > > > &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at > > > &gt; > > > > > > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286) > > > &gt;&nbsp; at > > > &gt; > > > > > > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335) > > > &gt;&nbsp;&nbsp;&nbsp; at > > > &gt; > > > > > > org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) > > > &gt;&nbsp; at > > > &gt; > > > > > > org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116) > > > &gt;&nbsp; ... 9 more > > > > > > > > -- > > Best Wishes, > > Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/> > > > > -- > Best Wishes, > Shuwen Zhou <http://www.linkedin.com/pub/shuwen-zhou/57/55b/599/> > |
Free forum by Nabble | Edit this page |