Flink SQL UDF 动态类型

classic Classic list List threaded Threaded
11 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink SQL UDF 动态类型

forideal
你好,我的朋友:


      我使用的是 Flink 1.10 Blink Planer。
      我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。


      为什么我想要这个功能:
      场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回 string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多
     场景2: 我的数据是一个 Json ,问题同上。
 
     在场景1中,我改了下 Flink 的源码,在 ScalarFunction 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化
@Override
public void initialize(LogicalType[] sqlTypes, String[] paramNames) {
// 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型
}
    这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。
    这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?) 这个类型不进行 cast 是无法直接使用的。
public class TimestampTest extends ScalarFunction {

public Object eval(long timestamp, String pattern, int num) {
        Timestamp timestamp1 = new Timestamp(timestamp);
SimpleDateFormat sdf = new SimpleDateFormat(pattern);
        if (num < 4) {
//返回 STRING 类型
return String.valueOf(timestamp);
}
if (num < 6) {
//返回 BIGINT
return timestamp - 100;
}
if (num < 8) {
//返回 DOUBLE
double ss = 0.9;
            return (double) timestamp + ss;
}
//返回 STRING
return sdf.format(timestamp1);
}
}
Reply | Threaded
Open this post in threaded view
|

回复:Flink SQL UDF 动态类型

Yichao Yang
Hi


我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。


但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。


如果有理解不对之处,敬请指出。


Best,
Yichao Yang




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"forideal"<[hidden email]&gt;;
发送时间:&nbsp;2020年6月9日(星期二) 中午1:33
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Flink SQL UDF 动态类型



你好,我的朋友:


&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 我使用的是 Flink 1.10 Blink Planer。
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。


&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 为什么我想要这个功能:
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回 string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多
&nbsp;&nbsp;&nbsp;&nbsp; 场景2: 我的数据是一个 Json ,问题同上。
&nbsp;
&nbsp;&nbsp;&nbsp;&nbsp; 在场景1中,我改了下 Flink 的源码,在 ScalarFunction 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化
@Override
public void initialize(LogicalType[] sqlTypes, String[] paramNames) {
// 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型
}
&nbsp;&nbsp;&nbsp; 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。
&nbsp;&nbsp;&nbsp; 这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?) 这个类型不进行 cast 是无法直接使用的。
public class TimestampTest extends ScalarFunction {

public Object eval(long timestamp, String pattern, int num) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Timestamp timestamp1 = new Timestamp(timestamp);
SimpleDateFormat sdf = new SimpleDateFormat(pattern);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (num < 4) {
//返回 STRING 类型
return String.valueOf(timestamp);
}
if (num < 6) {
//返回 BIGINT
return timestamp - 100;
}
if (num < 8) {
//返回 DOUBLE
double ss = 0.9;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return (double) timestamp + ss;
}
//返回 STRING
return sdf.format(timestamp1);
}
}
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL UDF 动态类型

Benchao Li-2
我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。
我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。
这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。

之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求,
我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。

1048262223 <[hidden email]> 于2020年6月9日周二 下午2:23写道:

> Hi
>
>
>
> 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。
>
>
>
> 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。
>
>
> 如果有理解不对之处,敬请指出。
>
>
> Best,
> Yichao Yang
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"forideal"<[hidden email]&gt;;
> 发送时间:&nbsp;2020年6月9日(星期二) 中午1:33
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;Flink SQL UDF 动态类型
>
>
>
> 你好,我的朋友:
>
>
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 我使用的是 Flink 1.10 Blink Planer。
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。
>
>
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 为什么我想要这个功能:
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回
> string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多
> &nbsp;&nbsp;&nbsp;&nbsp; 场景2: 我的数据是一个 Json ,问题同上。
> &nbsp;
> &nbsp;&nbsp;&nbsp;&nbsp; 在场景1中,我改了下 Flink 的源码,在 ScalarFunction
> 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化
> @Override
> public void initialize(LogicalType[] sqlTypes, String[] paramNames) {
> // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型
> }
> &nbsp;&nbsp;&nbsp; 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。
> &nbsp;&nbsp;&nbsp; 这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?)
> 这个类型不进行 cast 是无法直接使用的。
> public class TimestampTest extends ScalarFunction {
>
> public Object eval(long timestamp, String pattern, int num) {
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Timestamp timestamp1 = new
> Timestamp(timestamp);
> SimpleDateFormat sdf = new SimpleDateFormat(pattern);
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (num < 4) {
> //返回 STRING 类型
> return String.valueOf(timestamp);
> }
> if (num < 6) {
> //返回 BIGINT
> return timestamp - 100;
> }
> if (num < 8) {
> //返回 DOUBLE
> double ss = 0.9;
> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return
> (double) timestamp + ss;
> }
> //返回 STRING
> return sdf.format(timestamp1);
> }
> }
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL UDF 动态类型

