flink sql如何处理脏数据?

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

flink sql如何处理脏数据?

casel.chen
业务上游数据源发出来的数据有可能会有脏数据导致数据无法解析成源表的结构,如kafka json topic映射成源表。
请问这种情况下flink sql要如何处理? 期望的是将脏数据发到一个专门的topic,是不是要自己写个connector? 标准kafka connector支持这种需求么?
Reply | Threaded
Open this post in threaded view
|

回复:flink sql如何处理脏数据?

chenlei677
你好,你所说的脏数据应该是由于反序列化失败的数据。反序列化失败往往会导致任务失败或者重启,针对这个问题,以Kafka为例,你可以从获得反序列化schema入手,对csv、json、avro等反序列化schema进行包装操作。




------------------ 原始邮件 ------------------
发件人:                                                                                                                        "user-zh"                                                                                    <[hidden email]&gt;;
发送时间:&nbsp;2020年12月20日(星期天) 上午10:48
收件人:&nbsp;"[hidden email]"<[hidden email]&gt;;

主题:&nbsp;flink sql如何处理脏数据?



业务上游数据源发出来的数据有可能会有脏数据导致数据无法解析成源表的结构,如kafka json topic映射成源表。
请问这种情况下flink sql要如何处理? 期望的是将脏数据发到一个专门的topic,是不是要自己写个connector? 标准kafka connector支持这种需求么?
Reply | Threaded
Open this post in threaded view
|

Re:flink sql如何处理脏数据?

占英华
In reply to this post by casel.chen
Flink代码里Json反序列化里有2个参数应该对你有帮助,你到官网上查询下怎么使用

上述2个配置项的参数名字分别是:

format.fail-on-missing-field
format.ignore-parse-errors





在 2020-12-20 10:48:04,"陈帅" <[hidden email]> 写道: >业务上游数据源发出来的数据有可能会有脏数据导致数据无法解析成源表的结构,如kafka json topic映射成源表。 >请问这种情况下flink sql要如何处理? 期望的是将脏数据发到一个专门的topic,是不是要自己写个connector? 标准kafka connector支持这种需求么?


 

Reply | Threaded
Open this post in threaded view
|

Re:Re:flink sql如何处理脏数据?

felixzh
format.fail-on-missing-field

format.ignore-parse-errors

挺有用的,可以容错数据格式不对,缺少字段等。

但是不能容错对于使用事件时间的处理模型,因为容错的手段是将所有字段值设为null,事件时间也设置为null。

但是flink sql需要从数据中提取事件时间且不能为null,否则就会抛出异常,从这一点来说,上述配置并没有完全容错,详细如下源码(这里应该可以考虑直接跳过这条数据吧?):

https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java

RowData row = element.getValue();

if (row.isNullAt(rowtimeFieldIndex)) {

throw new RuntimeException("RowTime field should not be null," +

" please convert it to a non-null long value.");

}
















在 2020-12-20 11:33:37,"邮件帮助中心" <[hidden email]> 写道:

Flink代码里Json反序列化里有2个参数应该对你有帮助,你到官网上查询下怎么使用

上述2个配置项的参数名字分别是:

format.fail-on-missing-field
format.ignore-parse-errors











在 2020-12-20 10:48:04,"陈帅" <[hidden email]> 写道:
>业务上游数据源发出来的数据有可能会有脏数据导致数据无法解析成源表的结构,如kafka json topic映射成源表。
>请问这种情况下flink sql要如何处理? 期望的是将脏数据发到一个专门的topic,是不是要自己写个connector? 标准kafka connector支持这种需求么?