JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

classic Classic list List threaded Threaded
13 messages Options
Reply | Threaded
Open this post in threaded view
|

JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

nobleyd
如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert on
duplicate方式写入。

但我在使用中,发现报了 duplicate entry的错误。例如:
Caused by: com.mysql.jdbc.exceptions.jdbc4.
MySQLIntegrityConstraintViolationException: Duplicate entry
'2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(
NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
    at com.mysql.jdbc.Util.getInstance(Util.java:386)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
.java:2157)
    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
.java:2460)
    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
.java:2377)
    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
.java:2361)
    at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
PreparedStatement.java:1793)

(2)
此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
但这个冲突的entry是在14.11分那一波才报错的。
Reply | Threaded
Open this post in threaded view
|

Re:JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

hailongwang
数据库中主键的设置跟 primary key 定义的一样不?


Best,
Hailong
在 2020-11-23 13:15:01,"赵一旦" <[hidden email]> 写道:

>如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert on
>duplicate方式写入。
>
>但我在使用中,发现报了 duplicate entry的错误。例如:
>Caused by: com.mysql.jdbc.exceptions.jdbc4.
>MySQLIntegrityConstraintViolationException: Duplicate entry
>'2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
>    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>    at sun.reflect.NativeConstructorAccessorImpl.newInstance(
>NativeConstructorAccessorImpl.java:62)
>    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
>DelegatingConstructorAccessorImpl.java:45)
>    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>    at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
>    at com.mysql.jdbc.Util.getInstance(Util.java:386)
>    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
>    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
>    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
>    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
>    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
>    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
>    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
>.java:2157)
>    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>.java:2460)
>    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>.java:2377)
>    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>.java:2361)
>    at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
>PreparedStatement.java:1793)
>
>(2)
>此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
>但这个冲突的entry是在14.11分那一波才报错的。
Reply | Threaded
Open this post in threaded view
|

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

nobleyd
@hailongwang 一样的。

有个情况说明下,我是tumble window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。




hailongwang <[hidden email]> 于2020年11月23日周一 下午2:39写道:

> 数据库中主键的设置跟 primary key 定义的一样不?
>
>
> Best,
> Hailong
> 在 2020-11-23 13:15:01,"赵一旦" <[hidden email]> 写道:
> >如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert on
> >duplicate方式写入。
> >
> >但我在使用中,发现报了 duplicate entry的错误。例如:
> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
> >MySQLIntegrityConstraintViolationException: Duplicate entry
> >'2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
> >    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> >    at sun.reflect.NativeConstructorAccessorImpl.newInstance(
> >NativeConstructorAccessorImpl.java:62)
> >    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
> >DelegatingConstructorAccessorImpl.java:45)
> >    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >    at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
> >    at com.mysql.jdbc.Util.getInstance(Util.java:386)
> >    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
> >    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
> >    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
> >    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
> >    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
> >    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
> >    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
> >.java:2157)
> >    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> >.java:2460)
> >    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> >.java:2377)
> >    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> >.java:2361)
> >    at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
> >PreparedStatement.java:1793)
> >
> >(2)
> >此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
> >但这个冲突的entry是在14.11分那一波才报错的。
>
Reply | Threaded
Open this post in threaded view
|

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

nobleyd
补充sql:

DDL:

CREATE TABLE flink_recent_pv_subid
(
    `supply_id` STRING,
    `subid`     STRING,
    `mark`      STRING,
    `time`      STRING,
    `pv`        BIGINT,
    PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
) WITH (
  'connector.type' = 'jdbc',

  ......

);


查询SQL:

INSERT INTO
    flink_recent_pv_subid
SELECT
    `sid`,
    `subid`,
    `mark`,
    DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
'yyyyMMddHHmm') as `time`,
    count(1) AS `pv`
FROM baidu_log_view
GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5' MINUTE);


赵一旦 <[hidden email]> 于2020年11月23日周一 下午3:00写道:

> @hailongwang 一样的。
>
> 有个情况说明下,我是tumble window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
>
>
>
>
> hailongwang <[hidden email]> 于2020年11月23日周一 下午2:39写道:
>
>> 数据库中主键的设置跟 primary key 定义的一样不?
>>
>>
>> Best,
>> Hailong
>> 在 2020-11-23 13:15:01,"赵一旦" <[hidden email]> 写道:
>> >如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert on
>> >duplicate方式写入。
>> >
>> >但我在使用中,发现报了 duplicate entry的错误。例如:
>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
>> >MySQLIntegrityConstraintViolationException: Duplicate entry
>> >'2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
>> >    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>> Method)
>> >    at sun.reflect.NativeConstructorAccessorImpl.newInstance(
>> >NativeConstructorAccessorImpl.java:62)
>> >    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
>> >DelegatingConstructorAccessorImpl.java:45)
>> >    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> >    at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
>> >    at com.mysql.jdbc.Util.getInstance(Util.java:386)
>> >    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
>> >    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
>> >    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
>> >    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
>> >    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
>> >    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
>> >    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
>> >.java:2157)
>> >    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>> >.java:2460)
>> >    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>> >.java:2377)
>> >    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>> >.java:2361)
>> >    at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
>> >PreparedStatement.java:1793)
>> >
>> >(2)
>> >此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
>> >但这个冲突的entry是在14.11分那一波才报错的。
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

nobleyd
如下是Flink官方文档JBDC connector的部分内容。Key handling
<https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling>

Flink uses the primary key that defined in DDL when writing data to
external databases. The connector operate in upsert mode if the primary key
was defined, otherwise, the connector operate in append mode.

In upsert mode, Flink will insert a new row or update the existing row
according to the primary key, Flink can ensure the idempotence in this way.
To guarantee the output result is as expected, it’s recommended to define
primary key for the table and make sure the primary key is one of the
unique key sets or primary key of the underlying database table. In append
mode, Flink will interpret all records as INSERT messages, the INSERT
operation may fail if a primary key or unique constraint violation happens
in the underlying database.

See CREATE TABLE DDL
<https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table>
for
more details about PRIMARY KEY syntax.


这里也有一点,In append mode, Flink will interpret all records as INSERT messages,
the INSERT operation may fail if a primary key or unique constraint
violation happens in the underlying database.  什么叫append
mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?

1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。



赵一旦 <[hidden email]> 于2020年11月23日周一 下午3:02写道:

> 补充sql:
>
> DDL:
>
> CREATE TABLE flink_recent_pv_subid
> (
>     `supply_id` STRING,
>     `subid`     STRING,
>     `mark`      STRING,
>     `time`      STRING,
>     `pv`        BIGINT,
>     PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
> ) WITH (
>   'connector.type' = 'jdbc',
>
>   ......
>
> );
>
>
> 查询SQL:
>
> INSERT INTO
>     flink_recent_pv_subid
> SELECT
>     `sid`,
>     `subid`,
>     `mark`,
>     DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE), 'yyyyMMddHHmm') as `time`,
>     count(1) AS `pv`
> FROM baidu_log_view
> GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5' MINUTE);
>
>
> 赵一旦 <[hidden email]> 于2020年11月23日周一 下午3:00写道:
>
>> @hailongwang 一样的。
>>
>> 有个情况说明下,我是tumble window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
>> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
>> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
>>
>>
>>
>>
>> hailongwang <[hidden email]> 于2020年11月23日周一 下午2:39写道:
>>
>>> 数据库中主键的设置跟 primary key 定义的一样不?
>>>
>>>
>>> Best,
>>> Hailong
>>> 在 2020-11-23 13:15:01,"赵一旦" <[hidden email]> 写道:
>>> >如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert on
>>> >duplicate方式写入。
>>> >
>>> >但我在使用中,发现报了 duplicate entry的错误。例如:
>>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
>>> >MySQLIntegrityConstraintViolationException: Duplicate entry
>>> >'2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
>>> >    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>> Method)
>>> >    at sun.reflect.NativeConstructorAccessorImpl.newInstance(
>>> >NativeConstructorAccessorImpl.java:62)
>>> >    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
>>> >DelegatingConstructorAccessorImpl.java:45)
>>> >    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>> >    at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
>>> >    at com.mysql.jdbc.Util.getInstance(Util.java:386)
>>> >    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
>>> >    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
>>> >    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
>>> >    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
>>> >    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
>>> >    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
>>> >    at
>>> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
>>> >.java:2157)
>>> >    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>>> >.java:2460)
>>> >    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>>> >.java:2377)
>>> >    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>>> >.java:2361)
>>> >    at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
>>> >PreparedStatement.java:1793)
>>> >
>>> >(2)
>>> >此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
>>> >但这个冲突的entry是在14.11分那一波才报错的。
>>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

