关于elasticSearch table sink 构造过于复杂

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

关于elasticSearch table sink 构造过于复杂

aven.wu
Elasticsearch6UpsertTableSink
的构造方法过于复杂参数非常多

public Elasticsearch6UpsertTableSink(
      boolean isAppendOnly,
      TableSchema schema,
      List<Host> hosts,
      String index,
      String docType,
      String keyDelimiter,
      String keyNullLiteral,
      SerializationSchema<Row> serializationSchema,
      XContentType contentType,
      ActionRequestFailureHandler failureHandler,
      Map<SinkOption, String> sinkOptions) {

   super(
      isAppendOnly,
      schema,
      hosts,
      index,
      docType,
      keyDelimiter,
      keyNullLiteral,
      serializationSchema,
      contentType,
      failureHandler,
      sinkOptions,
      UPDATE_REQUEST_FACTORY);
}


请问,是不是我的用法不对?
有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。

Reply | Threaded
Open this post in threaded view
|

Re: 关于elasticSearch table sink 构造过于复杂

Jark
Administrator
Hi ,


Elasticsearch6UpsertTableSink 是标记成 @internal 的,不是开放给用户直接去构造的。
如果要注册一个 ES sink,可以使用 descriptor API,也就是 org.apache.flink.table.descriptors.Elasticsearch。
或者使用 DDL 方式注册。


Best,
Jark

> 在 2019年8月26日,16:33,aven.wu <[hidden email]> 写道:
>
> Elasticsearch6UpsertTableSink
> 的构造方法过于复杂参数非常多
>
> public Elasticsearch6UpsertTableSink(
>      boolean isAppendOnly,
>      TableSchema schema,
>      List<Host> hosts,
>      String index,
>      String docType,
>      String keyDelimiter,
>      String keyNullLiteral,
>      SerializationSchema<Row> serializationSchema,
>      XContentType contentType,
>      ActionRequestFailureHandler failureHandler,
>      Map<SinkOption, String> sinkOptions) {
>
>   super(
>      isAppendOnly,
>      schema,
>      hosts,
>      index,
>      docType,
>      keyDelimiter,
>      keyNullLiteral,
>      serializationSchema,
>      contentType,
>      failureHandler,
>      sinkOptions,
>      UPDATE_REQUEST_FACTORY);
> }
>
>
> 请问,是不是我的用法不对?
> 有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。
>

Reply | Threaded
Open this post in threaded view
|

Re:Re: 关于elasticSearch table sink 构造过于复杂

aven.wu
感谢解答,
我的意图是 构建EStablesink,可以将table 查询的结果 直接写入ES 避免再转换DataStream 通过ESSink写入






在 2019-08-26 16:39:49,"Jark Wu" <[hidden email]> 写道:

>Hi ,
>
>
>Elasticsearch6UpsertTableSink 是标记成 @internal 的,不是开放给用户直接去构造的。
>如果要注册一个 ES sink,可以使用 descriptor API,也就是 org.apache.flink.table.descriptors.Elasticsearch。
>或者使用 DDL 方式注册。
>
>
>Best,
>Jark
>
>> 在 2019年8月26日,16:33,aven.wu <[hidden email]> 写道:
>>
>> Elasticsearch6UpsertTableSink
>> 的构造方法过于复杂参数非常多
>>
>> public Elasticsearch6UpsertTableSink(
>>      boolean isAppendOnly,
>>      TableSchema schema,
>>      List<Host> hosts,
>>      String index,
>>      String docType,
>>      String keyDelimiter,
>>      String keyNullLiteral,
>>      SerializationSchema<Row> serializationSchema,
>>      XContentType contentType,
>>      ActionRequestFailureHandler failureHandler,
>>      Map<SinkOption, String> sinkOptions) {
>>
>>   super(
>>      isAppendOnly,
>>      schema,
>>      hosts,
>>      index,
>>      docType,
>>      keyDelimiter,
>>      keyNullLiteral,
>>      serializationSchema,
>>      contentType,
>>      failureHandler,
>>      sinkOptions,
>>      UPDATE_REQUEST_FACTORY);
>> }
>>
>>
>> 请问,是不是我的用法不对?
>> 有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。
>>
Reply | Threaded
Open this post in threaded view
|

