前面我们所写的MapReduce程序,输入都是文本文件;但在实际工作中,难免会遇到需要处理其它格式数据的情况。下面以处理Excel数据为例进行说明。
1、项目需求
现有一份刘超与家庭成员之间的通话记录,存储在Excel文件中,如下面的数据集所示。我们需要基于这份数据,统计每个月每个家庭成员给自己打电话的次数,并按月份输出到不同文件。
下面是部分数据,数据格式为:编号 联系人 电话 时间(例如:1.0 老爸 13999123786 2014-12-20)。
2、分析
统计每个月每个家庭成员给自己打电话的次数这一点很简单,我们之前已经写过几个类似的程序。实现需求的麻烦之处在于:文件的输入是Excel文件,而输出要按月份写到不同文件。这就需要我们实现自定义的InputFormat和OutputFormat。
3、实现
首先,输入文件是Excel格式,我们可以借助Apache POI来解析Excel文件。先来实现一个Excel解析类(ExcelParser):
package com.buaa;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.poi.hssf.usermodel.HSSFSheet;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;
/**
 * @ProjectName HandleExcelPhone
 * @PackageName com.buaa
 * @ClassName ExcelParser
 * @Description Parses the first sheet of an .xls (HSSF) workbook into one text line per row
 * @Author 刘吉超
 * @Date 2016-04-24 16:59:28
 */
public class ExcelParser {
    private static final Log logger = LogFactory.getLog(ExcelParser.class);

    // Appended after every emitted cell value (a real tab, not the two characters "/t").
    private static final String CELL_SEPARATOR = "\t";

    /**
     * Parses the first sheet of an HSSF (.xls) workbook.
     *
     * <p>Each row becomes one string in which every boolean, numeric and string cell is
     * followed by a tab. Cells of any other type (blank, formula, error, ...) are skipped.
     * Numeric cells are rendered like {@code String.valueOf(double)}, e.g. {@code 1.0}.
     *
     * @param is data source containing the workbook
     * @return one entry per row, in sheet order; an empty array if the workbook could not be
     *     read (the failure is logged)
     */
    public static String[] parseExcelData(InputStream is) {
        List<String> resultList = new ArrayList<String>();
        try {
            HSSFWorkbook workbook = new HSSFWorkbook(is);
            HSSFSheet sheet = workbook.getSheetAt(0);
            Iterator<Row> rowIterator = sheet.iterator();
            while (rowIterator.hasNext()) {
                Row row = rowIterator.next();
                StringBuilder rowString = new StringBuilder();
                Iterator<Cell> colIterator = row.cellIterator();
                while (colIterator.hasNext()) {
                    Cell cell = colIterator.next();
                    switch (cell.getCellType()) {
                        case Cell.CELL_TYPE_BOOLEAN:
                            rowString.append(cell.getBooleanCellValue()).append(CELL_SEPARATOR);
                            break;
                        case Cell.CELL_TYPE_NUMERIC:
                            rowString.append(cell.getNumericCellValue()).append(CELL_SEPARATOR);
                            break;
                        case Cell.CELL_TYPE_STRING:
                            rowString.append(cell.getStringCellValue()).append(CELL_SEPARATOR);
                            break;
                        default:
                            // Other cell types are intentionally not emitted.
                            break;
                    }
                }
                resultList.add(rowString.toString());
            }
        } catch (IOException e) {
            // Best effort: keep the original "empty result on failure" contract, but log the
            // real cause with its stack trace instead of a misleading "file not found".
            logger.error("IO Exception while reading Excel data", e);
        }
        return resultList.toArray(new String[0]);
    }
}
然后,我们需要定义一个从Excel读取数据的InputFormat类,命名为ExcelInputFormat,实现代码如下:
package com.buaa;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/** * @ProjectName HandleExcelPhone
* @PackageName com.buaa
* @ClassName ExcelInputFormat
* @Description TODO
* @Author 刘吉超
* @Date 2016-04-28 17:31:54
*/ public class ExcelInputFormat extends FileInputFormat<LongWritable,Text>{
@Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
return new ExcelRecordReader();
}
public class ExcelRecordReader extends RecordReader<LongWritable, Text> { private LongWritable key = new LongWritable(-1); private Text value = new Text(); private InputStream inputStream; private String[] strArrayofLines;
@Override public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException { // 分片
FileSplit split = (FileSplit) genericSplit; // 获取配置
Configuration job = context.getConfiguration();
// 分片路径
Path filePath = split.getPath();
FileSystem fileSystem = filePath.getFileSystem(job);
inputStream = fileSystem.open(split.getPath());
// 调用解析excel方法 this.strArrayofLines = ExcelParser.parseExcelData(inputStream);
}
@Override public boolean nextKeyValue() throws IOException, InterruptedException { int pos = (int) key.get() + 1;
if (pos < strArrayofLines.length){
if(strArrayofLines[pos] != null){
key.set(pos);
value.set(strArrayofLines[pos]);
return true;
}
}
return false;
}
@Override public LongWritable getCurrentKey() throws IOException, InterruptedException { return key;
}
@Override public Text getCurrentValue() throws IOException, InterruptedException { return value;
}
@Override public float getProgress() throws IOException, InterruptedException { return 0;
}
@Override public void close() throws IOException { if (inputStream != null) {
inputStream.close();
}
}
}
}
接下来,我们要定义一个ExcelOutputFormat类,用于实现按月份输出到不同文件中。它在写出每条记录时用到的MailRecordWriter类(负责把key、分隔符、value和换行写入输出流)紧随其后给出:
package com.buaa;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
/**
 * @ProjectName HandleExcelPhone
 * @PackageName com.buaa
 * @ClassName ExcelOutputFormat
 * @Description Routes each record to a file chosen by generateFileNameForKeyValue (one per month)
 * @Author 刘吉超
 * @Date 2016-04-28 17:24:23
 */
public class ExcelOutputFormat extends FileOutputFormat<Text, Text> {
    // Separates name and month inside the key, and key and value in the output lines.
    private static final String SEPARATOR = "\t";

    // Writer handed out by getRecordWriter; created lazily, once per OutputFormat instance.
    private MultiRecordWriter writer = null;

    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    /**
     * Returns the directory the task should write into: the committer's work path when a
     * FileOutputCommitter is used, otherwise the job's configured output path.
     *
     * @throws IOException if no output directory is configured
     */
    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            return ((FileOutputCommitter) committer).getWorkPath();
        }
        Path outputPath = getOutputPath(conf);
        if (outputPath == null) {
            throw new IOException("没有定义输出目录");
        }
        return outputPath;
    }

    /**
     * Determines the output file name (including extension) for a record.
     *
     * @param key "name\tmonth", as produced by the mapper
     * @param value record value (unused)
     * @param conf job configuration (unused)
     * @return "{month}.txt"
     * @throws IllegalArgumentException if the key has no tab-separated month part
     */
    protected String generateFileNameForKeyValue(Text key, Text value, Configuration conf) {
        String[] records = key.toString().split(SEPARATOR);
        if (records.length < 2) {
            throw new IllegalArgumentException("Expected key of the form name\\tmonth but got: " + key);
        }
        return records[1] + ".txt";
    }

    /** Keeps one underlying RecordWriter per output file name and dispatches records to it. */
    public class MultiRecordWriter extends RecordWriter<Text, Text> {
        // Cache of open writers, keyed by output file name.
        private HashMap<String, RecordWriter<Text, Text>> recordWriters = null;
        private TaskAttemptContext job = null;
        // Directory in which the per-name files are created.
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            this.recordWriters = new HashMap<String, RecordWriter<Text, Text>>();
        }

        @Override
        public void write(Text key, Text value) throws IOException, InterruptedException {
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
            RecordWriter<Text, Text> rw = recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }

        /**
         * Creates the writer for one output file, compressed with the configured codec
         * (GzipCodec by default) when output compression is enabled.
         */
        private RecordWriter<Text, Text> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            if (getCompressOutput(job)) {
                Class<? extends CompressionCodec> codecClass =
                        getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                return new MailRecordWriter<Text, Text>(
                        new DataOutputStream(codec.createOutputStream(fileOut)), SEPARATOR);
            }
            Path file = new Path(workPath, baseName);
            FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
            return new MailRecordWriter<Text, Text>(fileOut, SEPARATOR);
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            for (RecordWriter<Text, Text> rw : recordWriters.values()) {
                rw.close(context);
            }
            recordWriters.clear();
        }
    }
}
package com.buaa;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/** * @ProjectName HandleExcelPhone
* @PackageName com.buaa
* @ClassName MailRecordWriter
* @Description TODO
* @Author 刘吉超
* @Date 2016-04-24 16:59:23
*/ public class MailRecordWriter< K, V > extends RecordWriter< K, V > { // 编码 private static final String utf8 = "UTF-8"; // 换行 private static final byte[] newline; static { try {
newline = "/n".getBytes(utf8);//换行符 "/n"不对
} catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
} // 输出数据 protected DataOutputStream out; // 分隔符 private final byte[] keyValueSeparator;
public MailRecordWriter(DataOutputStream out, String keyValueSeparator) { this.out = out; try { this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
} catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}
public MailRecordWriter(DataOutputStream out) { this(out, "/t");
}
private void writeObject(Object o) throws IOException { if (o instanceof Text) {
Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(o.toString().getBytes(utf8));
}
}
public synchronized void write(K key, V value) throws IOException { boolean nullKey = key == null || key instanceof NullWritable; boolean nullValue = value == null || value instanceof NullWritable; if (nullKey && nullValue) { return;
} if (!nullKey) {
writeObject(key);
} if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
} if (!nullValue) {
writeObject(value);
}
out.write(newline);
}
public synchronized void close(TaskAttemptContext context) throws IOException {
out.close();
}
}
最后,我们来编写Mapper类(实现map()函数)和Reducer类(实现reduce()函数),以及驱动作业运行的代码:
package com.buaa;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * @ProjectName HandleExcelPhone
 * @PackageName com.buaa
 * @ClassName ExcelContactCount
 * @Description Counts calls per contact and month in an Excel call log, one output file per month
 * @Author 刘吉超
 * @Date 2016-04-24 16:34:24
 */