Jark
Administrator
请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。

On Mon, 23 Nov 2020 at 15:21, 赵一旦 <[hidden email]> wrote:

> 如下是Flink官方文档JBDC connector的部分内容。Key handling
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> >
>
> Flink uses the primary key that defined in DDL when writing data to
> external databases. The connector operate in upsert mode if the primary key
> was defined, otherwise, the connector operate in append mode.
>
> In upsert mode, Flink will insert a new row or update the existing row
> according to the primary key, Flink can ensure the idempotence in this way.
> To guarantee the output result is as expected, it’s recommended to define
> primary key for the table and make sure the primary key is one of the
> unique key sets or primary key of the underlying database table. In append
> mode, Flink will interpret all records as INSERT messages, the INSERT
> operation may fail if a primary key or unique constraint violation happens
> in the underlying database.
>
> See CREATE TABLE DDL
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> >
> for
> more details about PRIMARY KEY syntax.
>
>
> 这里也有一点,In append mode, Flink will interpret all records as INSERT messages,
> the INSERT operation may fail if a primary key or unique constraint
> violation happens in the underlying database.  什么叫append
> mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
>
> 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
>
>
>
> 赵一旦 <[hidden email]> 于2020年11月23日周一 下午3:02写道:
>
> > 补充sql:
> >
> > DDL:
> >
> > CREATE TABLE flink_recent_pv_subid
> > (
> >     `supply_id` STRING,
> >     `subid`     STRING,
> >     `mark`      STRING,
> >     `time`      STRING,
> >     `pv`        BIGINT,
> >     PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
> > ) WITH (
> >   'connector.type' = 'jdbc',
> >
> >   ......
> >
> > );
> >
> >
> > 查询SQL:
> >
> > INSERT INTO
> >     flink_recent_pv_subid
> > SELECT
> >     `sid`,
> >     `subid`,
> >     `mark`,
> >     DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
> 'yyyyMMddHHmm') as `time`,
> >     count(1) AS `pv`
> > FROM baidu_log_view
> > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5' MINUTE);
> >
> >
> > 赵一旦 <[hidden email]> 于2020年11月23日周一 下午3:00写道:
> >
> >> @hailongwang 一样的。
> >>
> >> 有个情况说明下,我是tumble window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
> >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
> >> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
> >>
> >>
> >>
> >>
> >> hailongwang <[hidden email]> 于2020年11月23日周一 下午2:39写道:
> >>
> >>> 数据库中主键的设置跟 primary key 定义的一样不?
> >>>
> >>>
> >>> Best,
> >>> Hailong
> >>> 在 2020-11-23 13:15:01,"赵一旦" <[hidden email]> 写道:
> >>> >如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert on
> >>> >duplicate方式写入。
> >>> >
> >>> >但我在使用中,发现报了 duplicate entry的错误。例如:
> >>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
> >>> >MySQLIntegrityConstraintViolationException: Duplicate entry
> >>> >'2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
> >>> >    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> >>> Method)
> >>> >    at sun.reflect.NativeConstructorAccessorImpl.newInstance(
> >>> >NativeConstructorAccessorImpl.java:62)
> >>> >    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
> >>> >DelegatingConstructorAccessorImpl.java:45)
> >>> >    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >>> >    at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
> >>> >    at com.mysql.jdbc.Util.getInstance(Util.java:386)
> >>> >    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
> >>> >    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
> >>> >    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
> >>> >    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
> >>> >    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
> >>> >    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
> >>> >    at
> >>> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
> >>> >.java:2157)
> >>> >    at
> com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> >>> >.java:2460)
> >>> >    at
> com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> >>> >.java:2377)
> >>> >    at
> com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> >>> >.java:2361)
> >>> >    at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
> >>> >PreparedStatement.java:1793)
> >>> >
> >>> >(2)
> >>> >此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
> >>> >但这个冲突的entry是在14.11分那一波才报错的。
> >>>
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

nobleyd
看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
页面。

1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。

Jark Wu <[hidden email]> 于2020年11月23日周一 下午3:32写道:

> 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
>
> On Mon, 23 Nov 2020 at 15:21, 赵一旦 <[hidden email]> wrote:
>
> > 如下是Flink官方文档JBDC connector的部分内容。Key handling
> > <
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> > >
> >
> > Flink uses the primary key that defined in DDL when writing data to
> > external databases. The connector operate in upsert mode if the primary
> key
> > was defined, otherwise, the connector operate in append mode.
> >
> > In upsert mode, Flink will insert a new row or update the existing row
> > according to the primary key, Flink can ensure the idempotence in this
> way.
> > To guarantee the output result is as expected, it’s recommended to define
> > primary key for the table and make sure the primary key is one of the
> > unique key sets or primary key of the underlying database table. In
> append
> > mode, Flink will interpret all records as INSERT messages, the INSERT
> > operation may fail if a primary key or unique constraint violation
> happens
> > in the underlying database.
> >
> > See CREATE TABLE DDL
> > <
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> > >
> > for
> > more details about PRIMARY KEY syntax.
> >
> >
> > 这里也有一点,In append mode, Flink will interpret all records as INSERT
> messages,
> > the INSERT operation may fail if a primary key or unique constraint
> > violation happens in the underlying database.  什么叫append
> > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
> >
> > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
> >
> >
> >
> > 赵一旦 <[hidden email]> 于2020年11月23日周一 下午3:02写道:
> >
> > > 补充sql:
> > >
> > > DDL:
> > >
> > > CREATE TABLE flink_recent_pv_subid
> > > (
> > >     `supply_id` STRING,
> > >     `subid`     STRING,
> > >     `mark`      STRING,
> > >     `time`      STRING,
> > >     `pv`        BIGINT,
> > >     PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
> > > ) WITH (
> > >   'connector.type' = 'jdbc',
> > >
> > >   ......
> > >
> > > );
> > >
> > >
> > > 查询SQL:
> > >
> > > INSERT INTO
> > >     flink_recent_pv_subid
> > > SELECT
> > >     `sid`,
> > >     `subid`,
> > >     `mark`,
> > >     DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
> > 'yyyyMMddHHmm') as `time`,
> > >     count(1) AS `pv`
> > > FROM baidu_log_view
> > > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5'
> MINUTE);
> > >
> > >
> > > 赵一旦 <[hidden email]> 于2020年11月23日周一 下午3:00写道:
> > >
> > >> @hailongwang 一样的。
> > >>
> > >> 有个情况说明下,我是tumble window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
> > >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
> > >> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
> > >>
> > >>
> > >>
> > >>
> > >> hailongwang <[hidden email]> 于2020年11月23日周一 下午2:39写道:
> > >>
> > >>> 数据库中主键的设置跟 primary key 定义的一样不?
> > >>>
> > >>>
> > >>> Best,
> > >>> Hailong
> > >>> 在 2020-11-23 13:15:01,"赵一旦" <[hidden email]> 写道:
> > >>> >如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert on
> > >>> >duplicate方式写入。
> > >>> >
> > >>> >但我在使用中,发现报了 duplicate entry的错误。例如:
> > >>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
> > >>> >MySQLIntegrityConstraintViolationException: Duplicate entry
> > >>> >'2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
> > >>> >    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> > >>> Method)
> > >>> >    at sun.reflect.NativeConstructorAccessorImpl.newInstance(
> > >>> >NativeConstructorAccessorImpl.java:62)
> > >>> >    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
> > >>> >DelegatingConstructorAccessorImpl.java:45)
> > >>> >    at
> java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> > >>> >    at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
> > >>> >    at com.mysql.jdbc.Util.getInstance(Util.java:386)
> > >>> >    at
> com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
> > >>> >    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
> > >>> >    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
> > >>> >    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
> > >>> >    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
> > >>> >    at
> com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
> > >>> >    at
> > >>> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
> > >>> >.java:2157)
> > >>> >    at
> > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> > >>> >.java:2460)
> > >>> >    at
> > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> > >>> >.java:2377)
> > >>> >    at
> > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> > >>> >.java:2361)
> > >>> >    at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
> > >>> >PreparedStatement.java:1793)
> > >>> >
> > >>> >(2)
> > >>> >此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
> > >>> >但这个冲突的entry是在14.11分那一波才报错的。
> > >>>
> > >>
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

Jark
Administrator
这个页面就是我上面说的 旧版connector,已经被废弃了,所以侧边栏没有导航。

新版的 jdbc sink,会根据 ddl 上是否有 PK 来决定是否工作在 upsert 模式。

Best,
Jark

On Mon, 23 Nov 2020 at 15:39, 赵一旦 <[hidden email]> wrote:

