We currently run Flink on YARN. The Flink version is 1.7.2 and the Hadoop version is 2.8.5.
We have run into a problem. Suppose we have an order object, Order. Because the job uses windows and similar operators, it holds state. If we now add a few new fields to Order and restart the job from a savepoint, it fails. Judging by the error message, this is a deserialization problem (RocksDB uses Kryo by default). The only option is to kill the job and start it fresh, which loses the state. Is there a way around this? Requirements keep changing, so we will certainly have to add fields regularly. Alternatively, could we change the serialization used with RocksDB, for example to JSON?

The error message is as follows:

INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job test (e241f87f338a95ce7a2b1e8dd96bea0a) switched from state RUNNING to FAILING.
TimerException{com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 104}
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 104
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:492)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:428)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.deserializeNextElement(RocksDBListState.java:146)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.deserializeList(RocksDBListState.java:137)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:112)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:61)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:498)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:235)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
    ... 7 more

Hi, you could check whether POJO schema evolution [1], which is supported since Flink 1.8, meets your needs. Note that your class has to satisfy Flink's rules for what counts as a POJO [2].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/schema_evolution.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html#rules-for-pojo-types

Best,
Congxian

On Mon, Aug 26, 2019 at 9:43 PM, 1900 <[hidden email]> wrote:
> [...]

I think Congxian is right. POJO schema evolution is the feature you want.

Best,
Jark

On Aug 26, 2019, at 21:52, Congxian Qiu <[hidden email]> wrote:
> [...]
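
As a rough illustration of the POJO rules referenced in [2]: the documented requirements are, in short, a public class, a public no-argument constructor, and fields that are either public or reachable through a getter and setter. The sketch below lays out a hypothetical Order class that way and adds a small reflection-based pre-check. Everything here is an assumption made for illustration: the field names (orderId, amountCents), the helper looksLikePojo, and the simplified rules it applies. It is not Flink's own type analysis, it ignores inherited fields, and it does not check whether each field type has a supported serializer. That last point may matter here, since the stack trace shows KryoSerializer being called from inside PojoSerializer, which suggests at least one field of the stored type goes through Kryo.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoRuleCheck {

    /** Hypothetical order type laid out to follow the documented POJO rules. */
    public static class Order {
        public String orderId;      // public field: no accessor needed
        private long amountCents;   // private field: needs a getter and a setter

        public Order() {}           // public no-argument constructor

        public long getAmountCents() { return amountCents; }
        public void setAmountCents(long amountCents) { this.amountCents = amountCents; }
    }

    /** Counter-example: no public no-argument constructor and no setter. */
    public static class NotAPojo {
        private final String id;

        public NotAPojo(String id) { this.id = id; }

        public String getId() { return id; }
    }

    /** Simplified approximation of the POJO rules; not Flink's actual check. */
    public static boolean looksLikePojo(Class<?> c) {
        int mods = c.getModifiers();
        // The class must be public, and a nested class must be static.
        if (!Modifier.isPublic(mods)) return false;
        if (c.isMemberClass() && !Modifier.isStatic(mods)) return false;
        // A public no-argument constructor must exist.
        try {
            c.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Each declared instance field must be public or have getX/setX accessors.
        for (Field f : c.getDeclaredFields()) {
            int fm = f.getModifiers();
            if (f.isSynthetic() || Modifier.isStatic(fm) || Modifier.isTransient(fm)) continue;
            if (Modifier.isPublic(fm)) continue;
            if (!hasGetterAndSetter(c, f)) return false;
        }
        return true;
    }

    private static boolean hasGetterAndSetter(Class<?> c, Field f) {
        String name = f.getName();
        String cap = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            c.getMethod("get" + cap);
            c.getMethod("set" + cap, f.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Order: " + looksLikePojo(Order.class));
        System.out.println("NotAPojo: " + looksLikePojo(NotAPojo.class));
    }
}
```

A check like this only catches the structural rules early. Whether adding fields to existing state actually works still depends on running a Flink version with POJO schema evolution (1.8 or later, per the reply above) and on the state having been written with the POJO serializer in the first place.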