hi,
According to the documentation [1], if no primary key is declared in the DDL, the 1.11 Elasticsearch SQL connector sinks data in append mode and uses the id generated by Elasticsearch itself as the document_id. In my tests, however, when my DDL defines no primary key, no document_id is generated correctly on write; instead the index name is used as the id, so only the latest record remains. Below is my DDL. I am not sure whether I am configuring it incorrectly or whether this is a real bug.

CREATE TABLE ES6_SENSORDATA_OUTPUT (
  event varchar,
  user_id varchar,
  distinct_id varchar,
  _date varchar,
  _event_time varchar,
  recv_time varchar,
  _browser_version varchar,
  path_name varchar,
  _search varchar,
  event_type varchar,
  _current_project varchar,
  message varchar,
  stack varchar,
  component_stack varchar,
  _screen_width varchar,
  _screen_height varchar
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = '<ES_YUNTU.SERVERS>',
  'index' = 'flink_sensordata_target_event',
  'document-type' = 'default',
  'document-id.key-delimiter' = '$',
  'sink.bulk-flush.interval' = '1000',
  'failure-handler' = 'fail',
  'format' = 'json'
)

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html#key-handling
Hi,
If no primary key is defined, the ES connector sets the _id to null [1], in which case the ES Java client assigns a random value as the _id [2], so the situation you describe should not occur. Do you have request logs on your ES side? It would help to check what the requests sent by Flink look like.

[1] https://github.com/apache/flink/blob/f0eeaec530e001ab02cb889dfe217e25913660c4/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java#L102
[2] https://github.com/elastic/elasticsearch/blob/977230a0ce89a55515dc6ef6452e9f059d9356a2/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java#L509
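For reference, a minimal sketch of what [2] amounts to in practice (an illustration under my own assumptions: the index name, type, and document body are made up, and this only shows the ES 6.x client API as I understand it):

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

public class AutoIdSketch {
    public static void main(String[] args) {
        // No id is passed (the third constructor argument would be the _id),
        // so the id stays null and the 6.x client assigns a random UUID
        // before the request executes; every row becomes a new document.
        IndexRequest request = new IndexRequest("some_index", "default")
                .source("{\"k\":\"v\"}", XContentType.JSON);
        System.out.println(request.id()); // still null; the client fills it in later
    }
}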
Best,
Yangze Guo

Hello, fulin
Could you share a snippet of code that reproduces this problem?

Best,
Leonard Xu
hi, Leonard

I defined a DDL and a DML, shown below. The DDL declares no PK, and what I observe is that in the results sunk to ES, the generated id is the index name, so only one record exists. When I switch the DDL back to the previous version's WITH parameters (declaring update-mode = 'upsert') instead of the new 1.11 parameters, the sink results look correct. I am not sure whether my configuration is wrong somewhere or whether this is a usage problem.

@[hidden email] I looked at the source you linked; it seems to handle the upsert case. If no PK is declared, wouldn't it go through processInsert instead?

CREATE TABLE ES6_SENSORDATA_SERVER_API (
  event varchar,
  user_id varchar,
  distinct_id varchar,
  _date varchar,
  _event_time varchar,
  recv_time varchar,
  code varchar,
  _current_project varchar,
  api varchar,
  elapsed int,
  `start` bigint,
  is_err int
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = '<ES_YUNTU.SERVERS>',
  'index' = 'flink_sensordata_server_api',
  'document-type' = 'default',
  'document-id.key-delimiter' = '$',
  'sink.bulk-flush.interval' = '1000',
  'failure-handler' = 'fail',
  'format' = 'json'
)

INSERT INTO ES6_SENSORDATA_SERVER_API
SELECT event,
  user_id,
  distinct_id,
  ts2Date(`time`, 'yyyy-MM-dd') as _date,
  ts2Date(`time`, 'yyyy-MM-dd HH:mm:ss.SSS') as _event_time,
  ts2Date(recv_time, false, false) as recv_time,
  properties.code as code,
  properties.`project` as _current_project,
  properties.api as api,
  properties.elapsed as elapsed,
  properties.`start` as `start`,
  case when properties.code = '0' then 0 else 1 end as is_err
FROM KafkaEventTopic
where `type` in ('track') and event in ('serverApiReqEvt')
INSERT does go through the processUpsert method: when no PK is specified, the generated key is null, and an IndexRequest is created.
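For context, the code path in RowElasticsearchSinkFunction (the file linked as [1] earlier) looks roughly like this; a simplified paraphrase from memory rather than the exact source, so field names may differ slightly:

// Simplified paraphrase of processUpsert. With no PK, createKey returns
// null and the else-branch builds an IndexRequest with a null id (so ES
// assigns a random one); with a PK, an UpdateRequest (upsert) keyed by
// the extracted key is built instead.
private void processUpsert(RowData row, RequestIndexer indexer) {
    final byte[] document = serializationSchema.serialize(row);
    final String key = createKey.apply(row);
    if (key != null) {
        indexer.add(requestFactory.createUpdateRequest(
                indexGenerator.generate(row), docType, key, contentType, document));
    } else {
        indexer.add(requestFactory.createIndexRequest(
                indexGenerator.generate(row), docType, key, contentType, document));
    }
}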
Best,
Yangze Guo
hi, Yangze, Leonard,
I've added a test class that reproduces the problem; please give it a run. You can clearly observe that both sinks write correctly when there is a PK, while without a PK only one record remains (its id is the index name).

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class ESNewJobTest {

    // Build the StreamExecutionEnvironment
    public static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Build the EnvironmentSettings and select the Blink planner
    private static final EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

    // Build the StreamTableEnvironment
    public static final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);

    // DDL statements
    public static final String ES_SINK_DDL_NO_PK = "CREATE TABLE es_sink_test_no_pk (\n" +
            " idx integer,\n" +
            " firstx varchar\n" +
            ") WITH (\n" +
            "'connector' = 'elasticsearch-6',\n" +
            "'hosts' = '168.61.113.171:9200',\n" +
            "'index' = 'es_sink_test_no_pk',\n" +
            "'document-type' = 'default',\n" +
            "'document-id.key-delimiter' = '$',\n" +
            "'sink.bulk-flush.interval' = '1000',\n" +
            "'failure-handler' = 'fail',\n" +
            "'format' = 'json'\n" +
            ")";

    public static final String ES_SINK_DDL_WITH_PK = "CREATE TABLE es_sink_test_with_pk (\n" +
            " idx integer,\n" +
            " firstx varchar,\n" +
            " primary key (idx, firstx) not enforced\n" +
            ") WITH (\n" +
            "'connector' = 'elasticsearch-6',\n" +
            "'hosts' = '168.61.113.171:9200',\n" +
            "'index' = 'es_sink_test_with_pk',\n" +
            "'document-type' = 'default',\n" +
            "'document-id.key-delimiter' = '$',\n" +
            "'sink.bulk-flush.interval' = '1000',\n" +
            "'failure-handler' = 'fail',\n" +
            "'format' = 'json'\n" +
            ")";

    // Generate a random string of letters and digits of the given length
    public static String getCharAndNumr(int length) {
        StringBuffer valSb = new StringBuffer();
        for (int i = 0; i < length; i++) {
            String charOrNum = Math.round(Math.random()) % 2 == 0 ? "char" : "num"; // letter or digit
            if ("char".equalsIgnoreCase(charOrNum)) {
                // letter: pick upper or lower case
                int choice = Math.round(Math.random()) % 2 == 0 ? 65 : 97;
                valSb.append((char) (choice + Math.round(Math.random() * 25)));
            } else if ("num".equalsIgnoreCase(charOrNum)) {
                // digit
                valSb.append(String.valueOf(Math.round(Math.random() * 9)));
            }
        }
        return valSb.toString();
    }

    public static void main(String[] args) throws Exception {

        DataStream<Row> ds = env.addSource(new RichParallelSourceFunction<Row>() {

            volatile boolean flag = true;

            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                while (flag) {
                    Row row = new Row(2);
                    row.setField(0, 2207);
                    row.setField(1, getCharAndNumr(4));
                    ctx.collect(row);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        }).setParallelism(1).returns(Types.ROW(Types.INT, Types.STRING));

        // Register the ES sink test DDLs
        tEnv.executeSql(ES_SINK_DDL_NO_PK);
        tEnv.executeSql(ES_SINK_DDL_WITH_PK);

        // Register the source stream as a table
        tEnv.createTemporaryView("test", ds, $("f0").as("idx"), $("f1").as("firstx"), $("p").proctime());

        // Write to both sinks
        StatementSet ss = tEnv.createStatementSet();
        ss.addInsertSql("insert into es_sink_test_no_pk select idx, firstx from test");
        ss.addInsertSql("insert into es_sink_test_with_pk select idx, firstx from test");
        ss.execute();
    }
}
I verified this, and it is indeed a bug; the cause is in this line [1]. I will open a ticket to fix it and will try to get the fix into 1.11.1.
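Judging from the symptom (the _id equals the index name), the index name presumably ends up in the id slot of the IndexRequest. A hypothetical illustration only, reconstructed from the symptom rather than copied from the actual line at [1] (the ES 6.x constructor is IndexRequest(String index, String type, String id)):

import org.elasticsearch.action.index.IndexRequest;

public class IdSlotSketch {
    public static void main(String[] args) {
        String index = "es_sink_test_no_pk";
        String docType = "default";
        String key = null; // no PRIMARY KEY declared in the DDL

        // Buggy shape: the index name lands in the id slot, so every row
        // overwrites the same document and only the latest record survives.
        IndexRequest buggy = new IndexRequest(index, docType, index);

        // Intended shape: a null id lets Elasticsearch generate a random
        // id per row, which is the documented append-mode behavior.
        IndexRequest intended = new IndexRequest(index, docType, key);

        System.out.println(buggy.id() + " / " + intended.id());
    }
}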
[1] https://github.com/apache/flink/blob/0fbea46ac0271dd84fa8acd7f99f449a9a0d458c/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java#L285

Best,
Yangze Guo
Hi, fulin

As Yangze said, this is a bug introduced by the new es6 connector. You can work around it by using the old connector syntax, i.e. connector.type = 'xx', so that the code path still goes through the previous implementation; alternatively, use the es7 connector.
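For example, in the style of the repro class above, a legacy-syntax table could look like the following untested sketch (assumption: the option names are taken from the pre-1.11, descriptor-based Elasticsearch connector documentation, and the table and index names are made up; please double-check them against your Flink version):

// Hypothetical legacy-syntax variant of the repro table; 'update-mode' =
// 'append' requests append writes with auto-generated ids on the old code path.
public static final String ES_SINK_DDL_LEGACY = "CREATE TABLE es_sink_test_legacy (\n" +
        " idx integer,\n" +
        " firstx varchar\n" +
        ") WITH (\n" +
        "'connector.type' = 'elasticsearch',\n" +
        "'connector.version' = '6',\n" +
        "'connector.hosts' = 'http://168.61.113.171:9200',\n" +
        "'connector.index' = 'es_sink_test_legacy',\n" +
        "'connector.document-type' = 'default',\n" +
        "'update-mode' = 'append',\n" +
        "'format.type' = 'json'\n" +
        ")";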
Best,
Leonard Xu