Soul API 网关源码解析之SoulWebHandler详解程序员

关于web handler

前言

1、关于SoulWebHandler

Soul 实际是通过Spring WebFlux来实现的,而在WebFlux中最核心的接口便是WebHandler,它是一个请求处理器,具体处理请求是由它的实现类实现的,它具有多个实现类。如图:

图片

但是我在Soul中,它有一个自定义的实现SoulWebHandler,如果是用Gateway网关的话,则要关注DispatcherHandler、FilteringWebHandler。在这里我们只需要关注SoulWebHandler即可,先看看WebHandler接口:

public interface WebHandler {
    
   /** 
    * Handle the web server exchange. 
    * @param exchange the current server exchange 
    * @return [email protected] Mono<Void>} to indicate when request handling is complete 
    */ 
   Mono<Void> handle(ServerWebExchange exchange); 
} 

2、WebHandler的自定义实现

在这个接口里,只是定义了一个hanlde方法,那我们看看在Soul里面的实现SoulWebHandler:

public final class SoulWebHandler implements WebHandler {
 
private final List<SoulPlugin> plugins; 
private final Scheduler scheduler; 
/** 
* Instantiates a new Soul web handler. 
* 
* @param plugins the plugins 
*/ 
public SoulWebHandler(final List<SoulPlugin> plugins) {
 
this.plugins = plugins; 
String schedulerType = System.getProperty("soul.scheduler.type", "fixed"); 
if (Objects.equals(schedulerType, "fixed")) {
 
int threads = Integer.parseInt(System.getProperty( 
"soul.work.threads", "" + Math.max((Runtime.getRuntime().availableProcessors() << 1) + 1, 16))); 
scheduler = Schedulers.newParallel("soul-work-threads", threads); 
} else {
 
scheduler = Schedulers.elastic(); 
} 
} 
/** 
* Handle the web server exchange. 
* 
* @param exchange the current server exchange 
* @return [email protected] Mono<Void>} to indicate when request handling is complete 
*/ 
@Override 
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
 
MetricsTrackerFacade.getInstance().counterInc(MetricsLabelEnum.REQUEST_TOTAL.getName()); 
Optional<HistogramMetricsTrackerDelegate> startTimer = MetricsTrackerFacade.getInstance().histogramStartTimer(MetricsLabelEnum.REQUEST_LATENCY.getName()); 
return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler) 
.doOnSuccess(t -> startTimer.ifPresent(time -> MetricsTrackerFacade.getInstance().histogramObserveDuration(time))); 
} 
private static class DefaultSoulPluginChain implements SoulPluginChain {
 
// 此处省略代码 
} 
} 

在上面的类中首先看到的是定义了一个调度器(Scheduler),和一个集合plugins,然后把plugins传入SoulWebHandler的构造器。这个构造的时机是在项目启动的时候:
SoulConfiguration

@Bean("webHandler") 
public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {
 
List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList); 
final List<SoulPlugin> soulPlugins = pluginList.stream() 
.sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList()); 
soulPlugins.forEach(soulPlugin -> log.info("load plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName())); 
return new SoulWebHandler(soulPlugins); 
} 

上面代码便是启动的时候在SoulConfiguration中来创造SoulWebHandler实例。这属于Spring Boot的新特性,我们来看下SoulConfiguration的spring.factories配置文件:

# Auto Configure 
org.springframework.boot.autoconfigure.EnableAutoConfiguration=/ 
org.dromara.soul.web.configuration.SoulConfiguration 
org.springframework.context.ApplicationListener=/ 
org.dromara.soul.web.logo.SoulLogo 

二、WebHandler的实际逻辑实现

1、WebHandler的实现

因为SoulWebHandler是WebHandler的实现类,而WebHandler中只定义了一个execute方法,因此这里的execute便是SoulWebHandler的实际处理逻辑。代码如下:

@Override 
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
 
MetricsTrackerFacade.getInstance().counterInc(MetricsLabelEnum.REQUEST_TOTAL.getName()); 
Optional<HistogramMetricsTrackerDelegate> startTimer = MetricsTrackerFacade.getInstance().histogramStartTimer(MetricsLabelEnum.REQUEST_LATENCY.getName()); 
return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler) 
.doOnSuccess(t -> startTimer.ifPresent(time -> MetricsTrackerFacade.getInstance().histogramObserveDuration(time))); 
} 

上面代码中,首先是调用一个计数器 counterInc,然后是获取一个开始时间,接下来的才是重点:创建一个DefaultSoulPluginChain实例,传入soulPlugins的同时还调用其实例的execute方法。而在这个方法里所传入的是ServerWebExchange,这个类中包含我们请求的参数,如图:
图片

然后是调用一个订阅方法(subscribeOn),最后返回调用结果。

2、关于DefaultSoulPluginChain

这里我们先看看DefaultSoulPluginChain这个默认的插件链,代码如下:

private static class DefaultSoulPluginChain implements SoulPluginChain {
 
private int index; 
private final List<SoulPlugin> plugins; 
DefaultSoulPluginChain(final List<SoulPlugin> plugins) {
 
this.plugins = plugins; 
} 
@Override 
public Mono<Void> execute(final ServerWebExchange exchange) {
 
return Mono.defer(() -> {
 
if (this.index < plugins.size()) {
 
SoulPlugin plugin = plugins.get(this.index++); 
Boolean skip = plugin.skip(exchange); 
if (skip) {
 
return this.execute(exchange); 
} 
return plugin.execute(exchange, this); 
} 
return Mono.empty(); 
}); 
} 
} 

这里的DefaultSoulPluginChain是实现了SoulPluginChain接口的,这个接口代码如下:

public interface SoulPluginChain {
 
Mono<Void> execute(ServerWebExchange exchange); 
} 

在这个接口里也只是定义了一个execute方法,这个方法便是DefaultSoulPluginChain的主要处理方法。在这个方法里,首先是获取plugin,然后判断是否skip,是则调用this.execute(exchange),否则则调用plugin.execute(exchange, this),这里涉及到SoulPlugin,本篇文章暂不涉及。

总结

本篇文章主要从WebHandler入手。然后简单的介绍了一下SoulWebHandler这个自定义实现类,以及其中的一些简单的代码解析。其实这个在Gateway 网关中也是重要的一个环节。后面我们将接着从plugin来入手看看plugin的相关实现。

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/1288.html

(0)
上一篇 2021年7月15日
下一篇 2021年7月15日

相关推荐

发表回复

登录后才能评论