大数据梦工厂(0009 – 基于MapReduce的应用案例)
1 – MapReduce词频统计案例
1.1 – 样本数据
这是一个经典的词频统计的案例:统计如下样本数据中每个单词出现的次数。
[root@hadoop-01 ~]# cat input.txt
Spark HBase Azkaban Flume
Hive Flink Storm Hadoop HBase Spark
Flink Presto Kudu Azkaban
HBase Storm Presto Kafka
HBase Hadoop Hive Flink Kudu
HBase Flink Hive Storm
Hive Flink Hadoop Flume
HBase Hive Kudu Zookeeper
Hadoop Spark HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm Kudu
Hive Flink Hadoop Kafka
HBase Hive Presto Zookeeper
Presto Kudu Hadoop Kafka
Zookeeper Hadoop Flume
Azkaban Kudu Presto
Kafka Zookeeper
Kafka Flume
[root@hadoop-01 ~]# hdfs dfs -mkdir -p /tmp/wordcount/input
[root@hadoop-01 ~]# hdfs dfs -put -f input.txt /tmp/wordcount/input/input.txt
项目完整源码下载地址: WordCountDemo
1.2 – 项目依赖
进行 MapReduce 编程,需要导入 hadoop-client 的依赖版本:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
1.3 – WordCountMapper
将 input.txt
文件的每行数据按照指定分隔符进行拆分。这里需要注意在 MapReduce 中必须使用 Hadoop 定义的类型,因为 Hadoop 预定义的类型都是可序列化,可比较的,所有类型均实现了 WritableComparable
接口。
在 Map 中将每行数据按照分隔符进行拆分:
/**
* Object : Mapping 输入文件的内容
* Text : Mapping 输入的每一行的数据
* Text : Mapping 输出 key 的类型
* IntWritable : Mapping 输出 value 的类型
*/
public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 获取到一行文件的内容
String line = value.toString();
// 按照空格切分这一行的内容为一个单词数组
String[] words = StringUtils.split(line, " ");
// 遍历输出 <key, value> 键值对
for (String word : words) {
context.write(new Text(word), new IntWritable(1));
}
}
}
WordCountMapper
对应下图的 Mapping 操作:
1.4 – WordCountReducer
在 Reduce 中进行单词出现次数的统计:
/**
* Text : Mapping 输入的 key 的类型
* IntWritable : Mapping 输入的 value 的类型
* Text : Reducing 输出的 key 的类型
* IntWritable : Reducing 输出的 value 的类型
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 统计数字
int count = 0;
// 累加求和
for (IntWritable value : values) {
count += value.get();
}
// 输出 <单词:count> 键值对
context.write(key, new IntWritable(count));
}
}
WordCountReducer
对应下图的 Shuffling 和 Reducing 操作(Shuffling 的输出是 Reducing 的输入):
1.5 – WordCountApp
打包 jar 到集群使用 hadoop
命令提交作业示例:
public class WordCountApp {
// 1、使用硬编码,显示参数,实际开发中可以通过外部传参
private static final String HDFS_URL = "hdfs://172.16.1.2:8020";
private static final String HADOOP_USER_NAME = "hdfs";
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
// 文件的输入路径和输出路径由外部传参指定
if (args.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
// 指定 Hadoop 用户名,否则在 HDFS 上创建目录时可能会抛出权限不足的异常
System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
Configuration configuration = new Configuration();
// 指定 HDFS 的地址
configuration.set("fs.defaultFS", HDFS_URL);
// 2、获取 job 对象
Job job = Job.getInstance(configuration);
// 3、设置 jar 存储位置
job.setJarByClass(WordCountApp.class);
// 4、关联 Mapper 和 Reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 5、设置 Mapper 阶段输出数据的 key 和 value 类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 6、设置 Reducer 阶段输出数据的 key 和 value 类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 7、如果输出目录已经存在,则必须先删除,否则重复运行程序时会抛出异常
FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
Path outputPath = new Path(args[1]);
if (fileSystem.exists(outputPath)) {
fileSystem.delete(outputPath, true);
}
// 8、设置输入文件和输出文件的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, outputPath);
// 9、提交 job 到群集并等待它完成,参数设置为 true 代表打印显示对应的进度
boolean result = job.waitForCompletion(true);
// 10、退出程序
System.exit(result ? 0 : 1);
}
}
提示: 如果不设置 Mapper
操作的输出类型,则默认和 Reducer
操作输出的类型相同。
1.6 – 提交到 MapReduce 集群运行
可以在本机配置 Hadoop 开发环境,直接在 IDE 中启动进行测试。本案例打包提交到服务器中运行。
由于本项目没有使用除 Hadoop 外的第三方依赖,因此直接打包即可:
1、打包 jar 文件
或者使用 mvn 命令打包:
[root@hadoop-01 ~]# mvn package -DskipTests
2、提交作业
[root@hadoop-01 ~]# hadoop jar WordCount-1.0.jar com.jerome.wordcount.WordCountApp /tmp/wordcount/input /tmp/wordcount/output
3、作业完成后,在 YARN 的 Web 界面查看 Applications 运行情况
4、查看 HDFS 上统计结果
# 查看生成目录
[root@hadoop-01 ~]# hdfs dfs -ls /tmp/wordcount/output
# 查看统计结果
[root@hadoop-01 ~]# hdfs dfs -cat /tmp/wordcount/output/part-r-00000
提示: 每个作业的 Reduce 任务的默认个数为 1(通过 mapreduce.job.reduces 设置)。
每个 Reduce 任务都会产生一个输出文件,如果设置 Reduce 个数为 3
,那么 Map 输出数据会被分成 3
份,Reduce 输出的 part-r-00000
文件也会有 3
个。
2 – 词频统计案例之Combiner
2.1 – 代码实现
如果需要使用 Combiner
功能,只要在打包 jar 文件时,添加下面一行代码即可:
// 5、设置Combiner
job.setCombinerClass(WordCountReducer.class);
2.2 – 执行结果
[root@hadoop-01 ~]# hadoop jar WordCount-1.0.jar com.jerome.wordcount.WordCountCombinerApp /tmp/wordcount/input /tmp/wordcount/output
加入 Combiner
功能后,统计结果不会发生变化,但是可以从打印的日志看出 Combiner
的效果:
没有加入 Combiner
功能的打印日志如下:
加入 Combiner
功能后的打印日志如下:
本案例只有一个输入文件并且小于 128M,所以只有一个 Map 进行处理。可以看到经过 Combiner 功能后,Combine input records
值由 70
降低为 12
(样本中单词种类就只有 12 种),在本案例中 Combiner 能降低需要传输的数据量。
3 – 词频统计案例之Partitioner
3.1 – 默认的 Partitioner
假设有个需求:将不同单词的统计结果输出到不同文件。这种需求实际上比较常见,比如统计产品的销量时,需要将结果按照产品种类进行拆分。要实现这个功能,就需要用到自定义 Partitioner
。
MapReduce 默认的分类规则:在构建 job 的时候,如果不指定,则默认使用的是 HashPartitioner
:对 key 值进行哈希散列,并对 numReduceTasks
取余。其实现如下:
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
3.2 – 自定义 Partitioner
使用 Partitioner
自定义分类规则,按照单词进行分类:
public class CustomPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
if (key.toString().equals("Azkaban")) {
return 0;
}else if (key.toString().equals("Flink")) {
return 1;
}else if (key.toString().equals("Flume")) {
return 2;
}else if (key.toString().equals("HBase")) {
return 3;
}else if (key.toString().equals("Hadoop")) {
return 4;
}else if (key.toString().equals("Hive")) {
return 5;
}else if (key.toString().equals("Kafka")) {
return 6;
}else if (key.toString().equals("Kudu")) {
return 7;
}else if (key.toString().equals("Presto")) {
return 8;
}else if (key.toString().equals("Spark")) {
return 9;
}else if (key.toString().equals("Storm")) {
return 10;
}else {
return 11;
}
}
}
在构建 job
时,指定使用自定义的分类规则,并设置 reduce
的个数:
// 6、设置自定义 Partitioner 规则
job.setPartitionerClass(CustomPartitioner.class);
// 7、设置 Reduce 个数
job.setNumReduceTasks(12);
3.3 – 执行结果
[root@hadoop-01 ~]# hadoop jar WordCount-1.0.jar com.jerome.wordcount.WordCountCombinerPartitionerApp /tmp/wordcount/input /tmp/wordcount/output
执行结果如下,分别生成 12 个文件,每个文件中为对应单词的统计结果:
[root@hadoop-01 ~]# hdfs dfs -ls /tmp/wordcount/output/
Found 13 items
-rw-r--r-- 3 hdfs supergroup 0 2021-09-03 15:30 /tmp/wordcount/output/_SUCCESS
-rw-r--r-- 3 hdfs supergroup 10 2021-09-03 15:30 /tmp/wordcount/output/part-r-00000
-rw-r--r-- 3 hdfs supergroup 8 2021-09-03 15:30 /tmp/wordcount/output/part-r-00001
-rw-r--r-- 3 hdfs supergroup 8 2021-09-03 15:30 /tmp/wordcount/output/part-r-00002
-rw-r--r-- 3 hdfs supergroup 9 2021-09-03 15:30 /tmp/wordcount/output/part-r-00003
-rw-r--r-- 3 hdfs supergroup 9 2021-09-03 15:30 /tmp/wordcount/output/part-r-00004
-rw-r--r-- 3 hdfs supergroup 7 2021-09-03 15:30 /tmp/wordcount/output/part-r-00005
-rw-r--r-- 3 hdfs supergroup 8 2021-09-03 15:30 /tmp/wordcount/output/part-r-00006
-rw-r--r-- 3 hdfs supergroup 7 2021-09-03 15:30 /tmp/wordcount/output/part-r-00007
-rw-r--r-- 3 hdfs supergroup 9 2021-09-03 15:30 /tmp/wordcount/output/part-r-00008
-rw-r--r-- 3 hdfs supergroup 8 2021-09-03 15:30 /tmp/wordcount/output/part-r-00009
-rw-r--r-- 3 hdfs supergroup 8 2021-09-03 15:30 /tmp/wordcount/output/part-r-00010
-rw-r--r-- 3 hdfs supergroup 12 2021-09-03 15:30 /tmp/wordcount/output/part-r-00011
[root@hadoop-01 ~]# hdfs dfs -cat /tmp/wordcount/output/part-r-00000
Azkaban 3
[root@hadoop-01 ~]# hdfs dfs -cat /tmp/wordcount/output/part-r-00001
Flink 8
[root@hadoop-01 ~]# hdfs dfs -cat /tmp/wordcount/output/part-r-00002
Flume 4
::: hljs-center
扫一扫,我们的故事就开始了。
:::
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/170144.html