Netty聊天系统(1)通过自定义协议实现客户端与服务器端通信


1 自定义实现客户端与服务器端通信的协议

1.1 通信协议的设计

自定义的通信协议需要哪些内容

1)魔数:第一个字段一般是魔数,一般固定的几个字节。一个PNG图片的编码中有固定数量固定内容的字节,用于表示这是一个PNG图片;Java的Class文件打头有一串魔数用于表示这是一个class文件;同样,我们的通信协议也是这么定义,服务器端或客户端收到数据包之后,会先读取魔数看看是不是我们定义的通信协议,只有是我们的通信协议时才能按照我们定义的规则正确读取数据。

2)版本号:用于一个字节表示,比如Http协议有1.0/1.1/2.0版本,用于标识当前的数据包使用的是哪个版本号

3)序列化算法:我们Netty通信过程中,使用Java对象进行数据传输,就一定会涉及到序列化与反序列化,用一个字节说明采用哪种序列化方式。

4)指令:你发给我这个数据包时要干什么。比如TCP协议中,SYN位置1就代表我是要跟你建立连接、FIN位置1就是代表我要断开连接。在聊天系统中,服务器收到客户端的数据包,指令位用于说明客户端发送这条数据包的目的,是要给某人发送消息呢还是要添加某人为好友呢还是怎么的…

5)数据长度:我发送给你的数据包的数据部分的长度是多少。解决TCP粘包拆包问题

6)数据部分:传输的数据

1.2 通信协议的实现

1.2.1 Java对象

1)定义通信过程中传输的Java对象的抽象类

package maolaoke.top.netty.protocol;

import lombok.Data;

@Data
public abstract class Packet {
          
   
    //协议版本
    private Byte version = 1;

    //获取指令
    public abstract Byte getCommand();
}

2)~~定义一个枚举类,列出所有的指令。~~定义一个Command接口来模拟枚举,因为枚举默认是Integer类型,而我们要传输的是一个字节,用Byte来表示更合适。

public interface Command {
          
   
    Byte LOGIN_REQUEST = 1;  //表示这是一个登录请求
}

登录请求的Java类定义:

import maolaoke.top.netty.protocol.Command;

public class LoginRequestPacket  extends Packet{
          
   
    private Integer userId;

    private String username;

    private String password;
    
    @Override
    public Byte getCommand() {
          
   
        return Command.LOGIN_REQUEST;
    }
}

1.2.2 序列化

1)定义一个接口来模拟枚举,枚举所有的序列化标识。

public interface SerializerAlgorithm {
          
   
    byte JSON_SERIALIZER = 1;  //JSON序列化
}

2)定义一个序列化接口

public interface Serializer {
          
   
    //获取序列化方式
    byte getSerializerAlgorithm();

    //序列化方法
    byte[] serialize(Object object);

    //反序列化
    <T> T deserialize(Class<T> clazz, byte[] bytes);
}

3)实现JSON方式的序列化。

导入fastJson依赖包。

<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.47</version>
</dependency>
import com.alibaba.fastjson.JSON;

public class JsonSerializer implements Serializer {
          
   
    @Override
    public byte getSerializerAlgorithm(){
          
   
        return SerializerAlgorithm.JSON_SERIALIZER;
    }

    @Override
    public byte[] serialize(Object object) {
          
   
        return JSON.toJSONBytes(object);
    }

    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
          
   
        return JSON.parseObject(bytes, clazz);
    }
}

1.2.3 编解码

PacketCodeC.java实现编解码,将发送的Java对象编码成标准的自定义通信协议的格式,将接收到的数据包解码成ByteBuf。

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import maolaoke.top.netty.protocol.Command;
import maolaoke.top.netty.protocol.packet.LoginRequestPacket;
import maolaoke.top.netty.protocol.packet.Packet;
import maolaoke.top.netty.protocol.serializer.JsonSerializer;
import maolaoke.top.netty.protocol.serializer.Serializer;

import java.util.HashMap;
import java.util.Map;

public class PacketCodeC {
          
   
    private static final int MAGIC_NUMBER = 0x15794516;  //魔数,由于int正好是4字节,所以用int表示即可
    private static final Map<Byte, Class<? extends Packet>> packetTypeMap;  //用于通过command指令找到具体的Java类
    private static final Map<Byte, Serializer> serializerMap;
    static {
          
   
        packetTypeMap = new HashMap<>();
        packetTypeMap.put(Command.LOGIN_REQUEST, LoginRequestPacket.class);

        serializerMap = new HashMap<>();
        Serializer serializer = new JsonSerializer();
        serializerMap.put(serializer.getSerializerAlgorithm(), serializer);
    }

