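Preface
The previous article covered WebSocket data synchronization: the configuration file, the interface definition, the interface implementation, and finally how events are published and handled. That is the overall execution flow.
This article walks through HttpSyncData, so without further ado, let's get started.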
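HTTP sync configuration
This again involves both soul-admin and soul-bootstrap. HTTP sync has to be enabled in each of them, with the following configuration:
soul-admin configuration:
soul:
  sync:
    http:
      enabled: true
soul-bootstrap configuration:
soul:
  file:
    enabled: true
  corss:
    enabled: true
  dubbo:
    parameter: multi
  sync:
    http:
      url: http://localhost:9095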
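The url value can carry more than one soul-admin address, because the HttpSyncDataService constructor shown later in this article splits it on ",". As a rough illustration only, here is a minimal sketch of that split. AdminUrlParser is a hypothetical helper, not a Soul class, and the trimming of whitespace is an extra assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Soul: turns the configured url value
// into a list of soul-admin addresses.
final class AdminUrlParser {

    private AdminUrlParser() {
    }

    // Splits on "," and drops empty entries. Trimming surrounding whitespace
    // is an assumption made here for convenience.
    static List<String> parse(final String url) {
        List<String> servers = new ArrayList<>();
        for (String part : url.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                servers.add(trimmed);
            }
        }
        return servers;
    }
}
```

With a value such as "http://localhost:9095,http://localhost:9096" this would yield two entries, and the bootstrap side would then start one long-polling thread per entry.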
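Starting data sync in soul-admin
The previous article did not mention this point, so this one does. When soul-admin starts, it wires up the data sync configuration class, shown below: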
@Configuration
public class DataSyncConfiguration {
    /**
     * http long polling.
     */
    @Configuration
    @ConditionalOnProperty(name = "soul.sync.http.enabled", havingValue = "true")
    @EnableConfigurationProperties(HttpSyncProperties.class)
    static class HttpLongPollingListener {
        @Bean
        @ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class)
        public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
            return new HttpLongPollingDataChangedListener(httpSyncProperties);
        }
    }
    // n lines of code omitted here
}
Two classes stand out in the code above: HttpSyncProperties and HttpLongPollingDataChangedListener. The first is the configuration class for HTTP sync, and the second is the HTTP listener class.
HttpSyncProperties.class
@Getter
@Setter
@ConfigurationProperties(prefix = "soul.sync.http")
public class HttpSyncProperties {
    /**
     * Whether enabled http sync strategy, default: true.
     */
    private boolean enabled = true;
    /**
     * Periodically refresh the config data interval from the database, default: 5 minutes.
     */
    private Duration refreshInterval = Duration.ofMinutes(5);
}
HttpLongPollingDataChangedListener.class
@Slf4j
@SuppressWarnings("all")
public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {
    private static final String X_REAL_IP = "X-Real-IP";
    private static final String X_FORWARDED_FOR = "X-Forwarded-For";
    private static final String X_FORWARDED_FOR_SPLIT_SYMBOL = ",";
    private static final ReentrantLock LOCK = new ReentrantLock();
    /**
     * Blocked client.
     */
    private final BlockingQueue<LongPollingClient> clients;
    private final ScheduledExecutorService scheduler;
    private final HttpSyncProperties httpSyncProperties;
    /**
     * Instantiates a new Http long polling data changed listener.
     * @param httpSyncProperties the HttpSyncProperties
     */
    public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
        this.clients = new ArrayBlockingQueue<>(1024);
        this.scheduler = new ScheduledThreadPoolExecutor(1,
                SoulThreadFactory.create("long-polling", true));
        this.httpSyncProperties = httpSyncProperties;
    }
    @Override
    protected void afterInitialize() {
        long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
        // Periodically check the data for changes and update the cache
        scheduler.scheduleWithFixedDelay(() -> {
            log.info("http sync strategy refresh config start.");
            try {
                this.refreshLocalCache();
                log.info("http sync strategy refresh config success.");
            } catch (Exception e) {
                log.error("http sync strategy refresh config error!", e);
            }
        }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
        log.info("http sync strategy refresh interval: {}ms", syncInterval);
    }
    private void refreshLocalCache() {
        this.updateAppAuthCache();
        this.updatePluginCache();
        this.updateRuleCache();
        this.updateSelectorCache();
        this.updateMetaDataCache();
    }
    public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
        // compare group md5
        List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
        String clientIp = getRemoteIp(request);
        // response immediately.
        if (CollectionUtils.isNotEmpty(changedGroup)) {
            this.generateResponse(response, changedGroup);
            log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
            return;
        }
        // listen for configuration changed.
        final AsyncContext asyncContext = request.startAsync();
        // AsyncContext.setTimeout() does not timeout properly, so you have to control it yourself
        asyncContext.setTimeout(0L);
        // block client's thread.
        scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
    }
    // N lines of code omitted here
}
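Only a few points in the code above matter here: the constructor of HttpLongPollingDataChangedListener, the afterInitialize method, and the doLongPolling method. The constructor needs little explanation: it creates a bounded queue for up to 1024 blocked clients and a single-threaded scheduler. afterInitialize schedules a periodic task that refreshes the local cache. The first refresh runs after one interval, which defaults to 5 minutes and can be changed through the refreshInterval property. doLongPolling answers a client's polling request: it compares the MD5 of each group, and if any group has changed it responds immediately with the changed groups. If nothing has changed, it holds the client's request asynchronously for up to HttpConstants.SERVER_MAX_HOLD_TIMEOUT.
So far we have skipped over AbstractDataChangedListener, the class that HttpLongPollingDataChangedListener extends. Here is a quick look at its code: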
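To make the "respond at once or hold the request" idea more concrete, here is a minimal sketch under stated assumptions. It is not Soul's implementation: LongPollingSketch and its methods are hypothetical names, a CompletableFuture stands in for the servlet AsyncContext, and group names are plain strings instead of ConfigGroupEnum.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of server-side long polling, not Soul's real class.
final class LongPollingSketch {
    // group name -> MD5 of the server's current JSON for that group
    private final Map<String, String> serverMd5 = new HashMap<>();
    // requests that are currently being held
    private final List<CompletableFuture<List<String>>> held = new ArrayList<>();
    // daemon timer that releases held requests when the hold time is over
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "hold-timeout");
        t.setDaemon(true);
        return t;
    });

    static String md5(final String text) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // A config group changed: update its MD5 and wake every held request.
    synchronized void publish(final String group, final String json) {
        serverMd5.put(group, md5(json));
        for (CompletableFuture<List<String>> client : held) {
            client.complete(Collections.singletonList(group));
        }
        held.clear();
    }

    // Respond immediately if any group differs, otherwise hold the request.
    synchronized CompletableFuture<List<String>> poll(final Map<String, String> clientMd5, final long holdMillis) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, String> entry : serverMd5.entrySet()) {
            if (!entry.getValue().equals(clientMd5.get(entry.getKey()))) {
                changed.add(entry.getKey());
            }
        }
        if (!changed.isEmpty()) {
            return CompletableFuture.completedFuture(changed);
        }
        CompletableFuture<List<String>> client = new CompletableFuture<>();
        held.add(client);
        timer.schedule(() -> release(client), holdMillis, TimeUnit.MILLISECONDS);
        return client;
    }

    // Hold time is over: answer with an empty list so the client polls again.
    private synchronized void release(final CompletableFuture<List<String>> client) {
        held.remove(client);
        client.complete(Collections.emptyList());
    }
}
```

A client whose MD5 is stale gets its answer right away, while an up-to-date client waits until either publish is called or the hold time runs out. This is the reason long polling feels close to a push model even though it is plain HTTP.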
@Slf4j
@SuppressWarnings("all")
public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
    /**
     * The constant CACHE.
     */
    protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();
    @Resource
    private AppAuthService appAuthService;
    /**
     * The Plugin service.
     */
    @Resource
    private PluginService pluginService;
    /**
     * The Rule service.
     */
    @Resource
    private RuleService ruleService;
    /**
     * The Selector service.
     */
    @Resource
    private SelectorService selectorService;
    @Resource
    private MetaDataService metaDataService;
    /**
     * fetch configuration from cache.
     *
     * @param groupKey the group key
     * @return the configuration data
     */
    public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {
        ConfigDataCache config = CACHE.get(groupKey.name());
        switch (groupKey) {
            case APP_AUTH:
                List<AppAuthData> appAuthList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<AppAuthData>>() {
                }.getType());
                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), appAuthList);
            case PLUGIN:
                List<PluginData> pluginList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<PluginData>>() {
                }.getType());
                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), pluginList);
            case RULE:
                List<RuleData> ruleList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<RuleData>>() {
                }.getType());
                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), ruleList);
            case SELECTOR:
                List<SelectorData> selectorList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<SelectorData>>() {
                }.getType());
                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), selectorList);
            case META_DATA:
                List<MetaData> metaList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<MetaData>>() {
                }.getType());
                return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), metaList);
            default:
                throw new IllegalStateException("Unexpected groupKey: " + groupKey);
        }
    }
    @Override
    public void onAppAuthChanged(final List<AppAuthData> changed, final DataEventTypeEnum eventType) {
        if (CollectionUtils.isEmpty(changed)) {
            return;
        }
        this.updateAppAuthCache();
        this.afterAppAuthChanged(changed, eventType);
    }
    @Override
    public void onMetaDataChanged(final List<MetaData> changed, final DataEventTypeEnum eventType) {
        if (CollectionUtils.isEmpty(changed)) {
            return;
        }
        this.updateMetaDataCache();
        this.afterMetaDataChanged(changed, eventType);
    }
    /**
     * After meta data changed.
     *
     * @param changed   the changed
     * @param eventType the event type
     */
    protected void afterMetaDataChanged(final List<MetaData> changed, final DataEventTypeEnum eventType) {
    }
    /**
     * After app auth changed.
     *
     * @param changed   the changed
     * @param eventType the event type
     */
    protected void afterAppAuthChanged(final List<AppAuthData> changed, final DataEventTypeEnum eventType) {
    }
    @Override
    public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
        if (CollectionUtils.isEmpty(changed)) {
            return;
        }
        this.updatePluginCache();
        this.afterPluginChanged(changed, eventType);
    }
    /**
     * After plugin changed.
     *
     * @param changed   the changed
     * @param eventType the event type
     */
    protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
    }
    @Override
    public void onRuleChanged(final List<RuleData> changed, final DataEventTypeEnum eventType) {
        if (CollectionUtils.isEmpty(changed)) {
            return;
        }
        this.updateRuleCache();
        this.afterRuleChanged(changed, eventType);
    }
    /**
     * After rule changed.
     *
     * @param changed   the changed
     * @param eventType the event type
     */
    protected void afterRuleChanged(final List<RuleData> changed, final DataEventTypeEnum eventType) {
    }
    @Override
    public void onSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
        if (CollectionUtils.isEmpty(changed)) {
            return;
        }
        this.updateSelectorCache();
        this.afterSelectorChanged(changed, eventType);
    }
    /**
     * After selector changed.
     *
     * @param changed   the changed
     * @param eventType the event type
     */
    protected void afterSelectorChanged(final List<SelectorData> changed, final DataEventTypeEnum eventType) {
    }
    @Override
    public final void afterPropertiesSet() {
        updateAppAuthCache();
        updatePluginCache();
        updateRuleCache();
        updateSelectorCache();
        updateMetaDataCache();
        afterInitialize();
    }
    protected abstract void afterInitialize();
    /**
     * if md5 is not the same as the original, then update local cache.
     * @param group ConfigGroupEnum
     * @param <T> the type of class
     * @param data the new config data
     */
    protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
        String json = GsonUtils.getInstance().toJson(data);
        ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
        ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
        log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
    }
    /**
     * Update selector cache.
     */
    protected void updateSelectorCache() {
        this.updateCache(ConfigGroupEnum.SELECTOR, selectorService.listAll());
    }
    /**
     * Update rule cache.
     */
    protected void updateRuleCache() {
        this.updateCache(ConfigGroupEnum.RULE, ruleService.listAll());
    }
    /**
     * Update plugin cache.
     */
    protected void updatePluginCache() {
        this.updateCache(ConfigGroupEnum.PLUGIN, pluginService.listAll());
    }
    /**
     * Update app auth cache.
     */
    protected void updateAppAuthCache() {
        this.updateCache(ConfigGroupEnum.APP_AUTH, appAuthService.listAll());
    }
    /**
     * Update meta data cache.
     */
    protected void updateMetaDataCache() {
        this.updateCache(ConfigGroupEnum.META_DATA, metaDataService.listAll());
    }
}
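This abstract class contains a lot of code, but the main thing to note is the interface it implements, DataChangedListener, whose code is not shown here. Every onXxxChanged method follows the same pattern: return early if the changed list is empty, refresh the matching cache entry, then call the corresponding afterXxxChanged hook. afterPropertiesSet loads all five caches at startup and then calls afterInitialize. It is also worth mentioning that the onPluginChanged method of AbstractDataChangedListener is called from the DataChangedEventDispatcher class. That is all for the soul-admin startup code.
Sync handling when soul-bootstrap starts
Earlier articles already covered the SyncDataService interface, so it is not repeated here. Let's go straight to the code of HttpSyncDataService: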
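The onXxxChanged and afterXxxChanged pairs in AbstractDataChangedListener look like an instance of the template method pattern. Here is a stripped-down sketch of that shape for the plugin case only. ListenerSketch and RecordingListener are hypothetical names, a Supplier stands in for pluginService.listAll(), and plugins are plain strings, so treat it as an illustration rather than Soul's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch: the base class fixes the order of steps,
// subclasses only fill in the "after" hook.
abstract class ListenerSketch {
    // stands in for pluginService.listAll()
    private final Supplier<List<String>> pluginSource;
    // group name -> cached snapshot of that group's data
    protected final Map<String, String> cache = new ConcurrentHashMap<>();

    ListenerSketch(final Supplier<List<String>> pluginSource) {
        this.pluginSource = pluginSource;
    }

    // Template method: guard, refresh the cache, then notify the hook.
    public final void onPluginChanged(final List<String> changed) {
        if (changed == null || changed.isEmpty()) {
            return;
        }
        cache.put("PLUGIN", String.join(",", pluginSource.get()));
        afterPluginChanged(changed);
    }

    // Hook with an empty default, overridden by concrete listeners.
    protected void afterPluginChanged(final List<String> changed) {
    }
}

// A concrete listener that just records what it was told about.
final class RecordingListener extends ListenerSketch {
    final List<String> notified = new ArrayList<>();

    RecordingListener(final Supplier<List<String>> pluginSource) {
        super(pluginSource);
    }

    @Override
    protected void afterPluginChanged(final List<String> changed) {
        notified.addAll(changed);
    }
}
```

The point of the design is that the cache is always refreshed before any listener-specific reaction runs, so a hook such as the HTTP listener's can safely assume the cache already reflects the change.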
@Slf4j
public class HttpSyncDataService implements SyncDataService, AutoCloseable {
    private static final AtomicBoolean RUNNING = new AtomicBoolean(false);
    private static final Gson GSON = new Gson();
    /**
     * default: 10s.
     */
    private Duration connectionTimeout = Duration.ofSeconds(10);
    /**
     * only use for http long polling.
     */
    private RestTemplate httpClient;
    private ExecutorService executor;
    private HttpConfig httpConfig;
    private List<String> serverList;
    private DataRefreshFactory factory;
    public HttpSyncDataService(final HttpConfig httpConfig, final PluginDataSubscriber pluginDataSubscriber,
                               final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
        this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
        this.httpConfig = httpConfig;
        this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
        this.httpClient = createRestTemplate();
        this.start();
    }
    // Create the RestTemplate
    private RestTemplate createRestTemplate() {
        OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();
        factory.setConnectTimeout((int) this.connectionTimeout.toMillis());
        factory.setReadTimeout((int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT);
        return new RestTemplate(factory);
    }
    // Fetch all group data, then start a thread pool that listens to the servers for data changes
    private void start() {
        // It could be initialized multiple times, so you need to control that.
        if (RUNNING.compareAndSet(false, true)) {
            // fetch all group configs.
            this.fetchGroupConfig(ConfigGroupEnum.values());
            int threadSize = serverList.size();
            this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(),
                    SoulThreadFactory.create("http-long-polling", true));
            // start long polling, each server creates a thread to listen for changes.
            // each task is an instance of the inner class HttpLongPollingTask
            this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
        } else {
            log.info("soul http long polling was started, executor=[{}]", executor);
        }
    }
    private void fetchGroupConfig(final ConfigGroupEnum... groups) throws SoulException {
        for (int index = 0; index < this.serverList.size(); index++) {
            String server = serverList.get(index);
            try {
                this.doFetchGroupConfig(server, groups);
                break;
            } catch (SoulException e) {
                // no available server, throw exception.
                if (index >= serverList.size() - 1) {
                    throw e;
                }
                log.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
            }
        }
    }
    private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
        StringBuilder params = new StringBuilder();
        for (ConfigGroupEnum groupKey : groups) {
            params.append("groupKeys").append("=").append(groupKey.name()).append("&");
        }
        String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
        log.info("request configs: [{}]", url);
        String json = null;
        try {
            // make the remote call
            json = this.httpClient.getForObject(url, String.class);
        } catch (RestClientException e) {
            String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
            log.warn(message);
            throw new SoulException(message, e);
        }
        // update local cache
        boolean updated = this.updateCacheWithJson(json);
        if (updated) {
            log.info("get latest configs: [{}]", json);
            return;
        }
        // not updated. it is likely that the current config server has not been updated yet. wait a moment.
        log.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
        ThreadUtils.sleep(TimeUnit.SECONDS, 30);
    }
    /**
     * update local cache.
     * @param json the response from config server.
     * @return true: the local cache was updated. false: not updated.
     */
    private boolean updateCacheWithJson(final String json) {
        JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
        JsonObject data = jsonObject.getAsJsonObject("data");
        // if the config cache will be updated?
        return factory.executor(data);
    }
    @SuppressWarnings("unchecked")
    private void doLongPolling(final String server) {
        MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
        for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
            ConfigData<?> cacheConfig = factory.cacheConfigData(group);
            String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
            params.put(group.name(), Lists.newArrayList(value));
        }
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
        HttpEntity httpEntity = new HttpEntity(params, headers);
        String listenerUrl = server + "/configs/listener";
        log.debug("request listener configs: [{}]", listenerUrl);
        JsonArray groupJson = null;
        try {
            String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
            log.debug("listener result: [{}]", json);
            groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
        } catch (RestClientException e) {
            String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
            throw new SoulException(message, e);
        }
        if (groupJson != null) {
            // fetch group configuration async.
            ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
            if (ArrayUtils.isNotEmpty(changedGroups)) {
                log.info("Group config changed: {}", Arrays.toString(changedGroups));
                this.doFetchGroupConfig(server, changedGroups);
            }
        }
    }
    @Override
    public void close() throws Exception {
        RUNNING.set(false);
        if (executor != null) {
            executor.shutdownNow();
            // help gc
            executor = null;
        }
    }
    // Inner class
    class HttpLongPollingTask implements Runnable {
        private String server;
        private final int retryTimes = 3;
        HttpLongPollingTask(final String server) {
            this.server = server;
        }
        @Override
        public void run() {
            while (RUNNING.get()) {
                for (int time = 1; time <= retryTimes; time++) {
                    try {
                        doLongPolling(server);
                    } catch (Exception e) {
                        // print warning log.
                        if (time < retryTimes) {
                            log.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
                                    time, retryTimes - time, e.getMessage());
                            ThreadUtils.sleep(TimeUnit.SECONDS, 5);
                            continue;
                        }
                        // print error, then suspended for a while.
                        log.error("Long polling failed, try again after 5 minutes!", e);
                        ThreadUtils.sleep(TimeUnit.MINUTES, 5);
                    }
                }
            }
            log.warn("Stop http long polling.");
        }
    }
}
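The code above is fairly complex, but it is enough to focus on the start and doLongPolling methods. start fetches the configuration of every group once and then starts one long-polling thread per server. doLongPolling posts the cached MD5 and last-modified time of each group to /configs/listener, and when the response lists changed groups it fetches them again through doFetchGroupConfig.
Summary
This article started from the configuration files and then looked at how soul-admin starts data sync and how soul-bootstrap synchronizes data at startup.
Original article by Maggie-Hunter. If you repost it, please credit the source: https://blog.ytso.com/1293.html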
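The initial fetch in start tries the servers one by one until one of them answers. The following is a minimal sketch of that failover idea together with the groupKeys query string. FailoverFetchSketch is a hypothetical class, the httpGet function is a stand-in for the RestTemplate call, and group names are plain strings, so none of this is Soul's actual API.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of "try the next server on failure", not Soul's class.
final class FailoverFetchSketch {

    private FailoverFetchSketch() {
    }

    // Builds server + "/configs/fetch?groupKeys=A&groupKeys=B".
    static String buildFetchUrl(final String server, final List<String> groups) {
        StringBuilder params = new StringBuilder();
        for (String group : groups) {
            if (params.length() > 0) {
                params.append('&');
            }
            params.append("groupKeys=").append(group);
        }
        return server + "/configs/fetch?" + params;
    }

    // Returns the first successful response; fails only when every server failed.
    static String fetchFromAny(final List<String> servers, final List<String> groups,
                               final Function<String, String> httpGet) {
        RuntimeException last = null;
        for (String server : servers) {
            try {
                return httpGet.apply(buildFetchUrl(server, groups));
            } catch (RuntimeException e) {
                // remember the failure and move on to the next server
                last = e;
            }
        }
        throw new IllegalStateException("no available server", last);
    }
}
```

Passing the HTTP call in as a function keeps the sketch free of any HTTP library and makes the failover order easy to check in isolation.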