前面已经介绍了使用命令行操作 Zookeeper,方便我们对 Zookeeper 有一个整体的认识。
Zookeeper 本质上就是一个 NoSQL 数据库,只不过其存储的数据结构是树状结构形式,理解起来很简单。
对于一个新手小白来说,面对 Zookeeer,介绍一大堆概念,没啥用处,没有什么比代码来得更加实际一些。
本篇博客通过编写 Java 代码对 Zookeeper 进行增删改查,让大家轻松掌握 Zookeeper 的基本操作,增强自信心。
在本篇博客的最后会提供源代码 Demo 下载。
一、搭建工程
新建一个 maven 项目,导入相关 jar 包,内容如下:
有关具体的 jar 包地址,可以在 https://mvnrepository.com 上进行查询。
<dependencies>
<!--导入 Spring 的 jar 包-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.18</version>
</dependency>
<!--导入 Spring 整合 junit 的 jar 包-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.3.18</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<!--导入 curator 的 jar 包-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<!--导入日志相关的 jar 包-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
搭建后的最终工程如下图所示,非常简单:
使用 Java 操作 Zookeeper 的方式很多,相比而言采用 Curator 组件比较简单,因此也比较流行。
本篇博客的 Demo 采用 Spring 集成 Curator 组件的方式,通过 Java 调用其 API 方法对 Zookeeper 进行增删改查。
需要导入 curator-framework 和 curator-recipes 两个 jar 包,请参考官网,需要注意 Curator 与 zookeeper 版本对应问题。
Curator 的官网地址是:https://curator.apache.org
二、代码细节
首先需要对连接 Zookeeper 的信息进行配置,具体配置细节在 zookeeper.properties 文件中:
# zookeeper的连接字符串
# 如果是操作 zookeeper 集群,可以配置多个 zookeeper 地址
# 多个地址之间用英文逗号分隔,如 ip1:port1,ip2:port2,ip3:port3
zk.connectString=127.0.0.1:2181
# zookeeper的会话超时时间
# 单位:毫秒,默认是 60 秒
zk.sessionTimeoutMs=60000
# zookeeper的连接超时时间
# 单位:毫秒,默认是 15 秒
zk.connectionTimeoutMs=15000
# zookeeper默认操作的根节点
# 所有的增删改查操作,默认在该节点下进行
zk.namespace=jobs
然后在代码中使用 Spring 集成 Curator 组件,从 zookeeper.properties 读取连接配置信息:
package com.jobs.config;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
//加载 zookeeper.properties 文件内容
@PropertySource("classpath:zookeeper.properties")
public class zookeeperConfig {
@Value("${zk.connectString}")
private String connectString;
@Value("${zk.sessionTimeoutMs}")
private Integer sessionTimeoutMs;
@Value("${zk.connectionTimeoutMs}")
private Integer connectionTimeoutMs;
@Value("${zk.namespace}")
private String namespace;
//获取 Curator 的客户端连接
@Bean
public CuratorFramework getCuratorFramework(){
//配置重试策略,如果没有连接失败,最多重试 1 次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectString)
.sessionTimeoutMs(sessionTimeoutMs)
.connectionTimeoutMs(connectionTimeoutMs)
.namespace(namespace)
.retryPolicy(retryPolicy)
.build();
client.start();
return client;
}
}
package com.jobs.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
//采用 Spring 集成 Curator,导入 zookeeperConfig 配置类
@Configuration
@Import(zookeeperConfig.class)
public class springConfig { }
下面我们就使用 junit 单元测试,使用 Java 采用 Curator 的 API 方法操作 Zookeeper,具体如下所示:
package com.jobs.test;
import com.jobs.config.springConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = springConfig.class)
public class zkTest {
@Autowired
private CuratorFramework client;
//注意:
//由于我在 zookeeper.properties 下配置的 namespace 是 jobs
//因此下面对 zookeeper 的所有操作,默认情况下都是在 /jobs 节点下进行操作的
@Test
public void createTest() throws Exception {
//默认情况下,创建的都是持久化的节点,一旦创建,永久存储
//在创建节点时,没有指定存储的数据,默认情况下存储的是当前客户端机器的 ip 地址
String path1 = client.create().forPath("/test1");
System.out.println("成功创建节点:" + path1);
//获取节点中存储的数据,由于在创建时没有存储数据,所以发现存储的是当前客户端机器的 ip 地址
byte[] data1 = client.getData().forPath("/test1");
System.out.println("存储的数据为:" + new String(data1));
System.out.println("-------------------------------------");
//创建节点时,同时存储数据
String path2 = client.create().forPath("/test2", "乔京飞".getBytes());
System.out.println("成功创建节点:" + path2);
//获取节点中存储的数据
byte[] data2 = client.getData().forPath("/test2");
System.out.println("存储的数据为:" + new String(data2));
}
@Test
public void createTempTest() throws Exception {
//创建临时节点,一旦当前连接断开,zookeeper 会自动销毁所创建的节点
String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/test3", "任肥肥".getBytes());
System.out.println("成功创建临时节点:" + path);
byte[] data = client.getData().forPath("/test3");
System.out.println("存储的数据为:" + new String(data));
//注意:
//由于当前方法执行完后,zookeeper 连接就断开了,临时节点就销毁了
//所以你可以通过命令行的方式,查看 zookeeper 中 /jobs 下的节点,发现找不到 /jobs/test3 节点
//但是:通过上面运行的程序,打印出节点信息和节点存储的数据,可以断定:/jobs/test3 节点曾经被创建过
}
@Test
public void autoCreateParentsNodeTest() throws Exception {
//当我们创建深层次节点时,可能路径上的父节点并不存在,所以仅仅 create 无法同时创建父子节点
//可以使用 creatingParentsIfNeeded 方法,同时创建父子节点
String path = client.create().creatingParentsIfNeeded().forPath("/test4/node1");
System.out.println(path);
}
//----------------------------------------
//----------------------------------------
@Test
public void getTest() throws Exception {
//查询节点中存储的数据
byte[] data = client.getData().forPath("/test4/node1");
//由于在创建节点时,没有存储数据,所以默认存储的是当前客户端所在机器的 ip 地址
System.out.println(new String(data));
}
@Test
public void getChildrenNodeTest() throws Exception {
//获取一个节点下面,所有的子节点,并打印出来
//注意:由于 namespace 配置的是 jobs ,所以这里的 / 代表的是 /jobs
List<String> pathlist = client.getChildren().forPath("/");
for (String path : pathlist) {
System.out.println(path);
}
}
@Test
public void getNodeStatusTest() throws Exception {
//查看一个节点的详细状态信息,相当于执行命令 ls -s 节点
Stat status = new Stat();
client.getData().storingStatIn(status).forPath("/test2");
//由于开发 Curator 的程序员太懒惰了,
//所以打印出来的每个数字的含义,请查看 Stat 类的 toString 方法
System.out.println(status);
}
//----------------------------------------
//----------------------------------------
@Test
public void setDataTest() throws Exception {
byte[] data1 = client.getData().forPath("/test1");
System.out.println("【修改前】存储的数据为:" + new String(data1));
//修改数据为当前时间毫秒值
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String nowTime = sdf.format(new Date());
client.setData().forPath("/test1", nowTime.getBytes());
byte[] data2 = client.getData().forPath("/test1");
System.out.println("【修改后】存储的数据为:" + new String(data2));
}
@Test
public void SetDataTest2() throws Exception {
//如果所要修改的数据,是公共资源,存在并发修改的情况
//为了确保修改数据的安全性,可以采用版本号进行控制修改操作
//也就是所谓的乐观锁机制
//在修改数据时,如果发现数据版本号不是自己读取的版本号,则放弃修改
Stat status = new Stat();
client.getData().storingStatIn(status).forPath("/test2");
//获取数据版本号
int version1 = status.getVersion();
System.out.println("【修改前】的数据版本号:" + version1);
//要修改的节点版本号,与所提供的版本号,不一致时,会报异常
client.setData().withVersion(version1).forPath("/test2", "任肥肥".getBytes());
client.getData().storingStatIn(status).forPath("/test2");
//获取数据版本号
int version2 = status.getVersion();
System.out.println("【修改后】的数据版本号:" + version2);
}
//----------------------------------------
//----------------------------------------
@Test
public void deleteTest() throws Exception {
//删除节点,如果节点下有子节点的话,无法删除
//要删除的节点不存在时,会报异常
client.delete().forPath("/test1");
}
@Test
public void deleteAllTest() throws Exception {
//删除节点,即使有子节点,连同子节点一起删除
//要删除的节点不存在时,会报异常
client.delete().deletingChildrenIfNeeded().forPath("/test4");
}
@Test
public void testDelete3() throws Exception {
//保证删除成功,如果网络有问题,则会定期重试
//要删除的节点不存在时,会报异常
client.delete().guaranteed().forPath("/test2");
}
@Test
public void testDelete4() throws Exception {
//节点删除成功后,执行回调方法
//要删除的节点不存在时,会报异常
client.delete().guaranteed().inBackground((client, event) -> {
System.out.println("Lambda表达式写法:节点删除操作执行成功....");
System.out.println(event);
}).forPath("/test1");
client.delete().guaranteed().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
//参数含义:
//client 其实就是客户端连接对象,可以对 zookeeper 节点进行增删改查
//event 事件状态对象
System.out.println("匿名函数写法:节点删除操作执行成功....");
System.out.println(event);
}
}).forPath("/test2");
}
}
在运行以上单元测试方法时,可以使用上篇博客中所介绍的命令行操作 Zookeeper 进行操作结果的查看和验证。
到此为止,有关使用 Java 通过 Curator 组件的 API 对 Zookeeper 进行基本的增删改查操作,已经介绍完毕,非常简单。
本篇博客的源代码下载地址为:https://files.cnblogs.com/files/blogs/699532/zookeeper_curator.zip
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/273284.html