I am using the latest Flink release (1.11.2) to read data from MySQL.
My environment:

Docker Flink image: flink:scala_2.12-java8 (docker pull flink:scala_2.12-java8).
For JDBC I use the latest flink-connector-jdbc_2.11-1.11.2.jar, and I have already put flink-connector-jdbc_2.11-1.11.2.jar, mysql-connector-java-8.0.21.jar, and postgresql-42.2.17.jar into {FLINK}/lib, following https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html

The docker-compose environment:

version: '2.1'
services:
  jobmanager:
    build: .
    image: flink:latest
    hostname: "jobmanager"
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:latest
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  mysql:
    image: debezium/example-mysql
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw

To make the JDBC jar available I tried the following two approaches:

1. Copy flink-connector-jdbc_2.11-1.11.2.jar into /usr/local/lib/python3.7/site-packages/pyflink/lib

2.
Configure Flink from Python code:

base_dir = "/Users/huhu/Documents/projects/webapp/libs/"
flink_jdbc_jar = f"file://{base_dir}flink-connector-jdbc_2.11-1.11.2.jar"
BT_ENV.get_config().get_configuration().set_string("pipeline.jars", flink_jdbc_jar)

My code:

T_CONFIG = TableConfig()
B_EXEC_ENV = ExecutionEnvironment.get_execution_environment()
B_EXEC_ENV.set_parallelism(1)
BT_ENV = BatchTableEnvironment.create(B_EXEC_ENV, T_CONFIG)

base_dir = "/Users/huhu/Documents/projects/webapp/libs/"
flink_jdbc_jar = f"file://{base_dir}flink-connector-jdbc_2.11-1.11.2.jar"
BT_ENV.get_config().get_configuration().set_string("pipeline.jars", flink_jdbc_jar)

ddl = """
CREATE TABLE nba_player4 (
    first_name STRING,
    last_name STRING,
    email STRING,
    id INT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/inventory',
    'username' = 'root',
    'password' = 'debezium',
    'table-name' = 'customers'
)
"""
BT_ENV.sql_update(ddl)

sinkddl = """
CREATE TABLE print_table (
    f0 INT,
    f1 INT,
    f2 STRING,
    f3 DOUBLE
) WITH (
    'connector' = 'print'
)
"""
BT_ENV.sql_update(sinkddl)

BT_ENV.sql_query("SELECT first_name, last_name FROM nba_player4")
BT_ENV.execute("table_job")

The error when running the code:

py4j.protocol.Py4JJavaError: An error occurred while calling o23.sqlQuery.
: org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed.
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: Required context properties mismatch.
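Since more than one driver jar is involved (the JDBC connector plus the MySQL driver), note that Flink's `pipeline.jars` option takes a semicolon-separated list of `file://` URLs. A minimal sketch of building that string, using the paths and jar names from above (whether both jars must be shipped this way, rather than living in {FLINK}/lib, is an assumption):

```python
# Build the value for the "pipeline.jars" Flink option: a
# semicolon-separated list of file:// URLs. This part is pure string
# handling, so no pyflink is needed to try it out.
base_dir = "/Users/huhu/Documents/projects/webapp/libs/"
jar_names = [
    "flink-connector-jdbc_2.11-1.11.2.jar",
    "mysql-connector-java-8.0.21.jar",
]
pipeline_jars = ";".join(f"file://{base_dir}{name}" for name in jar_names)

# Then hand it to the table environment, e.g.:
# BT_ENV.get_config().get_configuration().set_string("pipeline.jars", pipeline_jars)
print(pipeline_jars)
```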
The following properties are requested:
connector=jdbc
password=debezium
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=first_name
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=last_name
schema.2.data-type=VARCHAR(2147483647)
schema.2.name=email
schema.3.data-type=INT
schema.3.name=id
table-name=customers
url=jdbc:mysql://localhost:3306/inventory
username=root

The following factories have been considered:
org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.filesystem.FileSystemTableFactory

The problem still occurs.

-- Sent from: http://apache-flink.147419.n8.nabble.com/
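Separate from the factory error, one detail in the DDL above may be worth double-checking: inside the compose network, `localhost` in the JDBC url refers to the Flink container itself, not the MySQL container. A small sketch of the url, assuming the default docker-compose network where the service name from docker-compose.yml resolves as a hostname:

```python
# Inside a docker-compose network, containers reach each other by
# service name; "localhost" points back at the Flink container itself.
# Assumption: the default compose network, with the MySQL service
# named "mysql" as in the docker-compose.yml above.
mysql_host = "mysql"  # the compose service name, not "localhost"
jdbc_url = f"jdbc:mysql://{mysql_host}:3306/inventory"
print(jdbc_url)
```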