Flink SQL: what order does it consume Kafka in? The second run of the SQL gives a different result from the first


anonnius
hi: [help needed] I'm consuming Kafka data with flink-sql and computing pv/uv over windows. The first and second runs of the query give different results, and I don't understand why.
0> local macOS environment
1> Flink 1.11.1
2> Kafka 0.10.2.2, topic message-json, 3 partitions, replication factor 1
3> using the sql-client.sh environment
4> first created the iservVisit table in sql-cli:
create table iservVisit (
    type string comment 'event type',
    uuid string comment 'user id',
    clientTime string comment '10-digit (epoch-second) timestamp',
    rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))), -- computed column: converts the 10-digit epoch-second timestamp to TIMESTAMP
    WATERMARK for rowtime as rowtime - INTERVAL '1' MINUTE -- declares rowtime as the watermark column, with a 1-minute delay
) with (
    'connector' = 'kafka-0.10',
    'topic' = 'message-json',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'consumer-rt',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)
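(Note: from_unixtime uses the session time zone; on the poster's machine, UTC+8, the first record's clientTime 1600391684 becomes rowtime 2020-09-18T09:14:44, which matches the window boundaries in the results below.)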
Then run this SQL in sql-cli:
select  
    tumble_start(rowtime, interval '2' MINUTE) as wStart,
    tumble_end(rowtime, interval '2' MINUTE) as wEnd,
    count(1) as pv,
    count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
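With the 1-minute watermark delay above, the watermark trails the largest rowtime seen so far by one minute, and a 2-minute window [wStart, wEnd) fires once the watermark passes wEnd. For example, the window [09:14, 09:16) can only fire after some record with rowtime >= 09:17:00 has been consumed.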
5> send the following JSON messages to the Kafka producer, one at a time:
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391684"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391663"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391690"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391709"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391748"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391782"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391781"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391823"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391822"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391815"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391857"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391870"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391851"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391903"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391889"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600391945"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391938"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391951"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391936"} 
{"type": "iservVisit", "uuid": "b", "clientTime": "1600391970"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600392016"} 
{"type": "iservVisit", "uuid": "c", "clientTime": "1600391993"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1600392057"} 
{"type": "iservVisit", "uuid": "a", "clientTime": "1800392057"} 
6> first run's result (the query in sql-cli was left running the whole time):

     wStart                 wEnd            pv    uv
2020-09-18T09:14    2020-09-18T09:16     5     3
2020-09-18T09:16    2020-09-18T09:18     8     3
2020-09-18T09:18    2020-09-18T09:20     8     3
2020-09-18T09:20    2020-09-18T09:22     2     2

7> second run's result (after quitting [Quit] the running query in sql-cli and running it again):

     wStart                 wEnd            pv    uv
2020-09-18T09:14    2020-09-18T09:16     2     2
2020-09-18T09:16    2020-09-18T09:18     2     2
2020-09-18T09:18    2020-09-18T09:20     8     3
2020-09-18T09:20    2020-09-18T09:22     2     2
8> the detailed steps are in the attached file.
Attachment: flink-sql-kafka.txt (13K)
Re: Flink SQL: what order does it consume Kafka in? The second run of the SQL gives a different result from the first

Evan
Just reserving a spot here first.
Following the OP's steps, I sent the data while running the SQL below to watch the result live:
select  
    tumble_start(rowtime, interval '2' MINUTE) as wStart,
    tumble_end(rowtime, interval '2' MINUTE) as wEnd,
    count(1) as pv,
    count(distinct uuid) as uv
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
The result I ended up with (different from the OP's):

     wStart                 wEnd            pv    uv
2020-09-18T09:14    2020-09-18T09:16     2     2
2020-09-18T09:16    2020-09-18T09:18     8     3
2020-09-18T09:18    2020-09-18T09:20     8     3
2020-09-18T09:20    2020-09-18T09:22     2     2

After all the data had been sent, I quit sql-client and ran the query above again; this time the result matched the OP's:
     wStart                 wEnd            pv    uv
2020-09-18T09:14    2020-09-18T09:16     2     2
2020-09-18T09:16    2020-09-18T09:18     2     2
2020-09-18T09:18    2020-09-18T09:20     8     3
2020-09-18T09:20    2020-09-18T09:22     2     2

Re: Flink SQL: what order does it consume Kafka in? The second run of the SQL gives a different result from the first

Evan
Hi, I've found the problem.
Your topic has 3 partitions. On the first run you were consuming while producing, so the watermark was effectively generated in the order you sent the data, windows fired in that same order, and you got the result you expected.

On the second run nothing is being produced any more, so the data in Kafka is historical. Since your scan.startup.mode is earliest-offset, Flink replays that history, and with 3 partitions it may drain one partition first and then the other two in turn. Kafka guarantees no ordering across partitions, so the watermark is generated in whatever order Flink happens to consume the records, i.e. in the order below (and along the way some records are discarded as late data):

select * from iservVisit
        type    uuid    clientTime         rowtime
  iservVisit       a    1600391663    2020-09-18T09:14:23
  iservVisit       b    1600391748    2020-09-18T09:15:48
  iservVisit       b    1600391823    2020-09-18T09:17:03
                              --- window 2020-09-18 09:14 - 09:16 fires here
  iservVisit       a    1600391857    2020-09-18T09:17:37
  iservVisit       c    1600391903    2020-09-18T09:18:23
  iservVisit       b    1600391938    2020-09-18T09:18:58
  iservVisit       b    1600391970    2020-09-18T09:19:30
                              --- window 2020-09-18 09:16 - 09:18 fires here
  iservVisit       a    1600392057    2020-09-18T09:20:57
  iservVisit       c    1600391684    2020-09-18T09:14:44
  iservVisit       c    1600391709    2020-09-18T09:15:09
  iservVisit       b    1600391781    2020-09-18T09:16:21
  iservVisit       a    1600391815    2020-09-18T09:16:55
  iservVisit       b    1600391851    2020-09-18T09:17:31
  iservVisit       a    1600391945    2020-09-18T09:19:05
  iservVisit       c    1600391936    2020-09-18T09:18:56
  iservVisit       c    1600391993    2020-09-18T09:19:53
  iservVisit       a    1600391690    2020-09-18T09:14:50
  iservVisit       c    1600391782    2020-09-18T09:16:22
  iservVisit       b    1600391822    2020-09-18T09:17:02
  iservVisit       a    1600391870    2020-09-18T09:17:50
  iservVisit       a    1600391889    2020-09-18T09:18:09
  iservVisit       b    1600391951    2020-09-18T09:19:11
  iservVisit       c    1600392016    2020-09-18T09:20:16
  iservVisit       a    1800392057    2027-01-20T04:54:17
                              --- windows 2020-09-18 09:18 - 09:20 and 09:20 - 09:22 fire here
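Reading this trace against the second run's result: when the record with rowtime 09:17:03 arrives, the watermark advances to 09:16:03, which is past 09:16, so the window [09:14, 09:16) fires containing only 09:14:23 and 09:15:48, hence pv=2, uv=2. The records belonging to that window that show up later in the trace (09:14:44, 09:15:09, 09:14:50, ...) find the watermark already past their window's end and are dropped as late data.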



PS: you can give the topic a single partition; the data is then totally ordered, and every run of the query returns the same, correct result.
If my analysis is wrong, corrections are welcome!
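(A single partition works because replay order then always equals production order, so the watermark sequence is reproducible across runs; the trade-off is that the source can no longer read partitions in parallel.)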

Re: Flink SQL: what order does it consume Kafka in? The second run of the SQL gives a different result from the first

anonnius
hi: thanks for your attention and the replies.
1> Below is my analysis.
1. The first run: I started this SQL in sql-client.sh first:
select  
    tumble_start(rowtime, interval '2' MINUTE) as wStart,
    tumble_end(rowtime, interval '2' MINUTE) as wEnd,
    count(1) as pv,
    count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)

At that point the records were being written one at a time with the Kafka producer tool (kafka-console-producer.sh) while the kafka connector consumed continuously, so it received the records in the same order they were written.

2. The second run: after quitting sql-client.sh, I ran the SQL again:
select  
    tumble_start(rowtime, interval '2' MINUTE) as wStart,
    tumble_end(rowtime, interval '2' MINUTE) as wEnd,
    count(1) as pv,
    count(distinct uuid) as uv 
from iservVisit
group by tumble(rowtime, interval '2' MINUTE)
By this time all the data was already in Kafka. Because the topic has 3 partitions, the order in which the kafka connector receives the messages no longer matches the order they were written with kafka-console-producer.sh: records with a later rowtime can be consumed first, which produces an overly large watermark, and some of the messages consumed after that are then discarded as late.

3. Enlarging the watermark delay in the DDL brings back the first run's result; I'm still evaluating whether this always works (see the note after this list):
create table iservVisit (
    type string comment 'event type',
    uuid string comment 'user id',
    clientTime string comment '10-digit (epoch-second) timestamp',
    rowtime as to_timestamp(from_unixtime(cast(substring(coalesce(clientTime, '0'), 1, 10) as bigint))), -- computed column: converts the 10-digit epoch-second timestamp to TIMESTAMP
    WATERMARK for rowtime as rowtime - INTERVAL '5' MINUTE -- used as the watermark; delay increased from 1 minute to 5 minutes
) with (
    'connector' = 'kafka-0.10',
    'topic' = 'message-json',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'consumer-rt',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)
4. Preliminary conclusion: the question is how to ensure, by whatever means, that every partition's data is consumed at the same pace.
5. The attachment is easier to read with the Sublime sql/hql plugin.
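One caveat on point 3: a larger watermark delay only restores the result as long as the event-time skew between partitions during replay stays below the delay (here 5 minutes), and it also holds every window back by the same amount before it can fire. It trades latency for completeness rather than fixing the cross-partition ordering itself.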
Attachment: flink-sql-kafka.txt (15K)

Re: Flink SQL: what order does it consume Kafka in? The second run of the SQL gives a different result from the first

nobleyd
Can someone clarify: does watermark generation in the Flink SQL case have the per-partition min-merge mechanism that the DataStream API has?

This problem definitely does not exist with the DataStream API.
Case 1: with 10 partitions, simply run 10 parallel source subtasks and generate watermarks right after the source; the watermarks are merged by taking the minimum.

Case 2: even with 2 parallel subtasks, each consuming 5 partitions, there is still no problem as long as you use the watermark generation mechanism that the kafkaSource itself provides, as sketched below.
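To make case 2 concrete, here is a minimal DataStream sketch against the Flink 1.11 API. The IservVisit POJO and IservVisitSchema deserializer are hypothetical placeholders for the JSON records in this thread; the essential point is that the WatermarkStrategy is attached to the Kafka consumer itself, which makes the source track a watermark per partition and emit the minimum. (As far as I know, in Flink 1.11 the watermark declared in SQL DDL is generated after the source on the merged stream, which is exactly why this replay misbehaves; per-partition watermark push-down for the SQL Kafka connector only arrived in later versions.)

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PerPartitionWatermarkSketch {

    // Hypothetical POJO matching the JSON records in this thread.
    public static class IservVisit {
        public String type;
        public String uuid;
        public long clientTime; // epoch seconds
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "consumer-rt");

        // IservVisitSchema (a DeserializationSchema<IservVisit> for the JSON)
        // is assumed here and not shown.
        FlinkKafkaConsumer<IservVisit> consumer =
                new FlinkKafkaConsumer<>("message-json", new IservVisitSchema(), props);
        consumer.setStartFromEarliest();

        // Attaching the strategy to the consumer (instead of calling
        // assignTimestampsAndWatermarks on the stream after the source) makes the
        // source track one watermark per Kafka partition and emit the minimum of
        // the partitions each subtask owns, so a partition that is read ahead
        // cannot advance event time past the partitions that lag behind.
        consumer.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<IservVisit>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                        .withTimestampAssigner((visit, ts) -> visit.clientTime * 1000L));

        DataStream<IservVisit> visits = env.addSource(consumer);
        visits.print();

        env.execute("per-partition watermark sketch");
    }
}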