Streaming SQL 的 Source/Sink 在 Append Mode/Upsert Mode 上的疑问

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

Streaming SQL 的 Source/Sink 在 Append Mode/Upsert Mode 上的疑问

Luan Cooper
Hi

有这么个场景,需要将 MySQL 的对一张表的更改 同步到 ElasticSearch 中,两个地方表的 PRIMARY KEY 都是 ID,SQL
如下

INSERT INTO sink_es // 将更改同步 upsert 到 ES
SELECT *
FROM binlog // mysql 表的 binlog

假设对于 MySQL 中 id = 1 的变更有 10 条,需要在 ES 上都更新 id = 1 的一条记录
但是上面的 SQL 是做不到的,只会一直 Insert

如果想 Upsert ES 的话,就得让 Source 表变成 Upsert Mode,但是现在好像不支持?

社区的 FLIP-87
https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API
 可以解决这个问题吗?

感谢
Reply | Threaded
Open this post in threaded view
|

Re: Streaming SQL 的 Source/Sink 在 Append Mode/Upsert Mode 上的疑问

lec ssmi
使用自定义的Table Sink就可以了啊.

Luan Cooper <[hidden email]> 于2020年5月7日周四 下午8:39写道:

> Hi
>
> 有这么个场景,需要将 MySQL 的对一张表的更改 同步到 ElasticSearch 中,两个地方表的 PRIMARY KEY 都是 ID,SQL
> 如下
>
> INSERT INTO sink_es // 将更改同步 upsert 到 ES
> SELECT *
> FROM binlog // mysql 表的 binlog
>
> 假设对于 MySQL 中 id = 1 的变更有 10 条,需要在 ES 上都更新 id = 1 的一条记录
> 但是上面的 SQL 是做不到的,只会一直 Insert
>
> 如果想 Upsert ES 的话,就得让 Source 表变成 Upsert Mode,但是现在好像不支持?
>
> 社区的 FLIP-87
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API
>  可以解决这个问题吗?
>
> 感谢
>
Reply | Threaded
Open this post in threaded view
|

Re: Streaming SQL 的 Source/Sink 在 Append Mode/Upsert Mode 上的疑问

Luan Cooper
自定义 sink 有这么几个疑问
1. 自带的 sink 得都改成 upsert 比如 jdbc/es
2. 这样 append/upsert 代码有大量重复
3. 和 flink 对 append/upsert 流的定义有冲突,有额外 hack 的解释成本
4. 得有地方另外指定 update key

这么做感觉会挖坑

有其他办法吗

lec ssmi <[hidden email]>于2020年5月7日 周四20:42写道:

> 使用自定义的Table Sink就可以了啊.
>
> Luan Cooper <[hidden email]> 于2020年5月7日周四 下午8:39写道:
>
> > Hi
> >
> > 有这么个场景,需要将 MySQL 的对一张表的更改 同步到 ElasticSearch 中,两个地方表的 PRIMARY KEY 都是
> ID,SQL
> > 如下
> >
> > INSERT INTO sink_es // 将更改同步 upsert 到 ES
> > SELECT *
> > FROM binlog // mysql 表的 binlog
> >
> > 假设对于 MySQL 中 id = 1 的变更有 10 条,需要在 ES 上都更新 id = 1 的一条记录
> > 但是上面的 SQL 是做不到的,只会一直 Insert
> >
> > 如果想 Upsert ES 的话,就得让 Source 表变成 Upsert Mode,但是现在好像不支持?
> >
> > 社区的 FLIP-87
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API
> >  可以解决这个问题吗?
> >
> > 感谢
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Streaming SQL 的 Source/Sink 在 Append Mode/Upsert Mode 上的疑问

Benchao Li
In reply to this post by Luan Cooper
Hi,

现在的ES Sink的确是没有办法做到这种情况下的的update,因为主键信息都是通过SQL推断出来的,你这个SQL是推断不出来主键信息的。
而且你已经找到了正确的方向,我理解FLIP-87之后,我们是可以在DDL里面指定主键信息的,也就是可以达到你想要的效果。

Luan Cooper <[hidden email]> 于2020年5月7日周四 下午8:39写道:

