Netty 4 实现一个 NettyClient详解编程语言

本文章为作者原创,有问题的地方请提出指正。

1、类继承Diagram

Netty 4 实现一个 NettyClient详解编程语言

2、定义EndPoint类

目前仅仅定义了2个方法,分别用来获取本地或远程服务器的地址。

package netty; 
 
import java.net.InetSocketAddress; 
 
/** 
 * @author xfyou 
 * @date 2019/8/28 
 */ 
public interface EndPoint { 
 
  /** 
   * Return  the local Inet address 
   * 
   * @return The local Inet address to which this <code>EndPoint</code> is bound, or <code>null</code> 
   * if this <code>EndPoint</code> does not represent a network connection. 
   */ 
  InetSocketAddress getLocalAddress(); 
 
  /** 
   * Return the remote Inet address 
   * 
   * @return The remote Inet address to which this <code>EndPoint</code> is bound, or <code>null</code> 
   * if this <code>EndPoint</code> does not represent a network connection. 
   */ 
  InetSocketAddress getRemoteAddress(); 
 
}

2、定义AbstractClass类

主要是定义几个抽象方法:

  • doOpen – 创建引导类Bootstrap;
  • doConnect – 创建Channel并连接远程服务器;
  • getChannel – 获取已创建的Channel

另外,提供了2个公共的方法给外部调用:

  • send – 发送消息(OutBound)
  • receive – 接收消息 (InBound)

内部私有的write()方法。write方法负责在connect成功后,把消息写到远程peer。翻阅源码,我们可以看到如下的调用栈:

  • channel.writeAndFlush
    • pipeline.writeAndFlush (pipleline为channel实例所关联的pipleline实例)
      • AbstractChannelHandlerContext.writeAndFlush (每个ChannelHanlder都有一个对应的ChannelHandlerContext,可以从这个ChannelHanlderConext获取Channel、ChannelHanlder和ChannelPipeline)
        • AbstractChannelHandlerContext.write(在这个方法里面有一个executor.inEventLoop()的判断,这个地方很重要,它主要是判断当前线程是否是EventLoop分配的线程,如果是则直接使用EventLoop分配的线程执行,否则会将当前要执行的任务封装成一个Task,然后塞到一个LinkedBlockQueue里面去等待后续的调度执行。这样做的目的主要是就是把用户线程的操作封装成Task放入队列,统一由I/O线程来处理)
package netty; 
 
import io.netty.buffer.ByteBuf; 
import io.netty.buffer.Unpooled; 
import io.netty.channel.Channel; 
import io.netty.channel.ChannelFuture; 
import lombok.*; 
import lombok.extern.slf4j.Slf4j; 
import org.apache.commons.lang.NotImplementedException; 
 
import java.net.InetSocketAddress; 
import java.nio.charset.StandardCharsets; 
import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.TimeUnit; 
 
/** 
 * @author xfyou 
 * @date 2019/8/29 
 */ 
@Slf4j 
@RequiredArgsConstructor 
abstract class AbstractClient implements EndPoint { 
 
  @NonNull 
  private String hostName; 
 
  @NonNull 
  private int port; 
 
  @NonNull 
  @Getter(value = AccessLevel.PROTECTED) 
  private int connectionTimeout; 
 
  protected final CountDownLatch countDownLatch = new CountDownLatch(1); 
  protected String respMsg; 
 
  @SneakyThrows 
  public void send(Object message) { 
    doOpen(); 
    doConnect(); 
    write(message); 
  } 
 
  @SneakyThrows 
  public String receive() { 
    boolean b = countDownLatch.await(getConnectionTimeout(), TimeUnit.MILLISECONDS); 
    if (!b) { 
      log.error("Timeout(" + getConnectionTimeout() + "ms) when receiving response message"); 
    } 
    return respMsg; 
  } 
 
  private void write(Object message) { 
    Channel channel = getChannel(); 
    if (null != channel) { 
      ChannelFuture f = channel.writeAndFlush(byteBufferFrom(message)).syncUninterruptibly(); 
      if (!f.isSuccess()) { 
        log.error("Failed to send message to " + getRemoteAddress() + f.cause().getMessage()); 
      } 
    } 
  } 
 
  private ByteBuf byteBufferFrom(Object message) { 
    return message instanceof String ? Unpooled.copiedBuffer((String) message, StandardCharsets.UTF_8) : Unpooled.copiedBuffer((byte[]) message); 
  } 
 
  @Override 
  public InetSocketAddress getRemoteAddress() { 
    return new InetSocketAddress(hostName, port); 
  } 
 
  @Override 
  public InetSocketAddress getLocalAddress() { 
    throw new NotImplementedException("This method is not need to be implemented"); 
  } 
 
  /** 
   * Open client. 
   * 
   * @throws Throwable 
   */ 
  protected abstract void doOpen() throws Throwable; 
 
  /** 
   * Connect to server. 
   * 
   * @throws Throwable 
   */ 
  protected abstract void doConnect() throws Throwable; 
 
