Connecting to MySQL with the latest Flink 1.11 and PyFlink fails with a "Required context properties mismatch" error


helxsz
I am using the latest Flink release (1.11.2) to read data from MySQL.


Below is my environment.

The Docker Flink image is flink:scala_2.12-java8:

    docker pull flink:scala_2.12-java8

For JDBC I am using the latest flink-connector-jdbc_2.11-1.11.2.jar, and that jar together with mysql-connector-java-8.0.21.jar and postgresql-42.2.17.jar has already been placed into {FLINK}/lib, following the connector documentation:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html

The Docker Compose environment:

    version: '2.1'
    services:
      jobmanager:
        build: .
        image: flink:latest
        hostname: "jobmanager"
        expose:
          - "6123"
        ports:
          - "8081:8081"
        command: jobmanager
        environment:
          - JOB_MANAGER_RPC_ADDRESS=jobmanager
      taskmanager:
        image: flink:latest
        expose:
          - "6121"
          - "6122"
        depends_on:
          - jobmanager
        command: taskmanager
        links:
          - jobmanager:jobmanager
        environment:
          - JOB_MANAGER_RPC_ADDRESS=jobmanager
      mysql:
        image: debezium/example-mysql
        ports:
         - "3306:3306"
        environment:
         - MYSQL_ROOT_PASSWORD=debezium
         - MYSQL_USER=mysqluser
         - MYSQL_PASSWORD=mysqlpw
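
One caveat about this setup: if the job actually runs inside the jobmanager/taskmanager containers, MySQL is reachable through its compose service name mysql rather than localhost, which only works from the host machine via the published port 3306. A sketch of the URL as seen from inside the compose network (my assumption, not verified here):

    # Hedged assumption: inside the compose network, "localhost" resolves to
    # the Flink container itself; the MySQL container answers at "mysql".
    jdbc_url = "jdbc:mysql://mysql:3306/inventory"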



To make the JDBC jars available, I tried the following two approaches:

1. Putting flink-connector-jdbc_2.11-1.11.2.jar under
/usr/local/lib/python3.7/site-packages/pyflink/lib

2. Configuring Flink from Python code (a fuller sketch follows below):

        base_dir = "/Users/huhu/Documents/projects/webapp/libs/"
        flink_jdbc_jar = f"file://{base_dir}flink-connector-jdbc_2.11-1.11.2.jar"

        BT_ENV.get_config().get_configuration().set_string("pipeline.jars", flink_jdbc_jar)
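
For reference, a fuller sketch of approach 2. The paths are my own; the two assumptions baked in are that pipeline.jars takes a semicolon-separated list of file:// URLs, and that the MySQL driver jar has to ship with the job alongside the connector jar:

    base_dir = "/Users/huhu/Documents/projects/webapp/libs/"
    jars = ";".join([
        # the Flink JDBC connector itself
        f"file://{base_dir}flink-connector-jdbc_2.11-1.11.2.jar",
        # the MySQL JDBC driver that the connector loads at runtime
        f"file://{base_dir}mysql-connector-java-8.0.21.jar",
    ])
    BT_ENV.get_config().get_configuration().set_string("pipeline.jars", jars)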

My code:

    from pyflink.dataset import ExecutionEnvironment
    from pyflink.table import TableConfig, BatchTableEnvironment

    T_CONFIG = TableConfig()
    B_EXEC_ENV = ExecutionEnvironment.get_execution_environment()
    B_EXEC_ENV.set_parallelism(1)
    BT_ENV = BatchTableEnvironment.create(B_EXEC_ENV, T_CONFIG)

    base_dir = "/Users/huhu/Documents/projects/webapp/libs/"
    flink_jdbc_jar = f"file://{base_dir}flink-connector-jdbc_2.11-1.11.2.jar"
    BT_ENV.get_config().get_configuration().set_string("pipeline.jars", flink_jdbc_jar)

    ddl = """
        CREATE TABLE nba_player4 (
            first_name STRING,
            last_name STRING,
            email STRING,
            id INT
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:mysql://localhost:3306/inventory',
            'username' = 'root',
            'password' = 'debezium',
            'table-name' = 'customers'
        )
    """
    BT_ENV.sql_update(ddl)

    sinkddl = """
        CREATE TABLE print_table (
            f0 INT,
            f1 INT,
            f2 STRING,
            f3 DOUBLE
        ) WITH (
            'connector' = 'print'
        )
    """
    BT_ENV.sql_update(sinkddl)

    BT_ENV.sql_query("SELECT first_name, last_name FROM nba_player4")
    BT_ENV.execute("table_job")
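
As an aside, in the code above the result of the SELECT is never routed into a sink; sql_query only builds a Table. A minimal sketch of printing it, assuming a hypothetical print sink print_names whose schema matches the two selected columns (print_table above has four differently typed columns, so it would not match):

    # Hypothetical print sink matching the two selected columns.
    BT_ENV.execute_sql("""
        CREATE TABLE print_names (
            first_name STRING,
            last_name STRING
        ) WITH (
            'connector' = 'print'
        )
    """)

    # In 1.11, execute_insert() submits the job itself, so no separate
    # BT_ENV.execute() call is needed for this path.
    BT_ENV.sql_query("SELECT first_name, last_name FROM nba_player4") \
        .execute_insert("print_names")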


The error I get when running the code:

    py4j.protocol.Py4JJavaError: An error occurred while calling o23.sqlQuery.
    : org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed.

    Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

    Reason: Required context properties mismatch.

    The following properties are requested:
    connector=jdbc
    password=debezium
    schema.0.data-type=VARCHAR(2147483647)
    schema.0.name=first_name
    schema.1.data-type=VARCHAR(2147483647)
    schema.1.name=last_name
    schema.2.data-type=VARCHAR(2147483647)
    schema.2.name=email
    schema.3.data-type=INT
    schema.3.name=id
    table-name=customers
    url=jdbc:mysql://localhost:3306/inventory
    username=root

    The following factories have been considered:
    org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory
    org.apache.flink.table.sources.CsvBatchTableSourceFactory
    org.apache.flink.table.sources.CsvAppendTableSourceFactory
    org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
    org.apache.flink.table.filesystem.FileSystemTableFactory

With both approaches, the problem still occurs.
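
One possibility I am aware of, though not confirmed here: BatchTableEnvironment.create(B_EXEC_ENV, T_CONFIG) in 1.11 uses the legacy planner, whose JdbcTableSourceSinkFactory only matches the old 'connector.type' = 'jdbc' property style, while the 'connector' = 'jdbc' DDL above goes through the new factory stack that the Blink planner uses. A sketch of creating the environment with the Blink planner instead:

    from pyflink.table import BatchTableEnvironment, EnvironmentSettings

    # Blink batch planner; the legacy planner does not understand the
    # new 'connector' = 'jdbc' option style.
    env_settings = (EnvironmentSettings.new_instance()
                    .in_batch_mode()
                    .use_blink_planner()
                    .build())
    BT_ENV = BatchTableEnvironment.create(environment_settings=env_settings)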


