Flink1.12触发保存点时失败

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink1.12触发保存点时失败

nobleyd
报错信息如下:
java.lang.IllegalArgumentException: Can not set long field
com.xxx.buzz.pojo.AbstractDrRecord.timestamp to null value
    at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(
UnsafeFieldAccessorImpl.java:167)
    at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(
UnsafeFieldAccessorImpl.java:171)
    at sun.reflect.UnsafeLongFieldAccessorImpl.set(
UnsafeLongFieldAccessorImpl.java:80)
    at java.lang.reflect.Field.set(Field.java:764)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
.deserialize(PojoSerializer.java:409)
    at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
    at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate
.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.
SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
.processInput(StreamTwoInputProcessor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)


根据堆栈找到报错位置代码为:

try {
   for (int i = 0; i < numFields; i++) {
      boolean isNull = source.readBoolean();

      if (fields[i] != null) {
         if (isNull) {
            fields[i].set(target, null); // 此处报错,设置null,但这个字段是long基础数据类型,非包装类型。
         } else {
            Object field = fieldSerializers[i].deserialize(source);
            fields[i].set(target, field);
         }
      } else if (!isNull) {
         // read and dump a pre-existing field value
         fieldSerializers[i].deserialize(source);
      }
   }
} catch (IllegalAccessException e) {
   throw new RuntimeException("Error during POJO copy, this should not
happen since we check the fields before.", e);
}
Reply | Threaded
Open this post in threaded view
|

Re: Flink1.12触发保存点时失败

r pp
hi~ Java 语法不支持,Long 可以设置

赵一旦 <[hidden email]> 于2021年1月7日周四 下午8:13写道:

> 报错信息如下:
> java.lang.IllegalArgumentException: Can not set long field
> com.xxx.buzz.pojo.AbstractDrRecord.timestamp to null value
>     at
> sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(
> UnsafeFieldAccessorImpl.java:167)
>     at
> sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(
> UnsafeFieldAccessorImpl.java:171)
>     at sun.reflect.UnsafeLongFieldAccessorImpl.set(
> UnsafeLongFieldAccessorImpl.java:80)
>     at java.lang.reflect.Field.set(Field.java:764)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
> .deserialize(PojoSerializer.java:409)
>     at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>     at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate
> .read(NonReusingDeserializationDelegate.java:55)
>     at org.apache.flink.runtime.io.network.api.serialization.
> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
> SpillingAdaptiveSpanningRecordDeserializer.java:92)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .emitNext(StreamTaskNetworkInput.java:145)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:67)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
> .processInput(StreamTwoInputProcessor.java:92)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:372)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:186)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:575)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
> .java:539)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>     at java.lang.Thread.run(Thread.java:748)
>
>
> 根据堆栈找到报错位置代码为:
>
> try {
>    for (int i = 0; i < numFields; i++) {
>       boolean isNull = source.readBoolean();
>
>       if (fields[i] != null) {
>          if (isNull) {
>             fields[i].set(target, null); //
> 此处报错,设置null,但这个字段是long基础数据类型,非包装类型。
>          } else {
>             Object field = fieldSerializers[i].deserialize(source);
>             fields[i].set(target, field);
>          }
>       } else if (!isNull) {
>          // read and dump a pre-existing field value
>          fieldSerializers[i].deserialize(source);
>       }
>    }
> } catch (IllegalAccessException e) {
>    throw new RuntimeException("Error during POJO copy, this should not
> happen since we check the fields before.", e);
> }
>


--
Best,
  pp