FlinkSQL Upsert/Retraction 写入 MySQL 的问题

classic Classic list List threaded Threaded
11 messages Options
Reply | Threaded
Open this post in threaded view
|

FlinkSQL Upsert/Retraction 写入 MySQL 的问题

wanglei2@geekplus.com.cn


INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1

每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
但这个 Sink 到底是用到了 UpsertStream 还是 RetractStream 呢,怎么判断是 UpsertStream 还是 RetractStream 呢?

我看 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 没有 Retract 方式
实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?



如若不带 group by 直接:
INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?




[hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题

Leonard Xu
Hi,wanglei

> INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
> 每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统
需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink, RetractStreamSink)

> 我看 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 没有 Retract 方式
> 实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?
现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract.
 你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理, 也不支持retract。

> 如若不带 group by 直接:
> INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
> 主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?

不带 group by时无法推导出query的 unique key,没法做按照unique key的更新,
只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可

Best,

Leonard Xu
Reply | Threaded
Open this post in threaded view
|

回复: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题

wanglei2@geekplus.com.cn
Thanks Leonard,

JDBCUpsertTableSink 按照 Upsert 的方式处理,实际执行的 SQL 语句是 INSERT INTO  ON DUPLICATE KEY 吗?
这个在源代码哪个地方呢?

谢谢,
王磊



[hidden email]

 
发件人: Leonard Xu
发送时间: 2020-04-27 12:58
收件人: user-zh
主题: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题
Hi,wanglei
 
> INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
> 每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统
需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink, RetractStreamSink)
 
> 我看 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 没有 Retract 方式
> 实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?
现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract.
你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理, 也不支持retract。
 
> 如若不带 group by 直接:
> INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
> 主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?
 
不带 group by时无法推导出query的 unique key,没法做按照unique key的更新,
只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可
 
Best,
 
Leonard Xu
Reply | Threaded
Open this post in threaded view
|

Re: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题

Jark
Administrator
https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java#L261

Best,
Jark

On Tue, 28 Apr 2020 at 10:24, [hidden email] <
[hidden email]> wrote:

> Thanks Leonard,
>
> JDBCUpsertTableSink 按照 Upsert 的方式处理,实际执行的 SQL 语句是 INSERT INTO  ON
> DUPLICATE KEY 吗?
> 这个在源代码哪个地方呢?
>
> 谢谢,
> 王磊
>
>
>
> [hidden email]
>
>
> 发件人: Leonard Xu
> 发送时间: 2020-04-27 12:58
> 收件人: user-zh
> 主题: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题
> Hi,wanglei
>
> > INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
> > 每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
> 这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统
> 需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink,
> RetractStreamSink)
>
> > 我看
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> 没有 Retract 方式
> > 实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?
> 现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract.
> 你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理,
> 也不支持retract。
>
> > 如若不带 group by 直接:
> > INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
> > 主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?
>
> 不带 group by时无法推导出query的 unique key,没法做按照unique key的更新,
> 只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可
>
> Best,
>
> Leonard Xu
>
Reply | Threaded
Open this post in threaded view
|

回复: 回复: FlinkSQL Upsert/Retraction 写入 MySQL 的问题

wanglei2@geekplus.com.cn
In reply to this post by wanglei2@geekplus.com.cn

Thanks Leonard,

JDBCUpsertTableSink 按照 Upsert 的方式处理,实际执行的 SQL 语句是 INSERT INTO  ON DUPLICATE KEY 吗?
这个在源代码哪个地方呢?

谢谢,
王磊



[hidden email]

 
发件人: Leonard Xu
发送时间: 2020-04-27 12:58
收件人: user-zh
主题: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题
Hi,wanglei
 
> INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
> 每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统
需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink, RetractStreamSink)
 
> 我看 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 没有 Retract 方式
> 实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?
现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract.
你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理, 也不支持retract。
 
> 如若不带 group by 直接:
> INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
> 主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?
 
不带 group by时无法推导出query的 unique key,没法做按照unique key的更新,
只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可
 
Best,
 
Leonard Xu
Reply | Threaded
Open this post in threaded view
|

回复:回复: 回复: FlinkSQL Upsert/Retraction 写入 MySQL 的问题

1101300123
我看源码这样写道:
/**
 * Get dialect upsert statement, the database has its own upsert syntax, such as Mysql
 * using DUPLICATE KEY UPDATE, and PostgresSQL using ON CONFLICT... DO UPDATE SET..
 *
 * @return None if dialect does not support upsert statement, the writer will degrade to
 * the use of select + update/insert, this performance is poor.
 */
default Optional<String> getUpsertStatement(
      String tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}
不同的数据库产品有不同的语句,所以默认实现是delete +insert


但是我看
@Override
public void executeBatch() throws SQLException {
if (keyToRows.size() > 0) {
for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
         Row pk = entry.getKey();
Tuple2<Boolean, Row> tuple = entry.getValue();
         if (tuple.f0) {
            processOneRowInBatch(pk, tuple.f1);
} else {
setRecordToStatement(deleteStatement, pkTypes, pk);
deleteStatement.addBatch();
}
      }
      internalExecuteBatch();
deleteStatement.executeBatch();
keyToRows.clear();
}
}


方法中是有delete的,如果我自己实现了upset,是不是没有delete的必要也不用取false的记录
在2020年4月28日 11:43,[hidden email]<[hidden email]> 写道:

Thanks Leonard,

JDBCUpsertTableSink 按照 Upsert 的方式处理,实际执行的 SQL 语句是 INSERT INTO  ON DUPLICATE KEY 吗?
这个在源代码哪个地方呢?

谢谢,
王磊



[hidden email]


发件人: Leonard Xu
发送时间: 2020-04-27 12:58
收件人: user-zh
主题: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题
Hi,wanglei

INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统
需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink, RetractStreamSink)

我看 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 没有 Retract 方式
实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?
现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract.
你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理, 也不支持retract。

如若不带 group by 直接:
INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?

不带 group by时无法推导出query的 unique key,没法做按照unique key的更新,
只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可

Best,

Leonard Xu
Reply | Threaded
Open this post in threaded view
|

Re: 回复: 回复: FlinkSQL Upsert/Retraction 写入 MySQL 的问题

Jark
Administrator
UpsertSink 仍然可能会收到 delete 消息的,所以你可以看到 UpsertSink 的输入是 Tuple2<Boolean, Row>
,当 false 时代表 delelte,true 时代表 upsert 消息。

Best,
Jark

On Tue, 28 Apr 2020 at 14:05, 1101300123 <[hidden email]> wrote:

> 我看源码这样写道:
> /**
>  * Get dialect upsert statement, the database has its own upsert syntax,
> such as Mysql
>  * using DUPLICATE KEY UPDATE, and PostgresSQL using ON CONFLICT... DO
> UPDATE SET..
>  *
>  * @return None if dialect does not support upsert statement, the writer
> will degrade to
>  * the use of select + update/insert, this performance is poor.
>  */
> default Optional<String> getUpsertStatement(
>       String tableName, String[] fieldNames, String[] uniqueKeyFields) {
> return Optional.empty();
> }
> 不同的数据库产品有不同的语句,所以默认实现是delete +insert
>
>
> 但是我看
> @Override
> public void executeBatch() throws SQLException {
> if (keyToRows.size() > 0) {
> for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
>          Row pk = entry.getKey();
> Tuple2<Boolean, Row> tuple = entry.getValue();
>          if (tuple.f0) {
>             processOneRowInBatch(pk, tuple.f1);
> } else {
> setRecordToStatement(deleteStatement, pkTypes, pk);
> deleteStatement.addBatch();
> }
>       }
>       internalExecuteBatch();
> deleteStatement.executeBatch();
> keyToRows.clear();
> }
> }
>
>
> 方法中是有delete的,如果我自己实现了upset,是不是没有delete的必要也不用取false的记录
> 在2020年4月28日 11:43,[hidden email]<[hidden email]> 写道:
>
> Thanks Leonard,
>
> JDBCUpsertTableSink 按照 Upsert 的方式处理,实际执行的 SQL 语句是 INSERT INTO  ON
> DUPLICATE KEY 吗?
> 这个在源代码哪个地方呢?
>
> 谢谢,
> 王磊
>
>
>
> [hidden email]
>
>
> 发件人: Leonard Xu
> 发送时间: 2020-04-27 12:58
> 收件人: user-zh
> 主题: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题
> Hi,wanglei
>
> INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
> 每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
> 这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统
> 需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink,
> RetractStreamSink)
>
> 我看
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> 没有 Retract 方式
> 实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?
> 现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract.
> 你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理,
> 也不支持retract。
>
> 如若不带 group by 直接:
> INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
> 主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?
>
> 不带 group by时无法推导出query的 unique key,没法做按照unique key的更新,
> 只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可
>
> Best,
>
> Leonard Xu
>
Reply | Threaded
Open this post in threaded view
|

回复: 回复: 回复: FlinkSQL Upsert/Retraction 写入 MySQL 的问题

1101300123
我知道,我的意思是,upsert自己实现了,我可以直接丢弃false的数据不做处理这样会不会提升部分效率


在2020年4月28日 14:11,Jark Wu<[hidden email]> 写道:
UpsertSink 仍然可能会收到 delete 消息的,所以你可以看到 UpsertSink 的输入是 Tuple2<Boolean, Row>
,当 false 时代表 delelte,true 时代表 upsert 消息。

Best,
Jark

On Tue, 28 Apr 2020 at 14:05, 1101300123 <[hidden email]> wrote:

我看源码这样写道:
/**
* Get dialect upsert statement, the database has its own upsert syntax,
such as Mysql
* using DUPLICATE KEY UPDATE, and PostgresSQL using ON CONFLICT... DO
UPDATE SET..
*
* @return None if dialect does not support upsert statement, the writer
will degrade to
* the use of select + update/insert, this performance is poor.
*/
default Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}
不同的数据库产品有不同的语句,所以默认实现是delete +insert


但是我看
@Override
public void executeBatch() throws SQLException {
if (keyToRows.size() > 0) {
for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
Row pk = entry.getKey();
Tuple2<Boolean, Row> tuple = entry.getValue();
if (tuple.f0) {
processOneRowInBatch(pk, tuple.f1);
} else {
setRecordToStatement(deleteStatement, pkTypes, pk);
deleteStatement.addBatch();
}
}
internalExecuteBatch();
deleteStatement.executeBatch();
keyToRows.clear();
}
}


方法中是有delete的,如果我自己实现了upset,是不是没有delete的必要也不用取false的记录
在2020年4月28日 11:43,[hidden email]<[hidden email]> 写道:

Thanks Leonard,

JDBCUpsertTableSink 按照 Upsert 的方式处理,实际执行的 SQL 语句是 INSERT INTO  ON
DUPLICATE KEY 吗?
这个在源代码哪个地方呢?

谢谢,
王磊



[hidden email]


发件人: Leonard Xu
发送时间: 2020-04-27 12:58
收件人: user-zh
主题: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题
Hi,wanglei

INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统
需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink,
RetractStreamSink)

我看
https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
没有 Retract 方式
实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?
现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract.
你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理,
也不支持retract。

如若不带 group by 直接:
INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?

不带 group by时无法推导出query的 unique key,没法做按照unique key的更新,
只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可

Best,

Leonard Xu

Reply | Threaded
Open this post in threaded view
|

Re: 回复: 回复: FlinkSQL Upsert/Retraction 写入 MySQL 的问题

Jark
Administrator
是能提高一定的效率。不过可能会导致结果正确性问题。

Best,
Jark

On Tue, 28 Apr 2020 at 14:16, 1101300123 <[hidden email]> wrote:

> 我知道,我的意思是,upsert自己实现了,我可以直接丢弃false的数据不做处理这样会不会提升部分效率
>
>
> 在2020年4月28日 14:11,Jark Wu<[hidden email]> 写道:
> UpsertSink 仍然可能会收到 delete 消息的,所以你可以看到 UpsertSink 的输入是 Tuple2<Boolean, Row>
> ,当 false 时代表 delelte,true 时代表 upsert 消息。
>
> Best,
> Jark
>
> On Tue, 28 Apr 2020 at 14:05, 1101300123 <[hidden email]> wrote:
>
> 我看源码这样写道:
> /**
> * Get dialect upsert statement, the database has its own upsert syntax,
> such as Mysql
> * using DUPLICATE KEY UPDATE, and PostgresSQL using ON CONFLICT... DO
> UPDATE SET..
> *
> * @return None if dialect does not support upsert statement, the writer
> will degrade to
> * the use of select + update/insert, this performance is poor.
> */
> default Optional<String> getUpsertStatement(
> String tableName, String[] fieldNames, String[] uniqueKeyFields) {
> return Optional.empty();
> }
> 不同的数据库产品有不同的语句,所以默认实现是delete +insert
>
>
> 但是我看
> @Override
> public void executeBatch() throws SQLException {
> if (keyToRows.size() > 0) {
> for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
> Row pk = entry.getKey();
> Tuple2<Boolean, Row> tuple = entry.getValue();
> if (tuple.f0) {
> processOneRowInBatch(pk, tuple.f1);
> } else {
> setRecordToStatement(deleteStatement, pkTypes, pk);
> deleteStatement.addBatch();
> }
> }
> internalExecuteBatch();
> deleteStatement.executeBatch();
> keyToRows.clear();
> }
> }
>
>
> 方法中是有delete的,如果我自己实现了upset,是不是没有delete的必要也不用取false的记录
> 在2020年4月28日 11:43,[hidden email]<[hidden email]> 写道:
>
> Thanks Leonard,
>
> JDBCUpsertTableSink 按照 Upsert 的方式处理,实际执行的 SQL 语句是 INSERT INTO  ON
> DUPLICATE KEY 吗?
> 这个在源代码哪个地方呢?
>
> 谢谢,
> 王磊
>
>
>
> [hidden email]
>
>
> 发件人: Leonard Xu
> 发送时间: 2020-04-27 12:58
> 收件人: user-zh
> 主题: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题
> Hi,wanglei
>
> INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
> 每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
> 这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统
> 需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink,
> RetractStreamSink)
>
> 我看
>
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> 没有 Retract 方式
> 实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?
> 现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract.
> 你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理,
> 也不支持retract。
>
> 如若不带 group by 直接:
> INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
> 主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?
>
> 不带 group by时无法推导出query的 unique key,没法做按照unique key的更新,
> 只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可
>
> Best,
>
> Leonard Xu
>
>
Reply | Threaded
Open this post in threaded view
|

回复: FlinkSQL Upsert/Retraction 写入 MySQL 的问题

1101300123
In reply to this post by Leonard Xu
如果我的sink是mysql 支持主键索引,我可不可以理解处理逻辑是retract 和upsert是一样的;上游数据false标记的是失效的记录,我删除失效的或者更新失效的数据是无区别的;


其实我还是对retract流和upsert流有点疑问
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/streaming/dynamic_tables.html



Append-only stream: A dynamic table that is only modified by INSERT changes can be converted into a stream by emitting the inserted rows.

Retract stream: A retract stream is a stream with two types of messages, add messages and retract messages. A dynamic table is converted into an retract stream by encoding an INSERT change as add message, a DELETE change as retract message, and an UPDATE change as a retract message for the updated (previous) row and an add message for the updating (new) row. The following figure visualizes the conversion of a dynamic table into a retract stream.





Upsert stream: An upsert stream is a stream with two types of messages, upsert messages and delete messages. A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key. A dynamic table with unique key is converted into a stream by encoding INSERT and UPDATE changes as upsert messages and DELETE changes as delete messages. The stream consuming operator needs to be aware of the unique key attribute in order to apply messages correctly. The main difference to a retract stream is that UPDATE changes are encoded with a single message and hence more efficient. The following figure visualizes the conversion of a dynamic table into an upsert stream.





The API to convert a dynamic table into a DataStream is discussed on the Common Concepts page. Please note that only append and retract streams are supported when converting a dynamic table into a DataStream. The TableSink interface to emit a dynamic table to an external system are discussed on the TableSources and TableSinks page.

retract流做分组聚合,满足条件的group 字段数据来了,需要更新,发出一条false 标识的旧数据告诉下游这条数据失效了;

但是Upsert 是什么意思;An upsert stream is a stream with two types of messages, upsert messages and delete messages. A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key. 有点不明白



在2020年4月27日 12:58,Leonard Xu<[hidden email]> 写道:
Hi,wanglei

INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统
需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink, RetractStreamSink)

我看 https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc 没有 Retract 方式
实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?
现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract.
你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理, 也不支持retract。

如若不带 group by 直接:
INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?

不带 group by时无法推导出query的 unique key,没法做按照unique key的更新,
只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可

Best,

