Hi
有这么个场景,需要将 MySQL 的对一张表的更改 同步到 ElasticSearch 中,两个地方表的 PRIMARY KEY 都是 ID,SQL 如下 INSERT INTO sink_es // 将更改同步 upsert 到 ES SELECT * FROM binlog // mysql 表的 binlog 假设对于 MySQL 中 id = 1 的变更有 10 条,需要在 ES 上都更新 id = 1 的一条记录 但是上面的 SQL 是做不到的,只会一直 Insert 如果想 Upsert ES 的话,就得让 Source 表变成 Upsert Mode,但是现在好像不支持? 社区的 FLIP-87 https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API 可以解决这个问题吗? 感谢 |
使用自定义的Table Sink就可以了啊.
Luan Cooper <[hidden email]> 于2020年5月7日周四 下午8:39写道: > Hi > > 有这么个场景,需要将 MySQL 的对一张表的更改 同步到 ElasticSearch 中,两个地方表的 PRIMARY KEY 都是 ID,SQL > 如下 > > INSERT INTO sink_es // 将更改同步 upsert 到 ES > SELECT * > FROM binlog // mysql 表的 binlog > > 假设对于 MySQL 中 id = 1 的变更有 10 条,需要在 ES 上都更新 id = 1 的一条记录 > 但是上面的 SQL 是做不到的,只会一直 Insert > > 如果想 Upsert ES 的话,就得让 Source 表变成 Upsert Mode,但是现在好像不支持? > > 社区的 FLIP-87 > > https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API > 可以解决这个问题吗? > > 感谢 > |
自定义 sink 有这么几个疑问
1. 自带的 sink 得都改成 upsert 比如 jdbc/es 2. 这样 append/upsert 代码有大量重复 3. 和 flink 对 append/upsert 流的定义有冲突,有额外 hack 的解释成本 4. 得有地方另外指定 update key 这么做感觉会挖坑 有其他办法吗 lec ssmi <[hidden email]>于2020年5月7日 周四20:42写道: > 使用自定义的Table Sink就可以了啊. > > Luan Cooper <[hidden email]> 于2020年5月7日周四 下午8:39写道: > > > Hi > > > > 有这么个场景,需要将 MySQL 的对一张表的更改 同步到 ElasticSearch 中,两个地方表的 PRIMARY KEY 都是 > ID,SQL > > 如下 > > > > INSERT INTO sink_es // 将更改同步 upsert 到 ES > > SELECT * > > FROM binlog // mysql 表的 binlog > > > > 假设对于 MySQL 中 id = 1 的变更有 10 条,需要在 ES 上都更新 id = 1 的一条记录 > > 但是上面的 SQL 是做不到的,只会一直 Insert > > > > 如果想 Upsert ES 的话,就得让 Source 表变成 Upsert Mode,但是现在好像不支持? > > > > 社区的 FLIP-87 > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API > > 可以解决这个问题吗? > > > > 感谢 > > > |
In reply to this post by Luan Cooper
Hi,
现在的ES Sink的确是没有办法做到这种情况下的的update,因为主键信息都是通过SQL推断出来的,你这个SQL是推断不出来主键信息的。 而且你已经找到了正确的方向,我理解FLIP-87之后,我们是可以在DDL里面指定主键信息的,也就是可以达到你想要的效果。 Luan Cooper <[hidden email]> 于2020年5月7日周四 下午8:39写道: > Hi > > 有这么个场景,需要将 MySQL 的对一张表的更改 同步到 ElasticSearch 中,两个地方表的 PRIMARY KEY 都是 ID,SQL > 如下 > > INSERT INTO sink_es // 将更改同步 upsert 到 ES > SELECT * > FROM binlog // mysql 表的 binlog > > 假设对于 MySQL 中 id = 1 的变更有 10 条,需要在 ES 上都更新 id = 1 的一条记录 > 但是上面的 SQL 是做不到的,只会一直 Insert > > 如果想 Upsert ES 的话,就得让 Source 表变成 Upsert Mode,但是现在好像不支持? > > 社区的 FLIP-87 > > https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API > 可以解决这个问题吗? > > 感谢 > -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
In reply to this post by Luan Cooper
> 有其他办法吗
可以尝试group by id并配合UDF:LAST_VALUE, SQL示例如下: insert into sink_es select id,LAST_VALUE(column_name) from binlog group by id; Best, Oliver yunchang > 2020年5月7日 下午8:55,Luan Cooper <[hidden email]> 写道: > > 自定义 sink 有这么几个疑问 > 1. 自带的 sink 得都改成 upsert 比如 jdbc/es > 2. 这样 append/upsert 代码有大量重复 > 3. 和 flink 对 append/upsert 流的定义有冲突,有额外 hack 的解释成本 > 4. 得有地方另外指定 update key > > 这么做感觉会挖坑 > > 有其他办法吗 > > lec ssmi <[hidden email]>于2020年5月7日 周四20:42写道: > >> 使用自定义的Table Sink就可以了啊. >> >> Luan Cooper <[hidden email]> 于2020年5月7日周四 下午8:39写道: >> >>> Hi >>> >>> 有这么个场景,需要将 MySQL 的对一张表的更改 同步到 ElasticSearch 中,两个地方表的 PRIMARY KEY 都是 >> ID,SQL >>> 如下 >>> >>> INSERT INTO sink_es // 将更改同步 upsert 到 ES >>> SELECT * >>> FROM binlog // mysql 表的 binlog >>> >>> 假设对于 MySQL 中 id = 1 的变更有 10 条,需要在 ES 上都更新 id = 1 的一条记录 >>> 但是上面的 SQL 是做不到的,只会一直 Insert >>> >>> 如果想 Upsert ES 的话,就得让 Source 表变成 Upsert Mode,但是现在好像不支持? >>> >>> 社区的 FLIP-87 >>> >>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API >>> 可以解决这个问题吗? >>> >>> 感谢 >>> >> |
Administrator
|
Hi,
FLIP-95 和 FLIP-105 后,上述的 query 就可以原生支持了。 FLIP-95 和 FLIP-105 的核心工作就是识别 binlog 中的 update/delete/insert 消息,而不是全当成 append 消息。 预计 1.11 能见到这些功能。 Best, Jark On Thu, 7 May 2020 at 21:34, oliver <[hidden email]> wrote: > > 有其他办法吗 > > 可以尝试group by id并配合UDF:LAST_VALUE, > SQL示例如下: > insert into sink_es > select id,LAST_VALUE(column_name) > from binlog group by id; > > > Best, > Oliver yunchang > > > 2020年5月7日 下午8:55,Luan Cooper <[hidden email]> 写道: > > > > 自定义 sink 有这么几个疑问 > > 1. 自带的 sink 得都改成 upsert 比如 jdbc/es > > 2. 这样 append/upsert 代码有大量重复 > > 3. 和 flink 对 append/upsert 流的定义有冲突,有额外 hack 的解释成本 > > 4. 得有地方另外指定 update key > > > > 这么做感觉会挖坑 > > > > 有其他办法吗 > > > > lec ssmi <[hidden email]>于2020年5月7日 周四20:42写道: > > > >> 使用自定义的Table Sink就可以了啊. > >> > >> Luan Cooper <[hidden email]> 于2020年5月7日周四 下午8:39写道: > >> > >>> Hi > >>> > >>> 有这么个场景,需要将 MySQL 的对一张表的更改 同步到 ElasticSearch 中,两个地方表的 PRIMARY KEY 都是 > >> ID,SQL > >>> 如下 > >>> > >>> INSERT INTO sink_es // 将更改同步 upsert 到 ES > >>> SELECT * > >>> FROM binlog // mysql 表的 binlog > >>> > >>> 假设对于 MySQL 中 id = 1 的变更有 10 条,需要在 ES 上都更新 id = 1 的一条记录 > >>> 但是上面的 SQL 是做不到的,只会一直 Insert > >>> > >>> 如果想 Upsert ES 的话,就得让 Source 表变成 Upsert Mode,但是现在好像不支持? > >>> > >>> 社区的 FLIP-87 > >>> > >>> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API > >>> 可以解决这个问题吗? > >>> > >>> 感谢 > >>> > >> > > |
圆满,感谢
On Fri, May 8, 2020 at 10:19 AM Jark Wu <[hidden email]> wrote: > Hi, > > FLIP-95 和 FLIP-105 后,上述的 query 就可以原生支持了。 > FLIP-95 和 FLIP-105 的核心工作就是识别 binlog 中的 update/delete/insert 消息,而不是全当成 > append 消息。 > 预计 1.11 能见到这些功能。 > > Best, > Jark > > On Thu, 7 May 2020 at 21:34, oliver <[hidden email]> wrote: > > > > 有其他办法吗 > > > > 可以尝试group by id并配合UDF:LAST_VALUE, > > SQL示例如下: > > insert into sink_es > > select id,LAST_VALUE(column_name) > > from binlog group by id; > > > > > > Best, > > Oliver yunchang > > > > > 2020年5月7日 下午8:55,Luan Cooper <[hidden email]> 写道: > > > > > > 自定义 sink 有这么几个疑问 > > > 1. 自带的 sink 得都改成 upsert 比如 jdbc/es > > > 2. 这样 append/upsert 代码有大量重复 > > > 3. 和 flink 对 append/upsert 流的定义有冲突,有额外 hack 的解释成本 > > > 4. 得有地方另外指定 update key > > > > > > 这么做感觉会挖坑 > > > > > > 有其他办法吗 > > > > > > lec ssmi <[hidden email]>于2020年5月7日 周四20:42写道: > > > > > >> 使用自定义的Table Sink就可以了啊. > > >> > > >> Luan Cooper <[hidden email]> 于2020年5月7日周四 下午8:39写道: > > >> > > >>> Hi > > >>> > > >>> 有这么个场景,需要将 MySQL 的对一张表的更改 同步到 ElasticSearch 中,两个地方表的 PRIMARY KEY 都是 > > >> ID,SQL > > >>> 如下 > > >>> > > >>> INSERT INTO sink_es // 将更改同步 upsert 到 ES > > >>> SELECT * > > >>> FROM binlog // mysql 表的 binlog > > >>> > > >>> 假设对于 MySQL 中 id = 1 的变更有 10 条,需要在 ES 上都更新 id = 1 的一条记录 > > >>> 但是上面的 SQL 是做不到的,只会一直 Insert > > >>> > > >>> 如果想 Upsert ES 的话,就得让 Source 表变成 Upsert Mode,但是现在好像不支持? > > >>> > > >>> 社区的 FLIP-87 > > >>> > > >>> > > >> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API > > >>> 可以解决这个问题吗? > > >>> > > >>> 感谢 > > >>> > > >> > > > > > |
Free forum by Nabble | Edit this page |