你好,我的朋友:
我使用的是 Flink 1.10 Blink Planer。 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。 为什么我想要这个功能: 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回 string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多 场景2: 我的数据是一个 Json ,问题同上。 在场景1中,我改了下 Flink 的源码,在 ScalarFunction 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化 @Override public void initialize(LogicalType[] sqlTypes, String[] paramNames) { // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型 } 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。 这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?) 这个类型不进行 cast 是无法直接使用的。 public class TimestampTest extends ScalarFunction { public Object eval(long timestamp, String pattern, int num) { Timestamp timestamp1 = new Timestamp(timestamp); SimpleDateFormat sdf = new SimpleDateFormat(pattern); if (num < 4) { //返回 STRING 类型 return String.valueOf(timestamp); } if (num < 6) { //返回 BIGINT return timestamp - 100; } if (num < 8) { //返回 DOUBLE double ss = 0.9; return (double) timestamp + ss; } //返回 STRING return sdf.format(timestamp1); } } |
Hi
我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。 如果有理解不对之处,敬请指出。 Best, Yichao Yang ------------------ 原始邮件 ------------------ 发件人: "forideal"<[hidden email]>; 发送时间: 2020年6月9日(星期二) 中午1:33 收件人: "user-zh"<[hidden email]>; 主题: Flink SQL UDF 动态类型 你好,我的朋友: 我使用的是 Flink 1.10 Blink Planer。 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。 为什么我想要这个功能: 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回 string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多 场景2: 我的数据是一个 Json ,问题同上。 在场景1中,我改了下 Flink 的源码,在 ScalarFunction 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化 @Override public void initialize(LogicalType[] sqlTypes, String[] paramNames) { // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型 } 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。 这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?) 这个类型不进行 cast 是无法直接使用的。 public class TimestampTest extends ScalarFunction { public Object eval(long timestamp, String pattern, int num) { Timestamp timestamp1 = new Timestamp(timestamp); SimpleDateFormat sdf = new SimpleDateFormat(pattern); if (num < 4) { //返回 STRING 类型 return String.valueOf(timestamp); } if (num < 6) { //返回 BIGINT return timestamp - 100; } if (num < 8) { //返回 DOUBLE double ss = 0.9; return (double) timestamp + ss; } //返回 STRING return sdf.format(timestamp1); } } |
我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。
我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。 这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。 之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求, 我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。 1048262223 <[hidden email]> 于2020年6月9日周二 下午2:23写道: > Hi > > > > 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。 > > > > 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。 > > > 如果有理解不对之处,敬请指出。 > > > Best, > Yichao Yang > > > > > ------------------ 原始邮件 ------------------ > 发件人: "forideal"<[hidden email]>; > 发送时间: 2020年6月9日(星期二) 中午1:33 > 收件人: "user-zh"<[hidden email]>; > > 主题: Flink SQL UDF 动态类型 > > > > 你好,我的朋友: > > > 我使用的是 Flink 1.10 Blink Planer。 > 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。 > > > 为什么我想要这个功能: > 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回 > string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多 > 场景2: 我的数据是一个 Json ,问题同上。 > > 在场景1中,我改了下 Flink 的源码,在 ScalarFunction > 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化 > @Override > public void initialize(LogicalType[] sqlTypes, String[] paramNames) { > // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型 > } > 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。 > 这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?) > 这个类型不进行 cast 是无法直接使用的。 > public class TimestampTest extends ScalarFunction { > > public Object eval(long timestamp, String pattern, int num) { > Timestamp timestamp1 = new > Timestamp(timestamp); > SimpleDateFormat sdf = new SimpleDateFormat(pattern); > if (num < 4) { > //返回 STRING 类型 > return String.valueOf(timestamp); > } > if (num < 6) { > //返回 BIGINT > return timestamp - 100; > } > if (num < 8) { > //返回 DOUBLE > double ss = 0.9; > return > (double) timestamp + ss; > } > //返回 STRING > return sdf.format(timestamp1); > } > } |
Administrator
|
+1 to support pb format.
On Tue, 9 Jun 2020 at 14:47, Benchao Li <[hidden email]> wrote: > 我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。 > 我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。 > 这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。 > > 之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求, > 我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。 > > 1048262223 <[hidden email]> 于2020年6月9日周二 下午2:23写道: > > > Hi > > > > > > > > > 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。 > > > > > > > > > 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。 > > > > > > 如果有理解不对之处,敬请指出。 > > > > > > Best, > > Yichao Yang > > > > > > > > > > ------------------ 原始邮件 ------------------ > > 发件人: "forideal"<[hidden email]>; > > 发送时间: 2020年6月9日(星期二) 中午1:33 > > 收件人: "user-zh"<[hidden email]>; > > > > 主题: Flink SQL UDF 动态类型 > > > > > > > > 你好,我的朋友: > > > > > > 我使用的是 Flink 1.10 Blink Planer。 > > 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。 > > > > > > 为什么我想要这个功能: > > 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回 > > string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多 > > 场景2: 我的数据是一个 Json ,问题同上。 > > > > 在场景1中,我改了下 Flink 的源码,在 ScalarFunction > > 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化 > > @Override > > public void initialize(LogicalType[] sqlTypes, String[] paramNames) { > > // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型 > > } > > 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。 > > 这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?) > > 这个类型不进行 cast 是无法直接使用的。 > > public class TimestampTest extends ScalarFunction { > > > > public Object eval(long timestamp, String pattern, int num) { > > Timestamp timestamp1 = new > > Timestamp(timestamp); > > SimpleDateFormat sdf = new SimpleDateFormat(pattern); > > if (num < 4) { > > //返回 STRING 类型 > > return String.valueOf(timestamp); > > } > > if (num < 6) { > > //返回 BIGINT > > return timestamp - 100; > > } > > if (num < 8) { > > //返回 DOUBLE > > double ss = 0.9; > > return > > (double) timestamp + ss; > > } > > //返回 STRING > > return sdf.format(timestamp1); > > } > > } > |
Hi
+1,各位大佬,其实我自己已经通过参考avro,json等format实现基于pb实现了一个flink-protobuf解析的formats,git地址如下 https://github.com/yangyichao-mango/flink-protobuf 之后我会持续关注社区关于pb format的实现。 Best, Yichao Yang ------------------ 原始邮件 ------------------ 发件人: "Jark Wu"<[hidden email]>; 发送时间: 2020年6月9日(星期二) 下午2:49 收件人: "user-zh"<[hidden email]>; 主题: Re: Flink SQL UDF 动态类型 +1 to support pb format. On Tue, 9 Jun 2020 at 14:47, Benchao Li <[hidden email]> wrote: > 我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。 > 我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。 > 这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。 > > 之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求, > 我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。 > > 1048262223 <[hidden email]> 于2020年6月9日周二 下午2:23写道: > > > Hi > > > > > > > > > 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。 > > > > > > > > > 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。 > > > > > > 如果有理解不对之处,敬请指出。 > > > > > > Best, > > Yichao Yang > > > > > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > 发件人:&nbsp;"forideal"<[hidden email]&gt;; > > 发送时间:&nbsp;2020年6月9日(星期二) 中午1:33 > > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > > > 主题:&nbsp;Flink SQL UDF 动态类型 > > > > > > > > 你好,我的朋友: > > > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 我使用的是 Flink 1.10 Blink Planer。 > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。 > > > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 为什么我想要这个功能: > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回 > > string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多 > > &nbsp;&nbsp;&nbsp;&nbsp; 场景2: 我的数据是一个 Json ,问题同上。 > > &nbsp; > > &nbsp;&nbsp;&nbsp;&nbsp; 在场景1中,我改了下 Flink 的源码,在 ScalarFunction > > 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化 > > @Override > > public void initialize(LogicalType[] sqlTypes, String[] paramNames) { > > // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型 > > } > > &nbsp;&nbsp;&nbsp; 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。 > > &nbsp;&nbsp;&nbsp; 这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?) > > 这个类型不进行 cast 是无法直接使用的。 > > public class TimestampTest extends ScalarFunction { > > > > public Object eval(long timestamp, String pattern, int num) { > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Timestamp timestamp1 = new > > Timestamp(timestamp); > > SimpleDateFormat sdf = new SimpleDateFormat(pattern); > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (num < 4) { > > //返回 STRING 类型 > > return String.valueOf(timestamp); > > } > > if (num < 6) { > > //返回 BIGINT > > return timestamp - 100; > > } > > if (num < 8) { > > //返回 DOUBLE > > double ss = 0.9; > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return > > (double) timestamp + ss; > > } > > //返回 STRING > > return sdf.format(timestamp1); > > } > > } > |
FYI: issue[1] 已经建好了,各位感兴趣的可以关注一下,也非常欢迎参与设计和实现~
[1] https://issues.apache.org/jira/browse/FLINK-18202 1048262223 <[hidden email]> 于2020年6月9日周二 下午2:54写道: > Hi > > > > +1,各位大佬,其实我自己已经通过参考avro,json等format实现基于pb实现了一个flink-protobuf解析的formats,git地址如下 > https://github.com/yangyichao-mango/flink-protobuf > 之后我会持续关注社区关于pb format的实现。 > > > Best, > Yichao Yang > > > > > ------------------ 原始邮件 ------------------ > 发件人: "Jark Wu"<[hidden email]>; > 发送时间: 2020年6月9日(星期二) 下午2:49 > 收件人: "user-zh"<[hidden email]>; > > 主题: Re: Flink SQL UDF 动态类型 > > > > +1 to support pb format. > > On Tue, 9 Jun 2020 at 14:47, Benchao Li <[hidden email]> wrote: > > > 我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。 > > 我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。 > > 这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。 > > > > 之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求, > > 我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。 > > > > 1048262223 <[hidden email]> 于2020年6月9日周二 下午2:23写道: > > > > > Hi > > > > > > > > > > > > > > > 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。 > > > > > > > > > > > > > > > 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。 > > > > > > > > > 如果有理解不对之处,敬请指出。 > > > > > > > > > Best, > > > Yichao Yang > > > > > > > > > > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > > 发件人:&nbsp;"forideal"<[hidden email]&gt;; > > > 发送时间:&nbsp;2020年6月9日(星期二) 中午1:33 > > > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > > > > > 主题:&nbsp;Flink SQL UDF 动态类型 > > > > > > > > > > > > 你好,我的朋友: > > > > > > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 我使用的是 Flink > 1.10 Blink Planer。 > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 我想构造一个Flink > UDF ,这个 UDF 可以根据不同的参数返回不同的类型。 > > > > > > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 为什么我想要这个功能: > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 场景1: 我的数据是一个 > pb 的 bytes,我想从里面获取数据,如果统一的返回 > > > string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string > 这样的方式,实现起来又非常多 > > > &nbsp;&nbsp;&nbsp;&nbsp; 场景2: 我的数据是一个 Json ,问题同上。 > > > &nbsp; > > > &nbsp;&nbsp;&nbsp;&nbsp; 在场景1中,我改了下 Flink 的源码,在 > ScalarFunction > > > 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化 > > > @Override > > > public void initialize(LogicalType[] sqlTypes, String[] > paramNames) { > > > // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema > 信息,拿到类型信息,这样就可以动态的设置类型 > > > } > > > &nbsp;&nbsp;&nbsp; 这个方法很有效果,他帮我们 workaround > 了一段时间,目前依然work。只是有些不是那么优雅。 > > > &nbsp;&nbsp;&nbsp; 这个case 就是我想要的一个,不过,目前这个会返回 > RAW('java.lang.Object', ?) > > > 这个类型不进行 cast 是无法直接使用的。 > > > public class TimestampTest extends ScalarFunction { > > > > > > public Object eval(long timestamp, String pattern, int num) { > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > Timestamp timestamp1 = new > > > Timestamp(timestamp); > > > SimpleDateFormat sdf = new SimpleDateFormat(pattern); > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if > (num < 4) { > > > //返回 STRING 类型 > > > return String.valueOf(timestamp); > > > } > > > if (num < 6) { > > > //返回 BIGINT > > > return timestamp - 100; > > > } > > > if (num < 8) { > > > //返回 DOUBLE > > > double ss = 0.9; > > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; > return > > > (double) timestamp + ss; > > > } > > > //返回 STRING > > > return sdf.format(timestamp1); > > > } > > > } > > |
In reply to this post by Jark
+1 to support pb format
如果能支持 pb format 那简直太好了,实际了我们也自己搞了一个 pb format。大概的方法也是在外边做了一个对应的service,这个service保存了一个jar,在进行parse byte 的时候,采用了urlclassload+反射调用 parse 类型的方法。 同时也尝试过使用 dynamic message 的方式,这个方式更轻量一些,但是,性能差强人意。 在 2020-06-09 14:49:02,"Jark Wu" <[hidden email]> 写道: >+1 to support pb format. > >On Tue, 9 Jun 2020 at 14:47, Benchao Li <[hidden email]> wrote: > >> 我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。 >> 我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。 >> 这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。 >> >> 之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求, >> 我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。 >> >> 1048262223 <[hidden email]> 于2020年6月9日周二 下午2:23写道: >> >> > Hi >> > >> > >> > >> > >> 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。 >> > >> > >> > >> > >> 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。 >> > >> > >> > 如果有理解不对之处,敬请指出。 >> > >> > >> > Best, >> > Yichao Yang >> > >> > >> > >> > >> > ------------------ 原始邮件 ------------------ >> > 发件人: "forideal"<[hidden email]>; >> > 发送时间: 2020年6月9日(星期二) 中午1:33 >> > 收件人: "user-zh"<[hidden email]>; >> > >> > 主题: Flink SQL UDF 动态类型 >> > >> > >> > >> > 你好,我的朋友: >> > >> > >> > 我使用的是 Flink 1.10 Blink Planer。 >> > 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。 >> > >> > >> > 为什么我想要这个功能: >> > 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回 >> > string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多 >> > 场景2: 我的数据是一个 Json ,问题同上。 >> > >> > 在场景1中,我改了下 Flink 的源码,在 ScalarFunction >> > 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化 >> > @Override >> > public void initialize(LogicalType[] sqlTypes, String[] paramNames) { >> > // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型 >> > } >> > 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。 >> > 这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?) >> > 这个类型不进行 cast 是无法直接使用的。 >> > public class TimestampTest extends ScalarFunction { >> > >> > public Object eval(long timestamp, String pattern, int num) { >> > Timestamp timestamp1 = new >> > Timestamp(timestamp); >> > SimpleDateFormat sdf = new SimpleDateFormat(pattern); >> > if (num < 4) { >> > //返回 STRING 类型 >> > return String.valueOf(timestamp); >> > } >> > if (num < 6) { >> > //返回 BIGINT >> > return timestamp - 100; >> > } >> > if (num < 8) { >> > //返回 DOUBLE >> > double ss = 0.9; >> > return >> > (double) timestamp + ss; >> > } >> > //返回 STRING >> > return sdf.format(timestamp1); >> > } >> > } >> |
In reply to this post by Benchao Li-2
动态类型这个我们其实也挺需要的,比如我写一个列转行的udf,这个时候我需要确认返回的字段个数以及返回的类型,如果增加字段等,就需要更多udf来实现。
------------------ 原始邮件 ------------------ 发件人: "Benchao Li"<[hidden email]>; 发送时间: 2020年6月9日(星期二) 下午2:47 收件人: "user-zh"<[hidden email]>; 主题: Re: Flink SQL UDF 动态类型 我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。 我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。 这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。 之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求, 我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。 1048262223 <[hidden email]> 于2020年6月9日周二 下午2:23写道: > Hi > > > > 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。 > > > > 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。 > > > 如果有理解不对之处,敬请指出。 > > > Best, > Yichao Yang > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人:&nbsp;"forideal"<[hidden email]&gt;; > 发送时间:&nbsp;2020年6月9日(星期二) 中午1:33 > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > 主题:&nbsp;Flink SQL UDF 动态类型 > > > > 你好,我的朋友: > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 我使用的是 Flink 1.10 Blink Planer。 > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。 > > > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 为什么我想要这个功能: > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回 > string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多 > &nbsp;&nbsp;&nbsp;&nbsp; 场景2: 我的数据是一个 Json ,问题同上。 > &nbsp; > &nbsp;&nbsp;&nbsp;&nbsp; 在场景1中,我改了下 Flink 的源码,在 ScalarFunction > 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化 > @Override > public void initialize(LogicalType[] sqlTypes, String[] paramNames) { > // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型 > } > &nbsp;&nbsp;&nbsp; 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。 > &nbsp;&nbsp;&nbsp; 这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?) > 这个类型不进行 cast 是无法直接使用的。 > public class TimestampTest extends ScalarFunction { > > public Object eval(long timestamp, String pattern, int num) { > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Timestamp timestamp1 = new > Timestamp(timestamp); > SimpleDateFormat sdf = new SimpleDateFormat(pattern); > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if (num < 4) { > //返回 STRING 类型 > return String.valueOf(timestamp); > } > if (num < 6) { > //返回 BIGINT > return timestamp - 100; > } > if (num < 8) { > //返回 DOUBLE > double ss = 0.9; > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return > (double) timestamp + ss; > } > //返回 STRING > return sdf.format(timestamp1); > } > } |
Hi
这个可以通过返回一个通用类型比如Map来实现。 Best, Yichao Yang ------------------ 原始邮件 ------------------ 发件人: "kcz"<[hidden email]>; 发送时间: 2020年6月9日(星期二) 下午4:49 收件人: "user-zh"<[hidden email]>; 主题: 回复: Flink SQL UDF 动态类型 动态类型这个我们其实也挺需要的,比如我写一个列转行的udf,这个时候我需要确认返回的字段个数以及返回的类型,如果增加字段等,就需要更多udf来实现。 ------------------&nbsp;原始邮件&nbsp;------------------ 发件人:&nbsp;"Benchao Li"<[hidden email]&gt;; 发送时间:&nbsp;2020年6月9日(星期二) 下午2:47 收件人:&nbsp;"user-zh"<[hidden email]&gt;; 主题:&nbsp;Re: Flink SQL UDF 动态类型 我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。 我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。 这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。 之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求, 我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。 1048262223 <[hidden email]&gt; 于2020年6月9日周二 下午2:23写道: &gt; Hi &gt; &gt; &gt; &gt; 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。 &gt; &gt; &gt; &gt; 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。 &gt; &gt; &gt; 如果有理解不对之处,敬请指出。 &gt; &gt; &gt; Best, &gt; Yichao Yang &gt; &gt; &gt; &gt; &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------ &gt; 发件人:&amp;nbsp;"forideal"<[hidden email]&amp;gt;; &gt; 发送时间:&amp;nbsp;2020年6月9日(星期二) 中午1:33 &gt; 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;; &gt; &gt; 主题:&amp;nbsp;Flink SQL UDF 动态类型 &gt; &gt; &gt; &gt; 你好,我的朋友: &gt; &gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 我使用的是 Flink 1.10 Blink Planer。 &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。 &gt; &gt; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 为什么我想要这个功能: &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回 &gt; string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多 &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 场景2: 我的数据是一个 Json ,问题同上。 &gt; &amp;nbsp; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 在场景1中,我改了下 Flink 的源码,在 ScalarFunction &gt; 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化 &gt; @Override &gt; public void initialize(LogicalType[] sqlTypes, String[] paramNames) { &gt; // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型 &gt; } &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。 &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; 这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?) &gt; 这个类型不进行 cast 是无法直接使用的。 &gt; public class TimestampTest extends ScalarFunction { &gt; &gt; public Object eval(long timestamp, String pattern, int num) { &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; Timestamp timestamp1 = new &gt; Timestamp(timestamp); &gt; SimpleDateFormat sdf = new SimpleDateFormat(pattern); &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; if (num < 4) { &gt; //返回 STRING 类型 &gt; return String.valueOf(timestamp); &gt; } &gt; if (num < 6) { &gt; //返回 BIGINT &gt; return timestamp - 100; &gt; } &gt; if (num < 8) { &gt; //返回 DOUBLE &gt; double ss = 0.9; &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; return &gt; (double) timestamp + ss; &gt; } &gt; //返回 STRING &gt; return sdf.format(timestamp1); &gt; } &gt; } |
有道理呀,我直接map就好 。tks。
------------------ 原始邮件 ------------------ 发件人: "1048262223"<[hidden email]>; 发送时间: 2020年6月9日(星期二) 下午4:51 收件人: "user-zh"<[hidden email]>; 主题: 回复: Flink SQL UDF 动态类型 Hi 这个可以通过返回一个通用类型比如Map来实现。 Best, Yichao Yang ------------------&nbsp;原始邮件&nbsp;------------------ 发件人:&nbsp;"kcz"<[hidden email]&gt;; 发送时间:&nbsp;2020年6月9日(星期二) 下午4:49 收件人:&nbsp;"user-zh"<[hidden email]&gt;; 主题:&nbsp;回复: Flink SQL UDF 动态类型 动态类型这个我们其实也挺需要的,比如我写一个列转行的udf,这个时候我需要确认返回的字段个数以及返回的类型,如果增加字段等,就需要更多udf来实现。 ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------ 发件人:&amp;nbsp;"Benchao Li"<[hidden email]&amp;gt;; 发送时间:&amp;nbsp;2020年6月9日(星期二) 下午2:47 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;; 主题:&amp;nbsp;Re: Flink SQL UDF 动态类型 我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。 我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。 这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。 之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求, 我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。 1048262223 <[hidden email]&amp;gt; 于2020年6月9日周二 下午2:23写道: &amp;gt; Hi &amp;gt; &amp;gt; &amp;gt; &amp;gt; 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。 &amp;gt; &amp;gt; &amp;gt; &amp;gt; 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。 &amp;gt; &amp;gt; &amp;gt; 如果有理解不对之处,敬请指出。 &amp;gt; &amp;gt; &amp;gt; Best, &amp;gt; Yichao Yang &amp;gt; &amp;gt; &amp;gt; &amp;gt; &amp;gt; ------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------ &amp;gt; 发件人:&amp;amp;nbsp;"forideal"<[hidden email]&amp;amp;gt;; &amp;gt; 发送时间:&amp;amp;nbsp;2020年6月9日(星期二) 中午1:33 &amp;gt; 收件人:&amp;amp;nbsp;"user-zh"<[hidden email]&amp;amp;gt;; &amp;gt; &amp;gt; 主题:&amp;amp;nbsp;Flink SQL UDF 动态类型 &amp;gt; &amp;gt; &amp;gt; &amp;gt; 你好,我的朋友: &amp;gt; &amp;gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 我使用的是 Flink 1.10 Blink Planer。 &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。 &amp;gt; &amp;gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 为什么我想要这个功能: &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回 &amp;gt; string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string 这样的方式,实现起来又非常多 &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 场景2: 我的数据是一个 Json ,问题同上。 &amp;gt; &amp;amp;nbsp; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 在场景1中,我改了下 Flink 的源码,在 ScalarFunction &amp;gt; 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化 &amp;gt; @Override &amp;gt; public void initialize(LogicalType[] sqlTypes, String[] paramNames) { &amp;gt; // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型 &amp;gt; } &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。 &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?) &amp;gt; 这个类型不进行 cast 是无法直接使用的。 &amp;gt; public class TimestampTest extends ScalarFunction { &amp;gt; &amp;gt; public Object eval(long timestamp, String pattern, int num) { &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; Timestamp timestamp1 = new &amp;gt; Timestamp(timestamp); &amp;gt; SimpleDateFormat sdf = new SimpleDateFormat(pattern); &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; if (num < 4) { &amp;gt; //返回 STRING 类型 &amp;gt; return String.valueOf(timestamp); &amp;gt; } &amp;gt; if (num < 6) { &amp;gt; //返回 BIGINT &amp;gt; return timestamp - 100; &amp;gt; } &amp;gt; if (num < 8) { &amp;gt; //返回 DOUBLE &amp;gt; double ss = 0.9; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; return &amp;gt; (double) timestamp + ss; &amp;gt; } &amp;gt; //返回 STRING &amp;gt; return sdf.format(timestamp1); &amp;gt; } &amp;gt; } |
Hi all,
业务上一般是可以避免动态类型的UDF的,如果有刚需,1.11已经支持了[1],文档还在路上,一个简单的例子根据第一个参数来推断返回类型: @Override public TypeInference getTypeInference(DataTypeFactory typeFactory) { return TypeInference.newBuilder() .outputTypeStrategy(TypeStrategies.argument(0)) .build(); } [1]https://issues.apache.org/jira/browse/FLINK-15487 Best, Jingsong Lee On Tue, Jun 9, 2020 at 4:57 PM kcz <[hidden email]> wrote: > 有道理呀,我直接map就好 。tks。 > > > > > ------------------ 原始邮件 ------------------ > 发件人: "1048262223"<[hidden email]>; > 发送时间: 2020年6月9日(星期二) 下午4:51 > 收件人: "user-zh"<[hidden email]>; > > 主题: 回复: Flink SQL UDF 动态类型 > > > > Hi > > > 这个可以通过返回一个通用类型比如Map来实现。 > > > Best, > Yichao Yang > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人:&nbsp;"kcz"<[hidden email]&gt;; > 发送时间:&nbsp;2020年6月9日(星期二) 下午4:49 > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > 主题:&nbsp;回复: Flink SQL UDF 动态类型 > > > > 动态类型这个我们其实也挺需要的,比如我写一个列转行的udf,这个时候我需要确认返回的字段个数以及返回的类型,如果增加字段等,就需要更多udf来实现。 > > > > > ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------ > 发件人:&amp;nbsp;"Benchao Li"<[hidden email]&amp;gt;; > 发送时间:&amp;nbsp;2020年6月9日(星期二) 下午2:47 > 收件人:&amp;nbsp;"user-zh"<[hidden email]&amp;gt;; > > 主题:&amp;nbsp;Re: Flink SQL UDF 动态类型 > > > > 我也感觉这种场景可以用一个pb format来搞比较好一些,其实我们内部也是这么用的。 > 我们的format实现也是拿到pb编译后的class,根据这个class推导出来table的schema。 > 这个过程主要是需要定义清楚pb的所有类型到flink类型的映射关系。 > > 之前也考虑过在社区讨论下是否有必要支持一下pb format。如果你们都有这个诉求, > 我们可以先建一个jira,然后再jira里面讨论具体的需求和实现。 > > 1048262223 <[hidden email]&amp;gt; 于2020年6月9日周二 下午2:23写道: > > &amp;gt; Hi > &amp;gt; > &amp;gt; > &amp;gt; > &amp;gt; > 我们是用到了pb作为源数据的,我们的方法是在程序初始化的时候根据pb的schema(descriptor)解析出TypeInformation,然后再env.addSource().returns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。 > &amp;gt; > &amp;gt; > &amp;gt; > &amp;gt; > 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。 > &amp;gt; > &amp;gt; > &amp;gt; 如果有理解不对之处,敬请指出。 > &amp;gt; > &amp;gt; > &amp;gt; Best, > &amp;gt; Yichao Yang > &amp;gt; > &amp;gt; > &amp;gt; > &amp;gt; > &amp;gt; > ------------------&amp;amp;nbsp;原始邮件&amp;amp;nbsp;------------------ > &amp;gt; 发件人:&amp;amp;nbsp;"forideal"<[hidden email] > &amp;amp;gt;; > &amp;gt; 发送时间:&amp;amp;nbsp;2020年6月9日(星期二) 中午1:33 > &amp;gt; 收件人:&amp;amp;nbsp;"user-zh"<[hidden email] > &amp;amp;gt;; > &amp;gt; > &amp;gt; 主题:&amp;amp;nbsp;Flink SQL UDF 动态类型 > &amp;gt; > &amp;gt; > &amp;gt; > &amp;gt; 你好,我的朋友: > &amp;gt; > &amp;gt; > &amp;gt; > &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; > 我使用的是 Flink 1.10 Blink Planer。 > &amp;gt; > &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; > 我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。 > &amp;gt; > &amp;gt; > &amp;gt; > &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; > 为什么我想要这个功能: > &amp;gt; > &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; > 场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回 > &amp;gt; string,后面还需要 cast 比较繁琐,如果使用 get_int、get_double、get_string > 这样的方式,实现起来又非常多 > &amp;gt; > &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; > 场景2: 我的数据是一个 Json ,问题同上。 > &amp;gt; &amp;amp;nbsp; > &amp;gt; > &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; > 在场景1中,我改了下 Flink 的源码,在 ScalarFunction > &amp;gt; 中加了一个初始化方法,在Flink 初始化 scalar function 的时候,进行相关的初始化 > &amp;gt; @Override > &amp;gt; public void initialize(LogicalType[] sqlTypes, String[] > paramNames) { > &amp;gt; // 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema > 信息,拿到类型信息,这样就可以动态的设置类型 > &amp;gt; } > &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; > 这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。 > &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 这个case > 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?) > &amp;gt; 这个类型不进行 cast 是无法直接使用的。 > &amp;gt; public class TimestampTest extends ScalarFunction { > &amp;gt; > &amp;gt; public Object eval(long timestamp, String pattern, int num) { > &amp;gt; > &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; > Timestamp timestamp1 = new > &amp;gt; Timestamp(timestamp); > &amp;gt; SimpleDateFormat sdf = new SimpleDateFormat(pattern); > &amp;gt; > &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; > if (num < 4) { > &amp;gt; //返回 STRING 类型 > &amp;gt; return String.valueOf(timestamp); > &amp;gt; } > &amp;gt; if (num < 6) { > &amp;gt; //返回 BIGINT > &amp;gt; return timestamp - 100; > &amp;gt; } > &amp;gt; if (num < 8) { > &amp;gt; //返回 DOUBLE > &amp;gt; double ss = 0.9; > &amp;gt; > &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; > return > &amp;gt; (double) timestamp + ss; > &amp;gt; } > &amp;gt; //返回 STRING > &amp;gt; return sdf.format(timestamp1); > &amp;gt; } > &amp;gt; } -- Best, Jingsong Lee |
Free forum by Nabble | Edit this page |