udf的指定参数类型是 org.apache.flink.types.Row[],也是定义了这个。
------------------ 原始邮件 ------------------ 发件人: "Benchao Li"<[hidden email]>; 发送时间: 2020年5月20日(星期三) 晚上6:51 收件人: "user-zh"<[hidden email]>; 主题: Re: flink1.10.x 解析 arrar<row> 问题 你可以让你的UDF指定一下输入参数的类型,不指定类型的话,可能会推导出错误的类型,导致拿不到真正的数据 了不起的盖茨比 <[hidden email]> 于2020年5月20日周三 下午4:25写道: > 1.blink_planner 定义ddl解析array字段时候,假如select 那个字段可以解析出。 > 2.blink_planner 当我去定义自己函数时候,数组有长度但是没有元素,flink直接跳过解析 > > > 3.如果使用flink-planner那么是正确的 > > > > CREATE TABLE sourceTable ( > > &nbsp;event_time_line array<ROW ( > > &nbsp; `rule_name` VARCHAR, > > &nbsp; `count` VARCHAR > > &nbsp;)&gt; > > ) WITH ( > > &nbsp;'connector.type' = 'kafka', > > &nbsp;'connector.version' = 'universal', > > &nbsp;'connector.startup-mode' = 'earliest-offset', > > &nbsp;'connector.topic' = 'topic_test_1', > > &nbsp;'connector.properties.zookeeper.connect' = 'localhost:2181', > > &nbsp;'connector.properties.bootstrap.servers' = 'localhost:9092', > > &nbsp;'update-mode' = 'append', > > &nbsp;'format.type' = 'json', > > &nbsp;'format.derive-schema' = 'true' > > ); > > --可以查出数据 > > select event_time_line from sourceTable ; > > --当我定义自己函数时候,参数value没有传过去,但是数组的size长度过去了 > > select type_change(event_time_line) from sourceTable ; > > &nbsp; > > public class TypeChange extends ScalarFunction { > > &nbsp;&nbsp;&nbsp; /** > > &nbsp;&nbsp;&nbsp;&nbsp; * > 为null,但是数组有长度,数组里面的字段没有被识别出来,换了默认的planner,可以正常得到值 > > &nbsp;&nbsp;&nbsp;&nbsp; * @param rows > > &nbsp;&nbsp;&nbsp;&nbsp; * @return > > &nbsp;&nbsp;&nbsp;&nbsp; */ > > &nbsp;&nbsp;&nbsp; public String eval(Row [] rows){ > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; &nbsp;return > JSONObject.toJSONString(rows); > > &nbsp;&nbsp;&nbsp; } > > &nbsp; > > } -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
不是这个意思,你需要覆盖`ScalarFunction#getParameterTypes(Class<?>[]
signature)`这个方法,显示指定你的输入数据的类型 比如你说的是Row[],那你需要指定Types.OBJECT_ARRAY(Types.ROW(Types.INT, Types.STRING...)),Row里面的类型需要填写 你真实的类型。 了不起的盖茨比 <[hidden email]> 于2020年5月20日周三 下午7:24写道: > udf的指定参数类型是 org.apache.flink.types.Row[],也是定义了这个。 > > > ------------------ 原始邮件 ------------------ > 发件人: "Benchao Li"<[hidden email]>; > 发送时间: 2020年5月20日(星期三) 晚上6:51 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: flink1.10.x 解析 arrar<row> 问题 > > > > 你可以让你的UDF指定一下输入参数的类型,不指定类型的话,可能会推导出错误的类型,导致拿不到真正的数据 > > 了不起的盖茨比 <[hidden email]> 于2020年5月20日周三 下午4:25写道: > > > 1.blink_planner 定义ddl解析array字段时候,假如select 那个字段可以解析出。 > > 2.blink_planner 当我去定义自己函数时候,数组有长度但是没有元素,flink直接跳过解析 > > > > > > 3.如果使用flink-planner那么是正确的 > > > > > > > > CREATE TABLE sourceTable ( > > > > &nbsp;event_time_line array<ROW ( > > > > &nbsp; `rule_name` VARCHAR, > > > > &nbsp; `count` VARCHAR > > > > &nbsp;)&gt; > > > > ) WITH ( > > > > &nbsp;'connector.type' = 'kafka', > > > > &nbsp;'connector.version' = 'universal', > > > > &nbsp;'connector.startup-mode' = 'earliest-offset', > > > > &nbsp;'connector.topic' = 'topic_test_1', > > > > &nbsp;'connector.properties.zookeeper.connect' = 'localhost:2181', > > > > &nbsp;'connector.properties.bootstrap.servers' = 'localhost:9092', > > > > &nbsp;'update-mode' = 'append', > > > > &nbsp;'format.type' = 'json', > > > > &nbsp;'format.derive-schema' = 'true' > > > > ); > > > > --可以查出数据 > > > > select event_time_line from sourceTable ; > > > > --当我定义自己函数时候,参数value没有传过去,但是数组的size长度过去了 > > > > select type_change(event_time_line) from sourceTable ; > > > > &nbsp; > > > > public class TypeChange extends ScalarFunction { > > > > &nbsp;&nbsp;&nbsp; /** > > > > &nbsp;&nbsp;&nbsp;&nbsp; * > > 为null,但是数组有长度,数组里面的字段没有被识别出来,换了默认的planner,可以正常得到值 > > > > &nbsp;&nbsp;&nbsp;&nbsp; * @param rows > > > > &nbsp;&nbsp;&nbsp;&nbsp; * @return > > > > &nbsp;&nbsp;&nbsp;&nbsp; */ > > > > &nbsp;&nbsp;&nbsp; public String eval(Row [] rows){ > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > &nbsp;return > > JSONObject.toJSONString(rows); > > > > &nbsp;&nbsp;&nbsp; } > > > > &nbsp; > > > > } > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
谢谢大佬,终于弄好了。谢谢。
public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) { return new RowTypeInfo(Types.OBJECT_ARRAY(Types.ROW(Types.STRING,Types.STRING))).getFieldTypes(); } ------------------ 原始邮件 ------------------ 发件人: "Benchao Li"<[hidden email]>; 发送时间: 2020年5月20日(星期三) 晚上7:39 收件人: "user-zh"<[hidden email]>; 主题: Re: flink1.10.x 解析 arrar<row> 问题 不是这个意思,你需要覆盖`ScalarFunction#getParameterTypes(Class<?>[] signature)`这个方法,显示指定你的输入数据的类型 比如你说的是Row[],那你需要指定Types.OBJECT_ARRAY(Types.ROW(Types.INT, Types.STRING...)),Row里面的类型需要填写 你真实的类型。 了不起的盖茨比 <[hidden email]> 于2020年5月20日周三 下午7:24写道: > udf的指定参数类型是 org.apache.flink.types.Row[],也是定义了这个。 > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人:&nbsp;"Benchao Li"<[hidden email]&gt;; > 发送时间:&nbsp;2020年5月20日(星期三) 晚上6:51 > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > 主题:&nbsp;Re: flink1.10.x 解析 arrar<row&gt; 问题 > > > > 你可以让你的UDF指定一下输入参数的类型,不指定类型的话,可能会推导出错误的类型,导致拿不到真正的数据 > > 了不起的盖茨比 <[hidden email]&gt; 于2020年5月20日周三 下午4:25写道: > > &gt; 1.blink_planner 定义ddl解析array字段时候,假如select 那个字段可以解析出。 > &gt;&nbsp; 2.blink_planner 当我去定义自己函数时候,数组有长度但是没有元素,flink直接跳过解析 > &gt; > &gt; > &gt; 3.如果使用flink-planner那么是正确的 > &gt; > &gt; > &gt; > &gt; CREATE TABLE sourceTable ( > &gt; > &gt; &amp;nbsp;event_time_line array<ROW ( > &gt; > &gt; &amp;nbsp; `rule_name` VARCHAR, > &gt; > &gt; &amp;nbsp; `count` VARCHAR > &gt; > &gt; &amp;nbsp;)&amp;gt; > &gt; > &gt; ) WITH ( > &gt; > &gt; &amp;nbsp;'connector.type' = 'kafka', > &gt; > &gt; &amp;nbsp;'connector.version' = 'universal', > &gt; > &gt; &amp;nbsp;'connector.startup-mode' = 'earliest-offset', > &gt; > &gt; &amp;nbsp;'connector.topic' = 'topic_test_1', > &gt; > &gt; &amp;nbsp;'connector.properties.zookeeper.connect' = 'localhost:2181', > &gt; > &gt; &amp;nbsp;'connector.properties.bootstrap.servers' = 'localhost:9092', > &gt; > &gt; &amp;nbsp;'update-mode' = 'append', > &gt; > &gt; &amp;nbsp;'format.type' = 'json', > &gt; > &gt; &amp;nbsp;'format.derive-schema' = 'true' > &gt; > &gt; ); > &gt; > &gt; --可以查出数据 > &gt; > &gt; select event_time_line from sourceTable ; > &gt; > &gt; --当我定义自己函数时候,参数value没有传过去,但是数组的size长度过去了 > &gt; > &gt; select type_change(event_time_line) from sourceTable ; > &gt; > &gt; &amp;nbsp; > &gt; > &gt; public class TypeChange extends ScalarFunction { > &gt; > &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; /** > &gt; > &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; * > &gt; 为null,但是数组有长度,数组里面的字段没有被识别出来,换了默认的planner,可以正常得到值 > &gt; > &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; * @param rows > &gt; > &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; * @return > &gt; > &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; */ > &gt; > &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; public String eval(Row [] rows){ > &gt; > &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; > &amp;nbsp;return > &gt; JSONObject.toJSONString(rows); > &gt; > &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; } > &gt; > &gt; &amp;nbsp; > &gt; > &gt; } > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
谢谢Benchao的回答。
虽然可以work around,但是这看起来应该是blink planner要去支持的事情。 我建个JIRA去跟踪下:https://issues.apache.org/jira/browse/FLINK-17855 Best, Jingsong Lee On Wed, May 20, 2020 at 8:02 PM 了不起的盖茨比 <[hidden email]> wrote: > 谢谢大佬,终于弄好了。谢谢。 > public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) { > return new > RowTypeInfo(Types.OBJECT_ARRAY(Types.ROW(Types.STRING,Types.STRING))).getFieldTypes(); > } > > > > > ------------------ 原始邮件 ------------------ > 发件人: "Benchao Li"<[hidden email]>; > 发送时间: 2020年5月20日(星期三) 晚上7:39 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: flink1.10.x 解析 arrar<row> 问题 > > > > 不是这个意思,你需要覆盖`ScalarFunction#getParameterTypes(Class<?>[] > signature)`这个方法,显示指定你的输入数据的类型 > 比如你说的是Row[],那你需要指定Types.OBJECT_ARRAY(Types.ROW(Types.INT, > Types.STRING...)),Row里面的类型需要填写 > 你真实的类型。 > > 了不起的盖茨比 <[hidden email]> 于2020年5月20日周三 下午7:24写道: > > > udf的指定参数类型是 org.apache.flink.types.Row[],也是定义了这个。 > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > 发件人:&nbsp;"Benchao Li"<[hidden email]&gt;; > > 发送时间:&nbsp;2020年5月20日(星期三) 晚上6:51 > > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > > > 主题:&nbsp;Re: flink1.10.x 解析 arrar<row&gt; 问题 > > > > > > > > 你可以让你的UDF指定一下输入参数的类型,不指定类型的话,可能会推导出错误的类型,导致拿不到真正的数据 > > > > 了不起的盖茨比 <[hidden email]&gt; 于2020年5月20日周三 下午4:25写道: > > > > &gt; 1.blink_planner 定义ddl解析array字段时候,假如select 那个字段可以解析出。 > > &gt;&nbsp; 2.blink_planner 当我去定义自己函数时候,数组有长度但是没有元素,flink直接跳过解析 > > &gt; > > &gt; > > &gt; 3.如果使用flink-planner那么是正确的 > > &gt; > > &gt; > > &gt; > > &gt; CREATE TABLE sourceTable ( > > &gt; > > &gt; &amp;nbsp;event_time_line array<ROW ( > > &gt; > > &gt; &amp;nbsp; `rule_name` VARCHAR, > > &gt; > > &gt; &amp;nbsp; `count` VARCHAR > > &gt; > > &gt; &amp;nbsp;)&amp;gt; > > &gt; > > &gt; ) WITH ( > > &gt; > > &gt; &amp;nbsp;'connector.type' = 'kafka', > > &gt; > > &gt; &amp;nbsp;'connector.version' = 'universal', > > &gt; > > &gt; &amp;nbsp;'connector.startup-mode' = 'earliest-offset', > > &gt; > > &gt; &amp;nbsp;'connector.topic' = 'topic_test_1', > > &gt; > > &gt; &amp;nbsp;'connector.properties.zookeeper.connect' = > 'localhost:2181', > > &gt; > > &gt; &amp;nbsp;'connector.properties.bootstrap.servers' = > 'localhost:9092', > > &gt; > > &gt; &amp;nbsp;'update-mode' = 'append', > > &gt; > > &gt; &amp;nbsp;'format.type' = 'json', > > &gt; > > &gt; &amp;nbsp;'format.derive-schema' = 'true' > > &gt; > > &gt; ); > > &gt; > > &gt; --可以查出数据 > > &gt; > > &gt; select event_time_line from sourceTable ; > > &gt; > > &gt; --当我定义自己函数时候,参数value没有传过去,但是数组的size长度过去了 > > &gt; > > &gt; select type_change(event_time_line) from sourceTable ; > > &gt; > > &gt; &amp;nbsp; > > &gt; > > &gt; public class TypeChange extends ScalarFunction { > > &gt; > > &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; /** > > &gt; > > &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; * > > &gt; 为null,但是数组有长度,数组里面的字段没有被识别出来,换了默认的planner,可以正常得到值 > > &gt; > > &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; * > @param rows > > &gt; > > &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; * > @return > > &gt; > > &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; */ > > &gt; > > &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; public String > eval(Row [] rows){ > > &gt; > > &gt; > &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; > > &amp;nbsp;return > > &gt; JSONObject.toJSONString(rows); > > &gt; > > &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; } > > &gt; > > &gt; &amp;nbsp; > > &gt; > > &gt; } > > > > > > > > -- > > > > Benchao Li > > School of Electronics Engineering and Computer Science, Peking > University > > Tel:+86-15650713730 > > Email: [hidden email]; [hidden email] > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] -- Best, Jingsong Lee |
Free forum by Nabble | Edit this page |