日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

Spark?GraphX?分布式圖處理框架圖算法詳解_相關技巧

作者:朝朝mumu ? 更新時間: 2022-11-28 編程語言

正文

Spark GraphX是一個分布式圖處理框架,基于 Pregel 接口實現了常用的圖算法。

包括 PageRank、SVDPlusPlus、TriangleCount、 ConnectedComponents、LPA 等算法,以下通過具象化的圖實例理解相應的算法用途。

Graphx圖結構

Graphx中的Graph有兩個RDD,一個是邊RDD,一個是點RDD

此外,三元組其實就是(點、邊,點)一個有效組合,由triplets()接口獲取,triplets()返回的結果是EdgeTriplet[VD,ED]

1. 最短路徑

最常見的路徑搜索算法(例如DFS & BFS、最短路徑、 最小生成樹、隨機游走等),最短路徑是最容易理解的圖算法,因為大家在生活中能夠廣泛接觸到,如駕駛導航,外賣送餐路線等等。

路徑搜索算法建立在圖搜索算法的基礎上,用來探索節點之間的路徑。這些路徑從一個節點開始,遍歷關系,直到到達目的地,Graphx采用了最短路徑算法Dijkstra的原理。

示例數據

// 輸入一些邊數據
val edgeSeq: Seq[(Int, Int)] = Seq((1, 2), (1, 5), (2, 3), (2, 5), (3, 4), (4, 5), (4, 6),(6, 9),(9, 11)).flatMap(e => Seq(e, e.swap))
val edges = op.sc.parallelize(edgeSeq).map { case (v1, v2) => (v1.toLong, v2.toLong) }

可視化數據

這是上述數據的圖形表示(雙向邊,無權)

計算最短路徑

val graph_sp = Graph.fromEdgeTuples(edges, 1)
val landmarks = Seq(1, 11).map(_.toLong)
val results = ShortestPaths.run(graph_sp, landmarks).vertices.collect.map {
  case (v, spMap) => (v, spMap.mapValues(i => i))
}

全部結果打印:

println(results.mkString)
(1,Map(1 -> 0, 11 -> 5))
(2,Map(1 -> 1, 11 -> 5))
(3,Map(1 -> 2, 11 -> 4))
(4,Map(1 -> 2, 11 -> 3))
(5,Map(1 -> 1, 11 -> 4))
(6,Map(11 -> 2, 1 -> 3))
(9,Map(11 -> 1, 1 -> 4))
(11,Map(11 -> 0, 1 -> 5))

上述計算了圖中所有點到點1和點11的最短距離,(起點id,Map(目標1 -> 最短路徑長度,目標2 -> 最短路徑長度))。例如5,Map(1 -> 1, 11 -> 4)說明從5到1最短距離是1,5到11的最短距離是4。

2. 網頁排名

PageRank度量一個圖中每個頂點的重要程度,假定從u到v的一條邊代表v的重要性標簽。例如,一個微博用戶被許多其它人粉,該用戶排名很高。GraphX帶有靜態和動態PageRank的實現方法,這些方法在PageRank object中。靜態的PageRank運行固定次數的迭代,而動態的PageRank一直運行,直到收斂。

GraphX有一個我們可以運行PageRank的社交網絡數據集的簡單數據。用戶集在graphx/data/users.txt中,用戶之間的關系在graphx/data/followers.txt中(Spark的源碼或編譯后文件里都包含)。

數據可視化

pagerank算法測試

先說PageRank動態實現,以下調用就是動態的,實際是調用runUntilConvergence()不能指定迭代次數。參數0.0001是個容忍度,是在對圖進行迭代過程中退出迭代的條件,而靜態的PageRank不可傳遞該參數,但可以指定迭代次數【固定次數,所以靜態】。

val graph: Graph[Int, Int] = GraphLoader
      .edgeListFile(op.sc, "followers.txt", canonicalOrientation = true, numEdgePartitions = 1)
val ranks = graph.pageRank(0.0001).vertices.sortBy(_._2, ascending = false)
ranks.take(5).foreach(println(_))

算法結果

(7,1.8138212152810693)
(2,1.0692956678358136)
(4,0.8759124087591241)
(6,0.8759124087591241)
(1,0.6825291496824343)
# join name
(odersky,1.8138212152810693)
(ladygaga,1.0692956678358136)
(justinbieber,0.8759124087591241)
(matei_zaharia,0.8759124087591241)
(BarackObama,0.6825291496824343)

二元組左側是頂點信息,右側是重要程度,也就是分數越高排名越靠前。

這個結果有一些順序跟直觀感受不符,點7最重要毋庸置疑,點1的重要性應該是大于點4的,但是結果不是這樣,那么數據集大一些會更好嗎??

personalizedPageRank()方法還可以進行個性化推薦,比如社交網絡中,給某用戶再推薦一個人,或者對于用戶商品的推薦中,用戶商品兩個實體可以形成一個圖,我們就可以根據具體的某個用戶來給他推薦一些商品。

