首頁 > 軟體

Spark GraphX 分散式圖處理框架圖演演算法詳解

2022-10-22 14:01:01

正文

Spark GraphX是一個分散式圖處理框架,基於 Pregel 介面實現了常用的圖演演算法。

包括 PageRank、SVDPlusPlus、TriangleCount、 ConnectedComponents、LPA 等演演算法,以下通過具象化的圖範例理解相應的演演算法用途。

Graphx圖結構

Graphx中的Graph有兩個RDD,一個是邊RDD,一個是點RDD

此外,三元組其實就是(點、邊,點)一個有效組合,由triplets()介面獲取,triplets()返回的結果是EdgeTriplet[VD,ED]

1. 最短路徑

最常見的路徑搜尋演演算法(例如DFS & BFS、最短路徑、 最小生成樹、隨機遊走等),最短路徑是最容易理解的圖演演算法,因為大家在生活中能夠廣泛接觸到,如駕駛導航,外賣送餐路線等等。

路徑搜尋演演算法建立在圖搜尋演演算法的基礎上,用來探索節點之間的路徑。這些路徑從一個節點開始,遍歷關係,直到到達目的地,Graphx採用了最短路徑演演算法Dijkstra的原理。

範例資料

// 輸入一些邊資料
val edgeSeq: Seq[(Int, Int)] = Seq((1, 2), (1, 5), (2, 3), (2, 5), (3, 4), (4, 5), (4, 6),(6, 9),(9, 11)).flatMap(e => Seq(e, e.swap))
val edges = op.sc.parallelize(edgeSeq).map { case (v1, v2) => (v1.toLong, v2.toLong) }

視覺化資料

這是上述資料的圖形表示(雙向邊,無權)

計算最短路徑

val graph_sp = Graph.fromEdgeTuples(edges, 1)
val landmarks = Seq(1, 11).map(_.toLong)
val results = ShortestPaths.run(graph_sp, landmarks).vertices.collect.map {
  case (v, spMap) => (v, spMap.mapValues(i => i))
}

全部結果列印:

println(results.mkString)
(1,Map(1 -> 0, 11 -> 5))
(2,Map(1 -> 1, 11 -> 5))
(3,Map(1 -> 2, 11 -> 4))
(4,Map(1 -> 2, 11 -> 3))
(5,Map(1 -> 1, 11 -> 4))
(6,Map(11 -> 2, 1 -> 3))
(9,Map(11 -> 1, 1 -> 4))
(11,Map(11 -> 0, 1 -> 5))

上述計算了圖中所有點到點1和點11的最短距離,(起點id,Map(目標1 -> 最短路徑長度,目標2 -> 最短路徑長度))。例如5,Map(1 -> 1, 11 -> 4)說明從5到1最短距離是1,5到11的最短距離是4。

2. 網頁排名

PageRank度量一個圖中每個頂點的重要程度,假定從u到v的一條邊代表v的重要性標籤。例如,一個微博使用者被許多其它人粉,該使用者排名很高。GraphX帶有靜態和動態PageRank的實現方法,這些方法在PageRank object中。靜態的PageRank執行固定次數的迭代,而動態的PageRank一直執行,直到收斂。

GraphX有一個我們可以執行PageRank的社群網路資料集的簡單資料。使用者集在graphx/data/users.txt中,使用者之間的關係在graphx/data/followers.txt中(Spark的原始碼或編譯後檔案裡都包含)。

資料視覺化

pagerank演演算法測試

先說PageRank動態實現,以下呼叫就是動態的,實際是呼叫runUntilConvergence()不能指定迭代次數。引數0.0001是個容忍度,是在對圖進行迭代過程中退出迭代的條件,而靜態的PageRank不可傳遞該引數,但可以指定迭代次數【固定次數,所以靜態】。

val graph: Graph[Int, Int] = GraphLoader
      .edgeListFile(op.sc, "followers.txt", canonicalOrientation = true, numEdgePartitions = 1)
val ranks = graph.pageRank(0.0001).vertices.sortBy(_._2, ascending = false)
ranks.take(5).foreach(println(_))

演演算法結果

(7,1.8138212152810693)
(2,1.0692956678358136)
(4,0.8759124087591241)
(6,0.8759124087591241)
(1,0.6825291496824343)
# join name
(odersky,1.8138212152810693)
(ladygaga,1.0692956678358136)
(justinbieber,0.8759124087591241)
(matei_zaharia,0.8759124087591241)
(BarackObama,0.6825291496824343)

二元組左側是頂點資訊,右側是重要程度,也就是分數越高排名越靠前。

這個結果有一些順序跟直觀感受不符,點7最重要毋庸置疑,點1的重要性應該是大於點4的,但是結果不是這樣,那麼資料集大一些會更好嗎??

personalizedPageRank()方法還可以進行個性化推薦,比如社群網路中,給某使用者再推薦一個人,或者對於使用者商品的推薦中,使用者商品兩個實體可以形成一個圖,我們就可以根據具體的某個使用者來給他推薦一些商品。

