Race condition when using RecordParser with concatMapCompletable and observeOn #4297

@ben1222

Description

Version

Vert.x 4.1.7

Context

I encountered some issues when using RecordParser with concatMapCompletable and observeOn, as in the following code, which processes records one by one on a worker thread:

RecordParser.newDelimited(delimiter, asyncFile)
  .toFlowable()
  .concatMapCompletable(data -> Completable.complete()
    .observeOn(blockingScheduler(executor))
    .andThen(process(data))
  )
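
Here, blockingScheduler(executor) is presumably the vertx-rx-java2 helper that wraps a Vert.x WorkerExecutor as an RxJava Scheduler; a minimal version, assuming that API, would be:

  import io.reactivex.Scheduler;
  import io.vertx.reactivex.RxHelper;
  import io.vertx.reactivex.core.WorkerExecutor;

  // Wraps the shared worker executor as an RxJava Scheduler, so that
  // observeOn moves the inner completable (and, under backpressure, the
  // requests to the upstream) onto a worker thread.
  private static Scheduler blockingScheduler(WorkerExecutor executor) {
    return RxHelper.blockingScheduler(executor);
  }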

The issues include:

  1. Sometimes the RecordParser suddenly stops emitting records: there is no error, the end of the file has not been reached, and it has not been disposed... it just gets stuck.
  2. Sometimes a MissingBackpressureException is thrown.

After investigating and creating a unit test for this issue, it looks like a race condition in RecordParserImpl:
Usually a record is emitted from RecordParserImpl on the event loop thread (RecordParserImpl.handle).
However, when backpressure exists (from concatMapCompletable here), it can be the downstream thread that requests items from the upstream (RecordParserImpl.fetch).
In my case, the inner stream of concatMapCompletable is switched to a worker thread using observeOn, so it is the worker thread that requests items from RecordParserImpl under backpressure.

When RecordParserImpl.handle, running on the event loop thread, and RecordParserImpl.fetch, running on the worker thread, are invoked at the same time, a race condition occurs because RecordParserImpl is not written in a thread-safe way.
The race condition includes, but is not limited to:

  • In RecordParserImpl.handleParsing(), both threads may pass the parsing check and do the parsing concurrently
  • demand could be modified concurrently and end up with an unexpected value
  • When RecordParserImpl.handle has filled all demand and paused the upstream, but parsing has not yet been set back to false, RecordParserImpl.fetch can add demand and return immediately because parsing is still true. It then gets stuck: the upstream is paused, so RecordParserImpl.handle will not be called again, while the downstream has requested an item and is waiting for it (see the sketch after this list).
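
To make that last scenario concrete, here is a minimal sketch of the interleaving. It is not the real RecordParserImpl code; only the demand and parsing field names are taken from the class, the rest is simplified:

  // Hypothetical, simplified model of the racy state in RecordParserImpl.
  class RacyParser {
    private long demand;      // outstanding downstream requests, unsynchronized
    private boolean parsing;  // re-entrancy guard, unsynchronized

    // Runs on the event loop thread (like RecordParserImpl.handle).
    void handle(Buffer buffer) {
      if (parsing) {
        return;
      }
      parsing = true;
      // ... emit records until demand reaches 0, then pause the upstream ...
      // <-- window: the upstream is already paused but parsing is still true
      parsing = false;
    }

    // Runs on the worker thread (like RecordParserImpl.fetch via observeOn).
    void fetch(long amount) {
      demand += amount;  // unsynchronized read-modify-write
      if (parsing) {
        // If this executes inside the window above, fetch returns without
        // resuming the upstream; handle is never called again, so the
        // stream is stuck even though demand is now positive.
        return;
      }
      // ... otherwise resume the upstream and continue parsing ...
    }
  }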

Steps to reproduce

Here's a unit test that can reproduce the issue:

  import java.io.FileWriter;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.TimeoutException;

  import org.apache.logging.log4j.LogManager;
  import org.apache.logging.log4j.Logger;
  import org.junit.Test;

  import io.reactivex.Completable;
  import io.reactivex.Single;
  import io.vertx.core.file.OpenOptions;
  import io.vertx.reactivex.core.Vertx;
  import io.vertx.reactivex.core.WorkerExecutor;
  import io.vertx.reactivex.core.parsetools.RecordParser;

  // blockingScheduler(executor) is the helper shown earlier.
  @Test
  public void testBackpressure4() throws Throwable {
    Logger LOG = LogManager.getLogger();
    String fileName = "/tmp/testBackpressure";
    FileWriter writer = new FileWriter(fileName);
    for (int i = 0; i < 300; i++) {
      writer.write(String.format("%1000d\n", i));
    }
    writer.close();

    int count = 100;
    for (int i = 0; i < count; i++) {
      LOG.info("{}: Start", i);
      Vertx vertx = Vertx.vertx();
      WorkerExecutor executor = vertx.createSharedWorkerExecutor("shared-worker", 1);
      Throwable err = Single.just(i)
          .flatMapCompletable(n -> vertx.fileSystem()
              .rxOpen(fileName, new OpenOptions().setWrite(false).setRead(true))
              .flatMapCompletable(f -> RecordParser.newDelimited("\n", f).toFlowable()
                  .map(data -> {
                    final String dataString = data.toString().trim();
                    LOG.info("{}: Read record {}", n, dataString);
                    return dataString;
                  })
                  .concatMapCompletable(data -> Completable.complete()
                    .observeOn(blockingScheduler(executor))
                    .doOnComplete(() -> LOG.info("{}: processed record {}", n, data))
                  )
                  .onErrorResumeNext(e -> f.rxClose().andThen(Completable.error(e)))
                  .andThen(Completable.defer(f::rxClose))
              )
          )
          .timeout(5, TimeUnit.SECONDS)
          .blockingGet();

      vertx.close();

      if (err == null) {
        LOG.info("{}: Success", i);
      } else {
        LOG.error("{}: failure", i, err);
        if (!(err instanceof TimeoutException)) { // fails the test when issue 2 happens; remove the "!" to fail it when issue 1 (the timeout) happens instead
          throw err;
        }
      }
    }
  }

In the printed log, we can see that the xx: Read record yy line is sometimes printed on the event loop thread (RecordParserImpl.handle) and sometimes on the shared worker thread (due to backpressure, RecordParserImpl.fetch).

The backtrace for issue 1 looks like:

The source did not signal an event for 5 seconds and has been terminated.
java.util.concurrent.TimeoutException: The source did not signal an event for 5 seconds and has been terminated.
	at io.reactivex.internal.operators.completable.CompletableTimeout$DisposeTask.run(CompletableTimeout.java:109)
	at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
	at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

The backtrace for issue 2 looks like:

io.reactivex.exceptions.MissingBackpressureException: Queue full?!
	at io.reactivex.internal.operators.mixed.FlowableConcatMapCompletable$ConcatMapCompletableObserver.onNext(FlowableConcatMapCompletable.java:121)
	at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:68)
	at io.vertx.reactivex.impl.FlowableReadStream.lambda$subscribeActual$2(FlowableReadStream.java:86)
	at io.vertx.core.parsetools.impl.RecordParserImpl.handleParsing(RecordParserImpl.java:214)
	at io.vertx.core.parsetools.impl.RecordParserImpl.handle(RecordParserImpl.java:285)
	at io.vertx.core.parsetools.impl.RecordParserImpl.handle(RecordParserImpl.java:27)
	at io.vertx.core.file.impl.AsyncFileImpl.handleBuffer(AsyncFileImpl.java:425)
	at io.vertx.core.file.impl.AsyncFileImpl.lambda$new$0(AsyncFileImpl.java:110)
	at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:240)
	at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:130)
	at io.vertx.core.file.impl.AsyncFileImpl.lambda$doRead$5(AsyncFileImpl.java:407)
	at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
	at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54)
	at app//io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at app//io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
	at app//io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
	at app//io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at app//io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at app//io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base@11.0.7/java.lang.Thread.run(Thread.java:834)

I'm not sure whether this is a problem in RecordParser or in concatMapCompletable (is it expected for the upstream to be requested on a worker thread in this case?), or whether it is simply not intended to be used this way.
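
In case it helps, a possible mitigation I can think of is to reschedule the flowable onto the Vert.x context scheduler before the backpressure operators, so that the requests reaching RecordParserImpl.fetch are issued from the event loop rather than from the worker thread. This is an untested sketch, assuming the race analysis above is correct and that RxHelper.scheduler(vertx) from vertx-rx-java2 behaves as described:

  // Untested sketch: observeOn's drain loop issues the requests to its
  // upstream, so running it on the Vert.x context scheduler should keep
  // calls to RecordParserImpl.fetch on the event loop.
  Scheduler contextScheduler = io.vertx.reactivex.RxHelper.scheduler(vertx);
  RecordParser.newDelimited("\n", f).toFlowable()
      .observeOn(contextScheduler)
      .concatMapCompletable(data -> Completable.complete()
          .observeOn(blockingScheduler(executor))
          .andThen(process(data)));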
