Zookeeper 使用 Java 进行增删改查操作


前面已经介绍了使用命令行操作 Zookeeper,方便我们对 Zookeeper 有一个整体的认识。

Zookeeper 本质上就是一个 NoSQL 数据库,只不过其存储的数据结构是树状结构形式,理解起来很简单。

对于一个新手小白来说,面对 Zookeeer,介绍一大堆概念,没啥用处,没有什么比代码来得更加实际一些。

本篇博客通过编写 Java 代码对 Zookeeper 进行增删改查,让大家轻松掌握 Zookeeper 的基本操作,增强自信心。

在本篇博客的最后会提供源代码 Demo 下载。

一、搭建工程

新建一个 maven 项目,导入相关 jar 包,内容如下:

有关具体的 jar 包地址,可以在 https://mvnrepository.com 上进行查询。

<dependencies>

    <!--导入 Spring 的 jar 包-->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.3.18</version>
    </dependency>

    <!--导入 Spring 整合 junit 的 jar 包-->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>5.3.18</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>

    <!--导入 curator 的 jar 包-->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.0.0</version>
    </dependency>

    <!--导入日志相关的 jar 包-->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.21</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.21</version>
    </dependency>
</dependencies>

搭建后的最终工程如下图所示,非常简单:

image

使用 Java 操作 Zookeeper 的方式很多,相比而言采用 Curator 组件比较简单,因此也比较流行。

本篇博客的 Demo 采用 Spring 集成 Curator 组件的方式,通过 Java 调用其 API 方法对 Zookeeper 进行增删改查。

需要导入 curator-framework 和 curator-recipes 两个 jar 包,请参考官网,需要注意 Curator 与 zookeeper 版本对应问题。

Curator 的官网地址是:https://curator.apache.org

二、代码细节

首先需要对连接 Zookeeper 的信息进行配置,具体配置细节在 zookeeper.properties 文件中:

# zookeeper的连接字符串
# 如果是操作 zookeeper 集群,可以配置多个 zookeeper 地址
# 多个地址之间用英文逗号分隔,如 ip1:port1,ip2:port2,ip3:port3
zk.connectString=127.0.0.1:2181

# zookeeper的会话超时时间
# 单位:毫秒,默认是 60 秒
zk.sessionTimeoutMs=60000

# zookeeper的连接超时时间
# 单位:毫秒,默认是 15 秒
zk.connectionTimeoutMs=15000

# zookeeper默认操作的根节点
# 所有的增删改查操作,默认在该节点下进行
zk.namespace=jobs

然后在代码中使用 Spring 集成 Curator 组件,从 zookeeper.properties 读取连接配置信息:

package com.jobs.config;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;

//加载 zookeeper.properties 文件内容
@PropertySource("classpath:zookeeper.properties")
public class zookeeperConfig {

    @Value("${zk.connectString}")
    private String connectString;

    @Value("${zk.sessionTimeoutMs}")
    private Integer sessionTimeoutMs;

    @Value("${zk.connectionTimeoutMs}")
    private Integer connectionTimeoutMs;

    @Value("${zk.namespace}")
    private String namespace;


    //获取 Curator 的客户端连接
    @Bean
    public CuratorFramework getCuratorFramework(){
        //配置重试策略,如果没有连接失败,最多重试 1 次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
        CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString(connectString)
                        .sessionTimeoutMs(sessionTimeoutMs)
                        .connectionTimeoutMs(connectionTimeoutMs)
                        .namespace(namespace)
                        .retryPolicy(retryPolicy)
                        .build();
        client.start();
        return client;
    }
}
package com.jobs.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

//采用 Spring 集成 Curator,导入 zookeeperConfig 配置类
@Configuration
@Import(zookeeperConfig.class)
public class springConfig { }

下面我们就使用 junit 单元测试,使用 Java 采用 Curator 的 API 方法操作 Zookeeper,具体如下所示:

