MapReduce业务 - 图片关联计算详解大数据

1.概述

  最近在和人交流时谈到数据相似度和数据共性问题,而刚好在业务层面有类似的需求,今天和大家分享这类问题的解决思路,分享目录如下所示:

  • 业务背景
  • 编码实践
  • 预览截图

  下面开始今天的内容分享。

2.业务背景

  目前有这样一个背景,在一大堆数据中,里面存放着图片的相关信息,如下图所示:

MapReduce业务 - 图片关联计算详解大数据

  上图只是给大家列举的一个示例数据格式,第一列表示自身图片,第二、第三……等列表示与第一列相关联的图片信息。那么我们从这堆数据中如何找出他们拥有相同图片信息的图片。

2.1 实现思路

  那么,我们在明确了上述需求后,下面我们来分析它的实现思路。首先,我们通过上图所要实现的目标结果,其最终计算结果如下所示:

pic_001pic_002 pic_003,pic_004,pic_005 
pic_001pic_003 pic_002,pic_005 
pic_001pic_004 pic_002,pic_005 
pic_001pic_005 pic_002,pic_003,pic_004 
......

  结果如上所示,找出两两图片之间的共性图片,结果未列完整,只是列举了部分,具体结果大家可以参考截图预览的相关信息。

  下面给大家介绍解决思路,通过观察数据,我们可以发现在上述数据当中,我们要计算图片两两的共性图片,可以从关联图片入手,在关联图片中我们可以找到共性图片的关联信息,比如:我们要计算pic001pic002图片的共性图片,我们可以在关联图片中找到两者(pic001pic002组合)后对应的自身图片(key),最后在将所有的key求并集即为两者的共性图片信息,具体信息如下图所示:
MapReduce业务 - 图片关联计算详解大数据

  通过上图,我们可以知道具体的实现思路,步骤如下所示:

  • 第一步:拆分数据,关联数据两两组合作为Key输出。
  • 第二步:将相同Key分组,然后求并集得到计算结果。

  这里使用一个MR来完成此项工作,在明白了实现思路后,我们接下来去实现对应的编码。

3.编码实践

  • 拆分数据,两两组合。
public static class PictureMap extends Mapper<LongWritable, Text, Text, Text> { 
 
        @Override 
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) 
                throws IOException, InterruptedException { 
            StringTokenizer strToken = new StringTokenizer(value.toString()); 
            Text owner = new Text(); 
 
            Set<String> set = new TreeSet<String>(); 
 
            owner.set(strToken.nextToken()); 
            while (strToken.hasMoreTokens()) { 
                set.add(strToken.nextToken()); 
            } 
 
            String[] relations = new String[set.size()]; 
            relations = set.toArray(relations); 
 
            for (int i = 0; i < relations.length; i++) { 
                for (int j = i + 1; j < relations.length; j++) { 
                    String outPutKey = relations[i] + relations[j]; 
                    context.write(new Text(outPutKey), owner); 
                } 
 
            } 
        } 
    }
  • 按Key分组,求并集
public static class PictureReduce extends Reducer<Text, Text, Text, Text> { 
 
        @Override 
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) 
                throws IOException, InterruptedException { 
            String common = ""; 
            for (Text val : values) { 
                if (common == "") { 
                    common = val.toString(); 
                } else { 
                    common = common + "," + val.toString(); 
                } 
            } 
            context.write(key, new Text(common)); 
        } 
    }
  • 完整示例
