前面介绍了WebSocket同步和Http同步,本篇将开始介绍Zookeeper同步的内容。
Zookeeper数据同步配置
在启动soul-admin和soul-bootstrap的时候,我们需要把其他的同步方式给关闭掉,然后开启zk的同步方式。
soul-admin.yml:
soul:
  sync:
    zookeeper:
      url: localhost:2181
      sessionTimeout: 5000
      connectionTimeout: 2000
soul-bootstrap.yml:
soul:
  file:
    enabled: true
  corss:
    enabled: true
  dubbo :
    parameter: multi
  sync:
    zookeeper:
      url: localhost:2181
      sessionTimeout: 5000
      connectionTimeout: 2000
从前面的文章中,我们已经知道这些同步的接口定义,那么这里我们就不多提了,我们直接看zk的实现:
/**
 * {@link SyncDataService} implementation that reads its data from Zookeeper.
 *
 * <p>The constructor stores the collaborators it is given and then immediately
 * starts watching plugin data, app-auth data and meta data, in that order.
 */
public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {

    private final ZkClient zkClient;

    private final PluginDataSubscriber pluginDataSubscriber;

    private final List<MetaDataSubscriber> metaDataSubscribers;

    private final List<AuthDataSubscriber> authDataSubscribers;

    /**
     * Creates the service and registers all Zookeeper watches.
     *
     * @param zkClient             client used for every read and subscription
     * @param pluginDataSubscriber subscriber kept for plugin data handling
     * @param metaDataSubscribers  subscribers kept for meta data handling
     * @param authDataSubscribers  subscribers kept for app-auth data handling
     */
    public ZookeeperSyncDataService(final ZkClient zkClient, final PluginDataSubscriber pluginDataSubscriber,
                                    final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
        this.zkClient = zkClient;
        this.pluginDataSubscriber = pluginDataSubscriber;
        this.metaDataSubscribers = metaDataSubscribers;
        this.authDataSubscribers = authDataSubscribers;
        // Order of the three watch registrations is kept exactly as is.
        watcherData();
        watchAppAuth();
        watchMetaData();
    }

    /**
     * Watches every plugin currently listed under the plugin parent path and
     * re-runs the per-plugin watch setup whenever that child list changes.
     */
    private void watcherData() {
        final String pluginRoot = ZkPathConstants.PLUGIN_PARENT;
        List<String> pluginNames = zkClientGetChildren(pluginRoot);
        pluginNames.forEach(this::watcherAll);
        zkClient.subscribeChildChanges(pluginRoot, (parentPath, currentChildren) -> {
            if (CollectionUtils.isNotEmpty(currentChildren)) {
                currentChildren.forEach(this::watcherAll);
            }
        });
    }

    /**
     * Sets up the plugin, selector and rule watches for one plugin.
     *
     * @param pluginName name of the plugin node
     */
    private void watcherAll(final String pluginName) {
        watcherPlugin(pluginName);
        watcherSelector(pluginName);
        watcherRule(pluginName);
    }

    /**
     * Reads every existing app-auth node, hands its data to
     * {@code cacheAuthData}, subscribes to its data changes, and finally
     * subscribes to child changes of the app-auth parent path.
     */
    private void watchAppAuth() {
        final String authRoot = ZkPathConstants.APP_AUTH_PARENT;
        List<String> authNodes = zkClientGetChildren(authRoot);
        if (CollectionUtils.isNotEmpty(authNodes)) {
            for (String node : authNodes) {
                String nodePath = buildRealPath(authRoot, node);
                cacheAuthData(zkClient.readData(nodePath));
                subscribeAppAuthDataChanges(nodePath);
            }
        }
        subscribeChildChanges(ConfigGroupEnum.APP_AUTH, authRoot, authNodes);
    }
}
从上面的代码可以看到,构造器中调用了三个方法:watcherData、watchAppAuth、watchMetaData。其中watcherData方法又调用了watcherAll方法,这个方法涉及三种数据:插件、选择器和规则。当然除了这些,还有AppAuthData和元数据。
watcherData
接下来,可以看看这些方法的作用,首先从watcherData看:
/**
 * Starts watching plugin data.
 *
 * <p>First sets up watches for every plugin currently listed under the
 * plugin parent path, then subscribes to child changes of that path so the
 * same setup is run again for all current children whenever the list changes.
 */
