DDL定义@timestamp字段问题提示org.apache.flink.table.api.SqlParserException

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

DDL定义@timestamp字段问题提示org.apache.flink.table.api.SqlParserException

云长
hello,
使用版本 Flink 1.10
部分业务数据kafka数据存在字段`@timestamp`,
DDL如下:
CREATE TABLE kafkaSrc(
    `@timestamp` TIMESTAMP(3)
    ,domain VARCHAR
    ,proctime AS proctime()
 )WITH(
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = ’topic',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.group.id' = ‘id',
    'connector.properties.zookeeper.connect' = ‘xxx',
    'connector.properties.bootstrap.servers' = ‘xxx',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
 );

如果DML为:
insert into MyResult
select
    ,`@timestamp`
    ,domain
    ,proctime
from
    kafkaSrc
;

则会提示SQL解析失败,异常如下:
Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. Lexical error at line 1, column 8.  Encountered: "@" (64), after : ""
        at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
        at org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
        at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)

Debug发现:
https://github.com/apache/flink/blob/release-1.10/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/CalciteParser.java#L45-L52

函数:public SqlNode parse(String sql)入参sql的value是:SELECT @timestamp,domain,PROCTIME() FROM __temp_table__

图片地址:http://chuantu.xyz/t6/735/1590139759x2728303292.png

另外,去掉proctime AS proctime(),可以正常输出结果,不知道有没有帮助

Best,
Oliver 云长

Reply | Threaded
Open this post in threaded view
|

Re: DDL定义@timestamp字段问题提示org.apache.flink.table.api.SqlParserException

Benchao Li
Hi,

这个应该是一个已知bug[1], 你应该用的是1.10.0版本吧?这个在1.10.1已经修复了。

[1] https://issues.apache.org/jira/browse/FLINK-16068

oliver <[hidden email]> 于2020年5月22日周五 下午5:38写道:

> hello,
> 使用版本 Flink 1.10
> 部分业务数据kafka数据存在字段`@timestamp`,
> DDL如下:
> CREATE TABLE kafkaSrc(
>     `@timestamp` TIMESTAMP(3)
>     ,domain VARCHAR
>     ,proctime AS proctime()
>  )WITH(
>     'connector.type' = 'kafka',
>     'connector.version' = 'universal',
>     'connector.topic' = ’topic',
>     'connector.startup-mode' = 'latest-offset',
>     'connector.properties.group.id' = ‘id',
>     'connector.properties.zookeeper.connect' = ‘xxx',
>     'connector.properties.bootstrap.servers' = ‘xxx',
>     'format.type' = 'json',
>     'format.derive-schema' = 'true'
>  );
>
> 如果DML为:
> insert into MyResult
> select
>     ,`@timestamp`
>     ,domain
>     ,proctime
> from
>     kafkaSrc
> ;
>
> 则会提示SQL解析失败,异常如下:
> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse
> failed. Lexical error at line 1, column 8.  Encountered: "@" (64), after :
> ""
>         at
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
>         at
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
>         at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>         at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>         at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>         at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>         at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>         at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>         at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>         at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
>         at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>         at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>         at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>
> Debug发现:
>
> https://github.com/apache/flink/blob/release-1.10/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/CalciteParser.java#L45-L52
>
> 函数:public SqlNode parse(String sql)入参sql的value是:SELECT
> @timestamp,domain,PROCTIME() FROM __temp_table__
>
> 图片地址:http://chuantu.xyz/t6/735/1590139759x2728303292.png
>
> 另外,去掉proctime AS proctime(),可以正常输出结果,不知道有没有帮助
>
> Best,
> Oliver 云长
>
>

--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: DDL定义@timestamp字段问题提示org.apache.flink.table.api.SqlParserException

云长
谢谢 Benchao
是的,1.10.0,我升级到1.10.1试试

> 2020年5月22日 下午6:48,Benchao Li <[hidden email]> 写道:
>
> Hi,
>
> 这个应该是一个已知bug[1], 你应该用的是1.10.0版本吧?这个在1.10.1已经修复了。
>
> [1] https://issues.apache.org/jira/browse/FLINK-16068
>
> oliver <[hidden email]> 于2020年5月22日周五 下午5:38写道:
>
>> hello,
>> 使用版本 Flink 1.10
>> 部分业务数据kafka数据存在字段`@timestamp`,
>> DDL如下:
>> CREATE TABLE kafkaSrc(
>>    `@timestamp` TIMESTAMP(3)
>>    ,domain VARCHAR
>>    ,proctime AS proctime()
>> )WITH(
>>    'connector.type' = 'kafka',
>>    'connector.version' = 'universal',
>>    'connector.topic' = ’topic',
>>    'connector.startup-mode' = 'latest-offset',
>>    'connector.properties.group.id' = ‘id',
>>    'connector.properties.zookeeper.connect' = ‘xxx',
>>    'connector.properties.bootstrap.servers' = ‘xxx',
>>    'format.type' = 'json',
>>    'format.derive-schema' = 'true'
>> );
>>
>> 如果DML为:
>> insert into MyResult
>> select
>>    ,`@timestamp`
>>    ,domain
>>    ,proctime
>> from
>>    kafkaSrc
>> ;
>>
>> 则会提示SQL解析失败,异常如下:
>> Caused by: org.apache.flink.table.api.SqlParserException: SQL parse
>> failed. Lexical error at line 1, column 8.  Encountered: "@" (64), after :
>> ""
>>        at
>> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
>>        at
>> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
>>        at
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
>>        at
>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
>>        at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
>>        at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>        at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
>>        at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
>>        at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
>>        at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
>>        at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
>>        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
>>        at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
>>        at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
>>        at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
>>        at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
>>        at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
>>        at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>>        at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>>        at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>>
>> Debug发现:
>>
>> https://github.com/apache/flink/blob/release-1.10/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/CalciteParser.java#L45-L52
>>
>> 函数:public SqlNode parse(String sql)入参sql的value是:SELECT
>> @timestamp,domain,PROCTIME() FROM __temp_table__
>>
>> 图片地址:http://chuantu.xyz/t6/735/1590139759x2728303292.png
>>
>> 另外,去掉proctime AS proctime(),可以正常输出结果,不知道有没有帮助
>>
>> Best,
>> Oliver 云长
>>
>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: [hidden email]; [hidden email]
>