FlinkSQL 是否支持类似临时中间表的概念

classic Classic list List threaded Threaded
13 messages Options
Reply | Threaded
Open this post in threaded view
|

FlinkSQL 是否支持类似临时中间表的概念

WeiXubin
Hi,
我希望通过FlinkSQL的方式在一个Job中完成两步的操作,但似乎办不到,情况大致如下:


eg.有一个ETL过程,需要从Source获取数据--将每条数据拆分为一条多列数据--对拆分完的数据开窗聚合--输出到sink。
//从Source获取数据
CREATE TABLE sourceTable (
  request_uri STRING
) WITH (
   ..........
);


//这个时候我希望能够创建一张临时中间表  tempTable用来存放 对Source表中数据拆分为多列后的结果,类似于下面这样(Flink 并不支持这么做)
CREATE TABLE tempTable (
  row1 STRING,
  row2 STRING,
)
Insert into tempTable   select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )....


//最后从 tempTable 表中获取数据并进行开窗做聚合操作
CREATE TABLE sinkTable (
  row1 STRING,
)
INSERT INTO sinkTable SELECT .., SUM(unit) AS unitSum from tempTable GROUP BY TUMBLE(rowtime,INTERVAL '30' SECOND), ...


以上是大致情况描述,我希望能在一个Job中一次性完成以上数据处理,而不分成两个Job,不知是否有好的解决方案?
Thank!

Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL 是否支持类似临时中间表的概念

Leonard Xu
Hi,

> 在 2020年6月23日,10:49,Weixubin <[hidden email]> 写道:
>
> //这个时候我希望能够创建一张临时中间表  tempTable用来存放 对Source表中数据拆分为多列后的结果,类似于下面这样(Flink 并不支持这么做)


看着描述应该是源数据中的一行拆成多行。这个需求是不是用 VIEW 就可以了[1]?Flink SQL 支持 VIEW 语法的[1]。

Best,
Leonard Xu

