我们这里使用windows系统作为开发系统
首先保证已在Linux中正确开启Hadoop集群,然后要在windows的环境变量里新加一个变量HADOOP_USER_NAME
,值为集群里开启Hadoop服务的账户,笔者在服务器中用root用户使用的start-dfs.sh
,所以这里的值为root。
然后我们将Hadoop的两个配置文件core-site.xml
和hdfs-site.xml
传输到windows本地。
打开idea,新建一个Maven项目,将以下两个依赖导入pom.xml
,具体版本号根据读者集群的Hadoop而定。
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.5</version>
</dependency>
再将刚才的两个配置文件黏贴进resources
文件夹中,一个Hadoop项目就被我们搭建完成了。
下述代码说明了如何简单操作HDFS
package com.msb.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.junit.*;
import java.io.*;
import java.net.URI;
/**
* @author Song X.
*/
public class HDFSTest {
/**
* 解析hdfs-sitexml和core-site.xml
*/
private static Configuration configuration = null;
private static FileSystem fileSystem = null;
/**
* 连接集群
* @throws IOException
* @throws InterruptedException
*/
@Before
public void connect() throws IOException, InterruptedException {
//true表示会加载配置文件
configuration = new Configuration(true);
// 以下三种方式都可以解析配置文件
/// fileSystem = FileSystem.get(configuration);
/// fileSystem = FileSystem.get(URI.create("hdfs://mycluster"), configuration);
fileSystem = FileSystem.get(URI.create("hdfs://mycluster"), configuration, "root");
}
/**
* 创建目录
* @throws IOException
*/
@Test
public void mkdir() throws IOException {
Path dir = new Path("/tmp");
if(fileSystem.exists(dir)){
fileSystem.delete(dir, true);
}
fileSystem.mkdirs(dir);
}
/**
* 上传文件,笔者已事先在项目中创建了data文件夹,hello.txt文件的内容是一句hello world
* @throws Exception
*/
@Test
public void upload() throws Exception {
String path = "./data/hello.txt";
BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(new File(path)));
Path dir = new Path("/tmp/out.txt");
FSDataOutputStream fsDataOutputStream = fileSystem.create(dir);
// 这是hadoop给我们提供的方便的io操作工具类,最后一个boolean代表执行完这句要不要关闭io流
IOUtils.copyBytes(bufferedInputStream, fsDataOutputStream, configuration, true);
}
/**
* 下载文件
* @throws Exception
*/
@Test
public void download() throws Exception{
//FIXME
Path path = new Path("/tmp/out.txt");
FSDataInputStream fsDataInputStream = fileSystem.open(path);
String localPath = "./data/hello_down.txt";
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(new File(localPath)));
IOUtils.copyBytes(fsDataInputStream, bufferedOutputStream, configuration, true);
}
@Test
public void blocks() throws Exception {
Path path = new Path("/tmp/out.txt");
FileStatus fileStatus = fileSystem.getFileStatus(path);
//第二个参数是block的offset,第三个参数是要取多长,如果是0和fileStatus.getLen(),那么就代表了取所有block,也就是整个文件
BlockLocation[] fileBlockLocations = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
for(BlockLocation block : fileBlockLocations){
System.out.println("block = " + block);
// block = 0, 11, node03, node04 表示第0个块大小是11B,在node03和node04结点
}
/* 注意,如果想分治的计算每个块,必须要取到块,可是如果用fileSystem.open这种流的方式读文件,读到的必然是完整的一个文件,无法读更细的块,因为IO流面向的最小单位就是文件
所以如果两个结点都是用fileSystem.open取数据再计算,就会造成重复计算。Hadoop使用下面的方式解决了这个问题
FSDataInputStream fsDataInputStream = fileSystem.open(path);
fsDataInputStream.seek(385739);
seek方法传入块的偏移量,假设现在有两个块,如果385739是第二个块的起始偏移量,那么接下来的读取,就会从第二个块开始,这就解决了上述问题
*/
}
/**
* 删除文件
* @throws IOException
*/
@Test
public void delete() throws IOException {
Path dir = new Path("/tmp");
fileSystem.delete(dir, true);
}
/**
* 关闭连接
* @throws IOException
*/
@After
public void close() throws IOException {
fileSystem.close();
}
}
我将在下一篇博客中详细介绍MapReduce的全过程
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/20595.html