Hi all,

My source JSON has `database` and `table` fields that I want to parse out. They are reserved keywords, so I added backticks, but I still get an error. Why is that?

```scala
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
val sourceTable = """CREATE TABLE my_kafak_source (
  | `table` varchar,
  | `database` varchar,
  | `data` row < transaction_id varchar,
  |              user_id int,
  |              amount int,
  |            >,
  | maxwell_ts bigint,
  | ts_watermark as TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
  |) WITH (
  |)""".stripMargin
```

Error:

```
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: SQL parse failed. Encountered "table" at line
1, column 8.
Was expecting one of:
    "ABS" ...
    "ALL" ...
    "ARRAY" ...
    "AVG" ...
    "CARDINALITY" ...
    "CASE" ...
    "CAST" ...
    "CEIL" ...
```
Hi,

Your DDL looks fine to me. Could you paste the complete code? The problem may be in the code that follows it.

Best,
Benchao Li
The code below doesn't even use the `table` field. If I just remove the `table` and `database` lines it runs; as soon as I add them back, it fails.
```scala
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)

val sourceTable = """CREATE TABLE my_kafak_source (
  | `table` varchar,
  | `database` varchar,
  | `data` row < transaction_id varchar,
  |              user_id int,
  |              amount int,
  |              reference_id varchar,
  |              status int,
  |              transaction_type int,
  |              merchant_id int,
  |              update_time int,
  |              create_time int
  |            >,
  | maxwell_ts bigint,
  | ts_watermark as TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
  |) WITH (
  | 'connector.type' = 'kafka',
  | 'connector.version' = '0.11',
  | 'connector.topic' = 'xx',
  | 'connector.startup-mode' = 'latest-offset',
  | 'update-mode' = 'append',
  | 'format.type' = 'json',
  | 'format.derive-schema' = 'true'
  |)""".stripMargin

val dstTable = """CREATE TABLE my_kafak_dst (
  | transaction_type int,
  | transaction_id VARCHAR,
  | reference_id VARCHAR,
  | merchant_id int,
  | status int,
  | create_time int,
  | maxwell_ts bigint,
  | ts_watermark TIMESTAMP(3)
  |) WITH (
  | 'connector.type' = 'kafka',
  | 'connector.version' = '0.11',
  | 'connector.topic' = 'uu',
  | 'update-mode' = 'append',
  | 'format.type' = 'json',
  | 'format.derive-schema' = 'true'
  |)""".stripMargin

bsTableEnv.sqlUpdate(sourceTable)
bsTableEnv.sqlUpdate(dstTable)

val main_table = bsTableEnv.sqlQuery("SELECT transaction_type, transaction_id, reference_id, merchant_id, status, create_time, maxwell_ts, ts_watermark FROM my_kafak_source")
bsTableEnv.createTemporaryView("main_table", main_table)

bsTableEnv.sqlUpdate("INSERT INTO my_kafak_dst SELECT * FROM main_table")
```
Hmm, which version are you on? There was indeed a bug like this with computed columns at one point, but it has since been fixed.

Best,
Benchao Li
Hi,

1.10 does have this bug; it was fixed in 1.10.1, so you can upgrade to 1.10.1. Benchao previously fixed a computed-column-related issue, but I just checked and the problem also occurs without computed columns — it should be the one Jark Wu fixed in [1].

Best,
Leonard Xu
[1] https://issues.apache.org/jira/browse/FLINK-16526
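As a side note on the escaping itself: once on a fixed version, the backticks in the original DDL are the right approach, and the same escaping is needed in every statement that references those columns, not just in CREATE TABLE. A small sketch reusing the names from this thread (the WHERE value is a made-up example):

```scala
// Reserved words such as `table` and `database` must stay backquoted in
// every SQL statement that references them, not only in the DDL.
val query =
  """SELECT `table`, `database`, maxwell_ts
    |FROM my_kafak_source
    |WHERE `database` = 'transaction_db'
    |""".stripMargin
// bsTableEnv.sqlQuery(query)  // parses on Flink 1.10.1+ with the Blink planner
```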
Thanks again — it works now; this problem had been blocking me for two weeks. One more question, could you help me take a look?

The `ts_watermark` defined here is meant to be used as the watermark, but its value comes back null. My `maxwell_ts` is in milliseconds; I've read the functions' documentation and can't see what is wrong.

```scala
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
val sourceTable = """CREATE TABLE my_kafak_source (
  | `table` varchar,
  | `database` varchar,
  | `data` row < transaction_id varchar,
  |              user_id int,
  |              amount int,
  |            >,
  | maxwell_ts bigint,
  | ts_watermark as TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
  |) WITH (
  |)""".stripMargin
```
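For context on the watermark goal: a computed timestamp column like `ts_watermark` only becomes an event-time attribute once a WATERMARK clause is declared on it in the DDL. A minimal hedged sketch of what that declaration could look like in this source table (the 5-second out-of-orderness bound is an assumed value, not from the thread):

```scala
// Hedged sketch: declaring the computed column as the event-time attribute.
// The INTERVAL '5' SECOND bound is assumed for illustration.
val sourceTableWithWatermark = """CREATE TABLE my_kafak_source (
  | `table` varchar,
  | `database` varchar,
  | maxwell_ts bigint,
  | ts_watermark as TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000)),
  | WATERMARK FOR ts_watermark AS ts_watermark - INTERVAL '5' SECOND
  |) WITH (
  |)""".stripMargin
```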
Here is what it prints:

```
"maxwell_ts":1591518126072000,"ts_watermark":"52403-03-16 00:21:12"
```
FROM_UNIXTIME takes a timestamp in seconds. Your `maxwell_ts` looks like microseconds, so it should be divided by 1000000.

Best,
Benchao Li
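Benchao's unit diagnosis can be checked outside Flink. A quick sketch using the sample value printed earlier in the thread, comparing a microsecond vs. millisecond interpretation (plain `java.time`, no Flink involved):

```scala
import java.time.Instant

// The maxwell_ts value from the output above.
val maxwellTs = 1591518126072000L

// Treating the value as microseconds yields a plausible 2020 date:
println(Instant.ofEpochSecond(maxwellTs / 1000000L)) // 2020-06-07T08:22:06Z

// Treating it as milliseconds (dividing by 1000 only) leaves a seconds value
// that decodes to year ~52403 — matching the garbage ts_watermark seen above:
println(Instant.ofEpochSecond(maxwellTs / 1000L))
```

So the computed column should use `maxwell_ts/1000000`, as Benchao suggests.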
😂

Sorry, I didn't notice that. Thanks!