網(wǎng)站首頁 編程語言 正文
Spark Sql之count distinct
- 學習內(nèi)容
- spark 對count(distinct)的優(yōu)化
- 數(shù)據(jù)膨脹原理
- distinct數(shù)據(jù)膨脹
- grouping sets數(shù)據(jù)膨脹
- 開個坑
- distinct源碼
- spark sql grouping sets
- 優(yōu)化思路
- 1、增加 expand的過程中partition 的數(shù)量
- 2、縮減expand 的數(shù)據(jù)量
- 參考
學習內(nèi)容
- spark sql count(distinct)
- 數(shù)據(jù)膨脹
- count(distinct)原理
- grouping sets原理
- count(distinct)優(yōu)化
spark 對count(distinct)的優(yōu)化
先說結(jié)論:spark sql和hive不一樣,spark對count(distinct)做了group by優(yōu)化
在hive中count().
hive往往只用一個 reduce 來處理全局聚合函數(shù),最后導(dǎo)致數(shù)據(jù)傾斜;在不考慮其它因素的情況下,我們的優(yōu)化方案是先 group by 再 count 。
--優(yōu)化前
select count(distinct id) from table_a
--優(yōu)化后
select
count(id)
from
(
select
id
from table_a group by id
) tmp
在使用spark sql 時,不用擔心這個問題,因為 spark 對count distinct 做了優(yōu)化:
explain
select
count(distinct id),
count(distinct name)
from table_a
== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(if ((gid#147005 = 2)) table_a.`id`#147007 else null), count(if ((gid#147005 = 1)) table_a.`name`#147006 else null)])
+- Exchange SinglePartition
+- *(2) HashAggregate(keys=[], functions=[partial_count(if ((gid#147005 = 2)) table_a.`id`#147007 else null), partial_count(if ((gid#147005 = 1)) table_a.`name`#147006 else null)])
+- *(2) HashAggregate(keys=[table_a.`name`#147006, table_a.`id`#147007, gid#147005], functions=[])
+- Exchange(coordinator id: 387101114) hashpartitioning(table_a.`name`#147006, table_a.`id`#147007, gid#147005, 4096), coordinator[target post-shuffle partition size: 67108864]
+- *(1) HashAggregate(keys=[table_a.`name`#147006, table_a.`id`#147007, gid#147005], functions=[])
+- *(1) Expand [List(name#146984, null, 1), List(null, id#146979, 2)], [table_a.`name`#147006, table_a.`id`#147007, gid#147005]
+- *(1) Project [id#146979, name#146984]
+- *(1) FileScan parquet table_a
數(shù)據(jù)膨脹原理
從上述執(zhí)行計劃可以看到,expand,那為什么為產(chǎn)生數(shù)據(jù)膨脹吶?
distinct算子在處理過程中是將distinct后的字段和group by字段共同作為key傳入reduce,導(dǎo)致shuffle前map階段沒有預(yù)聚合,同時shuffle時網(wǎng)絡(luò)傳輸數(shù)據(jù)量過大消耗增加,對reduce處理時負載也增大
distinct算子在處理過程中會將原有數(shù)據(jù)膨脹,有N個DISTINCT關(guān)鍵字數(shù)據(jù)就會在map端膨脹N倍,同時對shuffle和reduce的長尾影響(原因1)也會擴大N
expand 之后,再以id、name 為 key 進行HashAggregate 也就是 group by ,這樣以來,就相當于去重了。后面直接計算count (id) 、 count(name) 就可以,把數(shù)據(jù)分而治之。 在一定程度上緩解了數(shù)據(jù)傾斜。
distinct數(shù)據(jù)膨脹
val sql:String =
s"""
|select
| count(distinct sha1),
| count(distinct task_id),
| count(distinct task_type)
|from tmp
|""".stripMargin
val df2: DataFrame = session.sql(sql)
df2.show()
df2.explain(true)
grouping sets數(shù)據(jù)膨脹
val sql1:String =
s"""
|select
| count(sha1),
| count(task_id),
| count(task_type)
|from (
|select sha1,task_id,task_type
|from tmp
|group by grouping sets(sha1, task_id, task_type)
|)
|""".stripMargin
val df22: DataFrame = session.sql(sql1)
df22.explain(true)
df22.show()
開個坑
在spark sql里面小數(shù)據(jù)量的話,count(distinct)和gruop by的執(zhí)行時間是差不多的,
但是我看到有篇文章介紹的是大數(shù)據(jù)量的distinct和group by的對比,說的是大數(shù)據(jù)量的話無法在內(nèi)存里HashAggregate也就是group by,兩者的執(zhí)行時間的差距還是很大的。具體的還沒測試。。。
distinct源碼
def rewrite(a: Aggregate): Aggregate = {
// 把所有聚合表式取出來
val aggExpressions = a.aggregateExpressions.flatMap { e =>
e.collect {
case ae: AggregateExpression => ae
}
}
// 抽取出含有 distinct的聚合表達式
val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy { e =>
val unfoldableChildren = e.aggregateFunction.children.filter(!_.foldable).toSet
if (unfoldableChildren.nonEmpty) {
// Only expand the unfoldable children
unfoldableChildren
} else {
e.aggregateFunction.children.take(1).toSet
}
}
//todo 當有多個distinct聚合表達式時,進行expand
if (distinctAggGroups.size > 1) {
// 創(chuàng)建gid標志
val gid = AttributeReference("gid", IntegerType, nullable = false)()
val groupByMap = a.groupingExpressions.collect {
case ne: NamedExpression => ne -> ne.toAttribute
case e => e -> AttributeReference(e.sql, e.dataType, e.nullable)()
}
val groupByAttrs = groupByMap.map(_._2)
....
}
// 構(gòu)建Expand算子
val expand = Expand(
regularAggProjection ++ distinctAggProjections,
groupByAttrs ++ distinctAggChildAttrs ++ Seq(gid) ++ regularAggChildAttrMap.map(_._2),
a.child)
.....
}
重點代碼:
//todo 當有多個distinct聚合表達式時,進行expand
if (distinctAggGroups.size > 1) { expand }
spark sql grouping sets
grouping sets 、rollup 、cube 是用來處理多維分析的函數(shù):
grouping sets:對分組集中指定的組表達式的每個子集執(zhí)行g(shù)roup by,group by A,B grouping sets(A,B)就等價于 group by A union group by B,其中A和B也可以是一個集合,比如group by A,B,C grouping sets((A,B),(A,C))。
rollup:在指定表達式的每個層次級別創(chuàng)建分組集。group by A,B,C with rollup首先會對(A、B、C)進行g(shù)roup by,然后對(A、B)進行g(shù)roup by,然后是(A)進行g(shù)roup by,最后對全表進行g(shù)roup by操作。
cube : 為指定表達式集的每個可能組合創(chuàng)建分組集。首先會對(A、B、C)進行g(shù)roup by,然后依次是(A、B),(A、C),(A),(B、C),(B),?,最后對全表進行g(shù)roup by操作。
前文也說了,grouping sets也是利用expand的方式
優(yōu)化思路
上文我們基本可以了解到了,是由于expand導(dǎo)致的慢,優(yōu)化方向可以朝著減少distinct關(guān)鍵的出現(xiàn)的次數(shù),減少數(shù)據(jù)膨脹方向入手
1、增加 expand的過程中partition 的數(shù)量
但是這樣有一個弊端:同時啟動太多task 會造成集群資源緊張,也會導(dǎo)致其它任務(wù)沒有資源。并且數(shù)據(jù)是 逐日增加的,總體上不好控制。
2、縮減expand 的數(shù)據(jù)量
從sql結(jié)構(gòu)上:
可以把計算的指標拆開,分兩次計算,然后再 join。
總體的處理原則就是,讓過濾掉的數(shù)據(jù)盡量的多,expand 時的數(shù)據(jù)盡量少:
參考
參考博客
原文鏈接:https://blog.csdn.net/Lzx116/article/details/126153664
相關(guān)推薦
- 2022-08-13 微信公眾號--根據(jù)用戶的opneId發(fā)送模版消息
- 2022-03-26 R語言基于Keras的MLP神經(jīng)網(wǎng)絡(luò)及環(huán)境搭建_R語言
- 2022-04-26 Swift踩坑實戰(zhàn)之一個字符引發(fā)的Crash_Swift
- 2023-03-01 GoLang中Strconv庫有哪些常用方法_Golang
- 2023-10-09 注冊頁面編寫
- 2023-10-15 lua 如何在嵌入式Linux中與c語言結(jié)合
- 2022-04-01 python+selenium對table表和分頁處理_python
- 2023-03-28 Linux版本中Nginx平滑升級與回退_nginx
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細win安裝深度學習環(huán)境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支