I am using Flink 1.11.2. With the SQL client in embedded mode ($FLINK_HOME/bin/sql-client.sh embedded) I created a Kafka-backed dynamic table as follows:
CREATE TABLE kfk_test01 (
    order_id BIGINT,        -- order ID
    original_price DOUBLE,  -- amount actually paid
    ctime BIGINT,           -- creation time
    ts AS TO_TIMESTAMP(FROM_UNIXTIME(ctime, 'yyyy-MM-dd HH:mm:ss')),  -- use the ctime value as the timestamp ts
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- watermark on ts with a 5-second delay
) WITH (
    'connector' = 'kafka',
    'topic' = 'test01',
    'properties.bootstrap.servers' = 'node1:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
);

I went through the connector options of the Table-module Kafka connector (https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#connector-options) and there is no option for skipping bad records. Considering that MapReduce has a feature for skipping corrupt records, could Flink provide something similar? For example, a generic option shared by the json, csv and text formats: skip.fail.records=0, where the default 0 means no bad records may be skipped, -1 means all bad records may be skipped, and any value > 0 is the maximum number of bad records tolerated.

Screenshots:
<http://apache-flink.147419.n8.nabble.com/file/t1146/QQ%E6%88%AA%E5%9B%BE111.jpg>
<http://apache-flink.147419.n8.nabble.com/file/t1146/%E6%97%A0%E6%A0%87%E9%A2%981211.png>

The job currently fails with:

Caused by: java.io.IOException: Failed to deserialize JSON ''.
    at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:126) ~[flink-json-1.11.2.jar:1.11.2]
    at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:76) ~[flink-json-1.11.2.jar:1.11.2]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
Caused by: java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.MissingNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
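As a side note, the json format in 1.11 already has a blunt instrument here: setting 'json.ignore-parse-errors' = 'true' in the WITH clause makes the deserializer silently drop rows it cannot parse, but it neither counts nor caps the bad records. To make the skip.fail.records proposal concrete, below is a rough DataStream-level sketch of the intended semantics as a DeserializationSchema wrapper (SkipFailRecordsSchema is a made-up name, not an existing Flink API); it relies on the fact that the 1.11 Kafka source drops records for which deserialize() returns null:

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Hypothetical sketch of skip.fail.records semantics; not part of Flink 1.11.
public class SkipFailRecordsSchema<T> implements DeserializationSchema<T> {

    private final DeserializationSchema<T> inner;
    private final long maxSkips; // 0 = fail on first bad record, -1 = skip all, >0 = budget
    private long skipped;

    public SkipFailRecordsSchema(DeserializationSchema<T> inner, long maxSkips) {
        this.inner = inner;
        this.maxSkips = maxSkips;
    }

    @Override
    public void open(InitializationContext context) throws Exception {
        inner.open(context);
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        try {
            return inner.deserialize(message);
        } catch (Exception e) {
            if (maxSkips < 0 || ++skipped <= maxSkips) {
                return null; // null means the Kafka source drops this record
            }
            throw new IOException("More than " + maxSkips + " corrupt records", e);
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return inner.isEndOfStream(nextElement);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return inner.getProducedType();
    }
}

With maxSkips = 0 this behaves like today's fail-fast default, while e.g. new SkipFailRecordsSchema<>(jsonSchema, 100) would tolerate at most 100 bad records; note the counter is per parallel subtask, not global, so a real implementation would need to decide which of the two the option means.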
Administrator replied:
This looks like a bug to me; I have created an issue for it: https://issues.apache.org/jira/browse/FLINK-20470
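For the record, the failing cast can be reproduced outside Flink: the shaded Jackson (2.10.x in Flink 1.11) parses empty input to a MissingNode rather than throwing, and JsonRowDataDeserializationSchema then unconditionally casts the root node to ObjectNode. A minimal repro sketch (the class name is made up):

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

public class MissingNodeRepro {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // An empty Kafka message value parses to MissingNode, not ObjectNode,
        // so the error only surfaces later as a ClassCastException.
        JsonNode root = mapper.readTree(new byte[0]);
        System.out.println(root.getNodeType());  // MISSING
        ObjectNode obj = (ObjectNode) root;      // ClassCastException, as in the trace
    }
}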