package com.jobs.test;
import com.jobs.config.springConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = springConfig.class)
public class zkTest {
@Autowired
private CuratorFramework client;
//注意:
//由于我在 zookeeper.properties 下配置的 namespace 是 jobs
//因此下面对 zookeeper 的所有操作,默认情况下都是在 /jobs 节点下进行操作的
@Test
public void createTest() throws Exception {
//默认情况下,创建的都是持久化的节点,一旦创建,永久存储
//在创建节点时,没有指定存储的数据,默认情况下存储的是当前客户端机器的 ip 地址
String path1 = client.create().forPath("/test1");
System.out.println("成功创建节点:" + path1);
//获取节点中存储的数据,由于在创建时没有存储数据,所以发现存储的是当前客户端机器的 ip 地址
byte[] data1 = client.getData().forPath("/test1");
System.out.println("存储的数据为:" + new String(data1));
System.out.println("-------------------------------------");
//创建节点时,同时存储数据
String path2 = client.create().forPath("/test2", "乔京飞".getBytes());
System.out.println("成功创建节点:" + path2);
//获取节点中存储的数据
byte[] data2 = client.getData().forPath("/test2");
System.out.println("存储的数据为:" + new String(data2));
}
@Test
public void createTempTest() throws Exception {
//创建临时节点,一旦当前连接断开,zookeeper 会自动销毁所创建的节点
String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/test3", "任肥肥".getBytes());
System.out.println("成功创建临时节点:" + path);
byte[] data = client.getData().forPath("/test3");
System.out.println("存储的数据为:" + new String(data));
//注意:
//由于当前方法执行完后,zookeeper 连接就断开了,临时节点就销毁了
//所以你可以通过命令行的方式,查看 zookeeper 中 /jobs 下的节点,发现找不到 /jobs/test3 节点
//但是:通过上面运行的程序,打印出节点信息和节点存储的数据,可以断定:/jobs/test3 节点曾经被创建过
}
@Test
public void autoCreateParentsNodeTest() throws Exception {
//当我们创建深层次节点时,可能路径上的父节点并不存在,所以仅仅 create 无法同时创建父子节点
//可以使用 creatingParentsIfNeeded 方法,同时创建父子节点
String path = client.create().creatingParentsIfNeeded().forPath("/test4/node1");
System.out.println(path);
}
//----------------------------------------
//----------------------------------------
@Test
public void getTest() throws Exception {
//查询节点中存储的数据
byte[] data = client.getData().forPath("/test4/node1");
//由于在创建节点时,没有存储数据,所以默认存储的是当前客户端所在机器的 ip 地址
System.out.println(new String(data));
}
@Test
public void getChildrenNodeTest() throws Exception {
//获取一个节点下面,所有的子节点,并打印出来
//注意:由于 namespace 配置的是 jobs ,所以这里的 / 代表的是 /jobs
List<String> pathlist = client.getChildren().forPath("/");
for (String path : pathlist) {
System.out.println(path);
}
}
@Test
public void getNodeStatusTest() throws Exception {
//查看一个节点的详细状态信息,相当于执行命令 ls -s 节点
Stat status = new Stat();
client.getData().storingStatIn(status).forPath("/test2");
//由于开发 Curator 的程序员太懒惰了,
//所以打印出来的每个数字的含义,请查看 Stat 类的 toString 方法
System.out.println(status);
}
//----------------------------------------
//----------------------------------------
@Test
public void setDataTest() throws Exception {
byte[] data1 = client.getData().forPath("/test1");
System.out.println("【修改前】存储的数据为:" + new String(data1));
//修改数据为当前时间毫秒值
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String nowTime = sdf.format(new Date());
client.setData().forPath("/test1", nowTime.getBytes());
byte[] data2 = client.getData().forPath("/test1");
System.out.println("【修改后】存储的数据为:" + new String(data2));
}
@Test
public void SetDataTest2() throws Exception {
//如果所要修改的数据,是公共资源,存在并发修改的情况
//为了确保修改数据的安全性,可以采用版本号进行控制修改操作
//也就是所谓的乐观锁机制
//在修改数据时,如果发现数据版本号不是自己读取的版本号,则放弃修改
Stat status = new Stat();
client.getData().storingStatIn(status).forPath("/test2");
//获取数据版本号
int version1 = status.getVersion();
System.out.println("【修改前】的数据版本号:" + version1);
//要修改的节点版本号,与所提供的版本号,不一致时,会报异常
client.setData().withVersion(version1).forPath("/test2", "任肥肥".getBytes());
client.getData().storingStatIn(status).forPath("/test2");
//获取数据版本号
int version2 = status.getVersion();
System.out.println("【修改后】的数据版本号:" + version2);
}
//----------------------------------------
//----------------------------------------
@Test
public void deleteTest() throws Exception {
//删除节点,如果节点下有子节点的话,无法删除
//要删除的节点不存在时,会报异常
client.delete().forPath("/test1");
}
@Test
public void deleteAllTest() throws Exception {
//删除节点,即使有子节点,连同子节点一起删除
//要删除的节点不存在时,会报异常
client.delete().deletingChildrenIfNeeded().forPath("/test4");
}
@Test
public void testDelete3() throws Exception {
//保证删除成功,如果网络有问题,则会定期重试
//要删除的节点不存在时,会报异常
client.delete().guaranteed().forPath("/test2");
}
@Test
public void testDelete4() throws Exception {
//节点删除成功后,执行回调方法
//要删除的节点不存在时,会报异常
client.delete().guaranteed().inBackground((client, event) -> {
System.out.println("Lambda表达式写法:节点删除操作执行成功....");
System.out.println(event);
}).forPath("/test1");
client.delete().guaranteed().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
//参数含义:
//client 其实就是客户端连接对象,可以对 zookeeper 节点进行增删改查
//event 事件状态对象
System.out.println("匿名函数写法:节点删除操作执行成功....");
System.out.println(event);
}
}).forPath("/test2");
}
}

在运行以上单元测试方法时,可以使用上篇博客中所介绍的命令行操作 Zookeeper 进行操作结果的查看和验证。

到此为止,有关使用 Java 通过 Curator 组件的 API 对 Zookeeper 进行基本的增删改查操作,已经介绍完毕,非常简单。

本篇博客的源代码下载地址为:https://files.cnblogs.com/files/blogs/699532/zookeeper_curator.zip

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/273284.html

(0)
上一篇 2022年7月10日
下一篇 2022年7月10日

相关推荐

发表回复

登录后才能评论