[1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/create.html#create-view <https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/create.html#create-view>
Reply | Threaded
Open this post in threaded view
|

Re:Re: FlinkSQL 是否支持类似临时中间表的概念

WeiXubin



感谢,我查阅了下资料后发现CREATE VIEW这个语法是在Flink.1.12有提及而1.10版本没有 ,1.12版本暂未发行, 而我目前使用的版本是1.10版本。
而且很奇怪,我并没有找到1.11版本的文档














在 2020-06-23 10:58:25,"Leonard Xu" <[hidden email]> 写道:

>Hi,
>
>> 在 2020年6月23日,10:49,Weixubin <[hidden email]> 写道:
>>
>> //这个时候我希望能够创建一张临时中间表  tempTable用来存放 对Source表中数据拆分为多列后的结果,类似于下面这样(Flink 并不支持这么做)
>
>
>看着描述应该是源数据中的一行拆成多行。这个需求是不是用 VIEW 就可以了[1]?Flink SQL 支持 VIEW 语法的[1]。
>
>Best,
>Leonard Xu
>
>[1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/create.html#create-view <https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/create.html#create-view>
Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL 是否支持类似临时中间表的概念

Leonard Xu
Hi,
是的,这个是在 FLINK 1.11 的版本里支持的,当前 FLINK 1.11代码已经冻结,正在测试中,最近应该快发布了,所以master 分支上的版本号为1.12-SNAPSHOT
,等1.11版本发布了就可以看到对应的文档。

回到这个问题,flink sql(blink planner) 支持 multiple-sinks 的, 在 1.10 版本也可以一个作业搞定。 把 `  select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….`  这段 sql insert 到中间结果表 和 group后再写入最终结果表就可以了。效果和使用 VIEW 应该类似,因为 planner 会做分段优化。
另外建议使用1.10.1版本,1.10.1在1.10的基础上修复了不少bug,也可以等1.11发布了试用下1.11。


祝好,
Leonard Xu
Reply | Threaded
Open this post in threaded view
|

Re:Re: FlinkSQL 是否支持类似临时中间表的概念

WeiXubin



 Hi,
关于这句 “把 ` select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….` 这段 sql insert 到中间结果表 和 group后再写入最终结果表就可以了”
我不太明白所说的中间结果表是什么意思, 我所理解为在数据库创建多一张middleSinkTable表,作为中间结果表。请问这样理解是否有误? 可否简单举个例子。
Thanks,
Bin










在 2020-06-23 11:57:28,"Leonard Xu" <[hidden email]> 写道:

>Hi,
>是的,这个是在 FLINK 1.11 的版本里支持的,当前 FLINK 1.11代码已经冻结,正在测试中,最近应该快发布了,所以master 分支上的版本号为1.12-SNAPSHOT
>,等1.11版本发布了就可以看到对应的文档。
>
>回到这个问题,flink sql(blink planner) 支持 multiple-sinks 的, 在 1.10 版本也可以一个作业搞定。 把 `  select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….`  这段 sql insert 到中间结果表 和 group后再写入最终结果表就可以了。效果和使用 VIEW 应该类似,因为 planner 会做分段优化。
>另外建议使用1.10.1版本,1.10.1在1.10的基础上修复了不少bug,也可以等1.11发布了试用下1.11。
>
>
>祝好,
>Leonard Xu
Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL 是否支持类似临时中间表的概念

Leonard Xu
Hi
我的意思是你如果中间结果表如果要输出,那你就一个sink写到中间结果表(什么表根据你的需要),一个sink写到你的最终结果表,在这两个sink之前的`select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….` 的这段sql是可以复用的,就和 VIEW的作用类似。

如果你不需要中间结果表,只是要最终的结果表,那你写个嵌套的sql就行了,里层是`select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….·,外层是 group by, 插入最终的结果表就能满足需求了吧。

祝好,
Leonard Xu


> 在 2020年6月23日,15:21,Weixubin <[hidden email]> 写道:
>
>
>
>
> Hi,
> 关于这句 “把 ` select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….` 这段 sql insert 到中间结果表 和 group后再写入最终结果表就可以了”
> 我不太明白所说的中间结果表是什么意思, 我所理解为在数据库创建多一张middleSinkTable表,作为中间结果表。请问这样理解是否有误? 可否简单举个例子。
> Thanks,
> Bin
>
>
>
>
>
>
>
>
>
>
> 在 2020-06-23 11:57:28,"Leonard Xu" <[hidden email]> 写道:
>> Hi,
>> 是的,这个是在 FLINK 1.11 的版本里支持的,当前 FLINK 1.11代码已经冻结,正在测试中,最近应该快发布了,所以master 分支上的版本号为1.12-SNAPSHOT
>> ,等1.11版本发布了就可以看到对应的文档。
>>
>> 回到这个问题,flink sql(blink planner) 支持 multiple-sinks 的, 在 1.10 版本也可以一个作业搞定。 把 `  select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….`  这段 sql insert 到中间结果表 和 group后再写入最终结果表就可以了。效果和使用 VIEW 应该类似,因为 planner 会做分段优化。
>> 另外建议使用1.10.1版本,1.10.1在1.10的基础上修复了不少bug,也可以等1.11发布了试用下1.11。
>>
>>
>> 祝好,
>> Leonard Xu

Reply | Threaded
Open this post in threaded view
|

Re:Re: FlinkSQL 是否支持类似临时中间表的概念

WeiXubin



感谢你提供了子查询的思路,不过经过试验后有点可惜,这似乎还是满足不了我们的需求。


我们的场景是从阿里云SLS读取消息。每条消息有一个字段是request_uri。
第一个数据处理过程就是将 request_uri 解析为多个属性(多列),存成一行,作为一条记录。
第二个数据处理过程就是将每条记录里的 声明heart_time为事件时间属性并使用5秒延迟水印策略,进行开窗聚合处理。


//如果应用到子查询的话,Flink是不支持这样做的。WATERMARK FOR 水印声明只能在DDL里应用。如下:
select
..ts as TO_TIMESTAMP(heart_time,'yyyy-MM-ddHH:mm:ss') ,
WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
 from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….·


//如果应用到source,则一开始并不知道heart_time 的值
CREATE TABLE sourceTable (
  request_uri STRING
..ts as TO_TIMESTAMP(heart_time,'yyyy-MM-ddHH:mm:ss') ,
WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) WITH ( ....... );


只能等待Flink 1.11 尝试是否可以用View作为中间临时表,并对View进行 WATERMARK水印声明
Thanks
Bin

在 2020-06-23 15:28:50,"Leonard Xu" <[hidden email]> 写道:

>Hi
>我的意思是你如果中间结果表如果要输出,那你就一个sink写到中间结果表(什么表根据你的需要),一个sink写到你的最终结果表,在这两个sink之前的`select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….` 的这段sql是可以复用的,就和 VIEW的作用类似。
>
>如果你不需要中间结果表,只是要最终的结果表,那你写个嵌套的sql就行了,里层是`select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….·,外层是 group by, 插入最终的结果表就能满足需求了吧。
>
>祝好,
>Leonard Xu
>
>
>> 在 2020年6月23日,15:21,Weixubin <[hidden email]> 写道:
>>
>>
>>
>>
>> Hi,
>> 关于这句 “把 ` select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….` 这段 sql insert 到中间结果表 和 group后再写入最终结果表就可以了”
>> 我不太明白所说的中间结果表是什么意思, 我所理解为在数据库创建多一张middleSinkTable表,作为中间结果表。请问这样理解是否有误? 可否简单举个例子。
>> Thanks,
>> Bin
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-06-23 11:57:28,"Leonard Xu" <[hidden email]> 写道:
>>> Hi,
>>> 是的,这个是在 FLINK 1.11 的版本里支持的,当前 FLINK 1.11代码已经冻结,正在测试中,最近应该快发布了,所以master 分支上的版本号为1.12-SNAPSHOT
>>> ,等1.11版本发布了就可以看到对应的文档。
>>>
>>> 回到这个问题,flink sql(blink planner) 支持 multiple-sinks 的, 在 1.10 版本也可以一个作业搞定。 把 `  select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….`  这段 sql insert 到中间结果表 和 group后再写入最终结果表就可以了。效果和使用 VIEW 应该类似,因为 planner 会做分段优化。
>>> 另外建议使用1.10.1版本,1.10.1在1.10的基础上修复了不少bug,也可以等1.11发布了试用下1.11。
>>>
>>>
>>> 祝好,
>>> Leonard Xu
>
Reply | Threaded
Open this post in threaded view
|

Re: Re: FlinkSQL 是否支持类似临时中间表的概念

Jark
Administrator
你可以在 DDL 中直接用计算列去从 request_uri 里获得 heart_time 哈,然后在这个计算列上定义 watermark 即可。
例如:

CREATE TABLE sourceTable (
  request_uri STRING,
  heart_time AS my_parse(request_uri),
  WATERMARK FOR heart_time AS heart_time - INTERVAL '1' SECOND
) WITH ( ....... );

虽然这会导致重复解析两遍。


Best,
Jark

On Wed, 24 Jun 2020 at 12:09, Weixubin <[hidden email]> wrote:

>
>
>
> 感谢你提供了子查询的思路,不过经过试验后有点可惜,这似乎还是满足不了我们的需求。
>
>
> 我们的场景是从阿里云SLS读取消息。每条消息有一个字段是request_uri。
> 第一个数据处理过程就是将 request_uri 解析为多个属性(多列),存成一行,作为一条记录。
> 第二个数据处理过程就是将每条记录里的 声明heart_time为事件时间属性并使用5秒延迟水印策略,进行开窗聚合处理。
>
>
> //如果应用到子查询的话,Flink是不支持这样做的。WATERMARK FOR 水印声明只能在DDL里应用。如下:
> select
> ..ts as TO_TIMESTAMP(heart_time,'yyyy-MM-ddHH:mm:ss') ,
> WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
>  from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….·
>
>
> //如果应用到source,则一开始并不知道heart_time 的值
> CREATE TABLE sourceTable (
>   request_uri STRING
> ..ts as TO_TIMESTAMP(heart_time,'yyyy-MM-ddHH:mm:ss') ,
> WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
> ) WITH ( ....... );
>
>
> 只能等待Flink 1.11 尝试是否可以用View作为中间临时表,并对View进行 WATERMARK水印声明
> Thanks
> Bin
>
> 在 2020-06-23 15:28:50,"Leonard Xu" <[hidden email]> 写道:
> >Hi
> >我的意思是你如果中间结果表如果要输出,那你就一个sink写到中间结果表(什么表根据你的需要),一个sink写到你的最终结果表,在这两个sink之前的`select
> * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….`
> 的这段sql是可以复用的,就和 VIEW的作用类似。
> >
> >如果你不需要中间结果表,只是要最终的结果表,那你写个嵌套的sql就行了,里层是`select * from sourceTable ,
> LATERAL TABLE(ParseUriRow(request_uri)) as T( )….·,外层是 group by,
> 插入最终的结果表就能满足需求了吧。
> >
> >祝好,
> >Leonard Xu
> >
> >
> >> 在 2020年6月23日,15:21,Weixubin <[hidden email]> 写道:
> >>
> >>
> >>
> >>
> >> Hi,
> >> 关于这句 “把 ` select * from sourceTable , LATERAL
> TABLE(ParseUriRow(request_uri)) as T( )….` 这段 sql insert 到中间结果表 和
> group后再写入最终结果表就可以了”
> >> 我不太明白所说的中间结果表是什么意思, 我所理解为在数据库创建多一张middleSinkTable表,作为中间结果表。请问这样理解是否有误?
> 可否简单举个例子。
> >> Thanks,
> >> Bin
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-06-23 11:57:28,"Leonard Xu" <[hidden email]> 写道:
> >>> Hi,
> >>> 是的,这个是在 FLINK 1.11 的版本里支持的,当前 FLINK 1.11代码已经冻结,正在测试中,最近应该快发布了,所以master
> 分支上的版本号为1.12-SNAPSHOT
> >>> ,等1.11版本发布了就可以看到对应的文档。
> >>>
> >>> 回到这个问题,flink sql(blink planner) 支持 multiple-sinks 的, 在 1.10
> 版本也可以一个作业搞定。 把 `  select * from sourceTable , LATERAL
> TABLE(ParseUriRow(request_uri)) as T( )….`  这段 sql insert 到中间结果表 和
> group后再写入最终结果表就可以了。效果和使用 VIEW 应该类似,因为 planner 会做分段优化。
> >>> 另外建议使用1.10.1版本,1.10.1在1.10的基础上修复了不少bug,也可以等1.11发布了试用下1.11。
> >>>
> >>>
> >>> 祝好,
> >>> Leonard Xu
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL 是否支持类似临时中间表的概念

Leonard Xu
In reply to this post by WeiXubin
Hello,

你的需求其实是要 抽取记录的字段定义watermark, 这个只能放到source 表的DDL中,view上也不支持的。
1.10里的计算列 + udf 应该就可以满足你的需求, 大概长这样:

CREATE TABLE sourceTable (
 request_uri STRING,
 ts as extractTsUdf(request_uri),
 WATERMARK FOR ts AS ts - INTERVAL '5' SECOND

) WITH (
  ..........
);

select ... from (
select ts, T.* from
sourceTable  sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T(...)
) t
group by TUMBLE(ts, INTERVAL '30' SECOND)

祝好,
Leonard


> 在 2020年6月24日,12:09,Weixubin <[hidden email]> 写道:
>
> 第一个数据处理过程就是将 request_uri 解析为多个属性(多列),存成一行,作为一条记录。

Reply | Threaded
Open this post in threaded view
|

Re:Re: Re: FlinkSQL 是否支持类似临时中间表的概念

cxydevelop@163.com
In reply to this post by Jark
你好,请问下,my_parse是个udf吧
然后有没有什么操作可以使用udtf解析出多个字段 , 这些字段直接就是source表的字段 , 然后选出时间字段定义 watermark ,
类似如下
CREATE TABLE sourceTable(
request_uri STRING,
(column_1,column_2,heart_time) as udtf_parse(request_uri)
)with(......);
哈哈,不知道有没有这样的语法











在 2020-06-24 12:24:46,"Jark Wu" <[hidden email]> 写道:

>你可以在 DDL 中直接用计算列去从 request_uri 里获得 heart_time 哈,然后在这个计算列上定义 watermark 即可。
>例如:
>
>CREATE TABLE sourceTable (
>  request_uri STRING,
>  heart_time AS my_parse(request_uri),
>  WATERMARK FOR heart_time AS heart_time - INTERVAL '1' SECOND
>) WITH ( ....... );
>
>虽然这会导致重复解析两遍。
>
>
>Best,
>Jark
>
>On Wed, 24 Jun 2020 at 12:09, Weixubin <[hidden email]> wrote:
>
>>
>>
>>
>> 感谢你提供了子查询的思路,不过经过试验后有点可惜,这似乎还是满足不了我们的需求。
>>
>>
>> 我们的场景是从阿里云SLS读取消息。每条消息有一个字段是request_uri。
>> 第一个数据处理过程就是将 request_uri 解析为多个属性(多列),存成一行,作为一条记录。
>> 第二个数据处理过程就是将每条记录里的 声明heart_time为事件时间属性并使用5秒延迟水印策略,进行开窗聚合处理。
>>
>>
>> //如果应用到子查询的话,Flink是不支持这样做的。WATERMARK FOR 水印声明只能在DDL里应用。如下:
>> select
>> ..ts as TO_TIMESTAMP(heart_time,'yyyy-MM-ddHH:mm:ss') ,
>> WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
>>  from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….·
>>
>>
>> //如果应用到source,则一开始并不知道heart_time 的值
>> CREATE TABLE sourceTable (
>>   request_uri STRING
>> ..ts as TO_TIMESTAMP(heart_time,'yyyy-MM-ddHH:mm:ss') ,
>> WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
>> ) WITH ( ....... );
>>
>>
>> 只能等待Flink 1.11 尝试是否可以用View作为中间临时表,并对View进行 WATERMARK水印声明
>> Thanks
>> Bin
>>
>> 在 2020-06-23 15:28:50,"Leonard Xu" <[hidden email]> 写道:
>> >Hi
>> >我的意思是你如果中间结果表如果要输出,那你就一个sink写到中间结果表(什么表根据你的需要),一个sink写到你的最终结果表,在这两个sink之前的`select
>> * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….`
>> 的这段sql是可以复用的,就和 VIEW的作用类似。
>> >
>> >如果你不需要中间结果表,只是要最终的结果表,那你写个嵌套的sql就行了,里层是`select * from sourceTable ,
>> LATERAL TABLE(ParseUriRow(request_uri)) as T( )….·,外层是 group by,
>> 插入最终的结果表就能满足需求了吧。
>> >
>> >祝好,
>> >Leonard Xu
>> >
>> >
>> >> 在 2020年6月23日,15:21,Weixubin <[hidden email]> 写道:
>> >>
>> >>
>> >>
>> >>
>> >> Hi,
>> >> 关于这句 “把 ` select * from sourceTable , LATERAL
>> TABLE(ParseUriRow(request_uri)) as T( )….` 这段 sql insert 到中间结果表 和
>> group后再写入最终结果表就可以了”
>> >> 我不太明白所说的中间结果表是什么意思, 我所理解为在数据库创建多一张middleSinkTable表,作为中间结果表。请问这样理解是否有误?
>> 可否简单举个例子。
>> >> Thanks,
>> >> Bin
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-06-23 11:57:28,"Leonard Xu" <[hidden email]> 写道:
>> >>> Hi,
>> >>> 是的,这个是在 FLINK 1.11 的版本里支持的,当前 FLINK 1.11代码已经冻结,正在测试中,最近应该快发布了,所以master
>> 分支上的版本号为1.12-SNAPSHOT
>> >>> ,等1.11版本发布了就可以看到对应的文档。
>> >>>
>> >>> 回到这个问题,flink sql(blink planner) 支持 multiple-sinks 的, 在 1.10
>> 版本也可以一个作业搞定。 把 `  select * from sourceTable , LATERAL
>> TABLE(ParseUriRow(request_uri)) as T( )….`  这段 sql insert 到中间结果表 和
>> group后再写入最终结果表就可以了。效果和使用 VIEW 应该类似,因为 planner 会做分段优化。
>> >>> 另外建议使用1.10.1版本,1.10.1在1.10的基础上修复了不少bug,也可以等1.11发布了试用下1.11。
>> >>>
>> >>>
>> >>> 祝好,
>> >>> Leonard Xu
>> >
>>
Reply | Threaded
Open this post in threaded view
|

Re:Re: FlinkSQL 是否支持类似临时中间表的概念

WeiXubin
In reply to this post by Leonard Xu



感谢 Leonard Xu 与 Jark 两位,已成功解决满足需求!
对于 chenxuying 所提出的问题,我也很感兴趣。
由于使用UDF重复解析两遍,不知是否有更好的替代方法
Thanks
Bin














在 2020-06-24 12:32:27,"Leonard Xu" <[hidden email]> 写道:

>Hello,
>
>你的需求其实是要 抽取记录的字段定义watermark, 这个只能放到source 表的DDL中,view上也不支持的。
>1.10里的计算列 + udf 应该就可以满足你的需求, 大概长这样:
>
>CREATE TABLE sourceTable (
> request_uri STRING,
> ts as extractTsUdf(request_uri),
> WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
>
>) WITH (
>  ..........
>);
>
>select ... from (
>select ts, T.* from
>sourceTable  sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T(...)
>) t
>group by TUMBLE(ts, INTERVAL '30' SECOND)
>
>祝好,
>Leonard
>
>
>> 在 2020年6月24日,12:09,Weixubin <[hidden email]> 写道:
>>
>> 第一个数据处理过程就是将 request_uri 解析为多个属性(多列),存成一行,作为一条记录。
>
Reply | Threaded
Open this post in threaded view
|

回复: Re: FlinkSQL 是否支持类似临时中间表的概念

wangweiguang@stevegame.cn
In reply to this post by WeiXubin

根据你的需求描述,用Flink Table API和SQL是可以解决的!



 
发件人: Weixubin
发送时间: 2020-06-24 12:09
收件人: user-zh
主题: Re:Re: FlinkSQL 是否支持类似临时中间表的概念
 
 
 
感谢你提供了子查询的思路,不过经过试验后有点可惜,这似乎还是满足不了我们的需求。
 
 
我们的场景是从阿里云SLS读取消息。每条消息有一个字段是request_uri。
第一个数据处理过程就是将 request_uri 解析为多个属性(多列),存成一行,作为一条记录。
第二个数据处理过程就是将每条记录里的 声明heart_time为事件时间属性并使用5秒延迟水印策略,进行开窗聚合处理。
 
 
//如果应用到子查询的话,Flink是不支持这样做的。WATERMARK FOR 水印声明只能在DDL里应用。如下:
select
..ts as TO_TIMESTAMP(heart_time,'yyyy-MM-ddHH:mm:ss') ,
WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….・
 
 
//如果应用到source,则一开始并不知道heart_time 的值
CREATE TABLE sourceTable (
  request_uri STRING
..ts as TO_TIMESTAMP(heart_time,'yyyy-MM-ddHH:mm:ss') ,
WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) WITH ( ....... );
 
 
只能等待Flink 1.11 尝试是否可以用View作为中间临时表,并对View进行 WATERMARK水印声明
Thanks
Bin
 
在 2020-06-23 15:28:50,"Leonard Xu" <[hidden email]> 写道:

>Hi
>我的意思是你如果中间结果表如果要输出,那你就一个sink写到中间结果表(什么表根据你的需要),一个sink写到你的最终结果表,在这两个sink之前的`select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….` 的这段sql是可以复用的,就和 VIEW的作用类似。
>
>如果你不需要中间结果表,只是要最终的结果表,那你写个嵌套的sql就行了,里层是`select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….・,外层是 group by, 插入最终的结果表就能满足需求了吧。
>
>祝好,
>Leonard Xu
>
>
>> 在 2020年6月23日,15:21,Weixubin <[hidden email]> 写道:
>>
>>
>>
>>
>> Hi,
>> 关于这句 “把 ` select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….` 这段 sql insert 到中间结果表 和 group后再写入最终结果表就可以了”
>> 我不太明白所说的中间结果表是什么意思, 我所理解为在数据库创建多一张middleSinkTable表,作为中间结果表。请问这样理解是否有误? 可否简单举个例子。
>> Thanks,
>> Bin
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-06-23 11:57:28,"Leonard Xu" <[hidden email]> 写道:
>>> Hi,
>>> 是的,这个是在 FLINK 1.11 的版本里支持的,当前 FLINK 1.11代码已经冻结,正在测试中,最近应该快发布了,所以master 分支上的版本号为1.12-SNAPSHOT
>>> ,等1.11版本发布了就可以看到对应的文档。
>>>
>>> 回到这个问题,flink sql(blink planner) 支持 multiple-sinks 的, 在 1.10 版本也可以一个作业搞定。 把 `  select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….`  这段 sql insert 到中间结果表 和 group后再写入最终结果表就可以了。效果和使用 VIEW 应该类似,因为 planner 会做分段优化。
>>> 另外建议使用1.10.1版本,1.10.1在1.10的基础上修复了不少bug,也可以等1.11发布了试用下1.11。
>>>
>>>
>>> 祝好,
>>> Leonard Xu
>
Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL 是否支持类似临时中间表的概念

Leonard Xu
In reply to this post by WeiXubin
Hi
Chenxuying 是想在计算列中使用udtf, 现在的计算列只支持udf, 不支持udtf, 目前还做不到的。

祝好,
Leonard Xu

> 在 2020年6月24日,18:06,Weixubin <[hidden email]> 写道:
>
>
>
>
> 感谢 Leonard Xu 与 Jark 两位,已成功解决满足需求!
> 对于 chenxuying 所提出的问题,我也很感兴趣。
> 由于使用UDF重复解析两遍,不知是否有更好的替代方法
> Thanks
> Bin
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-06-24 12:32:27,"Leonard Xu" <[hidden email]> 写道:
>> Hello,
>>
>> 你的需求其实是要 抽取记录的字段定义watermark, 这个只能放到source 表的DDL中,view上也不支持的。
>> 1.10里的计算列 + udf 应该就可以满足你的需求, 大概长这样:
>>
>> CREATE TABLE sourceTable (
>> request_uri STRING,
>> ts as extractTsUdf(request_uri),
>> WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
>>
>> ) WITH (
>> ..........
>> );
>>
>> select ... from (
>> select ts, T.* from
>> sourceTable  sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T(...)
>> ) t
>> group by TUMBLE(ts, INTERVAL '30' SECOND)
>>
>> 祝好,
>> Leonard
>>
>>
>>> 在 2020年6月24日,12:09,Weixubin <[hidden email]> 写道:
>>>
>>> 第一个数据处理过程就是将 request_uri 解析为多个属性(多列),存成一行,作为一条记录。
>>