Jark
Administrator
+1 to support pb format.

On Tue, 9 Jun 2020 at 14:47, Benchao Li <[hidden email]> wrote:

> 我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。
> 我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。
> 这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。
>
> 之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求,
> 我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。
>
> 1048262223 <[hidden email]> 于2020年6月9日周二 下午2:23写道:
>
> > Hi
> >
> >
> >
> >
> 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。
> >
> >
> >
> >
> 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。
> >
> >
> > 如果有理解不对之处,敬请指出。
> >
> >
> > Best,
> > Yichao Yang
> >
> >
> >
> >
> > ------------------&nbsp;原始邮件&nbsp;------------------
> > 发件人:&nbsp;"forideal"<[hidden email]&gt;;
> > 发送时间:&nbsp;2020年6月9日(星期二) 中午1:33
> > 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
> >
> > 主题:&nbsp;Flink SQL UDF 动态类型
> >
> >
> >
> > 你好,我的朋友:
> >
> >
> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 我使用的是 Flink 1.10 Blink Planer。
> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。
> >
> >
> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 为什么我想要这个功能:
> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回
> > string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多
> > &nbsp;&nbsp;&nbsp;&nbsp; 场景2: 我的数据是一个 Json ,问题同上。
> > &nbsp;
> > &nbsp;&nbsp;&nbsp;&nbsp; 在场景1中,我改了下 Flink 的源码,在 ScalarFunction
> > 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化
> > @Override
> > public void initialize(LogicalType[] sqlTypes, String[] paramNames) {
> > // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型
> > }
> > &nbsp;&nbsp;&nbsp; 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。
> > &nbsp;&nbsp;&nbsp; 这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?)
> > 这个类型不进行 cast 是无法直接使用的。
> > public class TimestampTest extends ScalarFunction {
> >
> > public Object eval(long timestamp, String pattern, int num) {
> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Timestamp timestamp1 = new
> > Timestamp(timestamp);
> > SimpleDateFormat sdf = new SimpleDateFormat(pattern);
> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (num < 4) {
> > //返回 STRING 类型
> > return String.valueOf(timestamp);
> > }
> > if (num < 6) {
> > //返回 BIGINT
> > return timestamp - 100;
> > }
> > if (num < 8) {
> > //返回 DOUBLE
> > double ss = 0.9;
> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return
> > (double) timestamp + ss;
> > }
> > //返回 STRING
> > return sdf.format(timestamp1);
> > }
> > }
>
Reply | Threaded
Open this post in threaded view
|

回复: Flink SQL UDF 动态类型

Yichao Yang
Hi


+1,各位大佬,其实我自己已经通过参考avro,json等format实现基于pb实现了一个flink-protobuf解析的formats,git地址如下
https://github.com/yangyichao-mango/flink-protobuf
之后我会持续关注社区关于pb format的实现。


Best,
Yichao Yang




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Jark Wu"<[hidden email]&gt;;
发送时间:&nbsp;2020年6月9日(星期二) 下午2:49
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: Flink SQL UDF 动态类型



+1 to support pb format.

On Tue, 9 Jun 2020 at 14:47, Benchao Li <[hidden email]&gt; wrote:

