Issue with the pyflink Kafka connector

Issue with the pyflink Kafka connector

Wanmail1997
Hi all, I'm running into a problem defining a Kafka source directly with DDL.

The Kafka topic contains JSON-formatted data collected by Logstash.
The DDL:

CREATE TABLE vpn_source (
  c_real_ip VARCHAR,
  d_real_ip VARCHAR,
  c_real_port INT,
  d_real_port INT,
  logtype INT,
  `user` VARCHAR,
  host_ip VARCHAR
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'vpnlog',
  'connector.properties.zookeeper.connect' = 'localhost:2181',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'connector.properties.group.id' = 'flink_test',
  'format.type' = 'json'
)
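For context, a statement like the one above is submitted from PyFlink roughly as follows (a minimal sketch, not part of the original post; source_ddl stands for the CREATE TABLE statement shown above, pasted in as a Python string):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# source_ddl is the CREATE TABLE statement shown above, as a Python string
t_env.execute_sql(source_ddl)

# the registered table can then be referenced by name
table = t_env.from_path("vpn_source")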




The error:

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector.properties.bootstrap.servers=10.208.0.73:9092
connector.properties.group.id=flink_test
connector.properties.zookeeper.connect=10.208.0.73:2181
connector.topic=vpnlog
connector.type=kafka
connector.version=universal
format.type=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=c_real_ip
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=d_real_ip
schema.2.data-type=INT
schema.2.name=c_real_port
schema.3.data-type=INT
schema.3.name=d_real_port
schema.4.data-type=INT
schema.4.name=logtype
schema.5.data-type=VARCHAR(2147483647)
schema.5.name=user
schema.6.data-type=VARCHAR(2147483647)
schema.6.name=host_ip

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.filesystem.FileSystemTableFactory




Flink environment:

Flink 1.11 built locally from source, running as a local cluster started via start-cluster.sh.

Re: Issue with the pyflink Kafka connector

Xingbo Huang
Hi,

Your DDL is fine. The problem is most likely that you haven't added the Kafka connector jar. You can download the universal Kafka connector jar from
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
For how to add jar files to a PyFlink job, see the documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/faq.html#adding-jar-files
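In short, the approach from that FAQ page boils down to something like this (a minimal sketch; the jar path and filename are illustrative, point them at the connector jar you actually downloaded):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# pipeline.jars takes a ';'-separated list of file:// URLs and ships the
# listed jars with the job, so the Kafka factory ends up on the classpath;
# the path below is illustrative
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///path/to/flink-sql-connector-kafka_2.11-1.11.0.jar")

Since the cluster here is started with start-cluster.sh, dropping the jar into the Flink distribution's lib/ directory and restarting the cluster also works.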

Best,
Xingbo


Re: Re: Issue with the pyflink Kafka connector

Wanmail1997
Thanks for the explanation, the problem is solved.

