一、Netty
1. Netty的线程模型
netty通过Reactor模型基于多路复用器接收并处理用户请求,这个Reactor模型分为三种:
第一种是Reactor单线程模型,它是使用一个线程来处理客户端的连接和IO处理
第二种是Reactor多线程模型,他使用一个Acceptor线程来处理客户端的连接,并使用线程池来处理Handler的IO操作
第三种是Reactor主从多线程模型,这种模型的Acceptor和Handler处理都是线程池
Netty使用的是第二种模型,即使你这样写:(boosGroup表示Acceptor)
EventLoopGroup bossGroup = new NioEventLoopGroup(4);
但是由于服务器端的 ServerSocketChannel 只绑定到了 bossGroup 中的一个线程, 因此在调用 Java NIO 的 Selector.select 处理客户端的连接请求时, 实际上是在一个线程中的, 所以对只有一个服务的应用来说, bossGroup 设置多个线程是没有什么作用的, 反而还会造成资源浪费.
2. Netty的执行过程
首先我们想想Nio的启动流程:
// 1. 打开服务端 Socket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 2. 打开 Selector
Selector selector = Selector.open();
// 3. 服务端 Socket 监听8080端口, 并配置为非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
// 将 channel 注册到 selector 中.
// 通常我们都是先注册一个 OP_ACCEPT 事件, 然后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ
// 4. Channel注册到 Selector 中.
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 5. 不断循环
while (true) {
//迭代selectedkey
}
第一步、打开服务端 ServerSocketChannel
第二步、通过 Selector.open() 打开一个 Selector.
第三步、绑定端口
第四步、将 Channel 注册到 Selector 中, 并设置需要监听的事件(interest set),并且监听事件为SelectionKey.OP_ACCEPT
第五步、循环,不断重复:
- 调用 select() 方法
- 调用 selector.selectedKeys()
- 获取 selected keys
- 迭代每个 selected key:
1) 从 selected key 中获取 对应的 Channel 和附加信息(如果有的话)
2) 判断是哪些 IO 事件已经就绪了, 然后处理它们. 如果是 OP_ACCEPT 事件, 则调用 “SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()” 获取 SocketChannel, 并将它设置为 非阻塞的, 然后将这个 Channel 注册到 Selector 中.
3) 根据需要更改 selected key 的监听事件.
4) 将已经处理过的 key 从 selected keys 集合中删除.
然后理一理Netty服务端的启动流程:
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap bs = new ServerBootstrap();
bs.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1000)
.option(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new ObjectEncoder());
p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
}
});
bs.bind(8001).sync();
对于这个启动代码,.group、.channel、.option、.handler这些方法都是把参数传入到Bootstrap,实际上使用是在client调用connect和Server调用doBind方法时才会使用:
就拿connect来说说做了什么事
1、 new NioEventLoopGroup()
1)实例化EventLoopGroup,在构造方法中会一直调用父构造方法,最终在MultithreadEventExecutorGroup中会实例化EventLoopGroup,在这个构造方法内部,首先会创建一个EventExecutor数组
this.children = new SingleThreadEventExecutor[nThreads];
2)创建一个EventExecutor数组并初始化,大小为构造时new NioEventLoopGroup(4),如果不指定默认为处理器核心数*2 ,这个EventExecutor 其实是一个Executor,他用于执行Runnable
然后会调用newChild初始化EventExecutor数组
this.children[terminationListener] = this.newChild(threadFactory, args);
3)newChiild方法实际上是初始化一个NioEventLoop
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { return new NioEventLoop(this, threadFactory, (SelectorProvider)args[0]); }
4)NioEventLoop的构造方法,调用了openSelector方法,打开了一个selector——对应NIO的第二步
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { super(parent, threadFactory, false); if(selectorProvider == null) { throw new NullPointerException("selectorProvider"); } else { this.provider = selectorProvider; this.selector = this.openSelector(); } }
可见一个new NioEventLoopGroup() 实际上是初始化了一个NioEventLoop数组,并且在实例化NioEventLoop时调用了openSelector方法,打开了一个selector
2、然后根据.channel方法传入的类型,通过工厂方法设计模式实例化一个Channel,并且打开一个SocketChannel(对应NIO的第一步)
private static SocketChannel newSocket(SelectorProvider provider) { ... return provider.openSocketChannel(); }
3、然后根据.option方法传入的参数,初始化channel,给channel设置各种参数
4、然后调用group().register(channel)进行channel注册到selector中
5、上一步的注册,最终会到达AbstractNioChannel.doRegister方法,该方法通过
javaChannel().register(eventLoop().selector, 0, this)
将 Channel 对应的 Java NIO SockerChannel 注册到一个 eventLoop 的 Selector 中, 并且将当前 Channel 作为 attachment.
注意这里把channel注册到selector中,interestOps居然是0,根据NIO,一个连接注册到Selector的Channel,监听事件应该是SelectionKey.OP_ACCEPT,下一步会说到
6、调用了doDegister方法注册成功后,会触发ChannelRegistered事件,ChannelRegistered事件会在ChannelPipelines中传递,从HeadHandler传到TailHandler中,传递完成之后注册成功!
(5、6完成了Channel在Selector的注册,对应NIO的第四步)
ps:当有客户端连接进来时,会怎样?
- 首先判断ServerSocketChannel监听是否成功,如果成功,触发ChannelActive事件,该事件也会在ChannelPipelines中传递
- 然后根据配置是否需要触发ChannelRead事件,触发了HeadHandler的read方法,最后再服务端链路注册成功之后,重新将操作位设置为SelectionKey.OP_ACCEPT
7、执行bind方法,绑定端口,(3,4,5,6)这四步,实际上都会在第7步执行之后才执行,bind方法最终会调用到doBind0方法,绑定端口(这一步对应NIO的第三步)
此时,已经完成了NIO的1,2,3,4步,还剩第5步,在一个循环中,不断重复,迭代selectorkey
8、循环在哪里?
复杂点说:
因为NioEventLoop是一个SingleThreadEventExecutor,一个NioEventLoop会和一个线程绑定,NioEventLoop中有一个run方法。
所以EventLoop的启动,实际上就是EventLoop启动线程。在SingleThreadEventExecutor的execute方法中,有一个startThread方法,这个方法就是用来启动EventLoop的。
当 EventLoop.execute 第一次被调用时, 就会触发 startThread() 的调用, 进而导致了 EventLoop 所对应的 Java 线程的启动,最终会调用到NioEventLoop的run方法
简单点说
当执行bind方法,会导致NioEventLoop的启动(也就是Executor的启动),NioEventLoop和一个线程绑定,所以NioEventLoop的启动会调用到自身的run方法
run方法逻辑:
@Override
protected void run() {
for (;;) {
boolean oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();//判断是否又IO事件就绪
} else {
select(oldWakenUp);//判断是否有IO事件就绪
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();//执行selectkey的read write操作
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
...
}
}
}
processSelectedKeys中就是处理事件的地方,read,write,connect等,它内部是通过UnSafe类来实现的,就如NIO对read,write等事件的处理,而例如read,wirte方法,Unsafe最终会调用它的父类AbstractNioByteChannel的read方,write方法,最终在这里会完成数据的读写,并且通过调用 pipeline.fireChannelRead 发送一个 inbound 事件,给其他的channel处理
(这一步对应NIO的第五步)
Netty的执行流程和NIO的执行流程联系完毕
总结:
1、一个EventLoopGroup 内部 维护了一个EventLoop数组,EventLoop实际上是一个Executor,大小为实例化时传入或者是默认的处理器核心数*2,Executor中是用来执行Runnable的
2、一个EventLoop中有一个 Selector 属性,SocketChannel会注册到EventLoop的 Selector 中去
3、一个客户端链接,会使用一个EventLoop来去处理
3、注册事件,读事件,写事件都会在ChannelPipeline中传递,从HeadHandler到TailHandler
4、一个NioEventLoop会和一个线程联系,Channel的IO操作,如监听,读,写是在EventLoop中run方法完成的,而run方法内部是通过Unsafe类完成的,而UnSafe类最终又会调用父类AbstractNioByteChannel来处理读写连接的事件,并且会把该事件在pipeline中传递
4. 客户端的连接服务端
Bootstrap调用connect后,最终会来到下面的方法
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if(localAddress != null) { this.javaChannel().socket().bind(localAddress);//绑定地址 } boolean success = false; boolean var5; try { boolean connected = this.javaChannel().connect(remoteAddress); if(!connected) { this.selectionKey().interestOps(8); } success = true; var5 = connected; } finally { if(!success) { this.doClose(); } } return var5; }
最终会调用NioSocketChannel的doConnect方法,而该方法内部调用:
boolean connected = javaChannel().connect(remoteAddress);
javaChannel()会返回java.nio.channels.SocketChannel,SocketChannel调用connect方法,跟Nio的一样,也就是说,Netty的客户端连接最终还是执行Nio的连接
三、Netty知识点
前言:NioEventLoop的作用
第一个是作为 IO 线程, 执行与 Channel 相关的 IO 操作, 包括 调用 select 等待就绪的 IO 事件、读写数据与数据的处理,客户端的连接等;
(后面的分析主要是这个作用)
第二个是作为任务队列, 执行 taskQueue 中的任务, 例如用户调用 eventLoop.schedule 提交的定时任务也是这个线程执行的.
NioEventLoop和一个线程联系,并且内部有一个消息队列,在启动NioEventLoop时,调用run方法,会首先去处理消息队列的任务,然后再去select就绪的IO事件
1. Netty服务端的启动过程
1、首先实例化NioEventLoopGroup,最终会初始化一个NioEventLoop数组,NioEventLoop是一个Executor,每一个NioEventLoop和一个线程联系。
2、当调用doBind绑定端口后,会进行一系列的初始化操作,例如初始化pipeline和channel的option,并且会把ServerSocketChannel注册到NioEventLoop的selector上,并且此时的instrestOps是SelectionKey.OP_ACCEPT,此时的ServerSocketChannel是处于监听状态。
3、然后会取出一个NioEventLoop,启动NioEventLoop,会在run方法中,在循环中监听事件(Accept,read,write)
2. Accept过程
1、当客户端进行connect后,服务端的BossGroup启动的NioEventLoop会捕获到SelectionKey.OP_ACCEPT事件
2、 然后会调用NioMessageUnsafe的read方法来处理连接事件,然后会取出连接的客户端的NioSocketChannel,然后触发ChannelActive事件,该事件也会在ChannelPipelines中传递,然后再触发客户端NioSocketChannel和服务端NioServerSocketChannel的ChannelRead操作
3、然后服务端的ServerBootStrap会为客户端的NioSocketChannel设置childHandler参数
4、紧接着会把取得客户端的NioSocketChannel通过childGroup.register(child)将NioSocketChannel注册到work的NioEventLoop中,这个过程和NioServerSocketChannel注册到boss的NioEventLoop的过程一样,最终交付给work线程对应的selector进行read事件的监听。
3. Read操作
同样是在NioEventLoop中进行的,当work线程的selector检测到OP_READ事件发生时,触发ChannelRead操作,read操作完成后,该事件会在Pipeline中传递下去,给Pipeline的Handler依次处理
4. epoll bug (Selector 空轮询)
当NioEventLoop中select方法内,selector的轮询结果为空,消息队列也没有新的消息要处理,说明是一个空轮询,有可能会导致jdk的epoll bug,会导致Selector的空轮询,使IO线程一直处于100%状态
解决:
通过重建Selector的方式,首先当前reBuildSelector是否是其他线程发出的,如果是,则放入消息队列中,由NioEventLoop的线程负责调用。然后打开新的Selector,通过循环,将老的Selector注册的Channel重新出则到新的Selector中,并关闭老的Selector
5. TCP粘包拆包问题
原因:
- 应用程序write写入的字节大小 > TCP发送窗口缓冲区大小
- TCP分片
- IP分片
MSS:TCP数据包每次能够传输的最大数据大小
MTU:数据链路层每一次传输的最大传输单元
TCP分片:当前发送的数据包 > MSS
IP分片:当前IP层发送的分组 > MTU
解决:
1、消息定长,每个报文指定固定的字节
例如FixedLengthFrameDecoder,它是固定长度解码器,可以根据执行的长度来编解码,如果收到的报文长度不足指定的长度,会先缓存等到下一个消息来进行拼包,直到达到报文长度
2、使用一些分隔符,例如以/n或/r/n来分割(LineBasedFrameDecoder),或者可以自定义分隔符(DelimiterBasedFrameDecoder)
3、将消息分为消息头和消息体,消息头包含表示消息总长度
4、其他更复杂的协议
6. ChannelOption
1、ChannelOption.SO_BACKLOG
ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int backlog)用来初始化服务端可连接队列,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小
2、ChannelOption.SO_KEEPALIVE
是否启用心跳保活机制。在双方TCP套接字建立连接后(即都进入ESTABLISHED状态)并且在两个小时左右上层没有任何数据传输的情况下,这套机制才会被激活。
3、ChannelOption.SO_SNDBUF 和 ChannelOption.SO_RCVBUF
ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF,ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF
这两个参数用于操作接收缓冲区和发送缓冲区的大小,接收缓冲区用于保存网络协议站内收到的数据,直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。
4、ChannelOption.TCP_NODELAY
在TCP/IP协议中,无论发送多少数据,总是要在数据前面加上协议头,同时,对方接收到数据,也需要发送ACK表示确认。为了尽可能的利用网络带宽,TCP总是希望尽可能的发送足够大的数据。这里就涉及到一个名为Nagle的算法,该算法的目的就是为了尽可能发送大块数据,避免网络中充斥着许多小数据块。
TCP_NODELAY就是用于启用或关于Nagle算法。如果要求高实时性,有数据发送时就马上发送,就将该选项设置为true关闭Nagle算法;如果要减少发送次数减少网络交互,就设置为false等累积一定大小后再发送。默认为false。
5、ChannelOption.SO_REUSEADDR
ChanneOption.SO_REUSEADDR对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口,比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用。
比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/6873.html