&gt; 我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。
&gt; 我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。
&gt; 这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。
&gt;
&gt; 之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求,
&gt; 我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。
&gt;
&gt; 1048262223 <[hidden email]&gt; 于2020年6月9日周二 下午2:23写道:
&gt;
&gt; &gt; Hi
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。
&gt; &gt;
&gt; &gt;
&gt; &gt; 如果有理解不对之处,敬请指出。
&gt; &gt;
&gt; &gt;
&gt; &gt; Best,
&gt; &gt; Yichao Yang
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; &gt; 发件人:&amp;nbsp;"forideal"<[hidden email]&amp;gt;;
&gt; &gt; 发送时间:&amp;nbsp;2020年6月9日(星期二) 中午1:33
&gt; &gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
&gt; &gt;
&gt; &gt; 主题:&amp;nbsp;Flink SQL UDF 动态类型
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt; 你好,我的朋友:
&gt; &gt;
&gt; &gt;
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 我使用的是 Flink 1.10 Blink Planer。
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。
&gt; &gt;
&gt; &gt;
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 为什么我想要这个功能:
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回
&gt; &gt; string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 场景2: 我的数据是一个 Json ,问题同上。
&gt; &gt; &amp;nbsp;
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 在场景1中,我改了下 Flink 的源码,在 ScalarFunction
&gt; &gt; 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化
&gt; &gt; @Override
&gt; &gt; public void initialize(LogicalType[] sqlTypes, String[] paramNames) {
&gt; &gt; // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型
&gt; &gt; }
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?)
&gt; &gt; 这个类型不进行 cast 是无法直接使用的。
&gt; &gt; public class TimestampTest extends ScalarFunction {
&gt; &gt;
&gt; &gt; public Object eval(long timestamp, String pattern, int num) {
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; Timestamp timestamp1 = new
&gt; &gt; Timestamp(timestamp);
&gt; &gt; SimpleDateFormat sdf = new SimpleDateFormat(pattern);
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; if (num < 4) {
&gt; &gt; //返回 STRING 类型
&gt; &gt; return String.valueOf(timestamp);
&gt; &gt; }
&gt; &gt; if (num < 6) {
&gt; &gt; //返回 BIGINT
&gt; &gt; return timestamp - 100;
&gt; &gt; }
&gt; &gt; if (num < 8) {
&gt; &gt; //返回 DOUBLE
&gt; &gt; double ss = 0.9;
&gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; return
&gt; &gt; (double) timestamp + ss;
&gt; &gt; }
&gt; &gt; //返回 STRING
&gt; &gt; return sdf.format(timestamp1);
&gt; &gt; }
&gt; &gt; }
&gt;
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL UDF 动态类型

Benchao Li-2
FYI: issue[1] 已经建好了,各位感兴趣的可以关注一下,也非常欢迎参与设计和实现~

[1] https://issues.apache.org/jira/browse/FLINK-18202

1048262223 <[hidden email]> 于2020年6月9日周二 下午2:54写道:

> Hi
>
>
>
> +1,各位大佬,其实我自己已经通过参考avro,json等format实现基于pb实现了一个flink-protobuf解析的formats,git地址如下
> https://github.com/yangyichao-mango/flink-protobuf
> 之后我会持续关注社区关于pb format的实现。
>
>
> Best,
> Yichao Yang
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Jark Wu"<[hidden email]&gt;;
> 发送时间:&nbsp;2020年6月9日(星期二) 下午2:49
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;Re: Flink SQL UDF 动态类型
>
>
>
> +1 to support pb format.
>
> On Tue, 9 Jun 2020 at 14:47, Benchao Li <[hidden email]&gt; wrote:
>
> &gt; 我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。
> &gt; 我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。
> &gt; 这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。
> &gt;
> &gt; 之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求,
> &gt; 我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。
> &gt;
> &gt; 1048262223 <[hidden email]&gt; 于2020年6月9日周二 下午2:23写道:
> &gt;
> &gt; &gt; Hi
> &gt; &gt;
> &gt; &gt;
> &gt; &gt;
> &gt; &gt;
> &gt;
> 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。
> &gt; &gt;
> &gt; &gt;
> &gt; &gt;
> &gt; &gt;
> &gt;
> 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。
> &gt; &gt;
> &gt; &gt;
> &gt; &gt; 如果有理解不对之处,敬请指出。
> &gt; &gt;
> &gt; &gt;
> &gt; &gt; Best,
> &gt; &gt; Yichao Yang
> &gt; &gt;
> &gt; &gt;
> &gt; &gt;
> &gt; &gt;
> &gt; &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> &gt; &gt; 发件人:&amp;nbsp;"forideal"<[hidden email]&amp;gt;;
> &gt; &gt; 发送时间:&amp;nbsp;2020年6月9日(星期二) 中午1:33
> &gt; &gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
> &gt; &gt;
> &gt; &gt; 主题:&amp;nbsp;Flink SQL UDF 动态类型
> &gt; &gt;
> &gt; &gt;
> &gt; &gt;
> &gt; &gt; 你好,我的朋友:
> &gt; &gt;
> &gt; &gt;
> &gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 我使用的是 Flink
> 1.10 Blink Planer。
> &gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 我想构造一个Flink
> UDF ,这个 UDF 可以根据不同的参数返回不同的类型。
> &gt; &gt;
> &gt; &gt;
> &gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 为什么我想要这个功能:
> &gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 场景1: 我的数据是一个
> pb 的 bytes,我想从里面获取数据,如果统一的返回
> &gt; &gt; string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string
> 这样的方式,实现起来又非常多
> &gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 场景2: 我的数据是一个 Json ,问题同上。
> &gt; &gt; &amp;nbsp;
> &gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 在场景1中,我改了下 Flink 的源码,在
> ScalarFunction
> &gt; &gt; 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化
> &gt; &gt; @Override
> &gt; &gt; public void initialize(LogicalType[] sqlTypes, String[]
> paramNames) {
> &gt; &gt; // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema
> 信息,拿到类型信息,这样就可以动态的设置类型
> &gt; &gt; }
> &gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 这个方法很有效果,他帮我们 workaround
> 了一段时间,目前依然work。只是有些不是那么优雅。
> &gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 这个case 就是我想要的一个,不过,目前这个会返回
> RAW('java.lang.Object', ?)
> &gt; &gt; 这个类型不进行 cast 是无法直接使用的。
> &gt; &gt; public class TimestampTest extends ScalarFunction {
> &gt; &gt;
> &gt; &gt; public Object eval(long timestamp, String pattern, int num) {
> &gt; &gt;
> &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> Timestamp timestamp1 = new
> &gt; &gt; Timestamp(timestamp);
> &gt; &gt; SimpleDateFormat sdf = new SimpleDateFormat(pattern);
> &gt; &gt;
> &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; if
> (num < 4) {
> &gt; &gt; //返回 STRING 类型
> &gt; &gt; return String.valueOf(timestamp);
> &gt; &gt; }
> &gt; &gt; if (num < 6) {
> &gt; &gt; //返回 BIGINT
> &gt; &gt; return timestamp - 100;
> &gt; &gt; }
> &gt; &gt; if (num < 8) {
> &gt; &gt; //返回 DOUBLE
> &gt; &gt; double ss = 0.9;
> &gt; &gt;
> &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> return
> &gt; &gt; (double) timestamp + ss;
> &gt; &gt; }
> &gt; &gt; //返回 STRING
> &gt; &gt; return sdf.format(timestamp1);
> &gt; &gt; }
> &gt; &gt; }
> &gt;
Reply | Threaded
Open this post in threaded view
|

