Flink SQL create table 关键字 table 加反引号 解析失败

classic Classic list List threaded Threaded
11 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink SQL create table 关键字 table 加反引号 解析失败

macia kk
各位大佬,

我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢

    val bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
    val sourceTable = """CREATE TABLE my_kafak_source (
                        |    `table` varchar,
                        |    `database` varchar,
                        |    `data` row < transaction_id varchar,
                        |               user_id int,
                        |               amount int,
                        |    >,
                        |    maxwell_ts bigint,
                        |    ts_watermark as
TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
                        |) WITH (
                        |)""".stripMargin

error

 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: SQL parse failed. Encountered "table" at line
1, column 8.
Was expecting one of:
    "ABS" ...
    "ALL" ...
    "ARRAY" ...
    "AVG" ...
    "CARDINALITY" ...
    "CASE" ...
    "CAST" ...
    "CEIL" ...
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL create table 关键字 table 加反引号 解析失败

Benchao Li
Hi,
看起来你的DDL应该没有太大问题,你可以把代码贴的再完整一点么,可能跟下面的代码有关系。

macia kk <[hidden email]> 于2020年6月7日周日 下午3:33写道:

> 各位大佬,
>
> 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢
>
>     val bsSettings =
>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>     val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
>     val sourceTable = """CREATE TABLE my_kafak_source (
>                         |    `table` varchar,
>                         |    `database` varchar,
>                         |    `data` row < transaction_id varchar,
>                         |               user_id int,
>                         |               amount int,
>                         |    >,
>                         |    maxwell_ts bigint,
>                         |    ts_watermark as
> TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
>                         |) WITH (
>                         |)""".stripMargin
>
> error
>
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: SQL parse failed. Encountered "table" at line
> 1, column 8.
> Was expecting one of:
>     "ABS" ...
>     "ALL" ...
>     "ARRAY" ...
>     "AVG" ...
>     "CARDINALITY" ...
>     "CASE" ...
>     "CAST" ...
>     "CEIL" ...
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL create table 关键字 table 加反引号 解析失败

macia kk
下边的代码里,没有用 `table` 字段,我现在只要把 table, database 这两行去掉,可以能跑,只要加上,就会挂

Benchao Li <[hidden email]> 于2020年6月7日周日 下午3:38写道:

> Hi,
> 看起来你的DDL应该没有太大问题,你可以把代码贴的再完整一点么,可能跟下面的代码有关系。
>
> macia kk <[hidden email]> 于2020年6月7日周日 下午3:33写道:
>
> > 各位大佬,
> >
> > 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢
> >
> >     val bsSettings =
> >
> >
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >     val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
> >     val sourceTable = """CREATE TABLE my_kafak_source (
> >                         |    `table` varchar,
> >                         |    `database` varchar,
> >                         |    `data` row < transaction_id varchar,
> >                         |               user_id int,
> >                         |               amount int,
> >                         |    >,
> >                         |    maxwell_ts bigint,
> >                         |    ts_watermark as
> > TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
> >                         |) WITH (
> >                         |)""".stripMargin
> >
> > error
> >
> >  The program finished with the following exception:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main
> > method caused an error: SQL parse failed. Encountered "table" at line
> > 1, column 8.
> > Was expecting one of:
> >     "ABS" ...
> >     "ALL" ...
> >     "ARRAY" ...
> >     "AVG" ...
> >     "CARDINALITY" ...
> >     "CASE" ...
> >     "CAST" ...
> >     "CEIL" ...
> >
>
>
> --
>
> Best,
> Benchao Li
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL create table 关键字 table 加反引号 解析失败

