Pyflink1.12尝试连接oracle数据,报错findAndCreateTableSource failed

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

Pyflink1.12尝试连接oracle数据,报错findAndCreateTableSource failed

肖越
pyflink小白,测试pyflink1.12功能,想通过定义connector从oracle数据库中获取数据
通过如下方式定义:
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env = StreamTableEnvironment \
        .create(env, environment_settings=EnvironmentSettings
                .new_instance()
                .use_blink_planner().build())
source_ddl1 = """
        CREATE TABLE source_table (id BIGINT,pf_id VARCHAR,\
        tree_id VARCHAR,node_id VARCHAR,biz_date DATE,\
        ccy_type VARCHAR,cur_id_d VARCHAR,tldrate DOUBLE,\
        is_valid INT,time_mark TIMESTAMP) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:oracle:thin:@ip:dbnamel',
        'connector.table' = 'ts_pf_ac_yldrate',
        'connector.driver' = 'oracle.jdbc.driver.OracleDriver',
        'connector.username' = 'xxx',
        'connector.password' = 'xxx')
    """
sql = "select pf_id,biz_date from source_table where biz_date='20160701' "
env.sql_update(source_ddl1)
table = env.sql_query(sql)
env.execute("flink_test")
报错信息:
    raise java_exception
pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
 at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
 at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
 at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
 at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
 at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
 at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
 at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
 at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
 at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
 at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
 at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
 at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
 at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
 at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
 at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
 at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
 at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
 at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
 at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
 at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
 at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
 at java.lang.Thread.run(Thread.java:748)
Reply | Threaded
Open this post in threaded view
|

Re: Pyflink1.12尝试连接oracle数据,报错findAndCreateTableSource failed

Leonard Xu
目前 JDBC connector 只支持 MySQL, Pg和Derby(一般测试用)这几种dialect, Oracle还不支持。

祝好,
Leonard

> 在 2020年12月17日,09:47,肖越 <[hidden email]> 写道:
>
> pyflink小白,测试pyflink1.12功能,想通过定义connector从oracle数据库中获取数据
> 通过如下方式定义:
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> env = StreamTableEnvironment \
>        .create(env, environment_settings=EnvironmentSettings
>                .new_instance()
>                .use_blink_planner().build())
> source_ddl1 = """
>        CREATE TABLE source_table (id BIGINT,pf_id VARCHAR,\
>        tree_id VARCHAR,node_id VARCHAR,biz_date DATE,\
>        ccy_type VARCHAR,cur_id_d VARCHAR,tldrate DOUBLE,\
>        is_valid INT,time_mark TIMESTAMP) WITH (
>        'connector.type' = 'jdbc',
>        'connector.url' = 'jdbc:oracle:thin:@ip:dbnamel',
>        'connector.table' = 'ts_pf_ac_yldrate',
>        'connector.driver' = 'oracle.jdbc.driver.OracleDriver',
>        'connector.username' = 'xxx',
>        'connector.password' = 'xxx')
>    """
> sql = "select pf_id,biz_date from source_table where biz_date='20160701' "
> env.sql_update(source_ddl1)
> table = env.sql_query(sql)
> env.execute("flink_test")
> 报错信息:
>    raise java_exception
> pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
> at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
> at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
> at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)

Reply | Threaded
Open this post in threaded view
|

Re:Re: Pyflink1.12尝试连接oracle数据,报错findAndCreateTableSource failed

肖越
谢谢您的帮助,所以没有办法通过定义connector的方式获取连接是么? 有没有其他办法呢?

















在 2020-12-17 09:55:08,"Leonard Xu" <[hidden email]> 写道:

>目前 JDBC connector 只支持 MySQL, Pg和Derby(一般测试用)这几种dialect, Oracle还不支持。
>
>祝好,
>Leonard
>
>> 在 2020年12月17日,09:47,肖越 <[hidden email]> 写道:
>>
>> pyflink小白,测试pyflink1.12功能,想通过定义connector从oracle数据库中获取数据
>> 通过如下方式定义:
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> env = StreamTableEnvironment \
>>        .create(env, environment_settings=EnvironmentSettings
>>                .new_instance()
>>                .use_blink_planner().build())
>> source_ddl1 = """
>>        CREATE TABLE source_table (id BIGINT,pf_id VARCHAR,\
>>        tree_id VARCHAR,node_id VARCHAR,biz_date DATE,\
>>        ccy_type VARCHAR,cur_id_d VARCHAR,tldrate DOUBLE,\
>>        is_valid INT,time_mark TIMESTAMP) WITH (
>>        'connector.type' = 'jdbc',
>>        'connector.url' = 'jdbc:oracle:thin:@ip:dbnamel',
>>        'connector.table' = 'ts_pf_ac_yldrate',
>>        'connector.driver' = 'oracle.jdbc.driver.OracleDriver',
>>        'connector.username' = 'xxx',
>>        'connector.password' = 'xxx')
>>    """
>> sql = "select pf_id,biz_date from source_table where biz_date='20160701' "
>> env.sql_update(source_ddl1)
>> table = env.sql_query(sql)
>> env.execute("flink_test")
>> 报错信息:
>>    raise java_exception
>> pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
>> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
>> at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
>> at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
>> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>> at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>> at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>> at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>> at java.lang.Thread.run(Thread.java:748)
Reply | Threaded
Open this post in threaded view
|

Re: Pyflink1.12尝试连接oracle数据,报错findAndCreateTableSource failed

Dian Fu
1)如Leonard Xu老师所说,目前JDBC connector还不支持oracle
2)从报错看,你这还都没有走到那一步就报错了,可以检查一下这些:
     a. JDBC connector的使用方式参考[1],比如'connector' = 'jdbc',而不是'connector.type' = 'jdbc',这个是老的使用方式
     b. JDBC connector的jar需要显式添加相关依赖,可以如何在PyFlink中添加jar依赖,可以看一下[2]。其中JDBC需要的的jar,可以参考[1]

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html>
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#download-connector-and-format-jars

> 在 2020年12月17日,上午10:06,肖越 <[hidden email]> 写道:
>
> 谢谢您的帮助,所以没有办法通过定义connector的方式获取连接是么? 有没有其他办法呢?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-12-17 09:55:08,"Leonard Xu" <[hidden email]> 写道:
>> 目前 JDBC connector 只支持 MySQL, Pg和Derby(一般测试用)这几种dialect, Oracle还不支持。
>>
>> 祝好,
>> Leonard
>>
>>> 在 2020年12月17日,09:47,肖越 <[hidden email]> 写道:
>>>
>>> pyflink小白,测试pyflink1.12功能,想通过定义connector从oracle数据库中获取数据
>>> 通过如下方式定义:
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> env.set_parallelism(1)
>>> env = StreamTableEnvironment \
>>>       .create(env, environment_settings=EnvironmentSettings
>>>               .new_instance()
>>>               .use_blink_planner().build())
>>> source_ddl1 = """
>>>       CREATE TABLE source_table (id BIGINT,pf_id VARCHAR,\
>>>       tree_id VARCHAR,node_id VARCHAR,biz_date DATE,\
>>>       ccy_type VARCHAR,cur_id_d VARCHAR,tldrate DOUBLE,\
>>>       is_valid INT,time_mark TIMESTAMP) WITH (
>>>       'connector.type' = 'jdbc',
>>>       'connector.url' = 'jdbc:oracle:thin:@ip:dbnamel',
>>>       'connector.table' = 'ts_pf_ac_yldrate',
>>>       'connector.driver' = 'oracle.jdbc.driver.OracleDriver',
>>>       'connector.username' = 'xxx',
>>>       'connector.password' = 'xxx')
>>>   """
>>> sql = "select pf_id,biz_date from source_table where biz_date='20160701' "
>>> env.sql_update(source_ddl1)
>>> table = env.sql_query(sql)
>>> env.execute("flink_test")
>>> 报错信息:
>>>   raise java_exception
>>> pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
>>> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
>>> at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
>>> at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
>>> at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
>>> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>> at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>> at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>> at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>> at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>> at java.lang.Thread.run(Thread.java:748)

Reply | Threaded
Open this post in threaded view
|

Re:Re: Pyflink1.12尝试连接oracle数据,报错findAndCreateTableSource failed

肖越
好的,非常感谢您的帮助,刚根据分享的连接,显示增加了jar包依赖,报错:py4j.protocol.Py4JJavaError: An error occurred while calling None.java.net.URL,应该就是没有对应的Oracle connector处理的原因吧?

















在 2020-12-17 10:44:56,"Dian Fu" <[hidden email]> 写道:

>1)如Leonard Xu老师所说,目前JDBC connector还不支持oracle
>2)从报错看,你这还都没有走到那一步就报错了,可以检查一下这些:
>     a. JDBC connector的使用方式参考[1],比如'connector' = 'jdbc',而不是'connector.type' = 'jdbc',这个是老的使用方式
>     b. JDBC connector的jar需要显式添加相关依赖,可以如何在PyFlink中添加jar依赖,可以看一下[2]。其中JDBC需要的的jar,可以参考[1]
>
>[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html>
>[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#download-connector-and-format-jars
>
>> 在 2020年12月17日,上午10:06,肖越 <[hidden email]> 写道:
>>
>> 谢谢您的帮助,所以没有办法通过定义connector的方式获取连接是么? 有没有其他办法呢?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-12-17 09:55:08,"Leonard Xu" <[hidden email]> 写道:
>>> 目前 JDBC connector 只支持 MySQL, Pg和Derby(一般测试用)这几种dialect, Oracle还不支持。
>>>
>>> 祝好,
>>> Leonard
>>>
>>>> 在 2020年12月17日,09:47,肖越 <[hidden email]> 写道:
>>>>
>>>> pyflink小白,测试pyflink1.12功能,想通过定义connector从oracle数据库中获取数据
>>>> 通过如下方式定义:
>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>> env.set_parallelism(1)
>>>> env = StreamTableEnvironment \
>>>>       .create(env, environment_settings=EnvironmentSettings
>>>>               .new_instance()
>>>>               .use_blink_planner().build())
>>>> source_ddl1 = """
>>>>       CREATE TABLE source_table (id BIGINT,pf_id VARCHAR,\
>>>>       tree_id VARCHAR,node_id VARCHAR,biz_date DATE,\
>>>>       ccy_type VARCHAR,cur_id_d VARCHAR,tldrate DOUBLE,\
>>>>       is_valid INT,time_mark TIMESTAMP) WITH (
>>>>       'connector.type' = 'jdbc',
>>>>       'connector.url' = 'jdbc:oracle:thin:@ip:dbnamel',
>>>>       'connector.table' = 'ts_pf_ac_yldrate',
>>>>       'connector.driver' = 'oracle.jdbc.driver.OracleDriver',
>>>>       'connector.username' = 'xxx',
>>>>       'connector.password' = 'xxx')
>>>>   """
>>>> sql = "select pf_id,biz_date from source_table where biz_date='20160701' "
>>>> env.sql_update(source_ddl1)
>>>> table = env.sql_query(sql)
>>>> env.execute("flink_test")
>>>> 报错信息:
>>>>   raise java_exception
>>>> pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
>>>> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
>>>> at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
>>>> at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
>>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
>>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
>>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
>>>> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>>> at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>>> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>>> at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>>> at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>> at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>>> at java.lang.Thread.run(Thread.java:748)
>
Reply | Threaded
Open this post in threaded view
|

Re: Pyflink1.12尝试连接oracle数据,报错findAndCreateTableSource failed

Dian Fu
发一下完整的异常信息?

> 在 2020年12月17日,上午11:53,肖越 <[hidden email]> 写道:
>
> 好的,非常感谢您的帮助,刚根据分享的连接,显示增加了jar包依赖,报错:py4j.protocol.Py4JJavaError: An error occurred while calling None.java.net.URL,应该就是没有对应的Oracle connector处理的原因吧?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-12-17 10:44:56,"Dian Fu" <[hidden email]> 写道:
>> 1)如Leonard Xu老师所说,目前JDBC connector还不支持oracle
>> 2)从报错看,你这还都没有走到那一步就报错了,可以检查一下这些:
>>    a. JDBC connector的使用方式参考[1],比如'connector' = 'jdbc',而不是'connector.type' = 'jdbc',这个是老的使用方式
>>    b. JDBC connector的jar需要显式添加相关依赖,可以如何在PyFlink中添加jar依赖,可以看一下[2]。其中JDBC需要的的jar,可以参考[1]
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html>
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#download-connector-and-format-jars
>>
>>> 在 2020年12月17日,上午10:06,肖越 <[hidden email]> 写道:
>>>
>>> 谢谢您的帮助,所以没有办法通过定义connector的方式获取连接是么? 有没有其他办法呢?
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> 在 2020-12-17 09:55:08,"Leonard Xu" <[hidden email]> 写道:
>>>> 目前 JDBC connector 只支持 MySQL, Pg和Derby(一般测试用)这几种dialect, Oracle还不支持。
>>>>
>>>> 祝好,
>>>> Leonard
>>>>
>>>>> 在 2020年12月17日,09:47,肖越 <[hidden email]> 写道:
>>>>>
>>>>> pyflink小白,测试pyflink1.12功能,想通过定义connector从oracle数据库中获取数据
>>>>> 通过如下方式定义:
>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>> env.set_parallelism(1)
>>>>> env = StreamTableEnvironment \
>>>>>      .create(env, environment_settings=EnvironmentSettings
>>>>>              .new_instance()
>>>>>              .use_blink_planner().build())
>>>>> source_ddl1 = """
>>>>>      CREATE TABLE source_table (id BIGINT,pf_id VARCHAR,\
>>>>>      tree_id VARCHAR,node_id VARCHAR,biz_date DATE,\
>>>>>      ccy_type VARCHAR,cur_id_d VARCHAR,tldrate DOUBLE,\
>>>>>      is_valid INT,time_mark TIMESTAMP) WITH (
>>>>>      'connector.type' = 'jdbc',
>>>>>      'connector.url' = 'jdbc:oracle:thin:@ip:dbnamel',
>>>>>      'connector.table' = 'ts_pf_ac_yldrate',
>>>>>      'connector.driver' = 'oracle.jdbc.driver.OracleDriver',
>>>>>      'connector.username' = 'xxx',
>>>>>      'connector.password' = 'xxx')
>>>>>  """
>>>>> sql = "select pf_id,biz_date from source_table where biz_date='20160701' "
>>>>> env.sql_update(source_ddl1)
>>>>> table = env.sql_query(sql)
>>>>> env.execute("flink_test")
>>>>> 报错信息:
>>>>>  raise java_exception
>>>>> pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
>>>>> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
>>>>> at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
>>>>> at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
>>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
>>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
>>>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
>>>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
>>>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
>>>>> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>>>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>>>> at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>>>> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>>>> at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>>>> at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>>> at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>>>> at java.lang.Thread.run(Thread.java:748)
>>