public class ExcelContactCount extends Configured implements Tool {

    // Paths used by main() when no input/output arguments are supplied.
    private static final String DEFAULT_INPUT = "hdfs://ljc:9000/buaa/phone/phone.xls";
    private static final String DEFAULT_OUTPUT = "hdfs://ljc:9000/buaa/phone/out/";

    /**
     * Maps a row "id contact phone yyyy-MM-dd" to (contact\tmonth, phone).
     * Rows with fewer than four fields or without a '-' separated date (blank rows,
     * a header row) are skipped instead of failing the task.
     */
    public static class PhoneMapper extends Mapper<LongWritable, Text, Text, Text> {
        // Reused across calls; context.write serializes the current contents.
        private final Text pkey = new Text();
        private final Text pvalue = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws InterruptedException, IOException {
            // e.g. "1.0 老爸 13999123786 2014-12-20" (fields separated by whitespace/tabs)
            String[] records = value.toString().trim().split("\\s+");
            if (records.length < 4) {
                return;
            }
            String[] dateParts = records[3].split("-");
            if (dateParts.length < 2) {
                return;
            }
            // contact + month
            pkey.set(records[1] + "\t" + dateParts[1]);
            // phone number
            pvalue.set(records[2]);
            context.write(pkey, pvalue);
        }
    }

    /** Emits (contact\tmonth, phone\tcount) where count is the number of calls for the key. */
    public static class PhoneReducer extends Reducer<Text, Text, Text, Text> {
        private final Text pvalue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Hadoop's values Iterable can be traversed only once and reuses one Text instance,
            // so copy the first phone number and count all values in a single pass.
            String phone = null;
            int phoneTotal = 0;
            for (Text v : values) {
                if (phone == null) {
                    phone = v.toString();
                }
                phoneTotal++;
            }
            pvalue.set(phone + "\t" + phoneTotal);
            context.write(key, pvalue);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args args[0] = input path, args[1] = output path (deleted first if it exists)
     * @return 0 on success, 1 on job failure, 2 on bad usage
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Usage: ExcelContactCount <input path> <output path>");
            return 2;
        }
        // Use the configuration injected by ToolRunner so generic options (-D ...) take effect.
        Configuration conf = getConf() != null ? getConf() : new Configuration();

        // FileOutputFormat refuses to write into an existing output path, so remove it first.
        Path outputPath = new Path(args[1]);
        FileSystem fs = outputPath.getFileSystem(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        Job job = Job.getInstance(conf, "Call Log");
        job.setJarByClass(ExcelContactCount.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);

        job.setMapperClass(PhoneMapper.class);
        job.setReducerClass(PhoneReducer.class);
        // ExcelOutputFormat names files by month only; with several reduce tasks, same-named
        // files from different reducers would overwrite each other when the job is committed.
        job.setNumReduceTasks(1);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(ExcelInputFormat.class);
        job.setOutputFormatClass(ExcelOutputFormat.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Runs the job with the given input/output paths, or the default HDFS paths if none are given. */
    public static void main(String[] args) throws Exception {
        String[] jobArgs = (args != null && args.length >= 2)
                ? args
                : new String[] {DEFAULT_INPUT, DEFAULT_OUTPUT};
        int ec = ToolRunner.run(new Configuration(), new ExcelContactCount(), jobArgs);
        System.exit(ec);
    }
}
4、结果
从输出结果可以很容易看出,刘超1月份与姐姐的通话次数最多,共19次。
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/tech/bigdata/7695.html
