When inserting large batches of data into HBase, writing records one at a time with single Puts is inefficient. If you instead use an RDD to generate HFiles and then import them with Bulk Load, the load speed improves dramatically. Without further ado, here is the code:
1. Create the blog table in the HBase shell: create 'blog', 'article'

2. Create the data file blog.txt
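The loader below expects each line of blog.txt to hold a rowkey and a value separated by a single space (that is how the Java code tokenizes each line). A made-up sample file, purely for illustration:

row1 hello
row2 world
row3 spark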

3. Upload the file to HDFS

Note: the file had already been uploaded in an earlier step, so this is shown only for completeness.
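If you still need to upload it, a standard HDFS put places the file where the job expects it (the directory below matches the hdfsPath used in the Java code; adjust for your cluster):

hdfs dfs -mkdir -p /raw/hfile
hdfs dfs -put blog.txt /raw/hfile/blog.txt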
4. Java version of the code
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
/**
* Created by WangLiang on 2015/11/30.
*/
public class Test {
    private static final Logger log = Logger.getLogger(Test.class);

    public static void main(String[] args) {
        JavaSparkContext jsc = null;
        try {
            System.setProperty("javax.xml.parsers.DocumentBuilderFactory",
                    "com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl");
            System.setProperty("javax.xml.parsers.SAXParserFactory",
                    "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl");

            // Configure is a project-internal configuration class; it only supplies the
            // values used to build the SparkConf and obtain a JavaSparkContext, so you
            // can ignore it and substitute your own configuration.
            String sparkMaster = Configure.instance.get("sparkMaster");
            String sparkJarAddress = Configure.instance.get("sparkJarAddress");
            String sparkExecutorMemory = Configure.instance.get("sparkExecutorMemory");
            String sparkCoresMax = Configure.instance.get("sparkCoresMax");
            String sparkLocalDir = Configure.instance.get("sparkLocalDir");
            log.info("initialize parameters");
            log.info("sparkMaster:" + sparkMaster);
            log.info("sparkJarAddress:" + sparkJarAddress);
            log.info("sparkExecutorMemory:" + sparkExecutorMemory);
            log.info("sparkCoresMax:" + sparkCoresMax);
            log.info("sparkLocalDir:" + sparkLocalDir);

            SparkConf sparkConf = new SparkConf().setAppName("dse load application in Java");
            sparkConf.setMaster(sparkMaster);
            if (!sparkJarAddress.isEmpty() && !sparkMaster.contains("local")) {
                sparkConf.set("spark.executor.memory", sparkExecutorMemory); // e.g. 16g
                sparkConf.set("spark.scheduler.mode", "FAIR");
                sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
                sparkConf.set("spark.kryo.registrator", "com.dahua.dse3.driver.dataset.DseKryoRegistrator");
                sparkConf.set("spark.cores.max", sparkCoresMax);
                sparkConf.set("spark.akka.threads", "12");
                sparkConf.set("spark.local.dir", sparkLocalDir);
                sparkConf.set("spark.shuffle.manager", "SORT");
                sparkConf.set("spark.network.timeout", "120");
                sparkConf.set("spark.rpc.lookupTimeout", "120");
                sparkConf.set("spark.executor.extraClassPath", "/usr/dahua/spark/executelib/hbase-protocol-0.98.3-hadoop2.jar");
                sparkConf.set("spark.executor.extraJavaOptions", "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps");
                sparkConf.set("spark.driver.extraJavaOptions", "-XX:PermSize=256M -XX:MaxPermSize=512M");
            }
            jsc = new JavaSparkContext(sparkConf);
            if (!sparkJarAddress.isEmpty() && !sparkMaster.contains("local")) {
                jsc.addJar(sparkJarAddress);
            }

            // Point the HBase client at the ZooKeeper quorum and the target table.
            Configuration conf = HBaseConfiguration.create();
            String zk = "172.25.3.160,172.25.3.161,172.25.3.162";
            String tableName = "blog";
            conf.set("hbase.zookeeper.quorum", zk);
            HTable table = new HTable(conf, tableName);
            conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);

            // configureIncrementalLoad inspects the table's region boundaries so the
            // generated HFiles line up with the existing regions.
            Job job = Job.getInstance(conf);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(KeyValue.class);
            HFileOutputFormat.configureIncrementalLoad(job, table);

            // Each line of blog.txt is "<rowkey> <content>", separated by a space;
            // turn it into a (rowkey, KeyValue) pair for column article:value.
            String hdfsPath = "hdfs://mycluster/raw/hfile/blog.txt";
            JavaRDD<String> lines = jsc.textFile(hdfsPath);
            JavaPairRDD<ImmutableBytesWritable, KeyValue> hfileRdd = lines.mapToPair(
                    new PairFunction<String, ImmutableBytesWritable, KeyValue>() {
                        public Tuple2<ImmutableBytesWritable, KeyValue> call(String v1) throws Exception {
                            String[] tokens = v1.split(" ");
                            String rowkey = tokens[0];
                            String content = tokens[1];
                            KeyValue keyValue = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes("article"),
                                    Bytes.toBytes("value"), Bytes.toBytes(content));
                            return new Tuple2<ImmutableBytesWritable, KeyValue>(
                                    new ImmutableBytesWritable(Bytes.toBytes(rowkey)), keyValue);
                        }
                    });

            // Write the pairs out as HFiles; pass the job's configuration so the
            // settings applied by configureIncrementalLoad are picked up.
            String hfilePath = "hdfs://mycluster/hfile/blog.hfile";
            hfileRdd.saveAsNewAPIHadoopFile(hfilePath, ImmutableBytesWritable.class, KeyValue.class,
                    HFileOutputFormat.class, job.getConfiguration());

            // Bulk-load the generated HFiles into the table.
            LoadIncrementalHFiles bulkLoader = new LoadIncrementalHFiles(conf);
            bulkLoader.doBulkLoad(new Path(hfilePath), table);
            table.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (jsc != null) {
                jsc.stop();
            }
        }
    }
}
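One caveat the listing glosses over: the HFile writer requires the KeyValues in each partition to arrive in ascending rowkey order, and unsorted input fails with an error along the lines of "Added a key not lexically larger than previous". This example only gets away without an explicit sort if blog.txt is already ordered by rowkey. For arbitrary input, sort the pair RDD before saving; a minimal sketch, not part of the original listing:

JavaPairRDD<ImmutableBytesWritable, KeyValue> sorted = hfileRdd.sortByKey(true);
sorted.saveAsNewAPIHadoopFile(hfilePath, ImmutableBytesWritable.class, KeyValue.class,
        HFileOutputFormat.class, job.getConfiguration());

sortByKey works here because ImmutableBytesWritable is Comparable, so rows come out in lexicographic byte order, matching HBase's ordering.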
5. Scan the blog table to confirm the data has been loaded.
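In the HBase shell:

scan 'blog'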

Reference: http://www.openkb.info/2015/01/how-to-use-scala-on-spark-to-load-data.html