macia kk
```scala
    val bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
    val sourceTable = """CREATE TABLE my_kafak_source (
                        |    `table` varchar,
                        |    `database` varchar,
                        |    `data` row < transaction_id varchar,
                        |               user_id int,
                        |               amount int,
                        |               reference_id varchar,
                        |               status int,
                        |               transaction_type int,
                        |               merchant_id int,
                        |               update_time int,
                        |               create_time int
                        |    >,
                        |    maxwell_ts bigint,
                        |    ts_watermark as
TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
                        |) WITH (
                        |    'connector.type' = 'kafka',
                        |    'connector.version' = '0.11',
                        |    'connector.topic' = 'xx',
                        |    'connector.startup-mode' = 'latest-offset',
                        |    'update-mode' = 'append',
                        |    'format.type' = 'json',
                        |    'format.derive-schema' = 'true'
                        |)""".stripMargin

    val dstTable = """CREATE TABLE my_kafak_dst (
                     |     transaction_type int,
                     |     transaction_id VARCHAR,
                     |     reference_id VARCHAR,
                     |     merchant_id int,
                     |     status int,
                     |     create_time int,
                     |     maxwell_ts bigint,
                     |     ts_watermark TIMESTAMP(3)
                     |) WITH (
                     |    'connector.type' = 'kafka',
                     |    'connector.version' = '0.11',
                     |    'connector.topic' = 'uu',
                     |    'update-mode' = 'append',
                     |    'format.type' = 'json',
                     |    'format.derive-schema' = 'true'
                     |)""".stripMargin


    bsTableEnv.sqlUpdate(sourceTable)
    bsTableEnv.sqlUpdate(dstTable)


    val main_table = bsTableEnv.sqlQuery("SELECT transaction_type,
transaction_id, reference_id, merchant_id, status, create_time, maxwell_ts,
ts_watermark FROM my_kafak_source")
    bsTableEnv.createTemporaryView("main_table", main_table)

    bsTableEnv.sqlUpdate("INSERT INTO my_kafak_dst SELECT * FROM
main_table")
```

macia kk <[hidden email]> 于2020年6月7日周日 下午3:41写道:

> 下边的代码里,没有用 `table` 字段,我现在只要把 table, database 这两行去掉,可以能跑,只要加上,就会挂
>
> Benchao Li <[hidden email]> 于2020年6月7日周日 下午3:38写道:
>
>> Hi,
>> 看起来你的DDL应该没有太大问题,你可以把代码贴的再完整一点么,可能跟下面的代码有关系。
>>
>> macia kk <[hidden email]> 于2020年6月7日周日 下午3:33写道:
>>
>> > 各位大佬,
>> >
>> > 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢
>> >
>> >     val bsSettings =
>> >
>> >
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> >     val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
>> >     val sourceTable = """CREATE TABLE my_kafak_source (
>> >                         |    `table` varchar,
>> >                         |    `database` varchar,
>> >                         |    `data` row < transaction_id varchar,
>> >                         |               user_id int,
>> >                         |               amount int,
>> >                         |    >,
>> >                         |    maxwell_ts bigint,
>> >                         |    ts_watermark as
>> > TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
>> >                         |) WITH (
>> >                         |)""".stripMargin
>> >
>> > error
>> >
>> >  The program finished with the following exception:
>> >
>> > org.apache.flink.client.program.ProgramInvocationException: The main
>> > method caused an error: SQL parse failed. Encountered "table" at line
>> > 1, column 8.
>> > Was expecting one of:
>> >     "ABS" ...
>> >     "ALL" ...
>> >     "ARRAY" ...
>> >     "AVG" ...
>> >     "CARDINALITY" ...
>> >     "CASE" ...
>> >     "CAST" ...
>> >     "CEIL" ...
>> >
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL create table 关键字 table 加反引号 解析失败

