Java多线程实现文件快速切分详解编程语言

前段时间需要进行大批量数据导入，DBA提供的是CSV文件，但每个CSV文件都有好几个GB。直接load不仅很慢，数据库还会出现内存不足的问题。为此，我写了一个快速切分文件的程序。

 

[Java]代码    

import org.apache.log4j.LogManager; 
import org.apache.log4j.Logger; 
 
import java.io.*; 
import java.util.*; 
import java.util.concurrent.*; 
 
public class FileSplitUtil { 
 
    private final static Logger log = LogManager.getLogger(FileSplitUtil.class); 
    private static final long originFileSize = 1024 * 1024 * 100;// 100M 
    private static final int blockFileSize = 1024 * 1024 * 64;// 防止中文乱码,必须取2的N次方 
    /** 
     * CVS文件分隔符 
     */ 
    private static final char cvsSeparator = '^'; 
    public static  void  main(String args[]){ 
        long start = System.currentTimeMillis(); 
        try { 
            String fileName = "D://csvtest//aa.csv"; 
            File sourceFile = new File(fileName); 
            if (sourceFile.length() >= originFileSize) { 
                String cvsFileName = fileName.replaceAll("////", "/"); 
                FileSplitUtil fileSplitUtil = new FileSplitUtil(); 
                List<String> parts=fileSplitUtil.splitBySize(cvsFileName, blockFileSize); 
                for(String part:parts){ 
                    System.out.println("partName is:"+part); 
                } 
            } 
            System.out.println("总文件长度"+sourceFile.length()+",拆分文件耗时:" + (System.currentTimeMillis() - start) + "ms."); 
        }catch (Exception e){ 
            log.info(e.getStackTrace()); 
        } 
 
    } 
 
 
 
    /** 
     * 拆分文件 
     * 
     * @param fileName 待拆分的完整文件名 
     * @param byteSize 按多少字节大小拆分 
     * @return 拆分后的文件名列表 
     */ 
    public List<String> splitBySize(String fileName, int byteSize) 
            throws IOException, InterruptedException { 
        List<String> parts = new ArrayList<String>(); 
        File file = new File(fileName); 
        int count = (int) Math.ceil(file.length() / (double) byteSize); 
        int countLen = (count + "").length(); 
        RandomAccessFile raf = new RandomAccessFile(fileName, "r"); 
        long totalLen = raf.length(); 
        CountDownLatch latch = new CountDownLatch(count); 
 
        for (int i = 0; i < count; i++) { 
            String partFileName = file.getPath() + "." 
                    + leftPad((i + 1) + "", countLen, '0') + ".cvs"; 
            int readSize=byteSize; 
            long startPos=(long)i * byteSize; 
            long nextPos=(long)(i+1) * byteSize; 
            if(nextPos>totalLen){ 
                readSize= (int) (totalLen-startPos); 
            } 
            new SplitRunnable(readSize, startPos, partFileName, file, latch).run(); 
            parts.add(partFileName); 
        } 
        latch.await();//等待所有文件写完 
        //由于切割时可能会导致行被切断,加工所有的的分割文件,合并行 
        mergeRow(parts); 
        return parts; 
    } 
 
    /** 
     * 分割处理Runnable 
     * 
     * @author supeidong 
     */ 
    private class SplitRunnable implements Runnable { 
        int byteSize; 
        String partFileName; 
        File originFile; 
        long startPos; 
        CountDownLatch latch; 
        public SplitRunnable(int byteSize, long startPos, String partFileName, 
                             File originFile, CountDownLatch latch) { 
            this.startPos = startPos; 
            this.byteSize = byteSize; 
            this.partFileName = partFileName; 
            this.originFile = originFile; 
            this.latch = latch; 
        } 
 
        public void run() { 
            RandomAccessFile rFile; 
            OutputStream os; 
            try { 
                rFile = new RandomAccessFile(originFile, "r"); 
                byte[] b = new byte[byteSize]; 
                rFile.seek(startPos);// 移动指针到每“段”开头 
                int s = rFile.read(b); 
                os = new FileOutputStream(partFileName); 
                os.write(b, 0, s); 
                os.flush(); 
                os.close(); 
                latch.countDown(); 
            } catch (IOException e) { 
                log.error(e.getMessage()); 
                latch.countDown(); 
            } 
        } 
    } 
 
