Skip to content

Commit a6c4d9c

Browse files
KimYannn and claude committed
fix: prevent deadlock in high thread counts and pwrite livelock (#678)
Two bugs caused hangs with large paired-end (PE) gzipped FASTQ files:

1. Lock-free list deadlock: canBeConsumed() required nextItemReady or producerFinished, but a single-item list has neither. When thread_count > PACK_IN_MEM_LIMIT, each worker gets at most one pack before reader backpressure applies, so workers cannot consume their only pack, the processed counter never advances, and the readers stay blocked. Fix: set nextItemReady on the first item inserted into an empty list, because it has no predecessor to set the flag for it.

2. Pwrite spin-wait livelock: hardware pause/yield instructions do not yield the OS timeslice. Under CPU contention (e.g. in Docker), spinning threads starve the predecessor thread that must publish its sequence. Fix: replace the pause/yield spin with a 1-microsecond std::this_thread::sleep_for inside the wait loop.

Also fix per-thread compress buffer size tracking: mCompBufSize was shared and never updated, so a worker whose compress bound exceeded it reallocated its buffer on every call. It is replaced by a per-worker mCompBufSizes array that is updated whenever a buffer grows.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent b138112 commit a6c4d9c

4 files changed

Lines changed: 24 additions & 14 deletions

File tree

.github/workflows/ci.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,9 @@ jobs:
6868
run: |
6969
./fastp --version
7070
./fastp -i testdata/R1.fq -o /dev/null
71+
72+
- name: upload binary
73+
uses: actions/upload-artifact@v4
74+
with:
75+
name: fastp-${{ runner.os }}-${{ runner.arch }}
76+
path: fastp

src/singleproducersingleconsumerlist.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ class SingleProducerSingleConsumerList {
9999
if(head==NULL) {
100100
head = item;
101101
tail = item;
102+
// Signal the first item is consumable (no predecessor to set this)
103+
head->nextItemReady.store(true, std::memory_order_release);
102104
} else {
103105
tail->nextItem = item;
104106
tail->nextItemReady = true;

src/writerthread.cpp

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <cerrno>
77
#include <cstring>
88
#include <thread>
9+
#include <chrono>
910

1011
WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){
1112
mOptions = opt;
@@ -19,7 +20,7 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){
1920
mNextSeq = NULL;
2021
mCompressors = NULL;
2122
mCompBufs = NULL;
22-
mCompBufSize = 0;
23+
mCompBufSizes = NULL;
2324
mBufferLists = NULL;
2425

2526
if (mPwriteMode) {
@@ -34,10 +35,13 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){
3435
for (int t = 0; t < mOptions->thread; t++)
3536
mCompressors[t] = libdeflate_alloc_compressor(mOptions->compression);
3637
// Pre-allocate per-worker compress buffers (avoids malloc/free per pack)
37-
mCompBufSize = PACK_SIZE * 500; // ~500 bytes/read worst case
38+
size_t initBufSize = PACK_SIZE * 500; // ~500 bytes/read worst case
3839
mCompBufs = new char*[mOptions->thread];
39-
for (int t = 0; t < mOptions->thread; t++)
40-
mCompBufs[t] = new char[mCompBufSize];
40+
mCompBufSizes = new size_t[mOptions->thread];
41+
for (int t = 0; t < mOptions->thread; t++) {
42+
mCompBufs[t] = new char[initBufSize];
43+
mCompBufSizes[t] = initBufSize;
44+
}
4145
mWorkingBufferList = 0;
4246
mBufferLength = 0;
4347
} else {
@@ -114,11 +118,11 @@ void WriterThread::input(int tid, string* data) {
114118

115119
void WriterThread::inputPwrite(int tid, string* data) {
116120
size_t bound = libdeflate_gzip_compress_bound(mCompressors[tid], data->size());
117-
// Grow pre-allocated buffer if needed
118-
if (bound > mCompBufSize) {
121+
// Grow per-worker buffer if needed
122+
if (bound > mCompBufSizes[tid]) {
119123
delete[] mCompBufs[tid];
120124
mCompBufs[tid] = new char[bound];
121-
// Note: mCompBufSize is shared but only grows, safe for other threads
125+
mCompBufSizes[tid] = bound;
122126
}
123127
size_t outsize = libdeflate_gzip_compress(mCompressors[tid], data->data(), data->size(),
124128
mCompBufs[tid], bound);
@@ -130,16 +134,13 @@ void WriterThread::inputPwrite(int tid, string* data) {
130134

131135
size_t seq = mNextSeq[tid];
132136

133-
// Wait for previous batch's cumulative offset
137+
// Wait for previous batch's cumulative offset.
138+
// Sleep yields CPU to prevent livelock under contention.
134139
size_t offset = 0;
135140
if (seq > 0) {
136141
size_t prevSlot = (seq - 1) & (OFFSET_RING_SIZE - 1);
137142
while (mOffsetRing[prevSlot].published_seq.load(std::memory_order_acquire) != seq - 1) {
138-
#if defined(__aarch64__)
139-
__asm__ volatile("yield");
140-
#elif defined(__x86_64__) || defined(__i386__)
141-
__asm__ volatile("pause");
142-
#endif
143+
std::this_thread::sleep_for(std::chrono::microseconds(1));
143144
}
144145
offset = mOffsetRing[prevSlot].cumulative_offset.load(std::memory_order_relaxed);
145146
}
@@ -182,6 +183,7 @@ void WriterThread::cleanup() {
182183
delete[] mCompBufs[t];
183184
delete[] mCompBufs; mCompBufs = NULL;
184185
}
186+
delete[] mCompBufSizes; mCompBufSizes = NULL;
185187
return;
186188
}
187189
deleteWriter();

src/writerthread.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class WriterThread{
6262
size_t* mNextSeq;
6363
libdeflate_compressor** mCompressors;
6464
char** mCompBufs; // per-worker pre-allocated compress output buffers
65-
size_t mCompBufSize;
65+
size_t* mCompBufSizes; // per-worker buffer sizes
6666
};
6767

6868
#endif

0 commit comments

Comments
 (0)