Flink SQL 1.11.3问题请教

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink SQL 1.11.3问题请教

占英华
我们使用Nifi将数据采集到kafka,然后使用Flink处理kafka中的数据,现在Nifi如果将多条数据当做一条记录(写入kafka中的数据格式为csv或者json)写入kafka后(就是一个offset中包含了多条数据),Flink只会处理其中的一条数据?有没有什么办法让Flink都处理一个offset中的多条数据?



[hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL 1.11.3问题请教

WeiXubin
不知道下面场景是否与你描述的场景相同 ,假设采集到单条json消息格式为 {"name":"test"},将多条json消息合并为一条的格式为
[{"name":"test"},{"name":"test2"}]。 我的 Flink 任务是采用 FlinkSQL
编写,处理这种情况我的方式是通过编写一个 UDF (TableFunction), 之后在这个 UDF 中进行数据拆解,变为多行 (row),再输出到
sink。

Row row = new Row(arity);
collect(row);

具体使用可参考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/

Best,Weixubin



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: Re: Flink SQL 1.11.3问题请教

占英华
我这个情况还有点不一样的,本来单条数据是如下的:一条数据对应一个offset
 {"name":"test1"}
但是Nifi采集数据后,写入kafka格式是下面这样的,一个offset对应下面几条数据(每一个offset对应的真实数据条数还不是固定的)
 {"name":"test1"}
 {"name":"test2"}
 {"name":"test3"}
.......

感谢你的回复,我借鉴下看怎么处理下,多谢了!




[hidden email]
 
发件人: WeiXubin
发送时间: 2021-06-02 17:44
收件人: user-zh
主题: Re: Flink SQL 1.11.3问题请教
不知道下面场景是否与你描述的场景相同 ,假设采集到单条json消息格式为 {"name":"test"},将多条json消息合并为一条的格式为
[{"name":"test"},{"name":"test2"}]。 我的 Flink 任务是采用 FlinkSQL
编写,处理这种情况我的方式是通过编写一个 UDF (TableFunction), 之后在这个 UDF 中进行数据拆解,变为多行 (row),再输出到
sink。
 
Row row = new Row(arity);
collect(row);
 
具体使用可参考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/
 
Best,Weixubin
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/