Flink SQL: error calling the FIRST_VALUE function

Flink SQL: error calling the FIRST_VALUE function

me
1. Flink version: Flink 1.11

2. Planner: Blink planner (useBlinkPlanner)

3. SQL executed:

SELECT
  FIRST_VALUE(kafka_table.src_ip) AS kafka_table_src_ip_0,
  FIRST_VALUE(kafka_table.dest_ip) AS kafka_table_dest_ip_0,
  HOP_START(kafka_table.process_time, INTERVAL '30' SECOND(2), INTERVAL '2' MINUTE(1)) AS __window_start,
  HOP_END(kafka_table.process_time, INTERVAL '30' SECOND(2), INTERVAL '2' MINUTE(1)) AS __window_end,
  CONCAT_AGG(kafka_table.log_id, ',') AS __origin_log_id
FROM kafka_table
WHERE kafka_table.dest_ip = '25.25.205.14'
GROUP BY
  HOP(kafka_table.process_time, INTERVAL '30' SECOND(2), INTERVAL '2' MINUTE(1)),
  kafka_table.src_ip




4. Error message:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Function class 'org.apache.flink.table.planner.functions.aggfunctions.FirstValueAggFunction.StringFirstValueAggFunction' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static.
        at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.checkAndExtractMethods(UserDefinedFunctionUtils.scala:439)
        at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:318)
        at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:273)
        at org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.checkNeededMethods(ImperativeAggCodeGen.scala:474)
        at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116)
        at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1116)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.checkNeededMethods(AggsHandlerCodeGenerator.scala:1115)
        at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genMerge(AggsHandlerCodeGenerator.scala:929)
        at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:578)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.createAggsHandler(StreamExecGroupWindowAggregateBase.scala:244)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:157)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:127)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
        at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:178)
        at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:113)
        at cn.chaitin.sqlplatform.main.SqlPlatformRealTimeV2$.realTimeSqlExec(SqlPlatformRealTimeV2.scala:229)
        at cn.chaitin.sqlplatform.main.SqlPlatformRealTimeV2$.main(SqlPlatformRealTimeV2.scala:67)



 

 
Re: Flink SQL: error calling the FIRST_VALUE function

hailongwang
Hi me,

A UDAF used on a HOP window must implement a merge method: when a HOP window fires, multiple panes are merged, so the UDAF's merge method is also called to combine the panes' accumulators into one. first_value and last_value do not support merge.
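For reference, here is a minimal sketch (in Scala; the names MyFirstValue and FirstValueAcc are illustrative, not part of Flink) of a custom FIRST_VALUE-style AggregateFunction that also implements merge so the planner can use it on a HOP window. Note the caveat in the comments: the merged result is only truly the "first" value if panes are merged in time order, which is exactly the reason the built-in function does not implement merge.

import org.apache.flink.table.functions.AggregateFunction

// Illustrative accumulator: remembers the first value seen, if any.
class FirstValueAcc {
  var value: String = _
  var seen: Boolean = false
}

class MyFirstValue extends AggregateFunction[String, FirstValueAcc] {

  override def createAccumulator(): FirstValueAcc = new FirstValueAcc

  override def getValue(acc: FirstValueAcc): String = acc.value

  // Called for every input row: keep only the first non-null value.
  def accumulate(acc: FirstValueAcc, in: String): Unit = {
    if (!acc.seen && in != null) {
      acc.value = in
      acc.seen = true
    }
  }

  // Called when window panes are merged: fold the other accumulators into acc.
  // This picks the first non-empty accumulator, which is only correct if the
  // merge order follows the pane (time) order.
  def merge(acc: FirstValueAcc, others: java.lang.Iterable[FirstValueAcc]): Unit = {
    val it = others.iterator()
    while (it.hasNext && !acc.seen) {
      val other = it.next()
      if (other.seen) {
        acc.value = other.value
        acc.seen = true
      }
    }
  }
}

Such a function could then be registered, for example with tableEnv.registerFunction("MY_FIRST_VALUE", new MyFirstValue), and used in the query in place of the built-in FIRST_VALUE.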


Best,
hailong

Re: Flink SQL: error calling the FIRST_VALUE function

Jark
Administrator
We could open an issue to add merge support for first_value and last_value.

Re: Re: Flink SQL: error calling the FIRST_VALUE function

hailongwang


I have created an issue to support the merge method for first_value and last_value [1].
I have also created an issue to update the documentation describing when a custom UDAF needs a merge method, so that it also mentions the hop window case [2].
@李世钰 <[hidden email]> feel free to pick these up if you are interested.


[1] https://issues.apache.org/jira/browse/FLINK-20110
[2] https://issues.apache.org/jira/browse/FLINK-20111