    /** 
     * 合并被切断的行 
     * 
     * @param parts 
     */ 
    private void mergeRow(List<String> parts) { 
        List<PartFile> partFiles = new ArrayList<PartFile>(); 
        try { 
            //组装被切分表对象 
            for (int i=0;i<parts.size();i++) { 
                String partFileName=parts.get(i); 
                File splitFileTemp = new File(partFileName); 
                if (splitFileTemp.exists()) { 
                    PartFile partFile = new PartFile(); 
                    BufferedReader reader=new BufferedReader(new InputStreamReader(new FileInputStream(splitFileTemp),"gbk")); 
                    String firstRow = reader.readLine(); 
                    String secondRow = reader.readLine(); 
                    String endRow = readLastLine(partFileName); 
                    partFile.setPartFileName(partFileName); 
                    partFile.setFirstRow(firstRow); 
                    partFile.setEndRow(endRow); 
                    if(i>=1){ 
                        String prePartFile=parts.get(i - 1); 
                        String preEndRow = readLastLine(prePartFile); 
                        partFile.setFirstIsFull(getCharCount(firstRow+preEndRow)>getCharCount(secondRow)); 
                    } 
 
                    partFiles.add(partFile); 
                    reader.close(); 
                } 
            } 
            //进行需要合并的行的写入 
            for (int i = 0; i < partFiles.size() - 1; i++) { 
                PartFile partFile = partFiles.get(i); 
                PartFile partFileNext = partFiles.get(i + 1); 
                StringBuilder sb = new StringBuilder(); 
                if (partFileNext.getFirstIsFull()) { 
                    sb.append("/r/n"); 
                    sb.append(partFileNext.getFirstRow()); 
                } else { 
                    sb.append(partFileNext.getFirstRow()); 
                } 
                writeLastLine(partFile.getPartFileName(),sb.toString()); 
            } 
        } catch (Exception e) { 
            log.error(e.getMessage()); 
        } 
    } 
 
    /** 
     * 得到某个字符出现的次数 
     * @param s 
     * @return 
     */ 
    private int getCharCount(String s) { 
        int count = 0; 
        for (int i = 0; i < s.length(); i++) { 
            if (s.charAt(i) == cvsSeparator) { 
                count++; 
            } 
        } 
        return count; 
    } 
 
    /** 
     * 采用BufferedInputStream方式读取文件行数 
     * 
     * @param filename 
     * @return 
     */ 
    public int getFileRow(String filename) throws IOException { 
        InputStream is = new BufferedInputStream(new FileInputStream(filename)); 
        byte[] c = new byte[1024]; 
        int count = 0; 
        int readChars = 0; 
        while ((readChars = is.read(c)) != -1) { 
            for (int i = 0; i < readChars; ++i) { 
                if (c[i] == '/n') 
                    ++count; 
            } 
        } 
        is.close(); 
        return count; 
    } 
 
    /** 
     * 读取最后一行数据 
     * @param filename 
     * @return 
     * @throws IOException 
     */ 
    private String readLastLine(String filename) throws IOException { 
        // 使用RandomAccessFile , 从后找最后一行数据 
        RandomAccessFile raf = new RandomAccessFile(filename, "r"); 
        long len = raf.length(); 
        String lastLine = ""; 
        if(len!=0L) { 
            long pos = len - 1; 
            while (pos > 0) { 
                pos--; 
                raf.seek(pos); 
                if (raf.readByte() == '/n') { 
                    lastLine = raf.readLine(); 
                    lastLine=new String(lastLine.getBytes("8859_1"), "gbk"); 
                    break; 
                } 
            } 
        } 
        raf.close(); 
        return lastLine; 
    } 
    /** 
     * 修改最后一行数据 
     * @param fileName 
     * @param lastString 
     * @return 
     * @throws IOException 
     */ 
    private void writeLastLine(String fileName,String lastString){ 
        try { 
            // 打开一个随机访问文件流,按读写方式 
            RandomAccessFile randomFile = new RandomAccessFile(fileName, "rw"); 
            // 文件长度,字节数 
            long fileLength = randomFile.length(); 
            //将写文件指针移到文件尾。 
            randomFile.seek(fileLength); 
            //此处必须加gbk,否则会出现写入乱码 
            randomFile.write(lastString.getBytes("gbk")); 
            randomFile.close(); 
        } catch (IOException e) { 
            log.error(e.getMessage()); 
        } 
    } 
    /** 
     * 左填充 
     * 
     * @param str 
     * @param length 
     * @param ch 
     * @return 
     */ 
    public static String leftPad(String str, int length, char ch) { 
        if (str.length() >= length) { 
            return str; 
        } 
        char[] chs = new char[length]; 
        Arrays.fill(chs, ch); 
        char[] src = str.toCharArray(); 
        System.arraycopy(src, 0, chs, length - src.length, src.length); 
        return new String(chs); 
    } 
 
    /** 
     * 合并文件行内部类 
     */ 
    class PartFile { 
        private String partFileName; 
        private String firstRow; 
        private String endRow; 
        private boolean firstIsFull; 
 
        public String getPartFileName() { 
            return partFileName; 
        } 
 
        public void setPartFileName(String partFileName) { 
            this.partFileName = partFileName; 
        } 
 
        public String getFirstRow() { 
            return firstRow; 
        } 
 
        public void setFirstRow(String firstRow) { 
            this.firstRow = firstRow; 
        } 
 
        public String getEndRow() { 
            return endRow; 
        } 
 
        public void setEndRow(String endRow) { 
            this.endRow = endRow; 
        } 
 
        public boolean getFirstIsFull() { 
            return firstIsFull; 
        } 
 
        public void setFirstIsFull(boolean firstIsFull) { 
            this.firstIsFull = firstIsFull; 
        } 
    } 
 
} 

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/10279.html

(0)
上一篇 2021年7月19日 10:09
下一篇 2021年7月19日 10:09

相关推荐

发表回复

登录后才能评论