如果这篇博客帮助到你,可以请我喝一杯咖啡~
CC BY 4.0 (除特别声明或转载文章外)
引入
一 引入
linux
aio
的缺陷:
- 只支持
DIRECT_IO
- 实现上拷贝消耗资源大
- 相比
epoll
的优势在缩小 - 接口相当复杂
io_uring
优势:
- 使用方便
- 在设计上是真正异步的:只要 设置了合适的 flag,它在 系统调用上下文中就只是将请求放入队列, 不会做其他任何额外的事情,保证了应用永远不会阻塞
- 通用性强:支持
Buffer IO
和Direct IO
,也支持epoll
模型 - 特性丰富:基于
io_uring
甚至能重写(re-implement
)Linux 的每个系统调用。 - 高性能:
IO
请求overhead
小
二 实现原理
io_uring
实现异步 I/O
的方式其实是一个生产者-消费者模型:
- 用户进程生产
I/O
请求,放入提交队列(Submission Queue,后续简称 SQ)。 - 内核消费
SQ
中的I/O
请求,完成后将结果放入完成队列(Completion Queue,后续简称 CQ)。 - 用户进程从
CQ
中收割I/O
结果。
SQ
和 CQ
是内核初始化 io_uring
实例的时候创建的。为了减少系统调用和减少用户进程与内核之间的数据拷贝,io_uring
使用 mmap
的方式让用户进程和内核共享 SQ
和 CQ
的内存空间。
另外,由于先提交的 I/O
请求不一定先完成,SQ
保存的其实是一个数组索引(数据类型 uint32),真正的 SQE
(Submission Queue Entry)保存在一个独立的数组(SQ Array
)。所以要提交一个 I/O
请求,得先在 SQ Array
中找到一个空闲的 SQE
,设置好之后,将其数组索引放到 SQ
中。
用户进程、内核、SQ
、CQ
和 SQ Array
之间的基本关系如下:
这两个队列:
- 都是 单生产者、单消费者,size 是 2 的幂次;
- 提供 无锁接口(lock-less access interface),内部使用 内存屏障 做同步(coordinated with memory barriers)。
使用方式:
- 请求
- 应用创建
SQ entries
(SQE
),更新SQ tail
; - 内核消费
SQE
,更新SQ head
。
- 应用创建
- 完成
- 内核为完成的一个或多个请求创建
CQ entries
(CQE),更新CQ
tail; - 应用消费
CQE
,更新CQ
head。 - 完成事件(completion events)可能以任意顺序到达,到总是与特定的 SQE 相关联的。
- 消费 CQE 过程无需切换到内核态。
- 内核为完成的一个或多个请求创建
io_uring
实例可工作在三种模式:
中断驱动模式(
interrupt driven
)默认模式。可通过
io_uring_enter()
提交I/O
请求,然后直接检查CQ
状态判断是否完成。轮询模式(
polled
)Busy-waiting for an I/O completion,而不是通过异步
IRQ
(Interrupt Request)接收通知。这种模式需要文件系统(如果有)和块设备(block device)支持轮询功能。 相比中断驱动方式,这种方式延迟更低(连系统调用都省了), 但可能会消耗更多
CPU
资源。目前,只有指定了
O_DIRECT
flag
打开的文件描述符,才能使用这种模式。当一个读 或写请求提交给轮询上下文(polled context)之后,应用(application)必须调用io_uring_enter()
来轮询CQ
队列,判断请求是否已经完成。对一个
io_uring
实例来说,不支持混合使用轮询和非轮询模式。内核轮询模式(
kernel polled
)这种模式中,会 创建一个内核线程(kernel thread)来执行 SQ 的轮询工作。
使用这种模式的
io_uring
实例, 应用无需切到到内核态 就能触发(issue)I/O 操作。 通过SQ
来提交SQE
,以及监控CQ
的完成状态,应用无需任何系统调用,就能提交和收割I/O
(submit and reap I/Os)。如果内核线程的空闲时间超过了用户的配置值,它会通知应用,然后进入
idle
状态。 这种情况下,应用必须调用io_uring_enter()
来唤醒内核线程。如果I/O
一直很繁忙,内核线性是不会sleep
的。
io_uring_setup
int io_uring_setup(int entries, struct io_uring_params *params);
内核提供了 io_uring_setup
系统调用来初始化一个 io_uring
实例。
io_uring_setup
的返回值是一个文件描述符,暂且称为ring_fd
,用于后续的mmap
内存映射和其它相关系统调用的参数。io_uring
会创建SQ
、CQ
和SQ Array
- ``entries
参数表示的是
SQ和
SQ Array的大小,
CQ的大小默认是
2 * entries`。
params
参数既是输入参数,也是输出参数,其定义如下:
struct io_uring_params {
__u32 sq_entries;
__u32 cq_entries;
__u32 flags;
__u32 sq_thread_cpu;
__u32 sq_thread_idle;
__u32 features;
__u32 resv[4];
struct io_sqring_offsets sq_off;
struct io_cqring_offsets cq_off;
};
struct io_sqring_offsets {
__u32 head;
__u32 tail;
__u32 ring_mask;
__u32 ring_entries;
__u32 flags;
__u32 dropped;
__u32 array;
__u32 resv[3];
};
struct io_cqring_offsets {
__u32 head;
__u32 tail;
__u32 ring_mask;
__u32 ring_entries;
__u32 overflow;
__u32 cqes;
__u32 flags;
__u32 resv[3];
};
flags
、sq_thread_cpu
、sq_thread_idle
是输入参数,用于设置io_uring
的一些特性。resv[4]
是保留字段,我们暂且忽略。其它参数都是由内核设置的输出参数,用户进程可能需要使用这些参数进行一些初始化和判断:
sq_entries
是提交队列的大小。cq_entries
是完成队列的大小。features
描述当前内核版本支持的io_uring
特性。其中,IORING_FEAT_SINGLE_MMAP
是io_uring
一个比较重要的特性:内核支持通过一次mmap
完成SQ
和CQ
的内存映射,具体可以参考liburing
的io_uring_mmap
。sq_off
描述了SQ
的一些属性的offset
。cq_off
描述了CQ
的一些属性的offset
。
提交/收割 IO 请求
初始化完成之后,我们需要向 io_uring
提交 I/O
请求。默认情况下,使用 io_uring
提交 I/O 请求需要:
- 从
SQ Arrary
中找到一个空闲的SQE
。 - 根据具体的 I/O 请求设置这个
SQE
。 - 将 SQE 的数组索引放到
SQ
中。 - 调用系统调用
io_uring_enter
提交SQ
中的I/O
请求。
为了进一步提升性能,io_uring
提供了内核轮询的方式来提交 I/O
请求(设置 params.flags
的 IORING_SETUP_SQPOLL
位):创建一个内核线程(简称 SQ 线程)对 SQ 进行轮询(polling
),发现有未提交的 I/O 请求就自动进行提交:
- 如果
I/O
请求源源不断,SQ
线程会一直轮询并提交I/O
请求给内核,这个过程不需要经过系统调用。 - 如果
SQ
线程的空闲时间超过sq_thread_idle
毫秒,会自动睡眠,并设置io_sq_ring
的flags
(sq_ring_ptr + p.sq_off.flags
)的IORING_SQ_NEED_WAKEUP
位。 - 用户进程需要根据
flags
是否设置了IORING_SQ_NEED_WAKEUP
来决定是否需要调用io_uring_enter
来唤醒 SQ 线程
io_uring_register()
注册用于异步 I/O
的 文件或用户缓冲区
int io_uring_register(unsigned int fd, unsigned int opcode, void *arg, unsigned int nr_args);
注册文件或用户缓冲区,使内核能 长时间持有对该文件在内核内部的数据结构引用, 或创建 应用内存的长期映射, 这个操作只会在注册时执行一次,而不是每个 I/O 请求都会处理,因此减少了 per-I/O overhead
。
注册的缓冲区(buffer)性质
Registered buffers
将会 被锁定在内存中(be locked in memory),并 计入用户的 RLIMIT_MEMLOCK 资源限制。- 此外,每个 buffer 有 1GB 的大小限制。
- 当前,
buffers
必须是 匿名、非文件后端的内存(anonymous, non-file-backed memory),例如malloc
ormmap
with theMAP_ANONYMOUS
flag set
返回的内存。 Huge pages
也是支持的。整个huge page
都会被pin
到内核,即使只用到了其中一部分。- 已经注册的
buffer
无法调整大小,想调整只能先unregister
,再重新register
一个新的。
io_uring_enter()
int io_uring_enter(unsigned int fd, unsigned int to_submit, unsigned int min_complete, unsigned int flags, sigset_t *sig);
这个系统调用用于初始化和完成(initiate and complete)I/O,使用共享的 SQ 和 CQ。 单次调用同时执行:
- 提交新的 I/O 请求
- 等待 I/O 完成
参数:
fd
是io_uring_setup()
返回的文件描述符;to_submit
指定了 SQ 中提交的 I/O 数量;- 依据不同模式:
- 默认模式,如果指定了
min_complete
,会等待这个数量的 I/O 事件完成再返回; - 如果
io_uring
是polling
模式,这个参数表示:- 0:要求内核返回当前以及完成的所有 events,无阻塞;
- 非零:如果有事件完成,内核仍然立即返回;如果没有完成事件,内核会 poll,等待指定的次数完成,或者这个进程的时间片用完。
- 默认模式,如果指定了
注意:对于 interrupt driven I/O,应用无需进入内核就能检查 CQ 的 event completions。
当这个系统调用返回时,表示一定数量的 SEQ 已经被消费和提交了,此时可以安全的重用队列中的 SEQ。 此时 IO 提交有可能还停留在异步上下文中,即实际上 SQE 可能还没有被提交 —— 不过 用户不用关心这些细节 —— 当随后内核需要使用某个特定的 SQE 时,它已经复制了一份。
高级特性
io_uring
提供了一些用于特殊场景的高级特性:
- File registration(文件注册):每次发起一个指定文件描述的操 作,内核都需要花费一些时钟周期(cycles)将文件描述符映射到内部表示。 对于那些针对同一文件进行重复操作的场景,
io_uring
支持提前注册这些文件,后面直接查找就行了。 - Buffer registration(缓冲区注册):与 file registration 类 似,direct I/O 场景中,内核需要 map/unmap memory areas。
io_uring
支持提前 注册这些缓冲区(buffers)。 - Poll ring(轮询环形缓冲区):对于非常快是设备,处理中断的开 销是比较大的。
io_uring
允许用户关闭中断,使用轮询模式。前面“三种工作模式”小节 也介绍到了这一点。 - Linked operations(链接操作):允许用户发送串联的请求。这两 个请求同时提交,但后面的会等前面的处理完才开始执行。
三 liburing 使用
io_uring_cqe/io_uring_sqe
结构
struct io_uring_cqe {
__u64 user_data; /* sqe->data submission passed back */
__s32 res; /* result code for this event */
__u32 flags;
/*
* If the ring is initialized with IORING_SETUP_CQE32, then this field
* contains 16-bytes of padding, doubling the size of the CQE.
*/
__u64 big_cqe[];
};
struct io_uring_sqe {
__u8 opcode; /* type of operation for this sqe */
__u8 flags; /* IOSQE_ flags */
__u16 ioprio; /* ioprio for the request */
__s32 fd; /* file descriptor to do IO on */
union {
__u64 off; /* offset into file */
__u64 addr2;
struct {
__u32 cmd_op;
__u32 __pad1;
};
};
union {
__u64 addr; /* pointer to buffer or iovecs */
__u64 splice_off_in;
};
__u32 len; /* buffer size or number of iovecs */
union {
__kernel_rwf_t rw_flags;
__u32 fsync_flags;
__u16 poll_events; /* compatibility */
__u32 poll32_events; /* word-reversed for BE */
__u32 sync_range_flags;
__u32 msg_flags;
__u32 timeout_flags;
__u32 accept_flags;
__u32 cancel_flags;
__u32 open_flags;
__u32 statx_flags;
__u32 fadvise_advice;
__u32 splice_flags;
__u32 rename_flags;
__u32 unlink_flags;
__u32 hardlink_flags;
__u32 xattr_flags;
__u32 msg_ring_flags;
};
__u64 user_data; /* data to be passed back at completion time */
/* pack this to avoid bogus arm OABI complaints */
union {
/* index into fixed buffers, if used */
__u16 buf_index;
/* for grouped buffer selection */
__u16 buf_group;
} __attribute__((packed));
/* personality to use, if used */
__u16 personality;
union {
__s32 splice_fd_in;
__u32 file_index;
struct {
__u16 addr_len;
__u16 __pad3[1];
};
};
union {
struct {
__u64 addr3;
__u64 __pad2[1];
};
/*
* If the ring is initialized with IORING_SETUP_SQE128, then
* this field is used for 80 bytes of arbitrary command data
*/
__u8 cmd[0];
};
};
#include <bits/stdc++.h>
#include <liburing.h>
#include <unistd.h>
char buf[1024] = {0};
int main() {
int fd = open("1.txt", O_RDONLY, 0);
io_uring ring;
io_uring_queue_init(32, &ring, 0); // 初始化
auto sqe = io_uring_get_sqe(&ring); // 从环中得到一块空位
io_uring_prep_read(sqe, fd, buf, sizeof(buf), 0); // 为这块空位准备好操作
io_uring_submit(&ring); // 提交任务
io_uring_cqe* res; // 完成队列指针
io_uring_wait_cqe(&ring, &res); // 阻塞等待一项完成的任务
assert(res);
std::cout << "read bytes: " << res->res << " \n";
std::cout << buf << std::endl;
io_uring_cqe_seen(&ring, res); // 将任务移出完成队列
io_uring_queue_exit(&ring); // 退出
return 0;
}
echo
服务器
所以,我们定义一下我们在任务之间传递的结构体
然后把它放到 user_data
中。
struct request {
enum STATE { ACCEPT, READ, WRITE };
int fd;
STATE state;
union {
struct {
sockaddr_in ipv4_addr;
socklen_t lens;
} addr;
char buf[BUFSIZE];
};
};
封装一下 accept/read/write
接口
class IOuring {
io_uring ring;
public:
IOuring(int queue_size) { io_uring_queue_init(queue_size, &ring, 0); }
~IOuring() { io_uring_queue_exit(&ring); }
void seen(io_uring_cqe* cqe) { io_uring_cqe_seen(&ring, cqe); }
int wait(io_uring_cqe** cqe) { return io_uring_wait_cqe(&ring, cqe); }
int submit() { return io_uring_submit(&ring); }
void accpet_asyn(int sock_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::ACCEPT;
body->fd = sock_fd;
body->addr.lens = sizeof(sockaddr_in);
io_uring_prep_accept(sqe, sock_fd, (sockaddr*)&(body->addr.ipv4_addr),
&(body->addr.lens), 0);
io_uring_sqe_set_data(sqe, body);
}
void read_asyn(int client_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::READ;
body->fd = client_fd;
io_uring_prep_read(sqe, client_fd, body->buf, sizeof(body->buf), -1);
io_uring_sqe_set_data(sqe, body);
}
void write_asyn(int client_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::WRITE;
body->fd = client_fd;
io_uring_prep_write(sqe, client_fd, body->buf, sizeof(body->buf), -1);
io_uring_sqe_set_data(sqe, body);
}
};
使用:
#include <arpa/inet.h>
#include <bits/stdc++.h>
#include <liburing.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
const int BUFSIZE = 1024;
int main() {
/*init socket*/
int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
sockaddr_in sock_addr;
sock_addr.sin_port = htons(8000);
sock_addr.sin_family = AF_INET;
sock_addr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(sock_fd, (sockaddr*)&sock_addr, sizeof(sock_addr));
perror("");
listen(sock_fd, 10);
std::cout << "listen begin ..." << std::endl;
/*io_uring*/
IOuring ring(1024);
ring.accpet_asyn(sock_fd, new request);
ring.submit();
while (true) {
io_uring_cqe* cqe;
ring.wait(&cqe);
request* res = (request*)cqe->user_data;
switch (res->state) {
case request::ACCEPT:
if (cqe->res > 0) {
int client_fd = cqe->res;
ring.accpet_asyn(sock_fd, res);
ring.read_asyn(client_fd, new request);
ring.submit();
}
std::cout << cqe->res << std::endl;
break;
case request::READ:
if (cqe->res > 0) std::cout << res->buf << std::endl;
ring.write_asyn(res->fd, res);
ring.submit();
break;
case request::WRITE:
if (cqe->res > 0) {
close(res->fd);
delete res;
}
break;
default:
std::cout << "error " << std::endl;
break;
}
ring.seen(cqe);
}
return 0;
}
参考: