Hi:
我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下: 分组计算的SQL如下: 在执行计算时,报了如下异常: Exception in thread "main" org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, t_order, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1, 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps, orderInformationId, userId, categoryId, productId, price, productCount, priceSum, shipAddress, receiverAddress]) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.immutable.Range.foreach(Range.scala:155) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) 我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。 那面对我这样的情况,该用什么方案来解决? 望知道的各位告知一下,感谢! 祝好 |
Administrator
|
Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。
你可以使用非 window 聚合来代替。 Btw,你可能说一下你的需求场景么? 为什么需要在 CDC 上做 window 操作呢? Best, Jark On Mon, 23 Nov 2020 at 10:28, jy l <[hidden email]> wrote: > Hi: > 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下: > [image: image.png] > [image: image.png] > 分组计算的SQL如下: > [image: image.png] > 在执行计算时,报了如下异常: > Exception in thread "main" org.apache.flink.table.api.TableException: > GroupWindowAggregate doesn't support consuming update and delete changes > which is produced by node TableSourceScan(table=[[default_catalog, > default_database, t_order, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1, > 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps, > orderInformationId, userId, categoryId, productId, price, productCount, > priceSum, shipAddress, receiverAddress]) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at scala.collection.immutable.Range.foreach(Range.scala:155) > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > > 我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。 > 那面对我这样的情况,该用什么方案来解决? > 望知道的各位告知一下,感谢! > > 祝好 > > |
Administrator
|
我建了个 issue 跟进这个功能:https://issues.apache.org/jira/browse/FLINK-20281
On Mon, 23 Nov 2020 at 10:35, Jark Wu <[hidden email]> wrote: > Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。 > 你可以使用非 window 聚合来代替。 > > Btw,你可能说一下你的需求场景么? 为什么需要在 CDC 上做 window 操作呢? > > Best, > Jark > > On Mon, 23 Nov 2020 at 10:28, jy l <[hidden email]> wrote: > >> Hi: >> 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下: >> [image: image.png] >> [image: image.png] >> 分组计算的SQL如下: >> [image: image.png] >> 在执行计算时,报了如下异常: >> Exception in thread "main" org.apache.flink.table.api.TableException: >> GroupWindowAggregate doesn't support consuming update and delete changes >> which is produced by node TableSourceScan(table=[[default_catalog, >> default_database, t_order, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1, >> 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps, >> orderInformationId, userId, categoryId, productId, price, productCount, >> priceSum, shipAddress, receiverAddress]) >> at >> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380) >> at >> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298) >> at >> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337) >> at >> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326) >> at >> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325) >> at >> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) >> at scala.collection.immutable.Range.foreach(Range.scala:155) >> at scala.collection.TraversableLike.map(TraversableLike.scala:233) >> at scala.collection.TraversableLike.map$(TraversableLike.scala:226) >> at scala.collection.AbstractTraversable.map(Traversable.scala:104) >> at >> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325) >> at >> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275) >> at >> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337) >> at >> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326) >> at >> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325) >> at >> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) >> >> 我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。 >> 那面对我这样的情况,该用什么方案来解决? >> 望知道的各位告知一下,感谢! >> >> 祝好 >> >> |
In reply to this post by Jark
使用场景就是我们想用Flink 根据订单系统中的订单表,每5s钟计算一次总的交易金额。
目前我们的系统大致架构是mysql(debezium)---->kafka--->flink---->es Jark Wu <[hidden email]> 于2020年11月23日周一 上午10:35写道: > Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。 > 你可以使用非 window 聚合来代替。 > > Btw,你可能说一下你的需求场景么? 为什么需要在 CDC 上做 window 操作呢? > > Best, > Jark > > On Mon, 23 Nov 2020 at 10:28, jy l <[hidden email]> wrote: > > > Hi: > > 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下: > > [image: image.png] > > [image: image.png] > > 分组计算的SQL如下: > > [image: image.png] > > 在执行计算时,报了如下异常: > > Exception in thread "main" org.apache.flink.table.api.TableException: > > GroupWindowAggregate doesn't support consuming update and delete changes > > which is produced by node TableSourceScan(table=[[default_catalog, > > default_database, t_order, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1, > > 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps, > > orderInformationId, userId, categoryId, productId, price, productCount, > > priceSum, shipAddress, receiverAddress]) > > at > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380) > > at > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298) > > at > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337) > > at > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326) > > at > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325) > > at > > > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > > at scala.collection.immutable.Range.foreach(Range.scala:155) > > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > > at > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325) > > at > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275) > > at > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337) > > at > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326) > > at > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325) > > at > > > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > > > > 我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。 > > 那面对我这样的情况,该用什么方案来解决? > > 望知道的各位告知一下,感谢! > > > > 祝好 > > > > > |
Administrator
|
那是不是用非窗口聚合,开5s mini batch,是不是可以达到你的需求?
Best, Jark On Mon, 23 Nov 2020 at 13:16, jy l <[hidden email]> wrote: > 使用场景就是我们想用Flink 根据订单系统中的订单表,每5s钟计算一次总的交易金额。 > 目前我们的系统大致架构是mysql(debezium)---->kafka--->flink---->es > > Jark Wu <[hidden email]> 于2020年11月23日周一 上午10:35写道: > > > Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。 > > 你可以使用非 window 聚合来代替。 > > > > Btw,你可能说一下你的需求场景么? 为什么需要在 CDC 上做 window 操作呢? > > > > Best, > > Jark > > > > On Mon, 23 Nov 2020 at 10:28, jy l <[hidden email]> wrote: > > > > > Hi: > > > 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下: > > > [image: image.png] > > > [image: image.png] > > > 分组计算的SQL如下: > > > [image: image.png] > > > 在执行计算时,报了如下异常: > > > Exception in thread "main" org.apache.flink.table.api.TableException: > > > GroupWindowAggregate doesn't support consuming update and delete > changes > > > which is produced by node TableSourceScan(table=[[default_catalog, > > > default_database, t_order, > watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1, > > > 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps, > > > orderInformationId, userId, categoryId, productId, price, productCount, > > > priceSum, shipAddress, receiverAddress]) > > > at > > > > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380) > > > at > > > > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298) > > > at > > > > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337) > > > at > > > > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326) > > > at > > > > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325) > > > at > > > > > > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > > > at scala.collection.immutable.Range.foreach(Range.scala:155) > > > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > > > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > > > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > > > at > > > > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325) > > > at > > > > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275) > > > at > > > > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337) > > > at > > > > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326) > > > at > > > > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325) > > > at > > > > > > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > > > > > > 我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。 > > > 那面对我这样的情况,该用什么方案来解决? > > > 望知道的各位告知一下,感谢! > > > > > > 祝好 > > > > > > > > > |
好的,我试一下。谢谢
Best Jark Wu <[hidden email]> 于2020年11月23日周一 下午2:06写道: > 那是不是用非窗口聚合,开5s mini batch,是不是可以达到你的需求? > > Best, > Jark > > On Mon, 23 Nov 2020 at 13:16, jy l <[hidden email]> wrote: > > > 使用场景就是我们想用Flink 根据订单系统中的订单表,每5s钟计算一次总的交易金额。 > > 目前我们的系统大致架构是mysql(debezium)---->kafka--->flink---->es > > > > Jark Wu <[hidden email]> 于2020年11月23日周一 上午10:35写道: > > > > > Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。 > > > 你可以使用非 window 聚合来代替。 > > > > > > Btw,你可能说一下你的需求场景么? 为什么需要在 CDC 上做 window 操作呢? > > > > > > Best, > > > Jark > > > > > > On Mon, 23 Nov 2020 at 10:28, jy l <[hidden email]> wrote: > > > > > > > Hi: > > > > 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下: > > > > [image: image.png] > > > > [image: image.png] > > > > 分组计算的SQL如下: > > > > [image: image.png] > > > > 在执行计算时,报了如下异常: > > > > Exception in thread "main" org.apache.flink.table.api.TableException: > > > > GroupWindowAggregate doesn't support consuming update and delete > > changes > > > > which is produced by node TableSourceScan(table=[[default_catalog, > > > > default_database, t_order, > > watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1, > > > > 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps, > > > > orderInformationId, userId, categoryId, productId, price, > productCount, > > > > priceSum, shipAddress, receiverAddress]) > > > > at > > > > > > > > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380) > > > > at > > > > > > > > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298) > > > > at > > > > > > > > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337) > > > > at > > > > > > > > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326) > > > > at > > > > > > > > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325) > > > > at > > > > > > > > > > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > > > > at scala.collection.immutable.Range.foreach(Range.scala:155) > > > > at scala.collection.TraversableLike.map(TraversableLike.scala:233) > > > > at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > > > > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > > > > at > > > > > > > > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325) > > > > at > > > > > > > > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275) > > > > at > > > > > > > > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337) > > > > at > > > > > > > > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326) > > > > at > > > > > > > > > > org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325) > > > > at > > > > > > > > > > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > > > > > > > > 我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。 > > > > 那面对我这样的情况,该用什么方案来解决? > > > > 望知道的各位告知一下,感谢! > > > > > > > > 祝好 > > > > > > > > > > > > > > |
Free forum by Nabble | Edit this page |