回复: flink1.10.x 解析 arrar<row> 问题

classic Classic list List threaded Threaded
4 messages Options
kcz
Reply | Threaded
Open this post in threaded view
|

回复: flink1.10.x 解析 arrar<row> 问题

kcz
udf的指定参数类型是 org.apache.flink.types.Row[],也是定义了这个。


------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Benchao Li"<[hidden email]&gt;;
发送时间:&nbsp;2020年5月20日(星期三) 晚上6:51
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: flink1.10.x 解析 arrar<row&gt; 问题



你可以让你的UDF指定一下输入参数的类型,不指定类型的话,可能会推导出错误的类型,导致拿不到真正的数据

了不起的盖茨比 <[hidden email]&gt; 于2020年5月20日周三 下午4:25写道:

&gt; 1.blink_planner 定义ddl解析array字段时候,假如select 那个字段可以解析出。
&gt;&nbsp; 2.blink_planner 当我去定义自己函数时候,数组有长度但是没有元素,flink直接跳过解析
&gt;
&gt;
&gt; 3.如果使用flink-planner那么是正确的
&gt;
&gt;
&gt;
&gt; CREATE TABLE sourceTable (
&gt;
&gt; &amp;nbsp;event_time_line array<ROW (
&gt;
&gt; &amp;nbsp; `rule_name` VARCHAR,
&gt;
&gt; &amp;nbsp; `count` VARCHAR
&gt;
&gt; &amp;nbsp;)&amp;gt;
&gt;
&gt; ) WITH (
&gt;
&gt; &amp;nbsp;'connector.type' = 'kafka',
&gt;
&gt; &amp;nbsp;'connector.version' = 'universal',
&gt;
&gt; &amp;nbsp;'connector.startup-mode' = 'earliest-offset',
&gt;
&gt; &amp;nbsp;'connector.topic' = 'topic_test_1',
&gt;
&gt; &amp;nbsp;'connector.properties.zookeeper.connect' = 'localhost:2181',
&gt;
&gt; &amp;nbsp;'connector.properties.bootstrap.servers' = 'localhost:9092',
&gt;
&gt; &amp;nbsp;'update-mode' = 'append',
&gt;
&gt; &amp;nbsp;'format.type' = 'json',
&gt;
&gt; &amp;nbsp;'format.derive-schema' = 'true'
&gt;
&gt; );
&gt;
&gt; --可以查出数据
&gt;
&gt; select event_time_line from sourceTable ;
&gt;
&gt; --当我定义自己函数时候,参数value没有传过去,但是数组的size长度过去了
&gt;
&gt; select type_change(event_time_line) from sourceTable ;
&gt;
&gt; &amp;nbsp;
&gt;
&gt; public class TypeChange extends ScalarFunction {
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; /**
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; *
&gt; 为null,但是数组有长度,数组里面的字段没有被识别出来,换了默认的planner,可以正常得到值
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; * @param rows
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; * @return
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; */
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; public String eval(Row [] rows){
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; &amp;nbsp;return
&gt; JSONObject.toJSONString(rows);
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; }
&gt;
&gt; &amp;nbsp;
&gt;
&gt; }



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.x 解析 arrar<row> 问题

Benchao Li
不是这个意思,你需要覆盖`ScalarFunction#getParameterTypes(Class<?>[]
signature)`这个方法,显示指定你的输入数据的类型
比如你说的是Row[],那你需要指定Types.OBJECT_ARRAY(Types.ROW(Types.INT,
Types.STRING...)),Row里面的类型需要填写
你真实的类型。

了不起的盖茨比 <[hidden email]> 于2020年5月20日周三 下午7:24写道:

