DDL ES error

kcz

The source code is as follows:
CREATE TABLE buy_cnt_per_hour ( 
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'buy_cnt_per_hour',
    'connector.document-type' = 'user_behavior',
    'connector.bulk-flush.max-actions' = '1',
    'format.type' = 'json',
    'update-mode' = 'append'
)
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ESTest {

    public static void main(String[] args) throws Exception {

        // 2. Set up the execution environment
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
        streamEnv.setParallelism(1);
        String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT, buy_cnt BIGINT "
                + ") WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '6',"
                + " 'connector.hosts' = 'http://localhost:9200', 'connector.index' = 'buy_cnt_per_hour',"
                + " 'connector.document-type' = 'user_behavior',"
                + " 'connector.bulk-flush.max-actions' = '1',\n" + " 'format.type' = 'json',"
                + " 'update-mode' = 'append' )";
        tableEnv.sqlUpdate(sinkDDL);
        Table table = tableEnv.sqlQuery("select * from test_es ");
        tableEnv.toRetractStream(table, Row.class).print();
        streamEnv.execute("");
    }

}
The specific error:
The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'elasticsearch'
'format.type' expects 'csv', but is 'json'

The following properties are requested:
connector.bulk-flush.max-actions=1
connector.document-type=user_behavior
connector.hosts=http://localhost:9200
connector.index=buy_cnt_per_hour
connector.type=elasticsearch
connector.version=6
format.type=json
schema.0.data-type=BIGINT
schema.0.name=hour_of_day
schema.1.data-type=BIGINT
schema.1.name=buy_cnt
update-mode=append

Re: DDL ES error

Leonard Xu
Hi 出发,
It looks like you are missing the dependency on the ES connector [1], so only the built-in filesystem connector can be found, and the built-in filesystem connector currently only supports the csv format, hence this error.
Just add the missing dependencies to your project. If you use the SQL CLI, you also need to put the dependency jars into Flink's lib directory.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
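
As a quick sanity check, here is a minimal sketch (assuming Flink 1.10.x and that the two dependencies above are on the classpath; FactoryLookupCheck is an illustrative name): it feeds the exact "requested properties" from the error message to Flink's TableFactoryService and prints which factory resolves. Without the jars it should fail with the same NoMatchingTableFactoryException as in the original post; with them it should resolve an Elasticsearch sink factory.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.TableFactoryService;

public class FactoryLookupCheck {

    public static void main(String[] args) {
        // The exact "requested properties" listed in the error message.
        Map<String, String> props = new HashMap<>();
        props.put("connector.type", "elasticsearch");
        props.put("connector.version", "6");
        props.put("connector.hosts", "http://localhost:9200");
        props.put("connector.index", "buy_cnt_per_hour");
        props.put("connector.document-type", "user_behavior");
        props.put("connector.bulk-flush.max-actions", "1");
        props.put("format.type", "json");
        props.put("update-mode", "append");
        props.put("schema.0.data-type", "BIGINT");
        props.put("schema.0.name", "hour_of_day");
        props.put("schema.1.data-type", "BIGINT");
        props.put("schema.1.name", "buy_cnt");

        // Resolves a sink factory for these properties, or throws
        // NoMatchingTableFactoryException with a candidate list like the one above.
        StreamTableSinkFactory<?> factory =
                TableFactoryService.find(StreamTableSinkFactory.class, props);
        System.out.println(factory.getClass().getName());
    }
}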

Best,
Leonard
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector


Re: DDL ES error

zhisheng
Hi Leonard Xu,

The official ES DDL currently does not support specifying a username and password for the ES cluster. I have already extended it at my company and added this feature. Does the community want it? How should I contribute it?

The result looks like this: http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-03-25-053948.png

Best Wishes!

zhisheng

Re: DDL ES error

wang jinhai
Excellent! You could open an improvement issue for it.


Best Regards

[hidden email]


Re: DDL ES error

Leonard Xu
👍, zhisheng
    I think ES authentication support is quite a useful feature in production, nice to have. As jinhai said, you can first open an improvement issue and discuss it in the community (the concrete parameter names, and these parameters should be optional); once the discussion reaches consensus, you can open a PR.
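
To make the parameter discussion concrete, here is a purely hypothetical sketch in the style of the original post's sinkDDL string. The property names 'connector.username' and 'connector.password' are illustrative assumptions for the optional credentials, not existing options of the Flink Elasticsearch connector:

// Hypothetical sketch only: 'connector.username' / 'connector.password'
// are made-up names for the optional auth properties under discussion.
String sinkWithAuthDDL =
        "CREATE TABLE es_sink_with_auth (\n"
        + "  hour_of_day BIGINT,\n"
        + "  buy_cnt BIGINT\n"
        + ") WITH (\n"
        + "  'connector.type' = 'elasticsearch',\n"
        + "  'connector.version' = '6',\n"
        + "  'connector.hosts' = 'http://localhost:9200',\n"
        + "  'connector.index' = 'buy_cnt_per_hour',\n"
        + "  'connector.document-type' = 'user_behavior',\n"
        + "  'connector.username' = 'elastic',\n"   // hypothetical, optional
        + "  'connector.password' = 'secret',\n"    // hypothetical, optional
        + "  'format.type' = 'json',\n"
        + "  'update-mode' = 'append'\n"
        + ")";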

Best,
Leonard
 



Re: DDL ES error

Evan
In reply to this post by kcz
Hello,


I also ran into this error on the latest Flink 1.11, with exactly the same steps as yours.
The real cause is that this DDL defines a Flink sink table, i.e. the write side, so its data cannot be printed.
The method tableEnv.toRetractStream(table, Row.class).print();
only works for a Flink source table, i.e. the input side; a Kafka table, for example, works fine with it.
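
Building on that explanation, a minimal corrected sketch (assumptions: Flink 1.10.x APIs as in the original post, the dependencies from Leonard's reply on the classpath, and a toy fromElements source standing in for a real upstream such as a Kafka table) writes into test_es with INSERT INTO instead of printing it:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class ESSinkTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
        streamEnv.setParallelism(1);

        // Toy in-memory source standing in for the real upstream table.
        DataStream<Tuple2<Long, Long>> src =
                streamEnv.fromElements(Tuple2.of(10L, 100L), Tuple2.of(11L, 123L));
        tableEnv.createTemporaryView("src_table",
                tableEnv.fromDataStream(src, "hour_of_day, buy_cnt"));

        // The same ES sink DDL as in the original post.
        String sinkDDL = "CREATE TABLE test_es ( hour_of_day BIGINT, buy_cnt BIGINT ) WITH ("
                + " 'connector.type' = 'elasticsearch', 'connector.version' = '6',"
                + " 'connector.hosts' = 'http://localhost:9200',"
                + " 'connector.index' = 'buy_cnt_per_hour',"
                + " 'connector.document-type' = 'user_behavior',"
                + " 'connector.bulk-flush.max-actions' = '1',"
                + " 'format.type' = 'json', 'update-mode' = 'append' )";
        tableEnv.sqlUpdate(sinkDDL);

        // Write INTO the sink table instead of sqlQuery(...) + print().
        tableEnv.sqlUpdate("INSERT INTO test_es SELECT hour_of_day, buy_cnt FROM src_table");

        // In 1.10, execute through the table environment so the buffered INSERT runs.
        tableEnv.execute("write to es");
    }
}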


2020-07-09 15:31:56