Re:Re: Flink SQL UDF 动态类型

forideal
In reply to this post by Jark
+1 to support pb format



如果能支持 pb format 那简直太好了,实际了我们也自己搞了一个 pb format。大概的方法也是在外边做了一个对应的service,这个service保存了一个jar,在进行parse byte 的时候,采用了urlclassload+反射调用 parse 类型的方法。
同时也尝试过使用 dynamic message 的方式,这个方式更轻量一些,但是,性能差强人意。











在 2020-06-09 14:49:02,"Jark Wu" <[hidden email]> 写道:

>+1 to support pb format.
>
>On Tue, 9 Jun 2020 at 14:47, Benchao Li <[hidden email]> wrote:
>
>> 我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。
>> 我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。
>> 这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。
>>
>> 之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求,
>> 我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。
>>
>> 1048262223 <[hidden email]> 于2020年6月9日周二 下午2:23写道:
>>
>> > Hi
>> >
>> >
>> >
>> >
>> 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。
>> >
>> >
>> >
>> >
>> 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。
>> >
>> >
>> > 如果有理解不对之处,敬请指出。
>> >
>> >
>> > Best,
>> > Yichao Yang
>> >
>> >
>> >
>> >
>> > ------------------&nbsp;原始邮件&nbsp;------------------
>> > 发件人:&nbsp;"forideal"<[hidden email]&gt;;
>> > 发送时间:&nbsp;2020年6月9日(星期二) 中午1:33
>> > 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>> >
>> > 主题:&nbsp;Flink SQL UDF 动态类型
>> >
>> >
>> >
>> > 你好,我的朋友:
>> >
>> >
>> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 我使用的是 Flink 1.10 Blink Planer。
>> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。
>> >
>> >
>> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 为什么我想要这个功能:
>> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回
>> > string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多
>> > &nbsp;&nbsp;&nbsp;&nbsp; 场景2: 我的数据是一个 Json ,问题同上。
>> > &nbsp;
>> > &nbsp;&nbsp;&nbsp;&nbsp; 在场景1中,我改了下 Flink 的源码,在 ScalarFunction
>> > 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化
>> > @Override
>> > public void initialize(LogicalType[] sqlTypes, String[] paramNames) {
>> > // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型
>> > }
>> > &nbsp;&nbsp;&nbsp; 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。
>> > &nbsp;&nbsp;&nbsp; 这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?)
>> > 这个类型不进行 cast 是无法直接使用的。
>> > public class TimestampTest extends ScalarFunction {
>> >
>> > public Object eval(long timestamp, String pattern, int num) {
>> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Timestamp timestamp1 = new
>> > Timestamp(timestamp);
>> > SimpleDateFormat sdf = new SimpleDateFormat(pattern);
>> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (num < 4) {
>> > //返回 STRING 类型
>> > return String.valueOf(timestamp);
>> > }
>> > if (num < 6) {
>> > //返回 BIGINT
>> > return timestamp - 100;
>> > }
>> > if (num < 8) {
>> > //返回 DOUBLE
>> > double ss = 0.9;
>> > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return
>> > (double) timestamp + ss;
>> > }
>> > //返回 STRING
>> > return sdf.format(timestamp1);
>> > }
>> > }
>>
kcz
Reply | Threaded
Open this post in threaded view
|

