Soul API 网关源码学习《二》详解程序员

基于examples下面的 http服务进行源码解析

前言

上一篇文章Soul API 网关源码解析《一》中简单介绍了一下网关的含义,同时介绍了两种微服务开发常用的网关:Zuul 1.x(毕竟Zuul 2.x难产了)和Gateway。简单的阐述了一下两种网关的执行流程,以及Zuul 1.x网关被Gateway取代的原因。在介绍了这些之后,又介绍了一下Soul API 网关,这是一款是基于WebFlux实现的响应式的 API 网关,具有异步、高性能、跨语言等特点。其优点我们在后面的源码解析中再慢慢阐述。

本篇文章将会接着上一篇文章继续进行一系列的阐述,之前只是讲到了 soul-admin 和soul-bootstrap 两个服务的启动,本篇文章将会以soul-examples下的http demo的运行来进行简单的解析。下面就开始我们的源码解析之旅了!

一、运行soul-examples-http

首先我们来看下 soul-examples-http 实例中的配置:

server: 
  port: 8188 
  address: 0.0.0.0 
soul: 
  http: 
    adminUrl: http://localhost:9095 
    port: 8188 
    contextPath: /http 
    appName: http 
    full: true 
     
logging: 
  level: 
    root: info 
    org.springframework.boot: info 
    org.apache.ibatis: info 
    org.dromara.soul.test.bonuspoint: info 
    org.dromara.soul.test.lottery: debug 
    org.dromara.soul.test: debug 

上面的配置比较简单,我们就讲讲 sou.http 下几个配置的意思,adminUrl是 soul-examples-http 所要注册到的服务连接,port是本地服务端口,contextPath是注册的路径,full是表示是否像admin服务进行注册,后面会讲到这些。

首先,这里笔者以 8188 和 8189 端口来启动两个实例,admin 服务如图:

图片

从上图中可以看出我们所启动的两个实例都成功了,分别是 8188 和 8199。那么此时我们就要思考一个问题了,就是这两个实例是如何注册的,那么我们就进行下一步吧!

二、soul-examples-http是如何注册的

1.关于配置文件

前面一节中我们又提到几个配置文件参数,那么久来看看这一个配置参数的代码。如下:

org.dromara.soul.client.springmvc.config.SoulSpringMvcConfig

@Data 
public class SoulSpringMvcConfig {
    
    private String adminUrl; 
    private String contextPath; 
    private String appName; 
    /** 
     * Set true means providing proxy for your entire service, or only a few controller. 
     */ 
    private boolean full; 
    private String host; 
    private Integer port; 
} 

2.上下文注册监听器

上面代码中所涉及的几个属性,在前面已经介绍过了,这里就不再阐述了。那接下来就是这个类在哪有用到:

org.dromara.soul.client.springmvc.init.ContextRegisterListener

public class ContextRegisterListener implements ApplicationListener<ContextRefreshedEvent> {
 
private final AtomicBoolean registered = new AtomicBoolean(false); 
private final String url; 
private final SoulSpringMvcConfig soulSpringMvcConfig; 
/** 
* Instantiates a new Context register listener. 
* 
* @param soulSpringMvcConfig the soul spring mvc config 
*/ 
public ContextRegisterListener(final SoulSpringMvcConfig soulSpringMvcConfig) {
 
ValidateUtils.validate(soulSpringMvcConfig); 
this.soulSpringMvcConfig = soulSpringMvcConfig; 
url = soulSpringMvcConfig.getAdminUrl() + "/soul-client/springmvc-register"; 
} 
@Override 
public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {
 
if (!registered.compareAndSet(false, true)) {
 
return; 
} 
if (soulSpringMvcConfig.isFull()) {
 
RegisterUtils.doRegister(buildJsonParams(), url, RpcTypeEnum.HTTP); 
} 
} 
private String buildJsonParams() {
 
String contextPath = soulSpringMvcConfig.getContextPath(); 
String appName = soulSpringMvcConfig.getAppName(); 
Integer port = soulSpringMvcConfig.getPort(); 
String path = contextPath + "/**"; 
String configHost = soulSpringMvcConfig.getHost(); 
String host = StringUtils.isBlank(configHost) ? IpUtils.getHost() : configHost; 
SpringMvcRegisterDTO registerDTO = SpringMvcRegisterDTO.builder() 
.context(contextPath) 
.host(host) 
.port(port) 
.appName(appName) 
.path(path) 
.rpcType(RpcTypeEnum.HTTP.getName()) 
.enabled(true) 
.ruleName(path) 
.build(); 
return OkHttpTools.getInstance().getGson().toJson(registerDTO); 
} 
} 

