1. FLINK版本 flink1.11
2. 使用的是useBlinkPlanner 3.执行sql SELECT FIRST_VALUE(kafka_table.src_ip) AS kafka_table_src_ip_0,FIRST_VALUE(kafka_table.dest_ip) AS kafka_table_dest_ip_0,HOP_START(kafka_table.process_time, INTERVAL '30' SECOND(2), INTERVAL '2' MINUTE(1)) AS __window_start,HOP_END(kafka_table.process_time, INTERVAL '30' SECOND(2), INTERVAL '2' MINUTE(1)) AS __window_end,CONCAT_AGG(kafka_table.log_id, ',') AS __origin_log_id FROM kafka_table WHERE kafka_table.dest_ip = '25.25.205.14' GROUP BY HOP(kafka_table.process_time, INTERVAL '30' SECOND(2), INTERVAL '2' MINUTE(1)),kafka_table.src_ip 4.报错信息 Exception in thread "main" org.apache.flink.table.api.ValidationException: Function class 'org.apache.flink.table.planner.functions.aggfunctions.FirstValueAggFunction.StringFirstValueAggFunction' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static. at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.checkAndExtractMethods(UserDefinedFunctionUtils.scala:439) at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:318) at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:273) at org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.checkNeededMethods(ImperativeAggCodeGen.scala:474) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.checkNeededMethods(AggsHandlerCodeGenerator.scala:1115) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genMerge(AggsHandlerCodeGenerator.scala:929) at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:578) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.createAggsHandler(StreamExecGroupWindowAggregateBase.scala:244) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:157) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:127) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166) at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178) at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113) at cn.chaitin.sqlplatform.main.SqlPlatformRealTimeV2$.realTimeSqlExec(SqlPlatformRealTimeV2.scala:229) at cn.chaitin.sqlplatform.main.SqlPlatformRealTimeV2$.main(SqlPlatformRealTimeV2.scala:67) |
Hi me,
HOP window 上使用的 UDAF 需要实现 merge 方法,因为 HOP window 在触发计算时会将多个 pane 进行合并,故也需要调用 UDAF 的 merge 方法将多个 accumulator 合并成一个。而 first_value 和 last_value 不支持 merge。 Best, hailong 在 2020-11-12 17:07:58,"李世钰" <[hidden email]> 写道: >1. FLINK版本 flink1.11 > > > > >2. 使用的是useBlinkPlanner > > > > >3.执行sql > >SELECT FIRST_VALUE(kafka_table.src_ip) AS kafka_table_src_ip_0,FIRST_VALUE(kafka_table.dest_ip) AS kafka_table_dest_ip_0,HOP_START(kafka_table.process_time, INTERVAL '30' SECOND(2), INTERVAL '2' MINUTE(1)) AS __window_start,HOP_END(kafka_table.process_time, INTERVAL '30' SECOND(2), INTERVAL '2' MINUTE(1)) AS __window_end,CONCAT_AGG(kafka_table.log_id, ',') AS __origin_log_id FROM kafka_table WHERE kafka_table.dest_ip = '25.25.205.14' GROUP BY HOP(kafka_table.process_time, INTERVAL '30' SECOND(2), INTERVAL '2' MINUTE(1)),kafka_table.src_ip > > > > >4.报错信息 > >Exception in thread "main" org.apache.flink.table.api.ValidationException: Function class 'org.apache.flink.table.planner.functions.aggfunctions.FirstValueAggFunction.StringFirstValueAggFunction' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static. > > at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.checkAndExtractMethods(UserDefinedFunctionUtils.scala:439) > > at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:318) > > at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:273) > > at org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.checkNeededMethods(ImperativeAggCodeGen.scala:474) > > at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116) > > at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116) > > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) > > at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.checkNeededMethods(AggsHandlerCodeGenerator.scala:1115) > > at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genMerge(AggsHandlerCodeGenerator.scala:929) > > at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:578) > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.createAggsHandler(StreamExecGroupWindowAggregateBase.scala:244) > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:157) > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54) > > at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54) > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) > > at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158) > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:127) > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) > > at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) > > at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67) > > at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > > at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66) > > at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166) > > at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178) > > at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113) > > at cn.chaitin.sqlplatform.main.SqlPlatformRealTimeV2$.realTimeSqlExec(SqlPlatformRealTimeV2.scala:229) > > at cn.chaitin.sqlplatform.main.SqlPlatformRealTimeV2$.main(SqlPlatformRealTimeV2.scala:67) > > > > > > |
Administrator
|
可以建个 issue 支持下 first_value 和 last_value 的 merge 方法。
On Thu, 12 Nov 2020 at 20:37, hailongwang <[hidden email]> wrote: > Hi me, > > > HOP window 上使用的 UDAF 需要实现 merge 方法,因为 HOP window 在触发计算时会将多个 pane > 进行合并,故也需要调用 UDAF 的 merge 方法将多个 accumulator 合并成一个。而 first_value 和 last_value > 不支持 merge。 > > > Best, > hailong > > 在 2020-11-12 17:07:58,"李世钰" <[hidden email]> 写道: > >1. FLINK版本 flink1.11 > > > > > > > > > >2. 使用的是useBlinkPlanner > > > > > > > > > >3.执行sql > > > >SELECT FIRST_VALUE(kafka_table.src_ip) AS > kafka_table_src_ip_0,FIRST_VALUE(kafka_table.dest_ip) AS > kafka_table_dest_ip_0,HOP_START(kafka_table.process_time, INTERVAL '30' > SECOND(2), INTERVAL '2' MINUTE(1)) AS > __window_start,HOP_END(kafka_table.process_time, INTERVAL '30' SECOND(2), > INTERVAL '2' MINUTE(1)) AS __window_end,CONCAT_AGG(kafka_table.log_id, ',') > AS __origin_log_id FROM kafka_table WHERE kafka_table.dest_ip = > '25.25.205.14' GROUP BY HOP(kafka_table.process_time, INTERVAL '30' > SECOND(2), INTERVAL '2' MINUTE(1)),kafka_table.src_ip > > > > > > > > > >4.报错信息 > > > >Exception in thread "main" > org.apache.flink.table.api.ValidationException: Function class > 'org.apache.flink.table.planner.functions.aggfunctions.FirstValueAggFunction.StringFirstValueAggFunction' > does not implement at least one method named 'merge' which is public, not > abstract and (in case of table functions) not static. > > > > at > org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.checkAndExtractMethods(UserDefinedFunctionUtils.scala:439) > > > > at > org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:318) > > > > at > org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:273) > > > > at > org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.checkNeededMethods(ImperativeAggCodeGen.scala:474) > > > > at > org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116) > > > > at > org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116) > > > > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > > > > at > scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) > > > > at > org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.checkNeededMethods(AggsHandlerCodeGenerator.scala:1115) > > > > at > org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genMerge(AggsHandlerCodeGenerator.scala:929) > > > > at > org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:578) > > > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.createAggsHandler(StreamExecGroupWindowAggregateBase.scala:244) > > > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:157) > > > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54) > > > > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > > > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54) > > > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) > > > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) > > > > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > > > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) > > > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158) > > > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:127) > > > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) > > > > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > > > > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) > > > > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67) > > > > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) > > > > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > > > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > > > > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > > > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > > > > at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > > > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > > > > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > > > > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > > > > at > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66) > > > > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166) > > > > at > org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178) > > > > at > org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113) > > > > at > cn.chaitin.sqlplatform.main.SqlPlatformRealTimeV2$.realTimeSqlExec(SqlPlatformRealTimeV2.scala:229) > > > > at > cn.chaitin.sqlplatform.main.SqlPlatformRealTimeV2$.main(SqlPlatformRealTimeV2.scala:67) > > > > > > > > > > > > > |
已经建了支持 first_value 和 last_value 的 merge 方法 issue[1]。 同时也建了个 issue 来修改 自定义 UDAF 需要 merge 方法的描述文档,可以增加一个 hop window [2]。 @李世钰 <[hidden email]> 感兴趣的话可以认领下哈。 [1] https://issues.apache.org/jira/browse/FLINK-20110 [2] https://issues.apache.org/jira/browse/FLINK-20111 At 2020-11-12 19:51:29, "Jark Wu" <[hidden email]> wrote: >可以建个 issue 支持下 first_value 和 last_value 的 merge 方法。 > >On Thu, 12 Nov 2020 at 20:37, hailongwang <[hidden email]> wrote: > >> Hi me, >> >> >> HOP window 上使用的 UDAF 需要实现 merge 方法,因为 HOP window 在触发计算时会将多个 pane >> 进行合并,故也需要调用 UDAF 的 merge 方法将多个 accumulator 合并成一个。而 first_value 和 last_value >> 不支持 merge。 >> >> >> Best, >> hailong >> >> 在 2020-11-12 17:07:58,"李世钰" <[hidden email]> 写道: >> >1. FLINK版本 flink1.11 >> > >> > >> > >> > >> >2. 使用的是useBlinkPlanner >> > >> > >> > >> > >> >3.执行sql >> > >> >SELECT FIRST_VALUE(kafka_table.src_ip) AS >> kafka_table_src_ip_0,FIRST_VALUE(kafka_table.dest_ip) AS >> kafka_table_dest_ip_0,HOP_START(kafka_table.process_time, INTERVAL '30' >> SECOND(2), INTERVAL '2' MINUTE(1)) AS >> __window_start,HOP_END(kafka_table.process_time, INTERVAL '30' SECOND(2), >> INTERVAL '2' MINUTE(1)) AS __window_end,CONCAT_AGG(kafka_table.log_id, ',') >> AS __origin_log_id FROM kafka_table WHERE kafka_table.dest_ip = >> '25.25.205.14' GROUP BY HOP(kafka_table.process_time, INTERVAL '30' >> SECOND(2), INTERVAL '2' MINUTE(1)),kafka_table.src_ip >> > >> > >> > >> > >> >4.报错信息 >> > >> >Exception in thread "main" >> org.apache.flink.table.api.ValidationException: Function class >> 'org.apache.flink.table.planner.functions.aggfunctions.FirstValueAggFunction.StringFirstValueAggFunction' >> does not implement at least one method named 'merge' which is public, not >> abstract and (in case of table functions) not static. >> > >> > at >> org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.checkAndExtractMethods(UserDefinedFunctionUtils.scala:439) >> > >> > at >> org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:318) >> > >> > at >> org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:273) >> > >> > at >> org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.checkNeededMethods(ImperativeAggCodeGen.scala:474) >> > >> > at >> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116) >> > >> > at >> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116) >> > >> > at >> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) >> > >> > at >> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) >> > >> > at >> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.checkNeededMethods(AggsHandlerCodeGenerator.scala:1115) >> > >> > at >> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genMerge(AggsHandlerCodeGenerator.scala:929) >> > >> > at >> org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:578) >> > >> > at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.createAggsHandler(StreamExecGroupWindowAggregateBase.scala:244) >> > >> > at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:157) >> > >> > at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54) >> > >> > at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) >> > >> > at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54) >> > >> > at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) >> > >> > at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) >> > >> > at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) >> > >> > at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) >> > >> > at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158) >> > >> > at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:127) >> > >> > at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) >> > >> > at >> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) >> > >> > at >> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) >> > >> > at >> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67) >> > >> > at >> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) >> > >> > at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >> > >> > at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) >> > >> > at scala.collection.Iterator$class.foreach(Iterator.scala:891) >> > >> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) >> > >> > at >> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) >> > >> > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) >> > >> > at >> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) >> > >> > at scala.collection.AbstractTraversable.map(Traversable.scala:104) >> > >> > at >> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66) >> > >> > at >> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166) >> > >> > at >> org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178) >> > >> > at >> org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113) >> > >> > at >> cn.chaitin.sqlplatform.main.SqlPlatformRealTimeV2$.realTimeSqlExec(SqlPlatformRealTimeV2.scala:229) >> > >> > at >> cn.chaitin.sqlplatform.main.SqlPlatformRealTimeV2$.main(SqlPlatformRealTimeV2.scala:67) >> > >> > >> > >> > >> > >> > >> |
Free forum by Nabble | Edit this page |