3. 連通域(連通元件)

連通分量演演算法用其編號最小的頂點的 ID 標記圖中的每個連通分量。例如,在社群網路中,連線的元件可以近似叢集。

載入圖測試連通域

這裡的graph仍然是載入followers.txt資料,spark自帶的有。

val cc: Graph[VertexId, Int] = graph.connectedComponents()
println("連通結果展示++++++++:")
cc.vertices.map(_.swap)
  .groupByKey()
  .map(_._2)
  .foreach(println)

結果展示(跟圖形觀察的結果是一致):

連通結果展示++++++++:
CompactBuffer(4, 1, 2)
CompactBuffer(6, 3, 7)

可以看到是2個域。結果圖資料本身不是這樣組織的,為了便於理解進行了聚合。原始資料collect回來是這樣:

Array((4,1), (1,1), (6,3), (3,3), (7,3), (2,1))

元組左側是頂點,右側表示歸屬,這個結果符合預期。

生成圖測試

val g = Graph(sc.makeRDD((1L to 7L).map((_,""))),
      sc.makeRDD(Array(Edge(2L,5L,""), Edge(5L,3L,""), Edge(3L,2L,""),
        Edge(4L,5L,""), Edge(6L,7L,""))))
    g.connectedComponents
      .vertices
      .map(_.swap)
      .groupByKey()
      .map(_._2)
      .foreach(println)

圖範例的形態展示

這樣的程式碼便於自行組織一套圖資料,按自己意思進行修改,執行上述程式碼得到結果是:

CompactBuffer(1)
CompactBuffer(2, 3, 4, 5)
CompactBuffer(6, 7)

強連線網路就是:在這個網路中無論你從哪個頂點開始,其他所有頂點都是可達的。

強連線域的計算

g.stronglyConnectedComponents(3)
      .vertices.map(_.swap)
      .groupByKey()
      .map(_._2)
      .filter(_.size > 1)
      .foreach(println)

過濾掉那些單點的域,那麼強連線的計算結果是CompactBuffer(2, 3, 5)

4. 三角計數

當一個頂點有兩個相鄰的頂點並且它們之間有一條邊時,它就是三角形的一部分。需要注意的是,在計算社群網路資料集的三角形計數時,TriangleCount需要邊的方向是規範的方向(srcId < dstId),並且圖通過Graph.partitionBy分片過。

三角計數統計應用場景:大規模的社群發現,通過該演演算法可以做群體檢測。只要是跟大規模小團體檢測方面該演演算法都可以很好的支援,演演算法是找出擁有三角形環關係的最多的頂點。

Triangle Count的演演算法思想如下:

  • 計算每個結點的鄰結點;
  • 對通過每條邊的兩個頂點相聯的頂點的相鄰點集合計算交集,並找出交集中id大於前兩個結點id的結點;
  • 對每個結點統計Triangle總數,注意只統計符合計算方向的Triangle Count。

程式碼測試

val graph2 = graph.partitionBy(PartitionStrategy.RandomVertexCut)
// Find the triangle count for each vertex
val triCounts = graph2.triangleCount().vertices
println(triCounts.collect().mkString("n"))

開頭先對圖graph進行了分片得到graph2。

測試結果

這個意思是6,3,7頂點分別擁有1個三角環,而其他頂點沒有,實際上正是6,3,7組成了三角。

(4,0)
(1,0)
(6,1)
(3,1)
(7,1)
(2,0)

5. 標籤傳播演演算法(LPA)

Label Propagation,是一種基於圖的半監督學習演演算法(Semi-supervised learning),應用場景為:社群發現(Community detection)。社群發現的過程就是一種聚類的過程。主要是用於團體檢測,LPA能夠以接近線性複雜度去檢測一個大規模圖中的團體結構,主要思想是給所有頂點中的密集連線組打上一個唯一標籤,這些擁有相同標籤的組就是所謂的團體。

它不保證收斂,且迭代次數足夠多之後,所有聯通節點最終收斂為一個社群。

該演演算法也可以用於半監督學習(大部分沒有標籤,小部分有標籤),給那些沒有標籤的通過標籤傳播演演算法進行打標籤。也可以應用於風控,對於通過已有風險評估的人,通過社群網路去評估跟其有關係的人的風險。

基本思想

標籤傳播演演算法的應用場景是不重疊社群發現,其基本思想是:將一個節點的鄰居節點的標籤中數量最多的標籤作為該節點自身的標籤。給每個節點新增標籤(label)以代表它所屬的社群,並通過標籤的“傳播”形成同一標籤的“社群”結構。簡而言之,你的鄰居屬於哪個label最多,你就屬於哪個label。該演演算法的有點是收斂週期短,除了迭代次數無需任何先驗引數(不需事先指定社群個數和大小),演演算法執行過程中不需要計算任何社群指標。

以上就是Spark GraphX 分散式圖處理框架圖演演算法詳解的詳細內容,更多關於Spark GraphX 圖演演算法的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com