Hi all, I ran into a problem defining a Kafka source directly with DDL.
The Kafka topic contains JSON-format data collected by Logstash. The DDL:

CREATE TABLE vpn_source (
    c_real_ip VARCHAR,
    d_real_ip VARCHAR,
    c_real_port INT,
    d_real_port INT,
    logtype INT,
    `user` VARCHAR,
    host_ip VARCHAR
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'vpnlog',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'connector.properties.group.id' = 'flink_test',
    'format.type' = 'json'
)

The error:

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
Reason: Required context properties mismatch.

The following properties are requested:
connector.properties.bootstrap.servers=10.208.0.73:9092
connector.properties.group.id=flink_test
connector.properties.zookeeper.connect=10.208.0.73:2181
connector.topic=vpnlog
connector.type=kafka
connector.version=universal
format.type=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=c_real_ip
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=d_real_ip
schema.2.data-type=INT
schema.2.name=c_real_port
schema.3.data-type=INT
schema.3.name=d_real_port
schema.4.data-type=INT
schema.4.name=logtype
schema.5.data-type=VARCHAR(2147483647)
schema.5.name=user
schema.6.data-type=VARCHAR(2147483647)
schema.6.name=host_ip

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.filesystem.FileSystemTableFactory

Flink environment: Flink 1.11 compiled locally from source, running as a local cluster started with start-cluster.sh.
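For reference, Flink 1.11 also supports newer connector option keys for the same table. A sketch only (field list, topic, group id, and addresses copied from the DDL above; the startup mode is an assumption):

```sql
-- Equivalent declaration using Flink 1.11's newer option keys (sketch).
CREATE TABLE vpn_source (
    c_real_ip VARCHAR,
    d_real_ip VARCHAR,
    c_real_port INT,
    d_real_port INT,
    logtype INT,
    `user` VARCHAR,
    host_ip VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'vpnlog',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'flink_test',
    'scan.startup.mode' = 'latest-offset',  -- assumed; pick to taste
    'format' = 'json'
)
```

Either option style still requires the Kafka connector jar on the classpath; with it missing, both fail with a factory-lookup error.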
Hi,
Your DDL is fine. The problem is most likely that you haven't added the Kafka connector jar. You can download the universal-version Kafka jar via https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html. For how to add jar files for use in PyFlink, see https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/faq.html#adding-jar-files

Best,
Xingbo
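The jar referred to above follows the standard Maven Central repository layout, so its download URL can be derived from the Flink and Scala versions. A small illustrative helper (the version numbers below are assumptions; match them to your own build):

```python
# Build the Maven Central download URL for the universal Kafka SQL
# connector jar (illustrative; choose versions matching your Flink build).
def kafka_connector_jar_url(flink_version, scala_version="2.11"):
    artifact = "flink-sql-connector-kafka_" + scala_version
    return ("https://repo.maven.apache.org/maven2/org/apache/flink/"
            "{a}/{v}/{a}-{v}.jar".format(a=artifact, v=flink_version))

url = kafka_connector_jar_url("1.11.1")
print(url)

# Once downloaded, the jar can either be copied into $FLINK_HOME/lib/
# or registered from PyFlink via the 'pipeline.jars' configuration, e.g.
# t_env.get_config().get_configuration().set_string(
#     "pipeline.jars", "file:///path/to/" + url.rsplit("/", 1)[1])
```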
Thank you for the explanation; the problem is solved.
> ------------------ Original message ------------------
> From: "Xingbo Huang" <[hidden email]>
> Sent: 2020-08-27 09:51:49
> To: user-zh <[hidden email]>
> Subject: Re: Problem with the PyFlink Kafka connector