Elasticsearch6UpsertTableSink
的构造方法过于复杂参数非常多 public Elasticsearch6UpsertTableSink( boolean isAppendOnly, TableSchema schema, List<Host> hosts, String index, String docType, String keyDelimiter, String keyNullLiteral, SerializationSchema<Row> serializationSchema, XContentType contentType, ActionRequestFailureHandler failureHandler, Map<SinkOption, String> sinkOptions) { super( isAppendOnly, schema, hosts, index, docType, keyDelimiter, keyNullLiteral, serializationSchema, contentType, failureHandler, sinkOptions, UPDATE_REQUEST_FACTORY); } 请问,是不是我的用法不对? 有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。 |
Administrator
|
Hi ,
Elasticsearch6UpsertTableSink 是标记成 @internal 的,不是开放给用户直接去构造的。 如果要注册一个 ES sink,可以使用 descriptor API,也就是 org.apache.flink.table.descriptors.Elasticsearch。 或者使用 DDL 方式注册。 Best, Jark > 在 2019年8月26日,16:33,aven.wu <[hidden email]> 写道: > > Elasticsearch6UpsertTableSink > 的构造方法过于复杂参数非常多 > > public Elasticsearch6UpsertTableSink( > boolean isAppendOnly, > TableSchema schema, > List<Host> hosts, > String index, > String docType, > String keyDelimiter, > String keyNullLiteral, > SerializationSchema<Row> serializationSchema, > XContentType contentType, > ActionRequestFailureHandler failureHandler, > Map<SinkOption, String> sinkOptions) { > > super( > isAppendOnly, > schema, > hosts, > index, > docType, > keyDelimiter, > keyNullLiteral, > serializationSchema, > contentType, > failureHandler, > sinkOptions, > UPDATE_REQUEST_FACTORY); > } > > > 请问,是不是我的用法不对? > 有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。 > |
感谢解答,
我的意图是 构建EStablesink,可以将table 查询的结果 直接写入ES 避免再转换DataStream 通过ESSink写入 在 2019-08-26 16:39:49,"Jark Wu" <[hidden email]> 写道: >Hi , > > >Elasticsearch6UpsertTableSink 是标记成 @internal 的,不是开放给用户直接去构造的。 >如果要注册一个 ES sink,可以使用 descriptor API,也就是 org.apache.flink.table.descriptors.Elasticsearch。 >或者使用 DDL 方式注册。 > > >Best, >Jark > >> 在 2019年8月26日,16:33,aven.wu <[hidden email]> 写道: >> >> Elasticsearch6UpsertTableSink >> 的构造方法过于复杂参数非常多 >> >> public Elasticsearch6UpsertTableSink( >> boolean isAppendOnly, >> TableSchema schema, >> List<Host> hosts, >> String index, >> String docType, >> String keyDelimiter, >> String keyNullLiteral, >> SerializationSchema<Row> serializationSchema, >> XContentType contentType, >> ActionRequestFailureHandler failureHandler, >> Map<SinkOption, String> sinkOptions) { >> >> super( >> isAppendOnly, >> schema, >> hosts, >> index, >> docType, >> keyDelimiter, >> keyNullLiteral, >> serializationSchema, >> contentType, >> failureHandler, >> sinkOptions, >> UPDATE_REQUEST_FACTORY); >> } >> >> >> 请问,是不是我的用法不对? >> 有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。 >> |
Administrator
|
嗯,descriptor 和 DDL 就是可以用于这个场景,将 table 查询结果直接写入 sink。
Best, Jark > 在 2019年8月26日,16:44,巫旭阳 <[hidden email]> 写道: > > 感谢解答, > 我的意图是 构建EStablesink,可以将table 查询的结果 直接写入ES 避免再转换DataStream 通过ESSink写入 > > > > > > > 在 2019-08-26 16:39:49,"Jark Wu" <[hidden email]> 写道: >> Hi , >> >> >> Elasticsearch6UpsertTableSink 是标记成 @internal 的,不是开放给用户直接去构造的。 >> 如果要注册一个 ES sink,可以使用 descriptor API,也就是 org.apache.flink.table.descriptors.Elasticsearch。 >> 或者使用 DDL 方式注册。 >> >> >> Best, >> Jark >> >>> 在 2019年8月26日,16:33,aven.wu <[hidden email]> 写道: >>> >>> Elasticsearch6UpsertTableSink >>> 的构造方法过于复杂参数非常多 >>> >>> public Elasticsearch6UpsertTableSink( >>> boolean isAppendOnly, >>> TableSchema schema, >>> List<Host> hosts, >>> String index, >>> String docType, >>> String keyDelimiter, >>> String keyNullLiteral, >>> SerializationSchema<Row> serializationSchema, >>> XContentType contentType, >>> ActionRequestFailureHandler failureHandler, >>> Map<SinkOption, String> sinkOptions) { >>> >>> super( >>> isAppendOnly, >>> schema, >>> hosts, >>> index, >>> docType, >>> keyDelimiter, >>> keyNullLiteral, >>> serializationSchema, >>> contentType, >>> failureHandler, >>> sinkOptions, >>> UPDATE_REQUEST_FACTORY); >>> } >>> >>> >>> 请问,是不是我的用法不对? >>> 有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。 >>> |
没有group by的语句,比如就是select * from table ,表明细数据,以DDL 方式 写入 es,
能指定某个字段作为es的主键id么, 我试了同步数据明细到es中,但是id 却是随机生成的. 在 2019-08-26 15:47:53,"Jark Wu" <[hidden email]> 写道: >嗯,descriptor 和 DDL 就是可以用于这个场景,将 table 查询结果直接写入 sink。 > >Best, >Jark > > > >> 在 2019年8月26日,16:44,巫旭阳 <[hidden email]> 写道: >> >> 感谢解答, >> 我的意图是 构建EStablesink,可以将table 查询的结果 直接写入ES 避免再转换DataStream 通过ESSink写入 >> >> >> >> >> >> >> 在 2019-08-26 16:39:49,"Jark Wu" <[hidden email]> 写道: >>> Hi , >>> >>> >>> Elasticsearch6UpsertTableSink 是标记成 @internal 的,不是开放给用户直接去构造的。 >>> 如果要注册一个 ES sink,可以使用 descriptor API,也就是 org.apache.flink.table.descriptors.Elasticsearch。 >>> 或者使用 DDL 方式注册。 >>> >>> >>> Best, >>> Jark >>> >>>> 在 2019年8月26日,16:33,aven.wu <[hidden email]> 写道: >>>> >>>> Elasticsearch6UpsertTableSink >>>> 的构造方法过于复杂参数非常多 >>>> >>>> public Elasticsearch6UpsertTableSink( >>>> boolean isAppendOnly, >>>> TableSchema schema, >>>> List<Host> hosts, >>>> String index, >>>> String docType, >>>> String keyDelimiter, >>>> String keyNullLiteral, >>>> SerializationSchema<Row> serializationSchema, >>>> XContentType contentType, >>>> ActionRequestFailureHandler failureHandler, >>>> Map<SinkOption, String> sinkOptions) { >>>> >>>> super( >>>> isAppendOnly, >>>> schema, >>>> hosts, >>>> index, >>>> docType, >>>> keyDelimiter, >>>> keyNullLiteral, >>>> serializationSchema, >>>> contentType, >>>> failureHandler, >>>> sinkOptions, >>>> UPDATE_REQUEST_FACTORY); >>>> } >>>> >>>> >>>> 请问,是不是我的用法不对? >>>> 有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。 >>>> |
Administrator
|
> ETL作业, 能指定某个字段作为es的主键id么, 我试了同步数据明细到es中,但是id 却是随机生成的.
据我所知,目前是不支持的。 可以去建个 JIRA 给社区提需求。 如果使用的 blink planner,可以使用 deduplicate with keeping first row,是一个比较轻量的去重计算,能拿到一个 key (也就是去重 key)。 文档还在 review 中,可以先看这个PR: https://github.com/apache/flink/pull/9511/files#diff-b56b1750a20591d2ba61ba99eb3d3539R953 <https://github.com/apache/flink/pull/9511/files#diff-b56b1750a20591d2ba61ba99eb3d3539R953> Best, Jark > 在 2019年8月26日,17:20,hb <[hidden email]> 写道: > > 没有group by的语句,比如就是select * from table ,表明细数据,以DDL 方式 写入 es, > 能指定某个字段作为es的主键id么, 我试了同步数据明细到es中,但是id 却是随机生成的. > > > > 在 2019-08-26 15:47:53,"Jark Wu" <[hidden email]> 写道: >> 嗯,descriptor 和 DDL 就是可以用于这个场景,将 table 查询结果直接写入 sink。 >> >> Best, >> Jark >> >> >> >>> 在 2019年8月26日,16:44,巫旭阳 <[hidden email]> 写道: >>> >>> 感谢解答, >>> 我的意图是 构建EStablesink,可以将table 查询的结果 直接写入ES 避免再转换DataStream 通过ESSink写入 >>> >>> >>> >>> >>> >>> >>> 在 2019-08-26 16:39:49,"Jark Wu" <[hidden email]> 写道: >>>> Hi , >>>> >>>> >>>> Elasticsearch6UpsertTableSink 是标记成 @internal 的,不是开放给用户直接去构造的。 >>>> 如果要注册一个 ES sink,可以使用 descriptor API,也就是 org.apache.flink.table.descriptors.Elasticsearch。 >>>> 或者使用 DDL 方式注册。 >>>> >>>> >>>> Best, >>>> Jark >>>> >>>>> 在 2019年8月26日,16:33,aven.wu <[hidden email]> 写道: >>>>> >>>>> Elasticsearch6UpsertTableSink >>>>> 的构造方法过于复杂参数非常多 >>>>> >>>>> public Elasticsearch6UpsertTableSink( >>>>> boolean isAppendOnly, >>>>> TableSchema schema, >>>>> List<Host> hosts, >>>>> String index, >>>>> String docType, >>>>> String keyDelimiter, >>>>> String keyNullLiteral, >>>>> SerializationSchema<Row> serializationSchema, >>>>> XContentType contentType, >>>>> ActionRequestFailureHandler failureHandler, >>>>> Map<SinkOption, String> sinkOptions) { >>>>> >>>>> super( >>>>> isAppendOnly, >>>>> schema, >>>>> hosts, >>>>> index, >>>>> docType, >>>>> keyDelimiter, >>>>> keyNullLiteral, >>>>> serializationSchema, >>>>> contentType, >>>>> failureHandler, >>>>> sinkOptions, >>>>> UPDATE_REQUEST_FACTORY); >>>>> } >>>>> >>>>> >>>>> 请问,是不是我的用法不对? >>>>> 有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。 >>>>> |
你好:
可以自己构建 indexRequest 设置id,type,source 等字段 ElasticsearchSinkFunction 不知道是否满足你的需求? 发件人: Jark Wu 发送时间: 2019年8月26日 18:00 主题: Re: 关于elasticSearch table sink 构造过于复杂 > ETL作业, 能指定某个字段作为es的主键id么, 我试了同步数据明细到es中,但是id 却是随机生成的. 据我所知,目前是不支持的。 可以去建个 JIRA 给社区提需求。 如果使用的 blink planner,可以使用 deduplicate with keeping first row,是一个比较轻量的去重计算,能拿到一个 key (也就是去重 key)。 文档还在 review 中,可以先看这个PR: https://github.com/apache/flink/pull/9511/files#diff-b56b1750a20591d2ba61ba99eb3d3539R953 <https://github.com/apache/flink/pull/9511/files#diff-b56b1750a20591d2ba61ba99eb3d3539R953> Best, Jark > 在 2019年8月26日,17:20,hb <[hidden email]> 写道: > > 没有group by的语句,比如就是select * from table ,表明细数据,以DDL 方式 写入 es, > 能指定某个字段作为es的主键id么, 我试了同步数据明细到es中,但是id 却是随机生成的. > > > > 在 2019-08-26 15:47:53,"Jark Wu" <[hidden email]> 写道: >> 嗯,descriptor 和 DDL 就是可以用于这个场景,将 table 查询结果直接写入 sink。 >> >> Best, >> Jark >> >> >> >>> 在 2019年8月26日,16:44,巫旭阳 <[hidden email]> 写道: >>> >>> 感谢解答, >>> 我的意图是 构建EStablesink,可以将table 查询的结果 直接写入ES 避免再转换DataStream 通过ESSink写入 >>> >>> >>> >>> >>> >>> >>> 在 2019-08-26 16:39:49,"Jark Wu" <[hidden email]> 写道: >>>> Hi , >>>> >>>> >>>> Elasticsearch6UpsertTableSink 是标记成 @internal 的,不是开放给用户直接去构造的。 >>>> 如果要注册一个 ES sink,可以使用 descriptor API,也就是 org.apache.flink.table.descriptors.Elasticsearch。 >>>> 或者使用 DDL 方式注册。 >>>> >>>> >>>> Best, >>>> Jark >>>> >>>>> 在 2019年8月26日,16:33,aven.wu <[hidden email]> 写道: >>>>> >>>>> Elasticsearch6UpsertTableSink >>>>> 的构造方法过于复杂参数非常多 >>>>> >>>>> public Elasticsearch6UpsertTableSink( >>>>> boolean isAppendOnly, >>>>> TableSchema schema, >>>>> List<Host> hosts, >>>>> String index, >>>>> String docType, >>>>> String keyDelimiter, >>>>> String keyNullLiteral, >>>>> SerializationSchema<Row> serializationSchema, >>>>> XContentType contentType, >>>>> ActionRequestFailureHandler failureHandler, >>>>> Map<SinkOption, String> sinkOptions) { >>>>> >>>>> super( >>>>> isAppendOnly, >>>>> schema, >>>>> hosts, >>>>> index, >>>>> docType, >>>>> keyDelimiter, >>>>> keyNullLiteral, >>>>> serializationSchema, >>>>> contentType, >>>>> failureHandler, >>>>> sinkOptions, >>>>> UPDATE_REQUEST_FACTORY); >>>>> } >>>>> >>>>> >>>>> 请问,是不是我的用法不对? >>>>> 有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。 >>>>> |
Free forum by Nabble | Edit this page |