55#include < fcntl.h>
66#include < cerrno>
77#include < cstring>
8- #include < thread>
98
109WriterThread::WriterThread (Options* opt, string filename, bool isSTDOUT){
1110 mOptions = opt;
@@ -15,29 +14,32 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){
1514
1615 mPwriteMode = !isSTDOUT && ends_with (filename, " .gz" ) && mOptions ->thread > 1 ;
1716 mFd = -1 ;
18- mOffsetRing = NULL ;
17+ mRing = NULL ;
1918 mNextSeq = NULL ;
19+ mCumulativeOffset = 0 ;
2020 mCompressors = NULL ;
2121 mCompBufs = NULL ;
22- mCompBufSize = 0 ;
22+ mCompBufSizes = NULL ;
2323 mBufferLists = NULL ;
2424
2525 if (mPwriteMode ) {
2626 mFd = open (mFilename .c_str (), O_WRONLY | O_CREAT | O_TRUNC, 0644 );
2727 if (mFd < 0 )
2828 error_exit (" Failed to open for pwrite: " + mFilename );
29- mOffsetRing = new OffsetSlot[OFFSET_RING_SIZE ];
29+ mRing = new PwriteSlot[PWRITE_RING_SIZE ];
3030 mNextSeq = new size_t [mOptions ->thread ];
3131 for (int t = 0 ; t < mOptions ->thread ; t++)
3232 mNextSeq [t] = t;
3333 mCompressors = new libdeflate_compressor*[mOptions ->thread ];
3434 for (int t = 0 ; t < mOptions ->thread ; t++)
3535 mCompressors [t] = libdeflate_alloc_compressor (mOptions ->compression );
36- // Pre-allocate per-worker compress buffers (avoids malloc/free per pack)
37- mCompBufSize = PACK_SIZE * 500 ; // ~500 bytes/read worst case
36+ size_t initBufSize = PACK_SIZE * 500 ;
3837 mCompBufs = new char *[mOptions ->thread ];
39- for (int t = 0 ; t < mOptions ->thread ; t++)
40- mCompBufs [t] = new char [mCompBufSize ];
38+ mCompBufSizes = new size_t [mOptions ->thread ];
39+ for (int t = 0 ; t < mOptions ->thread ; t++) {
40+ mCompBufs [t] = new char [initBufSize];
41+ mCompBufSizes [t] = initBufSize;
42+ }
4143 mWorkingBufferList = 0 ;
4244 mBufferLength = 0 ;
4345 } else {
@@ -54,7 +56,7 @@ WriterThread::~WriterThread() {
5456
5557bool WriterThread::isCompleted ()
5658{
57- if (mPwriteMode ) return true ; // no writer thread needed
59+ if (mPwriteMode ) return true ;
5860 return mInputCompleted && (mBufferLength ==0 );
5961}
6062
@@ -72,25 +74,13 @@ bool WriterThread::setInputCompleted() {
7274}
7375
7476void WriterThread::setInputCompletedPwrite () {
75- int W = mOptions ->thread ;
76- size_t lastSeq = 0 ;
77- bool anyProcessed = false ;
78- for (int t = 0 ; t < W; t++) {
79- if (mNextSeq [t] != (size_t )t) {
80- size_t workerLastSeq = mNextSeq [t] - W;
81- if (!anyProcessed || workerLastSeq > lastSeq) {
82- lastSeq = workerLastSeq;
83- anyProcessed = true ;
84- }
85- }
86- }
87- size_t offset = anyProcessed ?
88- mOffsetRing [lastSeq & (OFFSET_RING_SIZE - 1 )].cumulative_offset .load (std::memory_order_relaxed) : 0 ;
89- ftruncate (mFd , offset);
77+ // Flush all remaining slots
78+ flushReady ();
79+ ftruncate (mFd , mCumulativeOffset );
9080}
9181
9282void WriterThread::output (){
93- if (mPwriteMode ) return ; // no-op
83+ if (mPwriteMode ) return ;
9484 SingleProducerSingleConsumerList<string*>* list = mBufferLists [mWorkingBufferList ];
9585 if (!list->canBeConsumed ()) {
9686 usleep (100 );
@@ -114,46 +104,48 @@ void WriterThread::input(int tid, string* data) {
114104
115105void WriterThread::inputPwrite (int tid, string* data) {
116106 size_t bound = libdeflate_gzip_compress_bound (mCompressors [tid], data->size ());
117- // Grow pre-allocated buffer if needed
118- if (bound > mCompBufSize ) {
107+ if (bound > mCompBufSizes [tid]) {
119108 delete[] mCompBufs [tid];
120109 mCompBufs [tid] = new char [bound];
121- // Note: mCompBufSize is shared but only grows, safe for other threads
110+ mCompBufSizes [tid] = bound;
122111 }
123112 size_t outsize = libdeflate_gzip_compress (mCompressors [tid], data->data (), data->size (),
124113 mCompBufs [tid], bound);
125114 if (outsize == 0 )
126115 error_exit (" libdeflate gzip compression failed" );
127116 delete data;
128- const char * writeData = mCompBufs [tid];
129- size_t wsize = outsize;
130117
131118 size_t seq = mNextSeq [tid];
119+ size_t slot = seq & (PWRITE_RING_SIZE - 1 );
132120
133- // Wait for previous batch's cumulative offset
134- size_t offset = 0 ;
135- if (seq > 0 ) {
136- size_t prevSlot = (seq - 1 ) & (OFFSET_RING_SIZE - 1 );
137- while (mOffsetRing [prevSlot].published_seq .load (std::memory_order_acquire) != seq - 1 ) {
138- #if defined(__aarch64__)
139- __asm__ volatile (" yield" );
140- #elif defined(__x86_64__) || defined(__i386__)
141- __asm__ volatile (" pause" );
142- #endif
143- }
144- offset = mOffsetRing [prevSlot].cumulative_offset .load (std::memory_order_relaxed);
121+ // Wait if slot not yet free (ring backpressure from previous round)
122+ while (mRing [slot].state .load (std::memory_order_acquire) != 0 ) {
123+ usleep (1 );
145124 }
146125
147- // Publish offset BEFORE pwrite — next worker starts immediately
148- size_t mySlot = seq & (OFFSET_RING_SIZE - 1 );
149- mOffsetRing [mySlot].cumulative_offset .store (offset + wsize, std::memory_order_relaxed);
150- mOffsetRing [mySlot].published_seq .store (seq, std::memory_order_release);
126+ // Deposit compressed data (FREE → COMPRESSED)
127+ mRing [slot].data = mCompBufs [tid];
128+ mRing [slot].size = outsize;
129+ mRing [slot].state .store (1 , std::memory_order_release);
130+
131+ // Try to assign offsets for consecutive ready slots
132+ flushReady ();
151133
152- // pwrite (concurrent with other workers on non-overlapping regions)
153- if (wsize > 0 ) {
134+ // Wait for MY offset to be assigned (COMPRESSED → OFFSET_READY)
135+ while (mRing [slot].state .load (std::memory_order_acquire) != 2 ) {
136+ flushReady (); // help flush if possible
137+ // Another worker may be flushing; brief yield
138+ if (mRing [slot].state .load (std::memory_order_acquire) != 2 )
139+ usleep (1 );
140+ }
141+
142+ // Concurrent pwrite — offset already computed, no ordering wait
143+ if (outsize > 0 ) {
154144 size_t written = 0 ;
155- while (written < wsize) {
156- ssize_t ret = pwrite (mFd , writeData + written, wsize - written, offset + written);
145+ size_t offset = mRing [slot].offset ;
146+ while (written < outsize) {
147+ ssize_t ret = pwrite (mFd , mRing [slot].data + written,
148+ outsize - written, offset + written);
157149 if (ret < 0 ) {
158150 if (errno == EINTR) continue ;
159151 error_exit (" pwrite failed: " + string (strerror (errno)));
@@ -164,13 +156,35 @@ void WriterThread::inputPwrite(int tid, string* data) {
164156 }
165157 }
166158
159+ // Mark slot free for reuse
160+ mRing [slot].state .store (0 , std::memory_order_release);
161+
167162 mNextSeq [tid] += mOptions ->thread ;
168163}
169164
165+ void WriterThread::flushReady () {
166+ if (!mFlushMtx .try_lock ())
167+ return ;
168+ size_t seq = mFlushSeq .load (std::memory_order_relaxed);
169+ while (true ) {
170+ size_t slot = seq & (PWRITE_RING_SIZE - 1 );
171+ if (mRing [slot].state .load (std::memory_order_acquire) != 1 )
172+ break ;
173+ // Assign offset (fast — just an addition)
174+ mRing [slot].offset = mCumulativeOffset ;
175+ mCumulativeOffset += mRing [slot].size ;
176+ // COMPRESSED → OFFSET_READY
177+ mRing [slot].state .store (2 , std::memory_order_release);
178+ seq++;
179+ }
180+ mFlushSeq .store (seq, std::memory_order_release);
181+ mFlushMtx .unlock ();
182+ }
183+
170184void WriterThread::cleanup () {
171185 if (mPwriteMode ) {
172186 if (mFd >= 0 ) { close (mFd ); mFd = -1 ; }
173- delete[] mOffsetRing ; mOffsetRing = NULL ;
187+ delete[] mRing ; mRing = NULL ;
174188 delete[] mNextSeq ; mNextSeq = NULL ;
175189 if (mCompressors ) {
176190 for (int t = 0 ; t < mOptions ->thread ; t++)
@@ -182,6 +196,7 @@ void WriterThread::cleanup() {
182196 delete[] mCompBufs [t];
183197 delete[] mCompBufs ; mCompBufs = NULL ;
184198 }
199+ delete[] mCompBufSizes ; mCompBufSizes = NULL ;
185200 return ;
186201 }
187202 deleteWriter ();
0 commit comments