Hi experts, I'm a Flink beginner and hit an error while working through an example; I'd appreciate some pointers. The error output is as follows:

Traceback (most recent call last):
  File "/tmp/pyflink/db00a36e-521f-4109-b787-712687fcb3b4/2add2d51-8a74-40c4-9c42-69c7c12feb05pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/tmp/pyflink/db00a36e-521f-4109-b787-712687fcb3b4/2add2d51-8a74-40c4-9c42-69c7c12feb05py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o46.insertInto.
: org.apache.flink.table.api.TableException: BatchTableSink or OutputFormatTableSink required to emit batch Table.
	at org.apache.flink.table.api.internal.BatchTableEnvImpl.writeToSink(BatchTableEnvImpl.scala:154)
	at org.apache.flink.table.api.internal.TableEnvImpl.insertInto(TableEnvImpl.scala:664)
	at org.apache.flink.table.api.internal.TableEnvImpl.insertInto(TableEnvImpl.scala:607)
	at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/python3/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/local/python3/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/tmp/pyflink/db00a36e-521f-4109-b787-712687fcb3b4/3de6e8aa-f179-4356-b2d3-792601cdc714/flinkrescount.py", line 33, in <module>
    t_env.from_path('mySource').group_by('word').select('word,count(1)').insert_into('flink_test')
  File "/tmp/pyflink/db00a36e-521f-4109-b787-712687fcb3b4/2add2d51-8a74-40c4-9c42-69c7c12feb05pyflink.zip/pyflink/table/table.py", line 679, in insert_into
  File "/tmp/pyflink/db00a36e-521f-4109-b787-712687fcb3b4/2add2d51-8a74-40c4-9c42-69c7c12feb05py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/tmp/pyflink/db00a36e-521f-4109-b787-712687fcb3b4/2add2d51-8a74-40c4-9c42-69c7c12feb05pyflink.zip/pyflink/util/exceptions.py", line 154, in deco
pyflink.util.exceptions.TableException: 'BatchTableSink or OutputFormatTableSink required to emit batch Table.'
org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
	at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:87)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
	at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
The Python source is as follows:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, TableEnvironment, TableConfig, DataTypes, StreamTableEnvironment, EnvironmentSettings
from pyflink.table.descriptors import Schema, OldCsv, FileSystem

flink_test = """
CREATE TABLE flink_test (
    name VARCHAR,
    cnt BIGINT
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/test',
    'connector.table' = 'flink_test',
    'connector.username' = 'root',
    'connector.password' = '11122344',
    'connector.write.flush.interval' = '1s'
)
"""

#exec_env = ExecutionEnvironment.get_execution_environment()
#t_config = TableConfig()
#t_env = BatchTableEnvironment.create(exec_env, t_config)
env = ExecutionEnvironment.get_execution_environment()
#environment_settions = EnvironmentSettings.new_instance().use_blink_planner().build()
t_config = TableConfig()
t_env = BatchTableEnvironment.create(env, t_config)  #, environment_settings=environment_settions

t_env.connect(FileSystem().path('/home/admin/data/input')) \
    .with_format(OldCsv().field('word', DataTypes.STRING())) \
    .with_schema(Schema().field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')
t_env.sql_update(flink_test)

t_env.from_path('mySource').group_by('word').select('word,count(1)').insert_into('flink_test')

t_env.execute("flink_link_mysql")
Hello, 雪魂
In Flink 1.10's batch mode (both the flink planner and the blink planner), you cannot register JDBC as a sink directly via SQL DDL. You need to register and use a JDBCAppendSink instead. For PyFlink users, that Java class needs a wrapper; I wrote a simple one you can use as a reference:

from pyflink.java_gateway import get_gateway
from pyflink.table.sinks import TableSink  # import added here; the original snippet used TableSink without importing it
from pyflink.table.types import _to_java_type
from pyflink.util import utils


class JDBCAppendSink(TableSink):

    def __init__(self, field_names: list, field_types: list, driver_name: str,
                 db_url: str, username: str, password: str, query: str):
        gateway = get_gateway()
        j_field_names = utils.to_jarray(gateway.jvm.String, field_names)
        j_field_types = utils.to_jarray(
            gateway.jvm.TypeInformation,
            [_to_java_type(field_type) for field_type in field_types])
        builder = gateway.jvm.org.apache.flink.api.java.io.jdbc.JDBCAppendTableSinkBuilder()
        builder.setUsername(username)
        builder.setPassword(password)
        builder.setDrivername(driver_name)
        builder.setDBUrl(db_url)
        builder.setParameterTypes(j_field_types)
        builder.setQuery(query)
        j_jdbc_sink = builder.build()
        j_jdbc_sink = j_jdbc_sink.configure(j_field_names, j_field_types)
        super(JDBCAppendSink, self).__init__(j_jdbc_sink)

Register the sink like this (replace the username, password, table, and so on with your own, and change the query to match yours):

bt_env.register_table_sink(
    "jdbc",
    JDBCAppendSink(
        ["a", "interval"],
        [DataTypes.STRING(), DataTypes.BIGINT()],
        "com.mysql.cj.jdbc.Driver",
        "jdbc:mysql://localhost:3306/flink_test",
        "root",
        "xxtxxthmhxb0643",
        "insert into buyback (a, b) values (?, ?)"))

# ... the logic that produces your result goes here

# write out
result.insert_into("jdbc")

雪魂 <[hidden email]> wrote on Mon, Jun 15, 2020 at 3:44 PM:
> Hi experts, I'm a Flink beginner and hit an error while working through an
> example ("BatchTableSink or OutputFormatTableSink required to emit batch
> Table"); full traceback and source quoted above.
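One detail worth spelling out about the registration above: the query string passed to the builder must contain one `?` placeholder per sink field, in the same order as `field_types`. A tiny helper to build such a query — hypothetical and for illustration only, not part of PyFlink:

```python
# Hypothetical helper (not part of PyFlink): builds the parameterized
# INSERT statement an append-style JDBC sink expects -- one '?'
# placeholder per column, in column order.
def build_insert_query(table, columns):
    cols = ", ".join(columns)
    placeholders = ", ".join("?" for _ in columns)
    return "insert into {} ({}) values ({})".format(table, cols, placeholders)

# For the flink_test table from this thread:
print(build_insert_query("flink_test", ["name", "cnt"]))
# -> insert into flink_test (name, cnt) values (?, ?)
```

A mismatch between the placeholder count and the number of registered field types is a common cause of runtime SQL errors with this sink.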
hello Xingbo Huang,
When I run it, I get this error:

java.lang.AbstractMethodError: Method org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.consumeDataSet(Lorg/apache/flink/api/java/DataSet;)Lorg/apache/flink/api/java/operators/DataSink; is abstract

How can I solve this issue?

Thanks,
hieule

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi hieule,
This workaround is for Flink 1.10. In Flink 1.11 you can use the DDL directly (with the blink planner); see [1]. For how to use the blink planner in PyFlink, refer to the following code:

t_env = BatchTableEnvironment.create(
    environment_settings=EnvironmentSettings.new_instance()
        .in_batch_mode().use_blink_planner().build())
t_env.get_config().get_configuration().set_string(
    "taskmanager.memory.task.off-heap.size", '80m')

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html

Best,
Xingbo

hieule <[hidden email]> wrote on Mon, Jul 13, 2020 at 4:46 PM:
> hello Xingbo Huang,
>
> when I run , I had some error
>
> `TypeError: Could not found the Java class
> 'org.apache.flink.api.java.io.jdbc.JDBCAppendTableSinkBuilder'. The Java
> dependencies could be specified via command line argument '--jarfile' or
> the config option 'pipeline.jars' `
>
> how to solve issue ?
>
> Thank
> hieule
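To make the 1.11 path concrete, here is a sketch of what the sink DDL for the `flink_test` table from this thread might look like with the 1.11-style JDBC connector options described in [1]. This is an assumption-laden sketch, not a tested job: the URL and credentials are placeholders to substitute, and the JDBC connector jar plus the MySQL driver must be on the classpath.

```python
# Sketch only: 1.11-style JDBC sink DDL (option names per the 1.11 JDBC
# connector docs; URL and credentials below are placeholders to replace).
sink_ddl = """
CREATE TABLE flink_test (
    name VARCHAR,
    cnt BIGINT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/test',
    'table-name' = 'flink_test',
    'username' = 'root',
    'password' = 'secret'
)
"""

# With a blink-planner TableEnvironment this would be registered via:
#     t_env.execute_sql(sink_ddl)
```

Note the renamed options compared with the 1.10 descriptor keys in the original post: `connector.type` becomes `connector`, and `connector.table` becomes `table-name`.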