maven依赖:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>per.ym</groupId>
<artifactId>zk</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.12</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
测试类:
package per.ym.zookeeper;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class ZkTest implements Watcher{
private ZooKeeper zk;
private CountDownLatch cdl;
private String path = "/test";
private String rootPath = "/";
private int sessionTimeOut = 15000;
private byte[] data = "data".getBytes();
private byte[] newData = "newData".getBytes();
@Before
public void connect() throws IOException {
zk = new ZooKeeper("192.168.61.131:2184", sessionTimeOut, this);
cdl = new CountDownLatch(1);
}
//同步调用
@Test
public void testSync( ) throws Exception {
//等待与zookeeper服务端连接完成
cdl.await();
//创建一个持久节点/test,并为其赋值为data
zk.create("/test", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//获取根节点下的子节点
List<String> children = zk.getChildren("/", this);
System.out.println("根节点下的子节点有: " + children);
//查询节点/test保存的数据
Stat stat = new Stat();
byte[] tempData = zk.getData(path, this, stat);
System.out.println("test节点数据为: " + new String(tempData));
//设置节点/test的数据为newData
zk.setData(path, newData, -1);
tempData = zk.getData(path, this, stat);
System.out.println("test节点新数据为: " + new String(tempData));
//删除节点/test
zk.delete(path, -1);
//判断节点/test是否存在
stat = zk.exists(path, this);
System.out.println(stat);
}
//创建节点后进行回调
private StringCallback cb = new StringCallback() {
@Override
public void proce***esult(int rc, String path, Object ctx, String name) {
switch (Code.get(rc)) {
case CONNECTIONLOSS:
//如果失去连接,我们无法保证是在创建前还是创建后丢失的,因此重试
create();
break;
case OK:
System.out.println("节点/test创建成功");
break;
case NODEEXISTS:
System.out.println("节点/test已经存在");
break;
default:
System.out.println(KeeperException.create(Code.get(rc), path));
}
}
};
//获取子节点进行回调
private ChildrenCallback ccb = new ChildrenCallback() {
@Override
public void proce***esult(int rc, String path, Object ctx, List<String> children) {
switch (Code.get(rc)) {
case CONNECTIONLOSS:
getChildren();
break;
case OK:
System.out.println("根节点下的子节点有: " + children);
break;
default:
System.out.println(KeeperException.create(Code.get(rc), path));
}
}
};
//获取数据进行回调
private DataCallback dcb = new DataCallback() {
@Override
public void proce***esult(int rc, String path, Object ctx, byte[] data, Stat stat) {
switch (Code.get(rc)) {
case CONNECTIONLOSS:
getData();
break;
case OK:
System.out.println("test节点数据为: " + new String(data));
break;
default:
System.out.println(KeeperException.create(Code.get(rc), path));
}
}
};
//设置数据进行回调
private StatCallback scb = new StatCallback() {
@Override
public void proce***esult(int rc, String path, Object ctx, Stat stat) {
switch (Code.get(rc)) {
case CONNECTIONLOSS:
setData();
break;
case OK:
//这个ctx就是我们调用zk.setData时传入的最后一个参数
System.out.println("test节点设置新数 " + new String((byte[])ctx) + "成功");
break;
default:
System.out.println(KeeperException.create(Code.get(rc), path));
}
}
};
//删除节点进行回调
private VoidCallback vcb = new VoidCallback() {
@Override
public void proce***esult(int rc, String path, Object ctx) {
switch (Code.get(rc)) {
case CONNECTIONLOSS:
delete();
break;
case OK:
System.out.println("删除节点/test成功");
break;
default:
System.out.println(KeeperException.create(Code.get(rc), path));
}
}
};
//节点是否存在进行回调
private StatCallback scb2 = new StatCallback() {
@Override
public void proce***esult(int rc, String path, Object ctx, Stat stat) {
switch (Code.get(rc)) {
case CONNECTIONLOSS:
exists();
break;
case OK:
System.out.println("节点/test存在");
break;
case NONODE:
System.out.println("节点/test不存在");
break;
default:
break;
}
}
};
//异步调用
@Test
public void testAsync() throws Exception {
cdl.await();
create();
getChildren();
setData();
getData();
exists();
delete();
exists();
}
private void create() {
zk.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, cb, data);
}
private void getChildren() {
zk.getChildren(rootPath, this, ccb, null);
}
private void getData() {
zk.getData(path, this, dcb, null);
}
private void setData() {
zk.setData(path, newData, -1, scb, newData);
}
private void delete() {
zk.delete(path, -1, vcb, null);
}
private void exists() {
zk.exists(path, this, scb2, null);
}
@Override
public void process(WatchedEvent event) {
//如果连接成功,放行
if (event.getType().equals(EventType.None)) {
cdl.countDown();
}
System.out.println(event);
}
@After
public void close() throws InterruptedException {
//测试完成后关闭连接
zk.close();
}
}
原创文章,作者:kepupublish,如若转载,请注明出处:https://blog.ytso.com/185656.html