flinksql创建源表添加水位线失败

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

flinksql创建源表添加水位线失败

丁浩浩
当我尝试通过sql client创建kafka源表时(这里借鉴了云邪大佬的《Demo:基于 Flink SQL 构建流式应用》文章),
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME(),
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.zookeeper.connect' = '192.168.1.214:2181',
    'connector.properties.bootstrap.servers' = '192.168.1.214:9092',
    'format.type' = 'json'
)
出现错误
org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'ts'
我以为是我的建表语句有问题,但是当我通过代码(在idea上运行)创建这个表时,没有任何问题,数据也是可以正常查询到。当我提交到集群上运行时又出现org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'ts'
,我猜测是jar不全的问题,但是我按照云邪大佬的文章导入了
flink-sql-connector-kafka_2.11-1.10.0.jar
flink-json-1.10.0.jar
在sql-client上执行还是错误,是我缺少什么jar包吗?





Reply | Threaded
Open this post in threaded view
|

Re: flinksql创建源表添加水位线失败

Jark
Administrator
Emm... 这个好奇怪,理论上 IDEA 中运行的时候可能会有问题 (Calcite bug 导致的问题),SQL CLI 中不应该有问题。
你的集群/作业中有其他的依赖吗? 比如自己依赖了 Calcite?

Best,
Jark

On Tue, 24 Mar 2020 at 23:37, flink小猪 <[hidden email]> wrote:

> 当我尝试通过sql client创建kafka源表时(这里借鉴了云邪大佬的《Demo:基于 Flink SQL 构建流式应用》文章),
> CREATE TABLE user_behavior (
>     user_id BIGINT,
>     item_id BIGINT,
>     category_id BIGINT,
>     behavior STRING,
>     ts TIMESTAMP(3),
>     proctime as PROCTIME(),
>     WATERMARK FOR ts as ts - INTERVAL '5' SECOND
> ) WITH (
>     'connector.type' = 'kafka',
>     'connector.version' = 'universal',
>     'connector.topic' = 'user_behavior',
>     'connector.startup-mode' = 'earliest-offset',
>     'connector.properties.zookeeper.connect' = '192.168.1.214:2181',
>     'connector.properties.bootstrap.servers' = '192.168.1.214:9092',
>     'format.type' = 'json'
> )
> 出现错误
> org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier
> 'ts'
> 我以为是我的建表语句有问题,但是当我通过代码(在idea上运行)创建这个表时,没有任何问题,数据也是可以正常查询到。当我提交到集群上运行时又出现org.apache.calcite.sql.validate.SqlValidatorException:
> Unknown identifier 'ts'
> ,我猜测是jar不全的问题,但是我按照云邪大佬的文章导入了
> flink-sql-connector-kafka_2.11-1.10.0.jar
> flink-json-1.10.0.jar
> 在sql-client上执行还是错误,是我缺少什么jar包吗?
>
>
>
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re:Re: flinksql创建源表添加水位线失败

丁浩浩
感谢您的回复,这是我lib目录下的jar包

flink-dist_2.11-1.10.0.jar  flink-sql-connector-kafka_2.11-1.10.0.jar  flink-table-blink_2.11-1.10.0.jar  slf4j-log4j12-1.7.15.jar

flink-json-1.10.0.jar       flink-table_2.11-1.10.0.jar                log4j-1.2.17.jar
以下是提交任务的异常信息
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: From line 9, column 25 to line 9, column 26: Unknown identifier 'ts'
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 9, column 25 to line 9, column 26: Unknown identifier 'ts'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819)
at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841)
at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667)
at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:501)
at org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144)
at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:947)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:930)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.lambda$createTableSchema$8(SqlToOperationConverter.java:509)
at java.util.Optional.ifPresent(Optional.java:159)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:505)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
at WindowUv$.main(WindowUv.scala:49)
at WindowUv.main(WindowUv.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 8 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'ts'
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
... 39 more


另外有一个问题是当我把flink-connector-kafkajar包放入lib目录下时,我的项目没有吧flink-connector-kafka打入jar包里,运行时提示我缺少flink-connector-kafka-base包,当我再添加时flink-connector-kafka-base包时
提示我缺少org/apache/kafka/common/serialization/ByteArrayDeserializer,我观察maven依赖是发现,flink-connector-kafka_2.11-1.10.0.jar它依赖着flink-connector-kafka-base和kafka-client包,当我只把flink-connector-kafka加入lib
目录下时,由于缺少它所依赖的包,导入执行不成功。在这里,我想知道一般提交flink任务,是尽量少添加这类jar包,然后通过maven将这些依赖打到项目中去吗?
期待您的回复


















在 2020-03-25 09:56:56,"Jark Wu" <[hidden email]> 写道:

>Emm... 这个好奇怪,理论上 IDEA 中运行的时候可能会有问题 (Calcite bug 导致的问题),SQL CLI 中不应该有问题。
>你的集群/作业中有其他的依赖吗? 比如自己依赖了 Calcite?
>
>Best,
>Jark
>
>On Tue, 24 Mar 2020 at 23:37, flink小猪 <[hidden email]> wrote:
>
>> 当我尝试通过sql client创建kafka源表时(这里借鉴了云邪大佬的《Demo:基于 Flink SQL 构建流式应用》文章),
>> CREATE TABLE user_behavior (
>>     user_id BIGINT,
>>     item_id BIGINT,
>>     category_id BIGINT,
>>     behavior STRING,
>>     ts TIMESTAMP(3),
>>     proctime as PROCTIME(),
>>     WATERMARK FOR ts as ts - INTERVAL '5' SECOND
>> ) WITH (
>>     'connector.type' = 'kafka',
>>     'connector.version' = 'universal',
>>     'connector.topic' = 'user_behavior',
>>     'connector.startup-mode' = 'earliest-offset',
>>     'connector.properties.zookeeper.connect' = '192.168.1.214:2181',
>>     'connector.properties.bootstrap.servers' = '192.168.1.214:9092',
>>     'format.type' = 'json'
>> )
>> 出现错误
>> org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier
>> 'ts'
>> 我以为是我的建表语句有问题,但是当我通过代码(在idea上运行)创建这个表时,没有任何问题,数据也是可以正常查询到。当我提交到集群上运行时又出现org.apache.calcite.sql.validate.SqlValidatorException:
>> Unknown identifier 'ts'
>> ,我猜测是jar不全的问题,但是我按照云邪大佬的文章导入了
>> flink-sql-connector-kafka_2.11-1.10.0.jar
>> flink-json-1.10.0.jar
>> 在sql-client上执行还是错误,是我缺少什么jar包吗?
>>
>>
>>
>>
>>
>>