> 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
> 页面。
>
>
> 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。
>
> Jark Wu <[hidden email]> 于2020年11月23日周一 下午3:32写道:
>
> > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
> >
> > On Mon, 23 Nov 2020 at 15:21, 赵一旦 <[hidden email]> wrote:
> >
> > > 如下是Flink官方文档JBDC connector的部分内容。Key handling
> > > <
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> > > >
> > >
> > > Flink uses the primary key that defined in DDL when writing data to
> > > external databases. The connector operate in upsert mode if the primary
> > key
> > > was defined, otherwise, the connector operate in append mode.
> > >
> > > In upsert mode, Flink will insert a new row or update the existing row
> > > according to the primary key, Flink can ensure the idempotence in this
> > way.
> > > To guarantee the output result is as expected, it’s recommended to
> define
> > > primary key for the table and make sure the primary key is one of the
> > > unique key sets or primary key of the underlying database table. In
> > append
> > > mode, Flink will interpret all records as INSERT messages, the INSERT
> > > operation may fail if a primary key or unique constraint violation
> > happens
> > > in the underlying database.
> > >
> > > See CREATE TABLE DDL
> > > <
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> > > >
> > > for
> > > more details about PRIMARY KEY syntax.
> > >
> > >
> > > 这里也有一点,In append mode, Flink will interpret all records as INSERT
> > messages,
> > > the INSERT operation may fail if a primary key or unique constraint
> > > violation happens in the underlying database.  什么叫append
> > > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
> > >
> > > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
> > >
> > >
> > >
> > > 赵一旦 <[hidden email]> 于2020年11月23日周一 下午3:02写道:
> > >
> > > > 补充sql:
> > > >
> > > > DDL:
> > > >
> > > > CREATE TABLE flink_recent_pv_subid
> > > > (
> > > >     `supply_id` STRING,
> > > >     `subid`     STRING,
> > > >     `mark`      STRING,
> > > >     `time`      STRING,
> > > >     `pv`        BIGINT,
> > > >     PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
> > > > ) WITH (
> > > >   'connector.type' = 'jdbc',
> > > >
> > > >   ......
> > > >
> > > > );
> > > >
> > > >
> > > > 查询SQL:
> > > >
> > > > INSERT INTO
> > > >     flink_recent_pv_subid
> > > > SELECT
> > > >     `sid`,
> > > >     `subid`,
> > > >     `mark`,
> > > >     DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
> > > 'yyyyMMddHHmm') as `time`,
> > > >     count(1) AS `pv`
> > > > FROM baidu_log_view
> > > > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5'
> > MINUTE);
> > > >
> > > >
> > > > 赵一旦 <[hidden email]> 于2020年11月23日周一 下午3:00写道:
> > > >
> > > >> @hailongwang 一样的。
> > > >>
> > > >> 有个情况说明下,我是tumble
> window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
> > > >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
> > > >> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> hailongwang <[hidden email]> 于2020年11月23日周一 下午2:39写道:
> > > >>
> > > >>> 数据库中主键的设置跟 primary key 定义的一样不?
> > > >>>
> > > >>>
> > > >>> Best,
> > > >>> Hailong
> > > >>> 在 2020-11-23 13:15:01,"赵一旦" <[hidden email]> 写道:
> > > >>> >如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert
> on
> > > >>> >duplicate方式写入。
> > > >>> >
> > > >>> >但我在使用中,发现报了 duplicate entry的错误。例如:
> > > >>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
> > > >>> >MySQLIntegrityConstraintViolationException: Duplicate entry
> > > >>> >'2036-feed_landing_box_news-2000-202011231405' for key 'uniq_ssmt'
> > > >>> >    at
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> > > >>> Method)
> > > >>> >    at sun.reflect.NativeConstructorAccessorImpl.newInstance(
> > > >>> >NativeConstructorAccessorImpl.java:62)
> > > >>> >    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
> > > >>> >DelegatingConstructorAccessorImpl.java:45)
> > > >>> >    at
> > java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> > > >>> >    at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
> > > >>> >    at com.mysql.jdbc.Util.getInstance(Util.java:386)
> > > >>> >    at
> > com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
> > > >>> >    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
> > > >>> >    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
> > > >>> >    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
> > > >>> >    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
> > > >>> >    at
> > com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
> > > >>> >    at
> > > >>> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
> > > >>> >.java:2157)
> > > >>> >    at
> > > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> > > >>> >.java:2460)
> > > >>> >    at
> > > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> > > >>> >.java:2377)
> > > >>> >    at
> > > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> > > >>> >.java:2361)
> > > >>> >    at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
> > > >>> >PreparedStatement.java:1793)
> > > >>> >
> > > >>> >(2)
> > > >>>
> >此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
> > > >>> >但这个冲突的entry是在14.11分那一波才报错的。
> > > >>>
> > > >>
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

nobleyd
嗯。刚刚我做了个测试,简化,从kafka读入直接写mysql。
发现这种方式也不行,但是加了group by之后是可以的。

(1)
所以说是否还需要query带有key的语义才行呢?
比如group by的结果是可能update的,并且基于group by key也指出了key。

那么group by + tumble window情况下,输出貌似是append的,这种情况是否可以使用upsert方式输出到mysql呢?

(2)如JarkWu所说,是mysql表的DDL部分决定。

如上1和2,哪个对呢?当然我个人期望是2。但是如果是2的话,那么为什么从kafka读取直接写mysql情况不可以呢?

Jark Wu <[hidden email]> 于2020年11月23日周一 下午4:28写道:

> 这个页面就是我上面说的 旧版connector,已经被废弃了,所以侧边栏没有导航。
>
> 新版的 jdbc sink,会根据 ddl 上是否有 PK 来决定是否工作在 upsert 模式。
>
> Best,
> Jark
>
> On Mon, 23 Nov 2020 at 15:39, 赵一旦 <[hidden email]> wrote:
>
> > 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
> > 页面。
> >
> >
> >
> 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。
> >
> > Jark Wu <[hidden email]> 于2020年11月23日周一 下午3:32写道:
> >
> > > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
> > >
> > > On Mon, 23 Nov 2020 at 15:21, 赵一旦 <[hidden email]> wrote:
> > >
> > > > 如下是Flink官方文档JBDC connector的部分内容。Key handling
> > > > <
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> > > > >
> > > >
> > > > Flink uses the primary key that defined in DDL when writing data to
> > > > external databases. The connector operate in upsert mode if the
> primary
> > > key
> > > > was defined, otherwise, the connector operate in append mode.
> > > >
> > > > In upsert mode, Flink will insert a new row or update the existing
> row
> > > > according to the primary key, Flink can ensure the idempotence in
> this
> > > way.
> > > > To guarantee the output result is as expected, it’s recommended to
> > define
> > > > primary key for the table and make sure the primary key is one of the
> > > > unique key sets or primary key of the underlying database table. In
> > > append
> > > > mode, Flink will interpret all records as INSERT messages, the INSERT
> > > > operation may fail if a primary key or unique constraint violation
> > > happens
> > > > in the underlying database.
> > > >
> > > > See CREATE TABLE DDL
> > > > <
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> > > > >
> > > > for
> > > > more details about PRIMARY KEY syntax.
> > > >
> > > >
> > > > 这里也有一点,In append mode, Flink will interpret all records as INSERT
> > > messages,
> > > > the INSERT operation may fail if a primary key or unique constraint
> > > > violation happens in the underlying database.  什么叫append
> > > > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
> > > >
> > > > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
> > > >
> > > >
> > > >
> > > > 赵一旦 <[hidden email]> 于2020年11月23日周一 下午3:02写道:
> > > >
> > > > > 补充sql:
> > > > >
> > > > > DDL:
> > > > >
> > > > > CREATE TABLE flink_recent_pv_subid
> > > > > (
> > > > >     `supply_id` STRING,
> > > > >     `subid`     STRING,
> > > > >     `mark`      STRING,
> > > > >     `time`      STRING,
> > > > >     `pv`        BIGINT,
> > > > >     PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
> > > > > ) WITH (
> > > > >   'connector.type' = 'jdbc',
> > > > >
> > > > >   ......
> > > > >
> > > > > );
> > > > >
> > > > >
> > > > > 查询SQL:
> > > > >
> > > > > INSERT INTO
> > > > >     flink_recent_pv_subid
> > > > > SELECT
> > > > >     `sid`,
> > > > >     `subid`,
> > > > >     `mark`,
> > > > >     DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
> > > > 'yyyyMMddHHmm') as `time`,
> > > > >     count(1) AS `pv`
> > > > > FROM baidu_log_view
> > > > > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5'
> > > MINUTE);
> > > > >
> > > > >
> > > > > 赵一旦 <[hidden email]> 于2020年11月23日周一 下午3:00写道:
> > > > >
> > > > >> @hailongwang 一样的。
> > > > >>
> > > > >> 有个情况说明下,我是tumble
> > window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
> > > > >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
> > > > >> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >> hailongwang <[hidden email]> 于2020年11月23日周一 下午2:39写道:
> > > > >>
> > > > >>> 数据库中主键的设置跟 primary key 定义的一样不?
> > > > >>>
> > > > >>>
> > > > >>> Best,
> > > > >>> Hailong
> > > > >>> 在 2020-11-23 13:15:01,"赵一旦" <[hidden email]> 写道:
> > > > >>> >如题,按照官方文档,当mysql表定义了primary
> key的时候,会使用UpsertTableSink,并且会使用insert
> > on
> > > > >>> >duplicate方式写入。
> > > > >>> >
> > > > >>> >但我在使用中,发现报了 duplicate entry的错误。例如:
> > > > >>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
> > > > >>> >MySQLIntegrityConstraintViolationException: Duplicate entry
> > > > >>> >'2036-feed_landing_box_news-2000-202011231405' for key
> 'uniq_ssmt'
> > > > >>> >    at
> > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> > > > >>> Method)
> > > > >>> >    at sun.reflect.NativeConstructorAccessorImpl.newInstance(
> > > > >>> >NativeConstructorAccessorImpl.java:62)
> > > > >>> >    at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
> > > > >>> >DelegatingConstructorAccessorImpl.java:45)
> > > > >>> >    at
> > > java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> > > > >>> >    at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
> > > > >>> >    at com.mysql.jdbc.Util.getInstance(Util.java:386)
> > > > >>> >    at
> > > com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
> > > > >>> >    at
> com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
> > > > >>> >    at
> com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
> > > > >>> >    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
> > > > >>> >    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
> > > > >>> >    at
> > > com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
> > > > >>> >    at
> > > > >>>
> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
> > > > >>> >.java:2157)
> > > > >>> >    at
> > > > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> > > > >>> >.java:2460)
> > > > >>> >    at
> > > > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> > > > >>> >.java:2377)
> > > > >>> >    at
> > > > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> > > > >>> >.java:2361)
> > > > >>> >    at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
> > > > >>> >PreparedStatement.java:1793)
> > > > >>> >
> > > > >>> >(2)
> > > > >>>
> > >此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
> > > > >>> >但这个冲突的entry是在14.11分那一波才报错的。
> > > > >>>
> > > > >>
> > > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

nobleyd
总结下:
(1)group
by查询,输出为update流,配合mysqlSink可实现更新。(DDL中不需要定义update-mode,1.11中本身也不支持update-mode参数了)。
(2)group by+tumble window查询,输出为append流,配合mysqlSink不可实现更新。

如上,如果是case(2)想要实现更新方式输出到mysql怎么办呢?感觉Flink不应该基于流的特征决定怎么输出,毕竟涉及到外部存储,实际情况和具体业务逻辑有关,仅根据流的特征决定怎么输出不合适。
还不如基于MySQL表的DDL(update-mode参数),或者隐式根据是否定义了primary key决定是否update方式输出。


说个假想的示例,如果我指定kafka从过去某个实际重新跑某个任务,这种情况部分输出肯定重复,用户一般都会希望duplicate update方式输出。
甚至DDL中推荐可以搞个自定义on
duplicate的处理逻辑,因为有些大窗口(天级别)的统计,有时候还可能基于小窗口叠加产生,也是需要依靠mysql提供的on duplicate
update功能。




赵一旦 <[hidden email]> 于2020年11月23日周一 下午4:48写道:

> 嗯。刚刚我做了个测试,简化,从kafka读入直接写mysql。
> 发现这种方式也不行,但是加了group by之后是可以的。
>
> (1)
> 所以说是否还需要query带有key的语义才行呢?
> 比如group by的结果是可能update的,并且基于group by key也指出了key。
>
> 那么group by + tumble window情况下,输出貌似是append的,这种情况是否可以使用upsert方式输出到mysql呢?
>
> (2)如JarkWu所说,是mysql表的DDL部分决定。
>
> 如上1和2,哪个对呢?当然我个人期望是2。但是如果是2的话,那么为什么从kafka读取直接写mysql情况不可以呢?
>
> Jark Wu <[hidden email]> 于2020年11月23日周一 下午4:28写道:
>
>> 这个页面就是我上面说的 旧版connector,已经被废弃了,所以侧边栏没有导航。
>>
>> 新版的 jdbc sink,会根据 ddl 上是否有 PK 来决定是否工作在 upsert 模式。
>>
>> Best,
>> Jark
>>
>> On Mon, 23 Nov 2020 at 15:39, 赵一旦 <[hidden email]> wrote:
>>
>> > 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
>> >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
>> > 页面。
>> >
>> >
>> >
>> 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。
>> >
>> > Jark Wu <[hidden email]> 于2020年11月23日周一 下午3:32写道:
>> >
>> > > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
>> > >
>> > > On Mon, 23 Nov 2020 at 15:21, 赵一旦 <[hidden email]> wrote:
>> > >
>> > > > 如下是Flink官方文档JBDC connector的部分内容。Key handling
>> > > > <
>> > > >
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
>> > > > >
>> > > >
>> > > > Flink uses the primary key that defined in DDL when writing data to
>> > > > external databases. The connector operate in upsert mode if the
>> primary
>> > > key
>> > > > was defined, otherwise, the connector operate in append mode.
>> > > >
>> > > > In upsert mode, Flink will insert a new row or update the existing
>> row
>> > > > according to the primary key, Flink can ensure the idempotence in
>> this
>> > > way.
>> > > > To guarantee the output result is as expected, it’s recommended to
>> > define
>> > > > primary key for the table and make sure the primary key is one of
>> the
>> > > > unique key sets or primary key of the underlying database table. In
>> > > append
>> > > > mode, Flink will interpret all records as INSERT messages, the
>> INSERT
>> > > > operation may fail if a primary key or unique constraint violation
>> > > happens
>> > > > in the underlying database.
>> > > >
>> > > > See CREATE TABLE DDL
>> > > > <
>> > > >
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
>> > > > >
>> > > > for
>> > > > more details about PRIMARY KEY syntax.
>> > > >
>> > > >
>> > > > 这里也有一点,In append mode, Flink will interpret all records as INSERT
>> > > messages,
>> > > > the INSERT operation may fail if a primary key or unique constraint
>> > > > violation happens in the underlying database.  什么叫append
>> > > > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
>> > > >
>> > > > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
>> > > >
>> > > >
>> > > >
>> > > > 赵一旦 <[hidden email]> 于2020年11月23日周一 下午3:02写道:
>> > > >
>> > > > > 补充sql:
>> > > > >
>> > > > > DDL:
>> > > > >
>> > > > > CREATE TABLE flink_recent_pv_subid
>> > > > > (
>> > > > >     `supply_id` STRING,
>> > > > >     `subid`     STRING,
>> > > > >     `mark`      STRING,
>> > > > >     `time`      STRING,
>> > > > >     `pv`        BIGINT,
>> > > > >     PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT ENFORCED
>> > > > > ) WITH (
>> > > > >   'connector.type' = 'jdbc',
>> > > > >
>> > > > >   ......
>> > > > >
>> > > > > );
>> > > > >
>> > > > >
>> > > > > 查询SQL:
>> > > > >
>> > > > > INSERT INTO
>> > > > >     flink_recent_pv_subid
>> > > > > SELECT
>> > > > >     `sid`,
>> > > > >     `subid`,
>> > > > >     `mark`,
>> > > > >     DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
>> > > > 'yyyyMMddHHmm') as `time`,
>> > > > >     count(1) AS `pv`
>> > > > > FROM baidu_log_view
>> > > > > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5'
>> > > MINUTE);
>> > > > >
>> > > > >
>> > > > > 赵一旦 <[hidden email]> 于2020年11月23日周一 下午3:00写道:
>> > > > >
>> > > > >> @hailongwang 一样的。
>> > > > >>
>> > > > >> 有个情况说明下,我是tumble
>> > window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
>> > > > >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
>> > > > >> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >> hailongwang <[hidden email]> 于2020年11月23日周一 下午2:39写道:
>> > > > >>
>> > > > >>> 数据库中主键的设置跟 primary key 定义的一样不?
>> > > > >>>
>> > > > >>>
>> > > > >>> Best,
>> > > > >>> Hailong
>> > > > >>> 在 2020-11-23 13:15:01,"赵一旦" <[hidden email]> 写道:
>> > > > >>> >如题,按照官方文档,当mysql表定义了primary
>> key的时候,会使用UpsertTableSink,并且会使用insert
>> > on
>> > > > >>> >duplicate方式写入。
>> > > > >>> >
>> > > > >>> >但我在使用中,发现报了 duplicate entry的错误。例如:
>> > > > >>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
>> > > > >>> >MySQLIntegrityConstraintViolationException: Duplicate entry
>> > > > >>> >'2036-feed_landing_box_news-2000-202011231405' for key
>> 'uniq_ssmt'
>> > > > >>> >    at
>> > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>> > > > >>> Method)
>> > > > >>> >    at sun.reflect.NativeConstructorAccessorImpl.newInstance(
>> > > > >>> >NativeConstructorAccessorImpl.java:62)
>> > > > >>> >    at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
>> > > > >>> >DelegatingConstructorAccessorImpl.java:45)
>> > > > >>> >    at
>> > > java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> > > > >>> >    at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
>> > > > >>> >    at com.mysql.jdbc.Util.getInstance(Util.java:386)
>> > > > >>> >    at
>> > > com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
>> > > > >>> >    at
>> com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
>> > > > >>> >    at
>> com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
>> > > > >>> >    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
>> > > > >>> >    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
>> > > > >>> >    at
>> > > com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
>> > > > >>> >    at
>> > > > >>>
>> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
>> > > > >>> >.java:2157)
>> > > > >>> >    at
>> > > > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>> > > > >>> >.java:2460)
>> > > > >>> >    at
>> > > > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>> > > > >>> >.java:2377)
>> > > > >>> >    at
>> > > > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>> > > > >>> >.java:2361)
>> > > > >>> >    at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
>> > > > >>> >PreparedStatement.java:1793)
>> > > > >>> >
>> > > > >>> >(2)
>> > > > >>>
>> > >此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
>> > > > >>> >但这个冲突的entry是在14.11分那一波才报错的。
>> > > > >>>
>> > > > >>
>> > > >
>> > >
>> >
>>
>
Reply | Threaded
Open this post in threaded view
|

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

