Hi, 请教下
启动任务的时候抛异常了, 但是没看懂报错原因, 麻烦各位大佬帮看下 这里我是先创建了一个DataStreamSource, 然后配置转为view, 配置EventTime, 后面再用SQL DDL进行数据处理 DataStreamSource<PerfEvent> source = env.addSource(consumer); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); tableEnv.createTemporaryView(table_name, source, $(timeField).rowtime(), $("cpu")); tableEnv.from(table_name).window( Tumble.over(lit(1).minutes()) .on($(timeField)) .as(table_name + "Window") ); tableEnv.executeSql(sql1); // CREATE TABLE t_out (`ts` TIMESTAMP(3), `count` BIGINT) WITH ('connector' = 'print') 没有报错 tableEnv.executeSql(sql2); // INSERT INTO t_out SELECT TUMBLE_START(`ts`, INTERVAL '1' MINUTE), COUNT(1) as `count` FROM t1 GROUP BY TUMBLE(`ts`, INTERVAL '1' MINUTE) 抛异常 异常堆栈: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Index: 1, Size: 1 at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1 at java.util.ArrayList.rangeCheck(ArrayList.java:657) at java.util.ArrayList.get(ArrayList.java:433) at java.util.Collections$UnmodifiableList.get(Collections.java:1311) at org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:682) at org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:665) at org.apache.flink.table.planner.codegen.GenerateUtils$.generateInputAccess(GenerateUtils.scala:561) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$generateConverterResultExpression$1(ExprCodeGenerator.scala:184) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$generateConverterResultExpression$1$adapted(ExprCodeGenerator.scala:158) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29) at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:242) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242) at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateConverterResultExpression(ExprCodeGenerator.scala:158) at org.apache.flink.table.planner.plan.utils.ScanUtil$.convertToInternalRow(ScanUtil.scala:103) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlanInternal(StreamExecDataStreamScan.scala:126) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlanInternal(StreamExecDataStreamScan.scala:55) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlan(StreamExecDataStreamScan.scala:55) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:76) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:117) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43) at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:67) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690) ... |
测试出来了, rowtime参数需要是最后一个参数, $(timeField).rowtime()
但是这个报错也太隐晦了吧 . 在 2020-08-30 14:54:15,"RS" <[hidden email]> 写道: >Hi, 请教下 > > >启动任务的时候抛异常了, 但是没看懂报错原因, 麻烦各位大佬帮看下 >这里我是先创建了一个DataStreamSource, 然后配置转为view, 配置EventTime, 后面再用SQL DDL进行数据处理 > DataStreamSource<PerfEvent> source = env.addSource(consumer); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > tableEnv.createTemporaryView(table_name, source, $(timeField).rowtime(), $("cpu")); > tableEnv.from(table_name).window( > Tumble.over(lit(1).minutes()) > .on($(timeField)) > .as(table_name + "Window") > ); > tableEnv.executeSql(sql1); // CREATE TABLE t_out (`ts` TIMESTAMP(3), `count` BIGINT) WITH ('connector' = 'print') 没有报错 > tableEnv.executeSql(sql2); // INSERT INTO t_out SELECT TUMBLE_START(`ts`, INTERVAL '1' MINUTE), COUNT(1) as `count` FROM t1 GROUP BY TUMBLE(`ts`, INTERVAL '1' MINUTE) 抛异常 > > >异常堆栈: >org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Index: 1, Size: 1 > >at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) > >at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) > >at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) > >at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) > >at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) > >at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) > >at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) > >at java.security.AccessController.doPrivileged(Native Method) > >at javax.security.auth.Subject.doAs(Subject.java:422) > >at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893) > >at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > >at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) > >Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1 > >at java.util.ArrayList.rangeCheck(ArrayList.java:657) > >at java.util.ArrayList.get(ArrayList.java:433) > >at java.util.Collections$UnmodifiableList.get(Collections.java:1311) > >at org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:682) > >at org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:665) > >at org.apache.flink.table.planner.codegen.GenerateUtils$.generateInputAccess(GenerateUtils.scala:561) > >at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$generateConverterResultExpression$1(ExprCodeGenerator.scala:184) > >at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$generateConverterResultExpression$1$adapted(ExprCodeGenerator.scala:158) > >at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > >at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32) > >at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29) > >at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:242) > >at scala.collection.TraversableLike.map(TraversableLike.scala:233) > >at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > >at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242) > >at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateConverterResultExpression(ExprCodeGenerator.scala:158) > >at org.apache.flink.table.planner.plan.utils.ScanUtil$.convertToInternalRow(ScanUtil.scala:103) > >at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlanInternal(StreamExecDataStreamScan.scala:126) > >at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlanInternal(StreamExecDataStreamScan.scala:55) > >at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) > >at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) > >at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlan(StreamExecDataStreamScan.scala:55) > >at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) > >at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) > >at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) > >at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) > >at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) > >at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:76) > >at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44) > >at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) > >at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) > >at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44) > >at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:117) > >at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54) > >at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) > >at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) > >at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54) > >at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) > >at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) > >at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) > >at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) > >at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) > >at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79) > >at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43) > >at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58) > >at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56) > >at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43) > >at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:67) > >at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > >at scala.collection.Iterator.foreach(Iterator.scala:937) > >at scala.collection.Iterator.foreach$(Iterator.scala:937) > >at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > >at scala.collection.IterableLike.foreach(IterableLike.scala:70) > >at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > >at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > >at scala.collection.TraversableLike.map(TraversableLike.scala:233) > >at scala.collection.TraversableLike.map$(TraversableLike.scala:226) > >at scala.collection.AbstractTraversable.map(Traversable.scala:104) > >at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66) > >at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166) > >at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264) > >at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700) > >at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787) > >at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690) > >... |
Free forum by Nabble | Edit this page |