Hi developers,

Here is my scenario: I want to compute revenue per hour and planned to use the TUMBLE function, but found that it does not support retract streams.

select
    HOUR(TUMBLE_START(write_time, interval '1' HOUR)) as ftime,
    sum(amt) as paymoney_h
from XXXX
group by TUMBLE(write_time, interval '1' HOUR);

The error:

org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan

I found that changing the Kafka table DDL to the json format makes it work.

For now I can only get the statistics with a plain GROUP BY on the time partition. The group window is such a useful feature; why isn't it supported on retract streams, and is there a plan to support it?
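For context, a regular (non-windowed) group aggregation can consume update and delete changes, so the plain GROUP BY workaround mentioned above could look like the following minimal sketch. The table and column names (XXXX, write_time, amt) are reused from the post for illustration; the query itself is an assumption, not the poster's actual SQL.

-- A minimal sketch of the non-windowed workaround, assuming the
-- columns write_time and amt exist. A regular group aggregation,
-- unlike a group window, accepts update/delete input.
SELECT
    DATE_FORMAT(write_time, 'yyyy-MM-dd') AS fdate,
    HOUR(write_time)                      AS ftime,
    SUM(amt)                              AS paymoney_h
FROM XXXX
GROUP BY
    DATE_FORMAT(write_time, 'yyyy-MM-dd'),
    HOUR(write_time);

Note that the result is itself an updating stream, so the sink has to be able to absorb upserts or retractions.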
Hi,
Could you paste the complete SQL? Is the data source CDC data?
The original SQL:
select 0 as id,
    HOUR(TUMBLE_START(proctime, interval '1' HOUR)) as ftime,
    count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') then memberid else NULL end) as paynum_h,
    round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') then real_product else 0 end)) as paymoney_h
from dwd_XXX
where write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
group by TUMBLE(proctime, interval '1' HOUR);

The data source does not come from flink-mysql-cdc. Canal writes the binlog to Kafka, and a Kafka table (the ODS table) is created on top of it:

'connector' = 'kafka',
'properties.group.id' = 'XX',
'properties.bootstrap.servers' = 'XX',
'topic' = 'ODS_XXX',
'scan.startup.mode' = 'group-offsets',
'format' = 'canal-json');

The dwd_XXX table above is filled by an INSERT INTO after one layer of data cleansing on this ODS table. Its Kafka table DDL uses the changelog-json format:

WITH (
'connector' = 'kafka',
'properties.group.id' = 'XX',
'properties.bootstrap.servers' = 'XXX',
'topic' = 'DWD_XXX',
'scan.startup.mode' = 'group-offsets',
'format' = 'changelog-json');
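The DDL fragments above begin mid-statement. For a self-contained picture, a complete definition of the DWD table might look like the sketch below. The schema (memberid, real_product, write_time, and a proctime attribute) is only inferred from the query; everything not quoted in the thread is an assumption.

-- Hedged sketch of a full DDL for dwd_XXX; the column types are
-- inferred from the query above, not taken from the poster's real DDL.
CREATE TABLE dwd_XXX (
    memberid     STRING,
    real_product DECIMAL(18, 2),
    write_time   TIMESTAMP(3),
    proctime AS PROCTIME()  -- processing-time attribute used by TUMBLE
) WITH (
    'connector' = 'kafka',
    'properties.group.id' = 'XX',
    'properties.bootstrap.servers' = 'XXX',
    'topic' = 'DWD_XXX',
    'scan.startup.mode' = 'group-offsets',
    'format' = 'changelog-json'  -- emits INSERT/UPDATE/DELETE rows
);

Note that changelog-json comes from the flink-cdc-connectors project rather than core Flink, so the corresponding format jar has to be on the classpath.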
In reply to this post by admin
This question got buried last time, so I am raising it again. From admin's earlier reply it sounded as if this should be supported, so I am not sure whether my usage is wrong or whether Flink does not support this feature.

The original SQL:

select 0 as id,
    HOUR(TUMBLE_START(proctime, interval '1' HOUR)) as ftime,
    count(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') then memberid else NULL end) as paynum_h,
    round(sum(case when write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') then real_product else 0 end)) as paymoney_h
from dwd_XXX
where write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
group by TUMBLE(proctime, interval '1' HOUR);

The error:

org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan

Changing the Kafka table DDL to the json format makes it work. The setup is the same as in my reply above: the data does not come from flink-mysql-cdc; Canal writes the binlog to Kafka, a canal-json ODS table is defined on that topic, and the dwd_XXX table queried here is populated from it after one layer of cleansing, with its Kafka table DDL using the changelog-json format.
The canal-json format also produces delete and update rows, and so does changelog-json. Both of them support INSERT, UPDATE and DELETE; the relevant code is as follows:
@Override
public ChangelogMode getChangelogMode() {
    return ChangelogMode.newBuilder()
        .addContainedKind(RowKind.INSERT)
        .addContainedKind(RowKind.UPDATE_BEFORE)
        .addContainedKind(RowKind.UPDATE_AFTER)
        .addContainedKind(RowKind.DELETE)
        .build();
}

So consuming data that carries update and delete changes inside a window should currently be unsupported.
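This matches the observation earlier in the thread that the plain json format works: json is insert-only, so the source scan emits nothing but INSERT rows and GroupWindowAggregate accepts it. A hedged sketch of that insert-only variant follows; the schema is the same assumed one as above, and whether it parses depends on what the topic actually contains (plain row JSON rather than a changelog envelope).

-- Hedged sketch: the same table declared with the insert-only 'json'
-- format. The source then produces only INSERT rows, so the TUMBLE
-- window is accepted; the trade-off is that binlog updates and
-- deletes are no longer applied as retractions to the aggregates.
CREATE TABLE dwd_XXX_append (
    memberid     STRING,
    real_product DECIMAL(18, 2),
    write_time   TIMESTAMP(3),
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'properties.group.id' = 'XX',
    'properties.bootstrap.servers' = 'XXX',
    'topic' = 'DWD_XXX',
    'scan.startup.mode' = 'group-offsets',
    'format' = 'json'
);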
Hi 夜思流年梦,
Check what kind of table your dwd_XXX is: an append stream or a retract stream. If it is a retract stream, window aggregation on top of it should not be possible.

Best,
LakeShen
@LakeShen: how do you tell whether a stream is append or retract? Do you have to work it out from the logic yourself, or is there Flink-level information that shows it directly?
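As a hedged pointer on that last question: the planner can annotate every node in the plan with its changelog mode. In the Flink 1.11-era Table API this is exposed through TableEnvironment.explainSql(sql, ExplainDetail.CHANGELOG_MODE); more recent Flink releases also accept the detail directly in the EXPLAIN statement, roughly as sketched below (the query is just a placeholder built from this thread).

-- Hedged sketch: print the plan with each operator's changelog mode.
-- Requires a Flink version whose EXPLAIN statement accepts explain
-- details; on older versions, use the Table API method
-- TableEnvironment.explainSql(sql, ExplainDetail.CHANGELOG_MODE).
EXPLAIN CHANGELOG_MODE
SELECT HOUR(TUMBLE_START(proctime, INTERVAL '1' HOUR)) AS ftime,
       SUM(real_product) AS paymoney_h
FROM dwd_XXX
GROUP BY TUMBLE(proctime, INTERVAL '1' HOUR);

In the printed plan, an append-only source shows changelogMode=[I], while a CDC-style source shows something like changelogMode=[I,UB,UA,D], which is exactly the case the GroupWindowAggregate error message complains about.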