首先ContextRegisterListener是实现了ApplicationListener接口的,而这个实现又是基于ContextRefreshedEvent(上下文刷新事件)的。此时这里涉及了Spring 的事件监听,不过这不是本篇的主题,关注的童鞋可自行寻找资料学习,不着急的可以期待下笔者后面的输出(不过能不能输出就另当别论哈!)。
还是回归主题,上面的代码中,首先是构建ContextRegisterListener对象(那么这个对象怎么构建的呢?稍后再说),在构建的时候传入了SoulSpringMvcConfig对象。在构造函数中首先对传入的参数soulSpringMvcConfig进行相应的验证,然后再将其赋给本实例中的SoulSpringMvcConfig,最后获取所传入对象soulSpringMvcConfig中的AdminUrl,顺便将其赋给本实例中的 url。

接着就是 onApplicationEvent 方法的调用,至于这个方法何时调用,在哪里调用的,感兴趣的童鞋还是要自己去看看Spring的运行机制,这里笔者暂且给一张调用栈的截图,如下:

图片

这个调用栈很清晰哈,从下至上是服务调用的一系列栈路径,还是那一句话:要好好学习Spring(Spring 大法好)。

在onApplicationEvent 方法中首先通过cas比较替换 registered 的初始值,然后判断配置文件中的 full 是否设置为 true,如果是则进行到下一步,此时先调用 buildJsonParams() 方法来构建SpringMvcRegisterDTO对象,然后把这个对象转换成 json 字符串。之后就是开始注册了,读过Spring 源码的都知道,凡事方法带 “do“ 那都是干实事的。那就来看看 doRegister() 方法到底干了啥。代码如下:

org.dromara.soul.client.common.utils.RegisterUtils

public static void doRegister(final String json, final String url, final RpcTypeEnum rpcTypeEnum) {
 
try {
 
String result = OkHttpTools.getInstance().post(url, json); 
if (AdminConstants.SUCCESS.equals(result)) {
 
log.info("{} client register success: {} ", rpcTypeEnum.getName(), json); 
} else {
 
log.error("{} client register error: {} ", rpcTypeEnum.getName(), json); 
} 
} catch (IOException e) {
 
log.error("cannot register soul admin param, url: {}, request body: {}", url, json, e); 
} 
} 

不细看的话,你可能只是关注了这个方法 else 和 catch 中的几个 log.error 了。这里主要的调用方法是这行代码:String result = OkHttpTools.getInstance().post(url, json)。在正式进入这个方法之前,先补充下前面说的ContextRegisterListener对象是如何构建的。此处代码在:

org.dromara.soul.springboot.starter.client.springmvc.SoulSpringMvcClientConfiguration

@Configuration 
public class SoulSpringMvcClientConfiguration {
 
/** 
* Spring http client bean post processor spring http client bean post processor. 
* 
* @param soulSpringMvcConfig the soul http config 
* @return the spring http client bean post processor 
*/ 
@Bean 
public SpringMvcClientBeanPostProcessor springHttpClientBeanPostProcessor(final SoulSpringMvcConfig soulSpringMvcConfig) {
 
return new SpringMvcClientBeanPostProcessor(soulSpringMvcConfig); 
} 
/** 
* Context register listener context register listener. 
* 
* @param soulSpringMvcConfig the soul spring mvc config 
* @return the context register listener 
*/ 
@Bean 
public ContextRegisterListener contextRegisterListener(final SoulSpringMvcConfig soulSpringMvcConfig) {
 
return new ContextRegisterListener(soulSpringMvcConfig); 
} 
/** 
* Soul http config soul http config. 
* 
* @return the soul http config 
*/ 
@Bean 
@ConfigurationProperties(prefix = "soul.http") 
public SoulSpringMvcConfig soulHttpConfig() {
 
return new SoulSpringMvcConfig(); 
} 
} 

Spring Boot [email protected][email protected]
注解的方法进行实例化,这个笔者曾写过相关文章Spring [email protected]@Bean以及说说Spring Boot(Spring)的自动装配机制,感情兴趣的童鞋可以去看看。

  1. post 调用

其实这个方法很简单,就是借用 OkHttp 来进行实现的,代码如下:

org.dromara.soul.client.common.utils.OkHttpTools

public String post(final String url, final String json) throws IOException {
 
RequestBody body = RequestBody.create(JSON, json); 
Request request = new Request.Builder() 
.url(url) 
.post(body) 
.build(); 
return client.newCall(request).execute().body().string(); 
} 

这后面都是涉及 OkHttp 的调用了,由于不是本文重点,就先到此为止吧。

三、关于soul-plugin-divide插件

在我们启动 soul-bootstrap 实例的时候,是会注入 soul-plugin-divide 插件的,同时要使用了soul-spring-boot-starter-gateway(而在这里这是引用了soul-web),因为在pom文件中引用了,如下:

图片

因此,在服务调用的时候,回西安经过网关进行相关的过滤或处理。我们先看执行调用的时候的代码执行流程,如下:

org.dromara.soul.web.configuration.SoulConfiguration

