Hi,

I get an error when creating a Kafka table through the Flink JDBC Driver. Is my CREATE TABLE code incorrect? The code is as follows:

    Connection connection = DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
    Statement statement = connection.createStatement();
    statement.executeUpdate(
        "CREATE TABLE table_kafka (\n" +
        "  user_id BIGINT,\n" +
        "  item_id BIGINT,\n" +
        "  category_id BIGINT,\n" +
        "  behavior STRING,\n" +
        "  ts TIMESTAMP(3),\n" +
        "  proctime as PROCTIME(),\n" +
        "  WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
        ") WITH (\n" +
        "  'connector.type' = 'kafka', \n" +
        "  'connector.version' = 'universal', \n" +
        "  'connector.topic' = 'flink_im02', \n" +
        "  'connector.properties.group.id' = 'flink_im02_new',\n" +
        "  'connector.startup-mode' = 'earliest-offset', \n" +
        "  'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
        "  'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
        "  'format.type' = 'csv',\n" +
        "  'format.field-delimiter' = '|'\n" +
        ")");
    ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
    while (rs1.next()) {
        System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2));
    }
    statement.close();
    connection.close();

Error:

    Reason: Required context properties mismatch.

    The matching candidates:
    org.apache.flink.table.sources.CsvBatchTableSourceFactory
    Mismatched properties:
    'connector.type' expects 'filesystem', but is 'kafka'

赵峰

Take a look at this document: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector

The following syntax is probably not supported:

    'format.type' = 'csv',
    'format.field-delimiter' = '|'

Below is code that runs for me. The data in Kafka needs to be in this format: {"order_no":"abcdefg","status":90}

    tEnv.sqlUpdate("CREATE TABLE pick_order (\n"
        + "  order_no VARCHAR,\n"
        + "  status INT\n"
        + ") WITH (\n"
        + "  'connector.type' = 'kafka',\n"
        + "  'connector.version' = 'universal',\n"
        + "  'connector.topic' = 'wanglei_test',\n"
        + "  'connector.startup-mode' = 'latest-offset',\n"
        + "  'connector.properties.0.key' = 'zookeeper.connect',\n"
        + "  'connector.properties.0.value' = 'xxx:2181',\n"
        + "  'connector.properties.1.key' = 'bootstrap.servers',\n"
        + "  'connector.properties.1.value' = 'xxx:9092',\n"
        + "  'update-mode' = 'append',\n"
        + "  'format.type' = 'json',\n"
        + "  'format.derive-schema' = 'true'\n"
        + ")");

王磊
[hidden email]

From: 赵峰
Sent: 2020-03-24 21:28
To: user-zh
Subject: Does the Flink JDBC Driver support creating streaming tables?
> I get an error when creating a Kafka table through the Flink JDBC Driver. Is my CREATE TABLE code incorrect? [...]
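
It is not a syntax problem. Creating the table works for me as well; the error is raised by the query. Have you tried querying the data, or writing the data into a filesystem table?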
Quoted from: http://apache-flink.147419.n8.nabble.com/Flink-JDBC-Driver-tp2103p2104.html

赵峰

Please check whether the Kafka connector jar is under flink/lib.
The current error looks like the Kafka connector jar cannot be found.

*Best Regards,*
*Zhenghua Gao*

On Wed, Mar 25, 2020 at 4:18 PM 赵峰 <[hidden email]> wrote:
> It is not a syntax problem. Creating the table works for me as well; the error is raised by the query. [...]
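
One quick way to act on the suggestion above is to test whether the Kafka table factory class can be loaded at all. The sketch below is only an illustration: the class name is assumed to be the factory shipped with the universal Kafka connector for Flink 1.10, and the check is only meaningful when run with the same classpath as the process that plans the query (for example the gateway behind the JDBC driver), not the client application.

```java
public class KafkaFactoryCheck {
    // Assumption: factory class of the 'universal' Kafka connector in Flink 1.10.
    static final String KAFKA_FACTORY =
        "org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory";

    // Returns true if the named class can be found by this class's class loader.
    // The class is looked up without being initialized.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, KafkaFactoryCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(KAFKA_FACTORY + " on classpath: " + isOnClasspath(KAFKA_FACTORY));
    }
}
```

If this prints `false` in the relevant JVM, the connector jar is most likely missing from that process's classpath.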

Hi 赵峰,

The problem you hit is that no Kafka-related TableFactory can be found on the classpath; the approach Zhenghua described resolves that. However, the Flink JDBC Driver currently supports only batch mode, while the Kafka table source currently supports only streaming mode.

Best,
Godfrey

On Wed, Mar 25, 2020 at 4:26 PM Zhenghua Gao <[hidden email]> wrote:
> Please check whether the Kafka connector jar is under flink/lib.
> The current error looks like the Kafka connector jar cannot be found.
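
To see why the error in this thread names CsvBatchTableSourceFactory, it may help to picture factory discovery as matching each candidate factory's required properties against the table's properties. The following is a deliberately simplified sketch of that idea, not Flink's actual implementation; the class `FactoryMatchSketch`, the method `firstMismatch`, and the two-entry property maps are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FactoryMatchSketch {
    // Returns a description of the first required property that the table
    // does not satisfy, or null when every required property matches.
    static String firstMismatch(Map<String, String> requiredContext,
                                Map<String, String> tableProps) {
        for (Map.Entry<String, String> e : requiredContext.entrySet()) {
            String actual = tableProps.get(e.getKey());
            if (!e.getValue().equals(actual)) {
                return "'" + e.getKey() + "' expects '" + e.getValue()
                    + "', but is '" + actual + "'";
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Subset of the properties from the CREATE TABLE statement in the question.
        Map<String, String> tableProps = new LinkedHashMap<>();
        tableProps.put("connector.type", "kafka");
        tableProps.put("format.type", "csv");

        // Hypothetical, simplified context of a CSV filesystem factory.
        Map<String, String> csvFactoryContext = new LinkedHashMap<>();
        csvFactoryContext.put("connector.type", "filesystem");
        csvFactoryContext.put("format.type", "csv");

        System.out.println(firstMismatch(csvFactoryContext, tableProps));
    }
}
```

Under this reading, when no Kafka factory is available, the only candidate that comes close is a CSV filesystem factory, and the reported mismatch is on `connector.type`.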

Another option: the SQL gateway supports --jar and --library for specifying user jars, so with this approach users do not need to put the jar under Flink's lib directory.

On Wed, Mar 25, 2020 at 6:24 PM godfrey he <[hidden email]> wrote:
> Hi 赵峰,
>
> The problem you hit is that no Kafka-related TableFactory can be found on the classpath; the approach Zhenghua described resolves that. [...]