package cn.hadoop.hdfs.example; 
import java.io.IOException; 
import java.util.Set; 
import java.util.StringTokenizer; 
import java.util.TreeSet; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import cn.hadoop.hdfs.util.HDFSUtils; 
import cn.hadoop.hdfs.util.SystemConfig; 
/** 
* @Date Aug 31, 2015 
* 
* @Author dengjie 
* 
* @Note Find picture relations 
*/ 
public class PictureRelations extends Configured implements Tool { 
private static Logger log = LoggerFactory.getLogger(PictureRelations.class); 
private static Configuration conf; 
static { 
String tag = SystemConfig.getProperty("dev.tag"); 
String[] hosts = SystemConfig.getPropertyArray(tag + ".hdfs.host", ","); 
conf = new Configuration(); 
conf.set("fs.defaultFS", "hdfs://cluster1"); 
conf.set("dfs.nameservices", "cluster1"); 
conf.set("dfs.ha.namenodes.cluster1", "nna,nns"); 
conf.set("dfs.namenode.rpc-address.cluster1.nna", hosts[0]); 
conf.set("dfs.namenode.rpc-address.cluster1.nns", hosts[1]); 
conf.set("dfs.client.failover.proxy.provider.cluster1", 
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); 
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); 
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); 
} 
public static class PictureMap extends Mapper<LongWritable, Text, Text, Text> { 
@Override 
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) 
throws IOException, InterruptedException { 
StringTokenizer strToken = new StringTokenizer(value.toString()); 
Text owner = new Text(); 
Set<String> set = new TreeSet<String>(); 
owner.set(strToken.nextToken()); 
while (strToken.hasMoreTokens()) { 
set.add(strToken.nextToken()); 
} 
String[] relations = new String[set.size()]; 
relations = set.toArray(relations); 
for (int i = 0; i < relations.length; i++) { 
for (int j = i + 1; j < relations.length; j++) { 
String outPutKey = relations[i] + relations[j]; 
context.write(new Text(outPutKey), owner); 
} 
} 
} 
} 
public static class PictureReduce extends Reducer<Text, Text, Text, Text> { 
@Override 
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) 
throws IOException, InterruptedException { 
String common = ""; 
for (Text val : values) { 
if (common == "") { 
common = val.toString(); 
} else { 
common = common + "," + val.toString(); 
} 
} 
context.write(key, new Text(common)); 
} 
} 
public int run(String[] args) throws Exception { 
final Job job = Job.getInstance(conf); 
job.setJarByClass(PictureMap.class); 
job.setMapperClass(PictureMap.class); 
job.setMapOutputKeyClass(Text.class); 
job.setMapOutputValueClass(Text.class); 
job.setReducerClass(PictureReduce.class); 
job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(Text.class); 
FileInputFormat.setInputPaths(job, args[0]); 
FileOutputFormat.setOutputPath(job, new Path(args[1])); 
int status = job.waitForCompletion(true) ? 0 : 1; 
return status; 
} 
public static void main(String[] args) { 
try { 
if (args.length != 1) { 
log.warn("args length must be 1 and as date param"); 
return; 
} 
String tmpIn = SystemConfig.getProperty("hdfs.input.path.v2"); 
String tmpOut = SystemConfig.getProperty("hdfs.output.path.v2"); 
String inPath = String.format(tmpIn, "t_pic_20150801.log"); 
String outPath = String.format(tmpOut, "meta/" + args[0]); 
// bak dfs file to old 
HDFSUtils.bak(tmpOut, outPath, "meta/" + args[0] + "-old", conf); 
args = new String[] { inPath, outPath }; 
int res = ToolRunner.run(new Configuration(), new PictureRelations(), args); 
System.exit(res); 
} catch (Exception ex) { 
ex.printStackTrace(); 
log.error("Picture relations task has error,msg is" + ex.getMessage()); 
} 
} 
}

4.截图预览

  关于计算结果,如下图所示:

MapReduce业务 - 图片关联计算详解大数据

5.总结

  本篇博客只是从思路上实现了图片关联计算,在数据量大的情况下,是有待优化的,这里就不多做赘述了,后续有时间在为大家分析其中的细节。

6.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

原创文章,作者:506227337,如若转载,请注明出处:https://blog.ytso.com/228057.html

(0)
上一篇 2022年1月11日
下一篇 2022年1月11日

相关推荐

发表回复

登录后才能评论