@Configuration 
@ComponentScan("org.dromara.soul") 
@Import(value = {
ErrorHandlerConfiguration.class, SoulExtConfiguration.class, SpringExtConfiguration.class}) 
@Slf4j 
public class SoulConfiguration {
 
/** 
* Init SoulWebHandler. 
* 
* @param plugins this plugins is All impl SoulPlugin. 
* @return [email protected] SoulWebHandler} 
*/ 
@Bean("webHandler") 
public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {
 
List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList); 
final List<SoulPlugin> soulPlugins = pluginList.stream() 
.sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList()); 
soulPlugins.forEach(soulPlugin -> log.info("load plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName())); 
return new SoulWebHandler(soulPlugins); 
} 
......此处省略代码 
} 

服务启动的时候,会是实例化 SoulWebHandler,然后在执行调用过的时候,便会执行如下代码:
org.dromara.soul.web.handler.SoulWebHandler

@Override 
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
 
MetricsTrackerFacade.getInstance().counterInc(MetricsLabelEnum.REQUEST_TOTAL.getName()); 
Optional<HistogramMetricsTrackerDelegate> startTimer = MetricsTrackerFacade.getInstance().histogramStartTimer(MetricsLabelEnum.REQUEST_LATENCY.getName()); 
return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler) 
.doOnSuccess(t -> startTimer.ifPresent(time -> MetricsTrackerFacade.getInstance().histogramObserveDuration(time))); 
} 

上面的这段代码暂且只execute(exchange) 方法代码如下:

@Override 
public Mono<Void> execute(final ServerWebExchange exchange) {
 
return Mono.defer(() -> {
 
if (this.index < plugins.size()) {
 
SoulPlugin plugin = plugins.get(this.index++); 
Boolean skip = plugin.skip(exchange); 
if (skip) {
 
return this.execute(exchange); 
} 
return plugin.execute(exchange, this); 
} 
return Mono.empty(); 
}); 
} 

如上代码,会根据plugin是否匹配,然后进行相关调用,这里主要是 plugin.execute(exchange, this)方法,这里以AbstractSoulPlugin为例:

@Override 
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
 
String pluginName = named(); 
final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName); 
if (pluginData != null && pluginData.getEnabled()) {
 
final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName); 
if (CollectionUtils.isEmpty(selectors)) {
 
return handleSelectorIsNull(pluginName, exchange, chain); 
} 
final SelectorData selectorData = matchSelector(exchange, selectors); 
if (Objects.isNull(selectorData)) {
 
return handleSelectorIsNull(pluginName, exchange, chain); 
} 
selectorLog(selectorData, pluginName); 
final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId()); 
if (CollectionUtils.isEmpty(rules)) {
 
return handleRuleIsNull(pluginName, exchange, chain); 
} 
RuleData rule; 
if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
 
//get last 
rule = rules.get(rules.size() - 1); 
} else {
 
rule = matchRule(exchange, rules); 
} 
if (Objects.isNull(rule)) {
 
return handleRuleIsNull(pluginName, exchange, chain); 
} 
ruleLog(rule, pluginName); 
return doExecute(exchange, chain, selectorData, rule); 
} 
return chain.execute(exchange); 
} 

如上代码我们只需关注 doExecute(exchange, chain, selectorData, rule) 方法即可,因为chain.execute(exchange)方法只是一个单纯的“回调”,此时便进入了 DividePlugin(注意DividePlugin是继承自AbstractSoulPlugin的) ,如下:

@Override 
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
 
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT); 
assert soulContext != null; 
final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class); 
final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId()); 
if (CollectionUtils.isEmpty(upstreamList)) {
 
log.error("divide upstream configuration error: {}", rule.toString()); 
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null); 
return WebFluxResultUtils.result(exchange, error); 
} 
final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress(); 
DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip); 
if (Objects.isNull(divideUpstream)) {
 
log.error("divide has no upstream"); 
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null); 
return WebFluxResultUtils.result(exchange, error); 
} 
// set the http url 
String domain = buildDomain(divideUpstream); 
String realURL = buildRealURL(domain, soulContext, exchange); 
exchange.getAttributes().put(Constants.HTTP_URL, realURL); 
// set the http timeout 
exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout()); 
exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry()); 
return chain.execute(exchange); 
} 

是不是突然发现,这里又是一个回调(这个和Spring Boot 中的一个策略很类似,笔者记不太清,有机会找出来对比一下)。不过忽略了上面两端代码中其实涉及到负载均衡和相关代码,由于时已夜深,笔者要先休息了(身体是革命本钱)。

总结

本文主要是从soul-examples-http实例的运行开始,期间分析了相关的注册(这中间笔者忽略了一些信息,有机会再补上),设计到Spring的相关只是,以及OkHttp调用等,以后简单涉及了一下 plugin 和 gateway(因为时间原因,只是简单涉及,其间定有谬误,望请指出,谢谢!)

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/1285.html

(0)
上一篇 2021年7月15日
下一篇 2021年7月15日

相关推荐

发表回复

登录后才能评论