前言
前面的一系列文章,先说了几个example的使用,然后说了SoulWebHandler、SoulPlugin、MatchStrage、MatchStrategy;前面几个主要从使用的角度说,后面的基本源于单独的类(接口)来进行源码解析的。
那么从本篇文章开始将讲一些系统性的东西,那就从数据同步来开始我们说数据同步的代码分析模块。
数据同步源头
1、配置文件
说到数据同步,我们要先说一下yml配置文件,这里涉及两个地方,一个是soul-admin,另一个是soul-bootstrap,这两个yml的文件分别是:
soul-admin
soul:
database:
dialect: mysql
init_script: "META-INF/schema.sql"
init_enable: true
sync:
websocket:
enabled: true
soul-bootstrap
soul :
file:
enabled: true
corss:
enabled: true
dubbo :
parameter: multi
sync:
websocket :
urls: ws://localhost:9095/websocket
2、同步接口与实现
说完配置文件,我们接着说一下SyncDataService这个接口,这个接口从命名上就可以直观的看出是作为同步数据用的,代码如下:
/**
* The interface Sync data service.
*/
public interface SyncDataService {
}
既然说到SyncDataService接口,那便自然有实现类来做实现,不过这个接口并没有定义方法来做约束,意思就是说在各自的实现类中做各自的实现,这里先看一下具体有哪些实现类,如图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kdhO6Wu5-1611333797000)(https://uploader.shimo.im/f/Hp2ijLXTsN1RatcJ.png!thumbnail?fileGuid=XHDVWWJqVdQHY9xy)]
图中有四个实现类,不过本篇文章只是以WebsocketSyncDataService角度来进行解析,那就直接看看这个类的代码实现:
@Slf4j
public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {
private final List<WebSocketClient> clients = new ArrayList<>();
private final ScheduledThreadPoolExecutor executor;
/**
* Instantiates a new Websocket sync cache.
*
* @param websocketConfig the websocket config
* @param pluginDataSubscriber the plugin data subscriber
* @param metaDataSubscribers the meta data subscribers
* @param authDataSubscribers the auth data subscribers
*/
public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
for (String url : urls) {
try {
clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
} catch (URISyntaxException e) {
log.error("websocket url({}) is error", url, e);
}
}
try {
for (WebSocketClient client : clients) {
boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
if (success) {
log.info("websocket connection is successful.....");
} else {
log.error("websocket connection is error.....");
}
executor.scheduleAtFixedRate(() -> {
try {
if (client.isClosed()) {
boolean reconnectSuccess = client.reconnectBlocking();
if (reconnectSuccess) {
log.info("websocket reconnect is successful.....");
} else {
log.error("websocket reconnection is error.....");
}
}
} catch (InterruptedException e) {
log.error("websocket connect is error :{}", e.getMessage());
}
}, 10, 30, TimeUnit.SECONDS);
}
/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
} catch (InterruptedException e) {
log.info("websocket connection...exception....", e);
}
}
@Override
public void close() {
for (WebSocketClient client : clients) {
if (!client.isClosed()) {
client.close();
}
}
if (Objects.nonNull(executor)) {
executor.shutdown();
}
}
}
WebsocketSyncDataService还实现了AutoCloseable接口,不过我们就看代码主题,在这个类中第一就是定义一个WebsocketSyncDataService的有参构造函数,然后就是实现了AutoCloseable类的close方法。
当然说到说到这里就又要说WebsocketSyncDataService这个类是怎么来的,那自然是基于Spring Boot自动装配的原理了。这个装配的位置是在WebsocketSyncDataConfiguration类中,代码如下:
@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {
/**
* Websocket sync data service.
*
* @param websocketConfig the websocket config
* @param pluginSubscriber the plugin subscriber
* @param metaSubscribers the meta subscribers
* @param authSubscribers the auth subscribers
* @return the sync data service
*/
@Bean
public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
log.info("you use websocket sync soul data.......");
return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
// 省略部分代码
}
在上面的这段代码中,[email protected],注解里的参数(prefix = “soul.sync.websocket”)啥的大家是不是感觉很熟悉?这个“soul.sync.websocket”是不是在配置文件中的。至于WebsocketSyncDataConfiguration的构建,就是前面说的Spring Boot自动构建的原理,却在前面的文章中也有提及,那么这里就不再多提了。
启动同步
1、On Message
说到启动时同步,那就不得不提到WebsocketCollector这个类,在启动soul-bootstrap时便会调用其中的WebsocketCollector.onMessage(…)方法,但是这个类的创建时机时在soul-admin项目启动的时候进行构建的,[email protected](“/websocket”)[email protected],关于这两个注解乃是WebSocket的,这里只是提一下,@Serverendpoint作为WebSocket服务器的端点,而 @OnMessage注解的Java方法用于接收传入的WebSocket信息。这个信息可以是文本,二进制信息。那意思就很明显了,直接看onMessage方法的代码了,如下:
@OnMessage
public void onMessage(final String message, final Session session) {
if (message.equals(DataEventTypeEnum.MYSELF.name())) {
try {
ThreadLocalUtil.put(SESSION_KEY, session);
SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
} finally {
ThreadLocalUtil.clear();
}
}
}
这个方法的首先判断DataEventType,若匹配便进入代码块,代码块中先把session存入缓存中,然后就是根据SyncDataService.class来进行依赖查找,来获得SyncDataService类对象,紧接着便是调用syncAll方法。
2、syncAll方法
这个方法里涉及到插件、选择器、规则这写数据,并且设计到Spring的事件发布,代码如下:
SyncDataServiceImpl.class
@Override
public boolean syncAll(final DataEventTypeEnum type) {
appAuthService.syncData();
List<PluginData> pluginDataList = pluginService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
List<SelectorData> selectorDataList = selectorService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
List<RuleData> ruleDataList = ruleService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
metaDataService.syncData();
return true;
}
上面的代码很直白,开始调用了appAuthService.syncData()方法,然后便是获取pluginData,紧接着通过Spring的事件发布器,来进行时间的发布。后面依次是selector、rule。最后再次调用metaDataService.syncData()方法。
appAuthService.syncData()方法代码如下:、
AppAuthServiceImpl.class
@Override
public SoulAdminResult syncData() {
List<AppAuthDO> appAuthDOList = appAuthMapper.selectAll();
if (CollectionUtils.isNotEmpty(appAuthDOList)) {
List<AppAuthData> dataList = appAuthDOList.stream().map(this::buildByEntity).collect(Collectors.toList());
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.APP_AUTH,
DataEventTypeEnum.REFRESH,
dataList));
}
return SoulAdminResult.success();
}
这段代码是同步权限的数据,这个暂不多说,我们还是说说metaDataService.syncData()方法的代码:
MetaDataServiceImpl.class
@Override
public void syncData() {
List<MetaDataDO> all = metaDataMapper.findAll();
if (CollectionUtils.isNotEmpty(all)) {
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, DataEventTypeEnum.REFRESH, MetaDataTransfer.INSTANCE.mapToDataAll(all)));
}
}
3、DataChangedEventDispatcher 接收事件
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
for (DataChangedListener listener : listeners) {
switch (event.getGroupKey()) {
case APP_AUTH:
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN:
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE:
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR:
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA:
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default:
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
在前面Spring事件发布器发不完事件后,这里便开始接受事件了,就是说开始处理事件了。代码中是根据传入的事件,然后event.getGroupKey(),然后匹配其事件的key。
@Override
public void onMetaDataChanged(final List<MetaData> metaDataList, final DataEventTypeEnum eventType) {
WebsocketData<MetaData> configData =
new WebsocketData<>(ConfigGroupEnum.META_DATA.name(), eventType.name(), metaDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
}
手动同步
手动同步就是调用metaDataService.syncData()方法,后面细节基本一致,那就不多说了。
总结
本篇文章从配置文件、同步的接口、然后是WebSocket(涉及到注解)、然后就是同步的一个基本流程,这里面还涉及到事件的发布与事件的处理。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/1291.html