> udf的指定参数类型是 org.apache.flink.types.Row[],也是定义了这个。
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Benchao Li"<[hidden email]&gt;;
> 发送时间:&nbsp;2020年5月20日(星期三) 晚上6:51
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;Re: flink1.10.x 解析 arrar<row&gt; 问题
>
>
>
> 你可以让你的UDF指定一下输入参数的类型,不指定类型的话,可能会推导出错误的类型,导致拿不到真正的数据
>
> 了不起的盖茨比 <[hidden email]&gt; 于2020年5月20日周三 下午4:25写道:
>
> &gt; 1.blink_planner 定义ddl解析array字段时候,假如select 那个字段可以解析出。
> &gt;&nbsp; 2.blink_planner 当我去定义自己函数时候,数组有长度但是没有元素,flink直接跳过解析
> &gt;
> &gt;
> &gt; 3.如果使用flink-planner那么是正确的
> &gt;
> &gt;
> &gt;
> &gt; CREATE TABLE sourceTable (
> &gt;
> &gt; &amp;nbsp;event_time_line array<ROW (
> &gt;
> &gt; &amp;nbsp; `rule_name` VARCHAR,
> &gt;
> &gt; &amp;nbsp; `count` VARCHAR
> &gt;
> &gt; &amp;nbsp;)&amp;gt;
> &gt;
> &gt; ) WITH (
> &gt;
> &gt; &amp;nbsp;'connector.type' = 'kafka',
> &gt;
> &gt; &amp;nbsp;'connector.version' = 'universal',
> &gt;
> &gt; &amp;nbsp;'connector.startup-mode' = 'earliest-offset',
> &gt;
> &gt; &amp;nbsp;'connector.topic' = 'topic_test_1',
> &gt;
> &gt; &amp;nbsp;'connector.properties.zookeeper.connect' = 'localhost:2181',
> &gt;
> &gt; &amp;nbsp;'connector.properties.bootstrap.servers' = 'localhost:9092',
> &gt;
> &gt; &amp;nbsp;'update-mode' = 'append',
> &gt;
> &gt; &amp;nbsp;'format.type' = 'json',
> &gt;
> &gt; &amp;nbsp;'format.derive-schema' = 'true'
> &gt;
> &gt; );
> &gt;
> &gt; --可以查出数据
> &gt;
> &gt; select event_time_line from sourceTable ;
> &gt;
> &gt; --当我定义自己函数时候,参数value没有传过去,但是数组的size长度过去了
> &gt;
> &gt; select type_change(event_time_line) from sourceTable ;
> &gt;
> &gt; &amp;nbsp;
> &gt;
> &gt; public class TypeChange extends ScalarFunction {
> &gt;
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; /**
> &gt;
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; *
> &gt; 为null,但是数组有长度,数组里面的字段没有被识别出来,换了默认的planner,可以正常得到值
> &gt;
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; * @param rows
> &gt;
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; * @return
> &gt;
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; */
> &gt;
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; public String eval(Row [] rows){
> &gt;
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &amp;nbsp;return
> &gt; JSONObject.toJSONString(rows);
> &gt;
> &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; }
> &gt;
> &gt; &amp;nbsp;
> &gt;
> &gt; }
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: [hidden email]; [hidden email]



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]
kcz
Reply | Threaded
Open this post in threaded view
|

回复: flink1.10.x 解析 arrar<row> 问题

kcz
谢谢大佬,终于弄好了。谢谢。
public TypeInformation<?&gt;[] getParameterTypes(Class<?&gt;[] signature) {
    return new RowTypeInfo(Types.OBJECT_ARRAY(Types.ROW(Types.STRING,Types.STRING))).getFieldTypes();
}




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Benchao Li"<[hidden email]&gt;;
发送时间:&nbsp;2020年5月20日(星期三) 晚上7:39
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: flink1.10.x 解析 arrar<row&gt; 问题



不是这个意思,你需要覆盖`ScalarFunction#getParameterTypes(Class<?&gt;[]
signature)`这个方法,显示指定你的输入数据的类型
比如你说的是Row[],那你需要指定Types.OBJECT_ARRAY(Types.ROW(Types.INT,
Types.STRING...)),Row里面的类型需要填写
你真实的类型。

了不起的盖茨比 <[hidden email]&gt; 于2020年5月20日周三 下午7:24写道:

