说起 Reactor,相信大部分人都很陌生。但是在实际开发中你们可能都用到了它。
首先,在一些高性能 NIO 框架中有使用,比如:netty。再比如,Redis 中也有使用。所以说 Reactor 对我们来说是熟悉而又陌生的。
我写了一大堆的 WebFlux 教程,很多人可能还不知道。但其实它的底层也是 Reactor。Reactor 是未来的一种趋势,未来已来,而你还没做好准备。
Reactor 是一种应用在服务器端的开发模式,目的是提高服务端程序的并发能力,其实就是实现了 I/O 多路复用这种 I/O 模型。为了学习它,我们来看一看传统的多线程实现餐厅点餐的方式。
上图,可能是我们去餐馆吃饭经常遇到的场景。这个流程可以归纳如下:
- 服务员 X 给出菜单,并等待点菜
- 顾客 Y 查看菜单,并点菜
- 服务员 X 把菜单交给厨师,厨师照着做菜
- 厨师 N 做好菜,上餐桌
这个模式,看似不错,但实际上效率并不高。如果餐厅生意越来越好,那么顾客人数就会不断增加,这时服务员就有点处理不过来了。需要老板增加人数了,但是精明的老板发现,每个服务员在服务完客人后,都要去休息一下, 因此老板就说,“你们都别休息了,在旁边待命”。这样可能 10 个服务员也来得及服务 20 个顾客了。这也是“线程池”的方式,通过重用线程来减少线程的创建和销毁时间,从而提高性能。
但是随着生意的继续变好,顾客人数又增加了。老板想到仅仅靠剥削服务员的休息时间也没有办法服务这么多顾客。于是精明的老板又发现每个服务员并不是一直在干活的,大部分时间他们只是站在餐桌旁边等客人点菜。
所以,老板就又变招了。要求各个服务员,客人点菜的时候你们就别傻站着了,先去服务其它客人,有客人点好的时候喊你们再过去。
按照这个模式,运行了两天,老板发现根本就不需要那么多的服务员,于是裁了一波员,甚至可以只有一个服务员。最终可以用下图进行归纳总结。
上面这个图就非常的符合 Reactor 模式的核心思想:减少等待。当遇到需要等待 IO 时,先释放资源,而在 IO 完成时,再通过事件驱动 (event driven) 的方式,继续接下来的处理。从整体上减少了资源的消耗。
在 Java 中 Reactor 又分为 3 种模式:经典模式、多工作线程模式、多 Reactor 模式。下面我们先来说第一种 Reactor 经典模式。
Reactor 经典模式又称为单线程模式。这种模式的特点就是,当多个 client 连接时,有一个专门的 ServerSocketChannel 负责 OP_ACCEPT 事件。对于具体的 I/O 操作,分配给另外一个 ServerSocketChannel 的 OP_READ 事件。简单的实现代码如下:
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = acceptServerSocketChannel.accept();
socketChannel.configureBlocking(false);
LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = socketChannel.read(buffer);
if (count <= 0) {
socketChannel.close();
key.cancel();
continue;
}
}
keys.remove(key);
}
}
这种单线程模式,有一个问题,不知道细心的网友有没有发现。这个 selector.select() 是阻塞的,当有至少一个通道可用时该方法返回可用通道个数。同时该方法只捕获 Channel 注册时指定的所关注的事件。所以,效率还有提升空间。
实际上,在多核 CPU 的时代,它并不能重复的利用 CPU 的多核优势,所以,这种模式用的不多。于是,基于多线程模式的 Reactor 就出现了。
这种模式就是在 read 的时候,利用多线程模型。而 acceptor 还是有一个线程来负责。具体的简化代码如下所示:
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
if(selector.selectNow() < 0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while(iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = acceptServerSocketChannel.accept();
socketChannel.configureBlocking(false);
SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
readKey.attach(new Processor());
} else if (key.isReadable()) {
Processor processor = (Processor) key.attachment();
processor.process(key);
}
}
}
具体的读请求处理在如下所示的 Processor 类中。该类中设置了一个静态的线程池处理所有请求。而 process 方法并不直接处理 I/O 请求, 而是把该 I/O 操作提交给上述线程池去处理,这样就充分利用了多线程的优势,同时将对新连接的处理和读/写操作的处理放在了不同的线程中, 读/写操作不再阻塞对新连接请求的处理。
public class Processor {
private static final ExecutorService service = Executors.newFixedThreadPool(16);
public void process(SelectionKey selectionKey) {
service.submit(() -> {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
int count = socketChannel.read(buffer);
if (count < 0) {
socketChannel.close();
selectionKey.cancel();
return null;
} else if(count == 0) {
return null;
}
return null;
});
}
}
上面的模式看似完美,但实际上还有改进空间。
其实,我们可以将 Reactor 分成两部分,一部分时 mainReactor,一部分是 subReactor,这就是多 Reactor 模式。
mainReactor 负责监听并 accept 新连接,然后将建立的 socket 通过多路复用器(Acceptor)分派给 subReactor。subReactor 负责多路分离已连接的 socket,读写网络数据;业务处理功能,其交给 worker 线程池完成。通常,subReactor 个数上可与 CPU 个数等同。多 Reactor 示例代码如下所示:
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
int coreNum = Runtime.getRuntime().availableProcessors();
Processor[] processors = new Processor[coreNum];
for (int i = 0; i < processors.length; i++) {
processors[i] = new Processor();
}
int index = 0;
while (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
for (SelectionKey key : keys) {
keys.remove(key);
if (key.isAcceptable()) {
ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = acceptServerSocketChannel.accept();
socketChannel.configureBlocking(false);
Processor processor = processors[(int) ((index++) % coreNum)];
processor.addChannel(socketChannel);
processor.wakeup();
}
}
}
Processor 的相关代码如下所示:
public class Processor {
private static final ExecutorService service =
Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
private Selector selector;
public Processor() throws IOException {
this.selector = SelectorProvider.provider().openSelector();
start();
}
public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
socketChannel.register(this.selector, SelectionKey.OP_READ);
}
public void wakeup() {
this.selector.wakeup();
}
public void start() {
service.submit(() -> {
while (true) {
if (selector.select(500) <= 0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = (SocketChannel) key.channel();
int count = socketChannel.read(buffer);
if (count < 0) {
socketChannel.close();
key.cancel();
continue;
} else if (count == 0) {
continue;
}
}
}
}
});
}
}
这个多 Reactor 的模式其实就是 Netty 的模式,模仿 Netty 的关键代码。
: » 被蒙在鼓里的高性能 Reactor 的经典模式、多工作线程模式、多Reactor模式
原创文章,作者:sunnyman218,如若转载,请注明出处:https://blog.ytso.com/tech/java/252076.html