Linux 异步 IO —— io_uring 学习 Shepard-Wang

引入

一 引入

linux aio 的缺陷:

  1. 只支持 DIRECT_IO
  2. 实现上拷贝消耗资源大
  3. 相比 epoll 的优势在缩小
  4. 接口相当复杂

io_uring 优势:

  1. 使用方便
  2. 在设计上是真正异步的:只要 设置了合适的 flag,它在 系统调用上下文中就只是将请求放入队列, 不会做其他任何额外的事情,保证了应用永远不会阻塞
  3. 通用性强:支持 Buffer IODirect IO,也支持 epoll 模型
  4. 特性丰富:基于 io_uring 甚至能重写(re-implement)Linux 的每个系统调用。
  5. 高性能:IO 请求 overhead

二 实现原理

io_uring 实现异步 I/O 的方式其实是一个生产者-消费者模型:

  1. 用户进程生产 I/O 请求,放入提交队列(Submission Queue,后续简称 SQ)。
  2. 内核消费 SQ 中的 I/O 请求,完成后将结果放入完成队列(Completion Queue,后续简称 CQ)。
  3. 用户进程从 CQ 中收割 I/O 结果。

SQCQ 是内核初始化 io_uring 实例的时候创建的。为了减少系统调用和减少用户进程与内核之间的数据拷贝,io_uring 使用 mmap 的方式让用户进程和内核共享 SQCQ 的内存空间。

另外,由于先提交的 I/O 请求不一定先完成,SQ 保存的其实是一个数组索引(数据类型 uint32),真正的 SQE(Submission Queue Entry)保存在一个独立的数组(SQ Array)。所以要提交一个 I/O 请求,得先在 SQ Array 中找到一个空闲的 SQE,设置好之后,将其数组索引放到 SQ 中。

用户进程、内核、SQCQSQ Array 之间的基本关系如下:

这两个队列:

  • 都是 单生产者、单消费者,size 是 2 的幂次;
  • 提供 无锁接口(lock-less access interface),内部使用 内存屏障 做同步(coordinated with memory barriers)。

使用方式

  • 请求
    • 应用创建 SQ entries (SQE),更新 SQ tail
    • 内核消费 SQE,更新 SQ head
  • 完成
    • 内核为完成的一个或多个请求创建 CQ entries (CQE),更新 CQ tail;
    • 应用消费 CQE,更新 CQ head。
    • 完成事件(completion events)可能以任意顺序到达,到总是与特定的 SQE 相关联的。
    • 消费 CQE 过程无需切换到内核态。

io_uring 实例可工作在三种模式:

  1. 中断驱动模式interrupt driven

    默认模式。可通过 io_uring_enter() 提交 I/O 请求,然后直接检查 CQ 状态判断是否完成。

  2. 轮询模式polled

    Busy-waiting for an I/O completion,而不是通过异步 IRQ(Interrupt Request)接收通知。

    这种模式需要文件系统(如果有)和块设备(block device)支持轮询功能。 相比中断驱动方式,这种方式延迟更低(连系统调用都省了), 但可能会消耗更多 CPU 资源。

    目前,只有指定了 O_DIRECT flag 打开的文件描述符,才能使用这种模式。当一个读 或写请求提交给轮询上下文(polled context)之后,应用(application)必须调用 io_uring_enter() 来轮询 CQ 队列,判断请求是否已经完成。

    对一个 io_uring 实例来说,不支持混合使用轮询和非轮询模式

  3. 内核轮询模式kernel polled

    这种模式中,会 创建一个内核线程(kernel thread)来执行 SQ 的轮询工作。

    使用这种模式的 io_uring 实例, 应用无需切到到内核态 就能触发(issue)I/O 操作。 通过 SQ 来提交 SQE,以及监控 CQ 的完成状态,应用无需任何系统调用,就能提交和收割 I/O(submit and reap I/Os)。

    如果内核线程的空闲时间超过了用户的配置值,它会通知应用,然后进入 idle 状态。 这种情况下,应用必须调用 io_uring_enter() 来唤醒内核线程。如果 I/O 一直很繁忙,内核线性是不会 sleep 的。

io_uring_setup

int io_uring_setup(int entries, struct io_uring_params *params);