Reply | Threaded
Open this post in threaded view
|

Re:Re: Pyflink1.12尝试连接oracle数据,报错findAndCreateTableSource failed

肖越
您好,这是完整的报错信息:
Traceback (most recent call last):

  File "C:\projects\dataService-calculate-code-python\src\test\test_oracle_connector.py", line 24, in <module>

    "C:\projects\dataService-calculate-code-python\src\\test\\flink_connector-jdbc\\flink-connector-jdbc_2.11-1.12.0.jar")

  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\common\configuration.py", line 72, in set_string

    add_jars_to_context_class_loader(value.split(";"))

  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\utils.py", line 114, in add_jars_to_context_class_loader

    jar_urls = [gateway.jvm.java.net.URL(url).toString() for url in jar_urls]

  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\utils.py", line 114, in <listcomp>

    jar_urls = [gateway.jvm.java.net.URL(url).toString() for url in jar_urls]

  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", line 1554, in __call__

    answer, self._gateway_client, None, self._fqn)

  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco

    return f(*a, **kw)

  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py", line 328, in get_return_value

    format(target_id, ".", name), value)

py4j.protocol.Py4JJavaError: An error occurred while calling None.java.net.URL.

: java.net.MalformedURLException: unknown protocol: c

at java.net.URL.<init>(URL.java:617)

at java.net.URL.<init>(URL.java:507)

at java.net.URL.<init>(URL.java:456)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)

at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:238)

at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)

at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)

at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)

at java.lang.Thread.run(Thread.java:748)

















在 2020-12-17 14:57:36,"Dian Fu" <[hidden email]> 写道:

>发一下完整的异常信息?
>
>> 在 2020年12月17日,上午11:53,肖越 <[hidden email]> 写道:
>>
>> 好的,非常感谢您的帮助,刚根据分享的连接,显示增加了jar包依赖,报错:py4j.protocol.Py4JJavaError: An error occurred while calling None.java.net.URL,应该就是没有对应的Oracle connector处理的原因吧?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-12-17 10:44:56,"Dian Fu" <[hidden email]> 写道:
>>> 1)如Leonard Xu老师所说,目前JDBC connector还不支持oracle
>>> 2)从报错看,你这还都没有走到那一步就报错了,可以检查一下这些:
>>>    a. JDBC connector的使用方式参考[1],比如'connector' = 'jdbc',而不是'connector.type' = 'jdbc',这个是老的使用方式
>>>    b. JDBC connector的jar需要显式添加相关依赖,可以如何在PyFlink中添加jar依赖,可以看一下[2]。其中JDBC需要的的jar,可以参考[1]
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html>
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#download-connector-and-format-jars
>>>
>>>> 在 2020年12月17日,上午10:06,肖越 <[hidden email]> 写道:
>>>>
>>>> 谢谢您的帮助,所以没有办法通过定义connector的方式获取连接是么? 有没有其他办法呢?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> 在 2020-12-17 09:55:08,"Leonard Xu" <[hidden email]> 写道:
>>>>> 目前 JDBC connector 只支持 MySQL, Pg和Derby(一般测试用)这几种dialect, Oracle还不支持。
>>>>>
>>>>> 祝好,
>>>>> Leonard
>>>>>
>>>>>> 在 2020年12月17日,09:47,肖越 <[hidden email]> 写道:
>>>>>>
>>>>>> pyflink小白,测试pyflink1.12功能,想通过定义connector从oracle数据库中获取数据
>>>>>> 通过如下方式定义:
>>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>>> env.set_parallelism(1)
>>>>>> env = StreamTableEnvironment \
>>>>>>      .create(env, environment_settings=EnvironmentSettings
>>>>>>              .new_instance()
>>>>>>              .use_blink_planner().build())
>>>>>> source_ddl1 = """
>>>>>>      CREATE TABLE source_table (id BIGINT,pf_id VARCHAR,\
>>>>>>      tree_id VARCHAR,node_id VARCHAR,biz_date DATE,\
>>>>>>      ccy_type VARCHAR,cur_id_d VARCHAR,tldrate DOUBLE,\
>>>>>>      is_valid INT,time_mark TIMESTAMP) WITH (
>>>>>>      'connector.type' = 'jdbc',
>>>>>>      'connector.url' = 'jdbc:oracle:thin:@ip:dbnamel',
>>>>>>      'connector.table' = 'ts_pf_ac_yldrate',
>>>>>>      'connector.driver' = 'oracle.jdbc.driver.OracleDriver',
>>>>>>      'connector.username' = 'xxx',
>>>>>>      'connector.password' = 'xxx')
>>>>>>  """
>>>>>> sql = "select pf_id,biz_date from source_table where biz_date='20160701' "
>>>>>> env.sql_update(source_ddl1)
>>>>>> table = env.sql_query(sql)
>>>>>> env.execute("flink_test")
>>>>>> 报错信息:
>>>>>>  raise java_exception
>>>>>> pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
>>>>>> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
>>>>>> at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
>>>>>> at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
>>>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
>>>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
>>>>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
>>>>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
>>>>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
>>>>>> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>>>>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>>>>> at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>>>>> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>>>>> at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>>>>> at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>>>> at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>
Reply | Threaded
Open this post in threaded view
|

Re: Pyflink1.12尝试连接oracle数据,报错findAndCreateTableSource failed

Dian Fu
看起来是jar的路径的问题,你需要看一下在windows上,jar的路径用URL表示应该是什么样的。另外也可以直接把jar包放到pyflink安装目录中的lib目录下。

