Soul API网关数据同步之WebSocket详解程序员

前言

前面的一系列文章,先说了几个example的使用,然后说了SoulWebHandler、SoulPlugin、MatchStrage、MatchStrategy;前面几个主要从使用的角度说,后面的基本源于单独的类(接口)来进行源码解析的。

那么从本篇文章开始将讲一些系统性的东西,那就从数据同步来开始我们说数据同步的代码分析模块。

数据同步源头

1、配置文件

说到数据同步,我们要先说一下yml配置文件,这里涉及两个地方,一个是soul-admin,另一个是soul-bootstrap,这两个yml的文件分别是:

soul-admin 
soul: 
  database: 
    dialect: mysql 
    init_script: "META-INF/schema.sql" 
    init_enable: true 
  sync: 
    websocket: 
      enabled: true 
       
soul-bootstrap 
soul : 
    file: 
      enabled: true 
    corss: 
      enabled: true 
    dubbo : 
      parameter: multi 
    sync: 
        websocket : 
             urls: ws://localhost:9095/websocket 

2、同步接口与实现

说完配置文件,我们接着说一下SyncDataService这个接口,这个接口从命名上就可以直观的看出是作为同步数据用的,代码如下:

/** 
 * The interface Sync data service. 
 */ 
public interface SyncDataService {
    
} 

既然说到SyncDataService接口,那便自然有实现类来做实现,不过这个接口并没有定义方法来做约束,意思就是说在各自的实现类中做各自的实现,这里先看一下具体有哪些实现类,如图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kdhO6Wu5-1611333797000)(https://uploader.shimo.im/f/Hp2ijLXTsN1RatcJ.png!thumbnail?fileGuid=XHDVWWJqVdQHY9xy)]

图中有四个实现类,不过本篇文章只是以WebsocketSyncDataService角度来进行解析,那就直接看看这个类的代码实现:

@Slf4j 
public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {
 
private final List<WebSocketClient> clients = new ArrayList<>(); 
private final ScheduledThreadPoolExecutor executor; 
/** 
* Instantiates a new Websocket sync cache. 
* 
* @param websocketConfig      the websocket config 
* @param pluginDataSubscriber the plugin data subscriber 
* @param metaDataSubscribers  the meta data subscribers 
* @param authDataSubscribers  the auth data subscribers 
*/ 
public WebsocketSyncDataService(final WebsocketConfig websocketConfig, 
final PluginDataSubscriber pluginDataSubscriber, 
final List<MetaDataSubscriber> metaDataSubscribers, 
final List<AuthDataSubscriber> authDataSubscribers) {
 
String[] urls = StringUtils.split(websocketConfig.getUrls(), ","); 
executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true)); 
for (String url : urls) {
 
try {
 
clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers)); 
} catch (URISyntaxException e) {
 
log.error("websocket url({}) is error", url, e); 
} 
} 
try {
 
for (WebSocketClient client : clients) {
 
boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS); 
if (success) {
 
log.info("websocket connection is successful....."); 
} else {
 
log.error("websocket connection is error....."); 
} 
executor.scheduleAtFixedRate(() -> {
 
try {
 
if (client.isClosed()) {
 
boolean reconnectSuccess = client.reconnectBlocking(); 
if (reconnectSuccess) {
 
log.info("websocket reconnect is successful....."); 
} else {
 
log.error("websocket reconnection is error....."); 
} 
} 
} catch (InterruptedException e) {
 
log.error("websocket connect is error :{}", e.getMessage()); 
} 
}, 10, 30, TimeUnit.SECONDS); 
} 
/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/ 
} catch (InterruptedException e) {
 
log.info("websocket connection...exception....", e); 
} 
} 
@Override 
public void close() {
 
for (WebSocketClient client : clients) {
 
if (!client.isClosed()) {
 
client.close(); 
} 
} 
if (Objects.nonNull(executor)) {
 
executor.shutdown(); 
} 
} 
} 

WebsocketSyncDataService还实现了AutoCloseable接口,不过我们就看代码主题,在这个类中第一就是定义一个WebsocketSyncDataService的有参构造函数,然后就是实现了AutoCloseable类的close方法。
当然说到说到这里就又要说WebsocketSyncDataService这个类是怎么来的,那自然是基于Spring Boot自动装配的原理了。这个装配的位置是在WebsocketSyncDataConfiguration类中,代码如下:

@Configuration 
@ConditionalOnClass(WebsocketSyncDataService.class) 
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls") 
@Slf4j 
public class WebsocketSyncDataConfiguration {
 
/** 
* Websocket sync data service. 
* 
* @param websocketConfig   the websocket config 
* @param pluginSubscriber the plugin subscriber 
* @param metaSubscribers   the meta subscribers 
* @param authSubscribers   the auth subscribers 
* @return the sync data service 
*/ 
@Bean 
public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber, 
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
 
log.info("you use websocket sync soul data......."); 
return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(), 
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList)); 
} 
// 省略部分代码 
} 