> Hi
>
> 有这么个场景,需要将 MySQL 的对一张表的更改 同步到 ElasticSearch 中,两个地方表的 PRIMARY KEY 都是 ID,SQL
> 如下
>
> INSERT INTO sink_es // 将更改同步 upsert 到 ES
> SELECT *
> FROM binlog // mysql 表的 binlog
>
> 假设对于 MySQL 中 id = 1 的变更有 10 条,需要在 ES 上都更新 id = 1 的一条记录
> 但是上面的 SQL 是做不到的,只会一直 Insert
>
> 如果想 Upsert ES 的话,就得让 Source 表变成 Upsert Mode,但是现在好像不支持?
>
> 社区的 FLIP-87
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API
>  可以解决这个问题吗?
>
> 感谢
>


--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [hidden email]; [hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: Streaming SQL 的 Source/Sink 在 Append Mode/Upsert Mode 上的疑问

云长
In reply to this post by Luan Cooper
> 有其他办法吗

可以尝试group by id并配合UDF:LAST_VALUE,
SQL示例如下:
insert into sink_es
select id,LAST_VALUE(column_name)
from binlog group by id;


Best,
Oliver yunchang

> 2020年5月7日 下午8:55,Luan Cooper <[hidden email]> 写道:
>
> 自定义 sink 有这么几个疑问
> 1. 自带的 sink 得都改成 upsert 比如 jdbc/es
> 2. 这样 append/upsert 代码有大量重复
> 3. 和 flink 对 append/upsert 流的定义有冲突,有额外 hack 的解释成本
> 4. 得有地方另外指定 update key
>
> 这么做感觉会挖坑
>
> 有其他办法吗
>
> lec ssmi <[hidden email]>于2020年5月7日 周四20:42写道:
>
>> 使用自定义的Table Sink就可以了啊.
>>
>> Luan Cooper <[hidden email]> 于2020年5月7日周四 下午8:39写道:
>>
>>> Hi
>>>
>>> 有这么个场景,需要将 MySQL 的对一张表的更改 同步到 ElasticSearch 中,两个地方表的 PRIMARY KEY 都是
>> ID,SQL
>>> 如下
>>>
>>> INSERT INTO sink_es // 将更改同步 upsert 到 ES
>>> SELECT *
>>> FROM binlog // mysql 表的 binlog
>>>
>>> 假设对于 MySQL 中 id = 1 的变更有 10 条,需要在 ES 上都更新 id = 1 的一条记录
>>> 但是上面的 SQL 是做不到的,只会一直 Insert
>>>
>>> 如果想 Upsert ES 的话,就得让 Source 表变成 Upsert Mode,但是现在好像不支持?
>>>
>>> 社区的 FLIP-87
>>>
>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API
>>> 可以解决这个问题吗?
>>>
>>> 感谢
>>>
>>

Reply | Threaded
Open this post in threaded view
|

Re: Streaming SQL 的 Source/Sink 在 Append Mode/Upsert Mode 上的疑问

Jark
Administrator
Hi,

FLIP-95 和 FLIP-105 后,上述的 query 就可以原生支持了。
FLIP-95 和 FLIP-105 的核心工作就是识别 binlog 中的 update/delete/insert 消息,而不是全当成
append 消息。
预计 1.11 能见到这些功能。

Best,
Jark

On Thu, 7 May 2020 at 21:34, oliver <[hidden email]> wrote:

> > 有其他办法吗
>
> 可以尝试group by id并配合UDF:LAST_VALUE,
> SQL示例如下:
> insert into sink_es
> select id,LAST_VALUE(column_name)
> from binlog group by id;
>
>
> Best,
> Oliver yunchang
>
> > 2020年5月7日 下午8:55,Luan Cooper <[hidden email]> 写道:
> >
> > 自定义 sink 有这么几个疑问
> > 1. 自带的 sink 得都改成 upsert 比如 jdbc/es
> > 2. 这样 append/upsert 代码有大量重复
> > 3. 和 flink 对 append/upsert 流的定义有冲突,有额外 hack 的解释成本
> > 4. 得有地方另外指定 update key
> >
> > 这么做感觉会挖坑
> >
> > 有其他办法吗
> >
> > lec ssmi <[hidden email]>于2020年5月7日 周四20:42写道:
> >
> >> 使用自定义的Table Sink就可以了啊.
> >>
> >> Luan Cooper <[hidden email]> 于2020年5月7日周四 下午8:39写道:
> >>
> >>> Hi
> >>>
> >>> 有这么个场景,需要将 MySQL 的对一张表的更改 同步到 ElasticSearch 中,两个地方表的 PRIMARY KEY 都是
> >> ID,SQL
> >>> 如下
> >>>
> >>> INSERT INTO sink_es // 将更改同步 upsert 到 ES
> >>> SELECT *
> >>> FROM binlog // mysql 表的 binlog
> >>>
> >>> 假设对于 MySQL 中 id = 1 的变更有 10 条,需要在 ES 上都更新 id = 1 的一条记录
> >>> 但是上面的 SQL 是做不到的,只会一直 Insert
> >>>
> >>> 如果想 Upsert ES 的话,就得让 Source 表变成 Upsert Mode,但是现在好像不支持?
> >>>
> >>> 社区的 FLIP-87
> >>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API
> >>> 可以解决这个问题吗?
> >>>
> >>> 感谢
> >>>
> >>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Streaming SQL 的 Source/Sink 在 Append Mode/Upsert Mode 上的疑问

Luan Cooper
圆满,感谢

On Fri, May 8, 2020 at 10:19 AM Jark Wu <[hidden email]> wrote:

> Hi,
>
> FLIP-95 和 FLIP-105 后,上述的 query 就可以原生支持了。
> FLIP-95 和 FLIP-105 的核心工作就是识别 binlog 中的 update/delete/insert 消息,而不是全当成
> append 消息。
> 预计 1.11 能见到这些功能。
>
> Best,
> Jark
>
> On Thu, 7 May 2020 at 21:34, oliver <[hidden email]> wrote:
>
> > > 有其他办法吗
> >
> > 可以尝试group by id并配合UDF:LAST_VALUE,
> > SQL示例如下:
> > insert into sink_es
> > select id,LAST_VALUE(column_name)
> > from binlog group by id;
> >
> >
> > Best,
> > Oliver yunchang
> >
> > > 2020年5月7日 下午8:55,Luan Cooper <[hidden email]> 写道:
> > >
> > > 自定义 sink 有这么几个疑问
> > > 1. 自带的 sink 得都改成 upsert 比如 jdbc/es
> > > 2. 这样 append/upsert 代码有大量重复
> > > 3. 和 flink 对 append/upsert 流的定义有冲突,有额外 hack 的解释成本
> > > 4. 得有地方另外指定 update key
> > >
> > > 这么做感觉会挖坑
> > >
> > > 有其他办法吗
> > >
> > > lec ssmi <[hidden email]>于2020年5月7日 周四20:42写道:
> > >
> > >> 使用自定义的Table Sink就可以了啊.
> > >>
> > >> Luan Cooper <[hidden email]> 于2020年5月7日周四 下午8:39写道:
> > >>
> > >>> Hi
> > >>>
> > >>> 有这么个场景,需要将 MySQL 的对一张表的更改 同步到 ElasticSearch 中,两个地方表的 PRIMARY KEY 都是
> > >> ID,SQL
> > >>> 如下
> > >>>
> > >>> INSERT INTO sink_es // 将更改同步 upsert 到 ES
> > >>> SELECT *
> > >>> FROM binlog // mysql 表的 binlog
> > >>>
> > >>> 假设对于 MySQL 中 id = 1 的变更有 10 条,需要在 ES 上都更新 id = 1 的一条记录
> > >>> 但是上面的 SQL 是做不到的,只会一直 Insert
> > >>>
> > >>> 如果想 Upsert ES 的话,就得让 Source 表变成 Upsert Mode,但是现在好像不支持?
> > >>>
> > >>> 社区的 FLIP-87
> > >>>
> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API
> > >>> 可以解决这个问题吗?
> > >>>
> > >>> 感谢
> > >>>
> > >>
> >
> >
>