HTTP Server实现
源码解析
首先我们先讲一下HTTP Server启动前的准备工作,也就是HTTP Server的初始化。
1.在初始化的时候,会初始化多个线程池,这个线程池中会有许多的阻塞的线程队列,当某一种事件发生的时候,就会根据事件来使用线程池工厂来为事件创建线程。
【源代码片段】
public void initThreadPool() throws Exception {// 批处理消息BlockingQueue<Runnable> batchMsgThreadPoolQueue = new LinkedBlockingQueue<Runnable>(eventMeshHttpConfiguration.eventMeshServerBatchBlockQSize);batchMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpConfiguration.eventMeshServerBatchMsgThreadNum,eventMeshHttpConfiguration.eventMeshServerBatchMsgThreadNum, batchMsgThreadPoolQueue, "eventMesh-batchMsg-", true);// 发送消息,和上面的类似BlockingQueue<Runnable> sendMsgThreadPoolQueue=...// 推送消息,和上面的类似BlockingQueue<Runnable> pushMsgThreadPoolQueue =...// 客户端管理,和上面的类似BlockingQueue<Runnable> clientManageThreadPoolQueue =...// 管理线程池,和上面的类似BlockingQueue<Runnable> adminThreadPoolQueue =...// 回复消息,和上面的类似BlockingQueue<Runnable> replyMessageThreadPoolQueue =...}
2.注册http请求处理器,这个处理器会针对http发过来的请求,获取到请求码,根据请求码注册对应的处理器,并且为这个事件用上面的线程池分配一个线程进行处理。
【源代码片段】
public void registerHTTPRequestProcessor() {// 新建批量消息处理器BatchSendMessageProcessor batchSendMessageProcessor = new BatchSendMessageProcessor(this);// 获取请求码,并且分配一个线程进行处理registerProcessor(RequestCode.MSG_BATCH_SEND.getRequestCode(),batchSendMessageProcessor, batchMsgExecutor);BatchSendMessageV2Processor batchSendMessageV2Processor =...// 同步消息处理器,和上面的类似SendSyncMessageProcessor sendSyncMessageProcessor =...// 异步消息处理器,和上面的类似SendAsyncMessageProcessor sendAsyncMessageProcessor =...// 管理指标处理器,和上面的类似AdminMetricsProcessor adminMetricsProcessor =...// 心跳处理器,和上面的类似HeartBeatProcessor heartProcessor =...// 订阅处理器,和上面的类似SubscribeProcessor subscribeProcessor =...// 和上面的类似UnSubscribeProcessor unSubscribeProcessor =...// 回复消息处理器,和上面的类似ReplyMessageProcessor replyMessageProcessor =...}
3.这里就是http初始化的全部代码。
public class EventMeshHTTPServer extends AbstractHTTPServer {...//初始化public void init() throws Exception {logger.info("==================EventMeshHTTPServer Initialing==================");// 初始化线程组super.init("eventMesh-http");// 初始化线程池initThreadPool();// 对指标初始化,主要是把生成的指标用于后台数据处理metrics = new HTTPMetricsServer(this);metrics.init();// 消费者管理初始化,主要是把httpServer注册到事件总线上consumerManager = new ConsumerManager(this);consumerManager.init();producerManager = new ProducerManager(this);producerManager.init();// 重试httpRetryer = new HttpRetryer(this);httpRetryer.init();// 注册http处理器registerHTTPRequestProcessor();logger.info("--------------------------EventMeshHTTPServer inited");}...}
初始化完成之后,再启动http的服务器端,这里我也同样聊聊以下几点。
1.AbstractHTTPServer的启动,采用的是netty的异步模型框架搭建的。具体来讲,这里创建了两个线程池:bossGroup和workerGroup,前者是用来轮询accept事件并且和client建立连接的,后者是用来轮询read和write事件并且使用handlers处理io事件的。而且这里采用了回调机制,当调用发出后,并不一定立刻就能得到结果,而是在实际处理的时候调用这个组件完成后,通过状态、通知等回调告知调用者。
public abstract class AbstractHTTPServer extends AbstractRemotingServer {...@Overridepublic void start() throws Exception {super.start();Runnable r = () -> {// 创建服务器端启动的对象ServerBootstrap b = new ServerBootstrap();// 不进行加密通话?SSLContext sslContext = useTLS ? SSLContextFactory.getSslContext() : null;b.group(this.bossGroup, this.workerGroup)// 设置两个线程组.channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器的通道实现.childHandler(new HttpsServerInitializer(sslContext))// 设置workerGroup的管道处理器.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);// 保持连接状态try {httpServerLogger.info("HTTPServer[port={}] started......", this.port);// 这里就是对http的端口进行绑定,并且启动服务器端,采用了回调机制ChannelFuture future = b.bind(this.port).sync();//关闭通道事件进行监听future.channel().closeFuture().sync();} catch (Exception e) {httpServerLogger.error("HTTPServer start Err!", e);try {// 关闭资源shutdown();} catch (Exception e1) {httpServerLogger.error("HTTPServer shutdown Err!", e);}return;}};Thread t = new Thread(r, "eventMesh-http-server");t.start();started.compareAndSet(false, true);}...}
2.设置管道处理器部分,当channel被注册之后,这个类中的initChannel方法就会被调用,也会执行在管道后面加入的handlers。
class HttpsServerInitializer extends ChannelInitializer<SocketChannel> {private SSLContext sslContext;public HttpsServerInitializer(SSLContext sslContext) {this.sslContext = sslContext;}@Overrideprotected void initChannel(SocketChannel channel) throws Exception {ChannelPipeline pipeline = channel.pipeline();if (sslContext != null && useTLS) {SSLEngine sslEngine = sslContext.createSSLEngine();sslEngine.setUseClientMode(false);pipeline.addFirst("ssl", new SslHandler(sslEngine));}// 在管道后面加入handlerspipeline.addLast(new HttpRequestDecoder(),// 这个是对http解码new HttpResponseEncoder(),// 这个是对http的响应编码new HttpConnectionHandler(),// 这个是对http连接处理new HttpObjectAggregator(Integer.MAX_VALUE),// 这个是http对象聚合new HTTPHandler());// 这个是http的Handler的具体实现}}}
3.这里附上start的源码部分。
public class EventMeshHTTPServer extends AbstractHTTPServer {...public void start() throws Exception {super.start();// 指标metrics.start();// 消费者管理consumerManager.start();// 生产者管理producerManager.start();// 重试httpRetryer.start();logger.info("--------------------------EventMeshHTTPServer started");}...}
到此,EventMesh的HTTP Server实现部分源码解析就结束了。
注:本文内容由社区小伙伴陈创慧提供,原文地址:https://blog.csdn.net/CodePlayMe/article/details/120671622
{{m.name}}
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/199237.html