在上面的这段代码中,[email protected],注解里的参数(prefix = “soul.sync.websocket”)啥的大家是不是感觉很熟悉?这个“soul.sync.websocket”是不是在配置文件中的。至于WebsocketSyncDataConfiguration的构建,就是前面说的Spring Boot自动构建的原理,却在前面的文章中也有提及,那么这里就不再多提了。

启动同步

1、On Message

说到启动时同步,那就不得不提到WebsocketCollector这个类,在启动soul-bootstrap时便会调用其中的WebsocketCollector.onMessage(…)方法,但是这个类的创建时机时在soul-admin项目启动的时候进行构建的,[email protected](“/websocket”)[email protected],关于这两个注解乃是WebSocket的,这里只是提一下,@Serverendpoint作为WebSocket服务器的端点,而 @OnMessage注解的Java方法用于接收传入的WebSocket信息。这个信息可以是文本,二进制信息。那意思就很明显了,直接看onMessage方法的代码了,如下:

@OnMessage 
public void onMessage(final String message, final Session session) {
 
if (message.equals(DataEventTypeEnum.MYSELF.name())) {
 
try {
 
ThreadLocalUtil.put(SESSION_KEY, session); 
SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF); 
} finally {
 
ThreadLocalUtil.clear(); 
} 
} 
} 

这个方法的首先判断DataEventType,若匹配便进入代码块,代码块中先把session存入缓存中,然后就是根据SyncDataService.class来进行依赖查找,来获得SyncDataService类对象,紧接着便是调用syncAll方法。

2、syncAll方法

这个方法里涉及到插件、选择器、规则这写数据,并且设计到Spring的事件发布,代码如下:

SyncDataServiceImpl.class

@Override 
public boolean syncAll(final DataEventTypeEnum type) {
 
appAuthService.syncData(); 
List<PluginData> pluginDataList = pluginService.listAll(); 
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList)); 
List<SelectorData> selectorDataList = selectorService.listAll(); 
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList)); 
List<RuleData> ruleDataList = ruleService.listAll(); 
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList)); 
metaDataService.syncData(); 
return true; 
} 

上面的代码很直白,开始调用了appAuthService.syncData()方法,然后便是获取pluginData,紧接着通过Spring的事件发布器,来进行时间的发布。后面依次是selector、rule。最后再次调用metaDataService.syncData()方法。
appAuthService.syncData()方法代码如下:、

AppAuthServiceImpl.class

@Override 
public SoulAdminResult syncData() {
 
List<AppAuthDO> appAuthDOList = appAuthMapper.selectAll(); 
if (CollectionUtils.isNotEmpty(appAuthDOList)) {
 
List<AppAuthData> dataList = appAuthDOList.stream().map(this::buildByEntity).collect(Collectors.toList()); 
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.APP_AUTH, 
DataEventTypeEnum.REFRESH, 
dataList)); 
} 
return SoulAdminResult.success(); 
} 

这段代码是同步权限的数据,这个暂不多说,我们还是说说metaDataService.syncData()方法的代码:
MetaDataServiceImpl.class

@Override 
public void syncData() {
 
List<MetaDataDO> all = metaDataMapper.findAll(); 
if (CollectionUtils.isNotEmpty(all)) {
 
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, DataEventTypeEnum.REFRESH, MetaDataTransfer.INSTANCE.mapToDataAll(all))); 
} 
} 

3、DataChangedEventDispatcher 接收事件

@Override 
@SuppressWarnings("unchecked") 
public void onApplicationEvent(final DataChangedEvent event) {
 
for (DataChangedListener listener : listeners) {
 
switch (event.getGroupKey()) {
 
case APP_AUTH: 
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType()); 
break; 
case PLUGIN: 
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType()); 
break; 
case RULE: 
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType()); 
break; 
case SELECTOR: 
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); 
break; 
case META_DATA: 
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType()); 
break; 
default: 
throw new IllegalStateException("Unexpected value: " + event.getGroupKey()); 
} 
} 
} 

在前面Spring事件发布器发不完事件后,这里便开始接受事件了,就是说开始处理事件了。代码中是根据传入的事件,然后event.getGroupKey(),然后匹配其事件的key。

@Override 
public void onMetaDataChanged(final List<MetaData> metaDataList, final DataEventTypeEnum eventType) {
 
WebsocketData<MetaData> configData = 
new WebsocketData<>(ConfigGroupEnum.META_DATA.name(), eventType.name(), metaDataList); 
WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType); 
} 

手动同步

手动同步就是调用metaDataService.syncData()方法,后面细节基本一致,那就不多说了。

总结

本篇文章从配置文件、同步的接口、然后是WebSocket(涉及到注解)、然后就是同步的一个基本流程,这里面还涉及到事件的发布与事件的处理。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/1291.html

(0)
上一篇 2021年7月15日
下一篇 2021年7月15日

相关推荐

发表回复

登录后才能评论