熟练掌握HDFS的Java API接口访问详解大数据

    HDFS设计的主要目的是对海量数据进行存储,也就是说在其上能够存储很大量文件(可以存储TB级的文件)。HDFS将这些文件分割之后,存储在不同的DataNode上, HDFS 提供了两种访问接口:Shell接口和Java API 接口,对HDFS里面的文件进行操作,具体每个Block放在哪台DataNode上面,对于开发者来说是透明的。

    通过Java API接口对HDFS进行操作,我将其整理成工具类,地址见底部

1、获取文件系统

 1 /** 
 2  * 获取文件系统 
 3  *  
 4  * @return FileSystem 
 5  */ 
 6 public static FileSystem getFileSystem() { 
 7     //读取配置文件 
 8     Configuration conf = new Configuration(); 
 9     // 文件系统 
10     FileSystem fs = null; 
11      
12     String hdfsUri = HDFSUri; 
13     if(StringUtils.isBlank(hdfsUri)){ 
14         // 返回默认文件系统  如果在 Hadoop集群下运行,使用此种方法可直接获取默认文件系统 
15         try { 
16             fs = FileSystem.get(conf); 
17         } catch (IOException e) { 
18             logger.error("", e); 
19         } 
20     }else{ 
21         // 返回指定的文件系统,如果在本地测试,需要使用此种方法获取文件系统 
22         try { 
23             URI uri = new URI(hdfsUri.trim()); 
24             fs = FileSystem.get(uri,conf); 
25         } catch (URISyntaxException | IOException e) { 
26             logger.error("", e); 
27         } 
28     } 
29          
30     return fs; 
31 }

 2、创建文件目录

 1 /** 
 2  * 创建文件目录 
 3  *  
 4  * @param path 
 5  */ 
 6 public static void mkdir(String path) { 
 7     try { 
 8         // 获取文件系统 
 9         FileSystem fs = getFileSystem(); 
10          
11         String hdfsUri = HDFSUri; 
12         if(StringUtils.isNotBlank(hdfsUri)){ 
13             path = hdfsUri + path; 
14         } 
15          
16         // 创建目录 
17         fs.mkdirs(new Path(path)); 
18          
19         //释放资源 
20         fs.close(); 
21     } catch (IllegalArgumentException | IOException e) { 
22         logger.error("", e); 
23     } 
24 }

3、删除文件或者文件目录

 1 /** 
 2  * 删除文件或者文件目录 
 3  *  
 4  * @param path 
 5  */ 
 6 public static void rmdir(String path) { 
 7     try { 
 8         // 返回FileSystem对象 
 9         FileSystem fs = getFileSystem(); 
10          
11         String hdfsUri = HDFSUri; 
12         if(StringUtils.isNotBlank(hdfsUri)){ 
13             path = hdfsUri + path; 
14         } 
15          
16         // 删除文件或者文件目录  delete(Path f) 此方法已经弃用 
17         fs.delete(new Path(path),true); 
18          
19         // 释放资源 
20         fs.close(); 
21     } catch (IllegalArgumentException | IOException e) { 
22         logger.error("", e); 
23     } 
24 }

3、根据filter获取目录下的文件

 1 /** 
 2  * 根据filter获取目录下的文件 
 3  *  
 4  * @param path 
 5  * @param pathFilter 
 6  * @return String[] 
 7  */ 
 8 public static String[] ListFile(String path,PathFilter pathFilter) { 
 9     String[] files = new String[0]; 
10      
11     try { 
12         // 返回FileSystem对象 
13         FileSystem fs = getFileSystem(); 
14          
15         String hdfsUri = HDFSUri; 
16         if(StringUtils.isNotBlank(hdfsUri)){ 
17             path = hdfsUri + path; 
18         } 
19          
20         FileStatus[] status; 
21         if(pathFilter != null){ 
22             // 根据filter列出目录内容 
23             status = fs.listStatus(new Path(path),pathFilter); 
24         }else{ 
25             // 列出目录内容 
26             status = fs.listStatus(new Path(path)); 
27         } 
28          
29         // 获取目录下的所有文件路径 
30         Path[] listedPaths = FileUtil.stat2Paths(status); 
31         // 转换String[] 
32         if (listedPaths != null && listedPaths.length > 0){ 
33             files = new String[listedPaths.length]; 
34             for (int i = 0; i < files.length; i++){ 
35                 files[i] = listedPaths[i].toString(); 
36             } 
37         } 
38         // 释放资源 
39         fs.close(); 
40     } catch (IllegalArgumentException | IOException e) { 
41         logger.error("", e); 
42     } 
43      
44     return files; 
45 }

4、文件上传至 HDFS

 1 /** 
 2  * 文件上传至 HDFS 
 3  *  
 4  * @param delSrc 
 5  * @param overwrite 
 6  * @param srcFile 
 7  * @param destPath 
 8  */ 
 9 public static void copyFileToHDFS(boolean delSrc, boolean overwrite,String srcFile,String destPath) { 
10     // 源文件路径是Linux下的路径,如果在 windows 下测试,需要改写为Windows下的路径,比如D://hadoop/djt/weibo.txt 
11     Path srcPath = new Path(srcFile); 
12      
13     // 目的路径 
14     String hdfsUri = HDFSUri; 
15     if(StringUtils.isNotBlank(hdfsUri)){ 
16         destPath = hdfsUri + destPath; 
17     } 
18     Path dstPath = new Path(destPath); 
19      
20     // 实现文件上传 
21     try { 
22         // 获取FileSystem对象 
23         FileSystem fs = getFileSystem(); 
24         fs.copyFromLocalFile(srcPath, dstPath); 
25         fs.copyFromLocalFile(delSrc,overwrite,srcPath, dstPath); 
26         //释放资源 
27         fs.close(); 
28     } catch (IOException e) { 
29         logger.error("", e); 
30     } 
31 }

