Netty

零拷贝(性能优化)

image-20230117211211130

零拷贝并不是不进行拷贝, 而是指0次CPU拷贝(从内核空间缓冲区->用户空间缓冲区)。而CPU拷贝是比DMA拷贝耗费的时间长得多, 所以减少CPU拷贝是提升性能的关键。

mmap内存映射方式适用于小数据, 而sendFile()方式适用于大文件的传输

graph TD
    A[应用程序] -->|1. 数据写入| B(内核缓冲区)
    B -->|2. 内核复制| C(网络适配器缓冲区)
    C -->|3. 发送到网络| D[目的地]

    A -->|零拷贝| E(网络适配器缓冲区)
    E -->|零拷贝| D
sequenceDiagram
    participant A as 应用程序
    participant B as 内核缓冲区
    participant C as 网络适配器缓冲区
    participant D as 目的地
    A->>B: 写入数据
    B->>C: 复制数据
    C->>D: 发送数据
    A->>C: 零拷贝发送数据
    C->>D: 发送数据

mmap(内存映射)

mmap是指让用户区和内核区的虚拟内存映射到同一片物理内存上.

mmap其实并不是零拷贝, 因为只是减少了从内核缓冲区到用户缓冲区的一次CPU拷贝, 但是仍然存在从内核缓冲区到Socket Buffer的一次CPU拷贝.

image-20230117211326776

image-20230117212440029

sendFile(传递文件描述符)

image-20230117212722623

图: Linux2.1版本的sendFile()函数

在Linux2.1版本中, 数据不经过用户态, 直接从内核缓冲区进入Socket Buffer. 由于不经过用户态, 因此相较于mmap方式额外减少了一次上下文切换. 但是sendFile()函数仍然存在一次从内核缓冲区到Socket Buffer的CPU拷贝. 所以也不是真正意义上的零拷贝.

但是在Linux2.4版本中, 进一步优化, 直接从内核缓冲区到网卡协议栈(protocol engine), 而从内核缓冲区到Socket Buffer的CPU拷贝并不是拷贝全部信息, 而是一种元数据的CPU拷贝(下图中的灰色部分), 而元数据拷贝的消耗可以忽略不计. 因此在Linux2.3版本中的sendFile()函数可以认为是真正意义上的零拷贝.

这种元数据拷贝是通过将文件描述符传送给Socket缓冲区.

image-20230117213324595

图: Linux2.4版本的sendFile()函数

零拷贝在项目中的应用

transferTo()

Netty

Netty快速入门

为什么使用Netty?

  • NIO的类库和API繁杂, 使用麻烦, 需要熟练掌握Selector, ServerSocketChannel, SocketChannel, ByteBuffer等
  • 需要非常熟悉多线程编程和网络编程才能编写出高质量的NIO程序
  • 开发工作量和难度都非常大, 例如客户端面临以下问题:
    • 断线重连
    • 重复接入
    • 消息编码和解码
    • 安全认证
    • 网络闪断
    • 半包读写(什么是半包读写?)
    • 失败缓存
    • 网络拥塞
    • 异常流的处理
  • NIO中Epoll的bug, 会导致Selector的空轮询, 最终导致CPU占用100%
  • NIO是IO多路复用模式, 一个I/O线程处理多个Channel, 程序的调试和跟踪非常麻烦, 往往只能靠一些日志来辅助分析, 定位问题的难度很大

Netty是什么

Netty是最流行的NIO通信框架, 在许多主流的RPC框架(例如Dubbo)中都是用Netty作为其通信组件.

Netty优点:

  • API使用简单, 开发门槛低
  • 功能强大, 预置了多种编码解码功能, 支持多种主流协议
  • 定制能力强, 可以通过ChannelHandler对框架进行灵活扩展
  • 性能高, 和其它主流NIO框架对比, 综合性能最优
  • 成熟稳定, 修复了所有已发现的JDK NIO的bug
  • 在多个领域的应用得到了考验(Elasticsearch, Dubbo)

