netty 粘包和拆包
# netty 粘包和拆包
# 粘包问题演示
server端
package com.bing;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
public class EchoServer {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group).channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(28080))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new EchoServerHandler());
}
});
ChannelFuture future = bootstrap.bind().sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
@ChannelHandler.Sharable
static class EchoServerHandler extends SimpleChannelInboundHandler {
private int counter;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, StandardCharsets.UTF_8);
System.out.println("-----start------\n" + body + "\n------end------");
String content = "receive" + ++counter;
ByteBuf resp = Unpooled.copiedBuffer(content.getBytes());
ctx.writeAndFlush(resp);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ctx.close();
}
}
}
client端
package com.bing;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.charset.StandardCharsets;
public class EchoClient {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("handler", new EchoClientHandler());
}
});
try {
ChannelFuture future = bootstrap.connect("localhost", 28080).sync();
future.channel().writeAndFlush("Hello world, i'm online");
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
}
static class EchoClientHandler extends SimpleChannelInboundHandler {
private int counter;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, StandardCharsets.UTF_8);
System.out.println(body + " count:" + ++counter + "----end----\n");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
byte[] req = ("hello").getBytes();
for (int i = 0; i < 20; i++) {
ByteBuf message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
}
}
每次运行,输出不保证一致。
server端
-----start------
hello
------end------
-----start------
hellohellohellohellohellohellohellohellohellohellohellohellohellohellohellohellohellohellohello
------end------
client端
receive1receive2 count:1----end----
由此可见,client端循环20次发送数据,server端只接收了两次,出现了粘包。
# 如何解决
业界常用的解决方案:
- 定长协议:每次发送固定长度的报文,长度不足的在后边补空格,虽然实现简单,但浪费带宽。
- 添加边界:每次发送的报文都以约定好的边界符结尾,接收方收到边界符则认为是一个完整的报文。
- 变长协议: 将报文分为消息头和消息体,消息头中存储了消息体的长度,接收方按长度获取报文。
netty也提供了一些编码器和解码器:
解码器 | 描述 |
---|---|
ByteToMessageDecoder | 用于实现自定义解码器 |
MessageToMessageDecoder | 一般作为二次解码器,当我们在 ByteToMessageDecoder 将一个 bytes 数组转换成一个 java 对象的时候,我们可能还需要将这个对象进行二次解码成其他对象,我们就可以继承这个类 |
LineBasedFrameDecoder | 通过在包尾添加边界符来区分整包消息 |
StringDecoder | 字符串解码器 |
DelimiterBasedFrameDecoder | 特殊字符作为分隔符来区分整包消息 |
FixedLengthFrameDecoder | 报文大小固定长度,不够空格补全 |
ProtoBufVarint32FrameDecoder | 通过 Protobuf 解码器来区分整包消息 |
ProtobufDecoder | Protobuf 解码器 |
LengthFieldBasedFrameDecoder | 指定长度来标识整包消息,通过在包头指定整包长度来约定包长 |
ProtobufEncoder | Protobuf 编码器 |
MessageToByteEncoder | 将 Java 对象编码成 ByteBuf |
MessageToMessageEncoder | 如果不想将 Java 对象编码成 ByteBuf,而是自定义类就继承这个 |
LengthFieldPrepender | LengthFieldPrepender 是一个非常实用的工具类,如果我们在发送消息的时候采用的是:消息长度字段+原始消息的形式,那么我们就可以使用 LengthFieldPrepender。这是因为 LengthFieldPrepender 可以将待发送消息的长度(二进制字节长度)写到 ByteBuf 的前两个字节 |
如下举例说明:
方案 | 描述 | 编码器 | 解码器 | 注意事项 |
---|---|---|---|---|
方案一 | 在数据结尾添加特殊符号,表明数据边界 | LineEncoder | LineBasedFrameDecoder | 数据中不能包含边界符,否则会出现消息解码错误 |
方案二 | 在数据头部声明数据长度,按长度读数据 | LengthFieldBasedFrameDecoder |
# 方案一
在发送方添加编码器LineEncoder, client代码中添加
pipeline.addLast(new LineEncoder(LineSeparator.DEFAULT, CharsetUtil.UTF_8));
在接收方添加解码器LineBasedFrameDecoder, server代码中添加
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
client发送数据时,每次都添加结束符
byte[] req = ("hello" + StringUtil.NEWLINE).getBytes();
每次运行,输出都一致。
server端
-----start------
hello
------end------
-----start------
hello
------end------
-----start------
hello
------end------
-----start------
hello
------end------
-----start------
hello
------end------
-----start------
hello
------end------
-----start------
hello
------end------
-----start------
hello
------end------
-----start------
hello
------end------
-----start------
hello
------end------
-----start------
hello
------end------
-----start------
hello
------end------
-----start------
hello
------end------
-----start------
hello
------end------
-----start------
hello
------end------
-----start------
hello
------end------
-----start------
hello
------end------
-----start------
hello
------end------
-----start------
hello
------end------
-----start------
hello
------end------
-----start------
Hello world, i'm online
------end------
client端
receive1receive2receive3receive4receive5receive6receive7receive8receive9receive10receive11receive12receive13receive14receive15receive16receive17receive18receive19receive20receive21 count:1----end----
# 方案二
将一个数据包拆分为两部分:长度域
和 数据域
。
netty提供了解码器LengthFieldBasedFrameDecoder, 其构造方法有5个参数,控制解码器如何从数据块中提取出有效数据,这5个参数自前向后依次是:
- maxFrameLength:数据包的最大长度
- lengthFieldOffset:长度域其实位置在数据包中的偏移量
- lengthFieldLength:长度域占的字节数
- lengthAdjustment:长度域偏移量矫正。如果长度域的值,除了包含有效数据域的长度外,还包含了其他域(如长度域自身)长度,那么,就需要进行矫正。矫正的值为:包长 - 长度域的值 – 长度域偏移 – 长度域长。
- initialBytesToStrip:丢弃此索引之前的数据
server代码中添加
socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
client发送数据代码修改为
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 20; i++) {
String req = "hello" + i;
ByteBuf message = Unpooled.buffer(1024);
byte[] bytes = req.getBytes(Charset.forName("utf-8"));
//设置长度域的值,为有效数据的长度
message.writeInt(bytes.length);
//设置有效数据
message.writeBytes(bytes);
ctx.writeAndFlush(message);
}
}
-----start------
hello0
------end------
-----start------
hello1
------end------
-----start------
hello2
------end------
-----start------
hello3
------end------
-----start------
hello4
------end------
-----start------
hello5
------end------
-----start------
hello6
------end------
-----start------
hello7
------end------
-----start------
hello8
------end------
-----start------
hello9
------end------
-----start------
hello10
------end------
-----start------
hello11
------end------
-----start------
hello12
------end------
-----start------
hello13
------end------
-----start------
hello14
------end------
-----start------
hello15
------end------
-----start------
hello16
------end------
-----start------
hello17
------end------
-----start------
hello18
------end------
-----start------
hello19
------end------
client端
receive1receive2receive3receive4receive5receive6receive7receive8receive9receive10receive11receive12receive13receive14receive15receive16receive17receive18receive19receive20 count:1----end----
上次更新: 2023/12/08, 06:34:37