macia kk
    val bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
    val sourceTable = """CREATE TABLE my_kafak_source (
                        |    `table` varchar,
                        |    `database` varchar,
                        |    `data` row < transaction_id varchar,
                        |               user_id int,
                        |               amount int,
                        |               reference_id varchar,
                        |               status int,
                        |               transaction_type int,
                        |               merchant_id int,
                        |               update_time int,
                        |               create_time int
                        |    >,
                        |    maxwell_ts bigint,
                        |    ts_watermark as
TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
                        |) WITH (
                        |    'connector.type' = 'kafka',
                        |    'connector.version' = '0.11',
                        |    'connector.topic' = 'xx',
                        |    'connector.startup-mode' = 'latest-offset',
                        |    'update-mode' = 'append',
                        |    'format.type' = 'json',
                        |    'format.derive-schema' = 'true'
                        |)""".stripMargin

    val dstTable = """CREATE TABLE my_kafak_dst (
                     |     transaction_type int,
                     |     transaction_id VARCHAR,
                     |     reference_id VARCHAR,
                     |     merchant_id int,
                     |     status int,
                     |     create_time int,
                     |     maxwell_ts bigint,
                     |     ts_watermark TIMESTAMP(3)
                     |) WITH (
                     |    'connector.type' = 'kafka',
                     |    'connector.version' = '0.11',
                     |    'connector.topic' = 'uu',
                     |    'update-mode' = 'append',
                     |    'format.type' = 'json',
                     |    'format.derive-schema' = 'true'
                     |)""".stripMargin

    bsTableEnv.sqlUpdate(sourceTable)
    bsTableEnv.sqlUpdate(dstTable)


    val main_table = bsTableEnv.sqlQuery("SELECT transaction_type,
transaction_id, reference_id, merchant_id, status, create_time,
maxwell_ts, ts_watermark FROM my_kafak_source")
    bsTableEnv.createTemporaryView("main_table", main_table)

    bsTableEnv.sqlUpdate("INSERT INTO my_kafak_dst SELECT * FROM main_table")


macia kk <[hidden email]> 于2020年6月7日周日 下午3:45写道:

> ```scala
>     val bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>     val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
>     val sourceTable = """CREATE TABLE my_kafak_source (
>                         |    `table` varchar,
>                         |    `database` varchar,
>                         |    `data` row < transaction_id varchar,
>                         |               user_id int,
>                         |               amount int,
>                         |               reference_id varchar,
>                         |               status int,
>                         |               transaction_type int,
>                         |               merchant_id int,
>                         |               update_time int,
>                         |               create_time int
>                         |    >,
>                         |    maxwell_ts bigint,
>                         |    ts_watermark as
> TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
>                         |) WITH (
>                         |    'connector.type' = 'kafka',
>                         |    'connector.version' = '0.11',
>                         |    'connector.topic' = 'xx',
>                         |    'connector.startup-mode' = 'latest-offset',
>                         |    'update-mode' = 'append',
>                         |    'format.type' = 'json',
>                         |    'format.derive-schema' = 'true'
>                         |)""".stripMargin
>
>     val dstTable = """CREATE TABLE my_kafak_dst (
>                      |     transaction_type int,
>                      |     transaction_id VARCHAR,
>                      |     reference_id VARCHAR,
>                      |     merchant_id int,
>                      |     status int,
>                      |     create_time int,
>                      |     maxwell_ts bigint,
>                      |     ts_watermark TIMESTAMP(3)
>                      |) WITH (
>                      |    'connector.type' = 'kafka',
>                      |    'connector.version' = '0.11',
>                      |    'connector.topic' = 'uu',
>                      |    'update-mode' = 'append',
>                      |    'format.type' = 'json',
>                      |    'format.derive-schema' = 'true'
>                      |)""".stripMargin
>
>
>     bsTableEnv.sqlUpdate(sourceTable)
>     bsTableEnv.sqlUpdate(dstTable)
>
>
>     val main_table = bsTableEnv.sqlQuery("SELECT transaction_type,
> transaction_id, reference_id, merchant_id, status, create_time, maxwell_ts,
> ts_watermark FROM my_kafak_source")
>     bsTableEnv.createTemporaryView("main_table", main_table)
>
>     bsTableEnv.sqlUpdate("INSERT INTO my_kafak_dst SELECT * FROM
> main_table")
> ```
>
> macia kk <[hidden email]> 于2020年6月7日周日 下午3:41写道:
>
>> 下边的代码里,没有用 `table` 字段,我现在只要把 table, database 这两行去掉,可以能跑,只要加上,就会挂
>>
>> Benchao Li <[hidden email]> 于2020年6月7日周日 下午3:38写道:
>>
>>> Hi,
>>> 看起来你的DDL应该没有太大问题,你可以把代码贴的再完整一点么,可能跟下面的代码有关系。
>>>
>>> macia kk <[hidden email]> 于2020年6月7日周日 下午3:33写道:
>>>
>>> > 各位大佬,
>>> >
>>> > 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢
>>> >
>>> >     val bsSettings =
>>> >
>>> >
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>> >     val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
>>> >     val sourceTable = """CREATE TABLE my_kafak_source (
>>> >                         |    `table` varchar,
>>> >                         |    `database` varchar,
>>> >                         |    `data` row < transaction_id varchar,
>>> >                         |               user_id int,
>>> >                         |               amount int,
>>> >                         |    >,
>>> >                         |    maxwell_ts bigint,
>>> >                         |    ts_watermark as
>>> > TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
>>> >                         |) WITH (
>>> >                         |)""".stripMargin
>>> >
>>> > error
>>> >
>>> >  The program finished with the following exception:
>>> >
>>> > org.apache.flink.client.program.ProgramInvocationException: The main
>>> > method caused an error: SQL parse failed. Encountered "table" at line
>>> > 1, column 8.
>>> > Was expecting one of:
>>> >     "ABS" ...
>>> >     "ALL" ...
>>> >     "ARRAY" ...
>>> >     "AVG" ...
>>> >     "CARDINALITY" ...
>>> >     "CASE" ...
>>> >     "CAST" ...
>>> >     "CEIL" ...
>>> >
>>>
>>>
>>> --
>>>
>>> Best,
>>> Benchao Li
>>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL create table 关键字 table 加反引号 解析失败

Benchao Li
In reply to this post by macia kk
嗯,你这个是哪个版本呢?曾经的确是有过计算列的时候会有这种bug,不过后来已经修复了。

macia kk <[hidden email]> 于2020年6月7日周日 下午3:42写道:

> 下边的代码里,没有用 `table` 字段,我现在只要把 table, database 这两行去掉,可以能跑,只要加上,就会挂
>
> Benchao Li <[hidden email]> 于2020年6月7日周日 下午3:38写道:
>
> > Hi,
> > 看起来你的DDL应该没有太大问题,你可以把代码贴的再完整一点么,可能跟下面的代码有关系。
> >
> > macia kk <[hidden email]> 于2020年6月7日周日 下午3:33写道:
> >
> > > 各位大佬,
> > >
> > > 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢
> > >
> > >     val bsSettings =
> > >
> > >
> >
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> > >     val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
> > >     val sourceTable = """CREATE TABLE my_kafak_source (
> > >                         |    `table` varchar,
> > >                         |    `database` varchar,
> > >                         |    `data` row < transaction_id varchar,
> > >                         |               user_id int,
> > >                         |               amount int,
> > >                         |    >,
> > >                         |    maxwell_ts bigint,
> > >                         |    ts_watermark as
> > > TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
> > >                         |) WITH (
> > >                         |)""".stripMargin
> > >
> > > error
> > >
> > >  The program finished with the following exception:
> > >
> > > org.apache.flink.client.program.ProgramInvocationException: The main
> > > method caused an error: SQL parse failed. Encountered "table" at line
> > > 1, column 8.
> > > Was expecting one of:
> > >     "ABS" ...
> > >     "ALL" ...
> > >     "ARRAY" ...
> > >     "AVG" ...
> > >     "CARDINALITY" ...
> > >     "CASE" ...
> > >     "CAST" ...
> > >     "CEIL" ...
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL create table 关键字 table 加反引号 解析失败

Leonard Xu
In reply to this post by macia kk
Hi,
1.10确实有这个bug, 这个问题在1.10.1中已经修复了,你可以使用升级到1.10.1,Benchao之前修了一个计算列相关的,我刚刚看不使用计算列也会有这问题,应该是在[1]中 jark wu 修复的。

Best,
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-16526 <https://issues.apache.org/jira/browse/FLINK-16526>

> 在 2020年6月7日,15:32,macia kk <[hidden email]> 写道:
>
> 各位大佬,
>
> 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢
>
>    val bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>    val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
>    val sourceTable = """CREATE TABLE my_kafak_source (
>                        |    `table` varchar,
>                        |    `database` varchar,
>                        |    `data` row < transaction_id varchar,
>                        |               user_id int,
>                        |               amount int,
>                        |    >,
>                        |    maxwell_ts bigint,
>                        |    ts_watermark as
> TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
>                        |) WITH (
>                        |)""".stripMargin
>
> error
>
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: SQL parse failed. Encountered "table" at line
> 1, column 8.
> Was expecting one of:
>    "ABS" ...
>    "ALL" ...
>    "ARRAY" ...
>    "AVG" ...
>    "CARDINALITY" ...
>    "CASE" ...
>    "CAST" ...
>    "CEIL" ...

Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL create table 关键字 table 加反引号 解析失败

macia kk
再次感谢,成功了,这个问题困扰我两周了。还有个问题,能麻烦帮我看下嘛

如上,这里定义的 ts_watermark, 想用他们做 watermakr,但是他的返回结果是 null, 我的 maxwell_ts 是
millseconds,我看了函数的使用方法,没想到哪里有问题

val bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings) val
sourceTable = """CREATE TABLE my_kafak_source ( | `table` varchar, |
`database` varchar, | `data` row < transaction_id varchar, | user_id int, |
amount int, | >, | maxwell_ts bigint, | ts_watermark as
TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000)) |) WITH ( |)""".stripMargin

