Flink 1.10.x: problem parsing array<row>

kcz

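1. With blink_planner, when the DDL declares an array field and I select that field directly, it is parsed correctly.
2. With blink_planner, when I pass the field to my own function, the array has the right length but no elements: Flink simply skips parsing them.
3. With flink-planner (the legacy planner), the result is correct.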

CREATE TABLE sourceTable (
  event_time_line array<ROW(
    `rule_name` VARCHAR,
    `count` VARCHAR
  )>
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.startup-mode' = 'earliest-offset',
  'connector.topic' = 'topic_test_1',
  'connector.properties.zookeeper.connect' = 'localhost:2181',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);
 
-- This returns the data:
select event_time_line from sourceTable;

-- With my own function, the element values are not passed in, but the array size is:
select type_change(event_time_line) from sourceTable;
 
import com.alibaba.fastjson.JSONObject; // assumed: fastjson's JSONObject
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

public class TypeChange extends ScalarFunction {

    /**
     * On blink_planner the element values arrive as null: the array has the right
     * length, but the fields inside the rows are not recognized. After switching to
     * the default (legacy) planner, the values come through correctly.
     *
     * @param rows the event_time_line array
     * @return the rows serialized as a JSON string
     */
    public String eval(Row[] rows) {
        return JSONObject.toJSONString(rows);
    }
}
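To compare the two planners side by side, a job along the following lines could run the same DDL and query on either one. This is a sketch assuming the Flink 1.10 Java APIs (`EnvironmentSettings`, `StreamTableEnvironment` from `org.apache.flink.table.api.java`, `sqlUpdate`, `registerFunction`), the `TypeChange` class above on the classpath, and the local Kafka/ZooKeeper setup from the DDL. The class name `TypeChangeJob` and the `old` command-line switch are hypothetical.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// Hypothetical driver: runs the query from the question on either planner.
public class TypeChangeJob {

    // Same table definition as the DDL in the question.
    private static final String DDL =
            "CREATE TABLE sourceTable (\n"
            + "  event_time_line ARRAY<ROW(`rule_name` VARCHAR, `count` VARCHAR)>\n"
            + ") WITH (\n"
            + "  'connector.type' = 'kafka',\n"
            + "  'connector.version' = 'universal',\n"
            + "  'connector.startup-mode' = 'earliest-offset',\n"
            + "  'connector.topic' = 'topic_test_1',\n"
            + "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n"
            + "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n"
            + "  'update-mode' = 'append',\n"
            + "  'format.type' = 'json',\n"
            + "  'format.derive-schema' = 'true'\n"
            + ")";

    public static void main(String[] args) throws Exception {
        // Hypothetical switch: pass "old" for the legacy planner, otherwise use blink.
        boolean useBlink = args.length == 0 || !"old".equals(args[0]);
        EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance().inStreamingMode();
        EnvironmentSettings settings =
                (useBlink ? builder.useBlinkPlanner() : builder.useOldPlanner()).build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        tEnv.sqlUpdate(DDL);
        tEnv.registerFunction("type_change", new TypeChange());

        Table result = tEnv.sqlQuery("SELECT type_change(event_time_line) FROM sourceTable");
        // Print each JSON string produced by the UDF to stdout.
        tEnv.toAppendStream(result, Row.class).print();
        env.execute(useBlink ? "type_change (blink)" : "type_change (legacy)");
    }
}
```

Running it once with and once without `old` should make the difference described in points 2 and 3 visible in the printed output.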

Re: Flink 1.10.x: problem parsing array<row>

Benchao Li
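You can have your UDF declare the types of its input parameters explicitly. If you don't, Flink may infer the wrong type, and the UDF won't receive the actual data.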
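A minimal sketch of that suggestion for Flink 1.10, assuming the legacy `ScalarFunction.getParameterTypes(Class<?>[])` hook and the field names from the DDL in the question; it has not been checked against this exact job. Instead of letting Flink derive the argument type from the bare `Row[]` signature, which carries no field names or field types, the function declares an array of named rows with two STRING fields (the DDL's unsized `VARCHAR`). The fastjson `JSONObject` import is assumed, as in the original UDF.

```java
import com.alibaba.fastjson.JSONObject; // assumed: fastjson, as in the original UDF
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

public class TypeChange extends ScalarFunction {

    public String eval(Row[] rows) {
        return JSONObject.toJSONString(rows);
    }

    // Declare the argument as an array of rows (rule_name STRING, count STRING)
    // instead of leaving Flink to extract it from the Row[] signature alone.
    // Field names and order follow the DDL in the question.
    @Override
    public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
        TypeInformation<Row> element = Types.ROW_NAMED(
                new String[] {"rule_name", "count"}, Types.STRING, Types.STRING);
        return new TypeInformation<?>[] {Types.OBJECT_ARRAY(element)};
    }
}
```

The function is registered and called exactly as before. In Flink 1.11 and later this hook is deprecated, and a `@DataTypeHint` annotation on the `eval` parameter is the intended way to give the same type information.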

In reply to 了不起的盖茨比 <[hidden email]>, Wednesday, May 20, 2020, 4:25 PM.




--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]