    /**
     * 编码
     * @param packet
     * @return
     */
    public void encoder(Packet packet, ByteBuf byteBuf){
          
   
        //序列化Java对象
        byte[] bytes = Serializer.DEFAULT.serialize(packet);

        //编码成标准通信协议数据包
        byteBuf.writeInt(MAGIC_NUMBER);  //魔数
        byteBuf.writeByte(packet.getVersion());  //版本号
        byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm());  //序列化算法
        byteBuf.writeByte(packet.getCommand());  //指令
        byteBuf.writeInt(bytes.length);  //数据段长度
        byteBuf.writeBytes(bytes);  //数据段
    }


    /**
     * 解码
     */
    public Packet decode(ByteBuf byteBuf){
          
   
        int magic_number = byteBuf.readInt();
        if(magic_number != MAGIC_NUMBER)
            throw new RuntimeException("解析错误");

        byteBuf.skipBytes(1);  //先不处理版本号

        //序列化标识
        byte serializeAlgorithm = byteBuf.readByte();

        //指令
        byte command = byteBuf.readByte();

        //数据包长度
        int length = byteBuf.readInt();

        //读取数据
        byte[] data = new byte[length];
        byteBuf.readBytes(data);

        Class<? extends Packet> requestType = packetTypeMap.get(command);
        Serializer serializer = serializerMap.get(serializeAlgorithm);
        
        if(requestType != null && serializer != null){
          
   
            return serializer.deserialize(requestType, data);
        }
        return null;
    }

}

测试

public class test {
          
   
    public static void main(String[] args) {
          
   
        JsonSerializer jsonSerializer = new JsonSerializer();
        LoginRequestPacket loginRequestPacket = new LoginRequestPacket();

        loginRequestPacket.setVersion((byte) 1);
        loginRequestPacket.setUserId(1);
        loginRequestPacket.setUsername("zhangsan");
        loginRequestPacket.setPassword("123456");

        PacketCodeC packetCodeC = new PacketCodeC();
        //编码
        ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer();
        packetCodeC.encoder(loginRequestPacket,byteBuf);
        //解码
        LoginRequestPacket decode =(LoginRequestPacket) packetCodeC.decode(byteBuf);
        System.out.println(decode.getUsername());
    }
}

2 通过自定义协议实现登录功能

自定义一个编解码器

Netty为我们提供了许多编解码器,由于我们自定义的协议,所以我们可以继承MessageToByteEncoder来实现编码器,继承ByteToMessageDecoder实现解码器。

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import top.maolaoke.wechat.protocol.packet.Packet;

public class PacketEncoder extends MessageToByteEncoder<Packet> {
          
   
    @Override
    protected void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception {
          
   
        PacketCodeC.INSTANCE.encoder(msg, out);
    }
}
public class PacketDecoder extends ByteToMessageDecoder {
          
   

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
          
   
        out.add(PacketCodeC.INSTANCE.decode(in));
    }
}

自定义好了编解码器,只需要将其加入pipeline中即可,利用责任链设计模式会自动的去找到对应的编解码器。

客户端

public class Client {
          
   
    public static void main(String[] args) throws InterruptedException {
          
   
        NioEventLoopGroup group = new NioEventLoopGroup();

        try {
          
   
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
          
   
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
          
   
                            ch.pipeline().addLast(new PacketDecoder()); //解码器
                            ch.pipeline().addLast(new LoginResponseHandler());  //登录返回时的处理器
                            ch.pipeline().addLast(new PacketEncoder());  //编码器
                        }
                    });

            ChannelFuture future = bootstrap.connect("localhost", 6666).sync();
            future.channel().closeFuture().sync();
        }finally {
          
   
            group.shutdownGracefully();
        }
    }
}
public class LoginResponseHandler extends SimpleChannelInboundHandler<LoginResponsePacket> {
          
   
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
          
   
        System.out.println("开始登陆");
        System.out.println("请输入用户名");
        Scanner scanner = new Scanner(System.in);
        String username = scanner.nextLine();
        System.out.println("请输入密码");
        String password = scanner.nextLine();

        LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
        loginRequestPacket.setUserId(100);
        loginRequestPacket.setUsername(username);
        loginRequestPacket.setPassword(password);