3. 連通域(連通組件)

連通分量算法用其編號最小的頂點的 ID 標記圖中的每個連通分量。例如,在社交網絡中,連接的組件可以近似集群。

加載圖測試連通域

這里的graph仍然是加載followers.txt數據,spark自帶的有。

val cc: Graph[VertexId, Int] = graph.connectedComponents()
println("連通結果展示++++++++:")
cc.vertices.map(_.swap)
  .groupByKey()
  .map(_._2)
  .foreach(println)

結果展示(跟圖形觀察的結果是一致):

連通結果展示++++++++:
CompactBuffer(4, 1, 2)
CompactBuffer(6, 3, 7)

可以看到是2個域。結果圖數據本身不是這樣組織的,為了便于理解進行了聚合。原始數據collect回來是這樣:

Array((4,1), (1,1), (6,3), (3,3), (7,3), (2,1))

元組左側是頂點,右側表示歸屬,這個結果符合預期。

生成圖測試

val g = Graph(sc.makeRDD((1L to 7L).map((_,""))),
      sc.makeRDD(Array(Edge(2L,5L,""), Edge(5L,3L,""), Edge(3L,2L,""),
        Edge(4L,5L,""), Edge(6L,7L,""))))
    g.connectedComponents
      .vertices
      .map(_.swap)
      .groupByKey()
      .map(_._2)
      .foreach(println)

圖實例的形態展示

這樣的代碼便于自行組織一套圖數據,按自己意思進行修改,運行上述代碼得到結果是:

CompactBuffer(1)
CompactBuffer(2, 3, 4, 5)
CompactBuffer(6, 7)

強連接網絡就是:在這個網絡中無論你從哪個頂點開始,其他所有頂點都是可達的。

強連接域的計算

g.stronglyConnectedComponents(3)
      .vertices.map(_.swap)
      .groupByKey()
      .map(_._2)
      .filter(_.size > 1)
      .foreach(println)

過濾掉那些單點的域,那么強連接的計算結果是CompactBuffer(2, 3, 5)

4. 三角計數

當一個頂點有兩個相鄰的頂點并且它們之間有一條邊時,它就是三角形的一部分。需要注意的是,在計算社交網絡數據集的三角形計數時,TriangleCount需要邊的方向是規范的方向(srcId < dstId),并且圖通過Graph.partitionBy分片過。

三角計數統計應用場景:大規模的社區發現,通過該算法可以做群體檢測。只要是跟大規模小團體檢測方面該算法都可以很好的支持,算法是找出擁有三角形環關系的最多的頂點。

Triangle Count的算法思想如下:

  • 計算每個結點的鄰結點;
  • 對通過每條邊的兩個頂點相聯的頂點的相鄰點集合計算交集,并找出交集中id大于前兩個結點id的結點;
  • 對每個結點統計Triangle總數,注意只統計符合計算方向的Triangle Count。

代碼測試

val graph2 = graph.partitionBy(PartitionStrategy.RandomVertexCut)
// Find the triangle count for each vertex
val triCounts = graph2.triangleCount().vertices
println(triCounts.collect().mkString("\n"))

開頭先對圖graph進行了分片得到graph2。

測試結果

這個意思是6,3,7頂點分別擁有1個三角環,而其他頂點沒有,實際上正是6,3,7組成了三角。

(4,0)
(1,0)
(6,1)
(3,1)
(7,1)
(2,0)

5. 標簽傳播算法(LPA)

Label Propagation,是一種基于圖的半監督學習算法(Semi-supervised learning),應用場景為:社區發現(Community detection)。社區發現的過程就是一種聚類的過程。主要是用于團體檢測,LPA能夠以接近線性復雜度去檢測一個大規模圖中的團體結構,主要思想是給所有頂點中的密集連接組打上一個唯一標簽,這些擁有相同標簽的組就是所謂的團體。

它不保證收斂,且迭代次數足夠多之后,所有聯通節點最終收斂為一個社區。

該算法也可以用于半監督學習(大部分沒有標簽,小部分有標簽),給那些沒有標簽的通過標簽傳播算法進行打標簽。也可以應用于風控,對于通過已有風險評估的人,通過社交網絡去評估跟其有關系的人的風險。

基本思想

標簽傳播算法的應用場景是不重疊社區發現,其基本思想是:將一個節點的鄰居節點的標簽中數量最多的標簽作為該節點自身的標簽。給每個節點添加標簽(label)以代表它所屬的社區,并通過標簽的“傳播”形成同一標簽的“社區”結構。簡而言之,你的鄰居屬于哪個label最多,你就屬于哪個label。該算法的有點是收斂周期短,除了迭代次數無需任何先驗參數(不需事先指定社區個數和大小),算法執行過程中不需要計算任何社區指標。

原文鏈接:https://juejin.cn/post/7156143614848925726

欄目分類
最近更新