Hello,

I am using Flink 1.10. Some of our business data in Kafka contains a field named `@timestamp`. The DDL is:

    CREATE TABLE kafkaSrc(
        `@timestamp` TIMESTAMP(3)
        ,domain VARCHAR
        ,proctime AS proctime()
    )WITH(
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'topic',
        'connector.startup-mode' = 'latest-offset',
        'connector.properties.group.id' = 'id',
        'connector.properties.zookeeper.connect' = 'xxx',
        'connector.properties.bootstrap.servers' = 'xxx',
        'format.type' = 'json',
        'format.derive-schema' = 'true'
    );

With the following DML:

    insert into MyResult
    select
        `@timestamp`
        ,domain
        ,proctime
    from
        kafkaSrc
    ;

SQL parsing fails with this exception:

    Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. Lexical error at line 1, column 8. Encountered: "@" (64), after : ""
        at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
        at org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:79)
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:111)
        at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)

While debugging I found the following:

https://github.com/apache/flink/blob/release-1.10/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/CalciteParser.java#L45-L52

In the function public SqlNode parse(String sql), the value of the sql argument is:

    SELECT @timestamp,domain,PROCTIME() FROM __temp_table__

The backticks around `@timestamp` are missing there, which matches the lexical error at column 8.

Screenshot: http://chuantu.xyz/t6/735/1590139759x2728303292.png

Also, if I remove proctime AS proctime() from the DDL, the job outputs results normally. I am not sure whether that helps.

Best,
Oliver 云长
Hi,

This looks like a known bug [1]. You are probably on version 1.10.0, right? It has already been fixed in 1.10.1.

[1] https://issues.apache.org/jira/browse/FLINK-16068

On Fri, May 22, 2020 at 5:38 PM, oliver <[hidden email]> wrote:

> hello,
> I am using Flink 1.10
> Some of our business data in Kafka contains a field named `@timestamp`,
> [...]

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
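The parser input reported in the original post (SELECT @timestamp,domain,PROCTIME() FROM __temp_table__) suggests that the column names were pasted into a generated statement without their backticks. The following is a minimal sketch of that effect, not Flink's actual code: the class IdentifierQuoting and its methods are hypothetical names made up for illustration, and doubling an embedded backtick is assumed to be the escape rule for backtick-quoted identifiers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; this is not how Flink implements the fix.
class IdentifierQuoting {

    // Wrap a column name in backticks, doubling any backtick inside it
    // (assumed escape rule for backtick-quoted identifiers).
    static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    // Build a SELECT over the temporary table from physical column names
    // and computed-column expressions. Expressions are never quoted.
    static String buildSelect(List<String> columns, List<String> expressions, boolean quoteColumns) {
        List<String> items = new ArrayList<>();
        for (String column : columns) {
            items.add(quoteColumns ? quote(column) : column);
        }
        items.addAll(expressions);
        return "SELECT " + String.join(",", items) + " FROM __temp_table__";
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("@timestamp", "domain");
        List<String> expressions = Arrays.asList("PROCTIME()");
        // prints: SELECT @timestamp,domain,PROCTIME() FROM __temp_table__
        System.out.println(buildSelect(columns, expressions, false));
        // prints: SELECT `@timestamp`,`domain`,PROCTIME() FROM __temp_table__
        System.out.println(buildSelect(columns, expressions, true));
    }
}
```

The first printed statement is the same string seen in the debugger, where "@" sits at column 8 and cannot start an unquoted identifier. The second keeps the quoting, so a lexer that accepts backtick-quoted identifiers would read `@timestamp` as a single name.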
Thanks, Benchao.

Yes, I am on 1.10.0. I will upgrade to 1.10.1 and try again.

> On May 22, 2020, at 6:48 PM, Benchao Li <[hidden email]> wrote:
>
> Hi,
>
> This looks like a known bug [1]. You are probably on version 1.10.0, right? It has already been fixed in 1.10.1.
>
> [1] https://issues.apache.org/jira/browse/FLINK-16068
>
> [...]