&gt; udf的指定参数类型是 org.apache.flink.types.Row[],也是定义了这个。
&gt;
&gt;
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人:&amp;nbsp;"Benchao Li"<[hidden email]&amp;gt;;
&gt; 发送时间:&amp;nbsp;2020年5月20日(星期三) 晚上6:51
&gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
&gt;
&gt; 主题:&amp;nbsp;Re: flink1.10.x 解析 arrar<row&amp;gt; 问题
&gt;
&gt;
&gt;
&gt; 你可以让你的UDF指定一下输入参数的类型,不指定类型的话,可能会推导出错误的类型,导致拿不到真正的数据
&gt;
&gt; 了不起的盖茨比 <[hidden email]&amp;gt; 于2020年5月20日周三 下午4:25写道:
&gt;
&gt; &amp;gt; 1.blink_planner 定义ddl解析array字段时候,假如select 那个字段可以解析出。
&gt; &amp;gt;&amp;nbsp; 2.blink_planner 当我去定义自己函数时候,数组有长度但是没有元素,flink直接跳过解析
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; 3.如果使用flink-planner那么是正确的
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; CREATE TABLE sourceTable (
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;event_time_line array<ROW (
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp; `rule_name` VARCHAR,
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp; `count` VARCHAR
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;)&amp;amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; ) WITH (
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;'connector.type' = 'kafka',
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;'connector.version' = 'universal',
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;'connector.startup-mode' = 'earliest-offset',
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;'connector.topic' = 'topic_test_1',
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;'connector.properties.zookeeper.connect' = 'localhost:2181',
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;'connector.properties.bootstrap.servers' = 'localhost:9092',
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;'update-mode' = 'append',
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;'format.type' = 'json',
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;'format.derive-schema' = 'true'
&gt; &amp;gt;
&gt; &amp;gt; );
&gt; &amp;gt;
&gt; &amp;gt; --可以查出数据
&gt; &amp;gt;
&gt; &amp;gt; select event_time_line from sourceTable ;
&gt; &amp;gt;
&gt; &amp;gt; --当我定义自己函数时候,参数value没有传过去,但是数组的size长度过去了
&gt; &amp;gt;
&gt; &amp;gt; select type_change(event_time_line) from sourceTable ;
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt;
&gt; &amp;gt; public class TypeChange extends ScalarFunction {
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; /**
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; *
&gt; &amp;gt; 为null,但是数组有长度,数组里面的字段没有被识别出来,换了默认的planner,可以正常得到值
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; * @param rows
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; * @return
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; */
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; public String eval(Row [] rows){
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
&gt; &amp;amp;nbsp;return
&gt; &amp;gt; JSONObject.toJSONString(rows);
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; }
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt;
&gt; &amp;gt; }
&gt;
&gt;
&gt;
&gt; --
&gt;
&gt; Benchao Li
&gt; School of Electronics Engineering and Computer Science, Peking University
&gt; Tel:+86-15650713730
&gt; Email: [hidden email]; [hidden email]



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: flink1.10.x 解析 arrar<row> 问题

Jingsong Li
谢谢Benchao的回答。

虽然可以work around,但是这看起来应该是blink planner要去支持的事情。
我建个JIRA去跟踪下:https://issues.apache.org/jira/browse/FLINK-17855

Best,
Jingsong Lee

On Wed, May 20, 2020 at 8:02 PM 了不起的盖茨比 <[hidden email]> wrote:

