Zookeeper 允许多个客户端在指定的一个或一些节点上添加监听事件,当被监听的节点发生状态变化时,Zookeeper 会把节点变化的细节通知到相应的客户端,这就是 Zookeeper 分布式协调机制的核心本质。
为了实现分布式协调功能,Zookeeper 引入了 Watcher 机制来进行事件监听,但是由于原生的方法需要开发人员反复注册,使用起来很不方便,所以我们通常使用第三方组件 Curator 来实现。Curator 对 Zookeeper 有关 Watcher 的原生方法进行了高度封装,引入了缓存 Cache 来实现对 ZooKeeper 服务端事件的监听,使用起来非常方便。
Curator 提供了 3 种 Cache 对象来实现对 Zookeeper 节点的状态变化监听,具体如下:
-
NodeCache : 只监听具体的某一个的节点
-
PathChildrenCache : 只监听一个节点下所有的子孙节点,但是不监听本身节点
-
TreeCache : 监听本身节点以及所有子孙节点,类似于 PathChildrenCache 和 NodeCache 的组合
好了,话不多说,直接上代码,在本篇博客的最后,会提供源代码下载。
一、搭建工程
新建一个 maven 项目,导入相关 jar 包,内容如下:
有关具体的 jar 包地址,可以在 https://mvnrepository.com 上进行查询。
<dependencies>
<!--导入 Spring 的 jar 包-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.18</version>
</dependency>
<!--导入 Spring 整合 junit 的 jar 包-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.3.18</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<!--导入 curator 的 jar 包-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<!--导入日志相关的 jar 包-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
搭建后的最终工程如下图所示,非常简单:
本篇博客的 Demo 的搭建,与上一篇博客很相似,具体实现主要在单元测试中的 zkWatchTest 类中。
二、代码实现
本篇博客除了单元测试中的类不一样之外,其它细节与上一篇博客完全相同,为了保持完整性,这里还是罗列一下。
首先需要对连接 Zookeeper 的信息进行配置,具体配置细节在 zookeeper.properties 文件中:
# zookeeper的连接字符串
# 如果是操作 zookeeper 集群,可以配置多个 zookeeper 地址
# 多个地址之间用英文逗号分隔,如 ip1:port1,ip2:port2,ip3:port3
zk.connectString=127.0.0.1:2181
# zookeeper的会话超时时间
# 单位:毫秒,默认是 60 秒
zk.sessionTimeoutMs=60000
# zookeeper的连接超时时间
# 单位:毫秒,默认是 15 秒
zk.connectionTimeoutMs=15000
# zookeeper默认操作的根节点
# 所有的增删改查操作,默认在该节点下进行
zk.namespace=jobs
然后在代码中使用 Spring 集成 Curator 组件,从 zookeeper.properties 读取连接配置信息:
package com.jobs.config;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
//加载 zookeeper.properties 文件内容
@PropertySource("classpath:zookeeper.properties")
public class zookeeperConfig {
@Value("${zk.connectString}")
private String connectString;
@Value("${zk.sessionTimeoutMs}")
private Integer sessionTimeoutMs;
@Value("${zk.connectionTimeoutMs}")
private Integer connectionTimeoutMs;
@Value("${zk.namespace}")
private String namespace;
//获取 Curator 的客户端连接
@Bean
public CuratorFramework getCuratorFramework(){
//配置重试策略,如果连接失败,最多重试 1 次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectString)
.sessionTimeoutMs(sessionTimeoutMs)
.connectionTimeoutMs(connectionTimeoutMs)
.namespace(namespace)
.retryPolicy(retryPolicy)
.build();
client.start();
return client;
}
}
package com.jobs.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
//采用 Spring 集成 Curator,导入 zookeeperConfig 配置类
@Configuration
@Import(zookeeperConfig.class)
public class springConfig { }
下面我们就使用 junit 单元测试,使用 Java 采用 Curator 实现对 Zookeeper 节点的监听。
需要注意的是:运行以下代码对节点监听后,你需要使用 Zookeeper 自带的客户端操作被监听的节点,验证监听效果。
package com.jobs.test;
import com.jobs.config.springConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = springConfig.class)
public class zkWatchTest {
@Autowired
private CuratorFramework client;
//注意:由于 namespace 配置的是 jobs
//因此以下的所有操作,都是默认在 /jobs 节点下进行操作
@Test
public void nodeCacheTest() throws Exception {
//NodeCache 能够进行单节点监控“增删改”操作,并通知所连接的客户端发生的变化细节
//以下代码运行后,请使用命令行操作 zookeeper 进行测试,分别运行以下命令:
//1.创建 /jobs 节点:create /jobs
//需要注意的是:
//上面采用的是 zookeeper 自己的客户端命令行创建节点,没有给节点存储值,默认存储值是 null
//如果采用的是 java 调用 curator 的方法创建节点,没有设置存储值的话,默认是客户端 ip 地址
//2.在 jobs 下创建 test1 节点:create /jobs/test1
//3.修改 test1 节点的内容:set /jobs/test1 hehe
//4.删除节点:delete /jobs/test1
//使用 NodeCache 监控 /jobs/test1 节点的变化
NodeCache nodeCache = new NodeCache(client, "/test1");
//当 /jobs/test1 节点发生增删改操作时,通知当前客户端调用监听的方法
//采用 Lambda 表达式的方式写监听事件
nodeCache.getListenable().addListener(() -> {
//获取所监听的节点路径
String path = nodeCache.getPath();
if (nodeCache.getCurrentData() == null) {
System.out.println("节点[" + path + "]被删除了...");
} else {
//获取数据版本号
int version = nodeCache.getCurrentData().getStat().getVersion();
//获取数据值
byte[] bytes = nodeCache.getCurrentData().getData();
String data = "";
if (bytes != null) {
data = new String(bytes);
}
if (version == 0) {
System.out.println("节点[" + path + "]被创建了,存储的数据为:" + data);
} else {
System.out.println("节点[" + path + "]被修改了,修改后的数据为:" + data);
}
}
});
/*
//采用匿名内部类的方式写监听事件
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
//代码同上.....
}
});
*/
//开启监听
nodeCache.start();
//由于是在单元测试中,测试监听,为了方式程序结束,这里简单采用休眠的方式进行阻塞
//真实应用场景下,是依靠网站的线程保持监听持续运行的
//休眠 5 分钟,请在 5 分钟内,采用命令行对 /jobs/test1 节点进行增删改操作,验证效果
Thread.sleep(300000);
}
//------------------------------------------------
@Test
public void pathChildrenCacheTest() throws Exception {
//PathChildrenCache 能够对一个节点下所有的子节点监控“增删改”操作,并通知所连接的客户端发生的变化细节
//注意:只监视子节点的变化,不监视父节点的变化
//以下代码,只监视 /jobs/test2 下所有子节点的变化,不监视 /jobs/test2 的变化
//以下代码运行后,请使用命令行操作 zookeeper 进行测试,分别运行以下命令:
//1.创建 /jobs/test2 节点:create /jobs/test2
//2.在 /jobs/test2 下创建 p1 和 p2 两个节点:
//create /jobs/test2/p1
//create /jobs/test2/p2
//3.修改 p1 和 p2 两个节点的内容:
//set /jobs/test2/p1 hehe
//set /jobs/test2/p2 haha
//4.删除节点:
//delete /jobs/test2/p1
//delete /jobs/test2/p2
//delete /jobs/test2
PathChildrenCache pathChildrenCache =
new PathChildrenCache(client, "/test2", true);
//采用 Lambda 表达式的方式写监听事件
pathChildrenCache.getListenable().addListener((client, event) -> {
//获取事件类型
PathChildrenCacheEvent.Type type = event.getType();
byte[] bytes;
String data;
String path;
if (type == PathChildrenCacheEvent.Type.CHILD_ADDED) {
//添加子节点
//获取添加的子节点
path = event.getData().getPath();
//获取添加的子节点中的数据
bytes = event.getData().getData();
data = bytes != null ? new String(bytes) : "";
System.out.println("添加了子节点[" + path + "],存储数据为:" + data);
} else if (type == PathChildrenCacheEvent.Type.CHILD_UPDATED) {
//修改子节点
//获取修改的子节点
path = event.getData().getPath();
//获取修改后的子节点中的数据
bytes = event.getData().getData();
data = bytes != null ? new String(bytes) : "";
System.out.println("子节点[" + path + "]修改后的数据为:" + data);
} else if (type == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
//删除子节点
//获取被删除的子节点
path = event.getData().getPath();
System.out.println("子节点[" + path + "]被删除了...");
}
});
/*
//采用匿名内部类的方式写监听事件
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client,
PathChildrenCacheEvent event) throws Exception {
//代码同上.....
}
});
*/
pathChildrenCache.start();
//由于是在单元测试中,测试监听,为了方式程序结束,这里简单采用休眠的方式进行阻塞
//真实应用场景下,是依靠网站的线程保持监听持续运行的
//休眠 5 分钟,请在 5 分钟内,采用命令行对 /jobs/test2 节点进行增删改操作,验证效果
Thread.sleep(300000);
}
//------------------------------------------------
@Test
public void treeCacheTest() throws Exception {
//TreeCache 能够对一个节点,以及该节点下所有的子节点监控“增删改”操作,并通知所连接的客户端发生的变化细节
//TreeCache 相当于 NodeCache 和 PathChildrenCache 的组合。
//TreeCache 的使用方式,跟 PathChildrenCache 很相似,这里就不详细介绍
//TreeCache 跟 PathChildrenCache 的唯一区别在于,TreeCache 除了监控所有子节点之外,还监视本身节点
TreeCache treeCache = new TreeCache(client, "/test3");
//采用 Lambda 表达式的方式写监听事件
treeCache.getListenable().addListener((client, event) -> {
//获取事件类型
TreeCacheEvent.Type type = event.getType();
byte[] bytes;
String data;
String path;
if (type == TreeCacheEvent.Type.NODE_ADDED) {
//添加节点
//获取添加的节点
path = event.getData().getPath();
//获取添加的节点中的数据
bytes = event.getData().getData();
data = bytes != null ? new String(bytes) : "";
System.out.println("添加了节点[" + path + "],存储数据为:" + data);
} else if (type == TreeCacheEvent.Type.NODE_UPDATED) {
//修改节点
//获取修改的节点
path = event.getData().getPath();
//获取修改后的节点中的数据
bytes = event.getData().getData();
data = bytes != null ? new String(bytes) : "";
System.out.println("节点[" + path + "]修改后的数据为:" + data);
} else if (type == TreeCacheEvent.Type.NODE_REMOVED) {
//删除节点
//获取被删除的节点
path = event.getData().getPath();
System.out.println("节点[" + path + "]被删除了...");
}
});
/*
//采用匿名内部类的方式写监听事件
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
//代码同上.....
}
});
*/
treeCache.start();
//由于是在单元测试中,测试监听,为了方式程序结束,这里简单采用休眠的方式进行阻塞
//真实应用场景下,是依靠网站的线程保持监听持续运行的
//休眠 5 分钟,请在 5 分钟内,采用命令行对 /jobs/test2 节点进行增删改操作,验证效果
Thread.sleep(300000);
}
}
在运行以上单元测试方法时,请使用 Zookeeper 自带的客户端,通过命令行操作 Zookeeper 被监听的节点,进行效果验证。
Ok,有关使用 Java 通过 Curator 组件的 API 实现对 Zookeeper 的节点进行监听,从而实现 Zookeeper 的分布式协调机制,已经介绍完毕,总体非常简单,希望对大家有所帮助。
本篇博客的源代码下载地址为:https://files.cnblogs.com/files/blogs/699532/zookeeper_watcher.zip
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/273707.html