Soul API网关数据同步之WebSocket详解程序员

前言

前面的一系列文章,先说了几个example的使用,然后说了SoulWebHandler、SoulPlugin、MatchStrage、MatchStrategy;前面几个主要从使用的角度说,后面的基本源于单独的类(接口)来进行源码解析的。

那么从本篇文章开始将讲一些系统性的东西,那就从数据同步来开始我们说数据同步的代码分析模块。

数据同步源头

1、配置文件

说到数据同步,我们要先说一下yml配置文件,这里涉及两个地方,一个是soul-admin,另一个是soul-bootstrap,这两个yml的文件分别是:

soul-admin 
soul: 
  database: 
    dialect: mysql 
    init_script: "META-INF/schema.sql" 
    init_enable: true 
  sync: 
    websocket: 
      enabled: true 
       
soul-bootstrap 
soul : 
    file: 
      enabled: true 
    corss: 
      enabled: true 
    dubbo : 
      parameter: multi 
    sync: 
        websocket : 
             urls: ws://localhost:9095/websocket 

2、同步接口与实现

说完配置文件,我们接着说一下SyncDataService这个接口,这个接口从命名上就可以直观的看出是作为同步数据用的,代码如下:

/** 
 * The interface Sync data service. 
 */ 
public interface SyncDataService {
    
} 

既然说到SyncDataService接口,那便自然有实现类来做实现,不过这个接口并没有定义方法来做约束,意思就是说在各自的实现类中做各自的实现,这里先看一下具体有哪些实现类,如图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kdhO6Wu5-1611333797000)(https://uploader.shimo.im/f/Hp2ijLXTsN1RatcJ.png!thumbnail?fileGuid=XHDVWWJqVdQHY9xy)]

图中有四个实现类,不过本篇文章只是以WebsocketSyncDataService角度来进行解析,那就直接看看这个类的代码实现:

@Slf4j 
public class WebsocketSyncDataService implements SyncDataService, AutoCloseable {
    
    private final List<WebSocketClient> clients = new ArrayList<>(); 
    private final ScheduledThreadPoolExecutor executor; 
     
    /** 
     * Instantiates a new Websocket sync cache. 
     * 
     * @param websocketConfig      the websocket config 
     * @param pluginDataSubscriber the plugin data subscriber 
     * @param metaDataSubscribers  the meta data subscribers 
     * @param authDataSubscribers  the auth data subscribers 
     */ 
    public WebsocketSyncDataService(final WebsocketConfig websocketConfig, 
                                    final PluginDataSubscriber pluginDataSubscriber, 
                                    final List<MetaDataSubscriber> metaDataSubscribers, 
                                    final List<AuthDataSubscriber> authDataSubscribers) {
    
        String[] urls = StringUtils.split(websocketConfig.getUrls(), ","); 
        executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true)); 
        for (String url : urls) {
    
            try {
    
                clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers)); 
            } catch (URISyntaxException e) {
    
                log.error("websocket url({}) is error", url, e); 
            } 
        } 
        try {
    
            for (WebSocketClient client : clients) {
    
                boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS); 
                if (success) {
    
                    log.info("websocket connection is successful....."); 
                } else {
    
                    log.error("websocket connection is error....."); 
                } 
                executor.scheduleAtFixedRate(() -> {
    
                    try {
    
                        if (client.isClosed()) {
    
                            boolean reconnectSuccess = client.reconnectBlocking(); 
                            if (reconnectSuccess) {
    
                                log.info("websocket reconnect is successful....."); 
                            } else {
    
                                log.error("websocket reconnection is error....."); 
                            } 
                        } 
                    } catch (InterruptedException e) {
    
                        log.error("websocket connect is error :{}", e.getMessage()); 
                    } 
                }, 10, 30, TimeUnit.SECONDS); 
            } 
            /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/ 
        } catch (InterruptedException e) {
    
            log.info("websocket connection...exception....", e); 
        } 
    } 
     
    @Override 
    public void close() {
    
        for (WebSocketClient client : clients) {
    
            if (!client.isClosed()) {
    
                client.close(); 
            } 
        } 
        if (Objects.nonNull(executor)) {
    
            executor.shutdown(); 
        } 
    } 
} 

WebsocketSyncDataService还实现了AutoCloseable接口,不过我们就看代码主题,在这个类中第一就是定义一个WebsocketSyncDataService的有参构造函数,然后就是实现了AutoCloseable类的close方法。
当然说到说到这里就又要说WebsocketSyncDataService这个类是怎么来的,那自然是基于Spring Boot自动装配的原理了。这个装配的位置是在WebsocketSyncDataConfiguration类中,代码如下:

@Configuration 
@ConditionalOnClass(WebsocketSyncDataService.class) 
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls") 
@Slf4j 
public class WebsocketSyncDataConfiguration {
    
    /** 
     * Websocket sync data service. 
     * 
     * @param websocketConfig   the websocket config 
     * @param pluginSubscriber the plugin subscriber 
     * @param metaSubscribers   the meta subscribers 
     * @param authSubscribers   the auth subscribers 
     * @return the sync data service 
     */ 
    @Bean 
    public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber, 
                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
    
