Window aggregation error after converting to TIMESTAMP via a computed column


jun su
hi all,

On Flink 1.11.0, after using a computed column to convert a BIGINT field to TIMESTAMP, a time-window aggregation fails with the following error:

DDL:

CREATE TABLE source (
    occur_time BIGINT,
    rowtime AS longToTimestamp(occur_time)
) WITH (
    'connector' = 'filesystem',
    'format' = 'orc',
    'path' = '/path/to/data'
)

Error:

Caused by: java.lang.IllegalArgumentException: field [$f0] not found; input fields are: [occur_time]
at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:402)
at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:385)
at org.apache.flink.table.planner.plan.utils.AggregateUtil$.timeFieldIndex(AggregateUtil.scala:720)
at org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.transformTimeSlidingWindow(BatchExecWindowAggregateRule.scala:161)
at org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.onMatch(BatchExecWindowAggregateRule.scala:111)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:217)
... 27 more

--
Best,
Jun Su

Re: Window aggregation error after converting to TIMESTAMP via a computed column

Danny Chan-2
Have you tried adding the WATERMARK syntax?
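
For example, a sketch on top of the DDL from the original post (not verified; it assumes longToTimestamp returns TIMESTAMP(3), which the WATERMARK clause requires):

CREATE TABLE source (
    occur_time BIGINT,
    rowtime AS longToTimestamp(occur_time),
    -- assumption: longToTimestamp yields TIMESTAMP(3); WATERMARK only accepts TIMESTAMP(3) columns
    WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
) WITH (
    'connector' = 'filesystem',
    'format' = 'orc',
    'path' = '/path/to/data'
)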

jun su <[hidden email]> wrote on Fri, Dec 11, 2020 at 10:47 AM:

> [quoted original message trimmed]

Re: Window aggregation error after converting to TIMESTAMP via a computed column

jun su
hi Danny,
     I tried that and hit the same error. Debugging shows that LogicalWindowAggregateRuleBase does not carry the expression down when it builds the window; it only carries the alias, so the downstream optimizer rule fails and exits.
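
One possible workaround (an untested sketch; the alias rt is hypothetical) is to materialize the conversion in a subquery, so the window groups on a plain projected field rather than on the computed-column alias:

SELECT
    TUMBLE_START(rt, INTERVAL '1' HOUR) AS ts,
    COUNT(1) AS ct
FROM (
    -- materialize the UDF result as an ordinary projected field
    SELECT longToTimestamp(occur_time) AS rt FROM source
) t
GROUP BY TUMBLE(rt, INTERVAL '1' HOUR)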

Danny Chan <[hidden email]> wrote on Fri, Dec 11, 2020 at 11:47 AM:

> Have you tried adding the WATERMARK syntax?
>
> [earlier quoted messages trimmed]


--
Best,
Jun Su

Re: Window aggregation error after converting to TIMESTAMP via a computed column

Jark
Administrator
Please post the complete code; the current information is not enough to analyze the problem.

On Fri, 11 Dec 2020 at 11:53, jun su <[hidden email]> wrote:

> hi Danny,
>      I tried that and hit the same error; debugging shows LogicalWindowAggregateRuleBase only carries the alias, not the expression, when building the window.
>
> [earlier quoted messages trimmed]

Re: Window aggregation error after converting to TIMESTAMP via a computed column

jun su
hi all,
      The complete code is as follows:


package example;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;

public class WindowAggWithBigintTest {

    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.registerFunction("longToTimestamp", new LongToTimestamp());

        // rowtime is a computed column derived from the BIGINT field occur_time.
        String ddl = "CREATE TABLE source(occur_time BIGINT, rowtime AS longToTimestamp(occur_time)) "
                + "WITH ('connector' = 'filesystem', 'format' = 'orc', 'path' = '/path/to/orc')";
        tEnv.executeSql(ddl);

        // Tumbling-window aggregation on the computed column; this is what fails.
        Table table = tEnv.sqlQuery(
                "SELECT TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS ts, COUNT(1) AS ct "
                        + "FROM source GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR)");

        DiscardingOutputFormat<String> outputFormat = new DiscardingOutputFormat<>();
        TableResultSink tableResultSink = new TableResultSink(table.getSchema(), outputFormat);
        tEnv.registerTableSink("sink", tableResultSink);
        table.insertInto("sink");
        tEnv.execute("test");
    }

    /** A sink that discards all rows; only used here to trigger plan translation. */
    private static class TableResultSink implements StreamTableSink<String> {
        private final TableSchema schema;
        private final DataType rowType;
        private final OutputFormat<String> outputFormat;

        TableResultSink(TableSchema schema, OutputFormat<String> outputFormat) {
            this.schema = schema;
            this.rowType = schema.toRowDataType();
            this.outputFormat = outputFormat;
        }

        @Override
        public DataType getConsumedDataType() {
            return rowType;
        }

        @Override
        public TableSchema getTableSchema() {
            return schema;
        }

        @Override
        public TableSink<String> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
            throw new UnsupportedOperationException(
                    "This sink is configured by passing a static schema when initiating");
        }

        @Override
        public DataStreamSink<?> consumeDataStream(DataStream<String> dataStream) {
            return dataStream.writeUsingOutputFormat(outputFormat)
                    .setParallelism(1)
                    .name("tableResult");
        }
    }
}
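
(LongToTimestamp itself was never posted in this thread; the following is purely a hypothetical sketch of such a scalar function, assuming occur_time holds epoch milliseconds:)

import java.sql.Timestamp;

import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical reconstruction -- the real LongToTimestamp was not shared on the list.
public class LongToTimestamp extends ScalarFunction {
    // Assumes the input is epoch milliseconds; returns a SQL TIMESTAMP.
    public Timestamp eval(Long epochMillis) {
        return epochMillis == null ? null : new Timestamp(epochMillis);
    }
}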

============================================================

The error is:

Exception in thread "main" java.lang.RuntimeException: Error while applying rule BatchExecWindowAggregateRule, args [rel#264:FlinkLogicalWindowAggregate.LOGICAL.any.[](input=RelSubset#263,group={},ct=COUNT(),window=TumblingGroupWindow('w$, $f0, 3600000),properties=w$start, w$end, w$rowtime), rel#250:FlinkLogicalLegacyTableSourceScan.LOGICAL.any.[](table=[default_catalog, default_database, source, source: [FileSystemTableSource(occur_time, rowtime)]],fields=occur_time, rowtime)]
      at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:244)
      at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:636)
      at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
      at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
      at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
      at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
      at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
      at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
      at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
      at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86)
      at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
      at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
      at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
      at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
      at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
      at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
      at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
      at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240)
      at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
      at example.WindowAggWithBigintTest.main(WindowAggWithBigintTest.java:34)
Caused by: java.lang.IllegalArgumentException: field [$f0] not found; input fields are: [occur_time]
      at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:402)
      at org.apache.calcite.tools.RelBuilder.field(RelBuilder.java:385)
      at org.apache.flink.table.planner.plan.utils.AggregateUtil$.timeFieldIndex(AggregateUtil.scala:720)
      at org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.transformTimeSlidingWindow(BatchExecWindowAggregateRule.scala:161)
      at org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecWindowAggregateRule.onMatch(BatchExecWindowAggregateRule.scala:111)
      at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:217)
      ... 27 more

Jark Wu <[hidden email]> wrote on Fri, Dec 11, 2020 at 5:25 PM:

> Please post the complete code; the current information is not enough to analyze the problem.
>
> [earlier quoted messages trimmed]


--
Best,
Jun Su