nobleyd
duplicate情况可能update pv =values(pv), 也可能 update pv = pv +
values(pv),这个根据具体任务统计逻辑决定,所以直接交给用户去设置挺好的。

此外,当前jdbc connector中,对于source情况也支持自定义sql,我指参数connector.read.query'
。所以其实可以类似来个参数connector.write.sql',sql中通过?占位符指定字段啥的。


赵一旦 <[hidden email]> 于2020年11月23日周一 下午5:09写道:

> 总结下:
> (1)group
> by查询,输出为update流,配合mysqlSink可实现更新。(DDL中不需要定义update-mode,1.11中本身也不支持update-mode参数了)。
> (2)group by+tumble window查询,输出为append流,配合mysqlSink不可实现更新。
>
>
> 如上,如果是case(2)想要实现更新方式输出到mysql怎么办呢?感觉Flink不应该基于流的特征决定怎么输出,毕竟涉及到外部存储,实际情况和具体业务逻辑有关,仅根据流的特征决定怎么输出不合适。
> 还不如基于MySQL表的DDL(update-mode参数),或者隐式根据是否定义了primary key决定是否update方式输出。
>
>
> 说个假想的示例,如果我指定kafka从过去某个实际重新跑某个任务,这种情况部分输出肯定重复,用户一般都会希望duplicate update方式输出。
> 甚至DDL中推荐可以搞个自定义on
> duplicate的处理逻辑,因为有些大窗口(天级别)的统计,有时候还可能基于小窗口叠加产生,也是需要依靠mysql提供的on duplicate
> update功能。
>
>
>
>
> 赵一旦 <[hidden email]> 于2020年11月23日周一 下午4:48写道:
>
>> 嗯。刚刚我做了个测试,简化,从kafka读入直接写mysql。
>> 发现这种方式也不行,但是加了group by之后是可以的。
>>
>> (1)
>> 所以说是否还需要query带有key的语义才行呢?
>> 比如group by的结果是可能update的,并且基于group by key也指出了key。
>>
>> 那么group by + tumble window情况下,输出貌似是append的,这种情况是否可以使用upsert方式输出到mysql呢?
>>
>> (2)如JarkWu所说,是mysql表的DDL部分决定。
>>
>> 如上1和2,哪个对呢?当然我个人期望是2。但是如果是2的话,那么为什么从kafka读取直接写mysql情况不可以呢?
>>
>> Jark Wu <[hidden email]> 于2020年11月23日周一 下午4:28写道:
>>
>>> 这个页面就是我上面说的 旧版connector,已经被废弃了,所以侧边栏没有导航。
>>>
>>> 新版的 jdbc sink,会根据 ddl 上是否有 PK 来决定是否工作在 upsert 模式。
>>>
>>> Best,
>>> Jark
>>>
>>> On Mon, 23 Nov 2020 at 15:39, 赵一旦 <[hidden email]> wrote:
>>>
>>> > 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
>>> >
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
>>> > 页面。
>>> >
>>> >
>>> >
>>> 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。
>>> >
>>> > Jark Wu <[hidden email]> 于2020年11月23日周一 下午3:32写道:
>>> >
>>> > > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
>>> > >
>>> > > On Mon, 23 Nov 2020 at 15:21, 赵一旦 <[hidden email]> wrote:
>>> > >
>>> > > > 如下是Flink官方文档JBDC connector的部分内容。Key handling
>>> > > > <
>>> > > >
>>> > >
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
>>> > > > >
>>> > > >
>>> > > > Flink uses the primary key that defined in DDL when writing data to
>>> > > > external databases. The connector operate in upsert mode if the
>>> primary
>>> > > key
>>> > > > was defined, otherwise, the connector operate in append mode.
>>> > > >
>>> > > > In upsert mode, Flink will insert a new row or update the existing
>>> row
>>> > > > according to the primary key, Flink can ensure the idempotence in
>>> this
>>> > > way.
>>> > > > To guarantee the output result is as expected, it’s recommended to
>>> > define
>>> > > > primary key for the table and make sure the primary key is one of
>>> the
>>> > > > unique key sets or primary key of the underlying database table. In
>>> > > append
>>> > > > mode, Flink will interpret all records as INSERT messages, the
>>> INSERT
>>> > > > operation may fail if a primary key or unique constraint violation
>>> > > happens
>>> > > > in the underlying database.
>>> > > >
>>> > > > See CREATE TABLE DDL
>>> > > > <
>>> > > >
>>> > >
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
>>> > > > >
>>> > > > for
>>> > > > more details about PRIMARY KEY syntax.
>>> > > >
>>> > > >
>>> > > > 这里也有一点,In append mode, Flink will interpret all records as INSERT
>>> > > messages,
>>> > > > the INSERT operation may fail if a primary key or unique constraint
>>> > > > violation happens in the underlying database.  什么叫append
>>> > > > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
>>> > > >
>>> > > > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
>>> > > >
>>> > > >
>>> > > >
>>> > > > 赵一旦 <[hidden email]> 于2020年11月23日周一 下午3:02写道:
>>> > > >
>>> > > > > 补充sql:
>>> > > > >
>>> > > > > DDL:
>>> > > > >
>>> > > > > CREATE TABLE flink_recent_pv_subid
>>> > > > > (
>>> > > > >     `supply_id` STRING,
>>> > > > >     `subid`     STRING,
>>> > > > >     `mark`      STRING,
>>> > > > >     `time`      STRING,
>>> > > > >     `pv`        BIGINT,
>>> > > > >     PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT
>>> ENFORCED
>>> > > > > ) WITH (
>>> > > > >   'connector.type' = 'jdbc',
>>> > > > >
>>> > > > >   ......
>>> > > > >
>>> > > > > );
>>> > > > >
>>> > > > >
>>> > > > > 查询SQL:
>>> > > > >
>>> > > > > INSERT INTO
>>> > > > >     flink_recent_pv_subid
>>> > > > > SELECT
>>> > > > >     `sid`,
>>> > > > >     `subid`,
>>> > > > >     `mark`,
>>> > > > >     DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
>>> > > > 'yyyyMMddHHmm') as `time`,
>>> > > > >     count(1) AS `pv`
>>> > > > > FROM baidu_log_view
>>> > > > > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL '5'
>>> > > MINUTE);
>>> > > > >
>>> > > > >
>>> > > > > 赵一旦 <[hidden email]> 于2020年11月23日周一 下午3:00写道:
>>> > > > >
>>> > > > >> @hailongwang 一样的。
>>> > > > >>
>>> > > > >> 有个情况说明下,我是tumble
>>> > window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
>>> > > > >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
>>> > > > >> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
>>> > > > >>
>>> > > > >>
>>> > > > >>
>>> > > > >>
>>> > > > >> hailongwang <[hidden email]> 于2020年11月23日周一 下午2:39写道:
>>> > > > >>
>>> > > > >>> 数据库中主键的设置跟 primary key 定义的一样不?
>>> > > > >>>
>>> > > > >>>
>>> > > > >>> Best,
>>> > > > >>> Hailong
>>> > > > >>> 在 2020-11-23 13:15:01,"赵一旦" <[hidden email]> 写道:
>>> > > > >>> >如题,按照官方文档,当mysql表定义了primary
>>> key的时候,会使用UpsertTableSink,并且会使用insert
>>> > on
>>> > > > >>> >duplicate方式写入。
>>> > > > >>> >
>>> > > > >>> >但我在使用中,发现报了 duplicate entry的错误。例如:
>>> > > > >>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
>>> > > > >>> >MySQLIntegrityConstraintViolationException: Duplicate entry
>>> > > > >>> >'2036-feed_landing_box_news-2000-202011231405' for key
>>> 'uniq_ssmt'
>>> > > > >>> >    at
>>> > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>> > > > >>> Method)
>>> > > > >>> >    at sun.reflect.NativeConstructorAccessorImpl.newInstance(
>>> > > > >>> >NativeConstructorAccessorImpl.java:62)
>>> > > > >>> >    at
>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
>>> > > > >>> >DelegatingConstructorAccessorImpl.java:45)
>>> > > > >>> >    at
>>> > > java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>> > > > >>> >    at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
>>> > > > >>> >    at com.mysql.jdbc.Util.getInstance(Util.java:386)
>>> > > > >>> >    at
>>> > > com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
>>> > > > >>> >    at
>>> com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
>>> > > > >>> >    at
>>> com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
>>> > > > >>> >    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
>>> > > > >>> >    at
>>> com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
>>> > > > >>> >    at
>>> > > com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
>>> > > > >>> >    at
>>> > > > >>>
>>> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
>>> > > > >>> >.java:2157)
>>> > > > >>> >    at
>>> > > > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>>> > > > >>> >.java:2460)
>>> > > > >>> >    at
>>> > > > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>>> > > > >>> >.java:2377)
>>> > > > >>> >    at
>>> > > > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
>>> > > > >>> >.java:2361)
>>> > > > >>> >    at com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
>>> > > > >>> >PreparedStatement.java:1793)
>>> > > > >>> >
>>> > > > >>> >(2)
>>> > > > >>>
>>> > >此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
>>> > > > >>> >但这个冲突的entry是在14.11分那一波才报错的。
>>> > > > >>>
>>> > > > >>
>>> > > >
>>> > >
>>> >
>>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

