Soul API Gateway Data Synchronization: HttpSyncData in Detail

Preface

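The previous article covered WebSocket data synchronization: the configuration file, the interface definition, the interface implementation, and finally how events are published and handled. That is the overall execution flow.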

This article walks through HttpSyncData in the same way. Without further ado, let's begin.

HTTP sync configuration

Both soul-admin and soul-bootstrap are involved again. HTTP sync must first be enabled in each of them, as follows:

soul-admin configuration:

soul:
  sync:
    http:
      enabled: true

soul-bootstrap configuration:

soul:
  file:
    enabled: true
  corss:
    enabled: true
  dubbo:
    parameter: multi
  sync:
    http:
      url: http://localhost:9095
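
The HttpSyncDataService constructor shown later in this article splits the url value on commas, so the setting can apparently carry more than one soul-admin address. The sketch below only illustrates that splitting with plain JDK calls. The class name ServerListParser and the second address are hypothetical, and the trimming of whitespace is an assumption that the original Guava Splitter call does not make.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: turns a comma-separated soul.sync.http.url value into a server list. */
final class ServerListParser {

    private ServerListParser() { }

    static List<String> parse(final String url) {
        return Arrays.stream(url.split(","))
                .map(String::trim)              // assumption: tolerate spaces around commas
                .filter(s -> !s.isEmpty())      // assumption: ignore empty segments
                .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        // The second address is made up for illustration only.
        System.out.println(parse("http://localhost:9095,http://localhost:9096"));
    }
}
```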

Starting data sync in soul-admin

This point was not covered in the previous article, so it is covered here. When soul-admin starts, it wires up the data sync configuration class:

@Configuration
public class DataSyncConfiguration {

    /**
     * http long polling.
     */
    @Configuration
    @ConditionalOnProperty(name = "soul.sync.http.enabled", havingValue = "true")
    @EnableConfigurationProperties(HttpSyncProperties.class)
    static class HttpLongPollingListener {

        @Bean
        @ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class)
        public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
            return new HttpLongPollingDataChangedListener(httpSyncProperties);
        }
    }
    // n lines of code omitted here
}

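The code above references two classes: HttpSyncProperties and HttpLongPollingDataChangedListener. The first holds the HTTP sync configuration properties; the second is the HTTP long-polling listener.

HttpSyncProperties.class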

@Getter
@Setter
@ConfigurationProperties(prefix = "soul.sync.http")
public class HttpSyncProperties {

    /**
     * Whether enabled http sync strategy, default: true.
     */
    private boolean enabled = true;

    /**
     * Periodically refresh the config data interval from the database, default: 5 minutes.
     */
    private Duration refreshInterval = Duration.ofMinutes(5);
}
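
The refreshInterval property is consumed by afterInitialize in the listener shown next, where it becomes both the initial delay and the fixed delay of a scheduled task. As a minimal sketch of that pattern (the class name RefreshIntervalSketch, the counter and the latch are illustrative assumptions, and a counter increment stands in for the real cache refresh):

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo of feeding a Duration into scheduleWithFixedDelay. */
final class RefreshIntervalSketch {

    private RefreshIntervalSketch() { }

    /** Runs a stand-in refresh task until it has executed expectedRuns times, or 5 seconds pass. */
    static int runRefreshes(final Duration refreshInterval, final int expectedRuns) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(expectedRuns);
        long intervalMillis = refreshInterval.toMillis();
        // Same shape as the listener: the first run is delayed by one full interval.
        scheduler.scheduleWithFixedDelay(() -> {
            runs.incrementAndGet();   // stands in for refreshLocalCache()
            done.countDown();
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(final String[] args) {
        // A short interval is used here so the demo finishes quickly.
        System.out.println("refresh runs: " + runRefreshes(Duration.ofMillis(100), 3));
    }
}
```

With scheduleWithFixedDelay the delay is measured from the end of one run to the start of the next, so a slow refresh never overlaps with the following one.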

HttpLongPollingDataChangedListener.class

@Slf4j
@SuppressWarnings("all")
public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {

    private static final String X_REAL_IP = "X-Real-IP";
    private static final String X_FORWARDED_FOR = "X-Forwarded-For";
    private static final String X_FORWARDED_FOR_SPLIT_SYMBOL = ",";
    private static final ReentrantLock LOCK = new ReentrantLock();

    /**
     * Blocked client.
     */
    private final BlockingQueue<LongPollingClient> clients;
    private final ScheduledExecutorService scheduler;
    private final HttpSyncProperties httpSyncProperties;

    /**
     * Instantiates a new Http long polling data changed listener.
     * @param httpSyncProperties the HttpSyncProperties
     */
    public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
        this.clients = new ArrayBlockingQueue<>(1024);
        this.scheduler = new ScheduledThreadPoolExecutor(1,
                SoulThreadFactory.create("long-polling", true));
        this.httpSyncProperties = httpSyncProperties;
    }

    @Override
    protected void afterInitialize() {
        long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
        // Periodically check the data for changes and update the cache
        scheduler.scheduleWithFixedDelay(() -> {
            log.info("http sync strategy refresh config start.");
            try {
                this.refreshLocalCache();
                log.info("http sync strategy refresh config success.");
            } catch (Exception e) {
                log.error("http sync strategy refresh config error!", e);
            }
        }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
        log.info("http sync strategy refresh interval: {}ms", syncInterval);
    }

    private void refreshLocalCache() {
        this.updateAppAuthCache();
        this.updatePluginCache();
        this.updateRuleCache();
        this.updateSelectorCache();
        this.updateMetaDataCache();
    }

    public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
        // compare group md5
        List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
        String clientIp = getRemoteIp(request);
        // response immediately.
        if (CollectionUtils.isNotEmpty(changedGroup)) {
            this.generateResponse(response, changedGroup);
            log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
            return;
        }
        // listen for configuration changed.
        final AsyncContext asyncContext = request.startAsync();
        // AsyncContext.settimeout() does not timeout properly, so you have to control it yourself
        asyncContext.setTimeout(0L);
        // block client's thread.
        scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
    }
    // N lines of code omitted here
}

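Three parts of this code matter most: the constructor of HttpLongPollingDataChangedListener, the afterInitialize method, and the doLongPolling method. The constructor creates a bounded queue for up to 1024 blocked clients and a single-threaded scheduler. afterInitialize schedules a periodic task that refreshes the local cache; both the initial delay and the delay between runs equal refreshInterval, which defaults to 5 minutes and is configurable. doLongPolling compares the MD5 values of the config groups in the client's request with the server's copy: if any group has changed, it responds immediately with the changed groups; otherwise it holds the request asynchronously for up to HttpConstants.SERVER_MAX_HOLD_TIMEOUT.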
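
The respond-or-hold decision in doLongPolling can be sketched without a servlet container. The following is a simplified illustration, not the Soul implementation: the class LongPollingSketch, its methods, and the use of CompletableFuture in place of AsyncContext and LongPollingClient (whose bodies are omitted from the listing above) are all assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical in-memory model of the server side of long polling. */
final class LongPollingSketch {

    private final Map<String, String> serverMd5 = new ConcurrentHashMap<>();
    private final Queue<CompletableFuture<List<String>>> waiting = new ConcurrentLinkedQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "long-polling-sketch");
        t.setDaemon(true);
        return t;
    });
    private final long holdMillis;

    LongPollingSketch(final long holdMillis) {
        this.holdMillis = holdMillis;
    }

    /** Answers at once when a group's MD5 differs, otherwise holds the caller up to holdMillis. */
    synchronized CompletableFuture<List<String>> listen(final Map<String, String> clientMd5) {
        List<String> changed = new ArrayList<>();
        serverMd5.forEach((group, md5) -> {
            if (!md5.equals(clientMd5.get(group))) {
                changed.add(group);
            }
        });
        if (!changed.isEmpty()) {
            return CompletableFuture.completedFuture(changed);
        }
        CompletableFuture<List<String>> pending = new CompletableFuture<>();
        waiting.add(pending);
        // The timeout is enforced by our own scheduler, an empty list means "nothing changed".
        scheduler.schedule(() -> {
            waiting.remove(pending);
            pending.complete(Collections.emptyList());
        }, holdMillis, TimeUnit.MILLISECONDS);
        return pending;
    }

    /** Records a new MD5 for a group and releases every held caller with that group name. */
    synchronized void publishChange(final String group, final String newMd5) {
        serverMd5.put(group, newMd5);
        CompletableFuture<List<String>> pending;
        while ((pending = waiting.poll()) != null) {
            pending.complete(Collections.singletonList(group));
        }
    }

    public static void main(final String[] args) {
        LongPollingSketch server = new LongPollingSketch(200);
        server.publishChange("PLUGIN", "md5-v1");
        Map<String, String> client = new HashMap<>();
        System.out.println("first poll: " + server.listen(client).join());
        client.put("PLUGIN", "md5-v1");
        System.out.println("second poll (held, then timed out): " + server.listen(client).join());
    }
}
```

The point of the design is that an up-to-date client costs the server one parked request rather than a stream of repeated polls, while a change still reaches the client almost immediately.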
So far we have skipped AbstractDataChangedListener, the class that HttpLongPollingDataChangedListener extends. A brief look at its code:

@Slf4j 
@SuppressWarnings("all") 
public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
    
    /** 
     * The constant CACHE. 
     */ 
    protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>(); 
    @Resource 
    private AppAuthService appAuthService; 
    /** 
     * The Plugin service. 
     */ 
    @Resource 
    private PluginService pluginService; 
    /** 
     * The Rule service. 
     */ 
    @Resource 
    private RuleService ruleService; 
    /** 
     * The Selector service. 
     */ 
    @Resource 
    private SelectorService selectorService; 
    @Resource 
    private MetaDataService metaDataService; 
    /** 
     * fetch configuration from cache. 
     * 
     * @param groupKey the group key 
     * @return the configuration data 
     */ 
    public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {
    
        ConfigDataCache config = CACHE.get(groupKey.name()); 
        switch (groupKey) {
    
            case APP_AUTH: 
                List<AppAuthData> appAuthList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<AppAuthData>>() {
    
                }.getType()); 
                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), appAuthList); 
            case PLUGIN: 
                List<PluginData> pluginList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<PluginData>>() {
    
                }.getType()); 
                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), pluginList); 
            case RULE: 
                List<RuleData> ruleList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<RuleData>>() {
    
                }.getType()); 
                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), ruleList); 
            case SELECTOR: 
                List<SelectorData> selectorList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<SelectorData>>() {
    
                }.getType()); 
                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), selectorList); 
            case META_DATA: 
                List<MetaData> metaList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<MetaData>>() {
    
                }.getType()); 
                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), metaList); 
            default: 
                throw new IllegalStateException("Unexpected groupKey: " + groupKey); 
        } 
    } 
    @Override 
    public void onAppAuthChanged(final List<AppAuthData> changed, final DataEventTypeEnum eventType) {
    
        if (CollectionUtils.isEmpty(changed)) {
    
            return; 
        } 
        this.updateAppAuthCache(); 
        this.afterAppAuthChanged(changed, eventType); 
    } 
    @Override 
    public void onMetaDataChanged(final List<MetaData> changed, final DataEventTypeEnum eventType) {
    
        if (CollectionUtils.isEmpty(changed)) {
    
            return; 
        } 
        this.updateMetaDataCache(); 
        this.afterMetaDataChanged(changed, eventType); 
    } 
    /** 
     * After meta data changed. 
     * 
     * @param changed   the changed 
     * @param eventType the event type 
     */ 
    protected void afterMetaDataChanged(final List<MetaData> changed, final DataEventTypeEnum eventType) {
    
    } 
    /** 
     * After app auth changed. 
     * 
     * @param changed   the changed 
     * @param eventType the event type 
     */ 
    protected void afterAppAuthChanged(final List<AppAuthData> changed, final DataEventTypeEnum eventType) {
    
    } 
    @Override 
    public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
    
        if (CollectionUtils.isEmpty(changed)) {
    
            return; 
        } 
        this.updatePluginCache(); 
        this.afterPluginChanged(changed, eventType); 
    } 
    /** 
     * After plugin changed. 
     * 
     * @param changed   the changed 
     * @param eventType the event type 
     */ 
    protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
    
    } 
    @Override 
    public void onRuleChanged(final List<RuleData> changed, final DataEventTypeEnum eventType) {
    
        if (CollectionUtils.isEmpty(changed)) {
    
            return; 
        } 
        this.updateRuleCache(); 
        this.afterRuleChanged(changed, eventType); 
    } 
    /** 
     * After rule changed. 
     * 
     * @param changed   the changed 
     * @param eventType the event type 
     */ 
    protected void afterRuleChanged(final List<RuleData> changed, final DataEventTypeEnum eventType) {
    
    } 
    @Override 
    public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
    
        if (CollectionUtils.isEmpty(changed)) {
    
            return; 
        } 
        this.updateSelectorCache(); 
        this.afterSelectorChanged(changed, eventType); 
    } 
    /** 
     * After selector changed. 
     * 
     * @param changed   the changed 
     * @param eventType the event type 
     */ 
    protected void afterSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
    
    } 
    @Override 
    public final void afterPropertiesSet() {
    
        updateAppAuthCache(); 
        updatePluginCache(); 
        updateRuleCache(); 
        updateSelectorCache(); 
        updateMetaDataCache(); 
        afterInitialize(); 
    } 
    protected abstract void afterInitialize(); 
    /** 
     * if md5 is not the same as the original, then update local cache.
     * @param group ConfigGroupEnum 
     * @param <T> the type of class 
     * @param data the new config data 
     */ 
    protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
    
        String json = GsonUtils.getInstance().toJson(data); 
        ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis()); 
        ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal); 
        log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal); 
    } 
    /** 
     * Update selector cache. 
     */ 
    protected void updateSelectorCache() {
    
        this.updateCache(ConfigGroupEnum.SELECTOR, selectorService.listAll()); 
    } 
    /** 
     * Update rule cache. 
     */ 
    protected void updateRuleCache() {
    
        this.updateCache(ConfigGroupEnum.RULE, ruleService.listAll()); 
    } 
    /** 
     * Update plugin cache. 
     */ 
    protected void updatePluginCache() {
    
        this.updateCache(ConfigGroupEnum.PLUGIN, pluginService.listAll()); 
    } 
    /** 
     * Update app auth cache. 
     */ 
    protected void updateAppAuthCache() {
    
        this.updateCache(ConfigGroupEnum.APP_AUTH, appAuthService.listAll()); 
    } 
    /** 
     * Update meta data cache. 
     */ 
    protected void updateMetaDataCache() {
    
        this.updateCache(ConfigGroupEnum.META_DATA, metaDataService.listAll()); 
    } 
} 
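
The updateCache method above stores each group's JSON together with its MD5 and a timestamp, and that MD5 is what long-polling clients are later compared against. A minimal sketch of the idea using only the JDK follows. The class Md5CacheSketch and its Entry type are hypothetical stand-ins for ConfigDataCache and Md5Utils, and returning whether the MD5 changed is an addition of this sketch, not something updateCache does.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical MD5-keyed config cache, loosely modelled on updateCache. */
final class Md5CacheSketch {

    /** Simplified stand-in for ConfigDataCache. */
    static final class Entry {
        final String json;
        final String md5;
        final long lastModifyTime;

        Entry(final String json, final String md5, final long lastModifyTime) {
            this.json = json;
            this.md5 = md5;
            this.lastModifyTime = lastModifyTime;
        }
    }

    private final ConcurrentMap<String, Entry> cache = new ConcurrentHashMap<>();

    /** Returns the lowercase hex MD5 of the UTF-8 bytes of text. */
    static String md5(final String text) {
        try {
            MessageDigest digest = MessageDigest.getInstance("MD5");
            byte[] hash = digest.digest(text.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, hash));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 digest is not available", e);
        }
    }

    /** Replaces the group's entry and reports whether its MD5 differs from the previous one. */
    boolean update(final String group, final String json) {
        Entry newVal = new Entry(json, md5(json), System.currentTimeMillis());
        Entry oldVal = cache.put(group, newVal);
        return oldVal == null || !oldVal.md5.equals(newVal.md5);
    }

    public static void main(final String[] args) {
        Md5CacheSketch sketch = new Md5CacheSketch();
        // "divide" is only sample data.
        System.out.println("first update changed: " + sketch.update("PLUGIN", "[{\"name\":\"divide\"}]"));
        System.out.println("same json changed: " + sketch.update("PLUGIN", "[{\"name\":\"divide\"}]"));
    }
}
```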

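This abstract class contains a lot of code, but the key thing is the interface it implements, DataChangedListener, whose code is not shown here. afterPropertiesSet loads all five caches once at startup and then calls afterInitialize. Each on...Changed method refreshes the corresponding cache entry and then calls an empty after...Changed hook that subclasses can override. Note also that onPluginChanged is called from the DataChangedEventDispatcher class. That is all for the soul-admin startup code.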
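
The refresh-then-hook structure of the on...Changed methods is a template method. A compact sketch of that shape is given here. Every name in it (ListenerHookSketch, NotifyingListener, Dispatcher) is hypothetical, plugin data is reduced to plain strings, and what a real subclass does inside its hook is an assumption, since that code is omitted from this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical illustration of the refresh-then-hook pattern. */
final class ListenerHookSketch {

    private ListenerHookSketch() { }

    abstract static class AbstractListener {
        final List<String> calls = new ArrayList<>();

        /** Fixed skeleton: ignore empty changes, refresh the cache, then call the hook. */
        final void onPluginChanged(final List<String> changed) {
            if (changed == null || changed.isEmpty()) {
                return;
            }
            calls.add("updatePluginCache");
            afterPluginChanged(changed);
        }

        /** Empty by default, subclasses override it to react to the change. */
        protected void afterPluginChanged(final List<String> changed) {
        }
    }

    static final class NotifyingListener extends AbstractListener {
        @Override
        protected void afterPluginChanged(final List<String> changed) {
            // Assumption: a real subclass would notify its waiting clients here.
            calls.add("afterPluginChanged " + changed);
        }
    }

    /** Stand-in for the dispatcher that forwards change events to every listener. */
    static final class Dispatcher {
        private final List<AbstractListener> listeners;

        Dispatcher(final List<AbstractListener> listeners) {
            this.listeners = listeners;
        }

        void pluginChanged(final List<String> changed) {
            listeners.forEach(listener -> listener.onPluginChanged(changed));
        }
    }

    public static void main(final String[] args) {
        NotifyingListener listener = new NotifyingListener();
        Dispatcher dispatcher = new Dispatcher(Collections.<AbstractListener>singletonList(listener));
        dispatcher.pluginChanged(Arrays.asList("divide"));   // sample plugin name
        System.out.println(listener.calls);
    }
}
```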

Sync handling when soul-bootstrap starts

The SyncDataService interface was covered in an earlier article, so it is not repeated here. The code of HttpSyncDataService is as follows:

@Slf4j 
public class HttpSyncDataService implements SyncDataService, AutoCloseable {
    
    private static final AtomicBoolean RUNNING = new AtomicBoolean(false); 
    private static final Gson GSON = new Gson(); 
    /** 
     * default: 10s. 
     */ 
    private Duration connectionTimeout = Duration.ofSeconds(10); 
    /** 
     * only use for http long polling. 
     */ 
    private RestTemplate httpClient; 
    private ExecutorService executor; 
    private HttpConfig httpConfig; 
    private List<String> serverList; 
    private DataRefreshFactory factory; 
    public HttpSyncDataService(final HttpConfig httpConfig, final PluginDataSubscriber pluginDataSubscriber, 
                               final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
    
        this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers); 
        this.httpConfig = httpConfig; 
        this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl())); 
        this.httpClient = createRestTemplate(); 
        this.start(); 
    } 
    // Create the RestTemplate
    private RestTemplate createRestTemplate() {
    
        OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory(); 
        factory.setConnectTimeout((int) this.connectionTimeout.toMillis()); 
        factory.setReadTimeout((int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT); 
        return new RestTemplate(factory); 
    } 
    // Fetch all group configs, then start a thread pool that listens to each server for changes
    private void start() {
    
        // It could be initialized multiple times, so you need to control that. 
        if (RUNNING.compareAndSet(false, true)) {
    
            // fetch all group configs. 
            this.fetchGroupConfig(ConfigGroupEnum.values()); 
            int threadSize = serverList.size(); 
            this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS, 
                    new LinkedBlockingQueue<>(), 
                    SoulThreadFactory.create("http-long-polling", true)); 
            // start long polling, each server creates a thread to listen for changes. 
            // Run the inner class HttpLongPollingTask, one task per server
            this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server))); 
        } else {
    
            log.info("soul http long polling was started, executor=[{}]", executor); 
        } 
    } 
    private void fetchGroupConfig(final ConfigGroupEnum... groups) throws SoulException {
    
        for (int index = 0; index < this.serverList.size(); index++) {
    
            String server = serverList.get(index); 
            try {
    
                this.doFetchGroupConfig(server, groups); 
                break; 
            } catch (SoulException e) {
    
                // no available server, throw exception. 
                if (index >= serverList.size() - 1) {
    
                    throw e; 
                } 
                log.warn("fetch config fail, try another one: {}", serverList.get(index + 1)); 
            } 
        } 
    } 
    private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
    
        StringBuilder params = new StringBuilder(); 
        for (ConfigGroupEnum groupKey : groups) {
    
            params.append("groupKeys").append("=").append(groupKey.name()).append("&"); 
        } 
        String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&"); 
        log.info("request configs: [{}]", url); 
        String json = null; 
        try {
    
            // Make the remote call
            json = this.httpClient.getForObject(url, String.class); 
        } catch (RestClientException e) {
    
            String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage()); 
            log.warn(message); 
            throw new SoulException(message, e); 
        } 
        // update local cache
        boolean updated = this.updateCacheWithJson(json); 
        if (updated) {
    
            log.info("get latest configs: [{}]", json); 
            return; 
        } 
        // not updated. it is likely that the current config server has not been updated yet. wait a moment. 
        log.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server); 
        ThreadUtils.sleep(TimeUnit.SECONDS, 30); 
    } 
    /** 
     * update local cache. 
     * @param json the response from config server. 
     * @return true: the local cache was updated. false: not updated. 
     */ 
    private boolean updateCacheWithJson(final String json) {
    
        JsonObject jsonObject = GSON.fromJson(json, JsonObject.class); 
        JsonObject data = jsonObject.getAsJsonObject("data"); 
        // if the config cache will be updated? 
        return factory.executor(data); 
    } 
    @SuppressWarnings("unchecked") 
    private void doLongPolling(final String server) {
    
        MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8); 
        for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
    
            ConfigData<?> cacheConfig = factory.cacheConfigData(group); 
            String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime())); 
            params.put(group.name(), Lists.newArrayList(value)); 
        } 
        HttpHeaders headers = new HttpHeaders(); 
        headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED); 
        HttpEntity httpEntity = new HttpEntity(params, headers); 
        String listenerUrl = server + "/configs/listener"; 
        log.debug("request listener configs: [{}]", listenerUrl); 
        JsonArray groupJson = null; 
        try {
    
            String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody(); 
            log.debug("listener result: [{}]", json); 
            groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data"); 
        } catch (RestClientException e) {
    
            String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage()); 
            throw new SoulException(message, e); 
        } 
        if (groupJson != null) {
    
            // fetch group configuration async. 
            ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class); 
            if (ArrayUtils.isNotEmpty(changedGroups)) {
    
                log.info("Group config changed: {}", Arrays.toString(changedGroups)); 
                this.doFetchGroupConfig(server, changedGroups); 
            } 
        } 
    } 
    @Override 
    public void close() throws Exception {
    
        RUNNING.set(false); 
        if (executor != null) {
    
            executor.shutdownNow(); 
            // help gc 
            executor = null; 
        } 
    } 
    // Inner class
    class HttpLongPollingTask implements Runnable {
    
        private String server; 
        private final int retryTimes = 3; 
        HttpLongPollingTask(final String server) {
    
            this.server = server; 
        } 
        @Override 
        public void run() {
    
            while (RUNNING.get()) {
    
                for (int time = 1; time <= retryTimes; time++) {
    
                    try {
    
                        doLongPolling(server); 
                    } catch (Exception e) {
    
                        // print warning log.
                        if (time < retryTimes) {
    
                            log.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}", 
                                    time, retryTimes - time, e.getMessage()); 
                            ThreadUtils.sleep(TimeUnit.SECONDS, 5); 
                            continue; 
                        } 
                        // print error, then suspended for a while. 
                        log.error("Long polling failed, try again after 5 minutes!", e); 
                        ThreadUtils.sleep(TimeUnit.MINUTES, 5); 
                    } 
                } 
            } 
            log.warn("Stop http long polling."); 
        } 
    } 
} 
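
The retry logic in HttpLongPollingTask pauses briefly after the first failures and much longer after the last one. The sketch below isolates that idea. RetrySketch, its parameters, and the injectable sleeper are hypothetical, and unlike the loop shown above, this version returns as soon as one attempt succeeds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

/** Hypothetical retry helper with a short pause between attempts and a long pause at the end. */
final class RetrySketch {

    private RetrySketch() { }

    /**
     * Runs the task up to retryTimes times.
     * Returns the 1-based attempt that succeeded, or -1 when every attempt failed.
     */
    static int runWithRetry(final Runnable task, final int retryTimes,
                            final long shortPauseMillis, final long longPauseMillis,
                            final LongConsumer sleeper) {
        for (int time = 1; time <= retryTimes; time++) {
            try {
                task.run();
                return time;
            } catch (RuntimeException e) {
                // Short pause while retries remain, long pause after the final failure.
                sleeper.accept(time < retryTimes ? shortPauseMillis : longPauseMillis);
            }
        }
        return -1;
    }

    public static void main(final String[] args) {
        List<Long> pauses = new ArrayList<>();
        int[] calls = {0};
        int attempt = runWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("server unavailable");
            }
        }, 3, 5_000L, 300_000L, pauses::add);   // pauses are recorded instead of slept
        System.out.println("succeeded on attempt " + attempt + ", pauses=" + pauses);
    }
}
```

Passing the sleeper in as a parameter keeps the sketch testable without actually waiting five seconds or five minutes.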

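The code above is fairly involved, but it is enough to focus on the start and doLongPolling methods. start fetches the config of every group once, trying each configured server in turn until one succeeds, and then creates one long-polling thread per server. doLongPolling posts the cached MD5 and last-modify time of each group to /configs/listener and, when the response lists changed groups, fetches those groups again from /configs/fetch.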
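
The server failover inside fetchGroupConfig can be reduced to a few lines. In this sketch, FailoverFetchSketch and the fetcher function are hypothetical, the HTTP call is replaced by a plain function so the example runs without a network, and the server addresses are made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical failover helper: try each server in order and stop at the first success. */
final class FailoverFetchSketch {

    private FailoverFetchSketch() { }

    static String fetchFromFirstAvailable(final List<String> servers,
                                          final Function<String, String> fetcher) {
        RuntimeException lastFailure = null;
        for (String server : servers) {
            try {
                return fetcher.apply(server);
            } catch (RuntimeException e) {
                // Remember the failure and move on to the next server.
                lastFailure = e;
            }
        }
        throw new IllegalStateException("no available server", lastFailure);
    }

    public static void main(final String[] args) {
        List<String> servers = Arrays.asList("http://localhost:9095", "http://localhost:9096");
        String json = fetchFromFirstAvailable(servers, server -> {
            if (server.endsWith("9095")) {
                throw new IllegalStateException("connection refused: " + server);
            }
            return "{\"data\":{}} from " + server;
        });
        System.out.println(json);
    }
}
```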

Summary

This article started from the configuration files, then covered how soul-admin starts data sync and how soul-bootstrap synchronizes data at startup.

Original article by Maggie-Hunter. If you repost it, please credit the source: https://blog.ytso.com/tech/aiops/1293.html
