pyflink1.12 使用connector read.query参数报错

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

pyflink1.12 使用connector read.query参数报错

肖越
使用DDL 定义connector连接Mysql数据库,想通过发送sql的方式直接获取数据:
source_ddl = """
CREATE TABLE source_table(
                yldrate DECIMAL,
                pf_id VARCHAR,
                symbol_id VARCHAR) WITH(
                'connector' = 'jdbc',
                'url' = 'jdbc:mysql://ip/db',
                'driver' = 'com.mysql.cj.jdbc.Driver',
                'username' = 'xxx',
                'password' = 'xxx',
                'table-name' = 'TS_PF_SEC_YLDRATE'
                'read.query' = "SELECT YLDRATE, PF_ID, SYMBOL_ID FROM TS_PF_SEC_YLDRATE LEFT JOIN TP_GL_DAY ON DAY_ID = BIZ_DATE WHERE CCY_TYPE = 'AC' AND PF_ID = '1030100122' AND SYMBOL_ID = '2030004042' AND BIZ_DATE between '20160701' AND '20170307'"
                )
"""
报错信息:
File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py", line 766, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o6.executeSql.
: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "=" at line 12, column 30.
Was expecting one of:
    "UESCAPE" ...
    <QUOTED_STRING> ...
    ")" ...
    "," ...
   
提示期待的语法信息,没有看懂,为什么不能出现“=” ?希望路过的大佬,能够指导一下~~谢谢!
Reply | Threaded
Open this post in threaded view
|

Re: pyflink1.12 使用connector read.query参数报错

Dian Fu
'table-name' = 'TS_PF_SEC_YLDRATE' 这一行后面少个逗号

> 在 2020年12月24日,下午2:02,肖越 <[hidden email]> 写道:
>
> 使用DDL 定义connector连接Mysql数据库,想通过发送sql的方式直接获取数据:
> source_ddl = """
> CREATE TABLE source_table(
>                yldrate DECIMAL,
>                pf_id VARCHAR,
>                symbol_id VARCHAR) WITH(
>                'connector' = 'jdbc',
>                'url' = 'jdbc:mysql://ip/db',
>                'driver' = 'com.mysql.cj.jdbc.Driver',
>                'username' = 'xxx',
>                'password' = 'xxx',
>                'table-name' = 'TS_PF_SEC_YLDRATE'
>                'read.query' = "SELECT YLDRATE, PF_ID, SYMBOL_ID FROM TS_PF_SEC_YLDRATE LEFT JOIN TP_GL_DAY ON DAY_ID = BIZ_DATE WHERE CCY_TYPE = 'AC' AND PF_ID = '1030100122' AND SYMBOL_ID = '2030004042' AND BIZ_DATE between '20160701' AND '20170307'"
>                )
> """
> 报错信息:
> File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py", line 766, in execute_sql
>    return TableResult(self._j_tenv.executeSql(stmt))
>  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", line 1286, in __call__
>    answer, self.gateway_client, self.target_id, self.name)
>  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco
>    return f(*a, **kw)
>  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
>    format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o6.executeSql.
> : org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "=" at line 12, column 30.
> Was expecting one of:
>    "UESCAPE" ...
>    <QUOTED_STRING> ...
>    ")" ...
>    "," ...
>
> 提示期待的语法信息,没有看懂,为什么不能出现“=” ?希望路过的大佬,能够指导一下~~谢谢!

Reply | Threaded
Open this post in threaded view
|

Re: pyflink1.12 使用connector read.query参数报错

冯嘉伟
In reply to this post by 肖越
hi! 试试这个

