I defined a Kafka dynamic table in SQL-Client, but some records in the Kafka topic are malformed, which causes an exception in SQL-Client. Could we add a configuration option to ignore these bad records?


mr.mengyao@outlook.com
The Flink version I'm using is 1.11.2. With sql-client.sh in embedded mode ($FLINK_HOME/bin/sql-client.sh embedded) I created a Kafka-backed dynamic table as follows:

CREATE TABLE kfk_test01 (
 order_id BIGINT,  -- order ID
 original_price DOUBLE,  -- amount actually paid
 ctime BIGINT,  -- creation time
 ts AS TO_TIMESTAMP(FROM_UNIXTIME(ctime, 'yyyy-MM-dd HH:mm:ss')),  -- derive the event-time timestamp ts from ctime
 WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- declare a watermark on ts with a 5-second delay
) WITH (
 'connector' = 'kafka',
 'topic' = 'test01',
 'properties.bootstrap.servers' = 'node1:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'scan.startup.mode' = 'earliest-offset'
);
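For context, a well-formed message for this schema is a flat JSON object with ctime in epoch seconds, e.g. (illustrative values only):

{"order_id": 1001, "original_price": 59.9, "ctime": 1606898520}

The failure below, by contrast, was triggered by an empty message, as the "Failed to deserialize JSON ''" line in the stack trace shows.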


I looked through the configuration options of the Table module's Kafka connector (https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#connector-options) and found no parameter for skipping bad records. Given that MapReduce has a feature for skipping corrupted records, could Flink provide something similar?

For example, a generic configuration parameter shared by the json, csv, and text formats: skip.fail.records=0 (the default 0 means no bad records may be skipped, -1 means all bad records may be skipped, and any value > 0 is the maximum number of bad records tolerated).
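To make the proposal concrete, it might look like this in the WITH clause (purely illustrative; skip.fail.records is the name I am proposing and does not exist in Flink today):

 'format' = 'json',
 'skip.fail.records' = '100'  -- hypothetical: fail the job once more than 100 bad records are seen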


Screenshots attached:
<http://apache-flink.147419.n8.nabble.com/file/t1146/QQ%E6%88%AA%E5%9B%BE111.jpg>
<http://apache-flink.147419.n8.nabble.com/file/t1146/%E6%97%A0%E6%A0%87%E9%A2%981211.png>
Caused by: java.io.IOException: Failed to deserialize JSON ''.
        at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:126) ~[flink-json-1.11.2.jar:1.11.2]
        at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:76) ~[flink-json-1.11.2.jar:1.11.2]
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
Caused by: java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.MissingNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode

Re: I defined a Kafka dynamic table in SQL-Client, but some records in the Kafka topic are malformed, so an exception was thrown in SQL-Client. Can we add a configuration option when defining the Kafka dynamic table to ignore these bad records?

Jark
Administrator
I think this is a bug. I've created an issue: https://issues.apache.org/jira/browse/FLINK-20470
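For anyone hitting the same stack trace: the json format in Flink 1.11 already documents a 'json.ignore-parse-errors' option that drops records with parse errors instead of failing the job. Whether it also handles this empty-message case (the MissingNode cast above) is exactly what FLINK-20470 covers, so treat the following DDL as a sketch to verify rather than a confirmed fix:

CREATE TABLE kfk_test01 (
 order_id BIGINT,
 original_price DOUBLE,
 ctime BIGINT,
 ts AS TO_TIMESTAMP(FROM_UNIXTIME(ctime, 'yyyy-MM-dd HH:mm:ss')),
 WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
 'connector' = 'kafka',
 'topic' = 'test01',
 'properties.bootstrap.servers' = 'node1:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'json.fail-on-missing-field' = 'false',  -- do not fail on missing fields (the 1.11 default)
 'json.ignore-parse-errors' = 'true',     -- skip records whose JSON cannot be parsed
 'scan.startup.mode' = 'earliest-offset'
);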
