使用 flink 1.11 的 tableEnv 的 createTemporaryTable 取注册表,指定 createTemporaryTable
为事件时间,程序包 Field null does not exist 错误,是我用法有问题? 看了下 https://issues.apache.org/jira/browse/FLINK-16160 <https://issues.apache.org/jira/browse/FLINK-16160> 这个 issue 是解决的这个问题吗? tableEnv.connect(kafka) .withSchema( new Schema().field("searchTime", DataTypes.TIMESTAMP()).rowtime(rowtime); ) .withFormat( new Json().failOnMissingField(false) ) .createTemporaryTable("tablename"); -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Administrator
|
能贴下完整的异常栈么?
Btw,TableEnvironment上的 connect API 目前不建议使用,有许多已知的问题和缺失的 feature,建议用 executeSql(ddl) 来替代。 社区计划在 1.12 中系统地重构和修复 connect API 。 Best, Jark On Mon, 13 Jul 2020 at 17:24, Hito Zhu <[hidden email]> wrote: > 使用 flink 1.11 的 tableEnv 的 createTemporaryTable 取注册表,指定 > createTemporaryTable > 为事件时间,程序包 Field null does not exist 错误,是我用法有问题? > 看了下 https://issues.apache.org/jira/browse/FLINK-16160 > <https://issues.apache.org/jira/browse/FLINK-16160> 这个 issue 是解决的这个问题吗? > > tableEnv.connect(kafka) > .withSchema( > new Schema().field("searchTime", > DataTypes.TIMESTAMP()).rowtime(rowtime); > ) > .withFormat( > new Json().failOnMissingField(false) > ) > .createTemporaryTable("tablename"); > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > |
Hi Jark 异常信息如下:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field null does not exist at org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.lambda$mapToResolvedField$4(TimestampExtractorUtils.java:85) at java.util.OptionalInt.orElseThrow(OptionalInt.java:189) at org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.mapToResolvedField(TimestampExtractorUtils.java:85) at org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.lambda$getAccessedFields$0(TimestampExtractorUtils.java:58) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545) at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) at org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.getAccessedFields(TimestampExtractorUtils.java:73) at org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.getAccessedFields(TimestampExtractorUtils.java:65) at org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:244) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan$$anonfun$1.apply(StreamExecLegacyTableSourceScan.scala:119) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan$$anonfun$1.apply(StreamExecLegacyTableSourceScan.scala:118) at scala.Option.map(Option.scala:146) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:118) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:63) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlan(StreamExecLegacyTableSourceScan.scala:63) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:127) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:321) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:297) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:288) -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Administrator
|
你的源码中 new
Schema().field("searchTime",DataTypes.TIMESTAMP()).rowtime(rowtime); 里面的 rowtime 的定义能贴下吗? On Mon, 13 Jul 2020 at 20:53, Hito Zhu <[hidden email]> wrote: > Hi Jark 异常信息如下: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Field null does not exist > at > > org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.lambda$mapToResolvedField$4(TimestampExtractorUtils.java:85) > at java.util.OptionalInt.orElseThrow(OptionalInt.java:189) > at > > org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.mapToResolvedField(TimestampExtractorUtils.java:85) > at > > org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.lambda$getAccessedFields$0(TimestampExtractorUtils.java:58) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at > > java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) > at > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) > at > > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) > at > java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545) > at > > java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) > at > java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) > at > > org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.getAccessedFields(TimestampExtractorUtils.java:73) > at > > org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.getAccessedFields(TimestampExtractorUtils.java:65) > at > > org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:244) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan$$anonfun$1.apply(StreamExecLegacyTableSourceScan.scala:119) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan$$anonfun$1.apply(StreamExecLegacyTableSourceScan.scala:118) > at scala.Option.map(Option.scala:146) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:118) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:63) > at > > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlan(StreamExecLegacyTableSourceScan.scala:63) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) > at > > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:127) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) > at > > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) > at > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) > at > > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67) > at > > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) > at > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66) > at > > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166) > at > > org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:321) > at > > org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:297) > at > > org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:288) > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > |
rowtime 定义如下,我发现 SchemaValidator#deriveFieldMapping 方法给移除了。
Rowtime rowtime = new Rowtime() .timestampsFromField("searchTime") .watermarksPeriodicBounded(5 * 1000); -- Sent from: http://apache-flink.147419.n8.nabble.com/ |
Administrator
|
这可能是 connect API 的某个 bug 吧。 建议先用 DDL 。
Best, Jark On Tue, 14 Jul 2020 at 08:54, Hito Zhu <[hidden email]> wrote: > rowtime 定义如下,我发现 SchemaValidator#deriveFieldMapping 方法给移除了。 > Rowtime rowtime = new Rowtime() > .timestampsFromField("searchTime") > .watermarksPeriodicBounded(5 * 1000); > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > |
Free forum by Nabble | Edit this page |