内核提供了 io_uring_setup 系统调用来初始化一个 io_uring 实例。

  1. io_uring_setup 的返回值是一个文件描述符,暂且称为 ring_fd,用于后续的 mmap 内存映射和其它相关系统调用的参数。
  2. io_uring 会创建 SQCQSQ Array
  3. ``entries 参数表示的是 SQSQ Array 的大小,CQ 的大小默认是 2 * entries`。

params 参数既是输入参数,也是输出参数,其定义如下:

struct io_uring_params {
    __u32 sq_entries;
    __u32 cq_entries;
    __u32 flags;
    __u32 sq_thread_cpu;
    __u32 sq_thread_idle;
    __u32 features;
    __u32 resv[4];
    struct io_sqring_offsets sq_off;
    struct io_cqring_offsets cq_off;
};

struct io_sqring_offsets {
    __u32 head;
    __u32 tail;
    __u32 ring_mask;
    __u32 ring_entries;
    __u32 flags;
    __u32 dropped;
    __u32 array;
    __u32 resv[3];
};

struct io_cqring_offsets {
    __u32 head;
    __u32 tail;
    __u32 ring_mask;
    __u32 ring_entries;
    __u32 overflow;
    __u32 cqes;
    __u32 flags;
    __u32 resv[3];
};
  • flagssq_thread_cpusq_thread_idle 是输入参数,用于设置 io_uring 的一些特性。

  • resv[4] 是保留字段,我们暂且忽略。

  • 其它参数都是由内核设置的输出参数,用户进程可能需要使用这些参数进行一些初始化和判断:

    • sq_entries 是提交队列的大小。
    • cq_entries 是完成队列的大小。
    • features 描述当前内核版本支持的 io_uring 特性。其中,IORING_FEAT_SINGLE_MMAPio_uring 一个比较重要的特性:内核支持通过一次 mmap 完成 SQCQ 的内存映射,具体可以参考 liburingio_uring_mmap
    • sq_off 描述了 SQ 的一些属性的 offset
    • cq_off 描述了 CQ 的一些属性的 offset

提交/收割 IO 请求

初始化完成之后,我们需要向 io_uring 提交 I/O 请求。默认情况下,使用 io_uring 提交 I/O 请求需要:

  1. SQ Arrary 中找到一个空闲的 SQE
  2. 根据具体的 I/O 请求设置这个 SQE
  3. 将 SQE 的数组索引放到 SQ 中。
  4. 调用系统调用 io_uring_enter 提交 SQ 中的 I/O 请求。

为了进一步提升性能,io_uring 提供了内核轮询的方式来提交 I/O 请求(设置 params.flagsIORING_SETUP_SQPOLL 位):创建一个内核线程(简称 SQ 线程)对 SQ 进行轮询(polling),发现有未提交的 I/O 请求就自动进行提交:

  • 如果 I/O 请求源源不断,SQ 线程会一直轮询并提交 I/O 请求给内核,这个过程不需要经过系统调用。
  • 如果 SQ 线程的空闲时间超过 sq_thread_idle 毫秒,会自动睡眠,并设置 io_sq_ringflagssq_ring_ptr + p.sq_off.flags )的 IORING_SQ_NEED_WAKEUP 位。
  • 用户进程需要根据 flags 是否设置了 IORING_SQ_NEED_WAKEUP 来决定是否需要调用 io_uring_enter 来唤醒 SQ 线程

io_uring_register()

注册用于异步 I/O 的 文件或用户缓冲区

int io_uring_register(unsigned int fd, unsigned int opcode, void *arg, unsigned int nr_args);

注册文件或用户缓冲区,使内核能 长时间持有对该文件在内核内部的数据结构引用, 或创建 应用内存的长期映射, 这个操作只会在注册时执行一次,而不是每个 I/O 请求都会处理,因此减少了 per-I/O overhead

注册的缓冲区(buffer)性质

  • Registered buffers 将会 被锁定在内存中(be locked in memory),并 计入用户的 RLIMIT_MEMLOCK 资源限制。
  • 此外,每个 buffer 有 1GB 的大小限制
  • 当前,buffers 必须是 匿名、非文件后端的内存(anonymous, non-file-backed memory),例如 malloc or mmap with the MAP_ANONYMOUS flag set 返回的内存。
  • Huge pages 也是支持的。整个 huge page 都会被 pin 到内核,即使只用到了其中一部分。
  • 已经注册的 buffer 无法调整大小,想调整只能先 unregister,再重新 register 一个新的。

io_uring_enter()

int io_uring_enter(unsigned int fd, unsigned int to_submit, unsigned int min_complete, unsigned int flags, sigset_t *sig);

这个系统调用用于初始化和完成(initiate and complete)I/O,使用共享的 SQ 和 CQ。 单次调用同时执行:

  1. 提交新的 I/O 请求
  2. 等待 I/O 完成

参数:

  1. fdio_uring_setup() 返回的文件描述符;
  2. to_submit 指定了 SQ 中提交的 I/O 数量;
  3. 依据不同模式:
    • 默认模式,如果指定了 min_complete,会等待这个数量的 I/O 事件完成再返回;
    • 如果 io_uringpolling 模式,这个参数表示:
      1. 0:要求内核返回当前以及完成的所有 events,无阻塞;
      2. 非零:如果有事件完成,内核仍然立即返回;如果没有完成事件,内核会 poll,等待指定的次数完成,或者这个进程的时间片用完。

注意:对于 interrupt driven I/O,应用无需进入内核就能检查 CQ 的 event completions

当这个系统调用返回时,表示一定数量的 SEQ 已经被消费和提交了,此时可以安全的重用队列中的 SEQ。 此时 IO 提交有可能还停留在异步上下文中,即实际上 SQE 可能还没有被提交 —— 不过 用户不用关心这些细节 —— 当随后内核需要使用某个特定的 SQE 时,它已经复制了一份。

高级特性

io_uring 提供了一些用于特殊场景的高级特性:

  1. File registration(文件注册):每次发起一个指定文件描述的操 作,内核都需要花费一些时钟周期(cycles)将文件描述符映射到内部表示。 对于那些针对同一文件进行重复操作的场景,io_uring 支持提前注册这些文件,后面直接查找就行了。
  2. Buffer registration(缓冲区注册):与 file registration 类 似,direct I/O 场景中,内核需要 map/unmap memory areas。io_uring 支持提前 注册这些缓冲区(buffers)。
  3. Poll ring(轮询环形缓冲区):对于非常快是设备,处理中断的开 销是比较大的。io_uring 允许用户关闭中断,使用轮询模式。前面“三种工作模式”小节 也介绍到了这一点。
  4. Linked operations(链接操作):允许用户发送串联的请求。这两 个请求同时提交,但后面的会等前面的处理完才开始执行。

三 liburing 使用

io_uring_cqe/io_uring_sqe结构

struct io_uring_cqe {
	__u64	user_data;	/* sqe->data submission passed back */
	__s32	res;		/* result code for this event */
	__u32	flags;

	/*
	 * If the ring is initialized with IORING_SETUP_CQE32, then this field
	 * contains 16-bytes of padding, doubling the size of the CQE.
	 */
	__u64 big_cqe[];
};



struct io_uring_sqe {
	__u8	opcode;		/* type of operation for this sqe */
	__u8	flags;		/* IOSQE_ flags */
	__u16	ioprio;		/* ioprio for the request */
	__s32	fd;		/* file descriptor to do IO on */
	union {
		__u64	off;	/* offset into file */
		__u64	addr2;
		struct {
			__u32	cmd_op;
			__u32	__pad1;
		};
	};
	union {
		__u64	addr;	/* pointer to buffer or iovecs */
		__u64	splice_off_in;
	};
	__u32	len;		/* buffer size or number of iovecs */
	union {
		__kernel_rwf_t	rw_flags;
		__u32		fsync_flags;
		__u16		poll_events;	/* compatibility */
		__u32		poll32_events;	/* word-reversed for BE */
		__u32		sync_range_flags;
		__u32		msg_flags;
		__u32		timeout_flags;
		__u32		accept_flags;
		__u32		cancel_flags;
		__u32		open_flags;
		__u32		statx_flags;
		__u32		fadvise_advice;
		__u32		splice_flags;
		__u32		rename_flags;
		__u32		unlink_flags;
		__u32		hardlink_flags;
		__u32		xattr_flags;
		__u32		msg_ring_flags;
	};
	__u64	user_data;	/* data to be passed back at completion time */
	/* pack this to avoid bogus arm OABI complaints */
	union {
		/* index into fixed buffers, if used */
		__u16	buf_index;
		/* for grouped buffer selection */
		__u16	buf_group;
	} __attribute__((packed));
	/* personality to use, if used */
	__u16	personality;
	union {
		__s32	splice_fd_in;
		__u32	file_index;
		struct {
			__u16	addr_len;
			__u16	__pad3[1];
		};
	};
	union {
		struct {
			__u64	addr3;
			__u64	__pad2[1];
		};
		/*
		 * If the ring is initialized with IORING_SETUP_SQE128, then
		 * this field is used for 80 bytes of arbitrary command data
		 */
		__u8	cmd[0];
	};
};
#include <bits/stdc++.h>
#include <liburing.h>
#include <unistd.h>

char buf[1024] = {0};

int main() {
  int fd = open("1.txt", O_RDONLY, 0);
  
  io_uring ring;
  io_uring_queue_init(32, &ring, 0); // 初始化
   
  auto sqe = io_uring_get_sqe(&ring); // 从环中得到一块空位
    
  io_uring_prep_read(sqe, fd, buf, sizeof(buf), 0); // 为这块空位准备好操作
    
  io_uring_submit(&ring); // 提交任务
    
  io_uring_cqe* res; // 完成队列指针
  io_uring_wait_cqe(&ring, &res); // 阻塞等待一项完成的任务
    
  assert(res);
  std::cout << "read bytes: " << res->res << " \n";
  std::cout << buf << std::endl;
    
  io_uring_cqe_seen(&ring, res); // 将任务移出完成队列
    
  io_uring_queue_exit(&ring); // 退出
  
  return 0;
}

echo 服务器

所以,我们定义一下我们在任务之间传递的结构体

然后把它放到 user_data 中。

struct request {
  enum STATE { ACCEPT, READ, WRITE };
  int fd;
  STATE state;
  union {
    struct {
      sockaddr_in ipv4_addr;
      socklen_t lens;
    } addr;
    char buf[BUFSIZE];
  };
};

封装一下 accept/read/write 接口

class IOuring {
  io_uring ring;

 public:
  IOuring(int queue_size) { io_uring_queue_init(queue_size, &ring, 0); }

  ~IOuring() { io_uring_queue_exit(&ring); }

  void seen(io_uring_cqe* cqe) { io_uring_cqe_seen(&ring, cqe); }

  int wait(io_uring_cqe** cqe) { return io_uring_wait_cqe(&ring, cqe); }

  int submit() { return io_uring_submit(&ring); }

  void accpet_asyn(int sock_fd, request* body) {
    auto sqe = io_uring_get_sqe(&ring);
    body->state = request::ACCEPT;
    body->fd = sock_fd;
    body->addr.lens = sizeof(sockaddr_in);
    io_uring_prep_accept(sqe, sock_fd, (sockaddr*)&(body->addr.ipv4_addr),
                         &(body->addr.lens), 0);
    io_uring_sqe_set_data(sqe, body);
  }

  void read_asyn(int client_fd, request* body) {
    auto sqe = io_uring_get_sqe(&ring);
    body->state = request::READ;
    body->fd = client_fd;
    io_uring_prep_read(sqe, client_fd, body->buf, sizeof(body->buf), -1);
    io_uring_sqe_set_data(sqe, body);
  }

  void write_asyn(int client_fd, request* body) {
    auto sqe = io_uring_get_sqe(&ring);
    body->state = request::WRITE;
    body->fd = client_fd;
    io_uring_prep_write(sqe, client_fd, body->buf, sizeof(body->buf), -1);
    io_uring_sqe_set_data(sqe, body);
  }
};

使用:

#include <arpa/inet.h>
#include <bits/stdc++.h>
#include <liburing.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

const int BUFSIZE = 1024;

int main() {
  /*init socket*/
  int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
  sockaddr_in sock_addr;
  sock_addr.sin_port = htons(8000);
  sock_addr.sin_family = AF_INET;
  sock_addr.sin_addr.s_addr = INADDR_ANY;
  int ret = bind(sock_fd, (sockaddr*)&sock_addr, sizeof(sock_addr));
  perror("");
  listen(sock_fd, 10);

  std::cout << "listen begin ..." << std::endl;

  /*io_uring*/
  IOuring ring(1024);

  ring.accpet_asyn(sock_fd, new request);
  ring.submit();

  while (true) {
    io_uring_cqe* cqe;
    ring.wait(&cqe);
    request* res = (request*)cqe->user_data;
    switch (res->state) {
      case request::ACCEPT:
        if (cqe->res > 0) {
          int client_fd = cqe->res;
          ring.accpet_asyn(sock_fd, res);
          ring.read_asyn(client_fd, new request);
          ring.submit();
        }
        std::cout << cqe->res << std::endl;
        break;
      case request::READ:
        if (cqe->res > 0) std::cout << res->buf << std::endl;
        ring.write_asyn(res->fd, res);
        ring.submit();
        break;
      case request::WRITE:
        if (cqe->res > 0) {
          close(res->fd);
          delete res;
        }
        break;
      default:
        std::cout << "error " << std::endl;
        break;
    }
    ring.seen(cqe);
  }

  return 0;
}

参考: