Hadoop——使用idea+maven开发Hadoop项目入门详解编程语言

我们这里使用windows系统作为开发系统

首先保证已在Linux中正确开启Hadoop集群,然后要在windows的环境变量里新加一个变量HADOOP_USER_NAME,值为集群里开启Hadoop服务的账户,笔者在服务器中用root用户使用的start-dfs.sh,所以这里的值为root。

然后我们将Hadoop的两个配置文件core-site.xmlhdfs-site.xml传输到windows本地。

打开idea,新建一个Maven项目,将以下两个依赖导入pom.xml,具体版本号根据读者集群的Hadoop而定。

        <dependency> 
            <groupId>org.apache.hadoop</groupId> 
            <artifactId>hadoop-common</artifactId> 
            <version>2.6.5</version> 
        </dependency> 
 
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs --> 
        <dependency> 
            <groupId>org.apache.hadoop</groupId> 
            <artifactId>hadoop-hdfs</artifactId> 
            <version>2.6.5</version> 
        </dependency> 

再将刚才的两个配置文件黏贴进resources文件夹中,一个Hadoop项目就被我们搭建完成了。

下述代码说明了如何简单操作HDFS

package com.msb.hadoop.hdfs; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.*; 
import org.apache.hadoop.io.IOUtils; 
import org.junit.*; 
 
import java.io.*; 
import java.net.URI; 
 
/** 
 * @author Song X. 
 */ 
public class HDFSTest {
    
 
    /** 
     * 解析hdfs-sitexml和core-site.xml 
     */ 
    private static Configuration configuration = null; 
    private static FileSystem fileSystem = null; 
 
    /** 
     * 连接集群 
     * @throws IOException 
     * @throws InterruptedException 
     */ 
    @Before 
    public void connect() throws IOException, InterruptedException {
    
        //true表示会加载配置文件 
        configuration = new Configuration(true); 
 
        // 以下三种方式都可以解析配置文件 
        /// fileSystem = FileSystem.get(configuration); 
        /// fileSystem = FileSystem.get(URI.create("hdfs://mycluster"), configuration); 
        fileSystem = FileSystem.get(URI.create("hdfs://mycluster"), configuration, "root"); 
 
    } 
 
    /** 
     * 创建目录 
     * @throws IOException 
     */ 
    @Test 
    public void mkdir() throws IOException {
    
        Path dir = new Path("/tmp"); 
        if(fileSystem.exists(dir)){
    
            fileSystem.delete(dir, true); 
        } 
        fileSystem.mkdirs(dir); 
    } 
 
    /** 
     * 上传文件,笔者已事先在项目中创建了data文件夹,hello.txt文件的内容是一句hello world 
     * @throws Exception 
     */ 
    @Test 
    public void upload() throws Exception {
    
        String path = "./data/hello.txt"; 
        BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(new File(path))); 
        Path dir = new Path("/tmp/out.txt"); 
        FSDataOutputStream fsDataOutputStream = fileSystem.create(dir); 
 
        // 这是hadoop给我们提供的方便的io操作工具类,最后一个boolean代表执行完这句要不要关闭io流 
        IOUtils.copyBytes(bufferedInputStream, fsDataOutputStream, configuration, true); 
    } 
 
    /** 
     * 下载文件 
     * @throws Exception 
     */ 
    @Test 
    public void download() throws Exception{
    
        //FIXME 
        Path path = new Path("/tmp/out.txt"); 
        FSDataInputStream fsDataInputStream = fileSystem.open(path); 
        String localPath = "./data/hello_down.txt"; 
        BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(new File(localPath))); 
        IOUtils.copyBytes(fsDataInputStream, bufferedOutputStream, configuration, true); 
    } 
 
    @Test 
    public void blocks() throws Exception {
    
        Path path = new Path("/tmp/out.txt"); 
        FileStatus fileStatus = fileSystem.getFileStatus(path); 
        //第二个参数是block的offset,第三个参数是要取多长,如果是0和fileStatus.getLen(),那么就代表了取所有block,也就是整个文件 
        BlockLocation[] fileBlockLocations = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); 
        for(BlockLocation block : fileBlockLocations){
    
            System.out.println("block = " + block); 
            // block = 0, 11, node03, node04 表示第0个块大小是11B,在node03和node04结点 
        } 
        /* 注意,如果想分治的计算每个块,必须要取到块,可是如果用fileSystem.open这种流的方式读文件,读到的必然是完整的一个文件,无法读更细的块,因为IO流面向的最小单位就是文件 
           所以如果两个结点都是用fileSystem.open取数据再计算,就会造成重复计算。Hadoop使用下面的方式解决了这个问题 
                FSDataInputStream fsDataInputStream = fileSystem.open(path); 
                fsDataInputStream.seek(385739); 
           seek方法传入块的偏移量,假设现在有两个块,如果385739是第二个块的起始偏移量,那么接下来的读取,就会从第二个块开始,这就解决了上述问题 
         */ 
    } 
 
 
    /** 
     * 删除文件 
     * @throws IOException 
     */ 
    @Test 
    public void delete() throws IOException {
    
        Path dir = new Path("/tmp"); 
        fileSystem.delete(dir, true); 
    } 
 
 
    /** 
     * 关闭连接 
     * @throws IOException 
     */ 
    @After 
    public void close() throws IOException {
    
        fileSystem.close(); 
    } 
 
} 
 

我将在下一篇博客中详细介绍MapReduce的全过程

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/20595.html

(0)
上一篇 2021年7月19日 23:35
下一篇 2021年7月19日 23:35

相关推荐

发表回复

登录后才能评论