如何利用Spark提高批量插入Solr的效率详解大数据

有时候我们会碰到这样的场景:利用Spark批量插入数据。因为Spark相比MR编程更方便,更容易上手。因此接下来讲讲利用Spark批量插入数据时候的注意点。假设批量往SolrCloud里面插入数据。

1:利用MapPartitions针对每个分区的数据进行遍历插入,而不是利用Map针对每条数据进行插入

原因:当进行插入的时候,需要获取和SolrCloud的连接,如果利用Map针对每条数据进行插入的话,则需要获取N条连接(N为数据的总数);如果利用MapPartitions进行插入的话,则只需要获取M条连接(M为分区的总数)

2:在Excutor端初始化1个链接池,每个Excutor端的链接从这个链接池获取。这样做的好处是:1)链接池保存着和SolrCloud的长链接,一旦打开,就不关闭,除非Excutor退出;2)链接池可以控制每个Excutor连接SolrCloud的链接数,防止Rdd分区过多的情况下,由于过高频繁的插入造成SolrCloud崩溃。

Java实例代码如下:

1)利用MapPartitions插入代码块:

//finalRdd为JavaRDD<SomeObjects> 
 JavaRDD<SomeObjects> mapPartitionsRdd = finalRdd.mapPartitions(new FlatMapFunction<Iterator<SomeObjects>, SomeObjects>() { 
                            public Iterable<SomeObjects> call(Iterator<SomeObjects> iterator) 
                                    throws Exception { 
                           
                                final String collection = source; 
                                //初始化链接池 
                                BlockingQueue<CloudSolrServer> serverList = SolrCollectionPool.instance.getCollectionPool(zkHost, collection, poolSize); 
                                CloudSolrServer cloudServer = serverList.take(); 
 
                                List<SomeObjects> batchSomeObjects = new LinkedList<SomeObjects>(); 
                                List<SomeObjects> resultObjects = new LinkedList<SomeObjects>(); 
 
                                try { 
                                    while (flag) { 
                                        for (int i = 0; i < dmlRatePerBatch && iterator.hasNext(); i++) { 
                                            batchSomeObjects.add(iterator.next()); 
                                        } 
                                        if (batchSomeObjects.size() > 0) { 
                                            //插入solr 
                                            List<SolrInputDocument> solrInputDocumentList = RecordConverterAdapter.******(fieldInfoList, batchSomeObjects);//将数据转化为Solr的格式 
                                            cloudServer.add(solrInputDocumentList); 
                                            resultObjects.addAll(batchSomeObjects); 
                                            //清空 
                                            batchSomeObjects.clear(); 
                                        } else { 
                                            flag = false; 
                                        } 
                                    } 
                                } catch (Exception e) { 
                                    e.printStackTrace(); 
                                } finally { 
                                    serverList.put(cloudServer); 
                                } 
                                return resultObjects; 
                            } 
                        });

2)链接池代码块:

public class SolrCollectionPool implements Serializable { 
    private static Logger log = Logger.getLogger(SolrCollectionPool.class); 
    public static SolrCollectionPool instance = new SolrCollectionPool(); 
    private static Map<String, BlockingQueue<CloudSolrServer>> poolMap = new ConcurrentHashMap<String, BlockingQueue<CloudSolrServer>>(); 
    public SolrCollectionPool() { 
 
    } 
    public synchronized BlockingQueue<CloudSolrServer> getCollectionPool(String zkHost, String collection, final int size) { 
        if (poolMap.get(collection) == null) { 
            log.info("solr:" + collection + " poolsize:" + size); 
            System.setProperty("javax.xml.parsers.DocumentBuilderFactory", 
                    "com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl"); 
            System.setProperty("javax.xml.parsers.SAXParserFactory", 
                    "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl"); 
            BlockingQueue<CloudSolrServer> serverList = new LinkedBlockingQueue<CloudSolrServer>(size); 
            for (int i = 0; i < size; i++) { 
                CloudSolrServer cloudServer = new CloudSolrServer(zkHost); 
                cloudServer.setDefaultCollection(collection); 
                cloudServer.setZkClientTimeout(Utils.ZKCLIENTTIMEOUT); 
                cloudServer.setZkConnectTimeout(Utils.ZKCONNECTTIMEOUT); 
                cloudServer.connect(); 
                serverList.add(cloudServer); 
            } 
            poolMap.put(collection, serverList); 
        } 
        return poolMap.get(collection); 
    } 
 
    public static SolrCollectionPool instance() { 
        return SolrCollectionPool.instance; 
    } 
} 

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/9316.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论