日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

Spark Sql之count(distinct)分析&&學習&&驗證

作者:南風知我意丿 更新時間: 2022-09-05 編程語言

Spark Sql之count distinct

  • 學習內容
  • spark 對count(distinct)的優化
  • 數據膨脹原理
      • distinct數據膨脹
      • grouping sets數據膨脹
      • 開個坑
  • distinct源碼
  • spark sql grouping sets
  • 優化思路
    • 1、增加 expand的過程中partition 的數量
    • 2、縮減expand 的數據量
  • 參考

學習內容

  • spark sql count(distinct)
  • 數據膨脹
  • count(distinct)原理
  • grouping sets原理
  • count(distinct)優化

spark 對count(distinct)的優化

先說結論:spark sql和hive不一樣,spark對count(distinct)做了group by優化

在hive中count().
hive往往只用一個 reduce 來處理全局聚合函數,最后導致數據傾斜;在不考慮其它因素的情況下,我們的優化方案是先 group by 再 count 。

--優化前
select count(distinct id) from table_a 

--優化后
select 
  count(id)
from
(
    select 
        id
    from table_a group by id
) tmp

在使用spark sql 時,不用擔心這個問題,因為 spark 對count distinct 做了優化:

explain 
select 
    count(distinct id),
    count(distinct name) 
from table_a
== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(if ((gid#147005 = 2)) table_a.`id`#147007 else null), count(if ((gid#147005 = 1)) table_a.`name`#147006 else null)])
+- Exchange SinglePartition
   +- *(2) HashAggregate(keys=[], functions=[partial_count(if ((gid#147005 = 2)) table_a.`id`#147007 else null), partial_count(if ((gid#147005 = 1)) table_a.`name`#147006 else null)])
      +- *(2) HashAggregate(keys=[table_a.`name`#147006, table_a.`id`#147007, gid#147005], functions=[])
         +- Exchange(coordinator id: 387101114) hashpartitioning(table_a.`name`#147006, table_a.`id`#147007, gid#147005, 4096), coordinator[target post-shuffle partition size: 67108864]
            +- *(1) HashAggregate(keys=[table_a.`name`#147006, table_a.`id`#147007, gid#147005], functions=[])
               +- *(1) Expand [List(name#146984, null, 1), List(null, id#146979, 2)], [table_a.`name`#147006, table_a.`id`#147007, gid#147005]
                  +- *(1) Project [id#146979, name#146984]
                     +- *(1) FileScan parquet table_a

數據膨脹原理

從上述執行計劃可以看到,expand,那為什么為產生數據膨脹吶?

distinct算子在處理過程中是將distinct后的字段和group by字段共同作為key傳入reduce,導致shuffle前map階段沒有預聚合,同時shuffle時網絡傳輸數據量過大消耗增加,對reduce處理時負載也增大

distinct算子在處理過程中會將原有數據膨脹,有N個DISTINCT關鍵字數據就會在map端膨脹N倍,同時對shuffle和reduce的長尾影響(原因1)也會擴大N

在這里插入圖片描述

expand 之后,再以id、name 為 key 進行HashAggregate 也就是 group by ,這樣以來,就相當于去重了。后面直接計算count (id) 、 count(name) 就可以,把數據分而治之。 在一定程度上緩解了數據傾斜。

distinct數據膨脹

 val sql:String =
    s"""
       |select
       |  count(distinct sha1),
       |  count(distinct task_id),
       |  count(distinct task_type)
       |from tmp
       |""".stripMargin

    val df2: DataFrame = session.sql(sql)
    df2.show()
    df2.explain(true)

在這里插入圖片描述

grouping sets數據膨脹

    val sql1:String =
      s"""
         |select
         |  count(sha1),
         |  count(task_id),
         |  count(task_type)
         |from (
         |select sha1,task_id,task_type
         |from tmp
         |group by grouping sets(sha1, task_id, task_type)
         |)
         |""".stripMargin

    val df22: DataFrame = session.sql(sql1)
    df22.explain(true)
    df22.show()

在這里插入圖片描述

開個坑

在spark sql里面小數據量的話,count(distinct)和gruop by的執行時間是差不多的,
但是我看到有篇文章介紹的是大數據量的distinct和group by的對比,說的是大數據量的話無法在內存里HashAggregate也就是group by,兩者的執行時間的差距還是很大的。具體的還沒測試。。。


distinct源碼

def rewrite(a: Aggregate): Aggregate = {
    // 把所有聚合表式取出來
    val aggExpressions = a.aggregateExpressions.flatMap { e =>
      e.collect {
        case ae: AggregateExpression => ae
      }
    }
    // 抽取出含有 distinct的聚合表達式
    val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy { e =>
        val unfoldableChildren = e.aggregateFunction.children.filter(!_.foldable).toSet
        if (unfoldableChildren.nonEmpty) {
          // Only expand the unfoldable children
          unfoldableChildren
        } else {        
          e.aggregateFunction.children.take(1).toSet
        }
    }
    //todo 當有多個distinct聚合表達式時,進行expand
    if (distinctAggGroups.size > 1) {
      // 創建gid標志
      val gid = AttributeReference("gid", IntegerType, nullable = false)()
      val groupByMap = a.groupingExpressions.collect {
        case ne: NamedExpression => ne -> ne.toAttribute
        case e => e -> AttributeReference(e.sql, e.dataType, e.nullable)()
      }
      val groupByAttrs = groupByMap.map(_._2)
      ....     
      }

      // 構建Expand算子
      val expand = Expand(
        regularAggProjection ++ distinctAggProjections,
        groupByAttrs ++ distinctAggChildAttrs ++ Seq(gid) ++ regularAggChildAttrMap.map(_._2),
        a.child)        
        .....
  }

重點代碼:
//todo 當有多個distinct聚合表達式時,進行expand
if (distinctAggGroups.size > 1) { expand }


spark sql grouping sets

grouping sets 、rollup 、cube 是用來處理多維分析的函數:

grouping sets:對分組集中指定的組表達式的每個子集執行group by,group by A,B grouping sets(A,B)就等價于 group by A union group by B,其中A和B也可以是一個集合,比如group by A,B,C grouping sets((A,B),(A,C))。

rollup:在指定表達式的每個層次級別創建分組集。group by A,B,C with rollup首先會對(A、B、C)進行group by,然后對(A、B)進行group by,然后是(A)進行group by,最后對全表進行group by操作。

cube : 為指定表達式集的每個可能組合創建分組集。首先會對(A、B、C)進行group by,然后依次是(A、B),(A、C),(A),(B、C),(B),?,最后對全表進行group by操作。

前文也說了,grouping sets也是利用expand的方式


優化思路

上文我們基本可以了解到了,是由于expand導致的慢,優化方向可以朝著減少distinct關鍵的出現的次數,減少數據膨脹方向入手

1、增加 expand的過程中partition 的數量

但是這樣有一個弊端:同時啟動太多task 會造成集群資源緊張,也會導致其它任務沒有資源。并且數據是 逐日增加的,總體上不好控制。

2、縮減expand 的數據量

從sql結構上:
可以把計算的指標拆開,分兩次計算,然后再 join。
總體的處理原則就是,讓過濾掉的數據盡量的多,expand 時的數據盡量少:

參考

參考博客

原文鏈接:https://blog.csdn.net/Lzx116/article/details/126153664

欄目分類
最近更新