Leonard Xu <[hidden email]> 于2020年6月7日周日 下午5:51写道:

> Hi,
> 1.10确实有这个bug,
> 这个问题在1.10.1中已经修复了,你可以使用升级到1.10.1,Benchao之前修了一个计算列相关的,我刚刚看不使用计算列也会有这问题,应该是在[1]中
> jark wu 修复的。
>
> Best,
> Leonard Xu
> [1] https://issues.apache.org/jira/browse/FLINK-16526 <
> https://issues.apache.org/jira/browse/FLINK-16526>
>
> > 在 2020年6月7日,15:32,macia kk <[hidden email]> 写道:
> >
> > 各位大佬,
> >
> > 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢
> >
> >    val bsSettings =
> >
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >    val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
> >    val sourceTable = """CREATE TABLE my_kafak_source (
> >                        |    `table` varchar,
> >                        |    `database` varchar,
> >                        |    `data` row < transaction_id varchar,
> >                        |               user_id int,
> >                        |               amount int,
> >                        |    >,
> >                        |    maxwell_ts bigint,
> >                        |    ts_watermark as
> > TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
> >                        |) WITH (
> >                        |)""".stripMargin
> >
> > error
> >
> > The program finished with the following exception:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main
> > method caused an error: SQL parse failed. Encountered "table" at line
> > 1, column 8.
> > Was expecting one of:
> >    "ABS" ...
> >    "ALL" ...
> >    "ARRAY" ...
> >    "AVG" ...
> >    "CARDINALITY" ...
> >    "CASE" ...
> >    "CAST" ...
> >    "CEIL" ...
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL create table 关键字 table 加反引号 解析失败

