Login  Register

Re:Re: 关于elasticSearch table sink 构造过于复杂

Posted by hb on Aug 26, 2019; 9:20am
URL: http://apache-flink.370.s1.nabble.com/elasticSearch-table-sink-tp475p480.html

没有group by的语句,比如就是select * from table ,表明细数据,以DDL 方式 写入 es,
能指定某个字段作为es的主键id么, 我试了同步数据明细到es中,但是id 却是随机生成的.



在 2019-08-26 15:47:53,"Jark Wu" <[hidden email]> 写道:

>嗯,descriptor 和 DDL 就是可以用于这个场景,将 table 查询结果直接写入 sink。
>
>Best,
>Jark
>
>
>
>> 在 2019年8月26日,16:44,巫旭阳 <[hidden email]> 写道:
>>
>> 感谢解答,
>> 我的意图是 构建EStablesink,可以将table 查询的结果 直接写入ES 避免再转换DataStream 通过ESSink写入
>>
>>
>>
>>
>>
>>
>> 在 2019-08-26 16:39:49,"Jark Wu" <[hidden email]> 写道:
>>> Hi ,
>>>
>>>
>>> Elasticsearch6UpsertTableSink 是标记成 @internal 的,不是开放给用户直接去构造的。
>>> 如果要注册一个 ES sink,可以使用 descriptor API,也就是 org.apache.flink.table.descriptors.Elasticsearch。
>>> 或者使用 DDL 方式注册。
>>>
>>>
>>> Best,
>>> Jark
>>>
>>>> 在 2019年8月26日,16:33,aven.wu <[hidden email]> 写道:
>>>>
>>>> Elasticsearch6UpsertTableSink
>>>> 的构造方法过于复杂参数非常多
>>>>
>>>> public Elasticsearch6UpsertTableSink(
>>>>     boolean isAppendOnly,
>>>>     TableSchema schema,
>>>>     List<Host> hosts,
>>>>     String index,
>>>>     String docType,
>>>>     String keyDelimiter,
>>>>     String keyNullLiteral,
>>>>     SerializationSchema<Row> serializationSchema,
>>>>     XContentType contentType,
>>>>     ActionRequestFailureHandler failureHandler,
>>>>     Map<SinkOption, String> sinkOptions) {
>>>>
>>>>  super(
>>>>     isAppendOnly,
>>>>     schema,
>>>>     hosts,
>>>>     index,
>>>>     docType,
>>>>     keyDelimiter,
>>>>     keyNullLiteral,
>>>>     serializationSchema,
>>>>     contentType,
>>>>     failureHandler,
>>>>     sinkOptions,
>>>>     UPDATE_REQUEST_FACTORY);
>>>> }
>>>>
>>>>
>>>> 请问,是不是我的用法不对?
>>>> 有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。
>>>>