flink sql 函数FIRST_VALUE调用报错

classic Classic list List threaded Threaded
1 message Options
me
Reply | Threaded
Open this post in threaded view
|

flink sql 函数FIRST_VALUE调用报错

me
1. FLINK版本 flink1.11


2. 使用的是useBlinkPlanner


3.执行sql
SELECT FIRST_VALUE(kafka_table.src_ip) AS kafka_table_src_ip_0,FIRST_VALUE(kafka_table.dest_ip) AS kafka_table_dest_ip_0,HOP_START(kafka_table.process_time, INTERVAL '30' SECOND(2), INTERVAL '2' MINUTE(1)) AS __window_start,HOP_END(kafka_table.process_time, INTERVAL '30' SECOND(2), INTERVAL '2' MINUTE(1)) AS __window_end,CONCAT_AGG(kafka_table.log_id, ',') AS __origin_log_id FROM kafka_table WHERE kafka_table.dest_ip = '25.25.205.14' GROUP BY HOP(kafka_table.process_time, INTERVAL '30' SECOND(2), INTERVAL '2' MINUTE(1)),kafka_table.src_ip


4.报错信息
Exception in thread "main" org.apache.flink.table.api.ValidationException: Function class 'org.apache.flink.table.planner.functions.aggfunctions.FirstValueAggFunction.StringFirstValueAggFunction' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static.
at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.checkAndExtractMethods(UserDefinedFunctionUtils.scala:439)
at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:318)
at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:273)
at org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.checkNeededMethods(ImperativeAggCodeGen.scala:474)
at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116)
at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.checkNeededMethods(AggsHandlerCodeGenerator.scala:1115)
at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genMerge(AggsHandlerCodeGenerator.scala:929)
at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:578)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.createAggsHandler(StreamExecGroupWindowAggregateBase.scala:244)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:157)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:127)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178)
at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113)
at cn.chaitin.sqlplatform.main.SqlPlatformRealTimeV2$.realTimeSqlExec(SqlPlatformRealTimeV2.scala:229)
at cn.chaitin.sqlplatform.main.SqlPlatformRealTimeV2$.main(SqlPlatformRealTimeV2.scala:67)