零拷贝(性能优化)

零拷贝并不是不进行拷贝, 而是指0次CPU拷贝(从内核空间缓冲区->用户空间缓冲区)。而CPU拷贝是比DMA拷贝耗费的时间长得多, 所以减少CPU拷贝是提升性能的关键。
mmap内存映射方式适用于小数据, 而sendFile()方式适用于大文件的传输
graph TD
A[应用程序] -->|1. 数据写入| B(内核缓冲区)
B -->|2. 内核复制| C(网络适配器缓冲区)
C -->|3. 发送到网络| D[目的地]
A -->|零拷贝| E(网络适配器缓冲区)
E -->|零拷贝| D
sequenceDiagram
participant A as 应用程序
participant B as 内核缓冲区
participant C as 网络适配器缓冲区
participant D as 目的地
A->>B: 写入数据
B->>C: 复制数据
C->>D: 发送数据
A->>C: 零拷贝发送数据
C->>D: 发送数据
mmap(内存映射)
mmap是指让用户区和内核区的虚拟内存映射到同一片物理内存上.
mmap其实并不是零拷贝, 因为只是减少了从内核缓冲区到用户缓冲区的一次CPU拷贝, 但是仍然存在从内核缓冲区到Socket Buffer的一次CPU拷贝.
sendFile(传递文件描述符)
在Linux2.1版本中, 数据不经过用户态, 直接从内核缓冲区进入Socket Buffer. 由于不经过用户态, 因此相较于mmap方式额外减少了一次上下文切换. 但是sendFile()函数仍然存在一次从内核缓冲区到Socket Buffer的CPU拷贝. 所以也不是真正意义上的零拷贝.
但是在Linux2.4版本中, 进一步优化, 直接从内核缓冲区到网卡协议栈(protocol engine), 而从内核缓冲区到Socket Buffer的CPU拷贝并不是拷贝全部信息, 而是一种元数据的CPU拷贝(下图中的灰色部分), 而元数据拷贝的消耗可以忽略不计. 因此在Linux2.3版本中的sendFile()函数可以认为是真正意义上的零拷贝.
这种元数据拷贝是通过将文件描述符传送给Socket缓冲区.
零拷贝在项目中的应用
transferTo()
Netty
Netty快速入门
为什么使用Netty?
- NIO的类库和API繁杂, 使用麻烦, 需要熟练掌握Selector, ServerSocketChannel, SocketChannel, ByteBuffer等
- 需要非常熟悉多线程编程和网络编程才能编写出高质量的NIO程序
- 开发工作量和难度都非常大, 例如客户端面临以下问题:
- 断线重连
- 重复接入
- 消息编码和解码
- 安全认证
- 网络闪断
- 半包读写(什么是半包读写?)
- 失败缓存
- 网络拥塞
- 异常流的处理
- NIO中Epoll的bug, 会导致Selector的空轮询, 最终导致CPU占用100%
- NIO是IO多路复用模式, 一个I/O线程处理多个Channel, 程序的调试和跟踪非常麻烦, 往往只能靠一些日志来辅助分析, 定位问题的难度很大
Netty是什么
Netty是最流行的NIO通信框架, 在许多主流的RPC框架(例如Dubbo)中都是用Netty作为其通信组件.
Netty优点:
- API使用简单, 开发门槛低
- 功能强大, 预置了多种编码解码功能, 支持多种主流协议
- 定制能力强, 可以通过ChannelHandler对框架进行灵活扩展
- 性能高, 和其它主流NIO框架对比, 综合性能最优
- 成熟稳定, 修复了所有已发现的JDK NIO的bug
- 在多个领域的应用得到了考验(Elasticsearch, Dubbo)
Netty基础应用
引入pom依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.87.Final</version> <scope>compile</scope> </dependency>
Server端
package org.example.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.nio.charset.Charset; import java.util.Date;
public class Server {
public void bind(int port) {
//配置服务端的NIO线程组
//NioEventLoopGroup是个线程组, 包含一组NIO线程, 专门用于处理网络事件, 实际上就是Reactor线程组
//bossGroup用于接收客户端的连接
//workerGroup用于进行SocketChannel的网络读写
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
//ServerBootstrap对象, 用于启动NIO服务端的辅助启动类, 下面是对其进行配置, 目的是降低服务端的开发复杂度
//group(): 将两个NIO线程组当作形参传递到serverBootstrap中
//channel(): 设置创建的Channel为NioServerSocketChannel类型, 对应ServerSocketChannel
//option(): 配置TCP参数
//childHandler(): 绑定I/O事件的处理类(使用匿名类), 处理类主要用于处理网络I/O事件, 例如记录日志, 对消息进行编码和解码等
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture channelFuture = null;
try {
//绑定服务端的监听端口, 调用同步阻塞方法sync()等待绑定操作完成
//ChannelFuture对象的功能类似于JDK并发包中的Future, 主要用于异步操作的通知回调
channelFuture = serverBootstrap.bind(port).sync();
//阻塞, 等待服务端监听端口关闭之后, main()函数才退出
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
//优雅地关闭资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 8888;
new Server().bind(port);
}
}
class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
// 获取缓冲区中可读的字节数
int counts = byteBuf.readableBytes();
// 创建相应大小的字节数组, 一次读取完成
byte[] request = new byte[counts];
byteBuf.readBytes(request);
String s = new String(request, Charset.defaultCharset());
System.out.println("The time server receive order : " + s);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(s) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
ByteBuf response = Unpooled.copiedBuffer(currentTime.getBytes());
//将响应消息通过write()方法异步发送给Client客户端
ctx.write(response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//flush()方法的作用: 将消息发送队列中的消息写入到SocketChannel中发送给对方
// 从性能角度考虑, 为了防止频繁地唤醒Selector进行消息发送,
// write()方法并不直接将消息写入到SocketChannel, 而是将消息发送到发送缓冲区,
// 再通过调用flush()方法, 将发送缓冲区中的响应消息全部写入到SocketChannel中
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//当发生异常时, 关闭ChannelHandlerContext, 释放相关的句柄资源
ctx.close();
}
}
```
Client端
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.nio.charset.Charset; import java.util.logging.Logger; public class Client { public static void main(String[] args) { int port = 8888; new Client().connect("127.0.0.1", port); } public void connect(String host, int port) { EventLoopGroup group = new NioEventLoopGroup(); //客户端的辅助启动类Bootstrap Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { /** * 当创建NioSocketChannel成功之后, 在进行初始化时, * 将其ChannelHandler设置到ChannelPipeline中, 用于处理网络I/O事件 * @param socketChannel the {@link Channel} which was registered. */ @Override protected void initChannel(SocketChannel socketChannel) { socketChannel.pipeline().addLast(new ClientHandler()); } }); try { ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { group.shutdownGracefully(); } } }
class ClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOGGER = Logger.getLogger(ClientHandler.class.getName());
private final ByteBuf firstMessage;
public ClientHandler() {
//将字符串写入到ByteBuf对象中:
// 1. getBytes(): String->byte[]
// 2. writeBytes(): byte[]->ByteBuf
byte[] request = "QUERY TIME ORDER".getBytes();
firstMessage = Unpooled.buffer(request.length);
firstMessage.writeBytes(request);
}
/**
* 当客户端和服务端TCP链路建立成功之后, Netty的NIO线程会调用channelActive()方法,
* 发送查询时间的指令给客户端, 调用ChannelHandlerContext对象的writeAndFlush()方法将请求消息发送给服务端
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(firstMessage);
}
/**
* 当服务端返回应答消息时, channelRead()方法被调用
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
byte[] request = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(request);
String s = new String(request, Charset.defaultCharset());
System.out.println("Now is : " + s);
}
/**
* 发生异常时, 打印日志, 释放客户端资源
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOGGER.warning("Unexpected exception from downstream" + cause.getMessage());
ctx.close();
}
}
```
源码解析
ChannelHandler接口
ChannelInboundHandler: 包含处理入站事件的方法
ChannelInboundHandlerAdapter: ChannelInboundHandler的默认实现类
- channelRead(): 每个传入的消息都要调用
- channelReadComplete():
Netty的组件和设计
ByteBuf
Java NIO中提供了ByteBuffer, 其完全可以满足NIO编程的需要, 但是也有一些缺点:
- ByteBuffer长度固定, 容量不能动态扩展和收缩
- ByteBuffer只有一个标识位置的指针position, 读写的时候需要通过flip()和rewind()进行切换
- ByteBuffer的API功能有限, 一些常用的实用的高级特性需要额外编程实现
为了弥补上面的这些不足, Netty提供了自己的ByteBuffer实现(ByteBuf)
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.kqueue.KQueueDatagramChannel;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
public class FutureMain {
public static void main(String[] args) {
Channel channel = new KQueueDatagramChannel();
//异步执行, 即运行该代码时不会等待执行完成, 因此channelFuture是操作成功的结果, 也可能是操作失败的结果
ChannelFuture channelFuture = channel.connect(new InetSocketAddress(8888));
//如果在添加监听器之前操作结果就已知了, 那么会立即调用该部分的处理逻辑
channelFuture.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
//操作执行成功的处理逻辑
ByteBuf byteBuf = Unpooled.copiedBuffer("Hello World", Charset.defaultCharset());
future.channel().writeAndFlush(byteBuf);
} else {
//操作执行失败的处理逻辑
Throwable cause = future.cause();
System.out.println("cause = " + cause);
}
});
}
}
Channel, EventLoop和ChannelFuture
Channel: Socket
EventLoop: 控制流, 多线程, 并发
ChannelFuture: 异步通知
Channel接口

