1 自定义实现客户端与服务器端通信的协议
1.1 通信协议的设计
自定义的通信协议需要哪些内容
1)魔数:第一个字段一般是魔数,一般固定的几个字节。一个PNG图片的编码中有固定数量固定内容的字节,用于表示这是一个PNG图片;Java的Class文件打头有一串魔数用于表示这是一个class文件;同样,我们的通信协议也是这么定义,服务器端或客户端收到数据包之后,会先读取魔数看看是不是我们定义的通信协议,只有是我们的通信协议时才能按照我们定义的规则正确读取数据。
2)版本号:用于一个字节表示,比如Http协议有1.0/1.1/2.0版本,用于标识当前的数据包使用的是哪个版本号
3)序列化算法:我们Netty通信过程中,使用Java对象进行数据传输,就一定会涉及到序列化与反序列化,用一个字节说明采用哪种序列化方式。
4)指令:你发给我这个数据包时要干什么。比如TCP协议中,SYN位置1就代表我是要跟你建立连接、FIN位置1就是代表我要断开连接。在聊天系统中,服务器收到客户端的数据包,指令位用于说明客户端发送这条数据包的目的,是要给某人发送消息呢还是要添加某人为好友呢还是怎么的…
5)数据长度:我发送给你的数据包的数据部分的长度是多少。解决TCP粘包拆包问题
6)数据部分:传输的数据
1.2 通信协议的实现
1.2.1 Java对象
1)定义通信过程中传输的Java对象的抽象类
package maolaoke.top.netty.protocol; import lombok.Data; @Data public abstract class Packet { //协议版本 private Byte version = 1; //获取指令 public abstract Byte getCommand(); }
2)~~定义一个枚举类,列出所有的指令。~~定义一个Command接口来模拟枚举,因为枚举默认是Integer类型,而我们要传输的是一个字节,用Byte来表示更合适。
public interface Command { Byte LOGIN_REQUEST = 1; //表示这是一个登录请求 }
登录请求的Java类定义:
import maolaoke.top.netty.protocol.Command; public class LoginRequestPacket extends Packet{ private Integer userId; private String username; private String password; @Override public Byte getCommand() { return Command.LOGIN_REQUEST; } }
1.2.2 序列化
1)定义一个接口来模拟枚举,枚举所有的序列化标识。
public interface SerializerAlgorithm { byte JSON_SERIALIZER = 1; //JSON序列化 }
2)定义一个序列化接口
public interface Serializer { //获取序列化方式 byte getSerializerAlgorithm(); //序列化方法 byte[] serialize(Object object); //反序列化 <T> T deserialize(Class<T> clazz, byte[] bytes); }
3)实现JSON方式的序列化。
导入fastJson依赖包。
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency>
import com.alibaba.fastjson.JSON; public class JsonSerializer implements Serializer { @Override public byte getSerializerAlgorithm(){ return SerializerAlgorithm.JSON_SERIALIZER; } @Override public byte[] serialize(Object object) { return JSON.toJSONBytes(object); } @Override public <T> T deserialize(Class<T> clazz, byte[] bytes) { return JSON.parseObject(bytes, clazz); } }
1.2.3 编解码
PacketCodeC.java实现编解码,将发送的Java对象编码成标准的自定义通信协议的格式,将接收到的数据包解码成ByteBuf。
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import maolaoke.top.netty.protocol.Command; import maolaoke.top.netty.protocol.packet.LoginRequestPacket; import maolaoke.top.netty.protocol.packet.Packet; import maolaoke.top.netty.protocol.serializer.JsonSerializer; import maolaoke.top.netty.protocol.serializer.Serializer; import java.util.HashMap; import java.util.Map; public class PacketCodeC { private static final int MAGIC_NUMBER = 0x15794516; //魔数,由于int正好是4字节,所以用int表示即可 private static final Map<Byte, Class<? extends Packet>> packetTypeMap; //用于通过command指令找到具体的Java类 private static final Map<Byte, Serializer> serializerMap; static { packetTypeMap = new HashMap<>(); packetTypeMap.put(Command.LOGIN_REQUEST, LoginRequestPacket.class); serializerMap = new HashMap<>(); Serializer serializer = new JsonSerializer(); serializerMap.put(serializer.getSerializerAlgorithm(), serializer); } /** * 编码 * @param packet * @return */ public void encoder(Packet packet, ByteBuf byteBuf){ //序列化Java对象 byte[] bytes = Serializer.DEFAULT.serialize(packet); //编码成标准通信协议数据包 byteBuf.writeInt(MAGIC_NUMBER); //魔数 byteBuf.writeByte(packet.getVersion()); //版本号 byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm()); //序列化算法 byteBuf.writeByte(packet.getCommand()); //指令 byteBuf.writeInt(bytes.length); //数据段长度 byteBuf.writeBytes(bytes); //数据段 } /** * 解码 */ public Packet decode(ByteBuf byteBuf){ int magic_number = byteBuf.readInt(); if(magic_number != MAGIC_NUMBER) throw new RuntimeException("解析错误"); byteBuf.skipBytes(1); //先不处理版本号 //序列化标识 byte serializeAlgorithm = byteBuf.readByte(); //指令 byte command = byteBuf.readByte(); //数据包长度 int length = byteBuf.readInt(); //读取数据 byte[] data = new byte[length]; byteBuf.readBytes(data); Class<? extends Packet> requestType = packetTypeMap.get(command); Serializer serializer = serializerMap.get(serializeAlgorithm); if(requestType != null && serializer != null){ return serializer.deserialize(requestType, data); } return null; } }
测试
public class test { public static void main(String[] args) { JsonSerializer jsonSerializer = new JsonSerializer(); LoginRequestPacket loginRequestPacket = new LoginRequestPacket(); loginRequestPacket.setVersion((byte) 1); loginRequestPacket.setUserId(1); loginRequestPacket.setUsername("zhangsan"); loginRequestPacket.setPassword("123456"); PacketCodeC packetCodeC = new PacketCodeC(); //编码 ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer(); packetCodeC.encoder(loginRequestPacket,byteBuf); //解码 LoginRequestPacket decode =(LoginRequestPacket) packetCodeC.decode(byteBuf); System.out.println(decode.getUsername()); } }
2 通过自定义协议实现登录功能
自定义一个编解码器
Netty为我们提供了许多编解码器,由于我们自定义的协议,所以我们可以继承MessageToByteEncoder来实现编码器,继承ByteToMessageDecoder实现解码器。
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import top.maolaoke.wechat.protocol.packet.Packet; public class PacketEncoder extends MessageToByteEncoder<Packet> { @Override protected void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception { PacketCodeC.INSTANCE.encoder(msg, out); } }
public class PacketDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { out.add(PacketCodeC.INSTANCE.decode(in)); } }
自定义好了编解码器,只需要将其加入pipeline中即可,利用责任链设计模式会自动的去找到对应的编解码器。
客户端
public class Client { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new PacketDecoder()); //解码器 ch.pipeline().addLast(new LoginResponseHandler()); //登录返回时的处理器 ch.pipeline().addLast(new PacketEncoder()); //编码器 } }); ChannelFuture future = bootstrap.connect("localhost", 6666).sync(); future.channel().closeFuture().sync(); }finally { group.shutdownGracefully(); } } }
public class LoginResponseHandler extends SimpleChannelInboundHandler<LoginResponsePacket> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("开始登陆"); System.out.println("请输入用户名"); Scanner scanner = new Scanner(System.in); String username = scanner.nextLine(); System.out.println("请输入密码"); String password = scanner.nextLine(); LoginRequestPacket loginRequestPacket = new LoginRequestPacket(); loginRequestPacket.setUserId(100); loginRequestPacket.setUsername(username); loginRequestPacket.setPassword(password); ctx.channel().writeAndFlush(loginRequestPacket); } @Override protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) throws Exception { if(loginResponsePacket.isSuccess()){ System.out.println("登录成功"); }else { System.out.println(loginResponsePacket.getReason()); } } }
服务器端
public class Server { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(12); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new PacketDecoder()); pipeline.addLast(new LoginRequestHandler()); pipeline.addLast(new PacketEncoder()); } }); //启动服务器 ChannelFuture bind = serverBootstrap.bind(6666).sync(); System.out.println("服务器启动"); //监听关闭 bind.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
由于在pipeline中先加入了解码器,我们不需要手动的去实现解码,而是可以继承SimpleChannelInboundHandler并加上泛型去重写channelRead0方法即可,加上泛型的意义在于,只处理该泛型类型的数据。这样可以避免所有的数据都在一个handler中处理,我们只需要编写多个handler,每个handler只负责处理它的任务,最后将所有handler加入pipeline中,不该该handler处理的数据就传给下一个handler。(责任链模式的好处,避免大量的ifelse语句)
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> { @Override protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket packet) throws Exception { String username = packet.getUsername(); System.out.println(username + "用户登录成功"); //登录后返回客户端响应,告诉客户端是否登录成功 LoginResponsePacket loginResponsePacket = new LoginResponsePacket(); loginResponsePacket.setVersion(packet.getVersion()); loginResponsePacket.setSuccess(true); ctx.channel().writeAndFlush(loginResponsePacket); } }
新增一个LoginResponsePacket类
@Data public class LoginResponsePacket extends Packet { private boolean success; //是否成功 private String reason; //失败的原因 @Override public Byte getCommand() { return Command.LOGIN_RESPONSE; } }
新增一个类,注意需要在Command接口中添加对应的指令,在PacketCodeC中的packetTypeMap中添加该类型,否则会编码失败。
3 客户端与服务器端收发消息
定义收发消息的对象,并添加command和packetTypeMap
@Data public class MsgRequestPacket extends Packet { private String message; @Override public Byte getCommand() { return Command.MSG_REQUEST; } }
public class MsgResponsePacket extends Packet { private String message; @Override public Byte getCommand() { return Command.MSG_RESPONSE; } }
在登录成功之后,客户端开启一个线程去等待控制台发送消息
@Override protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) throws Exception { if(loginResponsePacket.isSuccess()){ System.out.println("登录成功"); //登录成功后,启动一个线程接受控制台输入,并发送数据 startSendMsgThread(ctx.channel()); }else { System.out.println(loginResponsePacket.getReason()); } } private static void startSendMsgThread(Channel channel){ new Thread(()->{ System.out.println("输入消息后回车发送到服务器端"); Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()){ String msg = scanner.nextLine(); MsgRequestPacket msgRequestPacket = new MsgRequestPacket(); msgRequestPacket.setMessage(msg); channel.writeAndFlush(msgRequestPacket); } }).start(); }
客户端新增一个处理消息的Handler
public class MsgResponseHandler extends SimpleChannelInboundHandler<MsgResponsePacket> { @Override protected void channelRead0(ChannelHandlerContext ctx, MsgResponsePacket msg) throws Exception { String message = msg.getMessage(); System.out.println("收到服务器端传来的消息:" + message); } }
服务器端也新增一个处理消息的handler
public class MsgRequestHandler extends SimpleChannelInboundHandler<MsgRequestPacket> { @Override protected void channelRead0(ChannelHandlerContext ctx, MsgRequestPacket msg) throws Exception { String message = msg.getMessage(); System.out.println("收到客户端消息:" + message); MsgResponsePacket msgResponsePacket = new MsgResponsePacket(); msgResponsePacket.setMessage("你好,客户端"); ctx.channel().writeAndFlush(msgResponsePacket); } }
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/290939.html