Xgraph

November 13, 2018    Spark

Xgraph

  • 그래프 연산을 쉽게 할 수 있는 spark 패키지이다.
  • Xgraph는 두가지 요소를 가지고 있다.
    • Vertex RDD: 노드, degree of connectedness
    • EdgeRDD: 두개의 노드가 연결된 것
  • Xgraph를 이용해 BFS search를 쉽게 구현할 수 있다.


코드 예시(notebook 코드 참조)

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.log4j._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._


make vertex

val names = sc.textFile("./data/Marvel-names.txt")
names: org.apache.spark.rdd.RDD[String] = ./data/Marvel-names.txt MapPartitionsRDD[1] at textFile at <console>:41


  • for example. first row
    • 1 col: index
    • 2 col: name
var fields = names.take(1)(0).split("\"")
fields: Array[String] = Array("1 ", 24-HOUR MAN/EMMANUEL)


val heroID:Long = fields(0).trim().toLong
heroID: Long = 1


Some( fields(0).trim().toLong, fields(1))
res0: Some[(Long, String)] = Some((1,24-HOUR MAN/EMMANUEL))


  • 위 내용을 함수로 표현하면 아래와 같다.
def parseNames(line: String) : Option[(VertexId, String)] = {
    var fields = line.split("\"")
    if (fields.length > 1) {
      val heroID:Long = fields(0).trim().toLong
      if (heroID < 6487) {  // ID's above 6486 aren't real characters
        return Some( fields(0).trim().toLong, fields(1))
      }
    } 

    return None // flatmap will just discard None results, and extract data from Some results.
}

val verts = names.flatMap(parseNames)
parseNames: (line: String)Option[(org.apache.spark.graphx.VertexId, String)]
verts: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, String)] = MapPartitionsRDD[2] at flatMap at <console>:59


  • 그래프의 vertex를 생성한 것을 확인 할 수 있다.
verts.take(10).foreach(println)
(1,24-HOUR MAN/EMMANUEL)
(2,3-D MAN/CHARLES CHAN)
(3,4-D MAN/MERCURIO)
(4,8-BALL/)
(5,A)
(6,A'YIN)
(7,ABBOTT, JACK)
(8,ABCISSA)
(9,ABEL)
(10,ABOMINATION/EMIL BLO)


make edges

val lines = sc.textFile("./data/Marvel-graph.txt")
lines: org.apache.spark.rdd.RDD[String] = ./data/Marvel-graph.txt MapPartitionsRDD[4] at textFile at <console>:41


import scala.collection.mutable.ListBuffer
var _edges = new ListBuffer[Edge[Int]]()
import scala.collection.mutable.ListBuffer
_edges: scala.collection.mutable.ListBuffer[org.apache.spark.graphx.Edge[Int]] = ListBuffer()


  • for example. first row
val _fields = lines.take(1)(0).split(" ")
_fields: Array[String] = Array(5988, 748, 1722, 3752, 4655, 5743, 1872, 3413, 5527, 6368, 6085, 4319, 4728, 1636, 2397, 3364, 4001, 1614, 1819, 1585, 732, 2660, 3952, 2507, 3891, 2070, 2239, 2602, 612, 1352, 5447, 4548, 1596, 5488, 1605, 5517, 11, 479, 2554, 2043, 17, 865, 4292, 6312, 473, 534, 1479, 6375, 4456)


val _origin = _fields(0)
_origin: String = 5988


  • Edge는 xgraph에 있는 패키지 함수이다.
    • 연결된 노드를 tuple로 묶어주며 0으로 초기화
for (x <- 1 to (_fields.length - 1)) {
  // Our attribute field is unused, but in other graphs could
  // be used to deep track of physical distances etc.
  _edges += Edge(_origin.toLong, _fields(x).toLong, 0)
}


_edges
res2: scala.collection.mutable.ListBuffer[org.apache.spark.graphx.Edge[Int]] = ListBuffer(Edge(5988,748,0), Edge(5988,1722,0), Edge(5988,3752,0), Edge(5988,4655,0), Edge(5988,5743,0), Edge(5988,1872,0), Edge(5988,3413,0), Edge(5988,5527,0), Edge(5988,6368,0), Edge(5988,6085,0), Edge(5988,4319,0), Edge(5988,4728,0), Edge(5988,1636,0), Edge(5988,2397,0), Edge(5988,3364,0), Edge(5988,4001,0), Edge(5988,1614,0), Edge(5988,1819,0), Edge(5988,1585,0), Edge(5988,732,0), Edge(5988,2660,0), Edge(5988,3952,0), Edge(5988,2507,0), Edge(5988,3891,0), Edge(5988,2070,0), Edge(5988,2239,0), Edge(5988,2602,0), Edge(5988,612,0), Edge(5988,1352,0), Edge(5988,5447,0), Edge(5988,4548,0), Edge(5988,1596,0), Edge(5988,5488,0), Edge(5988,1605,0), Edge(5988,5517,0), Edge(5988,11,0), Edge(5988,479,0), Edge(5988,...


  • 모든 데이터 쌍에 대해서 적용해보자.
def makeEdges(line: String) : List[Edge[Int]] = {
    import scala.collection.mutable.ListBuffer
    var edges = new ListBuffer[Edge[Int]]()
    val fields = line.split(" ")
    val origin = fields(0)
    for (x <- 1 to (fields.length - 1)) {
      // Our attribute field is unused, but in other graphs could
      // be used to deep track of physical distances etc.
      edges += Edge(origin.toLong, fields(x).toLong, 0)
    }

    return edges.toList
}
// 
val edges = lines.flatMap(makeEdges)    
makeEdges: (line: String)List[org.apache.spark.graphx.Edge[Int]]
edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Int]] = MapPartitionsRDD[5] at flatMap at <console>:62


  • 그래프의 edge 생성
edges.take(10).foreach(println)
Edge(5988,748,0)
Edge(5988,1722,0)
Edge(5988,3752,0)
Edge(5988,4655,0)
Edge(5988,5743,0)
Edge(5988,1872,0)
Edge(5988,3413,0)
Edge(5988,5527,0)
Edge(5988,6368,0)
Edge(5988,6085,0)


make the graph

  • cache: RDD 재사용
val default = "Nobody"
val graph = Graph(verts, edges, default).cache()
default: String = Nobody
graph: org.apache.spark.graphx.Graph[String,Int] = org.apache.spark.graphx.impl.GraphImpl@714eaeb0


  • 각 노드에 대한 degree 산출
//((node,degree),...)
graph.degrees.take(10).foreach(println)
(4904,136)
(1084,526)
(384,42)
(6400,30)
(3702,34)
(6308,178)
(5618,38)
(5354,176)
(1894,16)
(4926,22)


  • 두 RDD에 대해서 key 맵핑
    • graph.degrees = (key, degree)
    • verts = (key, name)
  • (key, (degree, name))
graph.degrees.join(verts).take(10).foreach(println)
(4904,(136,SCATTERBRAIN))
(1084,(526,CLOAK/TYRONE JOHNSON))
(3586,(88,MEACHUM, WARD))
(6400,(30,ZA'KEN))
(3702,(34,MIST))
(6308,(178,WOLVERINE DOPPELGANG))
(5618,(38,TALO))
(5354,(176,STACY, GWEN))
(1894,(16,FIRESTAR DOPPELGANGE))
(4926,(22,SCOTT, JAKE))


  • sortBy(_._2._1,ascending=false)
    • _ = (4904,(136,SCATTERBRAIN))
    • _.2 = (136,SCATTERBRAIN)
    • _.2.1 = 136(=degree)
    • decreasing sort


println("\nTop 10 most-connected superheroes:")
// The join merges the hero names into the output; sorts by total connections on each node.
graph.degrees.join(verts).sortBy(_._2._1, ascending=false).take(10).foreach(println)
Top 10 most-connected superheroes:
(859,(3866,CAPTAIN AMERICA))
(5306,(3482,SPIDER-MAN/PETER PAR))
(2664,(3056,IRON MAN/TONY STARK ))
(5716,(2852,THING/BENJAMIN J. GR))
(6306,(2788,WOLVERINE/LOGAN ))
(3805,(2772,MR. FANTASTIC/REED R))
(2557,(2742,HUMAN TORCH/JOHNNY S))
(4898,(2690,SCARLET WITCH/WANDA ))
(5736,(2578,THOR/DR. DONALD BLAK))
(403,(2560,BEAST/HENRY &HANK& P))


// Now let's do Breadth-First Search using the Pregel API
println("\nComputing degrees of separation from SpiderMan...")

// Start from SpiderMan
val root: VertexId = 5306 // SpiderMan
Computing degrees of separation from SpiderMan...

root: org.apache.spark.graphx.VertexId = 5306


  • BFS 알고리즘 방식의 그래프 초기화($\infty$)
// Initialize each node with a distance of infinity, unless it's our starting point
val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else Double.PositiveInfinity)
initialGraph: org.apache.spark.graphx.Graph[Double,Int] = org.apache.spark.graphx.impl.GraphImpl@6935e720


  • 10 iteration 탐색
  • shortest path(distance) 계산
// graph update
val bfs = initialGraph.pregel(Double.PositiveInfinity, 10)( 
    // 가장 짧은 거리 구하기
    //(id, attr, msg) = (node, attrubute, message)
    (id, attr, msg) => math.min(attr, msg), 
    // 각 노드의 이웃 노드들을 업데이트 후 1 추가
    triplet => { 
      if (triplet.srcAttr != Double.PositiveInfinity) { 
        Iterator((triplet.dstId, triplet.srcAttr+1)) 
      } else { 
        Iterator.empty 
      } 
    }, 

    // 누적 거리 구하기
    (a,b) => math.min(a,b) ).cache()
2019-04-08 11:19:34 WARN  BlockManager:66 - Block rdd_235_1 already exists on this machine; not re-adding it
2019-04-08 11:19:34 WARN  BlockManager:66 - Block rdd_235_0 already exists on this machine; not re-adding it

bfs: org.apache.spark.graphx.Graph[Double,Int] = org.apache.spark.graphx.impl.GraphImpl@3db87249


  • start node(spider-man)와 다른 노드의 거리
    • 아래의 첫번째 row를 해석하면, spider-man는 4904(SCATTERBRAIN)과 거리 1만큼 떨어져 있음
bfs.vertices.take(10).foreach(println)
(4904,1.0)
(1084,1.0)
(384,2.0)
(6400,2.0)
(3702,2.0)
(6308,1.0)
(5618,2.0)
(5354,1.0)
(1894,2.0)
(4926,2.0)


// Print out the first 100 results:
bfs.vertices.join(verts).take(10).foreach(println)
(4904,(1.0,SCATTERBRAIN))
(1084,(1.0,CLOAK/TYRONE JOHNSON))
(3586,(2.0,MEACHUM, WARD))
(6400,(2.0,ZA'KEN))
(3702,(2.0,MIST))
(6308,(1.0,WOLVERINE DOPPELGANG))
(5618,(2.0,TALO))
(5354,(1.0,STACY, GWEN))
(1894,(2.0,FIRESTAR DOPPELGANGE))
(4926,(2.0,SCOTT, JAKE))


// Recreate our "degrees of separation" result:
println("\n\nDegrees from SpiderMan to ADAM 3,031")  // ADAM 3031 is hero ID 14
// x = (id, distance)
bfs.vertices.filter(x => x._1 == 14).collect.foreach(println)
Degrees from SpiderMan to ADAM 3,031
(14,2.0)

DSBA