Re: 关于elasticSearch table sink 构造过于复杂

Jark
Administrator
嗯,descriptor 和 DDL 就是可以用于这个场景,将 table 查询结果直接写入 sink。

Best,
Jark



> 在 2019年8月26日,16:44,巫旭阳 <[hidden email]> 写道:
>
> 感谢解答,
> 我的意图是 构建EStablesink,可以将table 查询的结果 直接写入ES 避免再转换DataStream 通过ESSink写入
>
>
>
>
>
>
> 在 2019-08-26 16:39:49,"Jark Wu" <[hidden email]> 写道:
>> Hi ,
>>
>>
>> Elasticsearch6UpsertTableSink 是标记成 @internal 的,不是开放给用户直接去构造的。
>> 如果要注册一个 ES sink,可以使用 descriptor API,也就是 org.apache.flink.table.descriptors.Elasticsearch。
>> 或者使用 DDL 方式注册。
>>
>>
>> Best,
>> Jark
>>
>>> 在 2019年8月26日,16:33,aven.wu <[hidden email]> 写道:
>>>
>>> Elasticsearch6UpsertTableSink
>>> 的构造方法过于复杂参数非常多
>>>
>>> public Elasticsearch6UpsertTableSink(
>>>     boolean isAppendOnly,
>>>     TableSchema schema,
>>>     List<Host> hosts,
>>>     String index,
>>>     String docType,
>>>     String keyDelimiter,
>>>     String keyNullLiteral,
>>>     SerializationSchema<Row> serializationSchema,
>>>     XContentType contentType,
>>>     ActionRequestFailureHandler failureHandler,
>>>     Map<SinkOption, String> sinkOptions) {
>>>
>>>  super(
>>>     isAppendOnly,
>>>     schema,
>>>     hosts,
>>>     index,
>>>     docType,
>>>     keyDelimiter,
>>>     keyNullLiteral,
>>>     serializationSchema,
>>>     contentType,
>>>     failureHandler,
>>>     sinkOptions,
>>>     UPDATE_REQUEST_FACTORY);
>>> }
>>>
>>>
>>> 请问,是不是我的用法不对?
>>> 有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。
>>>

hb
Reply | Threaded
Open this post in threaded view
|

Re:Re: 关于elasticSearch table sink 构造过于复杂

hb
没有group by的语句,比如就是select * from table ,表明细数据,以DDL 方式 写入 es,
能指定某个字段作为es的主键id么, 我试了同步数据明细到es中,但是id 却是随机生成的.



在 2019-08-26 15:47:53,"Jark Wu" <[hidden email]> 写道:

>嗯,descriptor 和 DDL 就是可以用于这个场景,将 table 查询结果直接写入 sink。
>
>Best,
>Jark
>
>
>
>> 在 2019年8月26日,16:44,巫旭阳 <[hidden email]> 写道:
>>
>> 感谢解答,
>> 我的意图是 构建EStablesink,可以将table 查询的结果 直接写入ES 避免再转换DataStream 通过ESSink写入
>>
>>
>>
>>
>>
>>
>> 在 2019-08-26 16:39:49,"Jark Wu" <[hidden email]> 写道:
>>> Hi ,
>>>
>>>
>>> Elasticsearch6UpsertTableSink 是标记成 @internal 的,不是开放给用户直接去构造的。
>>> 如果要注册一个 ES sink,可以使用 descriptor API,也就是 org.apache.flink.table.descriptors.Elasticsearch。
>>> 或者使用 DDL 方式注册。
>>>
>>>
>>> Best,
>>> Jark
>>>
>>>> 在 2019年8月26日,16:33,aven.wu <[hidden email]> 写道:
>>>>
>>>> Elasticsearch6UpsertTableSink
>>>> 的构造方法过于复杂参数非常多
>>>>
>>>> public Elasticsearch6UpsertTableSink(
>>>>     boolean isAppendOnly,
>>>>     TableSchema schema,
>>>>     List<Host> hosts,
>>>>     String index,
>>>>     String docType,
>>>>     String keyDelimiter,
>>>>     String keyNullLiteral,
>>>>     SerializationSchema<Row> serializationSchema,
>>>>     XContentType contentType,
>>>>     ActionRequestFailureHandler failureHandler,
>>>>     Map<SinkOption, String> sinkOptions) {
>>>>
>>>>  super(
>>>>     isAppendOnly,
>>>>     schema,
>>>>     hosts,
>>>>     index,
>>>>     docType,
>>>>     keyDelimiter,
>>>>     keyNullLiteral,
>>>>     serializationSchema,
>>>>     contentType,
>>>>     failureHandler,
>>>>     sinkOptions,
>>>>     UPDATE_REQUEST_FACTORY);
>>>> }
>>>>
>>>>
>>>> 请问,是不是我的用法不对?
>>>> 有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。
>>>>
Reply | Threaded
Open this post in threaded view
|

