在前面的几篇文章中已经介绍了http、zk、websocket这几种同步数据的方式,也对其中的源码做了些分析。那么本篇文章将开始nacos数据同步的解析了。
Nacos数据同步配置
还是和前面一样要从配置文件开始,还是soul-admin、soul-bootstrap这两个工程的配置文件。
soul-admin.yml:
soul:
database:
dialect: mysql
init_script: "META-INF/schema.sql"
init_enable: true
sync:
nacos:
url: localhost:8848
namespace: 1c10d748-af86-43b9-8265-75f487d20c6c
acm:
enabled: false
endpoint: acm.aliyun.com
namespace:
accessKey:
secretKey:
soul-bootstrap.yml:
soul:
file:
enabled: true
corss:
enabled: true
dubbo :
parameter: multi
sync:
nacos:
url: localhost:8848
namespace: 1c10d748-af86-43b9-8265-75f487d20c6c
acm:
enabled: false
endpoint: acm.aliyun.com
namespace:
accessKey:
secretKey:
然后就是需要在soul-bootstrap工程的pom文件中加上关于nacos的依赖,如下:
<!--soul data sync start use nacos-->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-nacos</artifactId>
<version>${project.version}</version>
</dependency>
Nacos数据同步实现
前面都说了数据同步接口,这里依旧略过,我们只看关于Nancos数据同步的实现,代码如下:
public class NacosSyncDataService extends NacosCacheHandler implements AutoCloseable, SyncDataService {
public NacosSyncDataService(final ConfigService configService, final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
super(configService, pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
start();
}
public void start() {
watcherData(PLUGIN_DATA_ID, this::updatePluginMap);
watcherData(SELECTOR_DATA_ID, this::updateSelectorMap);
watcherData(RULE_DATA_ID, this::updateRuleMap);
watcherData(META_DATA_ID, this::updateMetaDataMap);
watcherData(AUTH_DATA_ID, this::updateAuthMap);
}
@Override
public void close() {
LISTENERS.forEach((dataId, lss) -> {
lss.forEach(listener -> getConfigService().removeListener(dataId, GROUP, listener));
lss.clear();
});
LISTENERS.clear();
}
}
上面的代码中,可以看到NacosSyncDataService类继承了NacosCacheHandler这个类,当然也实现了AutoCloseable, SyncDataService。
然后就是一个构造器,构造器参数传入的是configService、metaDataSubscribers、authDataSubscribers;在构造函数中同时调用了start函数,在这个函数里涉及到了插件数据、选择器数据、规则数据、元数据数据、AuthMap。
关于start方法
watcherData:
在start方法中都调用了watcherData函数,那就来看看这个函数的具体实现,如下:
protected void watcherData(final String dataId, final OnChange oc) {
Listener listener = new Listener() {
@Override
public void receiveConfigInfo(final String configInfo) {
// 当admin数据发生变化时,bootstrap监听器会收到,然后在此处理
oc.change(configInfo);
}
@Override
public Executor getExecutor() {
return null;
}
};
// 根据dataId从nacos拿到数据,然后将拿到的数据传给updatePluginMap方法调用
oc.change(getConfigAndSignListener(dataId, listener));
LISTENERS.computeIfAbsent(dataId, key -> new ArrayList<>()).add(listener);
}
updatePluginMap:
protected void updatePluginMap(final String configInfo) {
try {
// Fix bug #656(https://github.com/dromara/soul/issues/656)
List<PluginData> pluginDataList = new ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, PluginData.class).values());
pluginDataList.forEach(pluginData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
// 这里先把这个插件之前缓存数据清除,然后再插入新数据
subscriber.unSubscribe(pluginData);
subscriber.onSubscribe(pluginData);
}));
} catch (JsonParseException e) {
log.error("sync plugin data have error:", e);
}
}
updateSelectorMap:
protected void updateSelectorMap(final String configInfo) {
try {
List<SelectorData> selectorDataList = GsonUtils.getInstance().toObjectMapList(configInfo, SelectorData.class).values().stream().flatMap(Collection::stream).collect(Collectors.toList());
selectorDataList.forEach(selectorData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
// 此处同上
subscriber.unSelectorSubscribe(selectorData);
subscriber.onSelectorSubscribe(selectorData);
}));
} catch (JsonParseException e) {
log.error("sync selector data have error:", e);
}
}
updateRuleMap:
protected void updateRuleMap(final String configInfo) {
try {
List<RuleData> ruleDataList = GsonUtils.getInstance().toObjectMapList(configInfo, RuleData.class).values()
.stream().flatMap(Collection::stream)
.collect(Collectors.toList());
ruleDataList.forEach(ruleData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
subscriber.unRuleSubscribe(ruleData);
subscriber.onRuleSubscribe(ruleData);
}));
} catch (JsonParseException e) {
log.error("sync rule data have error:", e);
}
}
updateMetaDataMap:
protected void updateMetaDataMap(final String configInfo) {
try {
List<MetaData> metaDataList = new ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, MetaData.class).values());
metaDataList.forEach(metaData -> metaDataSubscribers.forEach(subscriber -> {
subscriber.unSubscribe(metaData);
subscriber.onSubscribe(metaData);
}));
} catch (JsonParseException e) {
log.error("sync meta data have error:", e);
}
}
updateAuthMap:
protected void updateAuthMap(final String configInfo) {
try {
List<AppAuthData> appAuthDataList = new ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, AppAuthData.class).values());
appAuthDataList.forEach(appAuthData -> authDataSubscribers.forEach(subscriber -> {
subscriber.unSubscribe(appAuthData);
subscriber.onSubscribe(appAuthData);
}));
} catch (JsonParseException e) {
log.error("sync auth data have error:", e);
}
}
总结
本篇文章简单的介绍了soul-bootstrap启动时所作的数据更新,从工程的配置文件、nacos的实现、以及数据监听和处理这几个部分进行的简单分析。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/1294.html