Does the Flink JDBC Driver support creating streaming tables?


Does the Flink JDBC Driver support creating streaming tables?

赵峰
hi
   
Creating a Kafka table through the Flink JDBC Driver fails with an error. Is my table-creation code incorrect? The code is as follows:
Connection connection = DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
Statement statement = connection.createStatement();

statement.executeUpdate(
"CREATE TABLE table_kafka (\n" +
"    user_id BIGINT,\n" +
"    item_id BIGINT,\n" +
"    category_id BIGINT,\n" +
"    behavior STRING,\n" +
"    ts TIMESTAMP(3),\n" +
"    proctime as PROCTIME(),\n" +
"    WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
") WITH (\n" +
"    'connector.type' = 'kafka', \n" +
"    'connector.version' = 'universal', \n" +
"    'connector.topic' = 'flink_im02', \n" +
"    'connector.properties.group.id' = 'flink_im02_new',\n" +
"    'connector.startup-mode' = 'earliest-offset', \n" +
"    'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
"    'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
"    'format.type' = 'csv',\n" +
"    'format.field-delimiter' = '|'\n" +
")");
ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
while (rs1.next()) {
System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2));
}

statement.close();
connection.close();

The error:
Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'




赵峰

Re: Does the Flink JDBC Driver support creating streaming tables?

wanglei2@geekplus.com.cn

Have a look at this document: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
The following syntax is probably not supported:
    'format.type' = 'csv',
    'format.field-delimiter' = '|'

Below is code that runs for me. The data in Kafka needs to be in this format: {"order_no":"abcdefg","status":90}
tEnv.sqlUpdate("CREATE TABLE pick_order (\n"
    + "    order_no VARCHAR,\n"
    + "    status INT\n"
    + ") WITH (\n"
    + "    'connector.type' = 'kafka',\n"
    + "    'connector.version' = 'universal',\n"
    + "    'connector.topic' = 'wanglei_test',\n"
    + "    'connector.startup-mode' = 'latest-offset',\n"
    + "    'connector.properties.0.key' = 'zookeeper.connect',\n"
    + "    'connector.properties.0.value' = 'xxx:2181',\n"
    + "    'connector.properties.1.key' = 'bootstrap.servers',\n"
    + "    'connector.properties.1.value' = 'xxx:9092',\n"
    + "    'update-mode' = 'append',\n"
    + "    'format.type' = 'json',\n"
    + "    'format.derive-schema' = 'true'\n"
    + ")");

王磊


[hidden email]
 
From: 赵峰
Sent: 2020-03-24 21:28
To: user-zh
Subject: Does the Flink JDBC Driver support creating streaming tables?

Re: Does the Flink JDBC Driver support creating streaming tables?

赵峰
It's not a syntax problem; creating the table works fine for me as well. The error occurs on the query. Have you tried querying the data, or writing the data into a filesystem table?


Quoted from:
http://apache-flink.147419.n8.nabble.com/Flink-JDBC-Driver-tp2103p2104.html




赵峰

Re: Does the Flink JDBC Driver support creating streaming tables?

Zhenghua Gao
Please check whether the Kafka connector JAR is under flink/lib.
The current error looks like the Kafka connector JAR cannot be found.
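If the JAR is indeed missing, a minimal fix is to drop the Kafka connector JAR (and the format JAR matching your 'format.type') into flink/lib and restart. A sketch, assuming Flink 1.10 with Scala 2.11; FLINK_HOME and the exact JAR file names below are placeholders, check the versions against your installation:

```shell
# Placeholders: adjust FLINK_HOME and the JAR versions to your setup
FLINK_HOME=/opt/flink
cp flink-sql-connector-kafka_2.11-1.10.0.jar "$FLINK_HOME/lib/"
cp flink-csv-1.10.0.jar "$FLINK_HOME/lib/"   # format JAR matching 'format.type' = 'csv'
# restart the cluster (and the SQL gateway the JDBC driver connects to)
"$FLINK_HOME/bin/stop-cluster.sh" && "$FLINK_HOME/bin/start-cluster.sh"
```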

*Best Regards,*
*Zhenghua Gao*


On Wed, Mar 25, 2020 at 4:18 PM 赵峰 <[hidden email]> wrote:

> 不是语法问题,我建表也没有问题,是查询报错。你有没有试查询数据或者数据写人文件表中
>
>
> <quote author='[hidden email]'>
>
> 参考下这个文档:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
> 下面的语法应该是不支持的:
>   'format.type' = 'csv',\n" +
> "    'format.field-delimiter' = '|'\n"
>
> 下面是我可以跑通的代码, kafka 里的数据需要是这种格式:{"order_no":"abcdefg","status":90}
> tEnv.sqlUpdate("CREATE TABLE pick_order (\n"
>     + "    order_no VARCHAR,\n"
>     + "    status INT\n"
>     + ") WITH (\n"
>     + "    'connector.type' = 'kafka',\n"
>     + "    'connector.version' = 'universal',\n"
>     + "    'connector.topic' = 'wanglei_test',\n"
>     + "    'connector.startup-mode' = 'latest-offset',\n"
>     + "    'connector.properties.0.key' = 'zookeeper.connect',\n"
>     + "    'connector.properties.0.value' = 'xxx:2181',\n"
>     + "    'connector.properties.1.key' = 'bootstrap.servers',\n"
>     + "    'connector.properties.1.value' = 'xxx:9092',\n"
>     + "    'update-mode' = 'append',\n"
>     + "    'format.type' = 'json',\n"
>     + "    'format.derive-schema' = 'true'\n"
>     + ")");
>
> 王磊
>
>
> [hidden email]
> 发件人: 赵峰
> 发送时间: 2020-03-24 21:28
> 收件人: user-zh
> 主题: Flink JDBC Driver是否支持创建流数据表
> hi
>
> Flink JDBC Driver创建kafka表报错,是我的建表代码不正确?代码如下:
> Connection connection =
> DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
> Statement statement = connection.createStatement();
> statement.executeUpdate(
> "CREATE TABLE table_kafka (\n" +
> "    user_id BIGINT,\n" +
> "    item_id BIGINT,\n" +
> "    category_id BIGINT,\n" +
> "    behavior STRING,\n" +
> "    ts TIMESTAMP(3),\n" +
> "    proctime as PROCTIME(),\n" +
> "    WATERMARK FOR ts as ts - INTERVAL '5' SECOND\n" +
> ") WITH (\n" +
> "    'connector.type' = 'kafka', \n" +
> "    'connector.version' = 'universal', \n" +
> "    'connector.topic' = 'flink_im02', \n" +
> "    'connector.properties.group.id' = 'flink_im02_new',\n" +
> "    'connector.startup-mode' = 'earliest-offset', \n" +
> "    'connector.properties.zookeeper.connect' = '*.*.*.*:2181', \n" +
> "    'connector.properties.bootstrap.servers' = '*.*.*.*:9092', \n" +
> "    'format.type' = 'csv',\n" +
> "    'format.field-delimiter' = '|'\n" +
> ")");
> ResultSet rs1 = statement.executeQuery("SELECT * FROM table_kafka");
> while (rs1.next()) {
> System.out.println(rs1.getInt(1) + ", " + rs1.getInt(2));
> }
> statement.close();
> connection.close();
> 报错:
> Reason: Required context properties mismatch.
> The matching candidates:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> Mismatched properties:
> 'connector.type' expects 'filesystem', but is 'kafka'
> 赵峰
>
> </quote>
> Quoted from:
> http://apache-flink.147419.n8.nabble.com/Flink-JDBC-Driver-tp2103p2104.html
>
>
>
>
> 赵峰

Re: Does the Flink JDBC Driver support creating streaming tables?

godfrey he
hi 赵峰,

The problem you are seeing is that no Kafka-related TableFactory can be found on the classpath; it can be resolved the way Zhenghua described. However, the Flink JDBC Driver currently only supports batch mode, while the Kafka table source currently only supports stream mode.
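For intuition, the "Required context properties mismatch" error means the planner compared the WITH properties of the DDL against the required context of every TableFactory it found on the classpath, and the only candidate was the batch CSV filesystem factory. A simplified sketch of that matching (illustrative only, not Flink's actual API; class and method names here are made up):

```java
import java.util.Map;

public class FactoryMatchSketch {
    // Simplified stand-in for how a TableFactory's required context
    // is compared against the DDL's WITH properties.
    static boolean matches(Map<String, String> requiredContext, Map<String, String> ddlProps) {
        return requiredContext.entrySet().stream()
                .allMatch(e -> e.getValue().equals(ddlProps.get(e.getKey())));
    }

    public static void main(String[] args) {
        // Properties taken from the failing CREATE TABLE ... WITH (...) clause
        Map<String, String> ddl = Map.of(
                "connector.type", "kafka",
                "format.type", "csv");

        // The only factory found on the classpath: the batch CSV filesystem source
        Map<String, String> csvBatchFactory = Map.of(
                "connector.type", "filesystem",
                "format.type", "csv");

        // false -> "Required context properties mismatch":
        // 'connector.type' expects 'filesystem', but is 'kafka'
        System.out.println(matches(csvBatchFactory, ddl));  // prints: false
    }
}
```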

Best,
Godfrey

Zhenghua Gao <[hidden email]> wrote on Wednesday, March 25, 2020, at 4:26 PM:


Re: Does the Flink JDBC Driver support creating streaming tables?

godfrey he
There is another option: the SQL gateway supports --jar and --library for specifying user JARs, so the user does not need to put the JARs under Flink's lib directory.
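For example, starting the gateway that the JDBC driver connects to with the connector JAR on its classpath might look like this (a sketch; the script name and JAR paths are placeholders, check your gateway distribution):

```shell
# --jar / --library are the gateway options mentioned above;
# paths and versions below are placeholders
./bin/sql-gateway.sh \
  --jar /path/to/flink-sql-connector-kafka_2.11-1.10.0.jar \
  --library /path/to/extra-connector-jars/
```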

godfrey he <[hidden email]> wrote on Wednesday, March 25, 2020, at 6:24 PM:
