IO多路复用

IO模型

  • BIO: 同步阻塞式

    适用于连接数目比较少且固定的架构

  • NIO: 同步非阻塞式

    适用于连接数目多, 但是连接比较短(轻量操作)的架构

  • AIO: 异步非阻塞式

    适用于连接数目多, 且连接比较长(重操作)的架构

对于同步、异步和阻塞、非阻塞的理解

同步和异步

  • 同步: 顺序执行,例如线程同步
  • 异步: 通过回调函数执行

阻塞和非阻塞

  • 阻塞: 执行代码后,除非事件发生,否则不会继续向下执行。
  • 非阻塞: 可以继续向下执行代码

accept()read() 会被阻塞

BIO

BIO(Blocking IO)是同步阻塞式的IO通信, 服务器实现模式为一个连接一个线程, 如果这个连接不做任何事情则会造成不必要的开销, 因此可以使用线程池机制来改善, 但是线程池机制也会引入新的问题: 如果请求数量超过线程池的最大连接数量, 则会造成后续请求失败. 因此BIO模式适用于连接数目比较少且固定的架构.

客户端与服务器通信过程(BIO模式)

image-20230114235458276

服务端流程

  1. 定义一个ServerSocket对象, 进行服务端的端口注册
  2. 监听客户端的Socket连接请求
  3. 从Socket管道中得到InputStream, 获取客户端发送过来的请求数据
  4. 进行业务逻辑的处理
  5. 向Socket管道中推送OutputStream, 向客户端返回响应数据

客户端流程

  1. 连接服务器端口
  2. 发送请求信息(即向Socket中发送OutputStream)
  3. 等待服务器响应, 从Socket中得到InputStream

总交互流程

  1. 服务端: 进行端口注册, 并监听客户端的Socket连接请求
  2. 客户端: 连接服务器端口, 发送请求数据
  3. 服务端: 从Socket中获取客户端发送的请求数据, 进行业务处理, 然后返回响应数据
  4. 客户端: 从Socket中获取服务器返回的响应数据

伪异步IO

伪异步IO的概念来源于实践, 表示通过线程池来做缓冲区的方法.

同步阻塞案例的演示

简单案例

public class Server {
    //为了简化处理, 并没有实现BIO模式中的一个客户端一个线程, 而是将处理逻辑写在了主线程中
    public static void main(String[] args) throws Exception {
        ServerSocket serverSocket = new ServerSocket(8888);

        Socket socket = serverSocket.accept();

        InputStream is = socket.getInputStream();
        // 按行读取字符需要使用BufferedReader
        // BufferedReader只能通过Reader获取, 因此需要将InputStream->Reader, 即使用InputStreamReader
        BufferedReader br = new BufferedReader(new InputStreamReader(is));
        String msg;
        while ((msg = br.readLine()) != null) {
            System.out.println("msg = " + msg);
        }
    }
}
public class Client {
    public static void main(String[] args) throws Exception {
        Socket socket = new Socket("127.0.0.1", 8888);
        OutputStream os = socket.getOutputStream();
        PrintStream ps = new PrintStream(os);
        //由于服务端的处理逻辑是读取完一行后打印, 所以这里故意使用print()而不是使用println()
        ps.print("hello world");
        ps.flush();
        //客户端线程睡5s,更易观察效果
        Thread.sleep(5000);
    }
}

阻塞原因分析

  1. 在总交互流程的第3步中, 从br.readLine()的代码逻辑中, 服务端Server会一直等待客户端发送至少一行数据.
  2. 而客户端Client在发送“hello world”之后并没有换行符, 服务端接收到“hello world”之后并不会认为这是一行数据, 所以阻塞继续等待客户端发送剩下的数据.
  3. 客户端由于Thread.sleep(5000)阻塞了5s之后(此时服务端线程也阻塞了5s)会结束, 此时服务端发现客户端的Socket连接断开, 因此会报错.

image-20230115003352673

image-20230115003424794

多发和多收消息的演示案例

多客户端演示案例

需要引入线程, 每有一个socket连接请求则创建一个线程

NIO

JVM是一把双刃剑,它提供了统一的操作系统,与特定操作系统平台的细枝末节都被隐藏起来,因此方便编程,但是隐藏操作系统同时意味着特定操作系统下那些独具特色、功能强大的特性被挡在JVM之外。现代操作系统底层提供一些高效的I/O操作,新的IO就是为了利用上这些操作系统提供的新特性。总结:本质上还是操作系统的进步。

Java NIO 需要看所处的操作系统环境来判断是 非阻塞式IO(Windows系统) 还是 IO多路复用(Linux系统)。非阻塞式IO 和 IO多路复用 的区别在于是用户态进行轮询还是在内核态进行轮询。非阻塞式IO是在用户态进行轮询,则循环中每次调用 read() 都需要进行一次用户态和内核态的切换,开销过大。而IO多路复用则是在内核态进行轮询,减少了用户态和内核态的切换次数。

IO多路复用技术包括:select、poll 和 epoll。假如有 100w 个文件描述符(客户端连接),那么 select 会调用 100w 次 read 操作,而可能其中只有 2 个文件描述符中存在数据读写,使用 epoll 则只会调用 2 次 read 操作。目前理解:select 和 epoll 都不可避免的去判断这 100w 个文件描述符中是否有事件发生,但是 epoll 只需要再进行 2 次 read 系统调用;但 select 需要进行 100w 次 read 系统调用,根据每一次 read 的返回结果来读取数据,没有数据的客户端连接也会调用一次,只是没有读取到数据而已。

NIO开发步骤

Server端

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class Server {

    public static void main(String[] args) throws Exception {

        // 这一部分可以放置到
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //Selector只能和非阻塞模式的Channel配合使用, 而FileChannel是不可以配置成非阻塞模式的
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(8888));
        Selector selector = Selector.open();
        //ServerSocketChannel-> OP_ACCEPT服务器准备好接受连接
        //SocketChannel->OP_CONNECT客户端可以连接到服务器
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);


        while (selector.select() > 0) {
            //Server服务端只需要监听ACCEPT和READ事件
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey selectionKey = it.next();
                if (selectionKey.isAcceptable()) {
                    // 所有客户端的第一次请求都是向serverSocketChannel中发送连接请求
                    // 每有一个连接请求, 分配一个通道socketChannel
                    // 而socketChannel中监听读事件, 也就是服务端等待客户端发送请求信息, 然后在isReadable()的处理逻辑中执行处理逻辑然后返回给Client客户端响应数据
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (selectionKey.isReadable()) {
                    //读就绪
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    //读逻辑处理
                    //todo: 可以在try-catch中处理Client离线的逻辑: (1).SelectionKey取消, (2).Channel关闭
                    //todo: 优化时可以将处理逻辑另开一个线程, 而选择器只作轮询的选择和判断就绪操作
                    //todo: 更进一步, 可以使用线程池来执行处理逻辑
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int readCounts;
                    while ((readCounts = socketChannel.read(buffer)) > 0) {
                        buffer.flip();
                        String s = new String(buffer.array(), 0, readCounts);
                        System.out.println(s);
                        buffer.clear();
                    }
                }

                //将选择键从Selected Key Set中移除. 需要使用迭代器的remove(), 而不是使用selectionKey.cancel().
                //cancel()方法<->register()方法, 会取消Channel和Selector的注册关系
                //使用增强for的缺点就是不能访问下标和删除集合中的元素
                it.remove();
            }

        }
    }
}

Client端

import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class Client {
    public static void main(String[] args) throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress(8888));

        FileInputStream fis = new FileInputStream("C:/test.txt");
        FileChannel fileChannel = fis.getChannel();

        //transferTo底层使用到了零拷贝
        //在Windows系统中, 一次transferTo最多传送8MB, 对于大文件需要多次发送, position, count也需要计算得出
        //在Linux系统中, 一次transferTo即可
        fileChannel.transferTo(0, fileChannel.size(), socketChannel);
    }
}

Buffer缓冲区

Channel通道

