TUMBLE函数不支持 回撤流

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

TUMBLE函数不支持 回撤流

夜思流年梦
开发者你好:
现有此场景:
求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
select

> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime

> ,sum(amt) as paymoney_h  

> from XXXX

> group by TUMBLE(write_time,interval '1' HOUR);


报错:
org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan




发现把kafka建表语句改成 json格式就可以


现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊









Reply | Threaded
Open this post in threaded view
|

Re: TUMBLE函数不支持 回撤流

admin
Hi,
能贴一下完整的sql吗,数据源是CDC的数据吗?

> 2020年10月30日 下午2:48,夜思流年梦 <[hidden email]> 写道:
>
> 开发者你好:
> 现有此场景:
> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
> select
>
>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
>
>> ,sum(amt) as paymoney_h  
>
>> from XXXX
>
>> group by TUMBLE(write_time,interval '1' HOUR);
>
>
> 报错:
> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan
>
>
>
>
> 发现把kafka建表语句改成 json格式就可以
>
>
> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
>
>
>
>
>
>
>
>
>

Reply | Threaded
Open this post in threaded view
|

Re:Re: TUMBLE函数不支持 回撤流

夜思流年梦
原sql

select 0 as id

, HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime

,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') then memberid else NULL end) as paynum_h

,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')  then real_product else 0 end)) as paymoney_h  

from dwd_XXX

where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')

group by TUMBLE(proctime ,interval '1' HOUR);


数据源不是flink-mysql-cdc得来的


是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),


 'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XX',
  'topic' = 'ODS_XXX',  
  'scan.startup.mode' = 'group-offsets',
  'format' = 'canal-json');


上面这个dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
建kafka表的格式,使用的changelog-json:


WITH (
  'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XXX',
  'topic' = 'DWD_XXX',  
  'scan.startup.mode' = 'group-offsets',
  'format' = 'changelog-json');  











在 2020-10-30 14:53:09,"admin" <[hidden email]> 写道:

>Hi,
>能贴一下完整的sql吗,数据源是CDC的数据吗?
>
>> 2020年10月30日 下午2:48,夜思流年梦 <[hidden email]> 写道:
>>
>> 开发者你好:
>> 现有此场景:
>> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
>> select
>>
>>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
>>
>>> ,sum(amt) as paymoney_h  
>>
>>> from XXXX
>>
>>> group by TUMBLE(write_time,interval '1' HOUR);
>>
>>
>> 报错:
>> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan
>>
>>
>>
>>
>> 发现把kafka建表语句改成 json格式就可以
>>
>>
>> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
>>
>>
>>
>>
>>
>>
>>
>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: TUMBLE函数不支持 回撤流

wangleigis@163.com
退订


Regards
Alex Wang | BigData Architect
Email: [hidden email]

> 2020年10月30日 15:12,夜思流年梦 <[hidden email]> 写道:
>
> 原sql
>
> select 0 as id
>
> , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime
>
> ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') then memberid else NULL end) as paynum_h
>
> ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')  then real_product else 0 end)) as paymoney_h  
>
> from dwd_XXX
>
> where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
>
> group by TUMBLE(proctime ,interval '1' HOUR);
>
>
> 数据源不是flink-mysql-cdc得来的
>
>
> 是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),
>
>
> 'connector' = 'kafka',
>  'properties.group.id' = 'XX',
>  'properties.bootstrap.servers' = 'XX',
>  'topic' = 'ODS_XXX',  
>  'scan.startup.mode' = 'group-offsets',
>  'format' = 'canal-json');
>
>
> 上面这个dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
> 建kafka表的格式,使用的changelog-json:
>
>
> WITH (
>  'connector' = 'kafka',
>  'properties.group.id' = 'XX',
>  'properties.bootstrap.servers' = 'XXX',
>  'topic' = 'DWD_XXX',  
>  'scan.startup.mode' = 'group-offsets',
>  'format' = 'changelog-json');  
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-10-30 14:53:09,"admin" <[hidden email]> 写道:
>> Hi,
>> 能贴一下完整的sql吗,数据源是CDC的数据吗?
>>
>>> 2020年10月30日 下午2:48,夜思流年梦 <[hidden email]> 写道:
>>>
>>> 开发者你好:
>>> 现有此场景:
>>> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
>>> select
>>>
>>>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
>>>
>>>> ,sum(amt) as paymoney_h  
>>>
>>>> from XXXX
>>>
>>>> group by TUMBLE(write_time,interval '1' HOUR);
>>>
>>>
>>> 报错:
>>> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan
>>>
>>>
>>>
>>>
>>> 发现把kafka建表语句改成 json格式就可以
>>>
>>>
>>> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>

