Java多线程实现文件快速切分详解 | 编程语言

前段时间需要进行大批量数据导入,DBA提供的是CSV文件,但每个CSV文件都有好几个GB大小,直接进行load不仅数据库很慢,还会产生内存不足的问题。为了解决这个问题,写了个快速切分文件的程序。

 

[Java]代码    

import org.apache.log4j.LogManager; 
import org.apache.log4j.Logger; 
import java.io.*; 
import java.util.*; 
import java.util.concurrent.*; 
public class FileSplitUtil { 
private final static Logger log = LogManager.getLogger(FileSplitUtil.class); 
private static final long originFileSize = 1024 * 1024 * 100;// 100M 
private static final int blockFileSize = 1024 * 1024 * 64;// 防止中文乱码,必须取2的N次方 
/** 
* CVS文件分隔符 
*/ 
private static final char cvsSeparator = '^'; 
public static  void  main(String args[]){ 
long start = System.currentTimeMillis(); 
try { 
String fileName = "D://csvtest//aa.csv"; 
File sourceFile = new File(fileName); 
if (sourceFile.length() >= originFileSize) { 
String cvsFileName = fileName.replaceAll("////", "/"); 
FileSplitUtil fileSplitUtil = new FileSplitUtil(); 
List<String> parts=fileSplitUtil.splitBySize(cvsFileName, blockFileSize); 
for(String part:parts){ 
System.out.println("partName is:"+part); 
} 
} 
System.out.println("总文件长度"+sourceFile.length()+",拆分文件耗时:" + (System.currentTimeMillis() - start) + "ms."); 
}catch (Exception e){ 
log.info(e.getStackTrace()); 
} 
} 
/** 
* 拆分文件 
* 
* @param fileName 待拆分的完整文件名 
* @param byteSize 按多少字节大小拆分 
* @return 拆分后的文件名列表 
*/ 
public List<String> splitBySize(String fileName, int byteSize) 
throws IOException, InterruptedException { 
List<String> parts = new ArrayList<String>(); 
File file = new File(fileName); 
int count = (int) Math.ceil(file.length() / (double) byteSize); 
int countLen = (count + "").length(); 
RandomAccessFile raf = new RandomAccessFile(fileName, "r"); 
long totalLen = raf.length(); 
CountDownLatch latch = new CountDownLatch(count); 
for (int i = 0; i < count; i++) { 
String partFileName = file.getPath() + "." 
+ leftPad((i + 1) + "", countLen, '0') + ".cvs"; 
int readSize=byteSize; 
long startPos=(long)i * byteSize; 
long nextPos=(long)(i+1) * byteSize; 
if(nextPos>totalLen){ 
readSize= (int) (totalLen-startPos); 
} 
new SplitRunnable(readSize, startPos, partFileName, file, latch).run(); 
parts.add(partFileName); 
} 
latch.await();//等待所有文件写完 
//由于切割时可能会导致行被切断,加工所有的的分割文件,合并行 
mergeRow(parts); 
return parts; 
} 
/** 
* 分割处理Runnable 
* 
* @author supeidong 
*/ 
private class SplitRunnable implements Runnable { 
int byteSize; 
String partFileName; 
File originFile; 
long startPos; 
CountDownLatch latch; 
public SplitRunnable(int byteSize, long startPos, String partFileName, 
File originFile, CountDownLatch latch) { 
this.startPos = startPos; 
this.byteSize = byteSize; 
this.partFileName = partFileName; 
this.originFile = originFile; 
this.latch = latch; 
} 
public void run() { 
RandomAccessFile rFile; 
OutputStream os; 
try { 
rFile = new RandomAccessFile(originFile, "r"); 
byte[] b = new byte[byteSize]; 
rFile.seek(startPos);// 移动指针到每“段”开头 
int s = rFile.read(b); 
os = new FileOutputStream(partFileName); 
os.write(b, 0, s); 
os.flush(); 
os.close(); 
latch.countDown(); 
} catch (IOException e) { 
log.error(e.getMessage()); 
latch.countDown(); 
} 
} 
} 
/** 
* 合并被切断的行 
* 
* @param parts 
*/ 
private void mergeRow(List<String> parts) { 
List<PartFile> partFiles = new ArrayList<PartFile>(); 
try { 
//组装被切分表对象 
for (int i=0;i<parts.size();i++) { 
String partFileName=parts.get(i); 
File splitFileTemp = new File(partFileName); 
if (splitFileTemp.exists()) { 
PartFile partFile = new PartFile(); 
BufferedReader reader=new BufferedReader(new InputStreamReader(new FileInputStream(splitFileTemp),"gbk")); 
String firstRow = reader.readLine(); 
String secondRow = reader.readLine(); 
String endRow = readLastLine(partFileName); 
partFile.setPartFileName(partFileName); 
partFile.setFirstRow(firstRow); 
partFile.setEndRow(endRow); 
if(i>=1){ 
String prePartFile=parts.get(i - 1); 
String preEndRow = readLastLine(prePartFile); 
partFile.setFirstIsFull(getCharCount(firstRow+preEndRow)>getCharCount(secondRow)); 
} 
partFiles.add(partFile); 
reader.close(); 
} 
} 
//进行需要合并的行的写入 
for (int i = 0; i < partFiles.size() - 1; i++) { 
PartFile partFile = partFiles.get(i); 
PartFile partFileNext = partFiles.get(i + 1); 
StringBuilder sb = new StringBuilder(); 
if (partFileNext.getFirstIsFull()) { 
sb.append("/r/n"); 
sb.append(partFileNext.getFirstRow()); 
} else { 
sb.append(partFileNext.getFirstRow()); 
} 
writeLastLine(partFile.getPartFileName(),sb.toString()); 
} 
} catch (Exception e) { 
log.error(e.getMessage()); 
} 
} 
/** 
* 得到某个字符出现的次数 
* @param s 
* @return 
*/ 
private int getCharCount(String s) { 
int count = 0; 
for (int i = 0; i < s.length(); i++) { 
if (s.charAt(i) == cvsSeparator) { 
count++; 
} 
} 
return count; 
} 
/** 
* 采用BufferedInputStream方式读取文件行数 
* 
* @param filename 
* @return 
*/ 
public int getFileRow(String filename) throws IOException { 
InputStream is = new BufferedInputStream(new FileInputStream(filename)); 
byte[] c = new byte[1024]; 
int count = 0; 
int readChars = 0; 
while ((readChars = is.read(c)) != -1) { 
for (int i = 0; i < readChars; ++i) { 
if (c[i] == '/n') 
++count; 
} 
} 
is.close(); 
return count; 
} 
/** 
* 读取最后一行数据 
* @param filename 
* @return 
* @throws IOException 
*/ 
private String readLastLine(String filename) throws IOException { 
// 使用RandomAccessFile , 从后找最后一行数据 
RandomAccessFile raf = new RandomAccessFile(filename, "r"); 
long len = raf.length(); 
String lastLine = ""; 
if(len!=0L) { 
long pos = len - 1; 
while (pos > 0) { 
pos--; 
raf.seek(pos); 
if (raf.readByte() == '/n') { 
lastLine = raf.readLine(); 
lastLine=new String(lastLine.getBytes("8859_1"), "gbk"); 
break; 
} 
} 
} 
raf.close(); 
return lastLine; 
} 
/** 
* 修改最后一行数据 
* @param fileName 
* @param lastString 
* @return 
* @throws IOException 
*/ 
private void writeLastLine(String fileName,String lastString){ 
try { 
// 打开一个随机访问文件流,按读写方式 
RandomAccessFile randomFile = new RandomAccessFile(fileName, "rw"); 
// 文件长度,字节数 
long fileLength = randomFile.length(); 
//将写文件指针移到文件尾。 
randomFile.seek(fileLength); 
//此处必须加gbk,否则会出现写入乱码 
randomFile.write(lastString.getBytes("gbk")); 
randomFile.close(); 
} catch (IOException e) { 
log.error(e.getMessage()); 
} 
} 
/** 
* 左填充 
* 
* @param str 
* @param length 
* @param ch 
* @return 
*/ 
public static String leftPad(String str, int length, char ch) { 
if (str.length() >= length) { 
return str; 
} 
char[] chs = new char[length]; 
Arrays.fill(chs, ch); 
char[] src = str.toCharArray(); 
System.arraycopy(src, 0, chs, length - src.length, src.length); 
return new String(chs); 
} 
/** 
* 合并文件行内部类 
*/ 
class PartFile { 
private String partFileName; 
private String firstRow; 
private String endRow; 
private boolean firstIsFull; 
public String getPartFileName() { 
return partFileName; 
} 
public void setPartFileName(String partFileName) { 
this.partFileName = partFileName; 
} 
public String getFirstRow() { 
return firstRow; 
} 
public void setFirstRow(String firstRow) { 
this.firstRow = firstRow; 
} 
public String getEndRow() { 
return endRow; 
} 
public void setEndRow(String endRow) { 
this.endRow = endRow; 
} 
public boolean getFirstIsFull() { 
return firstIsFull; 
} 
public void setFirstIsFull(boolean firstIsFull) { 
this.firstIsFull = firstIsFull; 
} 
} 
} 

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/10279.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论