GraphX
코드 예시(notebook 코드 참조)
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.log4j._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
make vertex
val names = sc.textFile("./data/Marvel-names.txt")
names: org.apache.spark.rdd.RDD[String] = ./data/Marvel-names.txt MapPartitionsRDD[1] at textFile at <console>:41
var fields = names.take(1)(0).split("\"")
fields: Array[String] = Array("1 ", 24-HOUR MAN/EMMANUEL)
val heroID:Long = fields(0).trim().toLong
heroID: Long = 1
Some( fields(0).trim().toLong, fields(1))
res0: Some[(Long, String)] = Some((1,24-HOUR MAN/EMMANUEL))
/**
 * Parses one line of the names file into a `(heroID, name)` vertex.
 *
 * The line is split on double quotes: the text before the first quote is the
 * numeric hero ID and the text between the quotes is the hero name.
 *
 * @param line one raw line of the names file
 * @return `Some((heroID, name))` when the line has a quoted name and a numeric ID
 *         below 6487; `None` otherwise, so that `flatMap` simply drops the line
 */
def parseNames(line: String) : Option[(VertexId, String)] = {
  import scala.util.Try
  val fields = line.split("\"")
  if (fields.length > 1) {
    // A non-numeric ID skips this line instead of failing the whole job.
    Try(fields(0).trim.toLong).toOption
      .filter(_ < 6487) // ID's above 6486 aren't real characters
      .map(heroID => (heroID, fields(1)))
  } else {
    None // flatMap discards None results and extracts data from Some results.
  }
}
val verts = names.flatMap(parseNames)
parseNames: (line: String)Option[(org.apache.spark.graphx.VertexId, String)]
verts: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, String)] = MapPartitionsRDD[2] at flatMap at <console>:59
verts.take(10).foreach(println)
(1,24-HOUR MAN/EMMANUEL)
(2,3-D MAN/CHARLES CHAN)
(3,4-D MAN/MERCURIO)
(4,8-BALL/)
(5,A)
(6,A'YIN)
(7,ABBOTT, JACK)
(8,ABCISSA)
(9,ABEL)
(10,ABOMINATION/EMIL BLO)
make edges
val lines = sc.textFile("./data/Marvel-graph.txt")
lines: org.apache.spark.rdd.RDD[String] = ./data/Marvel-graph.txt MapPartitionsRDD[4] at textFile at <console>:41
import scala.collection.mutable.ListBuffer
var _edges = new ListBuffer[Edge[Int]]()
import scala.collection.mutable.ListBuffer
_edges: scala.collection.mutable.ListBuffer[org.apache.spark.graphx.Edge[Int]] = ListBuffer()
val _fields = lines.take(1)(0).split(" ")
_fields: Array[String] = Array(5988, 748, 1722, 3752, 4655, 5743, 1872, 3413, 5527, 6368, 6085, 4319, 4728, 1636, 2397, 3364, 4001, 1614, 1819, 1585, 732, 2660, 3952, 2507, 3891, 2070, 2239, 2602, 612, 1352, 5447, 4548, 1596, 5488, 1605, 5517, 11, 479, 2554, 2043, 17, 865, 4292, 6312, 473, 534, 1479, 6375, 4456)
val _origin = _fields(0)
_origin: String = 5988
Edge는 GraphX(org.apache.spark.graphx) 패키지에 있는 case class이다.
// Every field after the first is a destination for an edge starting at _origin.
// The attribute (0) is unused here, but in other graphs it could be used to
// keep track of physical distances etc.
_fields.drop(1).foreach { target =>
  _edges += Edge(_origin.toLong, target.toLong, 0)
}
_edges
res2: scala.collection.mutable.ListBuffer[org.apache.spark.graphx.Edge[Int]] = ListBuffer(Edge(5988,748,0), Edge(5988,1722,0), Edge(5988,3752,0), Edge(5988,4655,0), Edge(5988,5743,0), Edge(5988,1872,0), Edge(5988,3413,0), Edge(5988,5527,0), Edge(5988,6368,0), Edge(5988,6085,0), Edge(5988,4319,0), Edge(5988,4728,0), Edge(5988,1636,0), Edge(5988,2397,0), Edge(5988,3364,0), Edge(5988,4001,0), Edge(5988,1614,0), Edge(5988,1819,0), Edge(5988,1585,0), Edge(5988,732,0), Edge(5988,2660,0), Edge(5988,3952,0), Edge(5988,2507,0), Edge(5988,3891,0), Edge(5988,2070,0), Edge(5988,2239,0), Edge(5988,2602,0), Edge(5988,612,0), Edge(5988,1352,0), Edge(5988,5447,0), Edge(5988,4548,0), Edge(5988,1596,0), Edge(5988,5488,0), Edge(5988,1605,0), Edge(5988,5517,0), Edge(5988,11,0), Edge(5988,479,0), Edge(5988,...
/**
 * Expands one adjacency line into directed edges.
 *
 * The first whitespace-separated field is the origin vertex ID; every following
 * field is a destination vertex ID. One edge origin -> destination is produced
 * per destination, in the order they appear on the line.
 *
 * @param line one raw line of the graph file, e.g. "5988 748 1722"
 * @return the edges described by the line; empty when the line holds fewer than
 *         two fields (blank line, or an origin with no destinations)
 */
def makeEdges(line: String) : List[Edge[Int]] = {
  // trim + "\\s+" tolerates leading/trailing or repeated whitespace, which would
  // otherwise yield empty fields and make toLong throw.
  val fields = line.trim.split("\\s+")
  if (fields.length < 2) {
    Nil
  } else {
    // Parse the origin once rather than once per destination.
    val origin = fields.head.toLong
    // The attribute (0) is unused here, but in other graphs it could be used to
    // keep track of physical distances etc.
    fields.iterator.drop(1).map(target => Edge(origin, target.toLong, 0)).toList
  }
}
//
val edges = lines.flatMap(makeEdges)
makeEdges: (line: String)List[org.apache.spark.graphx.Edge[Int]]
edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Int]] = MapPartitionsRDD[5] at flatMap at <console>:62
edges.take(10).foreach(println)
Edge(5988,748,0)
Edge(5988,1722,0)
Edge(5988,3752,0)
Edge(5988,4655,0)
Edge(5988,5743,0)
Edge(5988,1872,0)
Edge(5988,3413,0)
Edge(5988,5527,0)
Edge(5988,6368,0)
Edge(5988,6085,0)
make the graph
cache: RDD 재사용
val default = "Nobody"
val graph = Graph(verts, edges, default).cache()
default: String = Nobody
graph: org.apache.spark.graphx.Graph[String,Int] = org.apache.spark.graphx.impl.GraphImpl@714eaeb0
//((node,degree),...)
graph.degrees.take(10).foreach(println)
(4904,136)
(1084,526)
(384,42)
(6400,30)
(3702,34)
(6308,178)
(5618,38)
(5354,176)
(1894,16)
(4926,22)
(key, (degree, name))
graph.degrees.join(verts).take(10).foreach(println)
(4904,(136,SCATTERBRAIN))
(1084,(526,CLOAK/TYRONE JOHNSON))
(3586,(88,MEACHUM, WARD))
(6400,(30,ZA'KEN))
(3702,(34,MIST))
(6308,(178,WOLVERINE DOPPELGANG))
(5618,(38,TALO))
(5354,(176,STACY, GWEN))
(1894,(16,FIRESTAR DOPPELGANGE))
(4926,(22,SCOTT, JAKE))
sortBy(_._2._1, ascending=false)
_ = (4904,(136,SCATTERBRAIN))
_._2 = (136,SCATTERBRAIN)
_._2._1 = 136 (= degree)
println("\nTop 10 most-connected superheroes:")
// The join merges the hero names into the output; sorts by total connections on each node.
graph.degrees.join(verts).sortBy(_._2._1, ascending=false).take(10).foreach(println)
Top 10 most-connected superheroes:
(859,(3866,CAPTAIN AMERICA))
(5306,(3482,SPIDER-MAN/PETER PAR))
(2664,(3056,IRON MAN/TONY STARK ))
(5716,(2852,THING/BENJAMIN J. GR))
(6306,(2788,WOLVERINE/LOGAN ))
(3805,(2772,MR. FANTASTIC/REED R))
(2557,(2742,HUMAN TORCH/JOHNNY S))
(4898,(2690,SCARLET WITCH/WANDA ))
(5736,(2578,THOR/DR. DONALD BLAK))
(403,(2560,BEAST/HENRY &HANK& P))
// Now let's do Breadth-First Search using the Pregel API
println("\nComputing degrees of separation from SpiderMan...")
// Start from SpiderMan
val root: VertexId = 5306 // SpiderMan
Computing degrees of separation from SpiderMan...
root: org.apache.spark.graphx.VertexId = 5306
// Seed the search: the starting vertex gets distance 0.0, every other vertex
// starts at positive infinity.
val initialGraph = graph.mapVertices { (vertexId, _) =>
  if (vertexId == root) 0.0 else Double.PositiveInfinity
}
initialGraph: org.apache.spark.graphx.Graph[Double,Int] = org.apache.spark.graphx.impl.GraphImpl@6935e720
// graph update
// Breadth-first search via Pregel: every vertex ends up holding its hop count
// from the starting vertex. The initial message is infinity and at most 10
// iterations are run.
val bfs = initialGraph.pregel(Double.PositiveInfinity, 10)(
  // Vertex program: keep the shortest distance seen so far.
  // (id, attr, msg) = (vertex ID, current distance, incoming distance)
  (id, attr, msg) => math.min(attr, msg),
  // Send message: offer the destination the source's distance plus one hop, but
  // only when that improves on what the destination already has. Without this
  // check every reached vertex keeps re-sending redundant messages, so the run
  // never settles before the iteration limit. An unreached source (infinity)
  // never passes the check, so it sends nothing.
  triplet => {
    if (triplet.srcAttr + 1 < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + 1))
    } else {
      Iterator.empty
    }
  },
  // Merge messages: of two candidate distances for one vertex, keep the smaller.
  (a, b) => math.min(a, b)).cache()
2019-04-08 11:19:34 WARN BlockManager:66 - Block rdd_235_1 already exists on this machine; not re-adding it
2019-04-08 11:19:34 WARN BlockManager:66 - Block rdd_235_0 already exists on this machine; not re-adding it
bfs: org.apache.spark.graphx.Graph[Double,Int] = org.apache.spark.graphx.impl.GraphImpl@3db87249
루트 노드(spider-man)와 다른 노드의 거리
spider-man은 4904(SCATTERBRAIN)과 거리 1만큼 떨어져 있음
bfs.vertices.take(10).foreach(println)
(4904,1.0)
(1084,1.0)
(384,2.0)
(6400,2.0)
(3702,2.0)
(6308,1.0)
(5618,2.0)
(5354,1.0)
(1894,2.0)
(4926,2.0)
// Print out the first 10 results:
bfs.vertices.join(verts).take(10).foreach(println)
(4904,(1.0,SCATTERBRAIN))
(1084,(1.0,CLOAK/TYRONE JOHNSON))
(3586,(2.0,MEACHUM, WARD))
(6400,(2.0,ZA'KEN))
(3702,(2.0,MIST))
(6308,(1.0,WOLVERINE DOPPELGANG))
(5618,(2.0,TALO))
(5354,(1.0,STACY, GWEN))
(1894,(2.0,FIRESTAR DOPPELGANGE))
(4926,(2.0,SCOTT, JAKE))
// Reproduce the "degrees of separation" result for a single hero.
println("\n\nDegrees from SpiderMan to ADAM 3,031") // ADAM 3031 is hero ID 14
// Each element is an (id, distance) pair; keep only the entry for ID 14.
bfs.vertices.filter { case (id, _) => id == 14 }.collect().foreach(println)
Degrees from SpiderMan to ADAM 3,031
(14,2.0)