Skip to content

SequenceBarrier.waitForSequence sometimes doesn't respect BlockingWaitStrategy #462

@tibco-jufernan

Description

@tibco-jufernan

Describe the bug
With many producers and a blocking wait strategy, SequenceBarrier.waitForSequence(nextSequence) can return a value less than its input. Specifically, it can return nextSequence - 1.

To Reproduce

Run thisFails(). It calls testWithNumProducers, which fails if waitForSequence(nextSequence) returns a value less than its input.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;

class WaitForReproTest {

    @Test
    void thisFails() throws InterruptedException {
        testWithNumProducers(128);
    }

    @Test
    void thisWorks() throws InterruptedException {
        testWithNumProducers(3);
    }

    void testWithNumProducers(int numProducers) throws InterruptedException {
        WaitForRepro w = new WaitForRepro();

        w.startConsumer();
        w.startProducers(numProducers);

        Thread.sleep(4000);

        Assertions.assertFalse(w.failed());
    }

    static class WaitForRepro {

        private final RingBuffer<Integer> buffer;
        private final SequenceBarrier barrier;
        private volatile boolean failed;

        public WaitForRepro() {
            buffer = RingBuffer.createMultiProducer(() -> 2, 16, new BlockingWaitStrategy());
            barrier = buffer.newBarrier();
        }

        public void startConsumer() {
            Executors.newSingleThreadExecutor().submit(() -> {
                long nextSequence = 0;
                long availableSequence;
                while (true) {
                    availableSequence = barrier.waitFor(nextSequence);
                    if (nextSequence > availableSequence) {
                        fail();
                    }
                    while (nextSequence <= availableSequence) {
                        buffer.get(nextSequence++);
                    }
                    Thread.sleep(5);
                }
            });
        }

        public void startProducers(int numProducers) {
            ExecutorService executor = Executors.newFixedThreadPool(numProducers);
            for (int i = 0; i < numProducers; ++i) {
                executor.submit(() -> {
                    while (true) {
                        buffer.publishEvent((event, seq) -> {
                            // NOP
                        });
                        Thread.sleep(3);
                    }
                });
            }
        }

        private void fail() {
            failed = true;
        }

        public boolean failed() {
            return failed;
        }
    }
}

Expected behavior
I expected that with a BlockingWaitStrategy, waitFor(nextSequence) should block until it can return nextSequence or a value greater than it.

Desktop (please complete the following information):

  • OS: MacOS
  • Version 3.4.4 and, 4.0.0
  • JVM Version OpenJDK Runtime Environment Temurin-17.0.8+7 (build 17.0.8+7)

Additional context
I had opened a thread on Google Groups, but thought that this problem might be more easily addressed as a GitHub issue.

In 2.10.4, I did not observe this behavior.

When debugging, I found that the blocking wait strategy returns the a sequence greater than or equal to sequence on line 56 of waitFor. But on line 63, sequencer.getHighestPublishedSequence returns sequence - 1.
https://github.com/LMAX-Exchange/disruptor/blob/3.4.4/src/main/java/com/lmax/disruptor/ProcessingSequenceBarrier.java#L51

In 2.10.4, waitFor simply defers to the wait strategy, so sequence - 1 can never be returned with a blocking wait strategy.
https://github.com/LMAX-Exchange/disruptor/blob/2.10.4/code/src/main/com/lmax/disruptor/ProcessingSequenceBarrier.java#L40

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions