Soul API网关数据同步之Nacos数据同步详解程序员

在前面的几篇文章中已经介绍了http、zk、websocket这几种同步数据的方式,也对其中的源码做了些分析。那么本篇文章将开始nacos数据同步的解析了。

Nacos数据同步配置

还是和前面一样要从配置文件开始,还是soul-admin、soul-bootstrap这两个工程的配置文件。

soul-admin.yml:

soul: 
  database: 
    dialect: mysql 
    init_script: "META-INF/schema.sql" 
    init_enable: true 
  sync: 
      nacos: 
        url: localhost:8848 
        namespace: 1c10d748-af86-43b9-8265-75f487d20c6c 
        acm: 
          enabled: false 
          endpoint: acm.aliyun.com 
          namespace: 
          accessKey: 
          secretKey: 

soul-bootstrap.yml:

soul: 
    file: 
      enabled: true 
    corss: 
      enabled: true 
    dubbo : 
      parameter: multi 
    sync: 
        nacos: 
          url: localhost:8848 
          namespace: 1c10d748-af86-43b9-8265-75f487d20c6c 
          acm: 
            enabled: false 
            endpoint: acm.aliyun.com 
            namespace: 
            accessKey: 
            secretKey: 

然后就是需要在soul-bootstrap工程的pom文件中加上关于nacos的依赖,如下:

<!--soul data sync start use nacos--> 
<dependency> 
<groupId>org.dromara</groupId> 
<artifactId>soul-spring-boot-starter-sync-data-nacos</artifactId> 
<version>${project.version}</version> 
</dependency> 

Nacos数据同步实现

前面都说了数据同步接口,这里依旧略过,我们只看关于Nancos数据同步的实现,代码如下:

public class NacosSyncDataService extends NacosCacheHandler implements AutoCloseable, SyncDataService {
    
    public NacosSyncDataService(final ConfigService configService, final PluginDataSubscriber pluginDataSubscriber, 
                                final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
    
        super(configService, pluginDataSubscriber, metaDataSubscribers, authDataSubscribers); 
        start(); 
    } 
    public void start() {
    
        watcherData(PLUGIN_DATA_ID, this::updatePluginMap); 
        watcherData(SELECTOR_DATA_ID, this::updateSelectorMap); 
        watcherData(RULE_DATA_ID, this::updateRuleMap); 
        watcherData(META_DATA_ID, this::updateMetaDataMap); 
        watcherData(AUTH_DATA_ID, this::updateAuthMap); 
    } 
    @Override 
    public void close() {
    
        LISTENERS.forEach((dataId, lss) -> {
    
            lss.forEach(listener -> getConfigService().removeListener(dataId, GROUP, listener)); 
            lss.clear(); 
        }); 
        LISTENERS.clear(); 
    } 
} 

上面的代码中,可以看到NacosSyncDataService类继承了NacosCacheHandler这个类,当然也实现了AutoCloseable, SyncDataService。
然后就是一个构造器,构造器参数传入的是configService、metaDataSubscribers、authDataSubscribers;在构造函数中同时调用了start函数,在这个函数里涉及到了插件数据、选择器数据、规则数据、元数据数据、AuthMap。

关于start方法

watcherData:

在start方法中都调用了watcherData函数,那就来看看这个函数的具体实现,如下:

protected void watcherData(final String dataId, final OnChange oc) {
    
    Listener listener = new Listener() {
    
        @Override 
        public void receiveConfigInfo(final String configInfo) {
    
            // 当admin数据发生变化时,bootstrap监听器会收到,然后在此处理 
            oc.change(configInfo); 
        } 
        @Override 
        public Executor getExecutor() {
    
            return null; 
        } 
    }; 
    // 根据dataId从nacos拿到数据,然后将拿到的数据传给updatePluginMap方法调用 
    oc.change(getConfigAndSignListener(dataId, listener)); 
    LISTENERS.computeIfAbsent(dataId, key -> new ArrayList<>()).add(listener); 
} 

updatePluginMap:

protected void updatePluginMap(final String configInfo) {
    
    try {
    
        // Fix bug #656(https://github.com/dromara/soul/issues/656) 
        List<PluginData> pluginDataList = new ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, PluginData.class).values()); 
        pluginDataList.forEach(pluginData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
    
            // 这里先把这个插件之前缓存数据清除,然后再插入新数据 
            subscriber.unSubscribe(pluginData); 
            subscriber.onSubscribe(pluginData); 
        })); 
    } catch (JsonParseException e) {
    
        log.error("sync plugin data have error:", e); 
    } 
} 

updateSelectorMap:

protected void updateSelectorMap(final String configInfo) {
    
    try {
    
        List<SelectorData> selectorDataList = GsonUtils.getInstance().toObjectMapList(configInfo, SelectorData.class).values().stream().flatMap(Collection::stream).collect(Collectors.toList()); 
        selectorDataList.forEach(selectorData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
    
            // 此处同上 
            subscriber.unSelectorSubscribe(selectorData); 
            subscriber.onSelectorSubscribe(selectorData); 
        })); 
    } catch (JsonParseException e) {
    
        log.error("sync selector data have error:", e); 
    } 
} 

updateRuleMap:

protected void updateRuleMap(final String configInfo) {
    
    try {
    
        List<RuleData> ruleDataList = GsonUtils.getInstance().toObjectMapList(configInfo, RuleData.class).values() 
                .stream().flatMap(Collection::stream) 
                .collect(Collectors.toList()); 
        ruleDataList.forEach(ruleData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
    
            subscriber.unRuleSubscribe(ruleData); 
            subscriber.onRuleSubscribe(ruleData); 
        })); 
    } catch (JsonParseException e) {
    
        log.error("sync rule data have error:", e); 
    } 
} 

updateMetaDataMap:

protected void updateMetaDataMap(final String configInfo) {
    
    try {
    
        List<MetaData> metaDataList = new ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, MetaData.class).values()); 
        metaDataList.forEach(metaData -> metaDataSubscribers.forEach(subscriber -> {
    
            subscriber.unSubscribe(metaData); 
            subscriber.onSubscribe(metaData); 
        })); 
    } catch (JsonParseException e) {
    
        log.error("sync meta data have error:", e); 
    } 
} 

updateAuthMap:

protected void updateAuthMap(final String configInfo) {
    
try {
    
List<AppAuthData> appAuthDataList = new ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, AppAuthData.class).values()); 
appAuthDataList.forEach(appAuthData -> authDataSubscribers.forEach(subscriber -> {
    
subscriber.unSubscribe(appAuthData); 
subscriber.onSubscribe(appAuthData); 
})); 
} catch (JsonParseException e) {
    
log.error("sync auth data have error:", e); 
} 
} 

总结

本篇文章简单的介绍了soul-bootstrap启动时所作的数据更新,从工程的配置文件、nacos的实现、以及数据监听和处理这几个部分进行的简单分析。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/1294.html

(0)
上一篇 2021年7月15日
下一篇 2021年7月15日

相关推荐

发表回复

登录后才能评论