flink 1.11 ES sink issue when no primary key is defined


sunfulin
hi,
According to the documentation [1], the 1.11 ES SQL connector sinks data in append mode when no primary key is declared in the DDL, using the id generated by ES itself as the document_id. But in my tests I found that when my DDL defines no primary key, the document_id is not generated correctly; instead the index name is used as the id, so only the latest record is kept. My DDL definition is below.
I'm not sure whether I'm configuring or using it incorrectly, or whether this is actually a bug.


CREATE TABLE ES6_SENSORDATA_OUTPUT (
  event varchar,
  user_id varchar,
  distinct_id varchar,
  _date varchar,
  _event_time varchar,
  recv_time varchar,
  _browser_version varchar,
  path_name varchar,
  _search varchar,
  event_type varchar,
  _current_project varchar,
  message varchar,
  stack varchar,
  component_stack varchar,
  _screen_width varchar,
  _screen_height varchar
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = '<ES_YUNTU.SERVERS>',
'index' = 'flink_sensordata_target_event',
'document-type' = 'default',
'document-id.key-delimiter' = '$',
'sink.bulk-flush.interval' = '1000',
'failure-handler' = 'fail',
'format' = 'json'
)




[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html#key-handling
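
For reference, the key-handling section in [1] describes that declaring a PRIMARY KEY ... NOT ENFORCED makes the connector build the document_id from the key fields, joined by 'document-id.key-delimiter'. A sketch of such a keyed variant of the table above, with the key columns chosen purely for illustration, would be:

CREATE TABLE ES6_SENSORDATA_OUTPUT_KEYED (
  event varchar,
  user_id varchar,
  distinct_id varchar,
  _event_time varchar,
  PRIMARY KEY (user_id, _event_time) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = '<ES_YUNTU.SERVERS>',
'index' = 'flink_sensordata_target_event',
'document-type' = 'default',
'document-id.key-delimiter' = '$',
'format' = 'json'
)
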
Re: flink 1.11 ES sink issue when no primary key is defined

Yangze Guo
Hi,

If no primary key is defined, the ES connector sets the _id to null [1], in which case the ES Java client assigns a random value as the _id [2].
So the situation you describe should not happen. Does your ES side have request logs or something similar, so you could check what the requests sent by Flink actually look like?

[1] https://github.com/apache/flink/blob/f0eeaec530e001ab02cb889dfe217e25913660c4/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java#L102
[2] https://github.com/elastic/elasticsearch/blob/977230a0ce89a55515dc6ef6452e9f059d9356a2/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java#L509

Best,
Yangze Guo

Re: flink 1.11 ES sink issue when no primary key is defined

Leonard Xu
Hello, fulin

Could you provide a snippet of code that reproduces this problem?

Best,
Leonard Xu


> 在 2020年7月13日,09:50,Yangze Guo <[hidden email]> 写道:
>
> Hi,
>
> 如果没有定义主键,ES connector 会把 _id设为null[1],这样ES的Java Client会将_id设为一个随机值[2].
> 所以应该不会出现您说的这种情况。您那里的ES有没有请求日志之类的,看一下Flink发过来的请求是什么样的。
>
> [1] https://github.com/apache/flink/blob/f0eeaec530e001ab02cb889dfe217e25913660c4/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java#L102
> [2] https://github.com/elastic/elasticsearch/blob/977230a0ce89a55515dc6ef6452e9f059d9356a2/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java#L509
>
> Best,
> Yangze Guo
>
> On Sat, Jul 11, 2020 at 11:33 PM sunfulin <[hidden email]> wrote:
>>
>> hi,
>> 根据文档[1]的描述,1.11的es sql connector如果在ddl里没有声明primary key,将会使用append模式sink数据,并使用es本身生成的id作为document_id。但是我在测试时发现,如果我的ddl里没有定义primary key,写入时没有正确生成document_id,反而是将index作为id生成了。导致只有最新的一条记录。下面是我的ddl定义:
>> 不确定是我配置使用的方式不对,还是确实存在bug。。
>>
>>
>> CREATE TABLE ES6_SENSORDATA_OUTPUT (
>> event varchar,
>> user_id varchar,
>> distinct_id varchar,
>> _date varchar,
>> _event_time varchar,
>> recv_time varchar,
>> _browser_version varchar,
>> path_name varchar,
>> _search varchar,
>> event_type varchar,
>> _current_project varchar,
>> message varchar,
>> stack varchar,
>> component_stack varchar,
>> _screen_width varchar,
>> _screen_height varchar
>> ) WITH (
>> 'connector' = 'elasticsearch-6',
>> 'hosts' = '<ES_YUNTU.SERVERS>',
>> 'index' = 'flink_sensordata_target_event',
>> 'document-type' = 'default',
>> 'document-id.key-delimiter' = '$',
>> 'sink.bulk-flush.interval' = '1000',
>> 'failure-handler' = 'fail',
>> 'format' = 'json'
>> )
>>
>>
>>
>>
>> [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html#key-handling

Reply | Threaded
Open this post in threaded view
|

Re: Re: flink 1.11 ES sink issue when no primary key is defined

sunfulin



hi, Leonard
I defined one DDL and one DML; the SQL is below. The DDL declares no PK, and what I observe is that in the results sunk to ES, the generated id is the index name, so only a single record remains.
When I switch the DDL back to the previous version's WITH options (declaring update-mode = 'upsert') instead of the new 1.11 options, the sink results are correct (a sketch of that legacy-style DDL is included after the SQL below). I'm not sure whether I've configured something incorrectly or I'm using it the wrong way.


 @[hidden email]  I looked at the source code you linked; it seems to handle the upsert case. If no PK is declared, wouldn't it go through processInsert instead?



CREATE TABLE ES6_SENSORDATA_SERVER_API (
  event varchar,
  user_id varchar,
  distinct_id varchar,
  _date varchar,
  _event_time varchar,
  recv_time varchar,
  code varchar,
  _current_project varchar,
  api varchar,
  elapsed int ,
  `start` bigint,
  is_err int
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = '<ES_YUNTU.SERVERS>',
'index' = 'flink_sensordata_server_api',
'document-type' = 'default',
'document-id.key-delimiter' = '$',
'sink.bulk-flush.interval' = '1000',
'failure-handler' = 'fail',
'format' = 'json'
)







INSERT INTO ES6_SENSORDATA_SERVER_API
SELECT event,
       user_id,
       distinct_id,
       ts2Date(`time`, 'yyyy-MM-dd') as _date,
       ts2Date(`time`, 'yyyy-MM-dd HH:mm:ss.SSS') as _event_time,
       ts2Date(recv_time, false, false) as recv_time,
       properties.code as code,
       properties.`project` as _current_project,
       properties.api as api,
       properties.elapsed as elapsed,
       properties.`start` as `start`,
       case when properties.code = '0' then 0 else 1 end as is_err
FROM KafkaEventTopic
where `type` in ('track') and event in ('serverApiReqEvt')
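
For reference, a rough sketch of the legacy-style DDL I switched to (only two columns shown for brevity; the legacy 'connector.*' property keys here are approximate and may need adjusting for your setup):

CREATE TABLE ES6_SENSORDATA_SERVER_API_OLD (
  event varchar,
  user_id varchar
) WITH (
'connector.type' = 'elasticsearch',
'connector.version' = '6',
'connector.hosts' = '<ES_YUNTU.SERVERS>',
'connector.index' = 'flink_sensordata_server_api',
'connector.document-type' = 'default',
'update-mode' = 'upsert',
'format.type' = 'json'
)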


Re: Re: flink 1.11 ES sink issue when no primary key is defined

Yangze Guo
INSERT statements do go through the processUpsert method; when no PK is specified, the generated key is null, and an IndexRequest is then created.

Best,
Yangze Guo

Re: Re: Re: flink 1.11 ES sink issue when no primary key is defined

sunfulin
hi, Yangze, Leonard,
I've added a test class that reproduces the problem; you can run it and take a look. It clearly shows that of the two sinks, the one with a PK writes correctly, while the one without a PK ends up with only a single record (whose id is the index name).



import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class ESNewJobTest {

    // Build the StreamExecutionEnvironment
    public static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Build the EnvironmentSettings and select the Blink planner
    private static final EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

    // Build the StreamTableEnvironment
    public static final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);

    // DDL statements
    public static final String ES_SINK_DDL_NO_PK = "CREATE TABLE es_sink_test_no_pk (\n" +
            "  idx integer,\n" +
            "  firstx varchar\n" +
            ") WITH (\n" +
            "'connector' = 'elasticsearch-6',\n" +
            "'hosts' = '168.61.113.171:9200',\n" +
            "'index' = 'es_sink_test_no_pk',\n" +
            "'document-type' = 'default',\n" +
            "'document-id.key-delimiter' = '$',\n" +
            "'sink.bulk-flush.interval' = '1000',\n" +
            "'failure-handler' = 'fail',\n" +
            "'format' = 'json'\n" +
            ")";

    public static final String ES_SINK_DDL_WITH_PK = "CREATE TABLE es_sink_test_with_pk (\n" +
            "  idx integer,\n" +
            "  firstx varchar,\n" +
            "  primary key (idx, firstx) not enforced\n" +
            ") WITH (\n" +
            "'connector' = 'elasticsearch-6',\n" +
            "'hosts' = '168.61.113.171:9200',\n" +
            "'index' = 'es_sink_test_with_pk',\n" +
            "'document-type' = 'default',\n" +
            "'document-id.key-delimiter' = '$',\n" +
            "'sink.bulk-flush.interval' = '1000',\n" +
            "'failure-handler' = 'fail',\n" +
            "'format' = 'json'\n" +
            ")";

    // Generate a random string of letters and digits of the given length
    public static String getCharAndNumr(int length) {
        StringBuffer valSb = new StringBuffer();
        for (int i = 0; i < length; i++) {
            String charOrNum = Math.round(Math.random()) % 2 == 0 ? "char" : "num"; // emit a letter or a digit
            if ("char".equalsIgnoreCase(charOrNum)) {
                // letter
                int choice = Math.round(Math.random()) % 2 == 0 ? 65 : 97; // upper-case or lower-case
                valSb.append((char) (choice + Math.round(Math.random() * 25)));
            } else if ("num".equalsIgnoreCase(charOrNum)) {
                // digit
                valSb.append(String.valueOf(Math.round(Math.random() * 9)));
            }
        }
        return valSb.toString();
    }

    public static void main(String[] args) throws Exception {

        // Simple source emitting one row per second
        DataStream<Row> ds = env.addSource(new RichParallelSourceFunction<Row>() {

            volatile boolean flag = true;

            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                while (flag) {
                    Row row = new Row(2);
                    row.setField(0, 2207);
                    row.setField(1, getCharAndNumr(4));
                    ctx.collect(row);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        }).setParallelism(1).returns(Types.ROW(Types.INT, Types.STRING));

        // ES sink test DDLs
        tEnv.executeSql(ES_SINK_DDL_NO_PK);
        tEnv.executeSql(ES_SINK_DDL_WITH_PK);

        // Register the source stream as a table
        tEnv.createTemporaryView("test", ds, $("f0").as("idx"), $("f1").as("firstx"), $("p").proctime());

        // Write to both sinks
        StatementSet ss = tEnv.createStatementSet();
        ss.addInsertSql("insert into es_sink_test_no_pk select idx, firstx from test");
        ss.addInsertSql("insert into es_sink_test_with_pk select idx, firstx from test");
        ss.execute();
    }
}





Re: Re: Re: flink 1.11 ES sink issue when no primary key is defined

Yangze Guo
I verified this, and it is indeed a bug; the cause is this line [1]. I will open a ticket to fix it and try to get the fix into 1.11.1.

[1] https://github.com/apache/flink/blob/0fbea46ac0271dd84fa8acd7f99f449a9a0d458c/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java#L285

Best,
Yangze Guo

Re: flink 1.11 ES sink issue when no primary key is defined

Leonard Xu
Hi, fulin

As Yangze said, this is a bug introduced by the new es6 connector. You can work around it by using the old connector syntax, i.e. connector.type = 'xx', so that the code path still goes through the previous implementation, or you can use the es7 connector.
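
For example, a minimal sketch of what the es7 alternative could look like with the new 1.11 options (host and index names here are placeholders; the elasticsearch-7 connector needs no 'document-type', and omitting the PRIMARY KEY gives append mode with auto-generated ids):

CREATE TABLE ES7_SINK_TEST (
  idx integer,
  firstx varchar,
  PRIMARY KEY (idx, firstx) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://<ES_HOST>:9200',
'index' = 'es7_sink_test',
'format' = 'json'
)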

Best,
Leonard Xu
