这篇文章将为大家详细讲解有关netty中pipeline如何添加handler,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
pipeline无疑是 netty中非常重要的一环,在io处理中netty通过已存放在pipeline中的handler有序处理socket中的信息,而pipeline中包含的都是一个个的handler,它是怎么添加进去的呢
创建代码如下:其中有handler和childhandler
public void start() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap sb = new ServerBootstrap(); sb.option(ChannelOption.SO_BACKLOG, 1024); // 绑定线程池 sb.group(group, bossGroup) // 指定使用的channel .channel(NioServerSocketChannel.class) // 绑定监听端口 .localAddress(this.port) .handler(new LoggingHandler(LogLevel.INFO)) // 绑定客户端连接时候触发操作 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); //字符串解码 pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); //字符串编码 pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new IdleStateHandler(0, 0, 120, TimeUnit.SECONDS)); } }); // 服务器异步创建绑定 ChannelFuture cf = sb.bind().sync(); log.info(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress()); // 关闭服务器通道 cf.channel().closeFuture().sync(); } finally { // 释放线程池资源 group.shutdownGracefully().sync(); bossGroup.shutdownGracefully().sync(); } }
顺着服务端bind方法点进去会看到如下:创建完channel实例后 会调用一个init方法
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
init方法为抽象方法 服务端实现如下
void init(Channel channel) throws Exception { //省略若干 ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } //看这里 添加了一个ChannelInitializer实例 实例中实现了initChannel 获取配置的handler添加到pipeline p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
而pipeline的addlast方法如下
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); //这方法很简单 添加到链表中 但是此时添加的是上文的ChannelInitializer 而不是我们定义的handler addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventLoop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { callHandlerAddedInEventLoop(newCtx, executor); return this; } } //这个方法就是关键 将ChannelInitializer中的handler添加到pipeline中 以及删除ChannelInitializer //但请注意 首次注册不会走到这里 在前面的判断就会返回 会在注册的逻辑中触发 最终调用的一致 callHandlerAdded0(newCtx); return this; }
查看ChannelInitializer中的关键代码如下 OK 通过如下代码我们就看到 调用了ChannelInitializer的initChannel方法 就是在最开始我们定义的那个,然后里面就用继续调用pipeline add我们真正的handler
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { // This should always be true with our current DefaultChannelPipeline implementation. // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers // will be added in the expected order. if (initChannel(ctx)) { // We are done with init the Channel, removing the initializer now. removeState(ctx); } } } private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); } catch (Throwable cause) { // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...). // We do so to prevent multiple calls to initChannel(...). exceptionCaught(ctx, cause); } finally { //看这里删除pipeline中的 ChannelInitializer ChannelInitializer 其实就是一个handler 只是它重写了handlerAdded方法 ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { pipeline.remove(this); } } return true; } return false; }
关于“netty中pipeline如何添加handler”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/tech/opensource/219652.html