5、从 HDFS 下载文件

 1 /** 
 2  * 从 HDFS 下载文件 
 3  *  
 4  * @param srcFile 
 5  * @param destPath 
 6  */ 
 7 public static void getFile(String srcFile,String destPath) { 
 8     // 源文件路径 
 9     String hdfsUri = HDFSUri; 
10     if(StringUtils.isNotBlank(hdfsUri)){ 
11         srcFile = hdfsUri + srcFile; 
12     } 
13     Path srcPath = new Path(srcFile); 
14      
15     // 目的路径是Linux下的路径,如果在 windows 下测试,需要改写为Windows下的路径,比如D://hadoop/djt/ 
16     Path dstPath = new Path(destPath); 
17      
18     try { 
19         // 获取FileSystem对象 
20         FileSystem fs = getFileSystem(); 
21         // 下载hdfs上的文件 
22         fs.copyToLocalFile(srcPath, dstPath); 
23         // 释放资源 
24         fs.close(); 
25     } catch (IOException e) { 
26         logger.error("", e); 
27     } 
28 }

6、获取 HDFS 集群节点信息

 1 /** 
 2  * 获取 HDFS 集群节点信息 
 3  *  
 4  * @return DatanodeInfo[] 
 5  */ 
 6 public static DatanodeInfo[] getHDFSNodes() { 
 7     // 获取所有节点 
 8     DatanodeInfo[] dataNodeStats = new DatanodeInfo[0]; 
 9      
10     try { 
11         // 返回FileSystem对象 
12         FileSystem fs = getFileSystem(); 
13          
14         // 获取分布式文件系统 
15         DistributedFileSystem hdfs = (DistributedFileSystem)fs; 
16          
17         dataNodeStats = hdfs.getDataNodeStats(); 
18     } catch (IOException e) { 
19         logger.error("", e); 
20     } 
21     return dataNodeStats; 
22 }

7、查找某个文件在 HDFS集群的位置

 1 /** 
 2  * 查找某个文件在 HDFS集群的位置 
 3  *  
 4  * @param filePath 
 5  * @return BlockLocation[] 
 6  */ 
 7 public static BlockLocation[] getFileBlockLocations(String filePath) { 
 8     // 文件路径 
 9     String hdfsUri = HDFSUri; 
10     if(StringUtils.isNotBlank(hdfsUri)){ 
11         filePath = hdfsUri + filePath; 
12     } 
13     Path path = new Path(filePath); 
14      
15     // 文件块位置列表 
16     BlockLocation[] blkLocations = new BlockLocation[0]; 
17     try { 
18         // 返回FileSystem对象 
19         FileSystem fs = getFileSystem(); 
20         // 获取文件目录  
21         FileStatus filestatus = fs.getFileStatus(path); 
22         //获取文件块位置列表 
23         blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen()); 
24     } catch (IOException e) { 
25         logger.error("", e); 
26     } 
27     return blkLocations; 
28 }

 8、文件重命名

 1 /** 
 2  * 文件重命名 
 3  *  
 4  * @param srcPath 
 5  * @param dstPath 
 6  */ 
 7 public boolean rename(String srcPath, String dstPath){ 
 8     boolean flag = false; 
 9     try    { 
10         // 返回FileSystem对象 
11         FileSystem fs = getFileSystem(); 
12          
13         String hdfsUri = HDFSUri; 
14         if(StringUtils.isNotBlank(hdfsUri)){ 
15             srcPath = hdfsUri + srcPath; 
16             dstPath = hdfsUri + dstPath; 
17         } 
18          
19         flag = fs.rename(new Path(srcPath), new Path(dstPath)); 
20     } catch (IOException e) { 
21         logger.error("{} rename to {} error.", srcPath, dstPath); 
22     } 
23      
24     return flag; 
25 }

9、判断目录是否存在

 1 /** 
 2  * 判断目录是否存在 
 3  *  
 4  * @param srcPath 
 5  * @param dstPath 
 6  */ 
 7 public boolean existDir(String filePath, boolean create){ 
 8     boolean flag = false; 
 9      
10     if (StringUtils.isEmpty(filePath)){ 
11         return flag; 
12     } 
13      
14     try{ 
15         Path path = new Path(filePath); 
16         // FileSystem对象 
17         FileSystem fs = getFileSystem(); 
18          
19         if (create){ 
20             if (!fs.exists(path)){ 
21                 fs.mkdirs(path); 
22             } 
23         } 
24          
25         if (fs.isDirectory(path)){ 
26             flag = true; 
27         } 
28     }catch (Exception e){ 
29         logger.error("", e); 
30     } 
31      
32     return flag; 
33 }

 

如果,您认为阅读这篇博客让您有些收获,不妨点击一下右下角的【推荐】。
如果,您希望更容易地发现我的新博客,不妨点击一下左下角的【关注我】。
如果,您对我的博客所讲述的内容有兴趣,请继续关注我的后续博客,我是【刘超★ljc】。

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

地址:下载

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/7686.html

(0)
上一篇 2021年7月18日
下一篇 2021年7月18日

相关推荐

发表回复

登录后才能评论