private void watcherData() {
    // Fetch the existing plugins and start watching each of them.
    final String pluginRoot = ZkPathConstants.PLUGIN_PARENT;
    List<String> pluginNames = zkClientGetChildren(pluginRoot);
    pluginNames.forEach(this::watcherAll);
    // On any change of the child list, (re)load and watch every current plugin.
    zkClient.subscribeChildChanges(pluginRoot, (parentPath, currentChildren) -> {
        if (CollectionUtils.isNotEmpty(currentChildren)) {
            currentChildren.forEach(this::watcherAll);
        }
    });
}
/**
 * Sets up all watches for a single plugin by delegating, in this order, to
 * {@code watcherPlugin}, {@code watcherSelector} and {@code watcherRule}.
 *
 * @param pluginName name of the plugin node to watch
 */
private void watcherAll(final String pluginName) {
watcherPlugin(pluginName);
watcherSelector(pluginName);
watcherRule(pluginName);
}
/**
 * Loads the data of one plugin node and subscribes to its changes.
 *
 * <p>The node is created as a persistent node when it does not exist yet,
 * so the subsequent read always has a node to read from.
 *
 * @param pluginName name of the plugin whose node is watched
 */
private void watcherPlugin(final String pluginName) {
    // Build the Zookeeper path of the plugin node.
    final String path = ZkPathConstants.buildPluginPath(pluginName);
    final boolean missing = !zkClient.exists(path);
    if (missing) {
        // Second argument is presumably "createParents" - confirm against the ZkClient API.
        zkClient.createPersistent(path, true);
    }
    // Initialise from the current node data.
    cachePluginData(zkClient.readData(path));
    // Watch the node for later data changes.
    subscribePluginDataChanges(path, pluginName);
}
/**
 * Initialises and watches the selector data of one plugin.
 *
 * <p>Each existing selector node is read, passed to
 * {@code cacheSelectorData} and subscribed for data changes; afterwards the
 * selector parent path is subscribed for child changes.
 *
 * @param pluginName name of the plugin whose selectors are watched
 */
private void watcherSelector(final String pluginName) {
    final String selectorRoot = ZkPathConstants.buildSelectorParentPath(pluginName);
    List<String> selectorNodes = zkClientGetChildren(selectorRoot);
    if (CollectionUtils.isNotEmpty(selectorNodes)) {
        for (String node : selectorNodes) {
            String nodePath = buildRealPath(selectorRoot, node);
            cacheSelectorData(zkClient.readData(nodePath));
            subscribeSelectorDataChanges(nodePath);
        }
    }
    subscribeChildChanges(ConfigGroupEnum.SELECTOR, selectorRoot, selectorNodes);
}
/**
 * Initialises and watches the rule data of one plugin.
 *
 * <p>Each existing rule node is read, passed to {@code cacheRuleData} and
 * subscribed for data changes; afterwards the rule parent path is
 * subscribed for child changes.
 *
 * @param pluginName name of the plugin whose rules are watched
 */
private void watcherRule(final String pluginName) {
    final String ruleRoot = ZkPathConstants.buildRuleParentPath(pluginName);
    List<String> ruleNodes = zkClientGetChildren(ruleRoot);
    if (CollectionUtils.isNotEmpty(ruleNodes)) {
        for (String node : ruleNodes) {
            String nodePath = buildRealPath(ruleRoot, node);
            cacheRuleData(zkClient.readData(nodePath));
            subscribeRuleDataChanges(nodePath);
        }
    }
    subscribeChildChanges(ConfigGroupEnum.RULE, ruleRoot, ruleNodes);
}
watchAppAuth
上面对watcherData方法进行了简单的分析,现在继续来看看watchAppAuth:
/**
 * Initialises and watches app-auth data; follows the same pattern as the
 * selector and rule watches.
 *
 * <p>Each existing app-auth node is read, passed to {@code cacheAuthData}
 * and subscribed for data changes; afterwards the app-auth parent path is
 * subscribed for child changes.
 */
private void watchAppAuth() {
    final String authRoot = ZkPathConstants.APP_AUTH_PARENT;
    List<String> authNodes = zkClientGetChildren(authRoot);
    if (CollectionUtils.isNotEmpty(authNodes)) {
        for (String node : authNodes) {
            String nodePath = buildRealPath(authRoot, node);
            cacheAuthData(zkClient.readData(nodePath));
            subscribeAppAuthDataChanges(nodePath);
        }
    }
    subscribeChildChanges(ConfigGroupEnum.APP_AUTH, authRoot, authNodes);
}
/**
 * Subscribes to data changes of a single app-auth node.
 *
 * <p>Changed data is cast to {@code AppAuthData} and passed to
 * {@code cacheAuthData}; a deleted node is reported to
 * {@code unCacheAuthData} with its path.
 *
 * @param realPath full Zookeeper path of the app-auth node
 */
