hello
我在使用flink-sql1.11版本是使用到了map类型,但是我遇到了问题,当map中的value为空时会产生空指针异常,下面附上我的错误以及源代码 org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_152] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_152] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_152] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_152] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.11.1.jar:1.11.1] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.11.1.jar:1.11.1] at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.11.1.jar:1.11.1] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.11.1.jar:1.11.1] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.11.1.jar:1.11.1] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.1.jar:1.11.1] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.1.jar:1.11.1] at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.11.1.jar:1.11.1] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.11.1.jar:1.11.1] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.11.1.jar:1.11.1] at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.11.1.jar:1.11.1] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.11.1.jar:1.11.1] at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.11.1.jar:1.11.1] at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.11.1.jar:1.11.1] at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.1.jar:1.11.1] at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.1.jar:1.11.1] at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.1.jar:1.11.1] at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.1.jar:1.11.1] Caused by: java.io.IOException: Failed to deserialize Avro record. at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:151) ~[flink-avro-1.11.1.jar:1.11.1] at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75) ~[flink-avro-1.11.1.jar:1.11.1] at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[lexus-flink_2.11-0.1.jar:?] at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[lexus-flink_2.11-0.1.jar:?] at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[lexus-flink_2.11-0.1.jar:?] at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[lexus-flink_2.11-0.1.jar:?] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1] Caused by: java.lang.NullPointerException at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createConverter$57e941b$5(AvroRowDataDeserializationSchema.java:253) ~[flink-avro-1.11.1.jar:1.11.1] at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createMapConverter$7941d275$1(AvroRowDataDeserializationSchema.java:315) ~[flink-avro-1.11.1.jar:1.11.1] at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:222) ~[flink-avro-1.11.1.jar:1.11.1] at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:207) ~[flink-avro-1.11.1.jar:1.11.1] at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:149) ~[flink-avro-1.11.1.jar:1.11.1] at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75) ~[flink-avro-1.11.1.jar:1.11.1] at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[lexus-flink_2.11-0.1.jar:?] at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[lexus-flink_2.11-0.1.jar:?] at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[lexus-flink_2.11-0.1.jar:?] at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[lexus-flink_2.11-0.1.jar:?] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1] private static DeserializationRuntimeConverter createMapConverter(LogicalType type) { final DeserializationRuntimeConverter keyConverter = createConverter( DataTypes.STRING().getLogicalType()); final DeserializationRuntimeConverter valueConverter = createConverter( extractValueTypeToAvroMap(type)); return avroObject -> { final Map<?, ?> map = (Map<?, ?>) avroObject; Map<Object, Object> result = new HashMap<>(); for (Map.Entry<?, ?> entry : map.entrySet()) { Object key = keyConverter.convert(entry.getKey()); Object value = valueConverter.convert(entry.getValue()); result.put(key, value); } return new GenericMapData(result); }; } -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Hi,
你的map是什么类型呢?我来复现一下。 奔跑的小飞袁 <[hidden email]> 于2020年10月13日周二 下午6:07写道: > hello > 我在使用flink-sql1.11版本是使用到了map类型,但是我遇到了问题,当map中的value为空时会产生空指针异常,下面附上我的错误以及源代码 > org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy > at > > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > ~[?:1.8.0_152] > at > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_152] > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_152] > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_152] > at > > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at akka.japi.pf > .UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at > > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at > > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > [flink-dist_2.11-1.11.1.jar:1.11.1] > Caused by: java.io.IOException: Failed to deserialize Avro record. > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:151) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > Caused by: java.lang.NullPointerException > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createConverter$57e941b$5(AvroRowDataDeserializationSchema.java:253) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createMapConverter$7941d275$1(AvroRowDataDeserializationSchema.java:315) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:222) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:207) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:149) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > > private static DeserializationRuntimeConverter > createMapConverter(LogicalType type) { > final DeserializationRuntimeConverter keyConverter = > createConverter( > DataTypes.STRING().getLogicalType()); > final DeserializationRuntimeConverter valueConverter = > createConverter( > extractValueTypeToAvroMap(type)); > return avroObject -> { > final Map<?, ?> map = (Map<?, ?>) avroObject; > Map<Object, Object> result = new HashMap<>(); > for (Map.Entry<?, ?> entry : map.entrySet()) { > Object key = > keyConverter.convert(entry.getKey()); > Object value = > valueConverter.convert(entry.getValue()); > result.put(key, value); > } > return new GenericMapData(result); > }; > } > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li |
In reply to this post by Benchao Li-2
|
从你的异常来看,你用的format是 avro, 我看了下源码,他对varchar类型的covert和json不一样,avro的代码是这样的:
case CHAR: case VARCHAR: return avroObject -> StringData.fromString(avroObject.toString()); 所以,你的map类型的value值为null,会报空指针异常的。 ________________________________ 发件人: 奔跑的小飞袁 <[hidden email]> 发送时间: 2020年10月14日 1:46 收件人: [hidden email] <[hidden email]> 主题: Re: flink-SQL1.11版本对map类型中value的空指针异常 other_para MAP<VARCHAR,VARCHAR> -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
嗯,这应该是一个实现的bug,可以提个issue修复一下~
史 正超 <[hidden email]> 于2020年10月14日周三 上午10:19写道: > 从你的异常来看,你用的format是 avro, 我看了下源码,他对varchar类型的covert和json不一样,avro的代码是这样的: > > case CHAR: > case VARCHAR: > return avroObject -> StringData.fromString(avroObject.toString()); > > 所以,你的map类型的value值为null,会报空指针异常的。 > ________________________________ > 发件人: 奔跑的小飞袁 <[hidden email]> > 发送时间: 2020年10月14日 1:46 > 收件人: [hidden email] <[hidden email]> > 主题: Re: flink-SQL1.11版本对map类型中value的空指针异常 > > other_para MAP<VARCHAR,VARCHAR> > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li |
In reply to this post by Benchao Li-2
但是 方法上有这样的一个注释:Creates a runtime converter which assuming input object is not null. 代码这样写的前提是,不允许对象的值为null的。 ________________________________ 发件人: Benchao Li <[hidden email]> 发送时间: 2020年10月14日 2:34 收件人: user-zh <[hidden email]> 主题: Re: flink-SQL1.11版本对map类型中value的空指针异常 嗯,这应该是一个实现的bug,可以提个issue修复一下~ 史 正超 <[hidden email]> 于2020年10月14日周三 上午10:19写道: > 从你的异常来看,你用的format是 avro, 我看了下源码,他对varchar类型的covert和json不一样,avro的代码是这样的: > > case CHAR: > case VARCHAR: > return avroObject -> StringData.fromString(avroObject.toString()); > > 所以,你的map类型的value值为null,会报空指针异常的。 > ________________________________ > 发件人: 奔跑的小飞袁 <[hidden email]> > 发送时间: 2020年10月14日 1:46 > 收件人: [hidden email] <[hidden email]> > 主题: Re: flink-SQL1.11版本对map类型中value的空指针异常 > > other_para MAP<VARCHAR,VARCHAR> > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li |
是的,所以应该用createNullableConverter,而不是createConverter
史 正超 <[hidden email]> 于2020年10月14日周三 上午10:45写道: > > 但是 方法上有这样的一个注释:Creates a runtime converter which assuming input object is > not null. > 代码这样写的前提是,不允许对象的值为null的。 > ________________________________ > 发件人: Benchao Li <[hidden email]> > 发送时间: 2020年10月14日 2:34 > 收件人: user-zh <[hidden email]> > 主题: Re: flink-SQL1.11版本对map类型中value的空指针异常 > > 嗯,这应该是一个实现的bug,可以提个issue修复一下~ > > 史 正超 <[hidden email]> 于2020年10月14日周三 上午10:19写道: > > > 从你的异常来看,你用的format是 avro, 我看了下源码,他对varchar类型的covert和json不一样,avro的代码是这样的: > > > > case CHAR: > > case VARCHAR: > > return avroObject -> StringData.fromString(avroObject.toString()); > > > > 所以,你的map类型的value值为null,会报空指针异常的。 > > ________________________________ > > 发件人: 奔跑的小飞袁 <[hidden email]> > > 发送时间: 2020年10月14日 1:46 > > 收件人: [hidden email] <[hidden email]> > > 主题: Re: flink-SQL1.11版本对map类型中value的空指针异常 > > > > other_para MAP<VARCHAR,VARCHAR> > > > > > > > > -- > > Sent from: http://apache-flink.147419.n8.nabble.com/ > > > > > -- > > Best, > Benchao Li > -- Best, Benchao Li |
In reply to this post by 史 正超
所以我的建议是用avro的规范,你可以这样定义你的MAP类型:
MAP<STRING, STRING NULL> ________________________________ 发件人: 史 正超 <[hidden email]> 发送时间: 2020年10月14日 2:45 收件人: user-zh <[hidden email]> 主题: 回复: flink-SQL1.11版本对map类型中value的空指针异常 但是 方法上有这样的一个注释:Creates a runtime converter which assuming input object is not null. 代码这样写的前提是,不允许对象的值为null的。 ________________________________ 发件人: Benchao Li <[hidden email]> 发送时间: 2020年10月14日 2:34 收件人: user-zh <[hidden email]> 主题: Re: flink-SQL1.11版本对map类型中value的空指针异常 嗯,这应该是一个实现的bug,可以提个issue修复一下~ 史 正超 <[hidden email]> 于2020年10月14日周三 上午10:19写道: > 从你的异常来看,你用的format是 avro, 我看了下源码,他对varchar类型的covert和json不一样,avro的代码是这样的: > > case CHAR: > case VARCHAR: > return avroObject -> StringData.fromString(avroObject.toString()); > > 所以,你的map类型的value值为null,会报空指针异常的。 > ________________________________ > 发件人: 奔跑的小飞袁 <[hidden email]> > 发送时间: 2020年10月14日 1:46 > 收件人: [hidden email] <[hidden email]> > 主题: Re: flink-SQL1.11版本对map类型中value的空指针异常 > > other_para MAP<VARCHAR,VARCHAR> > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li |
This post was updated on .
In reply to this post by 史 正超
|
确定吗?我这边测试还是有问题,这应该是avro 的一个bug。
________________________________ 发件人: 奔跑的小飞袁 <[hidden email]> 发送时间: 2020年10月14日 3:29 收件人: [hidden email] <[hidden email]> 主题: Re: 回复: flink-SQL1.11版本对map类型中value的空指针异常 我尝试使用MAP<STRING, STRING NULL>来定义我的类型,问题已经解决,谢谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Benchao Li的那个方法是对的,avro的一个bug:
private static AvroToRowDataConverter createMapConverter(LogicalType type) { final AvroToRowDataConverter keyConverter = createConverter(DataTypes.STRING().getLogicalType()); final AvroToRowDataConverter valueConverter = createConverter(extractValueTypeToAvroMap(type)); return avroObject -> { final Map<?, ?> map = (Map<?, ?>) avroObject; Map<Object, Object> result = new HashMap<>(); for (Map.Entry<?, ?> entry : map.entrySet()) { Object key = keyConverter.convert(entry.getKey()); Object value = valueConverter.convert(entry.getValue()); result.put(key, value); } return new GenericMapData(result); }; } 应该是 createNullableConverter final AvroToRowDataConverter valueConverter = createNullableConverter(extractValueTypeToAvroMap(type)); ________________________________ 发件人: 史 正超 <[hidden email]> 发送时间: 2020年10月14日 5:22 收件人: [hidden email] <[hidden email]> 主题: 回复: 回复: flink-SQL1.11版本对map类型中value的空指针异常 确定吗?我这边测试还是有问题,这应该是avro 的一个bug。 ________________________________ 发件人: 奔跑的小飞袁 <[hidden email]> 发送时间: 2020年10月14日 3:29 收件人: [hidden email] <[hidden email]> 主题: Re: 回复: flink-SQL1.11版本对map类型中value的空指针异常 我尝试使用MAP<STRING, STRING NULL>来定义我的类型,问题已经解决,谢谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
In reply to this post by 史 正超
我之前对源码进行了修复,测试的时候没有恢复之前的源码状态,后来发现Map<STRING,STRING NULL>这种方式是不可以的
-- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |