網站首頁 編程語言 正文
Spark Sql之count distinct
- 學習內容
- spark 對count(distinct)的優化
- 數據膨脹原理
- distinct數據膨脹
- grouping sets數據膨脹
- 開個坑
- distinct源碼
- spark sql grouping sets
- 優化思路
- 1、增加 expand的過程中partition 的數量
- 2、縮減expand 的數據量
- 參考
學習內容
- spark sql count(distinct)
- 數據膨脹
- count(distinct)原理
- grouping sets原理
- count(distinct)優化
spark 對count(distinct)的優化
先說結論:spark sql和hive不一樣,spark對count(distinct)做了group by優化
在hive中count().
hive往往只用一個 reduce 來處理全局聚合函數,最后導致數據傾斜;在不考慮其它因素的情況下,我們的優化方案是先 group by 再 count 。
--優化前
select count(distinct id) from table_a
--優化后
select
count(id)
from
(
select
id
from table_a group by id
) tmp
在使用spark sql 時,不用擔心這個問題,因為 spark 對count distinct 做了優化:
explain
select
count(distinct id),
count(distinct name)
from table_a
== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(if ((gid#147005 = 2)) table_a.`id`#147007 else null), count(if ((gid#147005 = 1)) table_a.`name`#147006 else null)])
+- Exchange SinglePartition
+- *(2) HashAggregate(keys=[], functions=[partial_count(if ((gid#147005 = 2)) table_a.`id`#147007 else null), partial_count(if ((gid#147005 = 1)) table_a.`name`#147006 else null)])
+- *(2) HashAggregate(keys=[table_a.`name`#147006, table_a.`id`#147007, gid#147005], functions=[])
+- Exchange(coordinator id: 387101114) hashpartitioning(table_a.`name`#147006, table_a.`id`#147007, gid#147005, 4096), coordinator[target post-shuffle partition size: 67108864]
+- *(1) HashAggregate(keys=[table_a.`name`#147006, table_a.`id`#147007, gid#147005], functions=[])
+- *(1) Expand [List(name#146984, null, 1), List(null, id#146979, 2)], [table_a.`name`#147006, table_a.`id`#147007, gid#147005]
+- *(1) Project [id#146979, name#146984]
+- *(1) FileScan parquet table_a
數據膨脹原理
從上述執行計劃可以看到,expand,那為什么為產生數據膨脹吶?
distinct算子在處理過程中是將distinct后的字段和group by字段共同作為key傳入reduce,導致shuffle前map階段沒有預聚合,同時shuffle時網絡傳輸數據量過大消耗增加,對reduce處理時負載也增大
distinct算子在處理過程中會將原有數據膨脹,有N個DISTINCT關鍵字數據就會在map端膨脹N倍,同時對shuffle和reduce的長尾影響(原因1)也會擴大N
expand 之后,再以id、name 為 key 進行HashAggregate 也就是 group by ,這樣以來,就相當于去重了。后面直接計算count (id) 、 count(name) 就可以,把數據分而治之。 在一定程度上緩解了數據傾斜。
distinct數據膨脹
val sql:String =
s"""
|select
| count(distinct sha1),
| count(distinct task_id),
| count(distinct task_type)
|from tmp
|""".stripMargin
val df2: DataFrame = session.sql(sql)
df2.show()
df2.explain(true)
grouping sets數據膨脹
val sql1:String =
s"""
|select
| count(sha1),
| count(task_id),
| count(task_type)
|from (
|select sha1,task_id,task_type
|from tmp
|group by grouping sets(sha1, task_id, task_type)
|)
|""".stripMargin
val df22: DataFrame = session.sql(sql1)
df22.explain(true)
df22.show()
開個坑
在spark sql里面小數據量的話,count(distinct)和gruop by的執行時間是差不多的,
但是我看到有篇文章介紹的是大數據量的distinct和group by的對比,說的是大數據量的話無法在內存里HashAggregate也就是group by,兩者的執行時間的差距還是很大的。具體的還沒測試。。。
distinct源碼
def rewrite(a: Aggregate): Aggregate = {
// 把所有聚合表式取出來
val aggExpressions = a.aggregateExpressions.flatMap { e =>
e.collect {
case ae: AggregateExpression => ae
}
}
// 抽取出含有 distinct的聚合表達式
val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy { e =>
val unfoldableChildren = e.aggregateFunction.children.filter(!_.foldable).toSet
if (unfoldableChildren.nonEmpty) {
// Only expand the unfoldable children
unfoldableChildren
} else {
e.aggregateFunction.children.take(1).toSet
}
}
//todo 當有多個distinct聚合表達式時,進行expand
if (distinctAggGroups.size > 1) {
// 創建gid標志
val gid = AttributeReference("gid", IntegerType, nullable = false)()
val groupByMap = a.groupingExpressions.collect {
case ne: NamedExpression => ne -> ne.toAttribute
case e => e -> AttributeReference(e.sql, e.dataType, e.nullable)()
}
val groupByAttrs = groupByMap.map(_._2)
....
}
// 構建Expand算子
val expand = Expand(
regularAggProjection ++ distinctAggProjections,
groupByAttrs ++ distinctAggChildAttrs ++ Seq(gid) ++ regularAggChildAttrMap.map(_._2),
a.child)
.....
}
重點代碼:
//todo 當有多個distinct聚合表達式時,進行expand
if (distinctAggGroups.size > 1) { expand }
spark sql grouping sets
grouping sets 、rollup 、cube 是用來處理多維分析的函數:
grouping sets:對分組集中指定的組表達式的每個子集執行group by,group by A,B grouping sets(A,B)就等價于 group by A union group by B,其中A和B也可以是一個集合,比如group by A,B,C grouping sets((A,B),(A,C))。
rollup:在指定表達式的每個層次級別創建分組集。group by A,B,C with rollup首先會對(A、B、C)進行group by,然后對(A、B)進行group by,然后是(A)進行group by,最后對全表進行group by操作。
cube : 為指定表達式集的每個可能組合創建分組集。首先會對(A、B、C)進行group by,然后依次是(A、B),(A、C),(A),(B、C),(B),?,最后對全表進行group by操作。
前文也說了,grouping sets也是利用expand的方式
優化思路
上文我們基本可以了解到了,是由于expand導致的慢,優化方向可以朝著減少distinct關鍵的出現的次數,減少數據膨脹方向入手
1、增加 expand的過程中partition 的數量
但是這樣有一個弊端:同時啟動太多task 會造成集群資源緊張,也會導致其它任務沒有資源。并且數據是 逐日增加的,總體上不好控制。
2、縮減expand 的數據量
從sql結構上:
可以把計算的指標拆開,分兩次計算,然后再 join。
總體的處理原則就是,讓過濾掉的數據盡量的多,expand 時的數據盡量少:
參考
參考博客
原文鏈接:https://blog.csdn.net/Lzx116/article/details/126153664
相關推薦
- 2022-05-27 android實現簡單拼圖游戲_Android
- 2022-06-06 判斷一個元素是否在可視區域中
- 2022-06-08 優化使用Feign進行Rpc調用,支持對象傳參自動轉換
- 2023-11-14 樹莓派以及linux ubuntu 上,各種依賴不滿足,修復不了:E: Release file f
- 2022-12-11 C語言冷知識之預處理字符串操作符詳解_C 語言
- 2022-08-14 在WPF中合并兩個ObservableCollection集合_C#教程
- 2022-11-25 DOS窗口命令和單表簡單查詢_DOS/BAT
- 2022-06-26 Android?app啟動節點與上報啟動實例詳解_Android
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支