Netty基础应用

  1. 引入pom依赖

     <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
         <version>4.1.87.Final</version>
         <scope>compile</scope>
     </dependency>
  1. Server端

     package org.example.netty;
    
     import io.netty.bootstrap.ServerBootstrap;
     import io.netty.buffer.ByteBuf;
     import io.netty.buffer.Unpooled;
     import io.netty.channel.*;
     import io.netty.channel.nio.NioEventLoopGroup;
     import io.netty.channel.socket.SocketChannel;
     import io.netty.channel.socket.nio.NioServerSocketChannel;
    
     import java.nio.charset.Charset;
     import java.util.Date;
    
    
public class Server {
    public void bind(int port) {
        //配置服务端的NIO线程组
        //NioEventLoopGroup是个线程组, 包含一组NIO线程, 专门用于处理网络事件, 实际上就是Reactor线程组
        //bossGroup用于接收客户端的连接
        //workerGroup用于进行SocketChannel的网络读写
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        //ServerBootstrap对象, 用于启动NIO服务端的辅助启动类, 下面是对其进行配置, 目的是降低服务端的开发复杂度
        //group(): 将两个NIO线程组当作形参传递到serverBootstrap中
        //channel(): 设置创建的Channel为NioServerSocketChannel类型, 对应ServerSocketChannel
        //option(): 配置TCP参数
        //childHandler(): 绑定I/O事件的处理类(使用匿名类), 处理类主要用于处理网络I/O事件, 例如记录日志, 对消息进行编码和解码等
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) {
                        socketChannel.pipeline().addLast(new ServerHandler());
                    }
                });

        ChannelFuture channelFuture = null;
        try {
            //绑定服务端的监听端口, 调用同步阻塞方法sync()等待绑定操作完成
            //ChannelFuture对象的功能类似于JDK并发包中的Future, 主要用于异步操作的通知回调
            channelFuture = serverBootstrap.bind(port).sync();
            //阻塞, 等待服务端监听端口关闭之后, main()函数才退出
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            //优雅地关闭资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new Server().bind(port);
    }
}

class ServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        // 获取缓冲区中可读的字节数
        int counts = byteBuf.readableBytes();
        // 创建相应大小的字节数组, 一次读取完成
        byte[] request = new byte[counts];
        byteBuf.readBytes(request);
        String s = new String(request, Charset.defaultCharset());
        System.out.println("The time server receive order : " + s);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(s) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
        ByteBuf response = Unpooled.copiedBuffer(currentTime.getBytes());
        //将响应消息通过write()方法异步发送给Client客户端
        ctx.write(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //flush()方法的作用: 将消息发送队列中的消息写入到SocketChannel中发送给对方
        // 从性能角度考虑, 为了防止频繁地唤醒Selector进行消息发送,
        // write()方法并不直接将消息写入到SocketChannel, 而是将消息发送到发送缓冲区,
        // 再通过调用flush()方法, 将发送缓冲区中的响应消息全部写入到SocketChannel中
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //当发生异常时, 关闭ChannelHandlerContext, 释放相关的句柄资源
        ctx.close();
    }
}
```
  1. Client端

     import io.netty.bootstrap.Bootstrap;
     import io.netty.buffer.ByteBuf;
     import io.netty.buffer.Unpooled;
     import io.netty.channel.*;
     import io.netty.channel.nio.NioEventLoopGroup;
     import io.netty.channel.socket.SocketChannel;
     import io.netty.channel.socket.nio.NioSocketChannel;
     import java.nio.charset.Charset;
     import java.util.logging.Logger;
    
     public class Client {
         public static void main(String[] args) {
             int port = 8888;
             new Client().connect("127.0.0.1", port);
         }
    
         public void connect(String host, int port) {
             EventLoopGroup group = new NioEventLoopGroup();
    
             //客户端的辅助启动类Bootstrap
             Bootstrap bootstrap = new Bootstrap();
             bootstrap.group(group)
                     .channel(NioSocketChannel.class)
                     .option(ChannelOption.TCP_NODELAY, true)
                     .handler(new ChannelInitializer<SocketChannel>() {
    
                         /**
                          * 当创建NioSocketChannel成功之后, 在进行初始化时,
                          * 将其ChannelHandler设置到ChannelPipeline中, 用于处理网络I/O事件
                          * @param socketChannel            the {@link Channel} which was registered.
                          */
                         @Override
                         protected void initChannel(SocketChannel socketChannel) {
                             socketChannel.pipeline().addLast(new ClientHandler());
                         }
                     });
    
             try {
                 ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
                 channelFuture.channel().closeFuture().sync();
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             } finally {
                 group.shutdownGracefully();
             }
         }
     }
    
    
class ClientHandler extends ChannelInboundHandlerAdapter {

    private static final Logger LOGGER = Logger.getLogger(ClientHandler.class.getName());
    private final ByteBuf firstMessage;

    public ClientHandler() {
        //将字符串写入到ByteBuf对象中:
        // 1. getBytes(): String->byte[]
        // 2. writeBytes(): byte[]->ByteBuf
        byte[] request = "QUERY TIME ORDER".getBytes();
        firstMessage = Unpooled.buffer(request.length);
        firstMessage.writeBytes(request);
    }

    /**
     * 当客户端和服务端TCP链路建立成功之后, Netty的NIO线程会调用channelActive()方法,
     * 发送查询时间的指令给客户端, 调用ChannelHandlerContext对象的writeAndFlush()方法将请求消息发送给服务端
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(firstMessage);
    }

    /**
     * 当服务端返回应答消息时, channelRead()方法被调用
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        byte[] request = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(request);

        String s = new String(request, Charset.defaultCharset());
        System.out.println("Now is : " + s);
    }

    /**
     * 发生异常时, 打印日志, 释放客户端资源
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOGGER.warning("Unexpected exception from downstream" + cause.getMessage());
        ctx.close();
    }
}
```