每个 Channel 都将会被分配一个 ChannelPipeline 和 ChannelConfig. ChannelConfig包含了该Channel的所有配置设置, 并且支持热更新.
Channel 的正常生命周期如图 6-1 所示. 当这些状态发生改变时, 将会生成对应的事件. 这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler, 其可以随后对它们做出响应
ChannelHandler中的方法和Channel的生命周期密切相关.
channelFuture接口
TCP粘包/拆包问题的解决方案
对于TCP编程, 无论是客户端还是服务端, 当读取或者发送数据的时候, 都需要考虑TCP底层的粘包/拆包机制.
TCP粘包/拆包
TCP是一个“流”协议, TCP底层并不了解上层业务数据的具体含义, 底层会根据TCP缓冲区的实际情况进行包的划分. 所以在业务上认为, 一个完整的包可能会被TCP拆分成多个包进行发送, 也有可能把多个小的包封装成一个大的数据包发送, 这就是所谓的TCP粘包和拆包问题.
假设Client客户端发送两个数据包D1和D2, 由于Server端一次读取的字节数是不确定的, 所以有以下可能情况:
- 第1次读取D1, 第2次读取D2
- 第1次读取D1和D2(粘包)
- 第1次读取D1和D2的一部分, 第2次读取D2的剩余部分(拆包)
- 第1次读取D1的一部分, 第2次读取D1的剩余部分和D2(拆包)
- 由于D1和D2远大于TCP接收的滑动窗口, 所以D1和D2都需要进行多次拆包
由于TCP粘包/拆包导致功能异常的案例展示
class ServerHandler extends ChannelInboundHandlerAdapter {
private int counter = 0;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
// 获取缓冲区中可读的字节数
int counts = byteBuf.readableBytes();
// 创建相应大小的字节数组, 一次读取完成
byte[] request = new byte[counts];
byteBuf.readBytes(request);
String s = new String(request, Charset.defaultCharset())
.substring(0, request.length - System.getProperty("line.separator").length());
System.out.println("The time server receive order : " + s + "\n the counter is : " + ++counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(s) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
currentTime = currentTime + System.getProperty("line.separator");
ByteBuf response = Unpooled.copiedBuffer(currentTime.getBytes());
//将响应消息通过write()方法异步发送给Client客户端
ctx.write(response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//flush()方法的作用: 将消息发送队列中的消息写入到SocketChannel中发送给对方
// 从性能角度考虑, 为了防止频繁地唤醒Selector进行消息发送,
// write()方法并不直接将消息写入到SocketChannel, 而是将消息发送到发送缓冲区,
// 再通过调用flush()方法, 将发送缓冲区中的响应消息全部写入到SocketChannel中
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//当发生异常时, 关闭ChannelHandlerContext, 释放相关的句柄资源
ctx.close();
}
}
自定义协议
协议要素
- 魔数: 用来第一时间判断是否为无效包
- 版本号: 支持协议的升级
- 序列化算法: 消息正文采用哪种序列化和反序列化方法
- 指令类型: 登录, 注册, 私聊, 群聊等
- 请求序号: 为了双工通信, 提供异步能力
- 正文长度
- 消息正文
- xml格式(早期)
- json格式
编解码器
解码器
解码器用于处理入站数据, 本质上也是一个ChannelInboundHandler, 用于将数据从一种格式转换为另一种格式
将字节解码为消息
ByteToMessageDecoder和ReplayingDecoder
将一种消息类型解码为另一种消息类型
MessageToMessageDecoder