Leonard Xu
Reply | Threaded
Open this post in threaded view
|

回复: 回复: 回复: FlinkSQL Upsert/Retraction 写入 MySQL 的问题

1101300123
In reply to this post by Jark
结果的正确性问题?我不太明白。比如我的结果是
(true,(Mary,1))
(true,(Bob,1))
(false,(Mary,1))
(true,(Mary,2))
(true,(Liz,1))
(false,(Bob,1))
(true,(Bob,2))
我只做upsert好像没什么问题。针对retract流处理没问题;
其实我还是不太明白 upsert 流


在2020年4月28日 14:34,Jark Wu<[hidden email]> 写道:
是能提高一定的效率。不过可能会导致结果正确性问题。

Best,
Jark

On Tue, 28 Apr 2020 at 14:16, 1101300123 <[hidden email]> wrote:

我知道,我的意思是,upsert自己实现了,我可以直接丢弃false的数据不做处理这样会不会提升部分效率


在2020年4月28日 14:11,Jark Wu<[hidden email]> 写道:
UpsertSink 仍然可能会收到 delete 消息的,所以你可以看到 UpsertSink 的输入是 Tuple2<Boolean, Row>
,当 false 时代表 delelte,true 时代表 upsert 消息。

Best,
Jark

On Tue, 28 Apr 2020 at 14:05, 1101300123 <[hidden email]> wrote:

我看源码这样写道:
/**
* Get dialect upsert statement, the database has its own upsert syntax,
such as Mysql
* using DUPLICATE KEY UPDATE, and PostgresSQL using ON CONFLICT... DO
UPDATE SET..
*
* @return None if dialect does not support upsert statement, the writer
will degrade to
* the use of select + update/insert, this performance is poor.
*/
default Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}
不同的数据库产品有不同的语句,所以默认实现是delete +insert


但是我看
@Override
public void executeBatch() throws SQLException {
if (keyToRows.size() > 0) {
for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
Row pk = entry.getKey();
Tuple2<Boolean, Row> tuple = entry.getValue();
if (tuple.f0) {
processOneRowInBatch(pk, tuple.f1);
} else {
setRecordToStatement(deleteStatement, pkTypes, pk);
deleteStatement.addBatch();
}
}
internalExecuteBatch();
deleteStatement.executeBatch();
keyToRows.clear();
}
}


方法中是有delete的,如果我自己实现了upset,是不是没有delete的必要也不用取false的记录
在2020年4月28日 11:43,[hidden email]<[hidden email]> 写道:

Thanks Leonard,

JDBCUpsertTableSink 按照 Upsert 的方式处理,实际执行的 SQL 语句是 INSERT INTO  ON
DUPLICATE KEY 吗?
这个在源代码哪个地方呢?

谢谢,
王磊



[hidden email]


发件人: Leonard Xu
发送时间: 2020-04-27 12:58
收件人: user-zh
主题: Re: FlinkSQL Upsert/Retraction 写入 MySQL 的问题
Hi,wanglei

INSERT INTO  mysql_sink SELECT  f1, count(*) FROM kafka_src GROUP BY f1
每从 kafka 过来一条新的记录,会生成两条记录 Tuple2<Row, Boolean>, 旧的被删除,新的会添加上。
这是query是会一个会产生retract stream的query,可以简单理解成每条kafka的数据过来会产生两条记录,但是最终写入下游的系统
需要看下游的系统支持和实现的sink(现在有三种sink AppendStreamSink, UpsertStreamSink,
RetractStreamSink)

我看

https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
没有 Retract 方式
实际上使用了 JDBCUpsertTableSink.java 的代码写入 MySQL 吗?
现有的sink中,kafka是实现的AppendStreamSink,所以只支持insert 的记录,不支持retract.
你用DDL声明的mysql表,对应的jdbc sink 是JDBCUpsertTableSink,所以会按照Upsert的逻辑处理,
也不支持retract。

如若不带 group by 直接:
INSERT INTO  mysql_sink SELECT  f1,  f2 FROM kafka_src
主键冲突写入 mysql 是会出错的,怎么可以用 Upsert 的方式直接覆盖呢?

不带 group by时无法推导出query的 unique key,没法做按照unique key的更新,
只需要将 query的 key (你这里是group by 后的字段)和db中主键保持一致即可

Best,

Leonard Xu