利用Spark Rdd生成Hfile直接导入到Hbase详解大数据

针对大批量插入Hbase的场景,如果单条记录插入的时候效率比较低下,如果可以利用Rdd生成Hfile的话,然后利用Bulk Load导入Hfile的话,则会大大提升导入的速度,废话不说,直接上代码:

1.利用Create创建表blog:create ‘blog’ ,’article’

利用Spark Rdd生成Hfile直接导入到Hbase详解大数据

2.创建数据文件 blog.txt

 利用Spark Rdd生成Hfile直接导入到Hbase详解大数据

 3.上传文件至hdfs

利用Spark Rdd生成Hfile直接导入到Hbase详解大数据

备注:因为之前文件已经上传了

4.Java版本代码

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.KeyValue; 
import org.apache.hadoop.hbase.client.HTable; 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 
import org.apache.hadoop.hbase.mapred.TableOutputFormat; 
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; 
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; 
import org.apache.hadoop.hbase.util.Bytes; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.log4j.Logger; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.api.java.function.PairFunction; 
import org.apache.spark.sql.*; 
import org.apache.spark.sql.hive.HiveContext; 
import org.apache.spark.sql.types.DataType; 
import org.apache.spark.sql.types.DataTypes; 
import org.apache.spark.sql.types.StructField; 
import org.apache.spark.sql.types.StructType; 
import scala.Tuple2; 
 
import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.TimeUnit; 
 
/** 
 * Created by WangLiang on 2015/11/30. 
 */ 
public class Test { 
    private static Logger log = Logger.getLogger(HelloWorld.class); 
    public static void main(String[] args) { 
        try { 
            System.setProperty("javax.xml.parsers.DocumentBuilderFactory", 
                    "com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl"); 
            System.setProperty("javax.xml.parsers.SAXParserFactory", 
                    "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl"); 
	    //项目内部自己的配置类,可以忽略,其实就是设置sparkConf,然后获取到JavaSparkContext 
            String sparkMaster = Configure.instance.get("sparkMaster"); 
            String sparkJarAddress = Configure.instance.get("sparkJarAddress"); 
            String sparkExecutorMemory = Configure.instance.get("sparkExecutorMemory"); 
            String sparkCoresMax = Configure.instance.get("sparkCoresMax"); 
            String sparkLocalDir = Configure.instance.get("sparkLocalDir"); 
            log.info("initialize parameters"); 
            log.info("sparkMaster:" + sparkMaster); 
            log.info("sparkJarAddress:" + sparkJarAddress); 
            log.info("sparkExecutorMemory:" + sparkExecutorMemory); 
            log.info("sparkCoresMax:" + sparkCoresMax); 
            log.info("sparkLocalDir:" + sparkLocalDir); 
            SparkConf sparkConf = new SparkConf().setAppName("dse load application in Java"); 
            sparkConf.setMaster(sparkMaster); 
            if (!sparkJarAddress.isEmpty() && !sparkMaster.contains("local")) { 
                sparkConf.set("spark.executor.memory", sparkExecutorMemory); // 16g 
                sparkConf.set("spark.scheduler.mode", "FAIR"); 
                sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 
                sparkConf.set("spark.kryo.registrator", "com.dahua.dse3.driver.dataset.DseKryoRegistrator"); 
                sparkConf.set("spark.cores.max", sparkCoresMax); 
                sparkConf.set("spark.akka.threads", "12"); 
                sparkConf.set("spark.local.dir", sparkLocalDir); 
                sparkConf.set("spark.shuffle.manager", "SORT"); 
                sparkConf.set("spark.network.timeout", "120"); 
                sparkConf.set("spark.rpc.lookupTimeout", "120"); 
                sparkConf.set("spark.executor.extraClassPath", "/usr/dahua/spark/executelib/hbase-protocol-0.98.3-hadoop2.jar"); 
                sparkConf.set("spark.executor.extraJavaOptions", "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"); 
                sparkConf.set("spark.driver.extraJavaOptions", "-XX:PermSize=256M -XX:MaxPermSize=512M"); 
                sparkConf.set("spark.network.timeout", "120"); 
 
            } 
            JavaSparkContext jsc = new JavaSparkContext(sparkConf); 
            if (!sparkJarAddress.isEmpty() && !sparkMaster.contains("local")) { 
                jsc.addJar(sparkJarAddress); 
            } 
 
            Configuration conf = HBaseConfiguration.create(); 
            String zk = "172.25.3.160,172.25.3.161,172.25.3.162"; 
            String tableName = "blog"; 
            conf.set("hbase.zookeeper.quorum", zk); 
            HTable table = new HTable(conf, tableName); 
            conf.set(TableOutputFormat.OUTPUT_TABLE, tableName); 
            Job job = Job.getInstance(conf); 
            job.setMapOutputKeyClass(ImmutableBytesWritable.class); 
            job.setMapOutputValueClass(KeyValue.class); 
            HFileOutputFormat.configureIncrementalLoad(job, table); 
            String hdfsPath = "hdfs://mycluster/raw/hfile/blog.txt"; 
            JavaRDD<String> lines = jsc.textFile(hdfsPath); 
            JavaPairRDD<ImmutableBytesWritable,KeyValue> hfileRdd = lines.mapToPair(new PairFunction<String, ImmutableBytesWritable, KeyValue>() { 
                public Tuple2<ImmutableBytesWritable, KeyValue> call(String v1) throws Exception { 
                    String[] tokens = v1.split(" "); 
                    String rowkey = tokens[0]; 
                    String content = tokens[1]; 
                    KeyValue keyValue = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes("article"), Bytes.toBytes("value"), Bytes.toBytes(content)); 
                    return new Tuple2<ImmutableBytesWritable, KeyValue>(new ImmutableBytesWritable(Bytes.toBytes(rowkey)), keyValue); 
                } 
            }); 
            String hfilePath = "hdfs://mycluster/hfile/blog.hfile"; 
            hfileRdd.saveAsNewAPIHadoopFile(hfilePath, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class, conf); 
			 
			//利用bulk load hfile 
            LoadIncrementalHFiles bulkLoader = new LoadIncrementalHFiles(conf); 
            bulkLoader.doBulkLoad(new Path(hfilePath), table); 
 
        }catch(Exception e){ 
            e.printStackTrace(); 
        }finally { 
            ; 
        } 
    } 
 
} 

5.scan blog表,数据已经入库

利用Spark Rdd生成Hfile直接导入到Hbase详解大数据

参考文章链接如下:http://www.openkb.info/2015/01/how-to-use-scala-on-spark-to-load-data.html

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/9317.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论