Channel是双向操作的, 既可以用于读(ReadableByteChannel接口), 也可以用于写(WritableByteChannel接口), 还可以用于读写同时操作(ReadableByteChannel接口和WritableByteChannel接口), 所有的Channel类都实现了读写接口, 即Channel是双向操作的.

Channel相较于Stream而言, Channel是全双工的, 而Stream是单向的.

I/O广义上可以分为两大类:

  • File IO(文件IO)
  • Stream IO(流IO)

通道作为I/O服务的导管, 相应地也有两种类型地Channel:

  • File文件通道

    FileChannel: 文件

  • Socket套接字通道

    • DatagramChannel: UDP
    • SocketChannel: TCP
    • ServerSocketChannel: 服务器TCP

FileChannel类

打开Channel

SocketChannel可以直接创建, 而FileChannel只能通过调用下面三类实例对象的getChannel()方法来获取:

  • FileInputStream
  • FileOutputStream
  • RandomAccessFile
RandomAccessFile file = new RandomAccessFile("C:/test.txt", "rw");
FileChannel channel = file.getChannel();

关闭Channel

从Channel中读取数据(从 Channel 到 Buffer)

channel.read(buffer)操作

int bufferSize = 40;
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
// readCounts代表实际读取数据的字节长度
// 计算逻辑:
// 1. readCounts = min(bufferSize, 文件剩余数据长度);
// 2. readCounts = (readCounts != 0) ? readCounts : -1;
int readCounts = channel.read(buffer);
//此时buffer中存在数据

向Channel中写入(从 Buffer 到 Channel)

channel.write(buffer)操作

DatagramChannel类

SocketChannel类

ServerSocketChannel类

Channel源码

import java.io.IOException;
import java.io.Closeable;

public interface Channel extends Closeable {
    public boolean isOpen();
    public void close() throws IOException;
}

Selector选择器

Selector,SelectableChannel 和 SelectionKey 这三个类组成了使得在 Java 平台上使得就绪检查变得可行的三驾马车.

SelectableChannel类

SelectableChannel并不是Selector选择器的一个组成部分, 而是属于Channel, 所有的SocketChannel都属于SelectableChannel, 而FileChannel不是SelectableChannel.

由于Selector中涉及使用到的Channel都属于SelectableChannel, 所以将SelectableChannel在这部分引入一下.

SelectionKey类

取消逻辑

image-20230116222431922
  • 当Channel关闭, 所有相关的键会被自动取消(即添加到相应Selector的取消键集合中)

    image-20230116222903906
  • 当Selector关闭, 所有注册到该Selector的Channel都被注销, 相关的键被取消

    image-20230116223229206

Selector类

Selector是注册各种IO事件的地方, 当我们关注的事件发生时, 由Selector对象进行通知.

每个Selector对象维护三个与SelectionKey相关集合:

  • Registered Key Set (已注册键的集合)
  • Selected Key Set (已选择键的集合)
  • Cancelled Key Set (已取消键的集合)

选择过程select()

  1. 检查Cancelled Key Set.

    遍历每一个已取消的键, 从集合中移除, 也从其它两个集合(Registered Key SetSelected Key Set)中移除, 注销相关的通道

  2. 检查Registered Key Set中每一个SelectionKey键的interest集合

停止选择过程wakeup()

wakeup()提供了使线程从被阻塞的select()方法中优雅退出的能力.

wakeup()作用:

  • 如果当前正在执行select()方法, 那么使得Selector选择器上的第一个还没有返回的select()操作立即返回.
  • 如果当前没有执行select()方法, 那么后续第一次(下一次)select()的调用将立即返回. 除后续第一次外select()方法将正常进行. (延迟处理特性)
  • 两次select()方法中连续多次调用wakeup()和只调用一次wakeup()的作用相同

   转载规则


《IO多路复用》 熊水斌 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
Netty Netty
零拷贝(性能优化) 零拷贝并不是不进行拷贝, 而是指0次CPU拷贝(从内核空间缓冲区->用户空间缓冲区)。而CPU拷贝是比DMA拷贝耗费的时间长得多, 所以减少CPU拷贝是提升性能的关键。 mmap内存映射方式适用于小数据,
2023-04-01
下一篇 
  目录