Reply | Threaded
Open this post in threaded view
|

TUMBLE函数不支持 回撤流

夜思流年梦
In reply to this post by admin



这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性;







原sql

select 0 as id

, HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime

,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') then memberid else NULL end) as paynum_h

,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')  then real_product else 0 end)) as paymoney_h  

from dwd_XXX

where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')

group by TUMBLE(proctime ,interval '1' HOUR);


报错:
 org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan
发现把kafka建表语句改成 json格式就可以


数据源不是flink-mysql-cdc得来的


是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),


 'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XX',
  'topic' = 'ODS_XXX',  
  'scan.startup.mode' = 'group-offsets',
  'format' = 'canal-json');


上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
建kafka表的格式,使用的changelog-json:


WITH (
  'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XXX',
  'topic' = 'DWD_XXX',  
  'scan.startup.mode' = 'group-offsets',
  'format' = 'changelog-json');  











在 2020-10-30 14:53:09,"admin" <[hidden email]> 写道:

>Hi,
>能贴一下完整的sql吗,数据源是CDC的数据吗?
>
>> 2020年10月30日 下午2:48,夜思流年梦 <[hidden email]> 写道:
>>
>> 开发者你好:
>> 现有此场景:
>> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
>> select
>>
>>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
>>
>>> ,sum(amt) as paymoney_h  
>>
>>> from XXXX
>>
>>> group by TUMBLE(write_time,interval '1' HOUR);
>>
>>
>> 报错:
>> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan
>>
>>
>>
>>
>> 发现把kafka建表语句改成 json格式就可以
>>
>>
>> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
>>
>>
>>
>>
>>
>>
>>
>>
>>





 
Reply | Threaded
Open this post in threaded view
|

回复: TUMBLE函数不支持 回撤流

史 正超
canal-json 的format也是会有delete 和update的数据的,同样changelog-json也是。他们的都支持 INSERT UPDATE DELETE, 相关代码如下:

@Override
public ChangelogMode getChangelogMode() {
   return ChangelogMode.newBuilder()
      .addContainedKind(RowKind.INSERT)
      .addContainedKind(RowKind.UPDATE_BEFORE)
      .addContainedKind(RowKind.UPDATE_AFTER)
      .addContainedKind(RowKind.DELETE)
      .build();
}

所以在window里消费带有update和delete的数据现在应该是不支持的。
________________________________
发件人: 夜思流年梦 <[hidden email]>
发送时间: 2020年11月3日 9:46
收件人: [hidden email] <[hidden email]>
主题: TUMBLE函数不支持 回撤流




这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性;







原sql

select 0 as id

, HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime

,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') then memberid else NULL end) as paynum_h

,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')  then real_product else 0 end)) as paymoney_h

from dwd_XXX

where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')

group by TUMBLE(proctime ,interval '1' HOUR);


报错:
 org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan
发现把kafka建表语句改成 json格式就可以


数据源不是flink-mysql-cdc得来的


是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),


 'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XX',
  'topic' = 'ODS_XXX',
  'scan.startup.mode' = 'group-offsets',
  'format' = 'canal-json');


上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
建kafka表的格式,使用的changelog-json:


WITH (
  'connector' = 'kafka',
  'properties.group.id' = 'XX',
  'properties.bootstrap.servers' = 'XXX',
  'topic' = 'DWD_XXX',
  'scan.startup.mode' = 'group-offsets',
  'format' = 'changelog-json');











在 2020-10-30 14:53:09,"admin" <[hidden email]> 写道:

>Hi,
>能贴一下完整的sql吗,数据源是CDC的数据吗?
>
>> 2020年10月30日 下午2:48,夜思流年梦 <[hidden email]> 写道:
>>
>> 开发者你好:
>> 现有此场景:
>> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
>> select
>>
>>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
>>
>>> ,sum(amt) as paymoney_h
>>
>>> from XXXX
>>
>>> group by TUMBLE(write_time,interval '1' HOUR);
>>
>>
>> 报错:
>> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan
>>
>>
>>
>>
>> 发现把kafka建表语句改成 json格式就可以
>>
>>
>> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
>>
>>
>>
>>
>>
>>
>>
>>
>>