回复: Flink SQL UDF 动态类型

kcz
In reply to this post by Benchao Li-2
动态类型这个我们其实也挺需要的,比如我写一个列转行的udf,这个时候我需要确认返回的字段个数以及返回的类型,如果增加字段等,就需要更多udf来实现。




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Benchao Li"<[hidden email]&gt;;
发送时间:&nbsp;2020年6月9日(星期二) 下午2:47
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;Re: Flink SQL UDF 动态类型



我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。
我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。
这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。

之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求,
我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。

1048262223 <[hidden email]&gt; 于2020年6月9日周二 下午2:23写道:

&gt; Hi
&gt;
&gt;
&gt;
&gt; 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。
&gt;
&gt;
&gt;
&gt; 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。
&gt;
&gt;
&gt; 如果有理解不对之处,敬请指出。
&gt;
&gt;
&gt; Best,
&gt; Yichao Yang
&gt;
&gt;
&gt;
&gt;
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人:&amp;nbsp;"forideal"<[hidden email]&amp;gt;;
&gt; 发送时间:&amp;nbsp;2020年6月9日(星期二) 中午1:33
&gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
&gt;
&gt; 主题:&amp;nbsp;Flink SQL UDF 动态类型
&gt;
&gt;
&gt;
&gt; 你好,我的朋友:
&gt;
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 我使用的是 Flink 1.10 Blink Planer。
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。
&gt;
&gt;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 为什么我想要这个功能:
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回
&gt; string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 场景2: 我的数据是一个 Json ,问题同上。
&gt; &amp;nbsp;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 在场景1中,我改了下 Flink 的源码,在 ScalarFunction
&gt; 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化
&gt; @Override
&gt; public void initialize(LogicalType[] sqlTypes, String[] paramNames) {
&gt; // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型
&gt; }
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?)
&gt; 这个类型不进行 cast 是无法直接使用的。
&gt; public class TimestampTest extends ScalarFunction {
&gt;
&gt; public Object eval(long timestamp, String pattern, int num) {
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; Timestamp timestamp1 = new
&gt; Timestamp(timestamp);
&gt; SimpleDateFormat sdf = new SimpleDateFormat(pattern);
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; if (num < 4) {
&gt; //返回 STRING 类型
&gt; return String.valueOf(timestamp);
&gt; }
&gt; if (num < 6) {
&gt; //返回 BIGINT
&gt; return timestamp - 100;
&gt; }
&gt; if (num < 8) {
&gt; //返回 DOUBLE
&gt; double ss = 0.9;
&gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; return
&gt; (double) timestamp + ss;
&gt; }
&gt; //返回 STRING
&gt; return sdf.format(timestamp1);
&gt; }
&gt; }
Reply | Threaded
Open this post in threaded view
|

回复: Flink SQL UDF 动态类型

Yichao Yang
Hi


这个可以通过返回一个通用类型比如Map来实现。


Best,
Yichao Yang




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"kcz"<[hidden email]&gt;;
发送时间:&nbsp;2020年6月9日(星期二) 下午4:49
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;回复: Flink SQL UDF 动态类型



动态类型这个我们其实也挺需要的,比如我写一个列转行的udf,这个时候我需要确认返回的字段个数以及返回的类型,如果增加字段等,就需要更多udf来实现。




------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
发件人:&amp;nbsp;"Benchao Li"<[hidden email]&amp;gt;;
发送时间:&amp;nbsp;2020年6月9日(星期二) 下午2:47
收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;

主题:&amp;nbsp;Re: Flink SQL UDF 动态类型



我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。
我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。
这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。

之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求,
我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。

1048262223 <[hidden email]&amp;gt; 于2020年6月9日周二 下午2:23写道:

&amp;gt; Hi
&amp;gt;
&amp;gt;
&amp;gt;
&amp;gt; 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。
&amp;gt;
&amp;gt;
&amp;gt;
&amp;gt; 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。
&amp;gt;
&amp;gt;
&amp;gt; 如果有理解不对之处,敬请指出。
&amp;gt;
&amp;gt;
&amp;gt; Best,
&amp;gt; Yichao Yang
&amp;gt;
&amp;gt;
&amp;gt;
&amp;gt;
&amp;gt; ------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------
&amp;gt; 发件人:&amp;amp;nbsp;"forideal"<[hidden email]&amp;amp;gt;;
&amp;gt; 发送时间:&amp;amp;nbsp;2020年6月9日(星期二) 中午1:33
&amp;gt; 收件人:&amp;amp;nbsp;"user-zh"<[hidden email]&amp;amp;gt;;
&amp;gt;
&amp;gt; 主题:&amp;amp;nbsp;Flink SQL UDF 动态类型
&amp;gt;
&amp;gt;
&amp;gt;
&amp;gt; 你好,我的朋友:
&amp;gt;
&amp;gt;
&amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 我使用的是 Flink 1.10 Blink Planer。
&amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。
&amp;gt;
&amp;gt;
&amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 为什么我想要这个功能:
&amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回
&amp;gt; string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多
&amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 场景2: 我的数据是一个 Json ,问题同上。
&amp;gt; &amp;amp;nbsp;
&amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 在场景1中,我改了下 Flink 的源码,在 ScalarFunction
&amp;gt; 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化
&amp;gt; @Override
&amp;gt; public void initialize(LogicalType[] sqlTypes, String[] paramNames) {
&amp;gt; // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型
&amp;gt; }
&amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。
&amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?)
&amp;gt; 这个类型不进行 cast 是无法直接使用的。
&amp;gt; public class TimestampTest extends ScalarFunction {
&amp;gt;
&amp;gt; public Object eval(long timestamp, String pattern, int num) {
&amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; Timestamp timestamp1 = new
&amp;gt; Timestamp(timestamp);
&amp;gt; SimpleDateFormat sdf = new SimpleDateFormat(pattern);
&amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; if (num < 4) {
&amp;gt; //返回 STRING 类型
&amp;gt; return String.valueOf(timestamp);
&amp;gt; }
&amp;gt; if (num < 6) {
&amp;gt; //返回 BIGINT
&amp;gt; return timestamp - 100;
&amp;gt; }
&amp;gt; if (num < 8) {
&amp;gt; //返回 DOUBLE
&amp;gt; double ss = 0.9;
&amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; return
&amp;gt; (double) timestamp + ss;
&amp;gt; }
&amp;gt; //返回 STRING
&amp;gt; return sdf.format(timestamp1);
&amp;gt; }
&amp;gt; }
kcz
Reply | Threaded
Open this post in threaded view
|

回复: Flink SQL UDF 动态类型

kcz
有道理呀,我直接map就好 。tks。




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"1048262223"<[hidden email]&gt;;
发送时间:&nbsp;2020年6月9日(星期二) 下午4:51
收件人:&nbsp;"user-zh"<[hidden email]&gt;;

主题:&nbsp;回复: Flink SQL UDF 动态类型



Hi


这个可以通过返回一个通用类型比如Map来实现。


Best,
Yichao Yang




------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
发件人:&amp;nbsp;"kcz"<[hidden email]&amp;gt;;
发送时间:&amp;nbsp;2020年6月9日(星期二) 下午4:49
收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;

主题:&amp;nbsp;回复: Flink SQL UDF 动态类型



动态类型这个我们其实也挺需要的,比如我写一个列转行的udf,这个时候我需要确认返回的字段个数以及返回的类型,如果增加字段等,就需要更多udf来实现。




------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------
发件人:&amp;amp;nbsp;"Benchao Li"<[hidden email]&amp;amp;gt;;
发送时间:&amp;amp;nbsp;2020年6月9日(星期二) 下午2:47
收件人:&amp;amp;nbsp;"user-zh"<[hidden email]&amp;amp;gt;;

主题:&amp;amp;nbsp;Re: Flink SQL UDF 动态类型



我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。
我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。
这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。

之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求,
我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。

1048262223 <[hidden email]&amp;amp;gt; 于2020年6月9日周二 下午2:23写道:

&amp;amp;gt; Hi
&amp;amp;gt;
&amp;amp;gt;
&amp;amp;gt;
&amp;amp;gt; 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。
&amp;amp;gt;
&amp;amp;gt;
&amp;amp;gt;
&amp;amp;gt; 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。
&amp;amp;gt;
&amp;amp;gt;
&amp;amp;gt; 如果有理解不对之处,敬请指出。
&amp;amp;gt;
&amp;amp;gt;
&amp;amp;gt; Best,
&amp;amp;gt; Yichao Yang
&amp;amp;gt;
&amp;amp;gt;
&amp;amp;gt;
&amp;amp;gt;
&amp;amp;gt; ------------------&amp;amp;amp;nbsp;原始邮件&amp;amp;amp;nbsp;------------------
&amp;amp;gt; 发件人:&amp;amp;amp;nbsp;"forideal"<[hidden email]&amp;amp;amp;gt;;
&amp;amp;gt; 发送时间:&amp;amp;amp;nbsp;2020年6月9日(星期二) 中午1:33
&amp;amp;gt; 收件人:&amp;amp;amp;nbsp;"user-zh"<[hidden email]&amp;amp;amp;gt;;
&amp;amp;gt;
&amp;amp;gt; 主题:&amp;amp;amp;nbsp;Flink SQL UDF 动态类型
&amp;amp;gt;
&amp;amp;gt;
&amp;amp;gt;
&amp;amp;gt; 你好,我的朋友:
&amp;amp;gt;
&amp;amp;gt;
&amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 我使用的是 Flink 1.10 Blink Planer。
&amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。
&amp;amp;gt;
&amp;amp;gt;
&amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 为什么我想要这个功能:
&amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回
&amp;amp;gt; string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多
&amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 场景2: 我的数据是一个 Json ,问题同上。
&amp;amp;gt; &amp;amp;amp;nbsp;
&amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 在场景1中,我改了下 Flink 的源码,在 ScalarFunction
&amp;amp;gt; 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化
&amp;amp;gt; @Override
&amp;amp;gt; public void initialize(LogicalType[] sqlTypes, String[] paramNames) {
&amp;amp;gt; // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型
&amp;amp;gt; }
&amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。
&amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?)
&amp;amp;gt; 这个类型不进行 cast 是无法直接使用的。
&amp;amp;gt; public class TimestampTest extends ScalarFunction {
&amp;amp;gt;
&amp;amp;gt; public Object eval(long timestamp, String pattern, int num) {
&amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; Timestamp timestamp1 = new
&amp;amp;gt; Timestamp(timestamp);
&amp;amp;gt; SimpleDateFormat sdf = new SimpleDateFormat(pattern);
&amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; if (num < 4) {
&amp;amp;gt; //返回 STRING 类型
&amp;amp;gt; return String.valueOf(timestamp);
&amp;amp;gt; }
&amp;amp;gt; if (num < 6) {
&amp;amp;gt; //返回 BIGINT
&amp;amp;gt; return timestamp - 100;
&amp;amp;gt; }
&amp;amp;gt; if (num < 8) {
&amp;amp;gt; //返回 DOUBLE
&amp;amp;gt; double ss = 0.9;
&amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; return
&amp;amp;gt; (double) timestamp + ss;
&amp;amp;gt; }
&amp;amp;gt; //返回 STRING
&amp;amp;gt; return sdf.format(timestamp1);
&amp;amp;gt; }
&amp;amp;gt; }
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL UDF 动态类型

Jingsong Li
Hi all,

业务上一般是可以避免动态类型的UDF的,如果有刚需,1.11已经支持了[1],文档还在路上,一个简单的例子根据第一个参数来推断返回类型:

@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
   return TypeInference.newBuilder()
      .outputTypeStrategy(TypeStrategies.argument(0))
      .build();
}


[1]https://issues.apache.org/jira/browse/FLINK-15487

Best,
Jingsong Lee

On Tue, Jun 9, 2020 at 4:57 PM kcz <[hidden email]> wrote:

