Skip to content

Commit ca0c7b9

Browse files
KimYannnclaude
andcommitted
fix: prevent deadlock in high thread counts and pwrite livelock (#678)
Two bugs caused hangs with large PE gz FASTQ files: 1. Lock-free list deadlock: canBeConsumed() required nextItemReady or producerFinished, but a single-item list has neither. When thread_count > PACK_IN_MEM_LIMIT, each worker gets ≤1 pack before reader backpressure — workers cannot consume their only pack, processed counter never advances, readers stay blocked. Fix: use produced > consumed as the consumability check. 2. Pwrite spin-wait livelock: hardware pause/yield instructions do not yield OS timeslice. Under CPU contention (Docker), spinning threads starve the predecessor thread that must publish its sequence. Fix: replace with std::condition_variable. Also fix per-thread compress buffer tracking (mCompBufSize was shared and never updated, causing repeated reallocation per call). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent b138112 commit ca0c7b9

4 files changed

Lines changed: 31 additions & 16 deletions

File tree

.github/workflows/ci.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,9 @@ jobs:
6868
run: |
6969
./fastp --version
7070
./fastp -i testdata/R1.fq -o /dev/null
71+
72+
- name: upload binary
73+
uses: actions/upload-artifact@v4
74+
with:
75+
name: fastp-${{ runner.os }}-${{ runner.arch }}
76+
path: fastp

src/singleproducersingleconsumerlist.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ class SingleProducerSingleConsumerList {
9292
inline bool canBeConsumed() {
9393
if(head == NULL)
9494
return false;
95-
return head->nextItemReady || producerFinished;
95+
return produced > consumed;
9696
}
9797
inline void produce(T val) {
9898
LockFreeListItem<T>* item = makeItem(val);

src/writerthread.cpp

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){
1919
mNextSeq = NULL;
2020
mCompressors = NULL;
2121
mCompBufs = NULL;
22-
mCompBufSize = 0;
22+
mCompBufSizes = NULL;
2323
mBufferLists = NULL;
2424

2525
if (mPwriteMode) {
@@ -34,10 +34,13 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){
3434
for (int t = 0; t < mOptions->thread; t++)
3535
mCompressors[t] = libdeflate_alloc_compressor(mOptions->compression);
3636
// Pre-allocate per-worker compress buffers (avoids malloc/free per pack)
37-
mCompBufSize = PACK_SIZE * 500; // ~500 bytes/read worst case
37+
size_t initBufSize = PACK_SIZE * 500; // ~500 bytes/read worst case
3838
mCompBufs = new char*[mOptions->thread];
39-
for (int t = 0; t < mOptions->thread; t++)
40-
mCompBufs[t] = new char[mCompBufSize];
39+
mCompBufSizes = new size_t[mOptions->thread];
40+
for (int t = 0; t < mOptions->thread; t++) {
41+
mCompBufs[t] = new char[initBufSize];
42+
mCompBufSizes[t] = initBufSize;
43+
}
4144
mWorkingBufferList = 0;
4245
mBufferLength = 0;
4346
} else {
@@ -114,11 +117,11 @@ void WriterThread::input(int tid, string* data) {
114117

115118
void WriterThread::inputPwrite(int tid, string* data) {
116119
size_t bound = libdeflate_gzip_compress_bound(mCompressors[tid], data->size());
117-
// Grow pre-allocated buffer if needed
118-
if (bound > mCompBufSize) {
120+
// Grow per-worker buffer if needed
121+
if (bound > mCompBufSizes[tid]) {
119122
delete[] mCompBufs[tid];
120123
mCompBufs[tid] = new char[bound];
121-
// Note: mCompBufSize is shared but only grows, safe for other threads
124+
mCompBufSizes[tid] = bound;
122125
}
123126
size_t outsize = libdeflate_gzip_compress(mCompressors[tid], data->data(), data->size(),
124127
mCompBufs[tid], bound);
@@ -130,16 +133,17 @@ void WriterThread::inputPwrite(int tid, string* data) {
130133

131134
size_t seq = mNextSeq[tid];
132135

133-
// Wait for previous batch's cumulative offset
136+
// Wait for previous batch's cumulative offset.
137+
// Uses condition variable to avoid priority-inversion livelock when
138+
// worker threads outnumber available CPUs (e.g. Docker containers).
134139
size_t offset = 0;
135140
if (seq > 0) {
136141
size_t prevSlot = (seq - 1) & (OFFSET_RING_SIZE - 1);
137-
while (mOffsetRing[prevSlot].published_seq.load(std::memory_order_acquire) != seq - 1) {
138-
#if defined(__aarch64__)
139-
__asm__ volatile("yield");
140-
#elif defined(__x86_64__) || defined(__i386__)
141-
__asm__ volatile("pause");
142-
#endif
142+
if (mOffsetRing[prevSlot].published_seq.load(std::memory_order_acquire) != seq - 1) {
143+
std::unique_lock<std::mutex> lk(mSeqMtx);
144+
mSeqCv.wait(lk, [&]() {
145+
return mOffsetRing[prevSlot].published_seq.load(std::memory_order_acquire) == seq - 1;
146+
});
143147
}
144148
offset = mOffsetRing[prevSlot].cumulative_offset.load(std::memory_order_relaxed);
145149
}
@@ -148,6 +152,7 @@ void WriterThread::inputPwrite(int tid, string* data) {
148152
size_t mySlot = seq & (OFFSET_RING_SIZE - 1);
149153
mOffsetRing[mySlot].cumulative_offset.store(offset + wsize, std::memory_order_relaxed);
150154
mOffsetRing[mySlot].published_seq.store(seq, std::memory_order_release);
155+
mSeqCv.notify_all();
151156

152157
// pwrite (concurrent with other workers on non-overlapping regions)
153158
if (wsize > 0) {
@@ -182,6 +187,7 @@ void WriterThread::cleanup() {
182187
delete[] mCompBufs[t];
183188
delete[] mCompBufs; mCompBufs = NULL;
184189
}
190+
delete[] mCompBufSizes; mCompBufSizes = NULL;
185191
return;
186192
}
187193
deleteWriter();

src/writerthread.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "options.h"
1010
#include <atomic>
1111
#include <mutex>
12+
#include <condition_variable>
1213
#include <libdeflate.h>
1314
#include "singleproducersingleconsumerlist.h"
1415

@@ -62,7 +63,9 @@ class WriterThread{
6263
size_t* mNextSeq;
6364
libdeflate_compressor** mCompressors;
6465
char** mCompBufs; // per-worker pre-allocated compress output buffers
65-
size_t mCompBufSize;
66+
size_t* mCompBufSizes; // per-worker buffer sizes
67+
std::mutex mSeqMtx;
68+
std::condition_variable mSeqCv;
6669
};
6770

6871
#endif

0 commit comments

Comments
 (0)