程序导入大数据(大数据导入方式)


本文介绍如何通过编写代码的方式,离线导入大数据量到PolarDB-X 1.0数据库。

背景信息
假设当前数据库有一个表需要导入到PolarDB-X 1.0数据库中,数据量大致为814万,表结构如下。

public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException,
        SQLException {

        URL url = Main.class.getClassLoader().getResource("stackoverflow.csv");

        File dataFile = new File(url.toURI());

        String sql = "insert into post(postingType,id,acceptedAnswer,parentId,score,tags) values(?,?,?,?,?,?)";

        int batchSize = 10000;

        try (

            Connection connection = getConnection("XXXXX.drds.aliyuncs.com", 3306, "XXXXX",
                "XXXX",
                "XXXX");
            BufferedReader br = new BufferedReader(new FileReader(dataFile))) {
            String line;
            PreparedStatement st = connection.prepareStatement(sql);
            long startTime = System.currentTimeMillis();
            int batchCount = 0;
            while ((line = br.readLine()) != null) {
                String[] data = line.split(",");
                st.setInt(1, Integer.valueOf(data[0]));
                st.setInt(2, Integer.valueOf(data[1]));

                st.setObject(3, "".equals(data[2]) ? null : Integer.valueOf(data[2]));
                st.setObject(4, "".equals(data[3]) ? null : Integer.valueOf(data[3]));
                st.setObject(5, "".equals(data[4]) ? null : Integer.valueOf(data[4]));
                if (data.length >= 6) {
                    st.setObject(6, data[5]);
                }
                st.addBatch();
                if (++batchCount % batchSize == 0) {
                    st.executeBatch();
                    System.out.println(String.format("insert %d record", batchCount));
                }
            }
            if (batchCount % batchSize != 0) {
                st.executeBatch();
            }
            long cost = System.currentTimeMillis() – startTime;

            System.out.println(String.format("Take %d second,insert %d record, tps %d", cost/1000,batchCount, batchCount/(cost/1000)  ));

        }

    }

    /**
     * 获取数据库连接
     *
     * @param host     数据库地址
     * @param port     端口
     * @param database 数据库名称
     * @param username 用户名
     * @param password 密码
     * @return
     * @throws ClassNotFoundException
     * @throws SQLException
     */
    private static Connection getConnection(String host, int port, String database, String username, String password)
        throws ClassNotFoundException, SQLException {
        Class.forName("com.mysql.jdbc.Driver");
        String url = String.format(
            "jdbc:mysql://%s:%d/%s?autoReconnect=true&socketTimeout=600000&rewriteBatchedStatements=true", host, port,
            database);
        Connection con = DriverManager.getConnection(url, username, password);
        return con;
    }
            

简单说一下我们的思路,这里主要说思路,更细节的技术问题。

第一步:把Excel转成CSV文件,这里可以是系统使用人员手动转换,也可以由程序来转换。
然后先导入Excel中的5000条信息到人员信息表。后台数据库用存储过程实现,使用merge的
方式进行增量导入。

第二步:关联其他表,然后将符合条件的结果集作为值直接插入到临时表,这个过程也是在
存储过程中实现。这里有一个技巧,把SELECT的结果集,作为INSERT语句的VALUES,这样
能比逐条处理速度要快很多。

第三步:使用MERGE的方式,将临时表的数据,增量导入目标表。这个过程也在存储过程中
实现。

以上的案例,数据校验的逻辑不是很多,只有一个重复性校验,所以使用的是方式。
如果业务上的数据校验逻辑比较复杂,可能性能就会降低很多,这时就要考虑其他解决
方案。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/291360.html

(0)
上一篇 2022年10月18日
下一篇 2022年10月18日

发表回复

登录后才能评论