关于row number over的用法

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

关于row number over的用法

star

文档上还没有更新topN怎么使用,我尝试用row_number() over() 跑了一下,但是报错,请问topN可以是RetractStream吗?




val monthstats = bsTableEnv.sqlQuery(
  """
    |select
    |id,province,amount,
    |row_number() over(partition by id,province order by amount ) as rn
    |from mytable where type=1
    |group by
    |id,province,amount
  """.stripMargin
)
monthstats.toRetractStream[Row].print()
Exception in thread "main" org.apache.flink.table.api.TableException: Retraction on Over window aggregation is not supported yet. Note: Over window aggregation should not follow a non-windowed GroupBy aggregation.
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:178)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:56)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlan(StreamExecOverAggregate.scala:56)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
Reply | Threaded
Open this post in threaded view
|

Re: 关于row number over的用法

Jark
Administrator
Hi

你的 query 并不是 topn 语法。 可以先看这篇文档了解 topn 语法: http://blink.flink-china.org/dev/table/sql.html#topn <http://blink.flink-china.org/dev/table/sql.html#topn>


Best,
Jark


> 在 2019年8月26日,19:02,ddwcg <[hidden email]> 写道:
>
>
> 文档上还没有更新topN怎么使用,我尝试用row_number() over() 跑了一下,但是报错,请问topN可以是RetractStream吗?
>
>
>
>
> val monthstats = bsTableEnv.sqlQuery(
>  """
>    |select
>    |id,province,amount,
>    |row_number() over(partition by id,province order by amount ) as rn
>    |from mytable where type=1
>    |group by
>    |id,province,amount
>  """.stripMargin
> )
> monthstats.toRetractStream[Row].print()
> Exception in thread "main" org.apache.flink.table.api.TableException: Retraction on Over window aggregation is not supported yet. Note: Over window aggregation should not follow a non-windowed GroupBy aggregation.
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:178)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:56)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlan(StreamExecOverAggregate.scala:56)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)