在脚本中定义了两个源数据表的 DDL,但是第二个总会报缺少 option 的错误。PyFlink 小白,求大神解答?
#DDL定义 source_ddl2 = """CREATE TABLE ts_pf_sec_yldrate (id DECIMAL,pf_id VARCHAR,\ symbol_id VARCHAR,biz_date VARCHAR,\ ccy_type VARCHAR,cur_id_d VARCHAR,yldrate DECIMAL,\ is_valid DECIMAL,time_mark TIMESTAMP) WITH ( 'connector' = 'jdbc', 'connector.url' = 'jdbc:mysql://ip:port/db_base', 'connector.table' = 'ts_pf_sec_yldrate', 'table-name' = 'ts_pf_sec_yldrate', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'xxx', 'connector.password' = 'xxx') """ 错误信息: Traceback (most recent call last): File "C:/projects/dataService-calculate-code-python/src/test/test_mysql_connector.py", line 67, in <module> print(join.to_pandas().head(6)) File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table.py", line 807, in to_pandas .collectAsPandasDataFrame(self._j_table, max_arrow_batch_size) File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", line 1286, in __call__ answer, self.gateway_client, self.target_id, self.name) File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco return f(*a, **kw) File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame. : org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.ts_pf_sec_yldrate'. 
Table options are: 'connector'='jdbc' 'connector.driver'='com.mysql.jdbc.Driver' 'connector.password'='xxx' 'connector.table'='ts_pf_sec_yldrate' 'connector.url'='jdbc:mysql://ip:port/db_base' 'connector.username'='xxx' 'table-name'='ts_pf_sec_yldrate' at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125) at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:265) at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:100) at org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495) at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099) at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123) at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:339) at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:149) at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69) at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:146) at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128) at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92) at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69) at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145) at java.util.Collections$SingletonList.forEach(Collections.java:4824) at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145) at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128) at 
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47) at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75) at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145) at java.util.Arrays$ArrayList.forEach(Arrays.java:3880) at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145) at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128) at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:62) at org.apache.flink.table.operations.JoinQueryOperation.accept(JoinQueryOperation.java:128) at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145) at java.util.Collections$SingletonList.forEach(Collections.java:4824) at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145) at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128) at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:72) at org.apache.flink.table.operations.FilterQueryOperation.accept(FilterQueryOperation.java:68) at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:186) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:202) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at 
scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1267) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:703) at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:570) at org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame(ArrowUtils.java:630) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.table.api.ValidationException: One or more required options are missing. 
Missing required options are: url at org.apache.flink.table.factories.FactoryUtil.validateFactoryOptions(FactoryUtil.java:303) at org.apache.flink.table.factories.FactoryUtil.validateFactoryOptions(FactoryUtil.java:280) at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:553) at org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.createDynamicTableSource(JdbcDynamicTableFactory.java:179) at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122) ... 58 more |
Hi,
正如报错中提示的,WITH 参数里缺少必需的 "url" 选项。你的 DDL 用的是新版写法 'connector' = 'jdbc',对应的选项名不带 "connector." 前缀,可以尝试将 'connector.url' 改成 'url',看看是否还会报错;其余选项也建议一并改为新版名称('table-name'、'driver'、'username'、'password'),并去掉多余的 'connector.table'。 > 在 2020年12月21日,13:44,肖越 <[hidden email]> 写道: > > 在脚本中定义了两个源数据 ddl,但是第二就总会报缺option的问题,pyflink小白,求大神解答? > #DDL定义 > source_ddl2 = """CREATE TABLE ts_pf_sec_yldrate (id DECIMAL,pf_id VARCHAR,\ > > symbol_id VARCHAR,biz_date VARCHAR,\ > > ccy_type VARCHAR,cur_id_d VARCHAR,yldrate DECIMAL,\ > > is_valid DECIMAL,time_mark TIMESTAMP) WITH ( > > 'connector' = 'jdbc', > > 'connector.url' = 'jdbc:mysql://ip:port/db_base', > > 'connector.table' = 'ts_pf_sec_yldrate', > > 'table-name' = 'ts_pf_sec_yldrate', > > 'connector.driver' = 'com.mysql.jdbc.Driver', > > 'connector.username' = 'xxx', > > 'connector.password' = 'xxx') > > """ > 错误信息: > Traceback (most recent call last): > File "C:/projects/dataService-calculate-code-python/src/test/test_mysql_connector.py", line 67, in <module> > print(join.to_pandas().head(6)) > File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table.py", line 807, in to_pandas > .collectAsPandasDataFrame(self._j_table, max_arrow_batch_size) > File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", line 1286, in __call__ > answer, self.gateway_client, self.target_id, self.name) > File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco > return f(*a, **kw) > File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py", line 328, in get_return_value > format(target_id, ".", name), value) > py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame. > : org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.ts_pf_sec_yldrate'. 
> > > Table options are: > > > 'connector'='jdbc' > 'connector.driver'='com.mysql.jdbc.Driver' > 'connector.password'='xxx' > 'connector.table'='ts_pf_sec_yldrate' > 'connector.url'='jdbc:mysql://ip:port/db_base' > 'connector.username'='xxx' > 'table-name'='ts_pf_sec_yldrate' > at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125) > at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:265) > at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:100) > at org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495) > at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099) > at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123) > at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:339) > at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:149) > at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69) > at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:146) > at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128) > at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92) > at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69) > at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145) > at java.util.Collections$SingletonList.forEach(Collections.java:4824) > at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145) > at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128) > at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47) > at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75) > at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145) > at java.util.Arrays$ArrayList.forEach(Arrays.java:3880) > at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145) > at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128) > at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:62) > at org.apache.flink.table.operations.JoinQueryOperation.accept(JoinQueryOperation.java:128) > at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145) > at java.util.Collections$SingletonList.forEach(Collections.java:4824) > at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145) > at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128) > at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:72) > at org.apache.flink.table.operations.FilterQueryOperation.accept(FilterQueryOperation.java:68) > at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:186) > at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:202) > at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164) > at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164) > at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164) > at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1267) > at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:703) > at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:570) > at org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame(ArrowUtils.java:630) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.lang.Thread.run(Thread.java:748) > Caused by: 
org.apache.flink.table.api.ValidationException: One or more required options are missing. > > > Missing required options are: > > > url > at org.apache.flink.table.factories.FactoryUtil.validateFactoryOptions(FactoryUtil.java:303) > at org.apache.flink.table.factories.FactoryUtil.validateFactoryOptions(FactoryUtil.java:280) > at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:553) > at org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.createDynamicTableSource(JdbcDynamicTableFactory.java:179) > at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122) > ... 58 more |
Free forum by Nabble | Edit this page |