Soul API网关数据同步之ZookeeperSyncData详解程序员

前面介绍了WebSocket同步和Http同步,本篇将开始介绍Zookeeper同步的内容。

Zookeeper数据同步配置

在启动soul-admin和soul-bootstrap的时候,我们需要把其他的同步方式给关闭掉,然后开启zk的同步方式。

soul-admin.yml:

 # ZooKeeper sync settings for soul-admin.
 soul:  
     sync: 
         zookeeper: 
              # Address of the ZooKeeper server (host:port).
              url: localhost:2181 
              # NOTE(review): both timeouts are presumably in milliseconds - confirm.
              sessionTimeout: 5000 
              connectionTimeout: 2000 

soul-bootstrap.yml:

# soul-bootstrap settings; the sync section selects ZooKeeper as the sync source.
soul: 
    file: 
      enabled: true 
    # NOTE(review): this key is spelled "corss" in the source; verify against the
    # property name the gateway actually reads before renaming it.
    corss: 
      enabled: true 
    dubbo : 
      parameter: multi 
    sync: 
        zookeeper: 
             # Address of the ZooKeeper server (host:port).
             url: localhost:2181 
             # NOTE(review): both timeouts are presumably in milliseconds - confirm.
             sessionTimeout: 5000 
             connectionTimeout: 2000 

从前面的文章中,我们已经知道这些同步的接口定义,那么这里我们就不多提了,我们直接看zk的实现:

/**
 * ZooKeeper-backed implementation of {@link SyncDataService}.
 *
 * <p>Constructing an instance immediately starts watching plugin data (which covers
 * plugins, selectors and rules), app-auth data and meta data.
 *
 * <p>Only part of the class is shown in this excerpt: helpers such as
 * {@code watcherPlugin}, {@code watchMetaData} or {@code zkClientGetChildren} are
 * called here but defined elsewhere.
 */
public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {

    private final ZkClient zkClient;

    private final PluginDataSubscriber pluginDataSubscriber;

    private final List<MetaDataSubscriber> metaDataSubscribers;

    private final List<AuthDataSubscriber> authDataSubscribers;

    /**
     * Stores the collaborators and then starts the three watchers.
     *
     * @param zkClient             client used for every ZooKeeper read and subscription
     * @param pluginDataSubscriber subscriber for plugin data
     * @param metaDataSubscribers  subscribers for meta data
     * @param authDataSubscribers  subscribers for app-auth data
     */
    public ZookeeperSyncDataService(final ZkClient zkClient,
                                    final PluginDataSubscriber pluginDataSubscriber,
                                    final List<MetaDataSubscriber> metaDataSubscribers,
                                    final List<AuthDataSubscriber> authDataSubscribers) {
        this.zkClient = zkClient;
        this.pluginDataSubscriber = pluginDataSubscriber;
        this.metaDataSubscribers = metaDataSubscribers;
        this.authDataSubscribers = authDataSubscribers;
        // The watchers read the fields above, so they must run after the assignments.
        watcherData();
        watchAppAuth();
        watchMetaData();
    }

    /**
     * Watches every plugin currently listed under the plugin parent path, then
     * subscribes to child changes of that path and re-runs {@link #watcherAll}
     * for each child reported by a change event.
     */
    private void watcherData() {
        final String pluginRoot = ZkPathConstants.PLUGIN_PARENT;
        zkClientGetChildren(pluginRoot).forEach(this::watcherAll);
        zkClient.subscribeChildChanges(pluginRoot, (parentPath, currentChildren) -> {
            if (CollectionUtils.isNotEmpty(currentChildren)) {
                currentChildren.forEach(this::watcherAll);
            }
        });
    }

    /**
     * Starts the plugin, selector and rule watchers for one plugin, in that order.
     *
     * @param pluginName name of the plugin to watch
     */
    private void watcherAll(final String pluginName) {
        watcherPlugin(pluginName);
        watcherSelector(pluginName);
        watcherRule(pluginName);
    }

    /**
     * Caches the app-auth data of every child under the app-auth parent path,
     * subscribes to data changes on each of them, and finally subscribes to child
     * changes of the parent path for the {@code APP_AUTH} group.
     */
    private void watchAppAuth() {
        final String authRoot = ZkPathConstants.APP_AUTH_PARENT;
        final List<String> childrenList = zkClientGetChildren(authRoot);
        if (CollectionUtils.isNotEmpty(childrenList)) {
            for (String child : childrenList) {
                final String nodePath = buildRealPath(authRoot, child);
                cacheAuthData(zkClient.readData(nodePath));
                subscribeAppAuthDataChanges(nodePath);
            }
        }
        // Registered even when there are no children yet.
        subscribeChildChanges(ConfigGroupEnum.APP_AUTH, authRoot, childrenList);
    }
}

从上面的代码可以看到,构造器中调用了三个方法:watcherData、watchAppAuth、watchMetaData。其中watcherData方法又调用了watcherAll方法,这个方法涉及三种数据:插件、选择器和规则。除了这些,还有AppAuthData和元数据。

watcherData

接下来,可以看看这些方法的作用,首先从watcherData看:

/**
 * Watches every plugin currently listed under the plugin parent path, then
 * subscribes to child changes of that path and re-runs {@code watcherAll}
 * for each child reported by a change event.
 */
private void watcherData() {
    // Fetch the existing plugins and start watching each of them.
    final String pluginRoot = ZkPathConstants.PLUGIN_PARENT;
    zkClientGetChildren(pluginRoot).forEach(this::watcherAll);
    // Listen for child changes; every reported plugin is (re)loaded and watched.
    zkClient.subscribeChildChanges(pluginRoot, (parentPath, currentChildren) -> {
        if (CollectionUtils.isNotEmpty(currentChildren)) {
            currentChildren.forEach(this::watcherAll);
        }
    });
}
/**
 * Starts the plugin, selector and rule watchers for one plugin, in that order.
 *
 * @param pluginName name of the plugin to watch
 */
private void watcherAll(final String pluginName) {
    watcherPlugin(pluginName);
    watcherSelector(pluginName);
    watcherRule(pluginName);
}
/**
 * Ensures the plugin node exists, caches its current data and subscribes to
 * further data changes on it.
 *
 * @param pluginName name of the plugin whose node is watched
 */
private void watcherPlugin(final String pluginName) {
    // Build the node path for this plugin.
    final String nodePath = ZkPathConstants.buildPluginPath(pluginName);
    final boolean nodeMissing = !zkClient.exists(nodePath);
    if (nodeMissing) {
        zkClient.createPersistent(nodePath, true);
    }
    // Initial load of the plugin data.
    cachePluginData(zkClient.readData(nodePath));
    // Keep listening for later changes.
    subscribePluginDataChanges(nodePath, pluginName);
}
/**
 * Loads and watches the selector data of one plugin.
 *
 * <p>Every child under the plugin's selector parent path is read and cached, and a
 * data-change subscription is registered for it. Afterwards a child-change
 * subscription for the {@code SELECTOR} group is registered on the parent path.
 *
 * @param pluginName name of the plugin whose selectors are watched
 */
private void watcherSelector(final String pluginName) {
    final String selectorRoot = ZkPathConstants.buildSelectorParentPath(pluginName);
    final List<String> childrenList = zkClientGetChildren(selectorRoot);
    if (CollectionUtils.isNotEmpty(childrenList)) {
        for (String child : childrenList) {
            final String nodePath = buildRealPath(selectorRoot, child);
            cacheSelectorData(zkClient.readData(nodePath));
            subscribeSelectorDataChanges(nodePath);
        }
    }
    // Registered even when there are no children yet.
    subscribeChildChanges(ConfigGroupEnum.SELECTOR, selectorRoot, childrenList);
}
/**
 * Loads and watches the rule data of one plugin.
 *
 * <p>Every child under the plugin's rule parent path is read and cached, and a
 * data-change subscription is registered for it. Afterwards a child-change
 * subscription for the {@code RULE} group is registered on the parent path.
 *
 * @param pluginName name of the plugin whose rules are watched
 */
private void watcherRule(final String pluginName) {
    final String ruleRoot = ZkPathConstants.buildRuleParentPath(pluginName);
    final List<String> childrenList = zkClientGetChildren(ruleRoot);
    if (CollectionUtils.isNotEmpty(childrenList)) {
        for (String child : childrenList) {
            final String nodePath = buildRealPath(ruleRoot, child);
            cacheRuleData(zkClient.readData(nodePath));
            subscribeRuleDataChanges(nodePath);
        }
    }
    // Registered even when there are no children yet.
    subscribeChildChanges(ConfigGroupEnum.RULE, ruleRoot, childrenList);
}

watchAppAuth

上面对watcherData方法进行了简单的分析,现在继续来看看watchAppAuth:

/**
 * Loads and watches app-auth data; follows the same pattern as the selector and
 * rule watchers.
 *
 * <p>Every child under the app-auth parent path is read and cached, and a
 * data-change subscription is registered for it. Afterwards a child-change
 * subscription for the {@code APP_AUTH} group is registered on the parent path.
 */
private void watchAppAuth() {
    final String authRoot = ZkPathConstants.APP_AUTH_PARENT;
    final List<String> childrenList = zkClientGetChildren(authRoot);
    if (CollectionUtils.isNotEmpty(childrenList)) {
        for (String child : childrenList) {
            final String nodePath = buildRealPath(authRoot, child);
            cacheAuthData(zkClient.readData(nodePath));
            subscribeAppAuthDataChanges(nodePath);
        }
    }
    // Registered even when there are no children yet.
    subscribeChildChanges(ConfigGroupEnum.APP_AUTH, authRoot, childrenList);
}
/**
 * Subscribes to data changes of a single app-auth node.
 *
 * <p>A data change re-caches the new value as {@code AppAuthData}; a deletion
 * removes the cached entry for the deleted path.
 *
 * @param realPath full path of the app-auth node to listen on
 */
private void subscribeAppAuthDataChanges(final String realPath) {
    final IZkDataListener authListener = new IZkDataListener() {

        @Override
        public void handleDataChange(final String changedPath, final Object payload) {
            cacheAuthData((AppAuthData) payload);
        }

        @Override
        public void handleDataDeleted(final String removedPath) {
            unCacheAuthData(removedPath);
        }
    };
    zkClient.subscribeDataChanges(realPath, authListener);
}
/**
 * Subscribes to child changes of a group's parent path.
 *
 * <p>When a change event reports a non-empty children list, the paths returned by
 * {@code addSubscribePath(childrenList, currentChildren)} are processed one by one:
 * the node data is read and cached for the group, then a data-change subscription
 * is registered on that node. ({@code addSubscribePath} is defined elsewhere;
 * presumably it yields the children that are not yet in {@code childrenList}.)
 *
 * @param groupKey        group whose cache and subscription routines are used
 * @param groupParentPath parent path to listen on
 * @param childrenList    children known when the subscription is created
 * @throws IllegalStateException if {@code groupKey} is not one of SELECTOR, RULE,
 *                               APP_AUTH or META_DATA
 */
private void subscribeChildChanges(final ConfigGroupEnum groupKey, final String groupParentPath, final List<String> childrenList) {
    // Dispatch on the group key; each case wires its own cache/subscribe pair.
    switch (groupKey) {
        case SELECTOR:
            zkClient.subscribeChildChanges(groupParentPath, (parentPath, currentChildren) -> {
                if (CollectionUtils.isEmpty(currentChildren)) {
                    return;
                }
                for (String addedChild : addSubscribePath(childrenList, currentChildren)) {
                    final String addedPath = buildRealPath(parentPath, addedChild);
                    cacheSelectorData(zkClient.readData(addedPath));
                    subscribeSelectorDataChanges(addedPath);
                }
            });
            break;
        case RULE:
            zkClient.subscribeChildChanges(groupParentPath, (parentPath, currentChildren) -> {
                if (CollectionUtils.isEmpty(currentChildren)) {
                    return;
                }
                // Read the data of each added node, then subscribe to that node.
                for (String addedChild : addSubscribePath(childrenList, currentChildren)) {
                    final String addedPath = buildRealPath(parentPath, addedChild);
                    cacheRuleData(zkClient.readData(addedPath));
                    subscribeRuleDataChanges(addedPath);
                }
            });
            break;
        case APP_AUTH:
            zkClient.subscribeChildChanges(groupParentPath, (parentPath, currentChildren) -> {
                if (CollectionUtils.isEmpty(currentChildren)) {
                    return;
                }
                for (String addedChild : addSubscribePath(childrenList, currentChildren)) {
                    final String addedPath = buildRealPath(parentPath, addedChild);
                    cacheAuthData(zkClient.readData(addedPath));
                    subscribeAppAuthDataChanges(addedPath);
                }
            });
            break;
        case META_DATA:
            zkClient.subscribeChildChanges(groupParentPath, (parentPath, currentChildren) -> {
                if (CollectionUtils.isEmpty(currentChildren)) {
                    return;
                }
                for (String addedChild : addSubscribePath(childrenList, currentChildren)) {
                    final String addedPath = buildRealPath(parentPath, addedChild);
                    cacheMetaData(zkClient.readData(addedPath));
                    subscribeMetaDataChanges(addedPath);
                }
            });
            break;
        default:
            throw new IllegalStateException("Unexpected groupKey: " + groupKey);
    }
}

watchMetaData

/**
 * Loads and watches meta data; works the same way as the selector watcher.
 *
 * <p>Every child under the meta-data path is read and cached, and a data-change
 * subscription is registered for it. Afterwards a child-change subscription for
 * the {@code META_DATA} group is registered on the parent path.
 */
private void watchMetaData() {
    final String metaRoot = ZkPathConstants.META_DATA;
    final List<String> childrenList = zkClientGetChildren(metaRoot);
    if (CollectionUtils.isNotEmpty(childrenList)) {
        for (String child : childrenList) {
            final String nodePath = buildRealPath(metaRoot, child);
            cacheMetaData(zkClient.readData(nodePath));
            subscribeMetaDataChanges(nodePath);
        }
    }
    // Registered even when there are no children yet.
    subscribeChildChanges(ConfigGroupEnum.META_DATA, metaRoot, childrenList);
}

总结

总体看来,zk的数据同步方式相对而言还是比较简单些的,看了上面的代码之后,可以感觉代码的逻辑也是简单清晰的,其实zk这块就是涉及发布和订阅。

按照代码逻辑,步骤大概如下:

  • 获取并初始化数据
  • 对获取的初始化数据进行监听
  • 对于变化的数据进行更新,并监听

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/1292.html

(0)
上一篇 2021年7月15日
下一篇 2021年7月15日

相关推荐

发表回复

登录后才能评论