Serialization issue when using RocksDB as the Flink state backend

1900
We are currently running Flink on YARN, with Flink 1.7.2 and Hadoop 2.8.5.


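I have run into a problem. Say we have an order object, Order. Because we use windows and similar operators, the job keeps state. Now suppose we add a few new fields to Order and restart the job from a savepoint: it fails, and judging from the error message it is a deserialization problem (it seems RocksDB uses Kryo by default).
The only option we have is to kill the job and restart it, which loses the state. Is there any way around this? Requirements keep changing, so we will certainly be adding fields on a regular basis. Alternatively, could we change the serialization format used with RocksDB, for example to JSON?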


The error message is as follows:


INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job test  (e241f87f338a95ce7a2b1e8dd96bea0a) switched from state RUNNING to FAILING.
TimerException{com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 104}
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 104
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:492)
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:428)
        at org.apache.flink.contrib.streaming.state.RocksDBListState.deserializeNextElement(RocksDBListState.java:146)
        at org.apache.flink.contrib.streaming.state.RocksDBListState.deserializeList(RocksDBListState.java:137)
        at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
        at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:112)
        at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:61)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:498)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:235)
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
        ... 7 more

Re: Serialization issue when using RocksDB as the Flink state backend

Congxian Qiu
Hi, take a look at whether POJO Schema Evolution [1], supported starting with Flink 1.8, meets your needs. The thing to watch out for is making sure your class satisfies Flink's definition of a POJO [2].
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/schema_evolution.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/types_serialization.html#rules-for-pojo-types
Best,
Congxian
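A minimal sketch of what the POJO rules in [2] look like in practice, assuming a hypothetical `Order` whose field names are invented for illustration. `Order` and `BeanOrder` are written to follow the documented rules (public class, static if nested, public no-argument constructor, fields either public or exposed through `getFoo()`/`setFoo()`), while `BrokenOrder` violates them. `looksLikeFlinkPojo` is a hypothetical reflection helper that only approximates those rules: it is not Flink's `TypeExtractor`, it ignores superclass fields, and it does not check whether each field's type has a suitable serializer. In a real job, checking whether `TypeInformation.of(Order.class)` yields a `PojoTypeInfo` is the authoritative test.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class PojoRulesSketch {

    // Hypothetical state type: public, static (because nested), public no-arg
    // constructor, public fields. Written to follow the POJO rules in the Flink docs.
    public static class Order {
        public String orderId;
        public long amountCents;
        // Hypothetical field added in a later job version. With POJO schema
        // evolution, state restored from an older savepoint gets Java's default (null).
        public String channel;

        public Order() {}
    }

    // Also follows the rules: a private field exposed through getFoo()/setFoo().
    public static class BeanOrder {
        private String orderId;

        public BeanOrder() {}

        public String getOrderId() { return orderId; }
        public void setOrderId(String orderId) { this.orderId = orderId; }
    }

    // Breaks the rules: no public no-arg constructor, private field without accessors.
    public static class BrokenOrder {
        private String orderId;

        public BrokenOrder(String orderId) { this.orderId = orderId; }
    }

    // Rough approximation of the documented rules; NOT Flink's TypeExtractor.
    // Ignores superclass fields and does not inspect field types.
    public static boolean looksLikeFlinkPojo(Class<?> clazz) {
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods)) return false;
        // Nested classes must be static (no hidden reference to an outer instance).
        if (clazz.getEnclosingClass() != null && !Modifier.isStatic(mods)) return false;
        try {
            clazz.getConstructor(); // a public no-arg constructor is required
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : clazz.getDeclaredFields()) {
            int fm = f.getModifiers();
            // Static and transient fields are skipped, as they are not serialized.
            if (f.isSynthetic() || Modifier.isStatic(fm) || Modifier.isTransient(fm)) continue;
            if (!Modifier.isPublic(fm) && !hasGetterAndSetter(clazz, f)) return false;
        }
        return true;
    }

    // True if the class has public getFoo() and setFoo(T) matching field foo of type T.
    private static boolean hasGetterAndSetter(Class<?> clazz, Field f) {
        String name = f.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        boolean getter = false;
        boolean setter = false;
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals("get" + suffix) && m.getParameterCount() == 0
                    && m.getReturnType().equals(f.getType())) {
                getter = true;
            }
            if (m.getName().equals("set" + suffix) && m.getParameterCount() == 1
                    && m.getParameterTypes()[0].equals(f.getType())) {
                setter = true;
            }
        }
        return getter && setter;
    }

    public static void main(String[] args) {
        System.out.println("Order: " + looksLikeFlinkPojo(Order.class));
        System.out.println("BeanOrder: " + looksLikeFlinkPojo(BeanOrder.class));
        System.out.println("BrokenOrder: " + looksLikeFlinkPojo(BrokenOrder.class));
    }
}
```

Running `main` prints `Order: true`, `BeanOrder: true` and `BrokenOrder: false`. One detail from the stack trace above is worth noting: `PojoSerializer.deserialize` calls into `KryoSerializer.deserialize`, which suggests `Order` itself is already treated as a POJO but at least one of its fields falls back to Kryo as a generic type. As far as the Flink docs describe it, schema evolution covers POJO and Avro types but not Kryo-serialized ones, so such fields would presumably need to become POJOs (or another supported type) as well; calling `env.getConfig().disableGenericTypes()` should make Flink fail fast wherever a type would fall back to Kryo. The docs also note that POJO schemas cannot be evolved when restoring from savepoints taken with Flink versions older than 1.8.0, so a 1.7.2 job would likely need to be upgraded and a fresh savepoint taken before the first field change.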


On Mon, Aug 26, 2019 at 9:43 PM, 1900 <[hidden email]> wrote:


Re: Serialization issue when using RocksDB as the Flink state backend

Jark
Administrator
I think Congxian is right. POJO Schema Evolution is the feature you want.

Best,
Jark


> On Aug 26, 2019, at 21:52, Congxian Qiu <[hidden email]> wrote: