Soul API网关数据同步之ZookeeperSyncData详解程序员

前面介绍了WebSocket同步和Http同步,本篇将开始介绍Zookeeper同步的内容。

Zookeeper数据同步配置

在启动soul-admin和soul-bootstrap的时候,我们需要把其他的同步方式给关闭掉,然后开启zk的同步方式。

soul-admin.yml:

 soul:  
    sync: 
        zookeeper: 
             url: localhost:2181 
             sessionTimeout: 5000 
             connectionTimeout: 2000 

soul-bootstrap.yml:

soul: 
    file: 
      enabled: true 
    corss: 
      enabled: true 
    dubbo : 
      parameter: multi 
    sync: 
        zookeeper: 
             url: localhost:2181 
             sessionTimeout: 5000 
             connectionTimeout: 2000 

从前面的文章中,我们已经知道这些同步的接口定义,那么这里我们就不多提了,我们直接看zk的实现:

public class ZookeeperSyncDataService implements SyncDataService, AutoCloseable {
    
    private final ZkClient zkClient; 
    private final PluginDataSubscriber pluginDataSubscriber; 
    private final List<MetaDataSubscriber> metaDataSubscribers; 
    private final List<AuthDataSubscriber> authDataSubscribers; 
     
    public ZookeeperSyncDataService(final ZkClient zkClient, final PluginDataSubscriber pluginDataSubscriber, 
                                    final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
    
        this.zkClient = zkClient; 
        this.pluginDataSubscriber = pluginDataSubscriber; 
        this.metaDataSubscribers = metaDataSubscribers; 
        this.authDataSubscribers = authDataSubscribers; 
        watcherData(); 
        watchAppAuth(); 
        watchMetaData(); 
    } 
    private void watcherData() {
    
        final String pluginParent = ZkPathConstants.PLUGIN_PARENT; 
        List<String> pluginZKs = zkClientGetChildren(pluginParent); 
        for (String pluginName : pluginZKs) {
    
            watcherAll(pluginName); 
        } 
        zkClient.subscribeChildChanges(pluginParent, (parentPath, currentChildren) -> {
    
            if (CollectionUtils.isNotEmpty(currentChildren)) {
    
                for (String pluginName : currentChildren) {
    
                    watcherAll(pluginName); 
                } 
            } 
        }); 
    } 
    private void watcherAll(final String pluginName) {
    
        watcherPlugin(pluginName); 
        watcherSelector(pluginName); 
        watcherRule(pluginName); 
    } 
    private void watchAppAuth() {
    
        final String appAuthParent = ZkPathConstants.APP_AUTH_PARENT; 
        List<String> childrenList = zkClientGetChildren(appAuthParent); 
        if (CollectionUtils.isNotEmpty(childrenList)) {
    
            childrenList.forEach(children -> {
    
                String realPath = buildRealPath(appAuthParent, children); 
                cacheAuthData(zkClient.readData(realPath)); 
                subscribeAppAuthDataChanges(realPath); 
            }); 
        } 
        subscribeChildChanges(ConfigGroupEnum.APP_AUTH, appAuthParent, childrenList); 
    } 
} 

上面代码中在构造器可以看到调用了三个方法:watcherData、watchAppAuth、watchMetaData,但是可以在watcherData方法中有调用watcherAll的方法,在这个方法里涉及这几种数据:插件、选择器、规则等。当然除了这些,还有AppAuthData、元数据。

watcherData

接下来,可以看看这些方法的作用,首先从watcherData看:

private void watcherData() {
    
    // 获取插件,启动监听 
    final String pluginParent = ZkPathConstants.PLUGIN_PARENT; 
    List<String> pluginZKs = zkClientGetChildren(pluginParent); 
    for (String pluginName : pluginZKs) {
    
        watcherAll(pluginName); 
    } 
    // 启动监听,更新插件数据,并进行监听 
    zkClient.subscribeChildChanges(pluginParent, (parentPath, currentChildren) -> {
    
        if (CollectionUtils.isNotEmpty(currentChildren)) {
    
            for (String pluginName : currentChildren) {
    
                watcherAll(pluginName); 
            } 
        } 
    }); 
} 
private void watcherAll(final String pluginName) {
    
    watcherPlugin(pluginName); 
    watcherSelector(pluginName); 
    watcherRule(pluginName); 
} 
private void watcherPlugin(final String pluginName) {
    
    // 构建pluginPath 
    String pluginPath = ZkPathConstants.buildPluginPath(pluginName); 
    if (!zkClient.exists(pluginPath)) {
    
        zkClient.createPersistent(pluginPath, true); 
    } 
    // 初始化数据 
    cachePluginData(zkClient.readData(pluginPath)); 
    // 监听数据 
    subscribePluginDataChanges(pluginPath, pluginName); 
} 
// 初始化并监听选择器数据 
private void watcherSelector(final String pluginName) {
    
    String selectorParentPath = ZkPathConstants.buildSelectorParentPath(pluginName); 
    List<String> childrenList = zkClientGetChildren(selectorParentPath); 
    if (CollectionUtils.isNotEmpty(childrenList)) {
    
        childrenList.forEach(children -> {
    
            String realPath = buildRealPath(selectorParentPath, children); 
            cacheSelectorData(zkClient.readData(realPath)); 
            subscribeSelectorDataChanges(realPath); 
        }); 
    } 
    subscribeChildChanges(ConfigGroupEnum.SELECTOR, selectorParentPath, childrenList); 
} 
// 初始化并监听规则数据 
private void watcherRule(final String pluginName) {
    
    String ruleParent = ZkPathConstants.buildRuleParentPath(pluginName); 
    List<String> childrenList = zkClientGetChildren(ruleParent); 
    if (CollectionUtils.isNotEmpty(childrenList)) {
    
        childrenList.forEach(children -> {
    
            String realPath = buildRealPath(ruleParent, children); 
            cacheRuleData(zkClient.readData(realPath)); 
            subscribeRuleDataChanges(realPath); 
        }); 
    } 
    subscribeChildChanges(ConfigGroupEnum.RULE, ruleParent, childrenList); 
} 

watchAppAuth

上面对watcherData方法对数据进行了简单的分析,现在继续来看看watchAppAuth:

// 这个和上面的类似 
private void watchAppAuth() {
    
    final String appAuthParent = ZkPathConstants.APP_AUTH_PARENT; 
    List<String> childrenList = zkClientGetChildren(appAuthParent); 
    if (CollectionUtils.isNotEmpty(childrenList)) {
    
        childrenList.forEach(children -> {
    
            String realPath = buildRealPath(appAuthParent, children); 
            cacheAuthData(zkClient.readData(realPath)); 
            subscribeAppAuthDataChanges(realPath); 
        }); 
    } 
    subscribeChildChanges(ConfigGroupEnum.APP_AUTH, appAuthParent, childrenList); 
} 
// 监听AppAuthData 
private void subscribeAppAuthDataChanges(final String realPath) {
    
    zkClient.subscribeDataChanges(realPath, new IZkDataListener() {
    
        @Override 
        public void handleDataChange(final String dataPath, final Object data) {
    
            cacheAuthData((AppAuthData) data); 
        } 
        @Override 
        public void handleDataDeleted(final String dataPath) {
    
            unCacheAuthData(dataPath); 
        } 
    }); 
} 
// 监听ChildChanges 
private void subscribeChildChanges(final ConfigGroupEnum groupKey, final String groupParentPath, final List<String> childrenList) {
    
    // 根据 groupKey 来进行判断 
    switch (groupKey) {
    
        case SELECTOR: 
            zkClient.subscribeChildChanges(groupParentPath, (parentPath, currentChildren) -> {
    
                if (CollectionUtils.isNotEmpty(currentChildren)) {
    
                    List<String> addSubscribePath = addSubscribePath(childrenList, currentChildren); 
                    addSubscribePath.stream().map(addPath -> {
    
                        String realPath = buildRealPath(parentPath, addPath); 
                        cacheSelectorData(zkClient.readData(realPath)); 
                        return realPath; 
                    }).forEach(this::subscribeSelectorDataChanges); 
                } 
            }); 
            break; 
        case RULE: 
            zkClient.subscribeChildChanges(groupParentPath, (parentPath, currentChildren) -> {
    
                if (CollectionUtils.isNotEmpty(currentChildren)) {
    
                    List<String> addSubscribePath = addSubscribePath(childrenList, currentChildren); 
                    // Get the newly added node data and subscribe to that node 
                    addSubscribePath.stream().map(addPath -> {
    
                        String realPath = buildRealPath(parentPath, addPath); 
                        cacheRuleData(zkClient.readData(realPath)); 
                        return realPath; 
                    }).forEach(this::subscribeRuleDataChanges); 
                } 
            }); 
            break; 
        case APP_AUTH: 
            zkClient.subscribeChildChanges(groupParentPath, (parentPath, currentChildren) -> {
    
                if (CollectionUtils.isNotEmpty(currentChildren)) {
    
                    final List<String> addSubscribePath = addSubscribePath(childrenList, currentChildren); 
                    addSubscribePath.stream().map(children -> {
    
                        final String realPath = buildRealPath(parentPath, children); 
                        cacheAuthData(zkClient.readData(realPath)); 
                        return realPath; 
                    }).forEach(this::subscribeAppAuthDataChanges); 
                } 
            }); 
            break; 
        case META_DATA: 
            zkClient.subscribeChildChanges(groupParentPath, (parentPath, currentChildren) -> {
    
                if (CollectionUtils.isNotEmpty(currentChildren)) {
    
                    final List<String> addSubscribePath = addSubscribePath(childrenList, currentChildren); 
                    addSubscribePath.stream().map(children -> {
    
                        final String realPath = buildRealPath(parentPath, children); 
                        cacheMetaData(zkClient.readData(realPath)); 
                        return realPath; 
                    }).forEach(this::subscribeMetaDataChanges); 
                } 
            }); 
            break; 
        default: 
            throw new IllegalStateException("Unexpected groupKey: " + groupKey); 
    } 
} 

watchMetaData

// 和选择器的原理一样 
private void watchMetaData() {
    
    final String metaDataPath = ZkPathConstants.META_DATA; 
    List<String> childrenList = zkClientGetChildren(metaDataPath); 
    if (CollectionUtils.isNotEmpty(childrenList)) {
    
        childrenList.forEach(children -> {
    
            String realPath = buildRealPath(metaDataPath, children); 
            cacheMetaData(zkClient.readData(realPath)); 
            subscribeMetaDataChanges(realPath); 
        }); 
    } 
    subscribeChildChanges(ConfigGroupEnum.META_DATA, metaDataPath, childrenList); 
} 

总结

总体看来,zk的数据同步方式相对而言还是比较简单些的,看了上面的代码之后,可以感觉代码的逻辑也是简单清晰的,其实zk这块就是涉及发布和订阅。

按照代码逻辑步骤带盖如下:

  • 获取并初始化数据
  • 对获取的的初始化数据进行监听
  • 对于变化的数据进行更新,并监听

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/tech/aiops/1292.html

(0)
上一篇 2021年7月15日 22:48
下一篇 2021年7月15日 22:48

相关推荐

发表回复

登录后才能评论