Note: This repository contains the guide documentation source. To view the guide in published form, view it on the Open Liberty website.
Learn how to use a reactive JAX-RS client to asynchronously invoke RESTful microservices over HTTP.
First, you’ll learn how to create a reactive JAX-RS client application by using the default reactive JAX-RS client APIs. You will then learn how to take advantage of the RxJava reactive extensions with a pluggable reactive JAX-RS client provider that’s published by Eclipse Jersey. The JAX-RS client is an API used to communicate with RESTful web services. The API makes it easy to consume a web service by using the HTTP protocol, which means that you can efficiently implement client-side applications. The reactive client extension to JAX-RS is an API that enables you to use the reactive programming model when using the JAX-RS client.
Reactive programming is an extension of asynchronous programming and focuses on the flow of data through data streams. Reactive applications process data when it becomes available and respond to requests as soon as processing is complete. The request to the application and response from the application are decoupled so that the application is not blocked from responding to other requests in the meantime. Because reactive applications avoid blocking threads while they wait, they can respond faster than synchronous applications and provide a smoother user experience.
The application in this guide demonstrates how the JAX-RS client accesses remote RESTful services by using asynchronous method calls. You’ll first look at the supplied client application that uses the JAX-RS default CompletionStage-based provider. Then, you’ll modify the client application to use Jersey’s RxJava provider, which is an alternative JAX-RS reactive provider. Both Jersey and Apache CXF provide third-party reactive libraries for RxJava and were tested for use in Open Liberty.
The application that you will be working with consists of three microservices: system, inventory, and query. Every 15 seconds, the system microservice calculates and publishes an event that contains its current average system load. The inventory microservice subscribes to that information so that it can keep an updated list of all the systems and their current system loads.
The microservice that you will modify is the query service. It communicates with the inventory service to determine which system has the highest system load and which system has the lowest system load.
The system and inventory microservices use MicroProfile Reactive Messaging to send and receive the system load events. If you want to learn more about reactive messaging, see the Creating reactive Java microservices guide.
You need to have Docker installed. For installation instructions, refer to the official Docker documentation. You will build and run the microservices in Docker containers. An installation of Apache Kafka is provided in another Docker container.
Navigate to the start directory to begin.
JAX-RS provides a default reactive provider that you can use to create a reactive REST client using the CompletionStage interface.
Create an InventoryClient class, which retrieves inventory data, and a QueryResource class, which queries data from the inventory service.
Create the InventoryClient interface.
query/src/main/java/io/openliberty/guides/query/client/InventoryClient.java
InventoryClient.java
link:defaultrx/query/src/main/java/io/openliberty/guides/query/client/InventoryClient.java[role=include]
The getSystem() method returns the CompletionStage interface. This interface represents a unit or stage of a computation. When the associated computation completes, the value can be retrieved. The rx() method returns a CompletionStageRxInvoker object, whose HTTP methods return the CompletionStage interface type instead of blocking until the response arrives.
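To make the idea of a stage of computation concrete, the following sketch uses only JDK classes. It is not the guide's InventoryClient: the getSystem() method here is a hypothetical stand-in that fabricates a Properties object on another thread, and the sample hostname and load value are assumptions made for illustration.

```java
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical stand-in for a remote lookup; no JAX-RS classes are used here.
final class CompletionStageSketch {

    // Returns immediately; the Properties value is produced later on another thread.
    static CompletionStage<Properties> getSystem(String hostname) {
        return CompletableFuture.supplyAsync(() -> {
            Properties p = new Properties();
            p.setProperty("hostname", hostname);
            p.setProperty("systemLoad", "1.5"); // made-up sample value
            return p;
        });
    }

    public static void main(String[] args) {
        CompletionStage<Properties> stage = getSystem("host-a");
        // The caller is not blocked at this point and could do other work.
        // join() waits for completion only so that this demo can print the result.
        Properties result = stage.toCompletableFuture().join();
        System.out.println(result.getProperty("hostname") + " " + result.getProperty("systemLoad"));
    }
}
```

The method hands back the stage before the value exists, which is the property that lets the caller keep working while the request is in flight.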
Create the QueryResource class.
query/src/main/java/io/openliberty/guides/query/QueryResource.java
QueryResource.java
link:defaultrx/query/src/main/java/io/openliberty/guides/query/QueryResource.java[role=include]
The systemLoad endpoint asynchronously processes the data that is retrieved by the InventoryClient interface and serves that data after all of the services respond. The thenAcceptAsync() and exceptionally() methods together behave like an asynchronous try-catch block. The data is processed in the thenAcceptAsync() method only after the CompletionStage interface finishes retrieving it. When you return a CompletionStage type in the resource, it doesn’t necessarily mean that the computation completed and the response was built.
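The try-catch analogy can be seen in a small JDK-only sketch. The run() method, the log list, and the failure message are hypothetical names chosen for this illustration and are not taken from the guide's QueryResource class.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class AsyncTryCatchSketch {

    // Runs one hypothetical lookup and records either the processed value or the failure.
    static List<String> run(boolean fail) {
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done = CompletableFuture
            .supplyAsync(() -> {
                if (fail) {
                    throw new IllegalStateException("inventory unavailable");
                }
                return "systemLoad=1.5";
            })
            // "try" part: runs only after the value is available
            .thenAcceptAsync(value -> log.add("processed " + value))
            // "catch" part: runs only if an earlier stage failed
            .exceptionally(ex -> {
                log.add("failed");
                return null;
            });
        done.join(); // wait here only so that the demo can inspect the log
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [processed systemLoad=1.5]
        System.out.println(run(true));  // [failed]
    }
}
```

When the supplier throws, the thenAcceptAsync() stage is skipped and control passes to exceptionally(), much as an exception skips the rest of a try block.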
A CountDownLatch object is used to track how many asynchronous requests are being waited on. After each thread is completed, the countDown() method counts the CountDownLatch object down towards 0. This means that the value returns only after the thread that’s retrieving the value is complete. The await() method stops and waits until all of the requests are complete. While the countdown completes, the main thread is free to perform other tasks. In this case, no such task is present.
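The latch pattern can be sketched with JDK classes alone. In this illustration, fetchLoad() is a hypothetical lookup that uses the hostname length as a fake load, and whenComplete() is used to count down on both success and failure; the guide's own code may arrange the countDown() calls differently.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class LatchSketch {

    // Hypothetical asynchronous lookup; returns the hostname length as a fake load.
    static CompletableFuture<Double> fetchLoad(String hostname) {
        return CompletableFuture.supplyAsync(() -> (double) hostname.length());
    }

    static Map<String, Double> collect(List<String> hostnames) {
        Map<String, Double> loads = new ConcurrentHashMap<>();
        CountDownLatch remaining = new CountDownLatch(hostnames.size());

        for (String host : hostnames) {
            fetchLoad(host)
                .thenAcceptAsync(load -> loads.put(host, load))
                // Runs on success and on failure, so the latch always reaches 0.
                .whenComplete((ignored, ex) -> remaining.countDown());
        }

        try {
            // Blocks until every request has counted down, or the timeout expires.
            if (!remaining.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for responses");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return loads;
    }

    public static void main(String[] args) {
        System.out.println(collect(List.of("a", "bbb")).size()); // 2
    }
}
```

The timeout on await() is a design choice for the sketch: it keeps a lost response from blocking the caller forever.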
The system, inventory, and query microservices will be built in Docker containers. If you want to learn more about Docker containers, check out the Containerizing microservices guide.
Start your Docker environment.
To build the application, run the Maven install and package goals from the command-line session in the start directory:
Windows:
mvnw.cmd -pl models install
mvnw.cmd package
Mac and Linux:
./mvnw -pl models install
./mvnw package
Run the following commands to containerize the microservices:
docker build -t system:1.0-SNAPSHOT system/.
docker build -t inventory:1.0-SNAPSHOT inventory/.
docker build -t query:1.0-SNAPSHOT query/.
Next, use the provided script to start the application in Docker containers. The script creates a network for the containers to communicate with each other. It creates containers for Kafka and all of the microservices in the project.
Windows:
.\scripts\startContainers.bat
Mac and Linux:
./scripts/startContainers.sh
The microservices will take some time to become available. See the http://localhost:9085/health and http://localhost:9080/health URLs to confirm that the inventory and query microservices are up and running. Once the microservices are up and running, you can access the application by making requests to the query/systemLoad endpoint at the http://localhost:9080/query/systemLoad URL.
When the service is ready, you see an output similar to the following example. This example was formatted for readability:
{
    "highest": {
        "hostname": "30bec2b63a96",
        "systemLoad": 6.1
    },
    "lowest": {
        "hostname": "55ec2b63a96",
        "systemLoad": 0.1
    }
}
The JSON output contains a highest attribute that represents the system with the highest load. Similarly, the lowest attribute represents the system with the lowest load. The JSON output for each of these attributes contains the hostname and systemLoad of the system.
When you are done checking out the application, run the following command to stop the query microservice. Leave the system and inventory services running because they will be used when the application is rebuilt later in the guide:
docker stop query
Although JAX-RS provides the default reactive provider that returns CompletionStage types, you can alternatively use another provider that supports other reactive frameworks like RxJava. The Apache CXF and Eclipse Jersey projects produce such providers. You’ll now update the web client to use the Jersey reactive provider for RxJava. With this updated reactive provider, you can write clients that use RxJava objects instead of clients that use only the CompletionStage interface. These RxJava objects give you an alternative to the CompletionStage interface for building scalable RESTful services.
Replace the Maven configuration file.
query/pom.xml
pom.xml
link:finish/query/pom.xml[role=include]
The jersey-rx-client-rxjava and jersey-rx-client-rxjava2 dependencies provide the RxInvokerProvider classes, which are registered to the jersey-client ClientBuilder class.
Update the client to accommodate the custom object types that you are trying to return. You’ll need to register the type of object that you want inside the client invocation.
Replace the InventoryClient interface.
query/src/main/java/io/openliberty/guides/query/client/InventoryClient.java
InventoryClient.java
link:finish/query/src/main/java/io/openliberty/guides/query/client/InventoryClient.java[role=include]
The return type of the getSystem() method is now an Observable object instead of a CompletionStage interface. Observable is a collection of data that waits to be subscribed to before it can release any data and is part of RxJava. The rx() method now needs to contain RxObservableInvoker.class as an argument. This argument calls the specific invoker, RxObservableInvoker, for the Observable class that’s provided by Jersey.
In the getSystem() method, the register(RxObservableInvokerProvider) method call registers the RxObservableInvokerProvider class, which means that the client can recognize the invoker provider and supply the RxObservableInvoker class when it is requested.
In some scenarios, a producer might generate more data than the consumers can handle. JAX-RS can deal with cases like these by using the RxJava Flowable class with backpressure. To learn more about RxJava and backpressure, see JAX-RS reactive extensions with RxJava backpressure.
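Backpressure can be illustrated without RxJava by using the JDK's java.util.concurrent.Flow API. The sketch below is not the Flowable class and not the code from the linked guide. It is a minimal analogue, under the assumption that a subscriber wants to handle one item at a time, in which the consumer tells the producer how many items it is ready for by calling request(1).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class BackpressureSketch {

    static List<Integer> consumeOneAtATime(int count) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // ask for the first item only
                }

                @Override
                public void onNext(Integer item) {
                    received.add(item);
                    subscription.request(1); // ready for exactly one more
                }

                @Override
                public void onError(Throwable t) {
                    finished.countDown();
                }

                @Override
                public void onComplete() {
                    finished.countDown();
                }
            });
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // blocks if the subscriber's buffer is full
            }
        } // close() signals onComplete after the buffered items are delivered

        try {
            if (!finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(consumeOneAtATime(3)); // [1, 2, 3]
    }
}
```

Because the subscriber controls demand, a slow consumer slows the producer down instead of being flooded.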
Now that the client methods return the Observable class, you must update the resource to accommodate these changes.
Replace the QueryResource class.
query/src/main/java/io/openliberty/guides/query/QueryResource.java
QueryResource.java
link:finish/query/src/main/java/io/openliberty/guides/query/QueryResource.java[role=include]
The goal of the systemLoad() method is to return the system with the largest load and the system with the smallest load. The systemLoad endpoint first gets all of the hostnames by calling the getSystems() method. Then it loops through the hostnames and calls the getSystem() method on each one.
Instead of using the thenAcceptAsync() method, Observable uses the subscribe() method to asynchronously process data. Thus, any necessary data processing happens inside the subscribe() method. In this case, the necessary data processing is saving the data in the temporary Holder class. The Holder class is used to store the value that is returned from the client because values cannot be returned inside the subscribe() method. The highest and lowest load systems are updated in the updateValues() method.
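The reason a holder object is needed can be shown with a JDK-only sketch. The subscribe() method here is a hypothetical stand-in for a callback-style API and is not RxJava's Observable.subscribe(). The Holder class is an assumed shape in the spirit of the guide's Holder class, and the hostname length is used as a fake load.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class HolderSketch {

    // Mutable container for a result that a callback produces (shape assumed).
    static final class Holder<T> {
        volatile T value;
    }

    // Hypothetical stand-in for a subscribe-style API: delivers one value to a callback.
    static void subscribe(String hostname, Consumer<Double> onNext) {
        CompletableFuture.runAsync(() -> onNext.accept((double) hostname.length()));
    }

    static double loadOf(String hostname) {
        Holder<Double> holder = new Holder<>();
        CountDownLatch done = new CountDownLatch(1);

        subscribe(hostname, load -> {
            // The callback has no return value, and a lambda cannot reassign
            // a local variable of the enclosing method, so the result is
            // stored in the holder instead.
            holder.value = load;
            done.countDown();
        });

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the value");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return holder.value;
    }

    public static void main(String[] args) {
        System.out.println(loadOf("abcd")); // 4.0
    }
}
```

The holder gives the callback a place to write its result so that the enclosing method can read it after the asynchronous work finishes.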
Run the Maven package goal for the query service from the command-line session in the start directory:
Windows:
mvnw.cmd -pl query package
Mac and Linux:
./mvnw -pl query package
Run the following command to containerize the query microservice:
docker build -t query:1.0-SNAPSHOT query/.
Next, use the provided script to restart the query service in a Docker container.
Windows:
.\scripts\startQueryContainer.bat
Mac and Linux:
./scripts/startQueryContainer.sh
See the http://localhost:9080/health URL to confirm that the query microservice is up and running. Once the query microservice is up and running, you can access the application by making requests to the query/systemLoad endpoint at the http://localhost:9080/query/systemLoad URL.
Switching to a reactive programming model freed up the thread that was handling your request to query/systemLoad. While the client request is being handled, the thread can handle other work.
When you are done checking out the application, run the following script to stop the application:
Windows:
.\scripts\stopContainers.bat
Mac and Linux:
./scripts/stopContainers.sh
A few tests are included for you to test the basic functionality of the query microservice. If a test failure occurs, then you might have introduced a bug into the code.
Create the QueryServiceIT class.
query/src/test/java/it/io/openliberty/guides/query/QueryServiceIT.java
QueryServiceIT.java
link:finish/query/src/test/java/it/io/openliberty/guides/query/QueryServiceIT.java[role=include]
The testSystemLoad() test case verifies that the query service can correctly calculate the highest and lowest system loads.
Verify that the tests pass by running the Maven verify goal on the query service:
Windows:
mvnw.cmd -pl query verify
Mac and Linux:
export TESTCONTAINERS_RYUK_DISABLED=true
./mvnw -pl query verify
For more information about disabling Ryuk, see the Testcontainers custom configuration document.
When the tests succeed, you see output similar to the following example:
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running it.io.openliberty.guides.query.QueryServiceIT
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 3.88 s - in it.io.openliberty.guides.query.QueryServiceIT
Results:
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
You modified an application to make HTTP requests by using a reactive JAX-RS client with Open Liberty and Jersey’s RxJava provider.
