hi all:
请问flink sql count(disitinct) 底层的算法是怎样的? 是bitmap ? 还是简单通过java的set容器去重的呢? |
Hi hiliuxg,
count distinct 用的MapVIew来做的去重: 在batch场景下,MapView的底层实现就是HashMap; 在streaming场景下,MapView的底层实现是MapState,因为必须要用到state+cp,才能保证任务重启后状态不会丢失。 hiliuxg <[hidden email]> 于2020年3月19日周四 下午11:31写道: > hi all: > 请问flink sql count(disitinct) 底层的算法是怎样的? 是bitmap ? > 还是简单通过java的set容器去重的呢? -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
hi,
我发现我们生产有些使用 SQL 的 count distinct 去做去重,当作业跑了很久,作业的 Checkpoint state 很大(我这周就遇到过一个差不多 400G 的,导致 Checkpoint 很容易超时,并且可能会对 HDFS 集群的网卡也有一定的压力),我看官网文档有介绍说使用 query_configuration <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html> ,除此之外不清楚大家是否还有什么其他好的解决方法? Benchao Li <[hidden email]> 于2020年3月20日周五 上午9:50写道: > Hi hiliuxg, > > count distinct 用的MapVIew来做的去重: > 在batch场景下,MapView的底层实现就是HashMap; > 在streaming场景下,MapView的底层实现是MapState,因为必须要用到state+cp,才能保证任务重启后状态不会丢失。 > > hiliuxg <[hidden email]> 于2020年3月19日周四 下午11:31写道: > > > hi all: > > 请问flink sql count(disitinct) 底层的算法是怎样的? 是bitmap ? > > 还是简单通过java的set容器去重的呢? > > > > -- > > Benchao Li > School of Electronics Engineering and Computer Science, Peking University > Tel:+86-15650713730 > Email: [hidden email]; [hidden email] > |
In reply to this post by hiroot
可以考虑自己实现一个udf ,使用bitmap或者hyperloglog去实现。
原始邮件 发件人:[hidden email] 收件人:[hidden email] 发送时间:2020年3月20日(周五) 11:44 主题:Re: flink sql 去重算法 hi, 我发现我们生产有些使用 SQL 的 count distinct 去做去重,当作业跑了很久,作业的 Checkpoint state 很大(我这周就遇到过一个差不多 400G 的,导致 Checkpoint 很容易超时,并且可能会对 HDFS 集群的网卡也有一定的压力),我看官网文档有介绍说使用 query_configuration ">https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html ,除此之外不清楚大家是否还有什么其他好的解决方法? Benchao Li [hidden email] 于2020年3月20日周五 上午9:50写道: Hi hiliuxg, count distinct 用的MapVIew来做的去重: 在batch场景下,MapView的底层实现就是HashMap; 在streaming场景下,MapView的底层实现是MapState,因为必须要用到state+cp,才能保证任务重启后状态不会丢失。 hiliuxg [hidden email] 于2020年3月19日周四 下午11:31写道: hi all: 请问flink sqlnbsp; count(disitinct)nbsp; 底层的算法是怎样的? 是bitmap ? 还是简单通过java的set容器去重的呢? -- Benchao Li School of Electronics Engineering and Computer Science, Peking University Tel:+86-15650713730 Email: [hidden email]; [hidden email] |
In reply to this post by hiroot
Hi zhisheng,
我之前也遇到类似的问题,Flink 状态默认永久保留,针对这种 SQL 任务,我想到的就是设置状态空闲保留时间。 比如按照天来聚合的,空闲状态的最小保留时间26小时,最大空闲撞他为48小时(具体时间根据业务来设置), 总之肯定要设置一个最小和最大的空闲状态保留时间,不可能让状态永久保留。 对于 Flink 1.10 Blink planner 来说,TTL 时间就是设置的最小空闲状态保留时间,最大的空闲状态保留时间貌似没有用到。 Flink 1.10 默认状态清理机制是 in background 了,对于 RocksDBStateBackend 来说,使用 Compaction Filter 算法来清理。 第二个就是使用增量 Checkpoint 方式吧。 Best wishes, LakeShen lucas.wu <[hidden email]> 于2020年3月20日周五 上午11:50写道: > 可以考虑自己实现一个udf ,使用bitmap或者hyperloglog去实现。 > > > 原始邮件 > 发件人:[hidden email] > 收件人:[hidden email] > 发送时间:2020年3月20日(周五) 11:44 > 主题:Re: flink sql 去重算法 > > > hi, 我发现我们生产有些使用 SQL 的 count distinct 去做去重,当作业跑了很久,作业的 Checkpoint state > 很大(我这周就遇到过一个差不多 400G 的,导致 Checkpoint 很容易超时,并且可能会对 HDFS > 集群的网卡也有一定的压力),我看官网文档有介绍说使用 query_configuration "> > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > ,除此之外不清楚大家是否还有什么其他好的解决方法? Benchao Li [hidden email] 于2020年3月20日周五 > 上午9:50写道: Hi hiliuxg, count distinct 用的MapVIew来做的去重: > 在batch场景下,MapView的底层实现就是HashMap; > 在streaming场景下,MapView的底层实现是MapState,因为必须要用到state+cp,才能保证任务重启后状态不会丢失。 > hiliuxg [hidden email] 于2020年3月19日周四 下午11:31写道: hi all: 请问flink > sqlnbsp; count(disitinct)nbsp; 底层的算法是怎样的? 是bitmap ? > 还是简单通过java的set容器去重的呢? -- Benchao Li School of Electronics > Engineering and Computer Science, Peking University Tel:+86-15650713730 > Email: [hidden email]; [hidden email] |
hi,LakeShen
1、那个大状态作业之前是我们算法同学写的,是没加官网说的 query_configuration 这个配置,在我的指导下,已经加上 2、Flink 框架层我已经做了默认的配置,使用 RocksDB,并且是增量的,但是还是发现每次 Checkpoint 状态非常大 最近我梳理了下公司的大状态作业,发现通常有这么几个特性: 1、SQL 作业 2、长时间的分组滑动窗口 3、使用 distinct 等关键字的 因为是 SQL 作业,开发可能只关心了自己的业务逻辑,而没有去关注这种性能的问题,所以也就可能会导致这种大状态的问题,目前是我自己把这些大状态的作业捞出来后,都一个个联系优化后再上线的,后面我再看看怎么在框架层做到加上这种优化的配置。 Best wishes, zhisheng LakeShen <[hidden email]> 于2020年3月20日周五 下午1:36写道: > Hi zhisheng, > > 我之前也遇到类似的问题,Flink 状态默认永久保留,针对这种 SQL 任务,我想到的就是设置状态空闲保留时间。 > 比如按照天来聚合的,空闲状态的最小保留时间26小时,最大空闲撞他为48小时(具体时间根据业务来设置), > 总之肯定要设置一个最小和最大的空闲状态保留时间,不可能让状态永久保留。 > > 对于 Flink 1.10 Blink planner 来说,TTL 时间就是设置的最小空闲状态保留时间,最大的空闲状态保留时间貌似没有用到。 > Flink 1.10 默认状态清理机制是 in background 了,对于 RocksDBStateBackend 来说,使用 > Compaction Filter 算法来清理。 > > 第二个就是使用增量 Checkpoint 方式吧。 > > Best wishes, > LakeShen > > > > lucas.wu <[hidden email]> 于2020年3月20日周五 上午11:50写道: > > > 可以考虑自己实现一个udf ,使用bitmap或者hyperloglog去实现。 > > > > > > 原始邮件 > > 发件人:[hidden email] > > 收件人:[hidden email] > > 发送时间:2020年3月20日(周五) 11:44 > > 主题:Re: flink sql 去重算法 > > > > > > hi, 我发现我们生产有些使用 SQL 的 count distinct 去做去重,当作业跑了很久,作业的 Checkpoint state > > 很大(我这周就遇到过一个差不多 400G 的,导致 Checkpoint 很容易超时,并且可能会对 HDFS > > 集群的网卡也有一定的压力),我看官网文档有介绍说使用 query_configuration "> > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > > ,除此之外不清楚大家是否还有什么其他好的解决方法? Benchao Li [hidden email] 于2020年3月20日周五 > > 上午9:50写道: Hi hiliuxg, count distinct 用的MapVIew来做的去重: > > 在batch场景下,MapView的底层实现就是HashMap; > > 在streaming场景下,MapView的底层实现是MapState,因为必须要用到state+cp,才能保证任务重启后状态不会丢失。 > > hiliuxg [hidden email] 于2020年3月19日周四 下午11:31写道: hi all: 请问flink > > sqlnbsp; count(disitinct)nbsp; 底层的算法是怎样的? 是bitmap ? > > 还是简单通过java的set容器去重的呢? -- Benchao Li School of Electronics > > Engineering and Computer Science, Peking University Tel:+86-15650713730 > > Email: [hidden email]; [hidden email] > |
Hi zhisheng,
咱们遇到的问题差不多,昨天遇到一个滑动窗口状态很大的问题,由于业务方设置的滑动窗口的窗口时间大(比如一天、三天),同时也是统计 count 之类的操作,状态很大。 这种滑动窗口操作,我觉得可以先通过滚动窗口(比如10、20分钟)来计算一次,然后业务方使用的时候,扫描最近一段的时间滚动窗口计算的指标值,然后相加。 还有一种方式,就是使用 ProcessFunction + Timer 的方式来处理这种滑动窗口的计算[1]。 类似 count(distinct) 这种,目前我还没有比较好的方式来解决,也在研究中。 [1] https://stackoverflow.com/questions/51977741/flink-performance-issue-with-sliding-time-window Best wishes, LakeShen zhisheng <[hidden email]> 于2020年3月20日周五 下午2:21写道: > hi,LakeShen > > 1、那个大状态作业之前是我们算法同学写的,是没加官网说的 query_configuration 这个配置,在我的指导下,已经加上 > > 2、Flink 框架层我已经做了默认的配置,使用 RocksDB,并且是增量的,但是还是发现每次 Checkpoint 状态非常大 > > 最近我梳理了下公司的大状态作业,发现通常有这么几个特性: > > 1、SQL 作业 > > 2、长时间的分组滑动窗口 > > 3、使用 distinct 等关键字的 > > 因为是 SQL > > 作业,开发可能只关心了自己的业务逻辑,而没有去关注这种性能的问题,所以也就可能会导致这种大状态的问题,目前是我自己把这些大状态的作业捞出来后,都一个个联系优化后再上线的,后面我再看看怎么在框架层做到加上这种优化的配置。 > > Best wishes, > > zhisheng > > LakeShen <[hidden email]> 于2020年3月20日周五 下午1:36写道: > > > Hi zhisheng, > > > > 我之前也遇到类似的问题,Flink 状态默认永久保留,针对这种 SQL 任务,我想到的就是设置状态空闲保留时间。 > > 比如按照天来聚合的,空闲状态的最小保留时间26小时,最大空闲撞他为48小时(具体时间根据业务来设置), > > 总之肯定要设置一个最小和最大的空闲状态保留时间,不可能让状态永久保留。 > > > > 对于 Flink 1.10 Blink planner 来说,TTL 时间就是设置的最小空闲状态保留时间,最大的空闲状态保留时间貌似没有用到。 > > Flink 1.10 默认状态清理机制是 in background 了,对于 RocksDBStateBackend 来说,使用 > > Compaction Filter 算法来清理。 > > > > 第二个就是使用增量 Checkpoint 方式吧。 > > > > Best wishes, > > LakeShen > > > > > > > > lucas.wu <[hidden email]> 于2020年3月20日周五 上午11:50写道: > > > > > 可以考虑自己实现一个udf ,使用bitmap或者hyperloglog去实现。 > > > > > > > > > 原始邮件 > > > 发件人:[hidden email] > > > 收件人:[hidden email] > > > 发送时间:2020年3月20日(周五) 11:44 > > > 主题:Re: flink sql 去重算法 > > > > > > > > > hi, 我发现我们生产有些使用 SQL 的 count distinct 去做去重,当作业跑了很久,作业的 Checkpoint state > > > 很大(我这周就遇到过一个差不多 400G 的,导致 Checkpoint 很容易超时,并且可能会对 HDFS > > > 集群的网卡也有一定的压力),我看官网文档有介绍说使用 query_configuration "> > > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > > > ,除此之外不清楚大家是否还有什么其他好的解决方法? Benchao Li [hidden email] 于2020年3月20日周五 > > > 上午9:50写道: Hi hiliuxg, count distinct 用的MapVIew来做的去重: > > > 在batch场景下,MapView的底层实现就是HashMap; > > > 在streaming场景下,MapView的底层实现是MapState,因为必须要用到state+cp,才能保证任务重启后状态不会丢失。 > > > hiliuxg [hidden email] 于2020年3月19日周四 下午11:31写道: hi all: > 请问flink > > > sqlnbsp; count(disitinct)nbsp; 底层的算法是怎样的? 是bitmap ? > > > 还是简单通过java的set容器去重的呢? -- Benchao Li School of Electronics > > > Engineering and Computer Science, Peking University > Tel:+86-15650713730 > > > Email: [hidden email]; [hidden email] > > > |
hi, LakeShen
那我这边的场景和你还不太一样,我这边主要是 SQL 作业才有这种问题,算法和数仓的同学他们没有使用 DataStream API 开发,自然也用不上 ProcessFunction 和 Timer,另外场景也不太一样,我遇到的这几个大状态作业才开三小时的滑动窗口(一分钟滑一次),没有你的那种(一天/三天)这么长。不过还是感谢你! Best wishes, zhisheng LakeShen <[hidden email]> 于2020年3月20日周五 下午3:23写道: > Hi zhisheng, > > 咱们遇到的问题差不多,昨天遇到一个滑动窗口状态很大的问题,由于业务方设置的滑动窗口的窗口时间大(比如一天、三天),同时也是统计 count > 之类的操作,状态很大。 > 这种滑动窗口操作,我觉得可以先通过滚动窗口(比如10、20分钟)来计算一次,然后业务方使用的时候,扫描最近一段的时间滚动窗口计算的指标值,然后相加。 > > 还有一种方式,就是使用 ProcessFunction + Timer 的方式来处理这种滑动窗口的计算[1]。 > > 类似 count(distinct) 这种,目前我还没有比较好的方式来解决,也在研究中。 > > [1] > > https://stackoverflow.com/questions/51977741/flink-performance-issue-with-sliding-time-window > > Best wishes, > LakeShen > > > > > zhisheng <[hidden email]> 于2020年3月20日周五 下午2:21写道: > > > hi,LakeShen > > > > 1、那个大状态作业之前是我们算法同学写的,是没加官网说的 query_configuration 这个配置,在我的指导下,已经加上 > > > > 2、Flink 框架层我已经做了默认的配置,使用 RocksDB,并且是增量的,但是还是发现每次 Checkpoint 状态非常大 > > > > 最近我梳理了下公司的大状态作业,发现通常有这么几个特性: > > > > 1、SQL 作业 > > > > 2、长时间的分组滑动窗口 > > > > 3、使用 distinct 等关键字的 > > > > 因为是 SQL > > > > > 作业,开发可能只关心了自己的业务逻辑,而没有去关注这种性能的问题,所以也就可能会导致这种大状态的问题,目前是我自己把这些大状态的作业捞出来后,都一个个联系优化后再上线的,后面我再看看怎么在框架层做到加上这种优化的配置。 > > > > Best wishes, > > > > zhisheng > > > > LakeShen <[hidden email]> 于2020年3月20日周五 下午1:36写道: > > > > > Hi zhisheng, > > > > > > 我之前也遇到类似的问题,Flink 状态默认永久保留,针对这种 SQL 任务,我想到的就是设置状态空闲保留时间。 > > > 比如按照天来聚合的,空闲状态的最小保留时间26小时,最大空闲撞他为48小时(具体时间根据业务来设置), > > > 总之肯定要设置一个最小和最大的空闲状态保留时间,不可能让状态永久保留。 > > > > > > 对于 Flink 1.10 Blink planner 来说,TTL 时间就是设置的最小空闲状态保留时间,最大的空闲状态保留时间貌似没有用到。 > > > Flink 1.10 默认状态清理机制是 in background 了,对于 RocksDBStateBackend 来说,使用 > > > Compaction Filter 算法来清理。 > > > > > > 第二个就是使用增量 Checkpoint 方式吧。 > > > > > > Best wishes, > > > LakeShen > > > > > > > > > > > > lucas.wu <[hidden email]> 于2020年3月20日周五 上午11:50写道: > > > > > > > 可以考虑自己实现一个udf ,使用bitmap或者hyperloglog去实现。 > > > > > > > > > > > > 原始邮件 > > > > 发件人:[hidden email] > > > > 收件人:[hidden email] > > > > 发送时间:2020年3月20日(周五) 11:44 > > > > 主题:Re: flink sql 去重算法 > > > > > > > > > > > > hi, 我发现我们生产有些使用 SQL 的 count distinct 去做去重,当作业跑了很久,作业的 Checkpoint > state > > > > 很大(我这周就遇到过一个差不多 400G 的,导致 Checkpoint 很容易超时,并且可能会对 HDFS > > > > 集群的网卡也有一定的压力),我看官网文档有介绍说使用 query_configuration "> > > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > > > > ,除此之外不清楚大家是否还有什么其他好的解决方法? Benchao Li [hidden email] > 于2020年3月20日周五 > > > > 上午9:50写道: Hi hiliuxg, count distinct 用的MapVIew来做的去重: > > > > 在batch场景下,MapView的底层实现就是HashMap; > > > > 在streaming场景下,MapView的底层实现是MapState,因为必须要用到state+cp,才能保证任务重启后状态不会丢失。 > > > > hiliuxg [hidden email] 于2020年3月19日周四 下午11:31写道: hi all: > > 请问flink > > > > sqlnbsp; count(disitinct)nbsp; 底层的算法是怎样的? 是bitmap ? > > > > 还是简单通过java的set容器去重的呢? -- Benchao Li School of Electronics > > > > Engineering and Computer Science, Peking University > > Tel:+86-15650713730 > > > > Email: [hidden email]; [hidden email] > > > > > > |
Free forum by Nabble | Edit this page |