macia kk
打印出来是这样的

"maxwell_ts":1591518126072000,"ts_watermark":"52403-03-16 00:21:12"

macia kk <[hidden email]> 于2020年6月7日周日 下午5:53写道:

> 再次感谢,成功了,这个问题困扰我两周了。还有个问题,能麻烦帮我看下嘛
>
> 如上,这里定义的 ts_watermark, 想用他们做 watermakr,但是他的返回结果是 null, 我的 maxwell_ts 是
> millseconds,我看了函数的使用方法,没想到哪里有问题
>
> val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val bsTableEnv = StreamTableEnvironment.create(env, bsSettings) val
> sourceTable = """CREATE TABLE my_kafak_source ( | `table` varchar, |
> `database` varchar, | `data` row < transaction_id varchar, | user_id int, |
> amount int, | >, | maxwell_ts bigint, | ts_watermark as
> TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000)) |) WITH ( |)""".stripMargin
>
> Leonard Xu <[hidden email]> 于2020年6月7日周日 下午5:51写道:
>
>> Hi,
>> 1.10确实有这个bug,
>> 这个问题在1.10.1中已经修复了,你可以使用升级到1.10.1,Benchao之前修了一个计算列相关的,我刚刚看不使用计算列也会有这问题,应该是在[1]中
>> jark wu 修复的。
>>
>> Best,
>> Leonard Xu
>> [1] https://issues.apache.org/jira/browse/FLINK-16526 <
>> https://issues.apache.org/jira/browse/FLINK-16526>
>>
>> > 在 2020年6月7日,15:32,macia kk <[hidden email]> 写道:
>> >
>> > 各位大佬,
>> >
>> > 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢
>> >
>> >    val bsSettings =
>> >
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> >    val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
>> >    val sourceTable = """CREATE TABLE my_kafak_source (
>> >                        |    `table` varchar,
>> >                        |    `database` varchar,
>> >                        |    `data` row < transaction_id varchar,
>> >                        |               user_id int,
>> >                        |               amount int,
>> >                        |    >,
>> >                        |    maxwell_ts bigint,
>> >                        |    ts_watermark as
>> > TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
>> >                        |) WITH (
>> >                        |)""".stripMargin
>> >
>> > error
>> >
>> > The program finished with the following exception:
>> >
>> > org.apache.flink.client.program.ProgramInvocationException: The main
>> > method caused an error: SQL parse failed. Encountered "table" at line
>> > 1, column 8.
>> > Was expecting one of:
>> >    "ABS" ...
>> >    "ALL" ...
>> >    "ARRAY" ...
>> >    "AVG" ...
>> >    "CARDINALITY" ...
>> >    "CASE" ...
>> >    "CAST" ...
>> >    "CEIL" ...
>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL create table 关键字 table 加反引号 解析失败

Benchao Li
FROM_UNIXTIME接收的是秒的时间戳,你的maxwell_ts看起来是微秒吧,应该/1000000吧

macia kk <[hidden email]> 于2020年6月7日周日 下午6:15写道:

> 打印出来是这样的
>
> "maxwell_ts":1591518126072000,"ts_watermark":"52403-03-16 00:21:12"
>
> macia kk <[hidden email]> 于2020年6月7日周日 下午5:53写道:
>
>> 再次感谢,成功了,这个问题困扰我两周了。还有个问题,能麻烦帮我看下嘛
>>
>> 如上,这里定义的 ts_watermark, 想用他们做 watermakr,但是他的返回结果是 null, 我的 maxwell_ts 是
>> millseconds,我看了函数的使用方法,没想到哪里有问题
>>
>> val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> val bsTableEnv = StreamTableEnvironment.create(env, bsSettings) val
>> sourceTable = """CREATE TABLE my_kafak_source ( | `table` varchar, |
>> `database` varchar, | `data` row < transaction_id varchar, | user_id int, |
>> amount int, | >, | maxwell_ts bigint, | ts_watermark as
>> TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000)) |) WITH ( |)""".stripMargin
>>
>> Leonard Xu <[hidden email]> 于2020年6月7日周日 下午5:51写道:
>>
>>> Hi,
>>> 1.10确实有这个bug,
>>> 这个问题在1.10.1中已经修复了,你可以使用升级到1.10.1,Benchao之前修了一个计算列相关的,我刚刚看不使用计算列也会有这问题,应该是在[1]中
>>> jark wu 修复的。
>>>
>>> Best,
>>> Leonard Xu
>>> [1] https://issues.apache.org/jira/browse/FLINK-16526 <
>>> https://issues.apache.org/jira/browse/FLINK-16526>
>>>
>>> > 在 2020年6月7日,15:32,macia kk <[hidden email]> 写道:
>>> >
>>> > 各位大佬,
>>> >
>>> > 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢
>>> >
>>> >    val bsSettings =
>>> >
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>> >    val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
>>> >    val sourceTable = """CREATE TABLE my_kafak_source (
>>> >                        |    `table` varchar,
>>> >                        |    `database` varchar,
>>> >                        |    `data` row < transaction_id varchar,
>>> >                        |               user_id int,
>>> >                        |               amount int,
>>> >                        |    >,
>>> >                        |    maxwell_ts bigint,
>>> >                        |    ts_watermark as
>>> > TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
>>> >                        |) WITH (
>>> >                        |)""".stripMargin
>>> >
>>> > error
>>> >
>>> > The program finished with the following exception:
>>> >
>>> > org.apache.flink.client.program.ProgramInvocationException: The main
>>> > method caused an error: SQL parse failed. Encountered "table" at line
>>> > 1, column 8.
>>> > Was expecting one of:
>>> >    "ABS" ...
>>> >    "ALL" ...
>>> >    "ARRAY" ...
>>> >    "AVG" ...
>>> >    "CARDINALITY" ...
>>> >    "CASE" ...
>>> >    "CAST" ...
>>> >    "CEIL" ...
>>>
>>>