Re: 关于elasticSearch table sink 构造过于复杂

Jark
Administrator
> ETL作业, 能指定某个字段作为es的主键id么, 我试了同步数据明细到es中,但是id 却是随机生成的.

据我所知,目前是不支持的。 可以去建个 JIRA 给社区提需求。


如果使用的 blink planner,可以使用 deduplicate with keeping first row,是一个比较轻量的去重计算,能拿到一个 key (也就是去重 key)。
文档还在 review 中,可以先看这个PR: https://github.com/apache/flink/pull/9511/files#diff-b56b1750a20591d2ba61ba99eb3d3539R953 <https://github.com/apache/flink/pull/9511/files#diff-b56b1750a20591d2ba61ba99eb3d3539R953>


Best,
Jark


> 在 2019年8月26日,17:20,hb <[hidden email]> 写道:
>
> 没有group by的语句,比如就是select * from table ,表明细数据,以DDL 方式 写入 es,
> 能指定某个字段作为es的主键id么, 我试了同步数据明细到es中,但是id 却是随机生成的.
>
>
>
> 在 2019-08-26 15:47:53,"Jark Wu" <[hidden email]> 写道:
>> 嗯,descriptor 和 DDL 就是可以用于这个场景,将 table 查询结果直接写入 sink。
>>
>> Best,
>> Jark
>>
>>
>>
>>> 在 2019年8月26日,16:44,巫旭阳 <[hidden email]> 写道:
>>>
>>> 感谢解答,
>>> 我的意图是 构建EStablesink,可以将table 查询的结果 直接写入ES 避免再转换DataStream 通过ESSink写入
>>>
>>>
>>>
>>>
>>>
>>>
>>> 在 2019-08-26 16:39:49,"Jark Wu" <[hidden email]> 写道:
>>>> Hi ,
>>>>
>>>>
>>>> Elasticsearch6UpsertTableSink 是标记成 @internal 的,不是开放给用户直接去构造的。
>>>> 如果要注册一个 ES sink,可以使用 descriptor API,也就是 org.apache.flink.table.descriptors.Elasticsearch。
>>>> 或者使用 DDL 方式注册。
>>>>
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>>> 在 2019年8月26日,16:33,aven.wu <[hidden email]> 写道:
>>>>>
>>>>> Elasticsearch6UpsertTableSink
>>>>> 的构造方法过于复杂参数非常多
>>>>>
>>>>> public Elasticsearch6UpsertTableSink(
>>>>>    boolean isAppendOnly,
>>>>>    TableSchema schema,
>>>>>    List<Host> hosts,
>>>>>    String index,
>>>>>    String docType,
>>>>>    String keyDelimiter,
>>>>>    String keyNullLiteral,
>>>>>    SerializationSchema<Row> serializationSchema,
>>>>>    XContentType contentType,
>>>>>    ActionRequestFailureHandler failureHandler,
>>>>>    Map<SinkOption, String> sinkOptions) {
>>>>>
>>>>> super(
>>>>>    isAppendOnly,
>>>>>    schema,
>>>>>    hosts,
>>>>>    index,
>>>>>    docType,
>>>>>    keyDelimiter,
>>>>>    keyNullLiteral,
>>>>>    serializationSchema,
>>>>>    contentType,
>>>>>    failureHandler,
>>>>>    sinkOptions,
>>>>>    UPDATE_REQUEST_FACTORY);
>>>>> }
>>>>>
>>>>>
>>>>> 请问,是不是我的用法不对?
>>>>> 有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。
>>>>>