CREATE TABLE source_table(
                yldrate DECIMAL,
                pf_id VARCHAR,
                symbol_id VARCHAR) WITH(
                'connector' = 'jdbc',
                'url' = 'jdbc:mysql://ip/db',
                'driver' = 'com.mysql.cj.jdbc.Driver',
                'username' = 'xxx',
                'password' = 'xxx',
                'table-name' = 'TS_PF_SEC_YLDRATE',
                'read.query' = 'SELECT YLDRATE, PF_ID, SYMBOL_ID FROM
TS_PF_SEC_YLDRATE LEFT JOIN TP_GL_DAY ON DAY_ID = BIZ_DATE WHERE CCY_TYPE =
"AC" AND PF_ID = "1030100122" AND SYMBOL_ID = "2030004042" AND BIZ_DATE
between "20160701" AND "20170307"'
                )



--
Sent from: http://apache-flink.147419.n8.nabble.com/
Reply | Threaded
Open this post in threaded view
|

Re: pyflink1.12 使用connector read.query参数报错

Leonard Xu
Hi, 嘉伟  

1.12 应该不支持 read.query, 社区还在讨论是否要开放这个,有些concern, 简单的讲,就如你这个query写的,创建的这张JDBC 表应该是一个 View 而不是对应一张JDBC 表,同时这个表只能用来作为source,不能用来作为sink。

祝好,
Leonard

> 在 2020年12月24日,19:16,冯嘉伟 <[hidden email]> 写道:
>
> hi! 试试这个
>
> CREATE TABLE source_table(
>                yldrate DECIMAL,
>                pf_id VARCHAR,
>                symbol_id VARCHAR) WITH(
>                'connector' = 'jdbc',
>                'url' = 'jdbc:mysql://ip/db',
>                'driver' = 'com.mysql.cj.jdbc.Driver',
>                'username' = 'xxx',
>                'password' = 'xxx',
>                'table-name' = 'TS_PF_SEC_YLDRATE',
>                'read.query' = 'SELECT YLDRATE, PF_ID, SYMBOL_ID FROM
> TS_PF_SEC_YLDRATE LEFT JOIN TP_GL_DAY ON DAY_ID = BIZ_DATE WHERE CCY_TYPE =
> "AC" AND PF_ID = "1030100122" AND SYMBOL_ID = "2030004042" AND BIZ_DATE
> between "20160701" AND "20170307"'
>                )
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

Reply | Threaded
Open this post in threaded view
|

Re:Re: pyflink1.12 使用connector read.query参数报错

肖越
谢谢,老师们的指导,根据嘉伟的建议,发现pyflink1.12确实并不支持这个参数~

还是希望官方能够开放这个参数,就目前的工作情景来说,取数据就需要定义整张表,如果数据库更改,代码这边很不便于维护;
从本机的实验结果上看,pyflink内部进行query的效率并不高,正准备放到集群上试试~







在 2020-12-25 09:45:28,"Leonard Xu" <[hidden email]> 写道:

>Hi, 嘉伟  
>
>1.12 应该不支持 read.query, 社区还在讨论是否要开放这个,有些concern, 简单的讲,就如你这个query写的,创建的这张JDBC 表应该是一个 View 而不是对应一张JDBC 表,同时这个表只能用来作为source,不能用来作为sink。
>
>祝好,
>Leonard
>
>> 在 2020年12月24日,19:16,冯嘉伟 <[hidden email]> 写道:
>>
>> hi! 试试这个
>>
>> CREATE TABLE source_table(
>>                yldrate DECIMAL,
>>                pf_id VARCHAR,
>>                symbol_id VARCHAR) WITH(
>>                'connector' = 'jdbc',
>>                'url' = 'jdbc:mysql://ip/db',
>>                'driver' = 'com.mysql.cj.jdbc.Driver',
>>                'username' = 'xxx',
>>                'password' = 'xxx',
>>                'table-name' = 'TS_PF_SEC_YLDRATE',
>>                'read.query' = 'SELECT YLDRATE, PF_ID, SYMBOL_ID FROM
>> TS_PF_SEC_YLDRATE LEFT JOIN TP_GL_DAY ON DAY_ID = BIZ_DATE WHERE CCY_TYPE =
>> "AC" AND PF_ID = "1030100122" AND SYMBOL_ID = "2030004042" AND BIZ_DATE
>> between "20160701" AND "20170307"'
>>                )
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>