educoder-Spark GraphX—寻找社交媒体中的“影响力用户”

【educoder-Spark GraphX—寻找社交媒体中的“影响力用户”】第1关:认识 API
简介
Spark 中提供了方便开发者的基于谷歌 API的迭代算法,因此可以用的计算框架来处理Spark上的图数据 。的 API提供了一个简明的函数式算法设计,用它可以在图中方便的迭代计算,如最短路径、关键路径、n度关系等,也可以通过对一些内部数据集的缓存和释放缓存操作来提升性能 。
编程要求
根据图1运用函数找出距离Ann最远的顶点 。补全代码中的内容,使得程序运行结果如预期输出 。具体请参见后续测试样例 。
测试说明
平台会对你编写的代码进行测试:
测试输入:
预期输出:
import org.apache.log4j.{Level, Logger}import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.graphx._object farthest_distance{def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("farthest distance").setMaster("local[4]")val sc = new SparkContext(conf) //屏蔽日志Logger.getLogger("org.apache.spark").setLevel(Level.WARN)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)//构造图val myVertices = sc.parallelize(Array((1L,"Ann"),(2L,"Bill"),(3L,"Diane"),(4L,"Cody"),(5L,"Adam"),(6L,"Bob")))val myEdges = sc.parallelize(Array(Edge(1L,2L,""),Edge(2L,3L,""),Edge(2L,4L,""),Edge(3L,4L,""),Edge(4L,5L,"C"),Edge(4L,6L,""),Edge(5L,6L,""))) //构造EdgeRDDval myGraph = Graph(myVertices,myEdges)//**************Begin*************************//使用pregel函数找到距离Ann(1号)最远的顶点val g=Pregel(myGraph.mapVertices((vid,vd) => 0),0,activeDirection=EdgeDirection.Out)((id:VertexId, vd:Int, a:Int) => math.max(vd,a),(et:EdgeTriplet[Int,String]) => Iterator((et.dstId,et.srcAttr+1)),(a:Int,b:Int) => math.max(a,b))//得到返回的新图的顶点集合val result = g.vertices.collectprintln("")//输出结果result.foreach(println)//**************End**************************}}
第2关:寻找社交媒体中的“影响力用户”
任务描述
本关主题是在数据中,寻找“影响力用户”,简单而言就是找图中出度最大的节点 。一个独立的推特用户可以通过他/她的推文影响到N个级别,即 ofof …。本关卡只考虑2级,即一个用户的 of。
在txt文件中每一行代表一个关系,前面一个是名称和id,后一个是的名称和id,其中用逗号隔开 。图中的箭头是从指向,所以也可以表示成寻找被关注最多的人 。-graph-data.txt文件内容如图2所示 。由于图中没有给每一条边的属性,所以默认就是1,或者也可以赋值成其他的一些提示信息 。

educoder-Spark GraphX—寻找社交媒体中的“影响力用户”

文章插图
编程要求
根据算法提示和代码中的注释,补全代码中的内容,以使得程序运行结果如预期输出 。
测试说明
平台会对你编写的代码进行测试:
测试输入:
预期输出:
hasonwith 95 .
import org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId}import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object Twitter_test{def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Twitter Influencer").setMaster("local[*]")val sparkContext = new SparkContext(conf)sparkContext.setLogLevel("ERROR")//读取文件val twitterData = http://www.kingceram.com/post/sparkContext.textFile("/root/data/twitter-graph-data.txt")//分别从文本文件中提取followee和follower的数据val followeeVertices: RDD[(VertexId, String)] = twitterData.map(_.split(",")).map {arr =>val user = arr(0).replace("((", "")val id = arr(1).replace(")", "")(id.toLong, user)}//根据followee的提取方法,提取follower的数据val followerVertices: RDD[(VertexId, String)] = twitterData.map(_.split(",")).map {arr =>val user = arr(2).replace("(", "")val id = arr(3).replace("))", "")(id.toLong, user)}//根据提取的数据创建图val vertices = followeeVertices.union(followerVertices)val edges: RDD[Edge[String]] = twitterData.map(_.split(",")).map { arr =>val followeeId = arr(1).replace(")", "").toLongval followerId = arr(3).replace("))", "").toLongEdge(followeeId, followerId, "follow")}val defaultUser = ("")val graph = Graph(vertices, edges, defaultUser)//使用Pregel API和广度优先遍历算法,最大迭代次数为2val subGraph = graph.pregel("", 2, EdgeDirection.In)((_, attr, msg) =>attr + "," + msg,//sendMsg函数,发送follower的属性到源顶点,//以便可以在Twitter上累积跟随个人用户的所有2级用户 。triplet => Iterator((triplet.srcId, triplet.dstAttr)),//mergeMsg函数将属性用“,”连接(a, b) => (a + "," + b))//**************Begin*************************//找到拥有最多followers of followers的用户,对subGraph的顶点属性进行切分,除去重复属性,并计算长度val lengthRDD = subGraph.vertices.map(vertex =>(vertex._1, vertex._2.split(",").distinct.length - 2)).max()(new Ordering[Tuple2[VertexId, Int]]() {override def compare(x: (VertexId, Int), y: (VertexId, Int)): Int =Ordering[Int].compare(x._2, y._2)})//找出拥有一个最多跟随者的顶点val userId = graph.vertices.filter(_._1 == lengthRDD._1).map(_._2).collect().headprintln("")//输出结果println(userId + " has maximum influence on network with " + lengthRDD._2 + " influencers.")//**************End**************************sparkContext.stop()}}