如何使用spark-core实现广度优先搜索,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
需求描述
数据源是一批网络日志数据,每条数据都有两个字段srcip和dstip,字段之间以逗号分隔,问题的需求是给定一个srcip和dstip,在给定的搜索深度下检索这两个ip之间所有的通联路径。这个问题是网络日志处理中的一个实际需求,之前在单机的程序中实现过,但是需要将所有的ip对加载到内存中。考虑到如果数据量太大的情况,可能单节点的内存无法支撑这样的操作,但是如果不将ip对全加载内存中,使用深度优先遍历的方法,搜索过程又会很慢。最近在学习spark框架,刚接触RDD,就是这用RDD来解决这个问题。以下是scala代码
package com.pxu.spark.core import org.apache.spark.{HashPartitioner, SparkConf, SparkContext} /** * pxu * 2021-01-29 16:57 */ object FindIpRel { def main(args: Array[String]): Unit = { val srcIp = args(0) // 源ip val dstIp = args(1) // 目标ip val depth = args(2).toInt //搜索深度 val resPath = args(3) //搜索结果的输出位置 val conf = new SparkConf().setAppName("findIpRel") val sc = new SparkContext(conf) /** * 从数据源中构建原始rdd,每一行的数据形式为a,b */ val ori = sc.textFile("hdfs://master:9000/submitTest/input/ipconn/srcdst.csv") /** * 对原始Rdd进行元组形式转化,现在每一行的数据形式为(a,b) * 除此之外还对数据进行了去重处理,并显示使用hash分区器对RDD中的数据进行分区 * 为后面的join操作,做一些优化 */ val base = ori.map(a => { val tmpArr = a.split(",") (tmpArr(0), tmpArr(1)) }).distinct().partitionBy(new HashPartitioner(10)) /** * 这是一个用于保存结果的RDD,其中每一行的形式为(dstIp,List(ip on path)) * 在查找过程中,发现了搜索结果后,就会将其并入到res中 */ var res = sc.makeRDD[(String,List[String])](List()) /** * 这是一个用于迭代的RDD,其初始化的内容是,首先从baseRdd中过滤出元组第一个元素a是参数SrcIp的, * 然后将其转化成(b,List(a))的格式,其中b总是代表当前搜索路径上的尾ip,list中的其他内容代表搜索 * 路径上其他的ip */ var iteration = base.filter(_._1.equals(srcIp)).map(a => (a._2,List(a._1))) for(i <- 2 to depth){ /** * 1.首先iteration和base按照key进行join,这个操作的意义就是更深一层的搜索,结果RDD的格式是(b,(List(ip on path),c)) * 2.对数据进行一次过滤,过去掉那些路径已经形成环的元素,成环的判据就是List(ip on path)中的数据已经包含c了 * 3.进行map操作,b并入到List(ip on path),将c作为新的key,因此此时更深一层的搜索,导致c成为了当前搜索路径中的尾节点, * 此时RDD中的每一个元素的格式应该是(c,(List(ip on path)) */ val tmp = iteration.join(base).filter(a => !a._2._1.contains(a._2._2)).map(a => (a._2._2,a._2._1:+a._1)) /** * 将tmp中已经成功搜索的路径筛选出来,成功搜索的判据是(c,(List(ip on path)),c与dstIp相等 */ val success = tmp.filter(a => a._1.equals(dstIp)) /** * 将成功搜索的数据合并到res中 */ res = res.union(success) /** * 更新iteration */ iteration = tmp.subtract(success) } /** * 将成功搜索的路径并入到res中 */ res.union(iteration.filter(a => a._1.equals(dstIp))) /** * 执行一次转换操作,将res中的元素从(c,(List(ip on path))格式转换成List(all ip on path) */ val finalResult = res.map(a => a._2 :+ a._1) finalResult.saveAsTextFile(resPath) } }
关于如何使用spark-core实现广度优先搜索问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注亿速云行业资讯频道了解更多相关知识。
原创文章,作者:bd101bd101,如若转载,请注明出处:https://blog.ytso.com/223412.html