--

Best,
Benchao Li
Reply | Threaded
Open this post in threaded view
|

Re: Flink SQL create table 关键字 table 加反引号 解析失败

macia kk
😂

不好意思,没注意到

感谢

Benchao Li <[hidden email]> 于2020年6月7日周日 下午6:47写道:

> FROM_UNIXTIME接收的是秒的时间戳,你的maxwell_ts看起来是微秒吧,应该/1000000吧
>
> macia kk <[hidden email]> 于2020年6月7日周日 下午6:15写道:
>
>> 打印出来是这样的
>>
>> "maxwell_ts":1591518126072000,"ts_watermark":"52403-03-16 00:21:12"
>>
>> macia kk <[hidden email]> 于2020年6月7日周日 下午5:53写道:
>>
>>> 再次感谢,成功了,这个问题困扰我两周了。还有个问题,能麻烦帮我看下嘛
>>>
>>> 如上,这里定义的 ts_watermark, 想用他们做 watermakr,但是他的返回结果是 null, 我的 maxwell_ts 是
>>> millseconds,我看了函数的使用方法,没想到哪里有问题
>>>
>>> val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>> val bsTableEnv = StreamTableEnvironment.create(env, bsSettings) val
>>> sourceTable = """CREATE TABLE my_kafak_source ( | `table` varchar, |
>>> `database` varchar, | `data` row < transaction_id varchar, | user_id int, |
>>> amount int, | >, | maxwell_ts bigint, | ts_watermark as
>>> TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000)) |) WITH ( |)""".stripMargin
>>>
>>> Leonard Xu <[hidden email]> 于2020年6月7日周日 下午5:51写道:
>>>
>>>> Hi,
>>>> 1.10确实有这个bug,
>>>> 这个问题在1.10.1中已经修复了,你可以使用升级到1.10.1,Benchao之前修了一个计算列相关的,我刚刚看不使用计算列也会有这问题,应该是在[1]中
>>>> jark wu 修复的。
>>>>
>>>> Best,
>>>> Leonard Xu
>>>> [1] https://issues.apache.org/jira/browse/FLINK-16526 <
>>>> https://issues.apache.org/jira/browse/FLINK-16526>
>>>>
>>>> > 在 2020年6月7日,15:32,macia kk <[hidden email]> 写道:
>>>> >
>>>> > 各位大佬,
>>>> >
>>>> > 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢
>>>> >
>>>> >    val bsSettings =
>>>> >
>>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>> >    val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
>>>> >    val sourceTable = """CREATE TABLE my_kafak_source (
>>>> >                        |    `table` varchar,
>>>> >                        |    `database` varchar,
>>>> >                        |    `data` row < transaction_id varchar,
>>>> >                        |               user_id int,
>>>> >                        |               amount int,
>>>> >                        |    >,
>>>> >                        |    maxwell_ts bigint,
>>>> >                        |    ts_watermark as
>>>> > TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
>>>> >                        |) WITH (
>>>> >                        |)""".stripMargin
>>>> >
>>>> > error
>>>> >
>>>> > The program finished with the following exception:
>>>> >
>>>> > org.apache.flink.client.program.ProgramInvocationException: The main
>>>> > method caused an error: SQL parse failed. Encountered "table" at line
>>>> > 1, column 8.
>>>> > Was expecting one of:
>>>> >    "ABS" ...
>>>> >    "ALL" ...
>>>> >    "ARRAY" ...
>>>> >    "AVG" ...
>>>> >    "CARDINALITY" ...
>>>> >    "CASE" ...
>>>> >    "CAST" ...
>>>> >    "CEIL" ...
>>>>
>>>>
>
> --
>
> Best,
> Benchao Li
>