  /** 
   * Get the connected channel. 
   * 
   * @return channel 
   */ 
  protected abstract Channel getChannel(); 
 
}

4、定义NettyClient类

NettyClient类继承了AbstractClient类,主要是实现了doOpen、doConnect、getChannel类;同时实现了一个自定义的ChannelHander用来在ChannelActive时获取Channel以及有消息返回时读取消息。

  • doOpen方法的实现。创建引导类并在引导类上注册相关属性;
    • 注册NioEventLoopGroup,基于java NIO传输的一个线程池,线程池的默认大小为:CPU核数*2。当一个新的Channel被创建后,Netty会从这个NioEventLoopGroup中选择一个线程来为此Channel创建一个关联的EventLoop(用来监听关联Channel的所有的I/O事件,比如连接、断开连接、读、写等);
    • 注册NioSocketChannel类类型,这个类型说明将要创建的Channel的实例的类型,客户端为:NioSocketChannel,服务器端为:NioServerSocketChannel;Bootstrap会根据这个class来创建一个BootstrapChannelFactory<NioSocketChannel>实例(Channel工厂类,用于将来在connect时创建Channel);
    • 设置相关Option选项
    • 注册自定义的ChannelHandler,这些ChannelHandler会被注册到与Channel相关联的ChannelPipleline中,用来拦截消息并做相应的处理。
  • doConnect方法的实现。通过已创建的Channel来连接到远程服务器。前面我们已经在Bootstrap中设置的超时时间,所以connect时可以使用忽略线程中断阻塞的方式去连接,直到超时。connect时会先通过BootstrapChannelFactory<NioSocketChannel>来创建一个NioSocketChannel实例,并把这个NioSocketChannel实例注册到NioEventGroup中去(从线程池中按某种算法选择一个EventLoop来和当前的Channel建立对应关系,可以是1:N,即一个EventLoop可以对应多个Channel )。EventLoop同时也是一个EventLoopExecutor,EventLoop和Channel对应起来后就可以处理所有这个Channel的I/O操作了。一句话,某个Channel的所有I/O操作都是线程池(NioEventGroup)中的某个I/O线程(EventLoopExecutor)来异步处理的。
package netty; 
 
import io.netty.bootstrap.Bootstrap; 
import io.netty.buffer.ByteBuf; 
import io.netty.channel.*; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioSocketChannel; 
import lombok.extern.slf4j.Slf4j; 
 
import java.nio.charset.StandardCharsets; 
 
/** 
 * @author xfyou 
 * @date 2019/8/28 
 */ 
@Slf4j 
public class NettyClient extends AbstractClient { 
 
  private Bootstrap bootstrap; 
 
  private volatile Channel channel; 
 
  private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup(); 
 
  public NettyClient(String hostName, int port, int connectionTimeout) { 
    super(hostName, port, connectionTimeout); 
  } 
 
  private class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { 
 
    @Override 
    public void channelActive(ChannelHandlerContext ctx) throws Exception { 
      super.channelActive(ctx); 
      channel = ctx.channel(); 
    } 
 
    @Override 
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 
      try { 
        respMsg = msg.toString(StandardCharsets.UTF_8); 
      } finally { 
        countDownLatch.countDown(); 
        ctx.close(); 
      } 
    } 
 
    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
      log.error("An exception was thrown, cause:" + cause.getMessage()); 
      ctx.close(); 
    } 
  } 
 
  @Override 
  protected void doOpen() throws Throwable { 
    bootstrap = new Bootstrap(); 
    bootstrap 
        .group(NIO_GROUP) 
        .remoteAddress(getRemoteAddress()) 
        .channel(NioSocketChannel.class) 
        .option(ChannelOption.TCP_NODELAY, true) 
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectionTimeout()) 
        .handler(new ChannelInitializer<SocketChannel>() { 
          @Override 
          protected void initChannel(SocketChannel ch) throws Exception { 
            ch.pipeline().addLast(new ClientHandler()); 
          } 
        }); 
  } 
 
  @Override 
  public void doConnect() { 
    ChannelFuture f = bootstrap.connect().syncUninterruptibly(); 
    if (!f.isSuccess() && null != f.cause()) { 
      log.error("The client failed to connect the server:" + getRemoteAddress() + ",error message is:" + f.cause().getMessage()); 
    } 
  } 
 
  @Override 
  protected Channel getChannel() { 
    return channel; 
  } 
 
}

5、测试类

package netty; 
 
import lombok.SneakyThrows; 
 
/** 
 * Test 
 * 
 * @author xfyou 
 */ 
public class Test { 
 
  @SneakyThrows 
  public static void main(String[] args) { 
    NettyClient client = new NettyClient("127.0.0.1", 8080, 45000); 
    client.send("aaa".getBytes()); 
    // maybe do something else 
    System.out.println(client.receive()); 
  } 
   
}

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/20625.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论