Reply | Threaded
Open this post in threaded view
|

答复: 关于elasticSearch table sink 构造过于复杂

aven.wu
你好:
可以自己构建 indexRequest 设置id,type,source 等字段
 ElasticsearchSinkFunction 不知道是否满足你的需求?


发件人: Jark Wu
发送时间: 2019年8月26日 18:00
主题: Re: 关于elasticSearch table sink 构造过于复杂

> ETL作业, 能指定某个字段作为es的主键id么, 我试了同步数据明细到es中,但是id 却是随机生成的.

据我所知,目前是不支持的。 可以去建个 JIRA 给社区提需求。


如果使用的 blink planner,可以使用 deduplicate with keeping first row,是一个比较轻量的去重计算,能拿到一个 key (也就是去重 key)。
文档还在 review 中,可以先看这个PR: https://github.com/apache/flink/pull/9511/files#diff-b56b1750a20591d2ba61ba99eb3d3539R953 <https://github.com/apache/flink/pull/9511/files#diff-b56b1750a20591d2ba61ba99eb3d3539R953>


Best,
Jark


> 在 2019年8月26日,17:20,hb <[hidden email]> 写道:
>
> 没有group by的语句,比如就是select * from table ,表明细数据,以DDL 方式 写入 es,
> 能指定某个字段作为es的主键id么, 我试了同步数据明细到es中,但是id 却是随机生成的.
>
>
>
> 在 2019-08-26 15:47:53,"Jark Wu" <[hidden email]> 写道:
>> 嗯,descriptor 和 DDL 就是可以用于这个场景,将 table 查询结果直接写入 sink。
>>
>> Best,
>> Jark
>>
>>
>>
>>> 在 2019年8月26日,16:44,巫旭阳 <[hidden email]> 写道:
>>>
>>> 感谢解答,
>>> 我的意图是 构建EStablesink,可以将table 查询的结果 直接写入ES 避免再转换DataStream 通过ESSink写入
>>>
>>>
>>>
>>>
>>>
>>>
>>> 在 2019-08-26 16:39:49,"Jark Wu" <[hidden email]> 写道:
>>>> Hi ,
>>>>
>>>>
>>>> Elasticsearch6UpsertTableSink 是标记成 @internal 的,不是开放给用户直接去构造的。
>>>> 如果要注册一个 ES sink,可以使用 descriptor API,也就是 org.apache.flink.table.descriptors.Elasticsearch。
>>>> 或者使用 DDL 方式注册。
>>>>
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>>> 在 2019年8月26日,16:33,aven.wu <[hidden email]> 写道:
>>>>>
>>>>> Elasticsearch6UpsertTableSink
>>>>> 的构造方法过于复杂参数非常多
>>>>>
>>>>> public Elasticsearch6UpsertTableSink(
>>>>>    boolean isAppendOnly,
>>>>>    TableSchema schema,
>>>>>    List<Host> hosts,
>>>>>    String index,
>>>>>    String docType,
>>>>>    String keyDelimiter,
>>>>>    String keyNullLiteral,
>>>>>    SerializationSchema<Row> serializationSchema,
>>>>>    XContentType contentType,
>>>>>    ActionRequestFailureHandler failureHandler,
>>>>>    Map<SinkOption, String> sinkOptions) {
>>>>>
>>>>> super(
>>>>>    isAppendOnly,
>>>>>    schema,
>>>>>    hosts,
>>>>>    index,
>>>>>    docType,
>>>>>    keyDelimiter,
>>>>>    keyNullLiteral,
>>>>>    serializationSchema,
>>>>>    contentType,
>>>>>    failureHandler,
>>>>>    sinkOptions,
>>>>>    UPDATE_REQUEST_FACTORY);
>>>>> }
>>>>>
>>>>>
>>>>> 请问,是不是我的用法不对?
>>>>> 有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。
>>>>>