源码解析

ChannelHandler接口

image-20230119225926279

ChannelInboundHandler: 包含处理入站事件的方法

ChannelInboundHandlerAdapter: ChannelInboundHandler的默认实现类

  • channelRead(): 每个传入的消息都要调用
  • channelReadComplete():

Netty的组件和设计

ByteBuf

Java NIO中提供了ByteBuffer, 其完全可以满足NIO编程的需要, 但是也有一些缺点:

  • ByteBuffer长度固定, 容量不能动态扩展和收缩
  • ByteBuffer只有一个标识位置的指针position, 读写的时候需要通过flip()和rewind()进行切换
  • ByteBuffer的API功能有限, 一些常用的实用的高级特性需要额外编程实现

为了弥补上面的这些不足, Netty提供了自己的ByteBuffer实现(ByteBuf)

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.kqueue.KQueueDatagramChannel;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

public class FutureMain {
    public static void main(String[] args) {
        Channel channel = new KQueueDatagramChannel();

        //异步执行, 即运行该代码时不会等待执行完成, 因此channelFuture是操作成功的结果, 也可能是操作失败的结果
        ChannelFuture channelFuture = channel.connect(new InetSocketAddress(8888));

        //如果在添加监听器之前操作结果就已知了, 那么会立即调用该部分的处理逻辑
        channelFuture.addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                //操作执行成功的处理逻辑
                ByteBuf byteBuf = Unpooled.copiedBuffer("Hello World", Charset.defaultCharset());
                future.channel().writeAndFlush(byteBuf);
            } else {
                //操作执行失败的处理逻辑
                Throwable cause = future.cause();
                System.out.println("cause = " + cause);
            }
        });
    }
}

Channel, EventLoop和ChannelFuture

Channel: Socket

EventLoop: 控制流, 多线程, 并发

ChannelFuture: 异步通知

Channel接口

image-20230120010544256

每个 Channel 都将会被分配一个 ChannelPipeline 和 ChannelConfig. ChannelConfig包含了该Channel的所有配置设置, 并且支持热更新.

image-20230120223436759

image-20230120223447811

Channel 的正常生命周期如图 6-1 所示. 当这些状态发生改变时, 将会生成对应的事件. 这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler, 其可以随后对它们做出响应

image-20230120223725233

image-20230120223958357

ChannelHandler中的方法和Channel的生命周期密切相关.

image-20230120224306022

channelFuture接口

image-20230120135058710

image-20230120151350291

TCP粘包/拆包问题的解决方案

对于TCP编程, 无论是客户端还是服务端, 当读取或者发送数据的时候, 都需要考虑TCP底层的粘包/拆包机制.

TCP粘包/拆包

TCP是一个“流”协议, TCP底层并不了解上层业务数据的具体含义, 底层会根据TCP缓冲区的实际情况进行包的划分. 所以在业务上认为, 一个完整的包可能会被TCP拆分成多个包进行发送, 也有可能把多个小的包封装成一个大的数据包发送, 这就是所谓的TCP粘包和拆包问题.

image-20230118112657837

假设Client客户端发送两个数据包D1和D2, 由于Server端一次读取的字节数是不确定的, 所以有以下可能情况:

  • 第1次读取D1, 第2次读取D2
  • 第1次读取D1和D2(粘包)
  • 第1次读取D1和D2的一部分, 第2次读取D2的剩余部分(拆包)
  • 第1次读取D1的一部分, 第2次读取D1的剩余部分和D2(拆包)
  • 由于D1和D2远大于TCP接收的滑动窗口, 所以D1和D2都需要进行多次拆包

由于TCP粘包/拆包导致功能异常的案例展示

class ServerHandler extends ChannelInboundHandlerAdapter {
    private int counter = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        // 获取缓冲区中可读的字节数
        int counts = byteBuf.readableBytes();
        // 创建相应大小的字节数组, 一次读取完成
        byte[] request = new byte[counts];
        byteBuf.readBytes(request);
        String s = new String(request, Charset.defaultCharset())
                .substring(0, request.length - System.getProperty("line.separator").length());
        System.out.println("The time server receive order : " + s + "\n the counter is : " + ++counter);

        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(s) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf response = Unpooled.copiedBuffer(currentTime.getBytes());
        //将响应消息通过write()方法异步发送给Client客户端
        ctx.write(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //flush()方法的作用: 将消息发送队列中的消息写入到SocketChannel中发送给对方
        // 从性能角度考虑, 为了防止频繁地唤醒Selector进行消息发送,
        // write()方法并不直接将消息写入到SocketChannel, 而是将消息发送到发送缓冲区,
        // 再通过调用flush()方法, 将发送缓冲区中的响应消息全部写入到SocketChannel中
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //当发生异常时, 关闭ChannelHandlerContext, 释放相关的句柄资源
        ctx.close();
    }
}

自定义协议

协议要素

  • 魔数: 用来第一时间判断是否为无效包
  • 版本号: 支持协议的升级
  • 序列化算法: 消息正文采用哪种序列化和反序列化方法
  • 指令类型: 登录, 注册, 私聊, 群聊等
  • 请求序号: 为了双工通信, 提供异步能力
  • 正文长度
  • 消息正文
    • xml格式(早期)
    • json格式

编解码器

解码器

解码器用于处理入站数据, 本质上也是一个ChannelInboundHandler, 用于将数据从一种格式转换为另一种格式

  • 将字节解码为消息

    ByteToMessageDecoder和ReplayingDecoder

  • 将一种消息类型解码为另一种消息类型

    MessageToMessageDecoder

ByteToMessageDecoder


   转载规则


《Netty》 熊水斌 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
BeanFactory容器的实现 BeanFactory容器的实现
BeanFactory 容器的实现BeanFactory 不会做的事情: 不会主动调用 BeanFactoryPostProcessor 不会主动调用(添加)BeanPostProcessor 不会主动实例化单例对象(懒加载) 不会解析B
2023-04-02
下一篇 
IO多路复用 IO多路复用
IO模型 BIO: 同步阻塞式 适用于连接数目比较少且固定的架构 NIO: 同步非阻塞式 适用于连接数目多, 但是连接比较短(轻量操作)的架构 AIO: 异步非阻塞式 适用于连接数目多, 且连接比较长(重操作)的架构 对
2023-04-01
  目录