Reply | Threaded
Open this post in threaded view
|

Re: TUMBLE函数不支持 回撤流

LakeShen
Hi 夜思流年梦,

    看下你的 dwd_XXX 这张表的类型,是 append 数据流,还是 retract 数据流。
    如果是 retract ,应该就不能再上面进行窗口计算了。

Best,
LakeShen

史 正超 <[hidden email]> 于2020年11月3日周二 下午6:34写道:

> canal-json 的format也是会有delete 和update的数据的,同样changelog-json也是。他们的都支持 INSERT
> UPDATE DELETE, 相关代码如下:
>
> @Override
> public ChangelogMode getChangelogMode() {
>    return ChangelogMode.newBuilder()
>       .addContainedKind(RowKind.INSERT)
>       .addContainedKind(RowKind.UPDATE_BEFORE)
>       .addContainedKind(RowKind.UPDATE_AFTER)
>       .addContainedKind(RowKind.DELETE)
>       .build();
> }
>
> 所以在window里消费带有update和delete的数据现在应该是不支持的。
> ________________________________
> 发件人: 夜思流年梦 <[hidden email]>
> 发送时间: 2020年11月3日 9:46
> 收件人: [hidden email] <[hidden email]>
> 主题: TUMBLE函数不支持 回撤流
>
>
>
>
> 这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性;
>
>
>
>
>
>
>
> 原sql
>
> select 0 as id
>
> , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime
>
> ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
> then memberid else NULL end) as paynum_h
>
> ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP,
> 'yyyy-MM-dd')  then real_product else 0 end)) as paymoney_h
>
> from dwd_XXX
>
> where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
>
> group by TUMBLE(proctime ,interval '1' HOUR);
>
>
> 报错:
>  org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't
> support consuming update and delete changes which is produced by node
> TableSourceScan
> 发现把kafka建表语句改成 json格式就可以
>
>
> 数据源不是flink-mysql-cdc得来的
>
>
> 是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),
>
>
>  'connector' = 'kafka',
>   'properties.group.id' = 'XX',
>   'properties.bootstrap.servers' = 'XX',
>   'topic' = 'ODS_XXX',
>   'scan.startup.mode' = 'group-offsets',
>   'format' = 'canal-json');
>
>
> 上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
> 建kafka表的格式,使用的changelog-json:
>
>
> WITH (
>   'connector' = 'kafka',
>   'properties.group.id' = 'XX',
>   'properties.bootstrap.servers' = 'XXX',
>   'topic' = 'DWD_XXX',
>   'scan.startup.mode' = 'group-offsets',
>   'format' = 'changelog-json');
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-10-30 14:53:09,"admin" <[hidden email]> 写道:
> >Hi,
> >能贴一下完整的sql吗,数据源是CDC的数据吗?
> >
> >> 2020年10月30日 下午2:48,夜思流年梦 <[hidden email]> 写道:
> >>
> >> 开发者你好:
> >> 现有此场景:
> >> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
> >> select
> >>
> >>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
> >>
> >>> ,sum(amt) as paymoney_h
> >>
> >>> from XXXX
> >>
> >>> group by TUMBLE(write_time,interval '1' HOUR);
> >>
> >>
> >> 报错:
> >> org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't
> support consuming update and delete changes which is produced by node
> TableSourceScan
> >>
> >>
> >>
> >>
> >> 发现把kafka建表语句改成 json格式就可以
> >>
> >>
> >> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
>
>
>
>
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: TUMBLE函数不支持 回撤流

nobleyd
@LakeShen。 怎么看是append/retract数据流呢?是通过逻辑自己判定还是说有什么flink层面的信息直接反映。

LakeShen <[hidden email]> 于2020年11月4日周三 上午10:12写道:

> Hi 夜思流年梦,
>
>     看下你的 dwd_XXX 这张表的类型,是 append 数据流,还是 retract 数据流。
>     如果是 retract ,应该就不能再上面进行窗口计算了。
>
> Best,
> LakeShen
>
> 史 正超 <[hidden email]> 于2020年11月3日周二 下午6:34写道:
>
> > canal-json 的format也是会有delete 和update的数据的,同样changelog-json也是。他们的都支持 INSERT
> > UPDATE DELETE, 相关代码如下:
> >
> > @Override
> > public ChangelogMode getChangelogMode() {
> >    return ChangelogMode.newBuilder()
> >       .addContainedKind(RowKind.INSERT)
> >       .addContainedKind(RowKind.UPDATE_BEFORE)
> >       .addContainedKind(RowKind.UPDATE_AFTER)
> >       .addContainedKind(RowKind.DELETE)
> >       .build();
> > }
> >
> > 所以在window里消费带有update和delete的数据现在应该是不支持的。
> > ________________________________
> > 发件人: 夜思流年梦 <[hidden email]>
> > 发送时间: 2020年11月3日 9:46
> > 收件人: [hidden email] <[hidden email]>
> > 主题: TUMBLE函数不支持 回撤流
> >
> >
> >
> >
> > 这个问题上次给淹没了,就把这个在拿出来问下,看上次admin的回复感觉像是 支持的,不知道是我用法有问题还是flink不支持此特性;
> >
> >
> >
> >
> >
> >
> >
> > 原sql
> >
> > select 0 as id
> >
> > , HOUR(TUMBLE_START(proctime ,interval '1' HOUR)) as ftime
> >
> > ,count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
> > then memberid else NULL end) as paynum_h
> >
> > ,round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP,
> > 'yyyy-MM-dd')  then real_product else 0 end)) as paymoney_h
> >
> > from dwd_XXX
> >
> > where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
> >
> > group by TUMBLE(proctime ,interval '1' HOUR);
> >
> >
> > 报错:
> >  org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't
> > support consuming update and delete changes which is produced by node
> > TableSourceScan
> > 发现把kafka建表语句改成 json格式就可以
> >
> >
> > 数据源不是flink-mysql-cdc得来的
> >
> >
> > 是通过cannal 将binlog 写到kafka ,然后建了一个kafka 表(ods表),
> >
> >
> >  'connector' = 'kafka',
> >   'properties.group.id' = 'XX',
> >   'properties.bootstrap.servers' = 'XX',
> >   'topic' = 'ODS_XXX',
> >   'scan.startup.mode' = 'group-offsets',
> >   'format' = 'canal-json');
> >
> >
> > 上面用于查询的dwd_XXX表是基于这个ods表做了一层数据清洗在insert into 进去的,
> > 建kafka表的格式,使用的changelog-json:
> >
> >
> > WITH (
> >   'connector' = 'kafka',
> >   'properties.group.id' = 'XX',
> >   'properties.bootstrap.servers' = 'XXX',
> >   'topic' = 'DWD_XXX',
> >   'scan.startup.mode' = 'group-offsets',
> >   'format' = 'changelog-json');
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-10-30 14:53:09,"admin" <[hidden email]> 写道:
> > >Hi,
> > >能贴一下完整的sql吗,数据源是CDC的数据吗?
> > >
> > >> 2020年10月30日 下午2:48,夜思流年梦 <[hidden email]> 写道:
> > >>
> > >> 开发者你好:
> > >> 现有此场景:
> > >> 求每个小时的收入,打算用TUMBLE函数,但是发现不支持 回撤流
> > >> select
> > >>
> > >>> HOUR(TUMBLE_START(write_time,interval '1' HOUR)) as ftime
> > >>
> > >>> ,sum(amt) as paymoney_h
> > >>
> > >>> from XXXX
> > >>
> > >>> group by TUMBLE(write_time,interval '1' HOUR);
> > >>
> > >>
> > >> 报错:
> > >> org.apache.flink.table.api.TableException: GroupWindowAggregate
> doesn't
> > support consuming update and delete changes which is produced by node
> > TableSourceScan
> > >>
> > >>
> > >>
> > >>
> > >> 发现把kafka建表语句改成 json格式就可以
> > >>
> > >>
> > >> 现在只能自己group by 时间分区获取统计结果,想用这个分组窗口却不支持回撤流,这么好的功能为啥不支持咧,有没有计划支持的啊
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> >
> >
> >
> >
> >
> >
> >
>