Hi,
I followed the WeChat post https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q to try out PyFlink and ran into a problem: I reference the JDBC jars, but I still get a JDBC error. My Flink version is 1.13.1. My code is as follows:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(
    env,
    environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
table_env.get_config().get_configuration().set_string(
    "pipeline.classpaths",
    "file:///home/flink/lib/flink-connector-jdbc_2.12-1.13.1.jar;file:///home/flink/lib/mysql-connector-java-8.0.25.jar")

# 2. create source table
table_env.execute_sql("""
    CREATE TABLE table_source (
        e STRING
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://********:3306/test',
        'driver' = 'com.mysql.cj.jdbc.Driver',
        'table-name' = 'enum_test',
        'username' = 'pms_etl',
        'password' = 'pms_etl_q'
    )
""")

# 3. create sink table
table_env.execute_sql("""
    CREATE TABLE print (
        e STRING
    ) WITH (
        'connector' = 'print'
    )
""")

table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()

When I run it directly with python, the error is:

Traceback (most recent call last):
  File "demo.py", line 41, in <module>
    table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
  File "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 804, in execute_sql
    return TableResult(self._j_tenv.executeSql(stmt))
  File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.table_source'.

Table options are:

'connector'='jdbc'
'driver'='com.mysql.cj.jdbc.Driver'
'password'='pms_etl_q'
'table-name'='enum_test'
'url'='jdbc:mysql://*******:3306/test'
'username'='pms_etl'
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:170)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:162)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:967)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:936)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:275)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:595)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:268)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'
    at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:467)
    at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441)
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:133)
    ... 31 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
    at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463)
    ... 33 more

When I run it with flink run -py demo.py, the error is:

  File "./demo.py", line 41, in <module>
    table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
  File "/home/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 804, in execute_sql
  File "/home/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/home/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 158, in deco
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Sink `default_catalog`.`default_database`.`table_sink` does not exists
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:233)
    at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
    ... 13 more

How should I fix this?
Hi,
Running locally:
1) Try flink-connector-jdbc_2.11-1.13.1.jar instead. The jars bundled with PyFlink are built against Scala 2.11 by default.

flink run:
1) The sink table you registered is named "print", not "table_sink", but your SQL statement writes to table_sink.

> On June 1, 2021, at 4:33 PM, 琴师 <[hidden email]> wrote:
>
> [quoted message trimmed]
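As a side note for anyone copying this setup: pipeline.classpaths takes a semicolon-separated list of absolute file URIs, which is easy to mistype by hand. A minimal sketch of building the value with the standard library (the jar paths are the ones from the message above; adjust for your layout):

```python
from pathlib import PurePosixPath

def classpath_value(jar_paths):
    # pipeline.classpaths expects absolute file:// URIs joined with ";".
    return ";".join(PurePosixPath(p).as_uri() for p in jar_paths)

value = classpath_value([
    "/home/flink/lib/flink-connector-jdbc_2.11-1.13.1.jar",
    "/home/flink/lib/mysql-connector-java-8.0.25.jar",
])
# Then pass it to:
# table_env.get_config().get_configuration().set_string("pipeline.classpaths", value)
```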
Thanks, switching to 2.11 did fix it!
------------------ Original message ------------------
From: "user-zh" <[hidden email]>
Date: Tuesday, June 1, 2021, 5:04 PM
To: "user-zh" <[hidden email]>; "琴师" <[hidden email]>
Subject: Re: Pyflink jdbc相关

> [quoted reply and original message trimmed]
One more question: when I run this from PyCharm on Windows, a path such as file:///D:\Pyproject\flink-connector-jdbc_2.11-1.13.1.jar cannot be loaded. Has anyone gotten this to work?
------------------ Original message ------------------
From: "琴师" <[hidden email]>
Sent: Tuesday, June 1, 2021, 5:30 PM
To: "user-zh" <[hidden email]>
Subject: Re: Pyflink jdbc相关

Thanks, switching to the 2.11 jar really does work!

------------------ Original message ------------------
From: "user-zh" <[hidden email]>
Sent: Tuesday, June 1, 2021, 5:04 PM
To: "user-zh" <[hidden email]>; "琴师" <[hidden email]>
Subject: Re: Pyflink jdbc相关

Hi,

Running locally:
1) Try flink-connector-jdbc_2.11-1.13.1.jar instead: the jars bundled with PyFlink are built against Scala 2.11, not 2.12.

Running with flink run:
1) The sink table you register is named "print", not "table_sink", yet the INSERT statement references table_sink.

> On Jun 1, 2021, at 4:33 PM, 琴师 <[hidden email]> wrote:
>
> [original message and code trimmed; the key error excerpts were:]
>
> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'
> Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
>
> Available factory identifiers are:
>
> blackhole
> datagen
> filesystem
> print
>
> Running flink run -py demo.py instead fails with:
>
> pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Sink `default_catalog`.`default_database`.`table_sink` does not exists
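A minimal correction for the name mismatch pointed out above, assuming nothing else in the job changes: register the sink under the name the INSERT statement actually uses.

```sql
-- Register the print sink under the name the INSERT statement uses.
CREATE TABLE table_sink (
    e STRING
) WITH (
    'connector' = 'print'
);
```

Then `INSERT INTO table_sink SELECT * FROM table_source` resolves. Equivalently, keep the table named `print` and change the statement to `INSERT INTO print SELECT * FROM table_source`.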
Try this, with "\" changed to "/":

file:///D:/Pyproject/flink-connector-jdbc_2.11-1.13.1.jar

> On Jun 1, 2021, at 5:40 PM, 琴师 <[hidden email]> wrote:
>
> One more question: when I run this from PyCharm on Windows, a path such as file:///D:\Pyproject\flink-connector-jdbc_2.11-1.13.1.jar cannot be loaded. Has anyone gotten this to work?
>
> [earlier quoted messages trimmed]
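On the Windows path question, here is a small standard-library sketch of one way to build the file:// URL without hand-editing slashes (the jar path is the one from the thread):

```python
from pathlib import PureWindowsPath

# as_uri() converts an absolute Windows path into a well-formed
# file:/// URL, flipping the backslashes to forward slashes.
jar = PureWindowsPath(r"D:\Pyproject\flink-connector-jdbc_2.11-1.13.1.jar")
jar_url = jar.as_uri()
print(jar_url)  # file:///D:/Pyproject/flink-connector-jdbc_2.11-1.13.1.jar
```

The resulting string (and any further jar URLs, joined with ";") can then be passed to `pipeline.classpaths` as in the original script.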
Solved, thank you very much!
------------------ 原始邮件 ------------------ 发件人: "user-zh" <[hidden email]>; 发送时间: 2021年6月1日(星期二) 晚上7:21 收件人: "user-zh"<[hidden email]>;"琴师"<[hidden email]>; 主题: Re: Pyflink jdbc相关 这样试试,把”\”改成”/“: file:///D:/Pyproject/flink-connector-jdbc_2.11-1.13.1.jar > 2021年6月1日 下午5:40,琴师 <[hidden email]> 写道: > > 再请叫一个问题,我在pycharm使用时候引用windows的地址不能引用,比如file:///D:\Pyproject\flink-connector-jdbc_2.11-1.13.1.jar; 这样不能引入,大佬有用用过吗? > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人: "琴师" <[hidden email]&gt;; > 发送时间:&nbsp;2021年6月1日(星期二) 下午5:30 > 收件人:&nbsp;"user-zh"<[hidden email]&gt;; > > 主题:&nbsp;回复: Pyflink jdbc相关 > > > > > > 感谢,我换成2.11确实可以了!!!! > > > ------------------ 原始邮件 ------------------ > 发件人: "user-zh" <[hidden email]&gt;; > 发送时间:&nbsp;2021年6月1日(星期二) 下午5:04 > 收件人:&nbsp;"user-zh"<[hidden email]&gt;;"琴师"<[hidden email]&gt;; > > 主题:&nbsp;Re: Pyflink jdbc相关 > > > > Hi, > > 本地执行: > 1)用flink-connector-jdbc_2.11-1.13.1.jar试试?因为PyFlink里默认待的JAR包是scala 2.11的 > > > flink run: > 1) 你注册的sink表的名字为“print”,不是”table_sink”,但是在SQL语句里用的table_sink。 > > > &gt; 2021年6月1日 下午4:33,琴师 <[hidden email]&gt; 写道: > &gt; > &gt; Hi, > &gt; &amp;nbsp; &amp;nbsp;我按着微信分享https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q 试着使用pyflink,遇到了问题,我引用了jdbc的jar包,但是仍然提示我jdbc错误。我的flink版本是1.13.1 > &gt; 我的原代码如下: > &gt; > &gt; > &gt; from pyflink.datastream import StreamExecutionEnvironment > &gt; from pyflink.table import StreamTableEnvironment, EnvironmentSettings > &gt; env = StreamExecutionEnvironment.get_execution_environment() > &gt; table_env = StreamTableEnvironment.create( env,environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()) > &gt; table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///home/flink/lib/flink-connector-jdbc_2.12-1.13.1.jar;file:///home/flink/lib/mysql-connector-java-8.0.25.jar") > &gt; > &gt; > &gt; # 2. 
create source Table
> > table_env.execute_sql("""
> > CREATE TABLE table_source (
> >     e string
> > ) WITH (
> >     'connector' = 'jdbc',
> >     'url' = 'jdbc:mysql://********:3306/test',
> >     'driver' = 'com.mysql.cj.jdbc.Driver',
> >     'table-name' = 'enum_test',
> >     'username' = 'pms_etl',
> >     'password' = 'pms_etl_q'
> > )
> > """)
> >
> > # 3. create sink Table
> > table_env.execute_sql("""
> >     CREATE TABLE print (
> >         e string
> >     ) WITH (
> >         'connector' = 'print'
> >     )
> > """)
> >
> > table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
> >
> > When I run it directly with python, the error is:
> >
> > Traceback (most recent call last):
> >   File "demo.py", line 41, in <module>
> >     table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
> >   File "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 804, in execute_sql
> >     return TableResult(self._j_tenv.executeSql(stmt))
> >   File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
> >     answer, self.gateway_client, self.target_id, self.name)
> >   File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 146, in deco
> >     return f(*a, **kw)
> >   File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
> >     format(target_id, ".", name), value)
> > py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
> > : org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.table_source'.
> >
> > Table options are:
> >
> > 'connector'='jdbc'
> > 'driver'='com.mysql.cj.jdbc.Driver'
> > 'password'='pms_etl_q'
> > 'table-name'='enum_test'
> > 'url'='jdbc:mysql://*******:3306/test'
> > 'username'='pms_etl'
> >         at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137)
> >         at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116)
> >         at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82)
> >         ...
> >         at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'
> >         at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:467)
> >         at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441)
> >         at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:133)
> >         ... 31 more
> > Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
> >
> > Available factory identifiers are:
> >
> > blackhole
> > datagen
> > filesystem
> > print
> >         at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
> >         at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463)
> >         ... 33 more
> >
> > With flink run -py demo.py the error is:
> >
> >   File "./demo.py", line 41, in <module>
> >     table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
> >   File "/home/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 804, in execute_sql
> >   File "/home/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
> >   File "/home/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 158, in deco
> > pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Sink `default_catalog`.`default_database`.`table_sink` does not exists
> >         at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:233)
> >         at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162)
> >         ...
> >         at java.lang.Thread.run(Thread.java:748)
> >
> > org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
> >         at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
> >         ...
> >         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> > Caused by: java.lang.RuntimeException: Python process exits with code: 1
> >         at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
> >         ... 13 more
> >
> > How can I solve this?