Zookeeper 使用 Java 进行增删改查操作


前面已经介绍了使用命令行操作 Zookeeper,方便我们对 Zookeeper 有一个整体的认识。

Zookeeper 本质上就是一个 NoSQL 数据库,只不过其存储的数据结构是树状结构形式,理解起来很简单。

对于一个新手小白来说,面对 Zookeeer,介绍一大堆概念,没啥用处,没有什么比代码来得更加实际一些。

本篇博客通过编写 Java 代码对 Zookeeper 进行增删改查,让大家轻松掌握 Zookeeper 的基本操作,增强自信心。

在本篇博客的最后会提供源代码 Demo 下载。

一、搭建工程

新建一个 maven 项目,导入相关 jar 包,内容如下:

有关具体的 jar 包地址,可以在 https://mvnrepository.com 上进行查询。

<dependencies>

    <!--导入 Spring 的 jar 包-->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.3.18</version>
    </dependency>

    <!--导入 Spring 整合 junit 的 jar 包-->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>5.3.18</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>

    <!--导入 curator 的 jar 包-->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.0.0</version>
    </dependency>

    <!--导入日志相关的 jar 包-->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.21</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.21</version>
    </dependency>
</dependencies>

搭建后的最终工程如下图所示,非常简单:

image

使用 Java 操作 Zookeeper 的方式很多,相比而言采用 Curator 组件比较简单,因此也比较流行。

本篇博客的 Demo 采用 Spring 集成 Curator 组件的方式,通过 Java 调用其 API 方法对 Zookeeper 进行增删改查。

需要导入 curator-framework 和 curator-recipes 两个 jar 包,请参考官网,需要注意 Curator 与 zookeeper 版本对应问题。

Curator 的官网地址是:https://curator.apache.org

二、代码细节

首先需要对连接 Zookeeper 的信息进行配置,具体配置细节在 zookeeper.properties 文件中:

# zookeeper的连接字符串
# 如果是操作 zookeeper 集群,可以配置多个 zookeeper 地址
# 多个地址之间用英文逗号分隔,如 ip1:port1,ip2:port2,ip3:port3
zk.connectString=127.0.0.1:2181

# zookeeper的会话超时时间
# 单位:毫秒,默认是 60 秒
zk.sessionTimeoutMs=60000

# zookeeper的连接超时时间
# 单位:毫秒,默认是 15 秒
zk.connectionTimeoutMs=15000

# zookeeper默认操作的根节点
# 所有的增删改查操作,默认在该节点下进行
zk.namespace=jobs

然后在代码中使用 Spring 集成 Curator 组件,从 zookeeper.properties 读取连接配置信息:

package com.jobs.config;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;

//加载 zookeeper.properties 文件内容
@PropertySource("classpath:zookeeper.properties")
public class zookeeperConfig {

    @Value("${zk.connectString}")
    private String connectString;

    @Value("${zk.sessionTimeoutMs}")
    private Integer sessionTimeoutMs;

    @Value("${zk.connectionTimeoutMs}")
    private Integer connectionTimeoutMs;

    @Value("${zk.namespace}")
    private String namespace;


    //获取 Curator 的客户端连接
    @Bean
    public CuratorFramework getCuratorFramework(){
        //配置重试策略,如果没有连接失败,最多重试 1 次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
        CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString(connectString)
                        .sessionTimeoutMs(sessionTimeoutMs)
                        .connectionTimeoutMs(connectionTimeoutMs)
                        .namespace(namespace)
                        .retryPolicy(retryPolicy)
                        .build();
        client.start();
        return client;
    }
}
package com.jobs.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

//采用 Spring 集成 Curator,导入 zookeeperConfig 配置类
@Configuration
@Import(zookeeperConfig.class)
public class springConfig { }

下面我们就使用 junit 单元测试,使用 Java 采用 Curator 的 API 方法操作 Zookeeper,具体如下所示:

package com.jobs.test;

import com.jobs.config.springConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = springConfig.class)
public class zkTest {

    @Autowired
    private CuratorFramework client;

    //注意:
    //由于我在 zookeeper.properties 下配置的 namespace 是 jobs
    //因此下面对 zookeeper 的所有操作,默认情况下都是在 /jobs 节点下进行操作的

    @Test
    public void createTest() throws Exception {

        //默认情况下,创建的都是持久化的节点,一旦创建,永久存储

        //在创建节点时,没有指定存储的数据,默认情况下存储的是当前客户端机器的 ip 地址
        String path1 = client.create().forPath("/test1");
        System.out.println("成功创建节点:" + path1);

        //获取节点中存储的数据,由于在创建时没有存储数据,所以发现存储的是当前客户端机器的 ip 地址
        byte[] data1 = client.getData().forPath("/test1");
        System.out.println("存储的数据为:" + new String(data1));

        System.out.println("-------------------------------------");

        //创建节点时,同时存储数据
        String path2 = client.create().forPath("/test2", "乔京飞".getBytes());
        System.out.println("成功创建节点:" + path2);

        //获取节点中存储的数据
        byte[] data2 = client.getData().forPath("/test2");
        System.out.println("存储的数据为:" + new String(data2));
    }

    @Test
    public void createTempTest() throws Exception {

        //创建临时节点,一旦当前连接断开,zookeeper 会自动销毁所创建的节点

        String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/test3", "任肥肥".getBytes());
        System.out.println("成功创建临时节点:" + path);
        byte[] data = client.getData().forPath("/test3");
        System.out.println("存储的数据为:" + new String(data));

        //注意:
        //由于当前方法执行完后,zookeeper 连接就断开了,临时节点就销毁了
        //所以你可以通过命令行的方式,查看 zookeeper 中 /jobs 下的节点,发现找不到 /jobs/test3 节点
        //但是:通过上面运行的程序,打印出节点信息和节点存储的数据,可以断定:/jobs/test3 节点曾经被创建过
    }

    @Test
    public void autoCreateParentsNodeTest() throws Exception {

        //当我们创建深层次节点时,可能路径上的父节点并不存在,所以仅仅 create 无法同时创建父子节点
        //可以使用 creatingParentsIfNeeded 方法,同时创建父子节点

        String path = client.create().creatingParentsIfNeeded().forPath("/test4/node1");
        System.out.println(path);
    }

    //----------------------------------------
    //----------------------------------------

    @Test
    public void getTest() throws Exception {

        //查询节点中存储的数据
        byte[] data = client.getData().forPath("/test4/node1");
        //由于在创建节点时,没有存储数据,所以默认存储的是当前客户端所在机器的 ip 地址
        System.out.println(new String(data));
    }

    @Test
    public void getChildrenNodeTest() throws Exception {

        //获取一个节点下面,所有的子节点,并打印出来
        //注意:由于 namespace 配置的是 jobs ,所以这里的 / 代表的是 /jobs
        List<String> pathlist = client.getChildren().forPath("/");
        for (String path : pathlist) {
            System.out.println(path);
        }
    }

    @Test
    public void getNodeStatusTest() throws Exception {

        //查看一个节点的详细状态信息,相当于执行命令 ls -s 节点

        Stat status = new Stat();
        client.getData().storingStatIn(status).forPath("/test2");

        //由于开发 Curator 的程序员太懒惰了,
        //所以打印出来的每个数字的含义,请查看 Stat 类的 toString 方法
        System.out.println(status);
    }

    //----------------------------------------
    //----------------------------------------

    @Test
    public void setDataTest() throws Exception {

        byte[] data1 = client.getData().forPath("/test1");
        System.out.println("【修改前】存储的数据为:" + new String(data1));

        //修改数据为当前时间毫秒值
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String nowTime = sdf.format(new Date());
        client.setData().forPath("/test1", nowTime.getBytes());

        byte[] data2 = client.getData().forPath("/test1");
        System.out.println("【修改后】存储的数据为:" + new String(data2));
    }

    @Test
    public void SetDataTest2() throws Exception {

        //如果所要修改的数据,是公共资源,存在并发修改的情况
        //为了确保修改数据的安全性,可以采用版本号进行控制修改操作
        //也就是所谓的乐观锁机制
        //在修改数据时,如果发现数据版本号不是自己读取的版本号,则放弃修改

        Stat status = new Stat();
        client.getData().storingStatIn(status).forPath("/test2");
        //获取数据版本号
        int version1 = status.getVersion();
        System.out.println("【修改前】的数据版本号:" + version1);

        //要修改的节点版本号,与所提供的版本号,不一致时,会报异常
        client.setData().withVersion(version1).forPath("/test2", "任肥肥".getBytes());

        client.getData().storingStatIn(status).forPath("/test2");
        //获取数据版本号
        int version2 = status.getVersion();
        System.out.println("【修改后】的数据版本号:" + version2);
    }

    //----------------------------------------
    //----------------------------------------

    @Test
    public void deleteTest() throws Exception {

        //删除节点,如果节点下有子节点的话,无法删除
        //要删除的节点不存在时,会报异常
        client.delete().forPath("/test1");
    }

    @Test
    public void deleteAllTest() throws Exception {

        //删除节点,即使有子节点,连同子节点一起删除
        //要删除的节点不存在时,会报异常
        client.delete().deletingChildrenIfNeeded().forPath("/test4");
    }

    @Test
    public void testDelete3() throws Exception {

        //保证删除成功,如果网络有问题,则会定期重试
        //要删除的节点不存在时,会报异常
        client.delete().guaranteed().forPath("/test2");
    }

    @Test
    public void testDelete4() throws Exception {

        //节点删除成功后,执行回调方法
        //要删除的节点不存在时,会报异常

        client.delete().guaranteed().inBackground((client, event) -> {
            System.out.println("Lambda表达式写法:节点删除操作执行成功....");
            System.out.println(event);
        }).forPath("/test1");


        client.delete().guaranteed().inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {

                //参数含义:
                //client 其实就是客户端连接对象,可以对 zookeeper 节点进行增删改查
                //event 事件状态对象

                System.out.println("匿名函数写法:节点删除操作执行成功....");
                System.out.println(event);
            }
        }).forPath("/test2");
    }
}

在运行以上单元测试方法时,可以使用上篇博客中所介绍的命令行操作 Zookeeper 进行操作结果的查看和验证。

到此为止,有关使用 Java 通过 Curator 组件的 API 对 Zookeeper 进行基本的增删改查操作,已经介绍完毕,非常简单。

本篇博客的源代码下载地址为:https://files.cnblogs.com/files/blogs/699532/zookeeper_curator.zip

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/273284.html

(0)
上一篇 2022年7月10日 04:47
下一篇 2022年7月10日 04:47

相关推荐

发表回复

登录后才能评论