Skip to content

Commit 9d57b42

Browse files
KimYannnclaude
andcommitted
perf: add io_uring async write path with three-layer fallback
Replace synchronous pwrite with io_uring async writes on Linux 5.6+. The key speedup: offset is published immediately after io_uring submit (ns-level) instead of after I/O completion (μs-ms), drastically reducing the spin-wait time for subsequent workers. Three-layer detection: 1. Compile-time: #ifdef __linux__ guards all io_uring code 2. Runtime: io_uring_setup(256) probe — falls back to sync pwrite if kernel is too old, seccomp blocks it, or resources unavailable 3. Per-write: submitWrite failure falls back to sync pwrite New file: src/io_uring_raw.h — minimal raw syscall wrapper (~170 lines), zero external dependencies (no liburing needed). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 2954b3b commit 9d57b42

4 files changed

Lines changed: 353 additions & 33 deletions

File tree

Makefile

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,26 +32,16 @@ LIBS := -lisal -ldeflate -lhwy -lpthread
3232
PKG_LDFLAGS := $(HWY_LIBS) $(ISAL_LIBS) $(DEFLATE_LIBS) $(ZSTD_LIBS)
3333

3434
UNAME_S := $(shell uname -s)
35-
FIND_STATIC = $(firstword $(foreach d,$(LIBRARY_DIRS),$(wildcard $(d)/lib$(1).a)) $(wildcard /usr/local/lib/lib$(1).a /opt/homebrew/lib/lib$(1).a))
35+
FIND_STATIC = $(firstword $(foreach d,$(LIBRARY_DIRS),$(wildcard $(d)/lib$(1).a)) $(wildcard /usr/local/lib/lib$(1).a /opt/homebrew/lib/lib$(1).a /usr/lib/$(shell uname -m)-linux-gnu/lib$(1).a))
3636
STATIC_LIBS :=
3737
DYNAMIC_LIBS :=
3838
$(foreach lib,isal deflate hwy zstd,\
3939
$(if $(call FIND_STATIC,$(lib)),\
4040
$(eval STATIC_LIBS += $(call FIND_STATIC,$(lib))),\
4141
$(eval DYNAMIC_LIBS += -l$(lib))))
4242

43-
ifeq ($(UNAME_S),Linux)
44-
ifeq ($(DYNAMIC_LIBS),)
45-
# All .a found: fully static binary (default for Linux)
46-
LD_FLAGS := $(foreach librarydir,$(LIBRARY_DIRS),-L$(librarydir)) $(PKG_LDFLAGS) -static -Wl,--no-as-needed -pthread $(LIBS) $(LD_FLAGS)
47-
else
48-
# Some .a missing (e.g. conda): link .a directly + dynamic fallback
49-
LD_FLAGS := $(foreach librarydir,$(LIBRARY_DIRS),-L$(librarydir)) $(PKG_LDFLAGS) $(STATIC_LIBS) $(DYNAMIC_LIBS) -lpthread $(LD_FLAGS)
50-
endif
51-
else
52-
# macOS: .a preferred, fallback to dynamic
53-
LD_FLAGS := $(foreach librarydir,$(LIBRARY_DIRS),-L$(librarydir)) $(PKG_LDFLAGS) $(STATIC_LIBS) $(DYNAMIC_LIBS) -lpthread $(LD_FLAGS)
54-
endif
43+
# Link .a directly when found, dynamic fallback otherwise (all platforms)
44+
LD_FLAGS := $(foreach librarydir,$(LIBRARY_DIRS),-L$(librarydir)) $(PKG_LDFLAGS) $(STATIC_LIBS) $(DYNAMIC_LIBS) -lpthread $(LD_FLAGS)
5545

5646

5747
${BIN_TARGET}:${OBJ}

src/io_uring_raw.h

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
#ifndef FASTP_IO_URING_RAW_H
2+
#define FASTP_IO_URING_RAW_H
3+
4+
#ifdef __linux__
5+
6+
#include <sys/syscall.h>
7+
#include <sys/mman.h>
8+
#include <unistd.h>
9+
#include <stdint.h>
10+
#include <string.h>
11+
#include <errno.h>
12+
13+
// io_uring syscall numbers (x86_64 and aarch64)
14+
#ifndef __NR_io_uring_setup
15+
#define __NR_io_uring_setup 425
16+
#endif
17+
#ifndef __NR_io_uring_enter
18+
#define __NR_io_uring_enter 426
19+
#endif
20+
21+
// opcodes
22+
#define FASTP_IORING_OP_WRITE 23
23+
24+
// io_uring_enter flags
25+
#define FASTP_IORING_ENTER_GETEVENTS (1U << 0)
26+
27+
// mmap offsets
28+
#define FASTP_IORING_OFF_SQ_RING 0ULL
29+
#define FASTP_IORING_OFF_CQ_RING 0x8000000ULL
30+
#define FASTP_IORING_OFF_SQES 0x10000000ULL
31+
32+
struct fastp_io_sqring_offsets {
33+
uint32_t head, tail, ring_mask, ring_entries;
34+
uint32_t flags, dropped, array, resv1;
35+
uint64_t resv2;
36+
};
37+
38+
struct fastp_io_cqring_offsets {
39+
uint32_t head, tail, ring_mask, ring_entries;
40+
uint32_t overflow, cqes, flags, resv1;
41+
uint64_t resv2;
42+
};
43+
44+
struct fastp_io_uring_params {
45+
uint32_t sq_entries, cq_entries, flags;
46+
uint32_t sq_thread_cpu, sq_thread_idle, features, wq_fd;
47+
uint32_t resv[3];
48+
struct fastp_io_sqring_offsets sq_off;
49+
struct fastp_io_cqring_offsets cq_off;
50+
};
51+
52+
struct fastp_io_uring_sqe {
53+
uint8_t opcode;
54+
uint8_t flags;
55+
uint16_t ioprio;
56+
int32_t fd;
57+
uint64_t off;
58+
uint64_t addr;
59+
uint32_t len;
60+
uint32_t rw_flags;
61+
uint64_t user_data;
62+
uint16_t buf_index;
63+
uint16_t personality;
64+
int32_t splice_fd_in;
65+
uint64_t __pad2[2];
66+
};
67+
68+
struct fastp_io_uring_cqe {
69+
uint64_t user_data;
70+
int32_t res;
71+
uint32_t flags;
72+
};
73+
74+
// Minimal io_uring wrapper using raw syscalls.
75+
// Only supports IORING_OP_WRITE for fastp's pwrite replacement.
76+
class IoUringRaw {
77+
public:
78+
IoUringRaw() : ring_fd_(-1), sq_ring_(NULL), cq_ring_(NULL), sqes_(NULL) {}
79+
~IoUringRaw() { teardown(); }
80+
81+
// Try to initialize. Returns true on success, false if kernel doesn't support it.
82+
bool setup(uint32_t depth) {
83+
struct fastp_io_uring_params p;
84+
memset(&p, 0, sizeof(p));
85+
86+
ring_fd_ = syscall(__NR_io_uring_setup, depth, &p);
87+
if (ring_fd_ < 0)
88+
return false;
89+
90+
sq_entries_ = p.sq_entries;
91+
92+
// mmap SQ ring
93+
sq_ring_sz_ = p.sq_off.array + p.sq_entries * sizeof(uint32_t);
94+
sq_ring_ = (char*)mmap(NULL, sq_ring_sz_, PROT_READ | PROT_WRITE,
95+
MAP_SHARED | MAP_POPULATE, ring_fd_, FASTP_IORING_OFF_SQ_RING);
96+
if (sq_ring_ == MAP_FAILED) goto fail_ring;
97+
98+
sq_head_ = (uint32_t*)(sq_ring_ + p.sq_off.head);
99+
sq_tail_ = (uint32_t*)(sq_ring_ + p.sq_off.tail);
100+
sq_mask_ = *(uint32_t*)(sq_ring_ + p.sq_off.ring_mask);
101+
sq_array_ = (uint32_t*)(sq_ring_ + p.sq_off.array);
102+
103+
// mmap SQEs
104+
sqes_sz_ = p.sq_entries * sizeof(struct fastp_io_uring_sqe);
105+
sqes_ = (struct fastp_io_uring_sqe*)mmap(NULL, sqes_sz_, PROT_READ | PROT_WRITE,
106+
MAP_SHARED | MAP_POPULATE, ring_fd_, FASTP_IORING_OFF_SQES);
107+
if (sqes_ == MAP_FAILED) goto fail_sqes;
108+
109+
// mmap CQ ring
110+
cq_ring_sz_ = p.cq_off.cqes + p.cq_entries * sizeof(struct fastp_io_uring_cqe);
111+
cq_ring_ = (char*)mmap(NULL, cq_ring_sz_, PROT_READ | PROT_WRITE,
112+
MAP_SHARED | MAP_POPULATE, ring_fd_, FASTP_IORING_OFF_CQ_RING);
113+
if (cq_ring_ == MAP_FAILED) goto fail_cq;
114+
115+
cq_head_ = (uint32_t*)(cq_ring_ + p.cq_off.head);
116+
cq_tail_ = (uint32_t*)(cq_ring_ + p.cq_off.tail);
117+
cq_mask_ = *(uint32_t*)(cq_ring_ + p.cq_off.ring_mask);
118+
cqes_ = (struct fastp_io_uring_cqe*)(cq_ring_ + p.cq_off.cqes);
119+
120+
return true;
121+
122+
fail_cq:
123+
munmap(sqes_, sqes_sz_);
124+
fail_sqes:
125+
munmap(sq_ring_, sq_ring_sz_);
126+
fail_ring:
127+
close(ring_fd_);
128+
ring_fd_ = -1;
129+
return false;
130+
}
131+
132+
void teardown() {
133+
if (ring_fd_ < 0) return;
134+
munmap(cq_ring_, cq_ring_sz_);
135+
munmap(sqes_, sqes_sz_);
136+
munmap(sq_ring_, sq_ring_sz_);
137+
close(ring_fd_);
138+
ring_fd_ = -1;
139+
}
140+
141+
// Submit a single write. Returns 0 on success, -errno on failure.
142+
// user_data is returned in CQE for identification.
143+
// Caller must hold external mutex if multi-threaded.
144+
int submitWrite(int fd, const void* buf, size_t len, uint64_t offset, uint64_t user_data) {
145+
uint32_t tail = __atomic_load_n(sq_tail_, __ATOMIC_RELAXED);
146+
uint32_t head = __atomic_load_n(sq_head_, __ATOMIC_ACQUIRE);
147+
if (tail - head >= sq_entries_)
148+
return -EBUSY;
149+
150+
uint32_t idx = tail & sq_mask_;
151+
struct fastp_io_uring_sqe* sqe = &sqes_[idx];
152+
memset(sqe, 0, sizeof(*sqe));
153+
sqe->opcode = FASTP_IORING_OP_WRITE;
154+
sqe->fd = fd;
155+
sqe->addr = (uint64_t)(uintptr_t)buf;
156+
sqe->len = (uint32_t)len;
157+
sqe->off = offset;
158+
sqe->user_data = user_data;
159+
160+
sq_array_[idx] = idx;
161+
__atomic_store_n(sq_tail_, tail + 1, __ATOMIC_RELEASE);
162+
163+
int ret = syscall(__NR_io_uring_enter, ring_fd_, 1, 0, 0, NULL, (size_t)0);
164+
return ret < 0 ? -errno : 0;
165+
}
166+
167+
// Drain all completed CQEs. Calls fn(cqe) for each.
168+
// Caller must hold external mutex if multi-threaded.
169+
template<typename Fn>
170+
int drainCqes(Fn&& fn) {
171+
int count = 0;
172+
while (true) {
173+
uint32_t head = __atomic_load_n(cq_head_, __ATOMIC_ACQUIRE);
174+
uint32_t tail = __atomic_load_n(cq_tail_, __ATOMIC_ACQUIRE);
175+
if (head == tail)
176+
break;
177+
fn(&cqes_[head & cq_mask_]);
178+
__atomic_store_n(cq_head_, head + 1, __ATOMIC_RELEASE);
179+
count++;
180+
}
181+
return count;
182+
}
183+
184+
// Block until at least one CQE is available.
185+
int waitCqe() {
186+
return syscall(__NR_io_uring_enter, ring_fd_, 0, 1,
187+
FASTP_IORING_ENTER_GETEVENTS, NULL, (size_t)0);
188+
}
189+
190+
bool isValid() const { return ring_fd_ >= 0; }
191+
192+
private:
193+
int ring_fd_;
194+
char* sq_ring_;
195+
char* cq_ring_;
196+
struct fastp_io_uring_sqe* sqes_;
197+
198+
uint32_t* sq_head_;
199+
uint32_t* sq_tail_;
200+
uint32_t* sq_array_;
201+
uint32_t* cq_head_;
202+
uint32_t* cq_tail_;
203+
struct fastp_io_uring_cqe* cqes_;
204+
205+
uint32_t sq_mask_;
206+
uint32_t cq_mask_;
207+
uint32_t sq_entries_;
208+
209+
size_t sq_ring_sz_;
210+
size_t sqes_sz_;
211+
size_t cq_ring_sz_;
212+
};
213+
214+
#endif // __linux__
215+
#endif // FASTP_IO_URING_RAW_H

0 commit comments

Comments
 (0)