This post was updated on .
我使用flink1.12版本,采用flink-kafka-connector从kafka的topicA中读取数据,然后sink会topicB,在sink
to topicB的FlinkProducer设置如下时,程序会偶现报错,去掉后异常消失,请问是什么原因呢? flinkKafkaProducer.setWriteTimestampToKafka(true); 偶现报错如下: 13:59:53,885 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed (5/8) (171fc1965c9c20a35cb48588cd88b35f) switched from RUNNING to FAILED on d0468f82-70e8-4b65-99f2-315466cd15cd @ 127.0.0.1 (dataPort=-1). java.lang.IllegalArgumentException: Invalid timestamp: -1. Timestamp should always be non-negative or null. at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:74) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:97) ~[kafka-clients-2.4.1.jar:?] at org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper.serialize(KafkaSerializationSchemaWrapper.java:86) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:907) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.myorg.quickstart.osqueryDemo.analyze.ODS_ExtractAndEtlFromKafka$1.flatMap(ODS_ExtractAndEtlFromKafka.java:83) ~[classes/:?] at org.myorg.quickstart.osqueryDemo.analyze.ODS_ExtractAndEtlFromKafka$1.flatMap(ODS_ExtractAndEtlFromKafka.java:78) ~[classes/:?] at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:836) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:828) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] 13:59:53,885 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_4. 13:59:53,886 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_4. -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Free forum by Nabble | Edit this page |