1 自定义实现客户端与服务器端通信的协议
1.1 通信协议的设计
自定义的通信协议需要哪些内容
1)魔数:第一个字段一般是魔数,一般固定的几个字节。一个PNG图片的编码中有固定数量固定内容的字节,用于表示这是一个PNG图片;Java的Class文件打头有一串魔数用于表示这是一个class文件;同样,我们的通信协议也是这么定义,服务器端或客户端收到数据包之后,会先读取魔数看看是不是我们定义的通信协议,只有是我们的通信协议时才能按照我们定义的规则正确读取数据。
2)版本号:用于一个字节表示,比如Http协议有1.0/1.1/2.0版本,用于标识当前的数据包使用的是哪个版本号
3)序列化算法:我们Netty通信过程中,使用Java对象进行数据传输,就一定会涉及到序列化与反序列化,用一个字节说明采用哪种序列化方式。
4)指令:你发给我这个数据包时要干什么。比如TCP协议中,SYN位置1就代表我是要跟你建立连接、FIN位置1就是代表我要断开连接。在聊天系统中,服务器收到客户端的数据包,指令位用于说明客户端发送这条数据包的目的,是要给某人发送消息呢还是要添加某人为好友呢还是怎么的…
5)数据长度:我发送给你的数据包的数据部分的长度是多少。解决TCP粘包拆包问题
6)数据部分:传输的数据
1.2 通信协议的实现
1.2.1 Java对象
1)定义通信过程中传输的Java对象的抽象类
package maolaoke.top.netty.protocol;
import lombok.Data;
@Data
public abstract class Packet {
//协议版本
private Byte version = 1;
//获取指令
public abstract Byte getCommand();
}
2)~~定义一个枚举类,列出所有的指令。~~定义一个Command接口来模拟枚举,因为枚举默认是Integer类型,而我们要传输的是一个字节,用Byte来表示更合适。
public interface Command {
Byte LOGIN_REQUEST = 1; //表示这是一个登录请求
}
登录请求的Java类定义:
import maolaoke.top.netty.protocol.Command;
public class LoginRequestPacket extends Packet{
private Integer userId;
private String username;
private String password;
@Override
public Byte getCommand() {
return Command.LOGIN_REQUEST;
}
}
1.2.2 序列化
1)定义一个接口来模拟枚举,枚举所有的序列化标识。
public interface SerializerAlgorithm {
byte JSON_SERIALIZER = 1; //JSON序列化
}
2)定义一个序列化接口
public interface Serializer {
//获取序列化方式
byte getSerializerAlgorithm();
//序列化方法
byte[] serialize(Object object);
//反序列化
<T> T deserialize(Class<T> clazz, byte[] bytes);
}
3)实现JSON方式的序列化。
导入fastJson依赖包。
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency>
import com.alibaba.fastjson.JSON;
public class JsonSerializer implements Serializer {
@Override
public byte getSerializerAlgorithm(){
return SerializerAlgorithm.JSON_SERIALIZER;
}
@Override
public byte[] serialize(Object object) {
return JSON.toJSONBytes(object);
}
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
return JSON.parseObject(bytes, clazz);
}
}
1.2.3 编解码
PacketCodeC.java实现编解码,将发送的Java对象编码成标准的自定义通信协议的格式,将接收到的数据包解码成ByteBuf。
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import maolaoke.top.netty.protocol.Command;
import maolaoke.top.netty.protocol.packet.LoginRequestPacket;
import maolaoke.top.netty.protocol.packet.Packet;
import maolaoke.top.netty.protocol.serializer.JsonSerializer;
import maolaoke.top.netty.protocol.serializer.Serializer;
import java.util.HashMap;
import java.util.Map;
public class PacketCodeC {
private static final int MAGIC_NUMBER = 0x15794516; //魔数,由于int正好是4字节,所以用int表示即可
private static final Map<Byte, Class<? extends Packet>> packetTypeMap; //用于通过command指令找到具体的Java类
private static final Map<Byte, Serializer> serializerMap;
static {
packetTypeMap = new HashMap<>();
packetTypeMap.put(Command.LOGIN_REQUEST, LoginRequestPacket.class);
serializerMap = new HashMap<>();
Serializer serializer = new JsonSerializer();
serializerMap.put(serializer.getSerializerAlgorithm(), serializer);
}
/**
* 编码
* @param packet
* @return
*/
public void encoder(Packet packet, ByteBuf byteBuf){
//序列化Java对象
byte[] bytes = Serializer.DEFAULT.serialize(packet);
//编码成标准通信协议数据包
byteBuf.writeInt(MAGIC_NUMBER); //魔数
byteBuf.writeByte(packet.getVersion()); //版本号
byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm()); //序列化算法
byteBuf.writeByte(packet.getCommand()); //指令
byteBuf.writeInt(bytes.length); //数据段长度
byteBuf.writeBytes(bytes); //数据段
}
/**
* 解码
*/
public Packet decode(ByteBuf byteBuf){
int magic_number = byteBuf.readInt();
if(magic_number != MAGIC_NUMBER)
throw new RuntimeException("解析错误");
byteBuf.skipBytes(1); //先不处理版本号
//序列化标识
byte serializeAlgorithm = byteBuf.readByte();
//指令
byte command = byteBuf.readByte();
//数据包长度
int length = byteBuf.readInt();
//读取数据
byte[] data = new byte[length];
byteBuf.readBytes(data);
Class<? extends Packet> requestType = packetTypeMap.get(command);
Serializer serializer = serializerMap.get(serializeAlgorithm);
if(requestType != null && serializer != null){
return serializer.deserialize(requestType, data);
}
return null;
}
}
测试
public class test {
public static void main(String[] args) {
JsonSerializer jsonSerializer = new JsonSerializer();
LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
loginRequestPacket.setVersion((byte) 1);
loginRequestPacket.setUserId(1);
loginRequestPacket.setUsername("zhangsan");
loginRequestPacket.setPassword("123456");
PacketCodeC packetCodeC = new PacketCodeC();
//编码
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer();
packetCodeC.encoder(loginRequestPacket,byteBuf);
//解码
LoginRequestPacket decode =(LoginRequestPacket) packetCodeC.decode(byteBuf);
System.out.println(decode.getUsername());
}
}
2 通过自定义协议实现登录功能
自定义一个编解码器
Netty为我们提供了许多编解码器,由于我们自定义的协议,所以我们可以继承MessageToByteEncoder来实现编码器,继承ByteToMessageDecoder实现解码器。
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import top.maolaoke.wechat.protocol.packet.Packet;
public class PacketEncoder extends MessageToByteEncoder<Packet> {
@Override
protected void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception {
PacketCodeC.INSTANCE.encoder(msg, out);
}
}
public class PacketDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
out.add(PacketCodeC.INSTANCE.decode(in));
}
}
自定义好了编解码器,只需要将其加入pipeline中即可,利用责任链设计模式会自动的去找到对应的编解码器。
客户端
public class Client {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new PacketDecoder()); //解码器
ch.pipeline().addLast(new LoginResponseHandler()); //登录返回时的处理器
ch.pipeline().addLast(new PacketEncoder()); //编码器
}
});
ChannelFuture future = bootstrap.connect("localhost", 6666).sync();
future.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
public class LoginResponseHandler extends SimpleChannelInboundHandler<LoginResponsePacket> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("开始登陆");
System.out.println("请输入用户名");
Scanner scanner = new Scanner(System.in);
String username = scanner.nextLine();
System.out.println("请输入密码");
String password = scanner.nextLine();
LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
loginRequestPacket.setUserId(100);
loginRequestPacket.setUsername(username);
loginRequestPacket.setPassword(password);
ctx.channel().writeAndFlush(loginRequestPacket);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) throws Exception {
if(loginResponsePacket.isSuccess()){
System.out.println("登录成功");
}else {
System.out.println(loginResponsePacket.getReason());
}
}
}
服务器端
public class Server {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(12);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new PacketDecoder());
pipeline.addLast(new LoginRequestHandler());
pipeline.addLast(new PacketEncoder());
}
});
//启动服务器
ChannelFuture bind = serverBootstrap.bind(6666).sync();
System.out.println("服务器启动");
//监听关闭
bind.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
由于在pipeline中先加入了解码器,我们不需要手动的去实现解码,而是可以继承SimpleChannelInboundHandler并加上泛型去重写channelRead0方法即可,加上泛型的意义在于,只处理该泛型类型的数据。这样可以避免所有的数据都在一个handler中处理,我们只需要编写多个handler,每个handler只负责处理它的任务,最后将所有handler加入pipeline中,不该该handler处理的数据就传给下一个handler。(责任链模式的好处,避免大量的ifelse语句)
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket packet) throws Exception {
String username = packet.getUsername();
System.out.println(username + "用户登录成功");
//登录后返回客户端响应,告诉客户端是否登录成功
LoginResponsePacket loginResponsePacket = new LoginResponsePacket();
loginResponsePacket.setVersion(packet.getVersion());
loginResponsePacket.setSuccess(true);
ctx.channel().writeAndFlush(loginResponsePacket);
}
}
新增一个LoginResponsePacket类
@Data
public class LoginResponsePacket extends Packet {
private boolean success; //是否成功
private String reason; //失败的原因
@Override
public Byte getCommand() {
return Command.LOGIN_RESPONSE;
}
}
新增一个类,注意需要在Command接口中添加对应的指令,在PacketCodeC中的packetTypeMap中添加该类型,否则会编码失败。
3 客户端与服务器端收发消息
定义收发消息的对象,并添加command和packetTypeMap
@Data
public class MsgRequestPacket extends Packet {
private String message;
@Override
public Byte getCommand() {
return Command.MSG_REQUEST;
}
}
public class MsgResponsePacket extends Packet {
private String message;
@Override
public Byte getCommand() {
return Command.MSG_RESPONSE;
}
}
在登录成功之后,客户端开启一个线程去等待控制台发送消息
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) throws Exception {
if(loginResponsePacket.isSuccess()){
System.out.println("登录成功");
//登录成功后,启动一个线程接受控制台输入,并发送数据
startSendMsgThread(ctx.channel());
}else {
System.out.println(loginResponsePacket.getReason());
}
}
private static void startSendMsgThread(Channel channel){
new Thread(()->{
System.out.println("输入消息后回车发送到服务器端");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()){
String msg = scanner.nextLine();
MsgRequestPacket msgRequestPacket = new MsgRequestPacket();
msgRequestPacket.setMessage(msg);
channel.writeAndFlush(msgRequestPacket);
}
}).start();
}
客户端新增一个处理消息的Handler
public class MsgResponseHandler extends SimpleChannelInboundHandler<MsgResponsePacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MsgResponsePacket msg) throws Exception {
String message = msg.getMessage();
System.out.println("收到服务器端传来的消息:" + message);
}
}
服务器端也新增一个处理消息的handler
public class MsgRequestHandler extends SimpleChannelInboundHandler<MsgRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MsgRequestPacket msg) throws Exception {
String message = msg.getMessage();
System.out.println("收到客户端消息:" + message);
MsgResponsePacket msgResponsePacket = new MsgResponsePacket();
msgResponsePacket.setMessage("你好,客户端");
ctx.channel().writeAndFlush(msgResponsePacket);
}
}
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/aiops/290939.html