Java多线程实现文件快速切分

前段时间需要进行大批量数据导入,DBA提供的是CSV文件,但每个CSV文件都有好几个GB大小,直接进行load时数据库很慢,还会出现内存不足的问题。为了解决这个问题,写了个快速切分文件的程序。

 

[Java]代码    

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * Splits a large line-oriented text file (for example a multi-gigabyte CSV
 * export) into smaller part files.
 *
 * <p>Part boundaries are moved forward to the next line start, so no row is
 * ever cut in half and concatenating the parts in order reproduces the source
 * file byte for byte. The parts are copied concurrently on a bounded thread
 * pool, each through a small fixed-size buffer.
 *
 * <p>Line starts are located by searching for the byte {@code 0x0A}. This is
 * correct for ASCII-compatible encodings in which that byte can only be a line
 * feed (GBK and UTF-8 qualify; UTF-16 does not). Line feeds embedded in quoted
 * CSV fields are not recognized and are treated as row terminators.
 */
public class FileSplitUtil {

    private final static Logger log = LogManager.getLogger(FileSplitUtil.class);

    /** {@link #main} only splits files that are at least this large (100 MB). */
    private static final long MIN_SPLIT_FILE_SIZE = 1024 * 1024 * 100;

    /** Nominal size of one part file used by {@link #main} (64 MB). */
    private static final int BLOCK_FILE_SIZE = 1024 * 1024 * 64;

    /** Size of the buffer used when scanning for line feeds and when copying parts. */
    private static final int IO_BUFFER_SIZE = 64 * 1024;

    /** File split by {@link #main} when no path is given on the command line. */
    private static final String DEFAULT_SOURCE_FILE = "D:\\csvtest\\aa.csv";