> 在 2020年12月17日,下午3:04,肖越 <[hidden email]> 写道:
>
> 您好,这是完整的报错信息:
> Traceback (most recent call last):
>
>  File "C:\projects\dataService-calculate-code-python\src\test\test_oracle_connector.py", line 24, in <module>
>
>    "C:\projects\dataService-calculate-code-python\src\\test\\flink_connector-jdbc\\flink-connector-jdbc_2.11-1.12.0.jar")
>
>  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\common\configuration.py", line 72, in set_string
>
>    add_jars_to_context_class_loader(value.split(";"))
>
>  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\utils.py", line 114, in add_jars_to_context_class_loader
>
>    jar_urls = [gateway.jvm.java.net.URL(url).toString() for url in jar_urls]
>
>  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\utils.py", line 114, in <listcomp>
>
>    jar_urls = [gateway.jvm.java.net.URL(url).toString() for url in jar_urls]
>
>  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", line 1554, in __call__
>
>    answer, self._gateway_client, None, self._fqn)
>
>  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco
>
>    return f(*a, **kw)
>
>  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
>
>    format(target_id, ".", name), value)
>
> py4j.protocol.Py4JJavaError: An error occurred while calling None.java.net.URL.
>
> : java.net.MalformedURLException: unknown protocol: c
>
> at java.net.URL.<init>(URL.java:617)
>
> at java.net.URL.<init>(URL.java:507)
>
> at java.net.URL.<init>(URL.java:456)
>
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>
> at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
>
> at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:238)
>
> at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
>
> at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
>
> at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>
> at java.lang.Thread.run(Thread.java:748)
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-12-17 14:57:36,"Dian Fu" <[hidden email]> 写道:
>> 发一下完整的异常信息?
>>
>>> 在 2020年12月17日,上午11:53,肖越 <[hidden email]> 写道:
>>>
>>> 好的,非常感谢您的帮助,刚根据分享的连接,显示增加了jar包依赖,报错:py4j.protocol.Py4JJavaError: An error occurred while calling None.java.net.URL,应该就是没有对应的Oracle connector处理的原因吧?
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> 在 2020-12-17 10:44:56,"Dian Fu" <[hidden email]> 写道:
>>>> 1)如Leonard Xu老师所说,目前JDBC connector还不支持oracle
>>>> 2)从报错看,你这还都没有走到那一步就报错了,可以检查一下这些:
>>>>   a. JDBC connector的使用方式参考[1],比如'connector' = 'jdbc',而不是'connector.type' = 'jdbc',这个是老的使用方式
>>>>   b. JDBC connector的jar需要显式添加相关依赖,可以如何在PyFlink中添加jar依赖,可以看一下[2]。其中JDBC需要的的jar,可以参考[1]
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html>
>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#download-connector-and-format-jars
>>>>
>>>>> 在 2020年12月17日,上午10:06,肖越 <[hidden email]> 写道:
>>>>>
>>>>> 谢谢您的帮助,所以没有办法通过定义connector的方式获取连接是么? 有没有其他办法呢?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> 在 2020-12-17 09:55:08,"Leonard Xu" <[hidden email]> 写道:
>>>>>> 目前 JDBC connector 只支持 MySQL, Pg和Derby(一般测试用)这几种dialect, Oracle还不支持。
>>>>>>
>>>>>> 祝好,
>>>>>> Leonard
>>>>>>
>>>>>>> 在 2020年12月17日,09:47,肖越 <[hidden email]> 写道:
>>>>>>>
>>>>>>> pyflink小白,测试pyflink1.12功能,想通过定义connector从oracle数据库中获取数据
>>>>>>> 通过如下方式定义:
>>>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>> env.set_parallelism(1)
>>>>>>> env = StreamTableEnvironment \
>>>>>>>     .create(env, environment_settings=EnvironmentSettings
>>>>>>>             .new_instance()
>>>>>>>             .use_blink_planner().build())
>>>>>>> source_ddl1 = """
>>>>>>>     CREATE TABLE source_table (id BIGINT,pf_id VARCHAR,\
>>>>>>>     tree_id VARCHAR,node_id VARCHAR,biz_date DATE,\
>>>>>>>     ccy_type VARCHAR,cur_id_d VARCHAR,tldrate DOUBLE,\
>>>>>>>     is_valid INT,time_mark TIMESTAMP) WITH (
>>>>>>>     'connector.type' = 'jdbc',
>>>>>>>     'connector.url' = 'jdbc:oracle:thin:@ip:dbnamel',
>>>>>>>     'connector.table' = 'ts_pf_ac_yldrate',
>>>>>>>     'connector.driver' = 'oracle.jdbc.driver.OracleDriver',
>>>>>>>     'connector.username' = 'xxx',
>>>>>>>     'connector.password' = 'xxx')
>>>>>>> """
>>>>>>> sql = "select pf_id,biz_date from source_table where biz_date='20160701' "
>>>>>>> env.sql_update(source_ddl1)
>>>>>>> table = env.sql_query(sql)
>>>>>>> env.execute("flink_test")
>>>>>>> 报错信息:
>>>>>>> raise java_exception
>>>>>>> pyflink.util.exceptions.TableException: findAndCreateTableSource failed.
>>>>>>> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
>>>>>>> at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
>>>>>>> at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:94)
>>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
>>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
>>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
>>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
>>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
>>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
>>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
>>>>>>> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
>>>>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
>>>>>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
>>>>>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
>>>>>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
>>>>>>> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
>>>>>>> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>>>>>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>> at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>>>>>> at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>>>>>> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>>>>>> at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>>>>>> at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>>>>> at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>