Sometimes we run into this scenario: using Spark to bulk-insert data, since Spark is more convenient to program against and easier to pick up than MapReduce. Below are the points to watch when bulk-inserting with Spark, taking batch inserts into SolrCloud as the example.
1. Use mapPartitions to iterate over and insert each partition's data, rather than map to insert one record at a time.
Reason: every insert needs a connection to SolrCloud. Inserting with map acquires N connections (N = total number of records), whereas mapPartitions acquires only M connections (M = number of partitions); a minimal sketch contrasting the two follows the list.
2. Initialize one connection pool per Executor, and have every task on that Executor borrow its connection from the pool. The benefits: 1) the pool holds long-lived connections to SolrCloud that, once opened, are never closed until the Executor exits; 2) the pool caps how many connections each Executor opens to SolrCloud, so that even when the RDD has many partitions, overly frequent inserts cannot bring SolrCloud down.
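To make the contrast in point 1 concrete, here is a minimal sketch; the toDoc converter is hypothetical, and finalRdd, zkHost, collection, and SomeObjects are the same names used in the full example below:

import java.util.Collections;
import java.util.Iterator;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;

// Anti-pattern: map() runs once per record, so N records open N connections.
finalRdd.map(new Function<SomeObjects, SomeObjects>() {
    public SomeObjects call(SomeObjects record) throws Exception {
        CloudSolrServer server = new CloudSolrServer(zkHost); // opened N times in total
        server.setDefaultCollection(collection);
        try {
            server.add(toDoc(record)); // hypothetical single-record converter
        } finally {
            server.shutdown();
        }
        return record;
    }
});

// Better: mapPartitions() runs once per partition, so M partitions open M connections.
finalRdd.mapPartitions(new FlatMapFunction<Iterator<SomeObjects>, SomeObjects>() {
    public Iterable<SomeObjects> call(Iterator<SomeObjects> it) throws Exception {
        CloudSolrServer server = new CloudSolrServer(zkHost); // opened once per partition
        server.setDefaultCollection(collection);
        try {
            while (it.hasNext()) {
                server.add(toDoc(it.next())); // batching, as in the full example, is better still
            }
        } finally {
            server.shutdown();
        }
        return Collections.emptyList();
    }
});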
Example Java code follows.
1) Inserting with mapPartitions:
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.common.SolrInputDocument;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;

// finalRdd is a JavaRDD<SomeObjects>
JavaRDD<SomeObjects> mapPartitionsRdd = finalRdd.mapPartitions(new FlatMapFunction<Iterator<SomeObjects>, SomeObjects>() {
    public Iterable<SomeObjects> call(Iterator<SomeObjects> iterator) throws Exception {
        final String collection = source;
        // Borrow one connection from the per-Executor pool (created lazily on first use)
        BlockingQueue<CloudSolrServer> serverList =
                SolrCollectionPool.instance.getCollectionPool(zkHost, collection, poolSize);
        CloudSolrServer cloudServer = serverList.take();
        List<SomeObjects> batchSomeObjects = new LinkedList<SomeObjects>();
        List<SomeObjects> resultObjects = new LinkedList<SomeObjects>();
        boolean flag = true; // keep looping until the iterator is drained
        try {
            while (flag) {
                // Collect up to dmlRatePerBatch records into the current batch
                for (int i = 0; i < dmlRatePerBatch && iterator.hasNext(); i++) {
                    batchSomeObjects.add(iterator.next());
                }
                if (batchSomeObjects.size() > 0) {
                    // Convert the batch into Solr's document format and insert
                    List<SolrInputDocument> solrInputDocumentList =
                            RecordConverterAdapter.******(fieldInfoList, batchSomeObjects); // converter name elided in the original
                    cloudServer.add(solrInputDocumentList);
                    resultObjects.addAll(batchSomeObjects);
                    // Clear the batch for the next round
                    batchSomeObjects.clear();
                } else {
                    flag = false;
                }
            }
        } catch (Exception e) {
            e.printStackTrace(); // consider rethrowing so Spark can retry the task
        } finally {
            // Always return the connection to the pool; never shut it down here
            serverList.put(cloudServer);
        }
        return resultObjects;
    }
});
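Note that mapPartitions is a transformation, so nothing is inserted until an action runs on mapPartitionsRdd; count() below is just one convenient way to trigger the job:

// Force evaluation; the per-partition inserts above execute during this action
long insertedCount = mapPartitionsRdd.count();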
2) Connection pool code:
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
import org.apache.solr.client.solrj.impl.CloudSolrServer;

public class SolrCollectionPool implements Serializable {
    private static Logger log = Logger.getLogger(SolrCollectionPool.class);
    // One JVM-wide singleton per Executor
    public static SolrCollectionPool instance = new SolrCollectionPool();
    // One pool of connections per collection
    private static Map<String, BlockingQueue<CloudSolrServer>> poolMap =
            new ConcurrentHashMap<String, BlockingQueue<CloudSolrServer>>();

    private SolrCollectionPool() {
    }

    public synchronized BlockingQueue<CloudSolrServer> getCollectionPool(String zkHost, String collection, final int size) {
        if (poolMap.get(collection) == null) {
            log.info("solr:" + collection + " poolsize:" + size);
            // Pin the JDK's built-in JAXP implementations to avoid XML parser conflicts on the classpath
            System.setProperty("javax.xml.parsers.DocumentBuilderFactory",
                    "com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl");
            System.setProperty("javax.xml.parsers.SAXParserFactory",
                    "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl");
            BlockingQueue<CloudSolrServer> serverList = new LinkedBlockingQueue<CloudSolrServer>(size);
            for (int i = 0; i < size; i++) {
                CloudSolrServer cloudServer = new CloudSolrServer(zkHost);
                cloudServer.setDefaultCollection(collection);
                cloudServer.setZkClientTimeout(Utils.ZKCLIENTTIMEOUT);
                cloudServer.setZkConnectTimeout(Utils.ZKCONNECTTIMEOUT);
                cloudServer.connect();
                serverList.add(cloudServer);
            }
            poolMap.put(collection, serverList);
        }
        return poolMap.get(collection);
    }

    public static SolrCollectionPool instance() {
        return SolrCollectionPool.instance;
    }
}
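One detail the example glosses over: documents sent with add() are not searchable until Solr commits them. If autoCommit/autoSoftCommit is not configured in the collection's solrconfig.xml, one option (a sketch; commit frequency is a tuning decision) is an explicit commit at the end of each partition, just before the connection goes back to the pool:

try {
    // Flush this partition's inserts; one commit per partition is far
    // cheaper than committing after every batch
    cloudServer.commit();
} finally {
    serverList.put(cloudServer);
}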