Jark
Administrator
你用的还是老的 connector 吧?也就是 'connector.type' = 'jdbc'。这个是根据 query 是否有更新来决定工作模式的。
如果你用新的 connector,也就是 'connector' = 'jdbc',这个的行为就是你说的第 (2) 种行为。

Best,
Jark

On Mon, 23 Nov 2020 at 17:14, 赵一旦 <[hidden email]> wrote:

> duplicate情况可能update pv =values(pv), 也可能 update pv = pv +
> values(pv),这个根据具体任务统计逻辑决定,所以直接交给用户去设置挺好的。
>
> 此外,当前jdbc connector中,对于source情况也支持自定义sql,我指参数connector.read.query'
> 。所以其实可以类似来个参数connector.write.sql',sql中通过?占位符指定字段啥的。
>
>
> 赵一旦 <[hidden email]> 于2020年11月23日周一 下午5:09写道:
>
> > 总结下:
> > (1)group
> >
> by查询,输出为update流,配合mysqlSink可实现更新。(DDL中不需要定义update-mode,1.11中本身也不支持update-mode参数了)。
> > (2)group by+tumble window查询,输出为append流,配合mysqlSink不可实现更新。
> >
> >
> >
> 如上,如果是case(2)想要实现更新方式输出到mysql怎么办呢?感觉Flink不应该基于流的特征决定怎么输出,毕竟涉及到外部存储,实际情况和具体业务逻辑有关,仅根据流的特征决定怎么输出不合适。
> > 还不如基于MySQL表的DDL(update-mode参数),或者隐式根据是否定义了primary key决定是否update方式输出。
> >
> >
> > 说个假想的示例,如果我指定kafka从过去某个实际重新跑某个任务,这种情况部分输出肯定重复,用户一般都会希望duplicate
> update方式输出。
> > 甚至DDL中推荐可以搞个自定义on
> > duplicate的处理逻辑,因为有些大窗口(天级别)的统计,有时候还可能基于小窗口叠加产生,也是需要依靠mysql提供的on duplicate
> > update功能。
> >
> >
> >
> >
> > 赵一旦 <[hidden email]> 于2020年11月23日周一 下午4:48写道:
> >
> >> 嗯。刚刚我做了个测试,简化,从kafka读入直接写mysql。
> >> 发现这种方式也不行,但是加了group by之后是可以的。
> >>
> >> (1)
> >> 所以说是否还需要query带有key的语义才行呢?
> >> 比如group by的结果是可能update的,并且基于group by key也指出了key。
> >>
> >> 那么group by + tumble window情况下,输出貌似是append的,这种情况是否可以使用upsert方式输出到mysql呢?
> >>
> >> (2)如JarkWu所说,是mysql表的DDL部分决定。
> >>
> >> 如上1和2,哪个对呢?当然我个人期望是2。但是如果是2的话,那么为什么从kafka读取直接写mysql情况不可以呢?
> >>
> >> Jark Wu <[hidden email]> 于2020年11月23日周一 下午4:28写道:
> >>
> >>> 这个页面就是我上面说的 旧版connector,已经被废弃了,所以侧边栏没有导航。
> >>>
> >>> 新版的 jdbc sink,会根据 ddl 上是否有 PK 来决定是否工作在 upsert 模式。
> >>>
> >>> Best,
> >>> Jark
> >>>
> >>> On Mon, 23 Nov 2020 at 15:39, 赵一旦 <[hidden email]> wrote:
> >>>
> >>> > 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
> >>> >
> >>> >
> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
> >>> > 页面。
> >>> >
> >>> >
> >>> >
> >>>
> 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。
> >>> >
> >>> > Jark Wu <[hidden email]> 于2020年11月23日周一 下午3:32写道:
> >>> >
> >>> > > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
> >>> > >
> >>> > > On Mon, 23 Nov 2020 at 15:21, 赵一旦 <[hidden email]> wrote:
> >>> > >
> >>> > > > 如下是Flink官方文档JBDC connector的部分内容。Key handling
> >>> > > > <
> >>> > > >
> >>> > >
> >>> >
> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> >>> > > > >
> >>> > > >
> >>> > > > Flink uses the primary key that defined in DDL when writing data
> to
> >>> > > > external databases. The connector operate in upsert mode if the
> >>> primary
> >>> > > key
> >>> > > > was defined, otherwise, the connector operate in append mode.
> >>> > > >
> >>> > > > In upsert mode, Flink will insert a new row or update the
> existing
> >>> row
> >>> > > > according to the primary key, Flink can ensure the idempotence in
> >>> this
> >>> > > way.
> >>> > > > To guarantee the output result is as expected, it’s recommended
> to
> >>> > define
> >>> > > > primary key for the table and make sure the primary key is one of
> >>> the
> >>> > > > unique key sets or primary key of the underlying database table.
> In
> >>> > > append
> >>> > > > mode, Flink will interpret all records as INSERT messages, the
> >>> INSERT
> >>> > > > operation may fail if a primary key or unique constraint
> violation
> >>> > > happens
> >>> > > > in the underlying database.
> >>> > > >
> >>> > > > See CREATE TABLE DDL
> >>> > > > <
> >>> > > >
> >>> > >
> >>> >
> >>>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> >>> > > > >
> >>> > > > for
> >>> > > > more details about PRIMARY KEY syntax.
> >>> > > >
> >>> > > >
> >>> > > > 这里也有一点,In append mode, Flink will interpret all records as INSERT
> >>> > > messages,
> >>> > > > the INSERT operation may fail if a primary key or unique
> constraint
> >>> > > > violation happens in the underlying database.  什么叫append
> >>> > > > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
> >>> > > >
> >>> > > > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
> >>> > > >
> >>> > > >
> >>> > > >
> >>> > > > 赵一旦 <[hidden email]> 于2020年11月23日周一 下午3:02写道:
> >>> > > >
> >>> > > > > 补充sql:
> >>> > > > >
> >>> > > > > DDL:
> >>> > > > >
> >>> > > > > CREATE TABLE flink_recent_pv_subid
> >>> > > > > (
> >>> > > > >     `supply_id` STRING,
> >>> > > > >     `subid`     STRING,
> >>> > > > >     `mark`      STRING,
> >>> > > > >     `time`      STRING,
> >>> > > > >     `pv`        BIGINT,
> >>> > > > >     PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT
> >>> ENFORCED
> >>> > > > > ) WITH (
> >>> > > > >   'connector.type' = 'jdbc',
> >>> > > > >
> >>> > > > >   ......
> >>> > > > >
> >>> > > > > );
> >>> > > > >
> >>> > > > >
> >>> > > > > 查询SQL:
> >>> > > > >
> >>> > > > > INSERT INTO
> >>> > > > >     flink_recent_pv_subid
> >>> > > > > SELECT
> >>> > > > >     `sid`,
> >>> > > > >     `subid`,
> >>> > > > >     `mark`,
> >>> > > > >     DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5' MINUTE),
> >>> > > > 'yyyyMMddHHmm') as `time`,
> >>> > > > >     count(1) AS `pv`
> >>> > > > > FROM baidu_log_view
> >>> > > > > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL
> '5'
> >>> > > MINUTE);
> >>> > > > >
> >>> > > > >
> >>> > > > > 赵一旦 <[hidden email]> 于2020年11月23日周一 下午3:00写道:
> >>> > > > >
> >>> > > > >> @hailongwang 一样的。
> >>> > > > >>
> >>> > > > >> 有个情况说明下,我是tumble
> >>> > window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
> >>> > > > >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
> >>> > > > >> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
> >>> > > > >>
> >>> > > > >>
> >>> > > > >>
> >>> > > > >>
> >>> > > > >> hailongwang <[hidden email]> 于2020年11月23日周一 下午2:39写道:
> >>> > > > >>
> >>> > > > >>> 数据库中主键的设置跟 primary key 定义的一样不?
> >>> > > > >>>
> >>> > > > >>>
> >>> > > > >>> Best,
> >>> > > > >>> Hailong
> >>> > > > >>> 在 2020-11-23 13:15:01,"赵一旦" <[hidden email]> 写道:
> >>> > > > >>> >如题,按照官方文档,当mysql表定义了primary
> >>> key的时候,会使用UpsertTableSink,并且会使用insert
> >>> > on
> >>> > > > >>> >duplicate方式写入。
> >>> > > > >>> >
> >>> > > > >>> >但我在使用中,发现报了 duplicate entry的错误。例如:
> >>> > > > >>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
> >>> > > > >>> >MySQLIntegrityConstraintViolationException: Duplicate entry
> >>> > > > >>> >'2036-feed_landing_box_news-2000-202011231405' for key
> >>> 'uniq_ssmt'
> >>> > > > >>> >    at
> >>> > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> >>> > > > >>> Method)
> >>> > > > >>> >    at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(
> >>> > > > >>> >NativeConstructorAccessorImpl.java:62)
> >>> > > > >>> >    at
> >>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
> >>> > > > >>> >DelegatingConstructorAccessorImpl.java:45)
> >>> > > > >>> >    at
> >>> > > java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> >>> > > > >>> >    at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
> >>> > > > >>> >    at com.mysql.jdbc.Util.getInstance(Util.java:386)
> >>> > > > >>> >    at
> >>> > > com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
> >>> > > > >>> >    at
> >>> com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
> >>> > > > >>> >    at
> >>> com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
> >>> > > > >>> >    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
> >>> > > > >>> >    at
> >>> com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
> >>> > > > >>> >    at
> >>> > > com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
> >>> > > > >>> >    at
> >>> > > > >>>
> >>> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
> >>> > > > >>> >.java:2157)
> >>> > > > >>> >    at
> >>> > > > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> >>> > > > >>> >.java:2460)
> >>> > > > >>> >    at
> >>> > > > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> >>> > > > >>> >.java:2377)
> >>> > > > >>> >    at
> >>> > > > com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> >>> > > > >>> >.java:2361)
> >>> > > > >>> >    at
> com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
> >>> > > > >>> >PreparedStatement.java:1793)
> >>> > > > >>> >
> >>> > > > >>> >(2)
> >>> > > > >>>
> >>> > >此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
> >>> > > > >>> >但这个冲突的entry是在14.11分那一波才报错的。
> >>> > > > >>>
> >>> > > > >>
> >>> > > >
> >>> > >
> >>> >
> >>>
> >>
>
Reply | Threaded
Open this post in threaded view
|

Re: JdbcUpsertTableSink输出的时候报‘Duplicate entry’问题

nobleyd
哦哦, 好吧,我一直以为你说的“新旧”是是否指定了update-mode。理解错了。
good,那应该没问题了,我去改改。




Jark Wu <[hidden email]> 于2020年11月23日周一 下午5:18写道:

> 你用的还是老的 connector 吧?也就是 'connector.type' = 'jdbc'。这个是根据 query
> 是否有更新来决定工作模式的。
> 如果你用新的 connector,也就是 'connector' = 'jdbc',这个的行为就是你说的第 (2) 种行为。
>
> Best,
> Jark
>
> On Mon, 23 Nov 2020 at 17:14, 赵一旦 <[hidden email]> wrote:
>
> > duplicate情况可能update pv =values(pv), 也可能 update pv = pv +
> > values(pv),这个根据具体任务统计逻辑决定,所以直接交给用户去设置挺好的。
> >
> > 此外,当前jdbc connector中,对于source情况也支持自定义sql,我指参数connector.read.query'
> > 。所以其实可以类似来个参数connector.write.sql',sql中通过?占位符指定字段啥的。
> >
> >
> > 赵一旦 <[hidden email]> 于2020年11月23日周一 下午5:09写道:
> >
> > > 总结下:
> > > (1)group
> > >
> >
> by查询,输出为update流,配合mysqlSink可实现更新。(DDL中不需要定义update-mode,1.11中本身也不支持update-mode参数了)。
> > > (2)group by+tumble window查询,输出为append流,配合mysqlSink不可实现更新。
> > >
> > >
> > >
> >
> 如上,如果是case(2)想要实现更新方式输出到mysql怎么办呢?感觉Flink不应该基于流的特征决定怎么输出,毕竟涉及到外部存储,实际情况和具体业务逻辑有关,仅根据流的特征决定怎么输出不合适。
> > > 还不如基于MySQL表的DDL(update-mode参数),或者隐式根据是否定义了primary key决定是否update方式输出。
> > >
> > >
> > > 说个假想的示例,如果我指定kafka从过去某个实际重新跑某个任务,这种情况部分输出肯定重复,用户一般都会希望duplicate
> > update方式输出。
> > > 甚至DDL中推荐可以搞个自定义on
> > > duplicate的处理逻辑,因为有些大窗口(天级别)的统计,有时候还可能基于小窗口叠加产生,也是需要依靠mysql提供的on
> duplicate
> > > update功能。
> > >
> > >
> > >
> > >
> > > 赵一旦 <[hidden email]> 于2020年11月23日周一 下午4:48写道:
> > >
> > >> 嗯。刚刚我做了个测试,简化,从kafka读入直接写mysql。
> > >> 发现这种方式也不行,但是加了group by之后是可以的。
> > >>
> > >> (1)
> > >> 所以说是否还需要query带有key的语义才行呢?
> > >> 比如group by的结果是可能update的,并且基于group by key也指出了key。
> > >>
> > >> 那么group by + tumble
> window情况下,输出貌似是append的,这种情况是否可以使用upsert方式输出到mysql呢?
> > >>
> > >> (2)如JarkWu所说,是mysql表的DDL部分决定。
> > >>
> > >> 如上1和2,哪个对呢?当然我个人期望是2。但是如果是2的话,那么为什么从kafka读取直接写mysql情况不可以呢?
> > >>
> > >> Jark Wu <[hidden email]> 于2020年11月23日周一 下午4:28写道:
> > >>
> > >>> 这个页面就是我上面说的 旧版connector,已经被废弃了,所以侧边栏没有导航。
> > >>>
> > >>> 新版的 jdbc sink,会根据 ddl 上是否有 PK 来决定是否工作在 upsert 模式。
> > >>>
> > >>> Best,
> > >>> Jark
> > >>>
> > >>> On Mon, 23 Nov 2020 at 15:39, 赵一旦 <[hidden email]> wrote:
> > >>>
> > >>> > 看了下,1.11中这个页面也有,但好像没左侧导航入口,在sql create页面中有链接可以链到
> > >>> >
> > >>> >
> > >>>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#update-modes
> > >>> > 页面。
> > >>> >
> > >>> >
> > >>> >
> > >>>
> >
> 1.11中定义mysql表的时候需不需要显示指定upodate-mode,毕竟同时支持append-mode和upsert-mode。还是说flink会自己去判定。
> > >>> >
> > >>> > Jark Wu <[hidden email]> 于2020年11月23日周一 下午3:32写道:
> > >>> >
> > >>> > > 请用新的 jdbc connector 。老的 jdbc connector 行为不太一样。
> > >>> > >
> > >>> > > On Mon, 23 Nov 2020 at 15:21, 赵一旦 <[hidden email]> wrote:
> > >>> > >
> > >>> > > > 如下是Flink官方文档JBDC connector的部分内容。Key handling
> > >>> > > > <
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
> > >>> > > > >
> > >>> > > >
> > >>> > > > Flink uses the primary key that defined in DDL when writing
> data
> > to
> > >>> > > > external databases. The connector operate in upsert mode if the
> > >>> primary
> > >>> > > key
> > >>> > > > was defined, otherwise, the connector operate in append mode.
> > >>> > > >
> > >>> > > > In upsert mode, Flink will insert a new row or update the
> > existing
> > >>> row
> > >>> > > > according to the primary key, Flink can ensure the idempotence
> in
> > >>> this
> > >>> > > way.
> > >>> > > > To guarantee the output result is as expected, it’s recommended
> > to
> > >>> > define
> > >>> > > > primary key for the table and make sure the primary key is one
> of
> > >>> the
> > >>> > > > unique key sets or primary key of the underlying database
> table.
> > In
> > >>> > > append
> > >>> > > > mode, Flink will interpret all records as INSERT messages, the
> > >>> INSERT
> > >>> > > > operation may fail if a primary key or unique constraint
> > violation
> > >>> > > happens
> > >>> > > > in the underlying database.
> > >>> > > >
> > >>> > > > See CREATE TABLE DDL
> > >>> > > > <
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> > >>> > > > >
> > >>> > > > for
> > >>> > > > more details about PRIMARY KEY syntax.
> > >>> > > >
> > >>> > > >
> > >>> > > > 这里也有一点,In append mode, Flink will interpret all records as
> INSERT
> > >>> > > messages,
> > >>> > > > the INSERT operation may fail if a primary key or unique
> > constraint
> > >>> > > > violation happens in the underlying database.  什么叫append
> > >>> > > > mode。这个是指根据sql的逻辑自己去判定是否append,还是啥?
> > >>> > > >
> > >>> > > > 1.10的文档中貌似DDL中支持这么定义 'update-mode' = 'append'。但是1.11中没提到这些东西。
> > >>> > > >
> > >>> > > >
> > >>> > > >
> > >>> > > > 赵一旦 <[hidden email]> 于2020年11月23日周一 下午3:02写道:
> > >>> > > >
> > >>> > > > > 补充sql:
> > >>> > > > >
> > >>> > > > > DDL:
> > >>> > > > >
> > >>> > > > > CREATE TABLE flink_recent_pv_subid
> > >>> > > > > (
> > >>> > > > >     `supply_id` STRING,
> > >>> > > > >     `subid`     STRING,
> > >>> > > > >     `mark`      STRING,
> > >>> > > > >     `time`      STRING,
> > >>> > > > >     `pv`        BIGINT,
> > >>> > > > >     PRIMARY KEY(`supply_id`, `subid`, `mark`, `time`) NOT
> > >>> ENFORCED
> > >>> > > > > ) WITH (
> > >>> > > > >   'connector.type' = 'jdbc',
> > >>> > > > >
> > >>> > > > >   ......
> > >>> > > > >
> > >>> > > > > );
> > >>> > > > >
> > >>> > > > >
> > >>> > > > > 查询SQL:
> > >>> > > > >
> > >>> > > > > INSERT INTO
> > >>> > > > >     flink_recent_pv_subid
> > >>> > > > > SELECT
> > >>> > > > >     `sid`,
> > >>> > > > >     `subid`,
> > >>> > > > >     `mark`,
> > >>> > > > >     DATE_FORMAT(TUMBLE_END(`event_time`, INTERVAL '5'
> MINUTE),
> > >>> > > > 'yyyyMMddHHmm') as `time`,
> > >>> > > > >     count(1) AS `pv`
> > >>> > > > > FROM baidu_log_view
> > >>> > > > > GROUP BY `sid`, `subid`, `mark`, TUMBLE(event_time, INTERVAL
> > '5'
> > >>> > > MINUTE);
> > >>> > > > >
> > >>> > > > >
> > >>> > > > > 赵一旦 <[hidden email]> 于2020年11月23日周一 下午3:00写道:
> > >>> > > > >
> > >>> > > > >> @hailongwang 一样的。
> > >>> > > > >>
> > >>> > > > >> 有个情况说明下,我是tumble
> > >>> > window统计,所以输出的是append流。duplicate是我手动在flink输出前手动添加进去的。
> > >>> > > > >> 目的在于测试flink这个sink是根据自身历史输出决定是否insert/update(比如retract流情况,
> > >>> > > > >> flink可能有能力知道当前这次输出是该key下第一次,还是第n次输出),还是会判定实际数据库中数据是否存在为准。
> > >>> > > > >>
> > >>> > > > >>
> > >>> > > > >>
> > >>> > > > >>
> > >>> > > > >> hailongwang <[hidden email]> 于2020年11月23日周一 下午2:39写道:
> > >>> > > > >>
> > >>> > > > >>> 数据库中主键的设置跟 primary key 定义的一样不?
> > >>> > > > >>>
> > >>> > > > >>>
> > >>> > > > >>> Best,
> > >>> > > > >>> Hailong
> > >>> > > > >>> 在 2020-11-23 13:15:01,"赵一旦" <[hidden email]> 写道:
> > >>> > > > >>> >如题,按照官方文档,当mysql表定义了primary
> > >>> key的时候,会使用UpsertTableSink,并且会使用insert
> > >>> > on
> > >>> > > > >>> >duplicate方式写入。
> > >>> > > > >>> >
> > >>> > > > >>> >但我在使用中,发现报了 duplicate entry的错误。例如:
> > >>> > > > >>> >Caused by: com.mysql.jdbc.exceptions.jdbc4.
> > >>> > > > >>> >MySQLIntegrityConstraintViolationException: Duplicate
> entry
> > >>> > > > >>> >'2036-feed_landing_box_news-2000-202011231405' for key
> > >>> 'uniq_ssmt'
> > >>> > > > >>> >    at
> > >>> > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> > >>> > > > >>> Method)
> > >>> > > > >>> >    at
> > sun.reflect.NativeConstructorAccessorImpl.newInstance(
> > >>> > > > >>> >NativeConstructorAccessorImpl.java:62)
> > >>> > > > >>> >    at
> > >>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
> > >>> > > > >>> >DelegatingConstructorAccessorImpl.java:45)
> > >>> > > > >>> >    at
> > >>> > > java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> > >>> > > > >>> >    at
> com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
> > >>> > > > >>> >    at com.mysql.jdbc.Util.getInstance(Util.java:386)
> > >>> > > > >>> >    at
> > >>> > > com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1041)
> > >>> > > > >>> >    at
> > >>> com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4190)
> > >>> > > > >>> >    at
> > >>> com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:4122)
> > >>> > > > >>> >    at
> com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2570)
> > >>> > > > >>> >    at
> > >>> com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2731)
> > >>> > > > >>> >    at
> > >>> > > com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2818)
> > >>> > > > >>> >    at
> > >>> > > > >>>
> > >>> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement
> > >>> > > > >>> >.java:2157)
> > >>> > > > >>> >    at
> > >>> > > >
> com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> > >>> > > > >>> >.java:2460)
> > >>> > > > >>> >    at
> > >>> > > >
> com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> > >>> > > > >>> >.java:2377)
> > >>> > > > >>> >    at
> > >>> > > >
> com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement
> > >>> > > > >>> >.java:2361)
> > >>> > > > >>> >    at
> > com.mysql.jdbc.PreparedStatement.executeBatchedInserts(
> > >>> > > > >>> >PreparedStatement.java:1793)
> > >>> > > > >>> >
> > >>> > > > >>> >(2)
> > >>> > > > >>>
> > >>> >
> >此外,还有个小奇怪点,202011231405的数据,其他该时间的数据都在06分输出了(我设置了1min的maxOutOfOrder)。
> > >>> > > > >>> >但这个冲突的entry是在14.11分那一波才报错的。
> > >>> > > > >>>
> > >>> > > > >>
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> > >>
> >
>