        ctx.channel().writeAndFlush(loginRequestPacket);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) throws Exception {
          
   
        if(loginResponsePacket.isSuccess()){
          
   
            System.out.println("登录成功");
        }else {
          
   
            System.out.println(loginResponsePacket.getReason());
        }
    }
}

服务器端

public class Server {
          
   
    public static void main(String[] args) throws InterruptedException {
          
   
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(12);
        try {
          
   
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
          
   
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
          
   
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new PacketDecoder());
                            pipeline.addLast(new LoginRequestHandler());
                            pipeline.addLast(new PacketEncoder());
                        }
                    });

            //启动服务器
            ChannelFuture bind = serverBootstrap.bind(6666).sync();
            System.out.println("服务器启动");
            //监听关闭
            bind.channel().closeFuture().sync();

        }finally {
          
   
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

由于在pipeline中先加入了解码器,我们不需要手动的去实现解码,而是可以继承SimpleChannelInboundHandler并加上泛型去重写channelRead0方法即可,加上泛型的意义在于,只处理该泛型类型的数据。这样可以避免所有的数据都在一个handler中处理,我们只需要编写多个handler,每个handler只负责处理它的任务,最后将所有handler加入pipeline中,不该该handler处理的数据就传给下一个handler。(责任链模式的好处,避免大量的ifelse语句)

public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {
          
   
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket packet) throws Exception {
          
   
        String username = packet.getUsername();
        System.out.println(username + "用户登录成功");
        //登录后返回客户端响应,告诉客户端是否登录成功
        LoginResponsePacket loginResponsePacket = new LoginResponsePacket();
        loginResponsePacket.setVersion(packet.getVersion());
        loginResponsePacket.setSuccess(true);

        ctx.channel().writeAndFlush(loginResponsePacket);
    }
}

新增一个LoginResponsePacket类

@Data
public class LoginResponsePacket extends Packet {
          
   
    private boolean success;  //是否成功

    private String reason;   //失败的原因

    @Override
    public Byte getCommand() {
          
   
        return Command.LOGIN_RESPONSE;
    }
}

新增一个类,注意需要在Command接口中添加对应的指令,在PacketCodeC中的packetTypeMap中添加该类型,否则会编码失败。

3 客户端与服务器端收发消息

定义收发消息的对象,并添加command和packetTypeMap

@Data
public class MsgRequestPacket extends Packet {
          
   
    private String message;

    @Override
    public Byte getCommand() {
          
   
        return Command.MSG_REQUEST;
    }
}
public class MsgResponsePacket extends Packet {
          
   
    private String message;
    @Override
    public Byte getCommand() {
          
   
        return Command.MSG_RESPONSE;
    }
}

在登录成功之后,客户端开启一个线程去等待控制台发送消息

@Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) throws Exception {
          
   
        if(loginResponsePacket.isSuccess()){
          
   
            System.out.println("登录成功");
            //登录成功后,启动一个线程接受控制台输入,并发送数据
            startSendMsgThread(ctx.channel());
        }else {
          
   
            System.out.println(loginResponsePacket.getReason());
        }
    }

    private static void startSendMsgThread(Channel channel){
          
   
        new Thread(()->{
          
   
            System.out.println("输入消息后回车发送到服务器端");
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()){
          
   
                String msg = scanner.nextLine();
                MsgRequestPacket msgRequestPacket = new MsgRequestPacket();
                msgRequestPacket.setMessage(msg);

                channel.writeAndFlush(msgRequestPacket);
            }
        }).start();
    }

客户端新增一个处理消息的Handler

public class MsgResponseHandler extends SimpleChannelInboundHandler<MsgResponsePacket> {
          
   
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MsgResponsePacket msg) throws Exception {
          
   
        String message = msg.getMessage();
        System.out.println("收到服务器端传来的消息:" + message);
    }
}

服务器端也新增一个处理消息的handler

public class MsgRequestHandler extends SimpleChannelInboundHandler<MsgRequestPacket> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MsgRequestPacket msg) throws Exception {
        String message = msg.getMessage();
        System.out.println("收到客户端消息:" + message);

        MsgResponsePacket msgResponsePacket = new MsgResponsePacket();
        msgResponsePacket.setMessage("你好,客户端");
        ctx.channel().writeAndFlush(msgResponsePacket);
    }
}

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/290939.html

(0)
上一篇 2022年10月12日
下一篇 2022年10月12日

发表回复

登录后才能评论