Skip to content

JsonParser continuing processing after exception causes issues for RxJava 3 Adapters #4623

@DemonicTutor

Description

@DemonicTutor

Version

this commit
which fixed related issue: #4338

Context

We use JsonParser with HttpServerRequest bodies to process streams but also with the RxJava Adapters.

I think there are 2 issues present:

  1. the stream wants to cancel / unsubscribe on the first exception (removing the exception handler)
    this causes a NPE exception here:

    for (IOException ioe : exceptions) {
    exceptionHandler.handle(ioe);
    }

  2. if a exception occurs JsonParser first emits successful events and then the exception including jsonevents AFTER the exception

    checkPending();
    checkExceptions();

I could not write a clean reproducer/test-case for this yet.
When consuming a JsonArray containing larger JsonObjects we transform them into DTOs.
The json was malformed {goodobj},THISISBAD{goodobj} and instead of the JsonParserException terminating the processing JsonParser emitted only a part of the second {goodobj} leading to a wrong DTO.

instead of emitting the json-event for the entire {goodobj} we just got "someproperty":true from inside said obj

Steps to reproduce

import com.fasterxml.jackson.core.JsonParseException;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.parsetools.JsonParser;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import io.vertx.rxjava3.CompletableHelper;
import io.vertx.rxjava3.core.Vertx;
import io.vertx.rxjava3.core.buffer.Buffer;
import io.vertx.rxjava3.core.http.HttpClient;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

@ExtendWith(VertxExtension.class)
class JsonParserTest {

  private static final HttpServerOptions serverOptions = new HttpServerOptions()
    .setPort(8080)
    .setHost("localhost");
  private static final HttpClientOptions clientOptions = new HttpClientOptions()
    .setDefaultHost("localhost")
    .setDefaultPort(8080);

  private Vertx vertx;
  private HttpClient client;

  @BeforeEach
  void beforeEach(final io.vertx.core.Vertx vertx) {
    this.vertx = Vertx.newInstance(vertx);
    this.client = this.vertx.createHttpClient(clientOptions);
  }

  @Test
  void shouldFailSingleBadJsonEvent(final VertxTestContext context) {
    final var checkpoint = context.checkpoint(2);
    vertx.getOrCreateContext().exceptionHandler(context::failNow);
    rxTest(
      Flowable.fromArray(
        Buffer.buffer("[b]")
      ),
      (status, actual) -> context.verify(() -> {
        Assertions.assertEquals(200, status);
        Assertions.assertEquals(List.of(),
          actual
        );
      }),
      cause -> {
        checkpoint.flag();
        context.verify(() -> Assertions.assertEquals(JsonParseException.class, cause.getClass()));
      }
    )
      .subscribe(CompletableHelper.toObserver(context.succeeding(nothing -> checkpoint.flag())));
  }

  @Test
  void shouldFailBadJsonEventInbetween(final VertxTestContext context) {
    final var checkpoint = context.checkpoint(2);
    vertx.getOrCreateContext().exceptionHandler(context::failNow);
    rxTest(
      Flowable.fromArray(
        Buffer.buffer("["),
        Buffer.buffer("1,"),
        Buffer.buffer("2,"),
        Buffer.buffer("3,"),
        Buffer.buffer("b,"),
        Buffer.buffer("4,"),
        Buffer.buffer("5,"),
        Buffer.buffer("6,"),
        Buffer.buffer("7,"),
        Buffer.buffer("8,"),
        Buffer.buffer("9,"),
        Buffer.buffer("]")
      ),
      (status, actual) -> context.verify(() -> {
        Assertions.assertEquals(200, status);
        Assertions.assertEquals(List.of(
            Buffer.buffer("["),
            Buffer.buffer("1"),
            Buffer.buffer("2"),
            Buffer.buffer("3")
          ),
          actual
        );
      }),
      cause -> {
        checkpoint.flag();
        context.verify(() -> Assertions.assertEquals(JsonParseException.class, cause.getClass()));
      }
    )
      .subscribe(CompletableHelper.toObserver(context.succeeding(nothing -> checkpoint.flag())));
  }

  private Completable rxTest(final Flowable<Buffer> given,
                             final BiConsumer<Integer, List<Buffer>> actual,
                             final Consumer<Throwable> exception) {
    return Completable.concatArray(
      vertx.createHttpServer(serverOptions)
        .requestHandler(
          request -> io.vertx.rxjava3.core.parsetools.JsonParser.newInstance(JsonParser.newParser(request.getDelegate()))
            .toFlowable()
            .doOnSubscribe(dis -> request.getDelegate().response().setChunked(true))
            .map(event -> switch (event.type()) {
              case START_OBJECT -> Buffer.buffer("{");
              case END_OBJECT -> Buffer.buffer("}");
              case START_ARRAY -> Buffer.buffer("[");
              case END_ARRAY -> Buffer.buffer("]");
              case VALUE -> Buffer.buffer(event.value().toString());
            })
            .concatMapCompletable(event -> request.response().rxWrite(event))
            .doOnError(exception::accept)
            .onErrorComplete()
            .andThen(request.response().rxEnd())
            .subscribe()
        )
        .rxListen()
        .ignoreElement(),
      client.rxRequest(HttpMethod.POST, "/path")
        .flatMap(request -> request.rxSend(given))
        .flatMap(response -> response.toObservable().toList().doOnSuccess(body -> actual.accept(response.statusCode(), body)))
        .ignoreElement()
    );
  }
}

Extra

  • Anything that can be relevant such as OS version, JVM version

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions