当我尝试通过sql client创建kafka源表时(这里借鉴了云邪大佬的《Demo:基于 Flink SQL 构建流式应用》文章),
CREATE TABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3), proctime as PROCTIME(), WATERMARK FOR ts as ts - INTERVAL '5' SECOND ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'user_behavior', 'connector.startup-mode' = 'earliest-offset', 'connector.properties.zookeeper.connect' = '192.168.1.214:2181', 'connector.properties.bootstrap.servers' = '192.168.1.214:9092', 'format.type' = 'json' ) 出现错误 org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'ts' 我以为是我的建表语句有问题,但是当我通过代码(在idea上运行)创建这个表时,没有任何问题,数据也是可以正常查询到。当我提交到集群上运行时又出现org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'ts' ,我猜测是jar不全的问题,但是我按照云邪大佬的文章导入了 flink-sql-connector-kafka_2.11-1.10.0.jar flink-json-1.10.0.jar 在sql-client上执行还是错误,是我缺少什么jar包吗? |
Administrator
|
Emm... 这个好奇怪,理论上 IDEA 中运行的时候可能会有问题 (Calcite bug 导致的问题),SQL CLI 中不应该有问题。
你的集群/作业中有其他的依赖吗? 比如自己依赖了 Calcite? Best, Jark On Tue, 24 Mar 2020 at 23:37, flink小猪 <[hidden email]> wrote: > 当我尝试通过sql client创建kafka源表时(这里借鉴了云邪大佬的《Demo:基于 Flink SQL 构建流式应用》文章), > CREATE TABLE user_behavior ( > user_id BIGINT, > item_id BIGINT, > category_id BIGINT, > behavior STRING, > ts TIMESTAMP(3), > proctime as PROCTIME(), > WATERMARK FOR ts as ts - INTERVAL '5' SECOND > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'user_behavior', > 'connector.startup-mode' = 'earliest-offset', > 'connector.properties.zookeeper.connect' = '192.168.1.214:2181', > 'connector.properties.bootstrap.servers' = '192.168.1.214:9092', > 'format.type' = 'json' > ) > 出现错误 > org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier > 'ts' > 我以为是我的建表语句有问题,但是当我通过代码(在idea上运行)创建这个表时,没有任何问题,数据也是可以正常查询到。当我提交到集群上运行时又出现org.apache.calcite.sql.validate.SqlValidatorException: > Unknown identifier 'ts' > ,我猜测是jar不全的问题,但是我按照云邪大佬的文章导入了 > flink-sql-connector-kafka_2.11-1.10.0.jar > flink-json-1.10.0.jar > 在sql-client上执行还是错误,是我缺少什么jar包吗? > > > > > > |
感谢您的回复,这是我lib目录下的jar包
flink-dist_2.11-1.10.0.jar flink-sql-connector-kafka_2.11-1.10.0.jar flink-table-blink_2.11-1.10.0.jar slf4j-log4j12-1.7.15.jar flink-json-1.10.0.jar flink-table_2.11-1.10.0.jar log4j-1.2.17.jar 以下是提交任务的异常信息 org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: From line 9, column 25 to line 9, column 26: Unknown identifier 'ts' at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968) Caused by: org.apache.calcite.runtime.CalciteContextException: From line 9, column 25 to line 9, column 26: Unknown identifier 'ts' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:501) at org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:947) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:930) at org.apache.flink.table.planner.operations.SqlToOperationConverter.lambda$createTableSchema$8(SqlToOperationConverter.java:509) at java.util.Optional.ifPresent(Optional.java:159) at org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:505) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) at WindowUv$.main(WindowUv.scala:49) at WindowUv.main(WindowUv.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321) ... 8 more Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'ts' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) ... 39 more 另外有一个问题是当我把flink-connector-kafkajar包放入lib目录下时,我的项目没有吧flink-connector-kafka打入jar包里,运行时提示我缺少flink-connector-kafka-base包,当我再添加时flink-connector-kafka-base包时 提示我缺少org/apache/kafka/common/serialization/ByteArrayDeserializer,我观察maven依赖是发现,flink-connector-kafka_2.11-1.10.0.jar它依赖着flink-connector-kafka-base和kafka-client包,当我只把flink-connector-kafka加入lib 目录下时,由于缺少它所依赖的包,导入执行不成功。在这里,我想知道一般提交flink任务,是尽量少添加这类jar包,然后通过maven将这些依赖打到项目中去吗? 期待您的回复 在 2020-03-25 09:56:56,"Jark Wu" <[hidden email]> 写道: >Emm... 这个好奇怪,理论上 IDEA 中运行的时候可能会有问题 (Calcite bug 导致的问题),SQL CLI 中不应该有问题。 >你的集群/作业中有其他的依赖吗? 比如自己依赖了 Calcite? > >Best, >Jark > >On Tue, 24 Mar 2020 at 23:37, flink小猪 <[hidden email]> wrote: > >> 当我尝试通过sql client创建kafka源表时(这里借鉴了云邪大佬的《Demo:基于 Flink SQL 构建流式应用》文章), >> CREATE TABLE user_behavior ( >> user_id BIGINT, >> item_id BIGINT, >> category_id BIGINT, >> behavior STRING, >> ts TIMESTAMP(3), >> proctime as PROCTIME(), >> WATERMARK FOR ts as ts - INTERVAL '5' SECOND >> ) WITH ( >> 'connector.type' = 'kafka', >> 'connector.version' = 'universal', >> 'connector.topic' = 'user_behavior', >> 'connector.startup-mode' = 'earliest-offset', >> 'connector.properties.zookeeper.connect' = '192.168.1.214:2181', >> 'connector.properties.bootstrap.servers' = '192.168.1.214:9092', >> 'format.type' = 'json' >> ) >> 出现错误 >> org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier >> 'ts' >> 我以为是我的建表语句有问题,但是当我通过代码(在idea上运行)创建这个表时,没有任何问题,数据也是可以正常查询到。当我提交到集群上运行时又出现org.apache.calcite.sql.validate.SqlValidatorException: >> Unknown identifier 'ts' >> ,我猜测是jar不全的问题,但是我按照云邪大佬的文章导入了 >> flink-sql-connector-kafka_2.11-1.10.0.jar >> flink-json-1.10.0.jar >> 在sql-client上执行还是错误,是我缺少什么jar包吗? >> >> >> >> >> >> |
Free forum by Nabble | Edit this page |