    /**
     * Command-line entry point: splits one file and prints the part names and
     * the elapsed time.
     *
     * @param args optional; {@code args[0]} is the file to split. When absent
     *             the built-in default path is used.
     */
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        try {
            String fileName = (args != null && args.length > 0) ? args[0] : DEFAULT_SOURCE_FILE;
            File sourceFile = new File(fileName);
            if (sourceFile.length() >= MIN_SPLIT_FILE_SIZE) {
                String cvsFileName = fileName.replace('\\', '/');
                FileSplitUtil fileSplitUtil = new FileSplitUtil();
                List<String> parts = fileSplitUtil.splitBySize(cvsFileName, BLOCK_FILE_SIZE);
                for (String part : parts) {
                    System.out.println("partName is:" + part);
                }
            }
            System.out.println("总文件长度" + sourceFile.length() + ",拆分文件耗时:" + (System.currentTimeMillis() - start) + "ms.");
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            log.error("Interrupted while splitting file", e);
        } catch (Exception e) {
            // Pass the throwable itself so the logger prints the full stack trace.
            log.error("Failed to split file", e);
        }
    }

    /**
     * Splits a file into parts of roughly {@code byteSize} bytes each.
     *
     * <p>Nominal cut points lie at multiples of {@code byteSize}; each one is
     * moved forward to the start of the next line. A part can therefore be up
     * to one row longer than {@code byteSize}, and a row longer than
     * {@code byteSize} swallows the nominal cut points that fall inside it, in
     * which case fewer parts are produced. Every part except possibly the last
     * ends with a line feed.
     *
     * <p>Parts are named {@code <path>.<index>.cvs}, where the 1-based index is
     * zero-padded to the width of {@code ceil(fileLength / byteSize)}.
     *
     * @param fileName full name of the file to split
     * @param byteSize nominal size of one part in bytes; must be positive
     * @return names of the part files in order; empty when the source file is empty
     * @throws IllegalArgumentException if {@code byteSize} is not positive
     * @throws IOException if the source cannot be read or a part cannot be
     *         written; parts already written are left on disk
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for the copies to finish
     */
    public List<String> splitBySize(String fileName, int byteSize)
            throws IOException, InterruptedException {
        if (byteSize <= 0) {
            throw new IllegalArgumentException("byteSize must be positive: " + byteSize);
        }
        File file = new File(fileName);
        List<Long> offsets = findLineAlignedOffsets(file, byteSize);
        int partCount = offsets.size() - 1;
        List<String> parts = new ArrayList<String>(Math.max(partCount, 0));
        if (partCount <= 0) {
            return parts;
        }

        long totalLen = offsets.get(partCount).longValue();
        long nominalCount = (totalLen + byteSize - 1) / byteSize;
        int countLen = String.valueOf(nominalCount).length();

        int threads = Math.min(partCount, Math.max(1, Runtime.getRuntime().availableProcessors()));
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Void>> pending = new ArrayList<Future<Void>>(partCount);
            for (int i = 0; i < partCount; i++) {
                String partFileName = file.getPath() + "."
                        + leftPad(String.valueOf(i + 1), countLen, '0') + ".cvs";
                long startPos = offsets.get(i).longValue();
                long length = offsets.get(i + 1).longValue() - startPos;
                pending.add(pool.submit(new SplitTask(file, startPos, length, partFileName)));
                parts.add(partFileName);
            }
            // Wait for every copy and surface the first failure instead of returning
            // names of files that were never (completely) written.
            for (int i = 0; i < partCount; i++) {
                awaitCopy(pending.get(i), parts.get(i));
            }
        } finally {
            // Also cancels copies that are still queued or running after a failure or interrupt.
            pool.shutdownNow();
        }
        return parts;
    }

    /**
     * Computes the byte offsets at which the parts start, followed by the file length.
     *
     * @return {@code [0, cut1, ..., fileLength]} with strictly increasing values,
     *         or just {@code [0]} for an empty file
     */
    private static List<Long> findLineAlignedOffsets(File file, int byteSize) throws IOException {
        List<Long> offsets = new ArrayList<Long>();
        offsets.add(Long.valueOf(0L));
        RandomAccessFile raf = new RandomAccessFile(file, "r");
        try {
            long totalLen = raf.length();
            if (totalLen == 0L) {
                return offsets;
            }
            byte[] scanBuffer = new byte[IO_BUFFER_SIZE];
            long previous = 0L;
            for (long nominal = byteSize; nominal < totalLen; nominal += byteSize) {
                if (nominal <= previous) {
                    continue; // this cut point lies inside a row that already extended the previous part
                }
                long aligned = findLineStart(raf, nominal, totalLen, scanBuffer);
                if (aligned >= totalLen) {
                    break; // no further line start: the rest of the file belongs to the current part
                }
                offsets.add(Long.valueOf(aligned));
                previous = aligned;
            }
            offsets.add(Long.valueOf(totalLen));
            return offsets;
        } finally {
            raf.close();
        }
    }

    /**
     * Returns the offset of the first line start at or after {@code nominal}
     * (which must be at least 1), or {@code totalLen} if there is none.
     */
    private static long findLineStart(RandomAccessFile raf, long nominal, long totalLen,
                                      byte[] scanBuffer) throws IOException {
        // Start one byte early: if that byte is a line feed, nominal itself is a line start.
        long pos = nominal - 1;
        raf.seek(pos);
        while (pos < totalLen) {
            int read = raf.read(scanBuffer);
            if (read <= 0) {
                break;
            }
            for (int i = 0; i < read; i++) {
                if (scanBuffer[i] == '\n') {
                    return pos + i + 1;
                }
            }
            pos += read;
        }
        return totalLen;
    }

    /** Waits for one part copy and converts its failure into an {@link IOException}. */
    private static void awaitCopy(Future<Void> copy, String partFileName)
            throws IOException, InterruptedException {
        try {
            copy.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            throw new IOException("Failed to write part file " + partFileName, cause);
        }
    }

    /**
     * Copies one byte range of the source file into its own part file.
     */
    private static final class SplitTask implements Callable<Void> {
        private final File originFile;
        private final long startPos;
        private final long length;
        private final String partFileName;

        SplitTask(File originFile, long startPos, long length, String partFileName) {
            this.originFile = originFile;
            this.startPos = startPos;
            this.length = length;
            this.partFileName = partFileName;
        }

        public Void call() throws IOException {
            RandomAccessFile in = new RandomAccessFile(originFile, "r");
            try {
                in.seek(startPos); // move to the beginning of this part
                OutputStream out = new FileOutputStream(partFileName);
                try {
                    byte[] buffer = new byte[IO_BUFFER_SIZE];
                    long remaining = length;
                    while (remaining > 0) {
                        // read() may return fewer bytes than requested, so loop until done.
                        int read = in.read(buffer, 0, (int) Math.min(buffer.length, remaining));
                        if (read < 0) {
                            throw new EOFException("Unexpected end of " + originFile
                                    + " while writing " + partFileName);
                        }
                        out.write(buffer, 0, read);
                        remaining -= read;
                    }
                    out.flush();
                } finally {
                    out.close();
                }
            } finally {
                in.close();
            }
            return null;
        }
    }

    /**
     * Counts the line-feed bytes ({@code '\n'}) in a file. A final line that is
     * not terminated by a line feed is not counted.
     *
     * @param filename file to scan
     * @return number of line-feed bytes in the file
     * @throws IOException if the file cannot be opened or read
     */
    public int getFileRow(String filename) throws IOException {
        InputStream is = new BufferedInputStream(new FileInputStream(filename));
        try {
            byte[] c = new byte[1024];
            int count = 0;
            int readChars;
            while ((readChars = is.read(c)) != -1) {
                for (int i = 0; i < readChars; ++i) {
                    if (c[i] == '\n') {
                        ++count;
                    }
                }
            }
            return count;
        } finally {
            is.close();
        }
    }

    /**
     * Left-pads a string with a fill character up to a given length.
     *
     * @param str    string to pad
     * @param length desired minimum length
     * @param ch     fill character
     * @return {@code str} itself when it is already at least {@code length}
     *         characters long, otherwise a new string of exactly {@code length}
     *         characters ending with {@code str}
     */
    public static String leftPad(String str, int length, char ch) {
        if (str.length() >= length) {
            return str;
        }
        char[] chs = new char[length];
        Arrays.fill(chs, ch);
        char[] src = str.toCharArray();
        System.arraycopy(src, 0, chs, length - src.length, src.length);
        return new String(chs);
    }

    /**
     * Plain holder describing one part file: its name, its first and last rows,
     * and whether the first row is complete. The splitting logic in this class
     * does not use it; it is kept because it is visible to the package.
     */
    class PartFile {
        private String partFileName;
        private String firstRow;
        private String endRow;
        private boolean firstIsFull;

        public String getPartFileName() {
            return partFileName;
        }

        public void setPartFileName(String partFileName) {
            this.partFileName = partFileName;
        }

        public String getFirstRow() {
            return firstRow;
        }

        public void setFirstRow(String firstRow) {
            this.firstRow = firstRow;
        }

        public String getEndRow() {
            return endRow;
        }

        public void setEndRow(String endRow) {
            this.endRow = endRow;
        }

        public boolean getFirstIsFull() {
            return firstIsFull;
        }

        public void setFirstIsFull(boolean firstIsFull) {
            this.firstIsFull = firstIsFull;
        }
    }

}

编程技巧