Hadoop I/O中的压缩
文件压缩有两大好处:
1.可以减少存储文件所需要的磁盘空间
2.可以加速数据在网络和磁盘上的传输
5种压缩格式的特征的比较
*DEFLATE是一个标准压缩算法,该算法通常实现是zlib,没有可用于生成DEFLATE文件的常用命令行工具,因为通常都用gzip格式。
所有的要锁算法都要权衡时间/空间:压缩和解压缩的速度更快,其代价通常只能节省少量的时间,我们有9个不同的选项来控制压缩时必须考虑的权衡,-1为优化压缩速度,-9优化压缩空间。
在考虑如何压缩那些将由MapReduce处理的数据时,考虑压缩格式是否支持分割是很重要的。考虑存储在HDFS中的未压缩的文件,其大小为1GB,HDFS的块大小为64MB,所以该文件将被存储为16块,将此文件用作输入的MapReduce作业会创建1个输人分片(split ,也称为“分块”。对于block,我们统一称为“块”。)每个分片都被作为一个独立map任务的输入单独进行处理。
现在假设。该文件是一个gzip格式的压缩文件,压缩后的大小为1GB。和前面一样,HDFS将此文件存储为16块。然而,针对每一块创建一个分块是没有用的,因为不可能从gzip数据流中的任意点开始读取,map任务也不可能独立于其他分块只读取一个分块中的数据。gzip格式使用DEFLATE来存储压缩过的数据,DEFLATE将数据作为一系列压缩过的块进行存储。问题是,每块的开始没有指定用户在数据流中任意点定位到下一个块的起始位置,而是其自身与数据流同步。因此,gzip不支持分割(块)机制。
在这种情况下,MapReduce不分割gzip格式的文件,因为它知道输入是gzip压缩格式的(通过文件扩展名得知),而gzip压缩机制不支持分割机制。这样是以牺牲本地化为代价:一个map任务将处理16个HDFS块。大都不是map的本地数据。与此同时,因为map任务少,所以作业分割的粒度不够细,从而导致运行时间变长。
在我们假设的例子中,如果是一个LZO格式的文件,我们会碰到同样的问题,因为基本压缩格式不为reader提供方法使其与流同步。但是,bzip2格式的压缩文件确实提供了块与块之间的同步标记(一个48位的PI近似值),因此它支持分割机制。
对于文件的收集,这些问题会稍有不同。ZIP是存档格式,因此它可以将多个文件合并为一个ZIP文件。每个文件单独压缩,所有文档的存储位置存储在ZIP文件的尾部。这个属性表明ZIP文件支持文件边界处分割,每个分片中包括ZIP压缩文件中的一个或多个文件。
在MapReduce我们应该使用哪种压缩格式
根据应用的具体情况来决定应该使用哪种压缩格式。就个人而言,更趋向于使用最快的速度压缩,还是使用最优的空间压缩?一般来说,应该尝试不同的策略,并用具有代表性的数据集进行测试,从而找到最佳方法。
对于那些大型的、没有边界的文件,如日志文件,有以下选项:
1.存储未压缩的文件。
2.使用支持分割机制的压缩格式,如bzip2。
3.在应用中将文件分割成几个大的数据块,然后使用任何一种支持的压缩格式单独压缩每个数据块(可不用考虑压缩格式是否支持分割)。在这里,需要选择数据块的大小使压缩后的数据块在大小上相当于HDFS的块。
4.使用支持压缩和分割的Sequence File(序列文件)。
5.对于大型文件,不要对整个文件使用不支持分割的压缩格式,因为这样会损失本地性优势,从而使降低MapReduce应用的性能。
下面是压缩和解压缩的例子:
压缩:
package hdfs; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.Progressable; import sun.reflect.misc.ReflectUtil; public class TestCompress { public static void main(String[] args) throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException{ String CodecClassName="org.apache.hadoop.io.compress.BZip2Codec"; //指定我们的压缩算法,可以在这里更改压缩算法 Class<?> cls=Class.forName(CodecClassName); Configuration conf=new Configuration(); CompressionCodec codec=(CompressionCodec)ReflectionUtil.newInstance(cls,conf); String input="/root/file"; //输入文件 String output=input + codec.getDefaultExtension(); //输出文件 FileOutputStream outFile=new FileOutputStream(output); //创建一个输出流 CompressionOutputStream out=codec.createOutputStream(outFile); FileInputStream in=new FileInputStream(input);//创建一个输入流 IOUtils.copyBytes(in, out, 4096, false); in.close(); //关闭输入输出流 out.close(); } }
解压缩:
package hdfs; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.CompressionInputStream; import org.apache.zookeeper.common.IOUtils; public class WordCount { public static void main(String[] args) throws FileNotFoundException, IOException{ String file = "/root/file.gz"; //需要解压的文件 Configuration conf=new Configuration(); CompressionCodecFactory codecFactory=new CompressionCodecFactory(conf); CompressionCodec codec = codecFactory.getCodec(new Path(file));//通过getCodec方法,compressionCodecFactory提供了一种方法可以将文件扩展名映射到一个compressionCodec中 CompressionInputStream in = codec.createInputStream(new FileInputStream(file)); FileOutputStream out=new FileOutputStream(new File(codecFactory.removeSuffix(file, codec.getDefaultExtension()))); //一旦找到对应的codec,便用remodeSuffix去除文件名,形成输出文件 IOUtils.copyBytes(in, out, 4096); } }
本地压缩库
考虑到性能,最好使用一个本地库(native library)来压缩和解压。例如,在一个测试中,使用本地gzip压缩库减少了解压时间50%,压缩时间大约减少了10%(与内置的Java实现相比较)。表4-4展示了Java和本地提供的每个压缩格式的实现。井不是所有的格式都有本地实现(例如bzip2压缩),而另一些则仅有本地实现(例如LZO)。
压缩格式 | Java实现 | 本地实现 |
DEFLATE | 是 | 是 |
gzip | 是 | 是 |
bzip2 | 是 | 否 |
LZO | 否 | 是 |
Hadoop带有预置的32位和64位Linux的本地压缩库,位于库/本地目录。对于其他平台,需要自己编译库。
本地库通过Java系统属性java.library.path来使用。Hadoop的脚本在bin目录中已经设置好这个属性,但如果不使用该脚本,则需要在应用中设置属性。
默认情况下,Hadoop会在它运行的平台上查找本地库,如果发现就自动加载。这意味着不必更改任何配置设置就可以使用本地库。在某些情况下,可能希望禁用本地库,比如在调试压缩相关问题的时候。为此,将属性hadoop.native.lib设置为false,即可确保内置的Java等同内置实现被使用(如果它们可用的话)。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/7215.html