> 有道理呀,我直接map就好 。tks。
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"1048262223"<[hidden email]&gt;;
> 发送时间:&nbsp;2020年6月9日(星期二) 下午4:51
> 收件人:&nbsp;"user-zh"<[hidden email]&gt;;
>
> 主题:&nbsp;回复: Flink SQL UDF 动态类型
>
>
>
> Hi
>
>
> 这个可以通过返回一个通用类型比如Map来实现。
>
>
> Best,
> Yichao Yang
>
>
>
>
> ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> 发件人:&amp;nbsp;"kcz"<[hidden email]&amp;gt;;
> 发送时间:&amp;nbsp;2020年6月9日(星期二) 下午4:49
> 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;;
>
> 主题:&amp;nbsp;回复: Flink SQL UDF 动态类型
>
>
>
> 动态类型这个我们其实也挺需要的,比如我写一个列转行的udf,这个时候我需要确认返回的字段个数以及返回的类型,如果增加字段等,就需要更多udf来实现。
>
>
>
>
> ------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------
> 发件人:&amp;amp;nbsp;"Benchao Li"<[hidden email]&amp;amp;gt;;
> 发送时间:&amp;amp;nbsp;2020年6月9日(星期二) 下午2:47
> 收件人:&amp;amp;nbsp;"user-zh"<[hidden email]&amp;amp;gt;;
>
> 主题:&amp;amp;nbsp;Re: Flink SQL UDF 动态类型
>
>
>
> 我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。
> 我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。
> 这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。
>
> 之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求,
> 我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。
>
> 1048262223 <[hidden email]&amp;amp;gt; 于2020年6月9日周二 下午2:23写道:
>
> &amp;amp;gt; Hi
> &amp;amp;gt;
> &amp;amp;gt;
> &amp;amp;gt;
> &amp;amp;gt;
> 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。
> &amp;amp;gt;
> &amp;amp;gt;
> &amp;amp;gt;
> &amp;amp;gt;
> 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。
> &amp;amp;gt;
> &amp;amp;gt;
> &amp;amp;gt; 如果有理解不对之处,敬请指出。
> &amp;amp;gt;
> &amp;amp;gt;
> &amp;amp;gt; Best,
> &amp;amp;gt; Yichao Yang
> &amp;amp;gt;
> &amp;amp;gt;
> &amp;amp;gt;
> &amp;amp;gt;
> &amp;amp;gt;
> ------------------&amp;amp;amp;nbsp;原始邮件&amp;amp;amp;nbsp;------------------
> &amp;amp;gt; 发件人:&amp;amp;amp;nbsp;"forideal"<[hidden email]
> &amp;amp;amp;gt;;
> &amp;amp;gt; 发送时间:&amp;amp;amp;nbsp;2020年6月9日(星期二) 中午1:33
> &amp;amp;gt; 收件人:&amp;amp;amp;nbsp;"user-zh"<[hidden email]
> &amp;amp;amp;gt;;
> &amp;amp;gt;
> &amp;amp;gt; 主题:&amp;amp;amp;nbsp;Flink SQL UDF 动态类型
> &amp;amp;gt;
> &amp;amp;gt;
> &amp;amp;gt;
> &amp;amp;gt; 你好,我的朋友:
> &amp;amp;gt;
> &amp;amp;gt;
> &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> 我使用的是 Flink 1.10 Blink Planer。
> &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。
> &amp;amp;gt;
> &amp;amp;gt;
> &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> 为什么我想要这个功能:
> &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回
> &amp;amp;gt; string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string
> 这样的方式,实现起来又非常多
> &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> 场景2: 我的数据是一个 Json ,问题同上。
> &amp;amp;gt; &amp;amp;amp;nbsp;
> &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> 在场景1中,我改了下 Flink 的源码,在 ScalarFunction
> &amp;amp;gt; 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化
> &amp;amp;gt; @Override
> &amp;amp;gt; public void initialize(LogicalType[] sqlTypes, String[]
> paramNames) {
> &amp;amp;gt; // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema
> 信息,拿到类型信息,这样就可以动态的设置类型
> &amp;amp;gt; }
> &amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。
> &amp;amp;gt; &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp; 这个case
> 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?)
> &amp;amp;gt; 这个类型不进行 cast 是无法直接使用的。
> &amp;amp;gt; public class TimestampTest extends ScalarFunction {
> &amp;amp;gt;
> &amp;amp;gt; public Object eval(long timestamp, String pattern, int num) {
> &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> Timestamp timestamp1 = new
> &amp;amp;gt; Timestamp(timestamp);
> &amp;amp;gt; SimpleDateFormat sdf = new SimpleDateFormat(pattern);
> &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> if (num < 4) {
> &amp;amp;gt; //返回 STRING 类型
> &amp;amp;gt; return String.valueOf(timestamp);
> &amp;amp;gt; }
> &amp;amp;gt; if (num < 6) {
> &amp;amp;gt; //返回 BIGINT
> &amp;amp;gt; return timestamp - 100;
> &amp;amp;gt; }
> &amp;amp;gt; if (num < 8) {
> &amp;amp;gt; //返回 DOUBLE
> &amp;amp;gt; double ss = 0.9;
> &amp;amp;gt;
> &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;
> return
> &amp;amp;gt; (double) timestamp + ss;
> &amp;amp;gt; }
> &amp;amp;gt; //返回 STRING
> &amp;amp;gt; return sdf.format(timestamp1);
> &amp;amp;gt; }
> &amp;amp;gt; }



--
Best, Jingsong Lee