下面的构造函数位于hbase-server项目的org.apache.hadoop.hbase.regionserver.HRegionServer类中。
/**
 * Starts the construction of a region server: reads configuration, creates the RPC services,
 * resolves the server name, performs the security logins, sets up the filesystem handle and
 * (unless running without a cluster) connects to ZooKeeper and starts the master-address and
 * cluster-status trackers.
 *
 * <p>The hostname override is read from {@code MASTER_HOSTNAME_KEY} when this instance is an
 * {@code HMaster}, and from {@code RS_HOSTNAME_KEY} otherwise.
 *
 * @param conf configuration to read settings from; it is also modified here (meta replicas are
 *     disabled and the default filesystem is set from the root directory)
 * @param csm coordinated state manager; cast to {@code BaseCoordinatedStateManager}, initialized
 *     and started only when {@code hbase.testing.nocluster} is not set to true
 * @throws IOException declared by this constructor; presumably raised by one of the
 *     initialization calls below (not determinable from this block alone)
 * @throws InterruptedException declared by this constructor; presumably raised by one of the
 *     initialization calls below (not determinable from this block alone)
 */
public HRegionServer(Configuration conf, CoordinatedStateManager csm)
    throws IOException, InterruptedException {
  this.fsOk = true;
  this.conf = conf;
  checkCodecs(this.conf);
  this.userProvider = UserProvider.instantiate(conf);
  Superusers.initialize(conf);
  FSUtils.setupShortCircuitRead(this.conf);
  // Disable usage of meta replicas in the regionserver
  this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);

  // Config'ed params
  // Maximum number of client retries.
  this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
  // Interval at which memstores are checked against the flush size configured by
  // hbase.hregion.memstore.flush.size.
  this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
  // Interval between this region server's periodic reports to the master;
  // defaults to 3s (3000 ms).
  this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

  this.sleeper = new Sleeper(this.msgInterval, this);

  // The nonce manager is left null when nonces are disabled by configuration.
  boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
  this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

  // Configured number of regions to report to the master.
  this.numRegionsToReport = conf.getInt(
      "hbase.regionserver.numregionstoreport", 10);

  // Timeouts.
  this.operationTimeout = conf.getInt(
      HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

  this.shortOperationTimeout = conf.getInt(
      HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

  this.abortRequested = false;
  this.stopped = false;

  // The RPC services are created here but only started further down, after the
  // ZooKeeper-related setup.
  rpcServices = createRpcServices();
  this.startcode = System.currentTimeMillis();
  if (this instanceof HMaster) {
    useThisHostnameInstead = conf.get(MASTER_HOSTNAME_KEY);
  } else {
    useThisHostnameInstead = conf.get(RS_HOSTNAME_KEY);
  }
  // Use the configured hostname override when applicable, otherwise the hostname of the
  // address held by the RPC services.
  String hostName = shouldUseThisHostnameInstead() ?
      useThisHostnameInstead : rpcServices.isa.getHostName();
  // The server name is built from hostname, RPC port and start code.
  serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);

  rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
  rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);

  // login the zookeeper client principal (if using security)
  ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
      HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
  // login the server principal (if using secure Hadoop)
  login(userProvider, hostName);

  // Create the instance that tracks the total size occupied by all memstores on this
  // region server.
  regionServerAccounting = new RegionServerAccounting();
  // Any uncaught exception in a service thread aborts this server.
  uncaughtExceptionHandler = new UncaughtExceptionHandler() {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
      abort("Uncaught exception in service thread " + t.getName(), e);
    }
  };

  useZKForAssignment = ConfigUtil.useZKForAssignment(conf);

  // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
  // underlying hadoop hdfs accessors will be going against wrong filesystem
  // (unless all is set to defaults).
  FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
  // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase
  // checksum verification enabled, then automatically switch off hdfs checksum verification.
  boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
  this.fs = new HFileSystem(this.conf, useHBaseChecksum);
  this.rootDir = FSUtils.getRootDir(this.conf);
  this.tableDescriptors = new FSTableDescriptors(
      this.conf, this.fs, this.rootDir, !canUpdateTableDescriptor(), false);

  service = new ExecutorService(getServerName().toShortString());
  spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

  // Some unit tests don't need a cluster, so no zookeeper at all
  if (!conf.getBoolean("hbase.testing.nocluster", false)) {
    // Open connection to zookeeper and set primary watcher
    // Create this region server's connection to ZooKeeper.
    zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
        rpcServices.isa.getPort(), this, canCreateBaseZNode());

    this.csm = (BaseCoordinatedStateManager) csm;
    this.csm.initialize(this);
    this.csm.start();

    tableLockManager = TableLockManager.createTableLockManager(
        conf, zooKeeper, serverName);

    // Create the master tracker, which waits for the master to start
    // (i.e. to register in ZooKeeper).
    masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
    masterAddressTracker.start();

    // Create the cluster status tracker, which waits for the cluster to come up, i.e. for
    // the master to register the cluster id in ZooKeeper.
    clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
    clusterStatusTracker.start();
  }
  this.configurationManager = new ConfigurationManager();

  rpcServices.start();
  putUpWebUI();
  this.walRoller = new LogRoller(this, this);
  this.choreService = new ChoreService(getServerName().toString(), true);

  // The HUP signal handler is not installed on Windows.
  if (!SystemUtils.IS_OS_WINDOWS) {
    // On HUP, reload the configuration and notify all configuration observers.
    Signal.handle(new Signal("HUP"), new SignalHandler() {
      public void handle(Signal signal) {
        getConfiguration().reloadConfiguration();
        configurationManager.notifyAllObservers(getConfiguration());
      }
    });
  }
}
在上述构造函数中,所做操作如下:
A. 设置一些变量的初始值:第501行、502行。
B. 从配置文件中读取一些配置项的值。
C. 创建rpc服务(调用createRpcServices(),此时尚未启动,真正的启动见后面的N步骤)。第535行。
D. 确定hostname。第542行
E. 确定serverName。第544行。
F. 实例化rpcController和rpcRetryingCaller。
G. 若使用了kerberos认证,则登录zookeeper客户端认证,登录server认证。第550-553行。
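H. 创建regionServerAccounting和uncaughtExceptionHandler,并读取useZKForAssignment配置(此处尚未向zookeeper注册节点,与zookeeper的连接在后面的J步骤才建立)。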
I. 设置hdfs的defaultFS目录(要确保hbase-site.xml里配置的root目录和hadoop中的hdfs-site.xml中的defaultFS目录一模一样)。
J. 创建ZooKeeperWatcher,所需参数有配置Configuration、进程名+端口号、本实例,以及是否创建基本znode的标志canCreateBaseZNode()。注意此处的canCreateBaseZNode()为false,即在HRegionServer启动时是不创建基本的znode节点的。第583行。
在587行的this.csm.initialize(this)是为该RegionServer初始化了zookeeper的监测,如watcher、splitLogWorkerCoordination、closeRegionCoordination等,便于通过zookeeper来监测RegionServer的运行状态。
K. 实例化表锁管理器(TableLockManager)。
L. 实例化追踪master服务节点的实例masterAddressTracker并启动该实例。
Hbase使用多Master来解决Master单点故障的问题。主Master服务故障时,它与Zookeeper的心跳延迟超过阈值,Zookeeper路径下的数据被清理,备Master上的ActiveMasterManager服务会竞争该Master路径,成为主Master。MasterAddressTracker是RS内部监听Master节点变化的追踪器。
M. 实例化追踪集群状态的实例clusterStatusTracker并启动该实例服务。第596行。
ClusterStatusTracker是HBase集群状态追踪器。该选项可以标识当前集群的状态,及它的启动时间。该设置选项有利于集群中的各个工作节点(RS)统一执行启动和退出操作。
上述两个追踪实例均需要zookeeper做参数。
N. 启动rpcService。
O. run()方法负责HRegionServer的启动流程。
P. initializeZookeeper()方法:第693行。
Q. initializeThreads()方法。第831行。
R. run()方法:886行。首先向master注册,告诉master此regionServer启动了(906的循环),待和master通信之后,就进入932行的循环。
S. 在第960行附近的这段代码中,regionServer会定时向master报告心跳。如果此时与master通信中断,则regionServer会再次到zookeeper上找master地址,以便与master通信。
// Report to the master only once at least msgInterval ms have passed since the last report.
long now =System.currentTimeMillis();
if ((now - lastMsg) >= msgInterval) {
  tryRegionServerReport(lastMsg, now);
  // Restart the interval from the time the report attempt returned, not from 'now'.
  lastMsg = System.currentTimeMillis();
  doMetrics();
}
下面为网上找到的HRegionServer类的继承图:
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/9521.html