Implementing a Second-Degree Connection Search Algorithm with MapReduce

1. Background

On social networks such as Sina Weibo and Renren, the site often suggests new people to connect with through features like "people you may be interested in" or recommended follows. A second-degree connection (friend-of-a-friend) algorithm is one of the building blocks behind such recommendations.

2. Algorithm Implementation

Test data set:

a    b 
b    c 
a    c 
b    d 
c    e 
e    c 
e    f

Notes on the data set: each line is a follow relationship, e.g. a follows b. Since a follows b and c, b follows c and d, and c follows e, a's second-degree candidates are c, d, and e; c is already followed directly by a and is therefore dropped, leaving d and e. A user must also never appear as their own second-degree connection: c follows e and e follows c, so without this rule c would be recommended to itself.
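Before turning to MapReduce, the rule can be cross-checked with a minimal single-machine sketch (the class and variable names below are mine, not part of the original code):

import java.util.*;

// Plain-Java cross-check of the second-degree rule on the sample data above.
public class Degree2Check {
    public static void main(String[] args) {
        String[][] edges = {{"a","b"},{"b","c"},{"a","c"},{"b","d"},{"c","e"},{"e","c"},{"e","f"}};
        Map<String, Set<String>> follows = new HashMap<>();
        for (String[] e : edges)
            follows.computeIfAbsent(e[0], k -> new HashSet<>()).add(e[1]);
        for (String user : new TreeSet<>(follows.keySet())) {
            Set<String> direct = follows.get(user);
            Set<String> second = new TreeSet<>();
            for (String mid : direct)
                for (String cand : follows.getOrDefault(mid, Collections.emptySet()))
                    // skip people already followed directly, and the user themselves
                    if (!cand.equals(user) && !direct.contains(cand))
                        second.add(cand);
            for (String s : second)
                System.out.println(user + "\t" + s);
        }
    }
}

On this data it prints a d, a e, b e, c f, which is what the two-job pipeline below should reproduce.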

The implementation chains two MapReduce jobs: job 1 expands the follow graph into tagged first-degree records and second-degree candidates, and job 2 filters out candidates the user already follows.

The tricky part: making the two jobs run one after the other.
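The simplest answer, used in main() below, is to call waitForCompletion(true) on job 1, which blocks until it finishes, before submitting job 2. For reference, Hadoop also provides JobControl/ControlledJob to declare the dependency explicitly; here is a minimal sketch, assuming job and job2 have been configured exactly as in the code below:

import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

// Wrap each configured Job and declare that job2 depends on job1.
ControlledJob cJob1 = new ControlledJob(job.getConfiguration());
cJob1.setJob(job);
ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
cJob2.setJob(job2);
cJob2.addDependingJob(cJob1); // job2 is submitted only after job1 succeeds

JobControl control = new JobControl("de2Friend");
control.addJob(cJob1);
control.addJob(cJob2);

new Thread(control).start(); // JobControl implements Runnable
while (!control.allFinished()) {
    Thread.sleep(500);        // poll until both jobs have completed
}
control.stop();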

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
 
import java.io.IOException; 
import java.util.HashSet; 
import java.util.Random; 
import java.util.Set; 
 
public class De2Friends { 
    public static class De2Mapper1 extends Mapper<Object,Text,Text,Text>{ 
        @Override 
        protected void map(Object key, Text value, Context context) throws 
                IOException, InterruptedException { 
            String line = value.toString();
            String[] strArr = line.split("\t"); // input is tab-separated: follower<TAB>followee
            if(strArr.length==2) {
                // tag "1": someone this user follows
                context.write(new Text(strArr[0]), new Text("1" + strArr[1]));
                // tag "0": someone who follows this user (a fan)
                context.write(new Text(strArr[1]), new Text("0" + strArr[0]));
            } 
        } 
    } 
 
    public static class De2Reducer1 extends Reducer<Text,Text,Text,Text> { 
        @Override 
        protected void reduce(Text key, Iterable<Text> values, Context context) 
                throws IOException, InterruptedException { 
           Set<String> follows= new HashSet<String>(); 
           Set<String> fans=new HashSet<String>(); 
           for(Text val :values ){ 
               String friend =val.toString(); 
               if(friend.startsWith("1")){
                   context.write(key,new Text(friend)); // pass through the people this user follows: first-degree connections
                   follows.add(friend.substring(1));
               }
               if(friend.startsWith("0")){ 
                   fans.add(friend.substring(1)); 
               } 
           } 
           // every fan of this user reaches everyone this user follows in two
           // hops, so each (fan, follow) pair is a second-degree candidate
           for(String fan : fans)
               for(String follow:follows) {
                   if (!fan.equals(follow)) { // a user must not be their own second-degree connection
                       context.write(new Text(fan),new Text("2"+follow));
                   } 
               } 
 
        } 
    } 
 
    public static class De2Mapper2 extends  Mapper<Object,Text,Text,Text>{ 
        @Override 
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
            String line = value.toString();
            String[] strArr = line.split("\t");
            if(strArr.length==2) {
                context.write(new Text(strArr[0]), new Text(strArr[1])); // forward the tagged first- and second-degree records unchanged
            } 
        } 
    } 
 
    public static class De2Reducer2 extends Reducer<Text,Text,Text,Text>{ 
        @Override 
        protected void reduce(Text key, Iterable<Text> values, Context context) 
                throws IOException, InterruptedException { 
            Set<String> firstFriend = new HashSet<String>(); 
            Set<String> secondFriend =new HashSet<String>(); 
            for(Text val:values){
                String friend = val.toString();
                if(friend.startsWith("1")){ // startsWith, not contains: the tag is always the first character
                    firstFriend.add(friend.substring(1));
                }
                if(friend.startsWith("2")){
                    secondFriend.add(friend.substring(1));
                }
            }
            for(String second:secondFriend) {
                if(!firstFriend.contains(second))
                    context.write(key,new Text(second)); // keep only candidates not already followed: the true second-degree connections
            }
        } 
    } 
 
    public static void main(String[] args) throws Exception{ 
        System.setProperty("hadoop.home.dir","E://softs//majorSoft//hadoop-2.7.5"); 
        Configuration conf =new Configuration(); 
        conf.set("mapreduce.app-submission.cross-platform", "true"); 
        Path fileInput = new Path("hdfs://mycluster/testFile/qq.txt"); 
        Path tempDir = new Path("hdfs://mycluster/output/deg2friend-temp-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); 
        Path fileOutput = new Path("hdfs://mycluster/output/qq"); 
        Job job = Job.getInstance(conf,"de2Friend"); 
        job.setJar("E://bigData//hadoopDemo//out//artifacts//wordCount_jar//hadoopDemo.jar"); 
        job.setJarByClass(De2Friends.class); 
        job.setMapperClass(De2Mapper1.class); 
        job.setReducerClass(De2Reducer1.class); 
        job.setMapOutputKeyClass(Text.class); 
        job.setMapOutputValueClass(Text.class); 
        job.setNumReduceTasks(1); 
        job.setOutputKeyClass(Text.class); 
        job.setOutputValueClass(Text.class); 
 
        FileInputFormat.addInputPath(job,fileInput); 
        FileOutputFormat.setOutputPath(job,tempDir); 
        if(!job.waitForCompletion(true)) System.exit(1); // must block here (and bail on failure): job2 may only start once job1's output is complete
 
        Job job2 = Job.getInstance(conf,"de2Friend"); 
        job2.setJar("E://bigData//hadoopDemo//out//artifacts//wordCount_jar//hadoopDemo.jar"); 
        job2.setJarByClass(De2Friends.class); 
        job2.setMapperClass(De2Mapper2.class); 
        job2.setReducerClass(De2Reducer2.class); 
        job2.setMapOutputKeyClass(Text.class); 
        job2.setMapOutputValueClass(Text.class); 
        job2.setOutputKeyClass(Text.class); 
        job2.setOutputValueClass(Text.class); 
 
        FileInputFormat.addInputPath(job2,tempDir); 
        FileOutputFormat.setOutputPath(job2,fileOutput); 
 
        System.exit(job2.waitForCompletion(true)?0:1); 
    } 
}
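
One loose end: the intermediate directory deg2friend-temp-* is left behind on HDFS after the run. A small variant of the tail of main(), using the standard FileSystem API, would clean it up (a sketch, my addition rather than part of the original):

// Variant of the end of main(): delete the intermediate directory once job2 has consumed it.
boolean ok = job2.waitForCompletion(true);
org.apache.hadoop.fs.FileSystem.get(conf).delete(tempDir, true); // recursive delete
System.exit(ok ? 0 : 1);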

The result is as follows:

a    d 
a    e 
b    e 
c    f
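
To see where these pairs come from, it helps to look at job 1's intermediate output, i.e. job 2's input, derived by hand for this data set (line order may vary between runs):

a    1b 
a    1c 
b    1c 
b    1d 
a    2c 
a    2d 
c    1e 
a    2e 
b    2e 
e    1c 
e    1f 
c    2f

Note the candidate a 2c: job 2 drops it because c is already a first-degree contact of a.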

 