> 谢谢大佬,终于弄好了。谢谢。
> public TypeInformation<?&gt;[] getParameterTypes(Class<?&gt;[] signature) {
>     return new
> RowTypeInfo(Types.OBJECT_ARRAY(Types.ROW(Types.STRING,Types.STRING))).getFieldTypes();
> }
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Benchao Li"<[hidden email]&gt;;
> 发送时间:&nbsp;2020年5月20日(星期三) 晚上7:39
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;Re: flink1.10.x 解析 arrar<row&gt; 问题
>
>
>
> 不是这个意思,你需要覆盖`ScalarFunction#getParameterTypes(Class<?&gt;[]
> signature)`这个方法,显示指定你的输入数据的类型
> 比如你说的是Row[],那你需要指定Types.OBJECT_ARRAY(Types.ROW(Types.INT,
> Types.STRING...)),Row里面的类型需要填写
> 你真实的类型。
>
> 了不起的盖茨比 <[hidden email]&gt; 于2020年5月20日周三 下午7:24写道:
>
> &gt; udf的指定参数类型是 org.apache.flink.types.Row[],也是定义了这个。
> &gt;
> &gt;
> &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> &gt; 发件人:&amp;nbsp;"Benchao Li"<[hidden email]&amp;gt;;
> &gt; 发送时间:&amp;nbsp;2020年5月20日(星期三) 晚上6:51
> &gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
> &gt;
> &gt; 主题:&amp;nbsp;Re: flink1.10.x 解析 arrar<row&amp;gt; 问题
> &gt;
> &gt;
> &gt;
> &gt; 你可以让你的UDF指定一下输入参数的类型,不指定类型的话,可能会推导出错误的类型,导致拿不到真正的数据
> &gt;
> &gt; 了不起的盖茨比 <[hidden email]&amp;gt; 于2020年5月20日周三 下午4:25写道:
> &gt;
> &gt; &amp;gt; 1.blink_planner 定义ddl解析array字段时候,假如select 那个字段可以解析出。
> &gt; &amp;gt;&amp;nbsp; 2.blink_planner 当我去定义自己函数时候,数组有长度但是没有元素,flink直接跳过解析
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt; 3.如果使用flink-planner那么是正确的
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt; CREATE TABLE sourceTable (
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp;event_time_line array<ROW (
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp; `rule_name` VARCHAR,
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp; `count` VARCHAR
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp;)&amp;amp;gt;
> &gt; &amp;gt;
> &gt; &amp;gt; ) WITH (
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp;'connector.type' = 'kafka',
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp;'connector.version' = 'universal',
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp;'connector.startup-mode' = 'earliest-offset',
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp;'connector.topic' = 'topic_test_1',
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp;'connector.properties.zookeeper.connect' =
> 'localhost:2181',
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp;'connector.properties.bootstrap.servers' =
> 'localhost:9092',
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp;'update-mode' = 'append',
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp;'format.type' = 'json',
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp;'format.derive-schema' = 'true'
> &gt; &amp;gt;
> &gt; &amp;gt; );
> &gt; &amp;gt;
> &gt; &amp;gt; --可以查出数据
> &gt; &amp;gt;
> &gt; &amp;gt; select event_time_line from sourceTable ;
> &gt; &amp;gt;
> &gt; &amp;gt; --当我定义自己函数时候,参数value没有传过去,但是数组的size长度过去了
> &gt; &amp;gt;
> &gt; &amp;gt; select type_change(event_time_line) from sourceTable ;
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp;
> &gt; &amp;gt;
> &gt; &amp;gt; public class TypeChange extends ScalarFunction {
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; /**
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; *
> &gt; &amp;gt; 为null,但是数组有长度,数组里面的字段没有被识别出来,换了默认的planner,可以正常得到值
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; *
> @param rows
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; *
> @return
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; */
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; public String
> eval(Row [] rows){
> &gt; &amp;gt;
> &gt; &amp;gt;
> &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
> &gt; &amp;amp;nbsp;return
> &gt; &amp;gt; JSONObject.toJSONString(rows);
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; }
> &gt; &amp;gt;
> &gt; &amp;gt; &amp;amp;nbsp;
> &gt; &amp;gt;
> &gt; &amp;gt; }
> &gt;
> &gt;
> &gt;
> &gt; --
> &gt;
> &gt; Benchao Li
> &gt; School of Electronics Engineering and Computer Science, Peking
> University
> &gt; Tel:+86-15650713730
> &gt; Email: [hidden email]; [hidden email]
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: [hidden email]; [hidden email]



--
Best, Jingsong Lee