1、背景
在实际项目中,输入数据往往是由许多小文件组成,这里的小文件是指小于HDFS系统Block大小的文件(默认128M), 然而每一个存储在HDFS中的文件、目录和块都映射为一个对象,存储在NameNode服务器内存中,通常占用150个字节。 如果有1千万个文件,就需要消耗大约3G的内存空间。如果是10亿个文件呢,简直不可想象。所以在项目开始前, 我们选择一种适合的方案来解决本项目的小文件问题
2、介绍
本地 D:/data目录下有 2012-09-17 至 2012-09-23 一共7天的数据集,我们需要将这7天的数据集按日期合并为7个大文件上传至 HDFS
3、数据
本地 D:/data目录下的所有数据,如下图所示
4、分析
基于项目的需求,我们通过下面几个步骤完成
1、获取 D:/data目录下的所有日期路径,循环所有日期路径,通过globStatus()方法获取所有txt格式文件路径。
2、最后通过IOUtils.copyBytes(in, out, 4096, false)方法将数据集合并为大文件,并上传至 HDFS
5、实现
自定义RegexAcceptPathFilter类实现 PathFilter,比如只接受D:/data/2012-09-17日期目录下txt格式的文件
1 /** 2 * @ProjectName FileMerge
3 * @PackageName com.buaa
4 * @ClassName RegexAcceptPathFilter
5 * @Description 接受 regex 格式的文件
6 * @Author 刘吉超
7 * @Date 2016-04-18 21:58:07
8 */ 9 public static class RegexAcceptPathFilter implements PathFilter {
10 private final String regex;
11 12 public RegexAcceptPathFilter(String regex) {
13 this.regex = regex;
14 }
15 16 @Override
17 public boolean accept(Path path) {
18 boolean flag = path.toString().matches(regex);
19 return flag;
20 }
21 }
实现主程序 merge 方法,完成数据集的合并,并上传至 HDFS
1 /** 2 * 合并
3 *
4 * @param srcPath 源目录
5 * @param destPath 目标目录
6 */ 7 public static void merge(String srcPath,String destPath) {
8 try{
9 // 读取hadoop文件系统的配置 10 Configuration conf = new Configuration();
11 12 // 获取远端文件系统 13 URI uri = new URI(HDFSUri);
14 FileSystem remote = FileSystem.get(uri, conf);
15 16 // 获得本地文件系统 17 FileSystem local = FileSystem.getLocal(conf);
18 19 // 获取data目录下的所有文件路径 20 Path[] dirs = FileUtil.stat2Paths(local.globStatus(new Path(srcPath)));
21 22 FSDataOutputStream out = null;
23 FSDataInputStream in = null;
24 25 for (Path dir : dirs) {
26 // 文件名称 27 String fileName = dir.getName().replace("-", "");
28 // 只接受目录下的.txt文件 29 FileStatus[] localStatus = local.globStatus(new Path(dir + "/*"), new RegexAcceptPathFilter("^.*.txt$"));
30 // 获得目录下的所有文件 31 Path[] listedPaths = FileUtil.stat2Paths(localStatus);
32 // 输出路径 33 Path block = new Path(destPath + "/" + fileName + ".txt");
34 // 打开输出流 35 out = remote.create(block);
36 for (Path p : listedPaths) {
37 // 打开输入流 38 in = local.open(p);
39 // 复制数据 40 IOUtils.copyBytes(in, out, 4096, false);
41 // 关闭输入流 42 in.close();
43 }
44 if (out != null) {
45 // 关闭输出流 46 out.close();
47 }
48 }
49 }catch(Exception e){
50 logger.error("", e);
51 }
52 }
6、一些运行代码
1 /** 2 * main方法
3 *
4 * @param args
5 */ 6 public static void main(String[] args) {
7 merge("D://data//*","/buaa/tv");
8 }
7、结果

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/bigdata/7690.html