private void subscribeAppAuthDataChanges(final String realPath) {
    final IZkDataListener authListener = new IZkDataListener() {
        @Override
        public void handleDataChange(final String dataPath, final Object data) {
            cacheAuthData((AppAuthData) data);
        }

        @Override
        public void handleDataDeleted(final String dataPath) {
            unCacheAuthData(dataPath);
        }
    };
    zkClient.subscribeDataChanges(realPath, authListener);
}
/**
 * Subscribes to child changes of a group's parent path.
 *
 * <p>When the child list changes and is non-empty, the paths returned by
 * {@code addSubscribePath(childrenList, currentChildren)} are processed one
 * by one: the node is read, handed to the group's cache method, and then
 * subscribed for data changes.
 *
 * <p>NOTE(review): {@code childrenList} is the list captured when this
 * subscription was registered. Whether {@code addSubscribePath} keeps it up
 * to date is not visible here - confirm that already-subscribed nodes are
 * not subscribed again on later changes.
 *
 * @param groupKey        which kind of data the parent path holds
 * @param groupParentPath parent path whose children are watched
 * @param childrenList    children known at registration time
 * @throws IllegalStateException if {@code groupKey} is not SELECTOR, RULE,
 *                               APP_AUTH or META_DATA
 */
private void subscribeChildChanges(final ConfigGroupEnum groupKey, final String groupParentPath, final List<String> childrenList) {
    // Dispatch on the group; each branch differs only in its cache/subscribe pair.
    switch (groupKey) {
        case SELECTOR:
            zkClient.subscribeChildChanges(groupParentPath, (parent, current) -> {
                if (CollectionUtils.isNotEmpty(current)) {
                    List<String> added = addSubscribePath(childrenList, current);
                    for (String node : added) {
                        String nodePath = buildRealPath(parent, node);
                        cacheSelectorData(zkClient.readData(nodePath));
                        subscribeSelectorDataChanges(nodePath);
                    }
                }
            });
            break;
        case RULE:
            zkClient.subscribeChildChanges(groupParentPath, (parent, current) -> {
                if (CollectionUtils.isNotEmpty(current)) {
                    List<String> added = addSubscribePath(childrenList, current);
                    // Load each newly added node, then subscribe to it.
                    for (String node : added) {
                        String nodePath = buildRealPath(parent, node);
                        cacheRuleData(zkClient.readData(nodePath));
                        subscribeRuleDataChanges(nodePath);
                    }
                }
            });
            break;
        case APP_AUTH:
            zkClient.subscribeChildChanges(groupParentPath, (parent, current) -> {
                if (CollectionUtils.isNotEmpty(current)) {
                    List<String> added = addSubscribePath(childrenList, current);
                    for (String node : added) {
                        String nodePath = buildRealPath(parent, node);
                        cacheAuthData(zkClient.readData(nodePath));
                        subscribeAppAuthDataChanges(nodePath);
                    }
                }
            });
            break;
        case META_DATA:
            zkClient.subscribeChildChanges(groupParentPath, (parent, current) -> {
                if (CollectionUtils.isNotEmpty(current)) {
                    List<String> added = addSubscribePath(childrenList, current);
                    for (String node : added) {
                        String nodePath = buildRealPath(parent, node);
                        cacheMetaData(zkClient.readData(nodePath));
                        subscribeMetaDataChanges(nodePath);
                    }
                }
            });
            break;
        default:
            throw new IllegalStateException("Unexpected groupKey: " + groupKey);
    }
}
watchMetaData
/**
 * Initialises and watches meta data; works the same way as the selector
 * watch.
 *
 * <p>Each existing meta-data node is read, passed to {@code cacheMetaData}
 * and subscribed for data changes; afterwards the meta-data parent path is
 * subscribed for child changes.
 */
private void watchMetaData() {
    final String metaRoot = ZkPathConstants.META_DATA;
    List<String> metaNodes = zkClientGetChildren(metaRoot);
    if (CollectionUtils.isNotEmpty(metaNodes)) {
        for (String node : metaNodes) {
            String nodePath = buildRealPath(metaRoot, node);
            cacheMetaData(zkClient.readData(nodePath));
            subscribeMetaDataChanges(nodePath);
        }
    }
    subscribeChildChanges(ConfigGroupEnum.META_DATA, metaRoot, metaNodes);
}
总结
总体看来,zk的数据同步方式相对而言还是比较简单些的,看了上面的代码之后,可以感觉代码的逻辑也是简单清晰的,其实zk这块就是涉及发布和订阅。
按照代码逻辑,步骤大概如下:
- 获取并初始化数据
- 对获取的初始化数据进行监听
- 对于变化的数据进行更新,并监听
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/1292.html