        log.info("you use websocket sync soul data......."); 
        return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(), 
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList)); 
    } 
    // 省略部分代码 
} 

在上面的这段代码中,[email protected],注解里的参数(prefix = “soul.sync.websocket”)啥的大家是不是感觉很熟悉?这个“soul.sync.websocket”是不是在配置文件中的。至于WebsocketSyncDataConfiguration的构建,就是前面说的Spring Boot自动构建的原理,却在前面的文章中也有提及,那么这里就不再多提了。

启动同步

1、On Message

说到启动时同步,那就不得不提到WebsocketCollector这个类,在启动soul-bootstrap时便会调用其中的WebsocketCollector.onMessage(…)方法,但是这个类的创建时机时在soul-admin项目启动的时候进行构建的,[email protected](“/websocket”)[email protected],关于这两个注解乃是WebSocket的,这里只是提一下,@Serverendpoint作为WebSocket服务器的端点,而 @OnMessage注解的Java方法用于接收传入的WebSocket信息。这个信息可以是文本,二进制信息。那意思就很明显了,直接看onMessage方法的代码了,如下:

@OnMessage 
public void onMessage(final String message, final Session session) {
    
    if (message.equals(DataEventTypeEnum.MYSELF.name())) {
    
        try {
    
            ThreadLocalUtil.put(SESSION_KEY, session); 
            SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF); 
        } finally {
    
            ThreadLocalUtil.clear(); 
        } 
    } 
} 

这个方法的首先判断DataEventType,若匹配便进入代码块,代码块中先把session存入缓存中,然后就是根据SyncDataService.class来进行依赖查找,来获得SyncDataService类对象,紧接着便是调用syncAll方法。

2、syncAll方法

这个方法里涉及到插件、选择器、规则这写数据,并且设计到Spring的事件发布,代码如下:

SyncDataServiceImpl.class

@Override 
public boolean syncAll(final DataEventTypeEnum type) {
    
    appAuthService.syncData(); 
    List<PluginData> pluginDataList = pluginService.listAll(); 
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList)); 
    List<SelectorData> selectorDataList = selectorService.listAll(); 
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList)); 
    List<RuleData> ruleDataList = ruleService.listAll(); 
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList)); 
    metaDataService.syncData(); 
    return true; 
} 

上面的代码很直白,开始调用了appAuthService.syncData()方法,然后便是获取pluginData,紧接着通过Spring的事件发布器,来进行时间的发布。后面依次是selector、rule。最后再次调用metaDataService.syncData()方法。
appAuthService.syncData()方法代码如下:、

AppAuthServiceImpl.class

@Override 
public SoulAdminResult syncData() {
    
    List<AppAuthDO> appAuthDOList = appAuthMapper.selectAll(); 
    if (CollectionUtils.isNotEmpty(appAuthDOList)) {
    
        List<AppAuthData> dataList = appAuthDOList.stream().map(this::buildByEntity).collect(Collectors.toList()); 
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.APP_AUTH, 
                DataEventTypeEnum.REFRESH, 
                dataList)); 
    } 
    return SoulAdminResult.success(); 
} 

这段代码是同步权限的数据,这个暂不多说,我们还是说说metaDataService.syncData()方法的代码:
MetaDataServiceImpl.class

@Override 
public void syncData() {
    
    List<MetaDataDO> all = metaDataMapper.findAll(); 
    if (CollectionUtils.isNotEmpty(all)) {
    
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, DataEventTypeEnum.REFRESH, MetaDataTransfer.INSTANCE.mapToDataAll(all))); 
    } 
} 

3、DataChangedEventDispatcher 接收事件

@Override 
@SuppressWarnings("unchecked") 
public void onApplicationEvent(final DataChangedEvent event) {
    
    for (DataChangedListener listener : listeners) {
    
        switch (event.getGroupKey()) {
    
            case APP_AUTH: 
                listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType()); 
                break; 
            case PLUGIN: 
                listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType()); 
                break; 
            case RULE: 
                listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType()); 
                break; 
            case SELECTOR: 
                listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); 
                break; 
            case META_DATA: 
                listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType()); 
                break; 
            default: 
                throw new IllegalStateException("Unexpected value: " + event.getGroupKey()); 
        } 
    } 
} 

在前面Spring事件发布器发不完事件后,这里便开始接受事件了,就是说开始处理事件了。代码中是根据传入的事件,然后event.getGroupKey(),然后匹配其事件的key。

@Override 
public void onMetaDataChanged(final List<MetaData> metaDataList, final DataEventTypeEnum eventType) {
    
    WebsocketData<MetaData> configData = 
            new WebsocketData<>(ConfigGroupEnum.META_DATA.name(), eventType.name(), metaDataList); 
    WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType); 
} 

手动同步

手动同步就是调用metaDataService.syncData()方法,后面细节基本一致,那就不多说了。

总结

本篇文章从配置文件、同步的接口、然后是WebSocket(涉及到注解)、然后就是同步的一个基本流程,这里面还涉及到事件的发布与事件的处理。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/aiops/1291.html

(0)
上一篇 2021年7月15日 22:48
下一篇 2021年7月15日 22:48

相关推荐

发表回复

登录后才能评论