22#include " util.h"
33#include < memory.h>
44#include < unistd.h>
5+ #include < fcntl.h>
6+ #include < cerrno>
7+ #include < cstring>
8+ #include < thread>
59
610WriterThread::WriterThread (Options* opt, string filename, bool isSTDOUT){
711 mOptions = opt;
8-
912 mWriter1 = NULL ;
10-
1113 mInputCompleted = false ;
1214 mFilename = filename;
1315
14- initWriter (filename, isSTDOUT);
15- initBufferLists ();
16- mWorkingBufferList = 0 ; // 0 ~ mOptions->thread-1
17- mBufferLength = 0 ;
16+ // Detect gz output for parallel compression
17+ mPreCompressed = !isSTDOUT && ends_with (filename, " .gz" );
18+
19+ mCompressionLevel = mOptions ->compression ;
20+ mCompressors = NULL ;
21+
22+ // pwrite mode: workers compress + pwrite in parallel (gz output, multi-threaded)
23+ mPwriteMode = mPreCompressed && !isSTDOUT && mOptions ->thread > 1 ;
24+ mFd = -1 ;
25+ mOffsetRing = NULL ;
26+ mNextSeq = NULL ;
27+ mBufferLists = NULL ;
28+
29+ if (mPwriteMode ) {
30+ mFd = open (mFilename .c_str (), O_WRONLY | O_CREAT | O_TRUNC , 0644 );
31+ if (mFd < 0 )
32+ error_exit (" Failed to open for pwrite: " + mFilename );
33+ mOffsetRing = new OffsetSlot[OFFSET_RING_SIZE ];
34+ mNextSeq = new size_t [mOptions ->thread ];
35+ for (int t = 0 ; t < mOptions ->thread ; t++)
36+ mNextSeq [t] = t;
37+ mAccumBuf = new string[mOptions ->thread ];
38+ mCompressors = new libdeflate_compressor*[mOptions ->thread ];
39+ for (int t = 0 ; t < mOptions ->thread ; t++)
40+ mCompressors [t] = libdeflate_alloc_compressor (mCompressionLevel );
41+ mWorkingBufferList = 0 ;
42+ mBufferLength = 0 ;
43+ } else {
44+ initWriter (filename, isSTDOUT);
45+ initBufferLists ();
46+ mWorkingBufferList = 0 ;
47+ mBufferLength = 0 ;
48+ }
1849}
1950
2051WriterThread::~WriterThread () {
2152 cleanup ();
2253}
2354
24- bool WriterThread::isCompleted ()
55+ bool WriterThread::isCompleted ()
2556{
57+ if (mPwriteMode ) return true ; // no writer thread needed
2658 return mInputCompleted && (mBufferLength ==0 );
2759}
2860
2961bool WriterThread::setInputCompleted () {
62+ if (mPwriteMode ) {
63+ setInputCompletedPwrite ();
64+ mInputCompleted = true ;
65+ return true ;
66+ }
3067 mInputCompleted = true ;
3168 for (int t=0 ; t<mOptions ->thread ; t++) {
3269 mBufferLists [t]->setProducerFinished ();
3370 }
3471 return true ;
3572}
3673
74+ void WriterThread::setInputCompletedPwrite () {
75+ // Flush remaining accumulated data for all workers
76+ for (int t = 0 ; t < mOptions ->thread ; t++)
77+ flushPwriteBatch (t);
78+
79+ int W = mOptions ->thread ;
80+ size_t lastSeq = 0 ;
81+ bool anyProcessed = false ;
82+ for (int t = 0 ; t < W; t++) {
83+ if (mNextSeq [t] != (size_t )t) {
84+ size_t workerLastSeq = mNextSeq [t] - W;
85+ if (!anyProcessed || workerLastSeq > lastSeq) {
86+ lastSeq = workerLastSeq;
87+ anyProcessed = true ;
88+ }
89+ }
90+ }
91+ size_t offset = anyProcessed ?
92+ mOffsetRing [lastSeq & (OFFSET_RING_SIZE - 1 )].cumulative_offset .load (std::memory_order_relaxed) : 0 ;
93+ ftruncate (mFd , offset);
94+ }
95+
3796void WriterThread::output (){
38- SingleProducerSingleConsumerList<string*>* list = mBufferLists [mWorkingBufferList ];
97+ if (mPwriteMode ) return ; // no-op
98+ SingleProducerSingleConsumerList<string*>* list = mBufferLists [mWorkingBufferList ];
3999 if (!list->canBeConsumed ()) {
40100 usleep (100 );
41101 } else {
@@ -47,19 +107,107 @@ void WriterThread::output(){
47107 }
48108}
49109
50-
51110void WriterThread::input (int tid, string* data) {
111+ if (mPwriteMode ) {
112+ inputPwrite (tid, data);
113+ return ;
114+ }
52115 mBufferLists [tid]->produce (data);
53116 mBufferLength ++;
54117}
55118
119+ // 256KB batch: FASTQ's LZ77 window saturates at ~256KB (measured: 41.97% vs
120+ // 41.67% single-stream). Beyond 256KB, compression ratio gains are < 0.15%.
121+ static const size_t PWRITE_BATCH_SIZE = 256 * 1024 ;
122+
123+ void WriterThread::inputPwrite (int tid, string* data) {
124+ if (mPreCompressed ) {
125+ // Accumulate raw data, compress as a larger batch for better ratio
126+ mAccumBuf [tid].append (*data);
127+ delete data;
128+ if (mAccumBuf [tid].size () >= PWRITE_BATCH_SIZE )
129+ flushPwriteBatch (tid);
130+ } else {
131+ // Uncompressed: write directly (shouldn't reach here, pwrite only for gz)
132+ delete data;
133+ }
134+ }
135+
136+ void WriterThread::flushPwriteBatch (int tid) {
137+ if (mAccumBuf [tid].empty ()) return ;
138+
139+ // Compress with per-worker libdeflate compressor (same algorithm as master)
140+ const string& raw = mAccumBuf [tid];
141+ size_t bound = libdeflate_gzip_compress_bound (mCompressors [tid], raw.size ());
142+ string writeData;
143+ writeData.resize (bound);
144+ size_t outsize = libdeflate_gzip_compress (mCompressors [tid], raw.data (), raw.size (),
145+ &writeData[0 ], bound);
146+ if (outsize == 0 )
147+ error_exit (" libdeflate gzip compression failed" );
148+ writeData.resize (outsize);
149+ mAccumBuf [tid].clear ();
150+
151+ size_t seq = mNextSeq [tid];
152+
153+ // Wait for previous batch's cumulative offset
154+ size_t offset = 0 ;
155+ if (seq > 0 ) {
156+ size_t prevSlot = (seq - 1 ) & (OFFSET_RING_SIZE - 1 );
157+ while (mOffsetRing [prevSlot].published_seq .load (std::memory_order_acquire) != seq - 1 ) {
158+ #if defined(__aarch64__)
159+ __asm__ volatile (" yield" );
160+ #elif defined(__x86_64__) || defined(__i386__)
161+ __asm__ volatile (" pause" );
162+ #endif
163+ }
164+ offset = mOffsetRing [prevSlot].cumulative_offset .load (std::memory_order_relaxed);
165+ }
166+
167+ // Publish offset BEFORE pwrite
168+ size_t wsize = writeData.size ();
169+ size_t mySlot = seq & (OFFSET_RING_SIZE - 1 );
170+ mOffsetRing [mySlot].cumulative_offset .store (offset + wsize, std::memory_order_relaxed);
171+ mOffsetRing [mySlot].published_seq .store (seq, std::memory_order_release);
172+
173+ // pwrite
174+ if (wsize > 0 ) {
175+ size_t written = 0 ;
176+ while (written < wsize) {
177+ ssize_t ret = pwrite (mFd , writeData.data () + written, wsize - written, offset + written);
178+ if (ret < 0 ) {
179+ if (errno == EINTR ) continue ;
180+ error_exit (" pwrite failed: " + string (strerror (errno)));
181+ }
182+ if (ret == 0 )
183+ error_exit (" pwrite returned 0 (disk full?)" );
184+ written += ret;
185+ }
186+ }
187+
188+ mNextSeq [tid] += mOptions ->thread ;
189+ }
190+
56191void WriterThread::cleanup () {
192+ if (mPwriteMode ) {
193+ if (mFd >= 0 ) { close (mFd ); mFd = -1 ; }
194+ delete[] mOffsetRing ; mOffsetRing = NULL ;
195+ delete[] mNextSeq ; mNextSeq = NULL ;
196+ delete[] mAccumBuf ; mAccumBuf = NULL ;
197+ if (mCompressors ) {
198+ for (int t = 0 ; t < mOptions ->thread ; t++)
199+ libdeflate_free_compressor (mCompressors [t]);
200+ delete[] mCompressors ; mCompressors = NULL ;
201+ }
202+ return ;
203+ }
57204 deleteWriter ();
58- for (int t=0 ; t<mOptions ->thread ; t++) {
59- delete mBufferLists [t];
205+ if (mBufferLists ) {
206+ for (int t=0 ; t<mOptions ->thread ; t++)
207+ delete mBufferLists [t];
208+ delete[] mBufferLists ;
209+ mBufferLists = NULL ;
60210 }
61- delete[] mBufferLists ;
62- mBufferLists = NULL ;
63211}
64212
65213void WriterThread::deleteWriter () {
0 commit comments