Deadlock on 1.3.3 #695

@jchorl

Hi!

I think there is still a deadlock on v1.3.3: I was able to reproduce one while running with 20 threads total.

Candidly, I don't understand the codebase well, so I apologize in advance if this explanation doesn't make sense (full disclosure: Claude found it and fixed it).

Here's the timeline:

  1. Processing threads finish at different times

    Time T: Thread 0 finishes all its reads, exits processorTask()

    • Thread 0 will NEVER write to BufferList[0] again
    • BufferList[0] might be empty at this point
  2. Last thread marks completion

    Time T+100: Thread 15 (last one) finishes

    // finishedCount reaches 16
    // Calls setInputCompleted() on all writers:
    for(int t=0; t<mOptions->thread; t++) {
        mBufferLists[t]->setProducerFinished();  // Mark ALL lists as done
    }
  3. The canBeConsumed() logic

    inline bool canBeConsumed() {
        if(head == NULL)
            return false;  // Empty list
        // Has at least one node
        return head->nextItemReady.load(std::memory_order_acquire)
            || (head == tail)  // Last node in list
            || producerFinished.load(std::memory_order_acquire);  // Producer done
    }

The Deadlock - writer thread stuck at BufferList[0]:

  • head == NULL (empty list)
  • canBeConsumed() returns FALSE
  • producerFinished flag is TRUE, but the check never reaches it because of the early return on head == NULL
  • Stays at mWorkingBufferList = 0
  • usleep(100), loop again
  • Still at BufferList[0]...
  • INFINITE LOOP

Meanwhile, BufferList[1] through BufferList[15] still hold data that needs to be written!
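
To make the stall easier to see in isolation, here is a minimal single-threaded sketch of the polling pattern described above. It is not the project's code: ToyList, makeScenario and drain are hypothetical names, std::deque stands in for the real lock-free list, and the sketch assumes the writer moves to the next list after each successful consume (the excerpt in this issue does not show that part). The poll budget replaces the usleep(100) retry loop so the example terminates.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for one per-thread buffer list. This is NOT the
// real SingleProducerSingleConsumerList; it only mimics the early return.
struct ToyList {
    std::deque<std::string> items;
    bool producerFinished = false;  // set, but never consulted when empty

    bool canBeConsumed() const {
        if (items.empty())
            return false;  // early return: producerFinished is ignored
        return true;       // simplified: any queued item counts as ready
    }
};

// Three lists: list 0 is empty, lists 1 and 2 hold two items each.
// Every producer is marked finished, as in the timeline above.
std::vector<ToyList> makeScenario() {
    std::vector<ToyList> lists(3);
    lists[1].items = {"a", "b"};
    lists[2].items = {"c", "d"};
    for (ToyList& l : lists)
        l.producerFinished = true;
    return lists;
}

// Polls round-robin for at most maxPolls iterations and returns how many
// items were consumed. advanceWhenNotReady toggles the proposed change.
std::size_t drain(std::vector<ToyList>& lists, bool advanceWhenNotReady,
                  std::size_t maxPolls) {
    std::size_t working = 0;
    std::size_t written = 0;
    for (std::size_t poll = 0; poll < maxPolls; ++poll) {
        ToyList& list = lists[working];
        if (!list.canBeConsumed()) {
            if (advanceWhenNotReady)
                working = (working + 1) % lists.size();
            continue;  // the real loop would sleep here before retrying
        }
        list.items.pop_front();
        ++written;
        // Assumption: the writer advances after each successful consume.
        working = (working + 1) % lists.size();
    }
    return written;
}
```

Calling drain(lists, false, 1000) on a fresh makeScenario() never leaves list 0 and consumes nothing, while drain(lists, true, 1000) skips the empty list and consumes all four items. Whether skipping a not-yet-ready list is acceptable in the real writer depends on ordering requirements this sketch does not model.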

I was able to fix it with this diff:

diff --git a/src/writerthread.cpp b/src/writerthread.cpp
index 5d21091..1b52fbd 100644
--- a/src/writerthread.cpp
+++ b/src/writerthread.cpp
@@ -96,6 +96,9 @@ void WriterThread::output(){
     if (mPwriteMode) return;  // no-op
     SingleProducerSingleConsumerList<string*>* list = mBufferLists[mWorkingBufferList];
     if(!list->canBeConsumed()) {
+        // Advance to next list even if current list is not ready
+        // to avoid getting stuck on an empty list when other lists have data
+        mWorkingBufferList = (mWorkingBufferList+1)%mOptions->thread;
         usleep(100);
     } else {
         string* str = list->consume();

I also tried to work out whether this change could introduce bugs. As best I can tell, it cannot: all of the mBufferLists belong to this writer thread, and it will keep looping back around until all queues are empty.
