Skip to content

Commit

Permalink
dev content review feedback addressed
Browse files Browse the repository at this point in the history
  • Loading branch information
Tiago Ferreira authored and Tiago Ferreira committed Jun 24, 2020
1 parent 3cce1b2 commit a399349
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 57 deletions.
48 changes: 19 additions & 29 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ Asynchronous methods allow you to perform a lengthy operation, such as input/out
In the context of REST clients, making synchronous HTTP requests can be a time consuming process. The network may be slow, or maybe the upstream service is overwhelmed and can't respond quickly. These lengthy operations can block the execution of your thread when it's in use and prevent other work from being completed.

== What is reactive programming?
Reactive programming is an extension to asynchronous programming and a paradigm where data availability controls execution that avoids overwhelming the consumer. In other words, execution advances when new information becomes available. Changes, once they occur, are propagated via data streams and this makes it generally easier to implement an asynchronous and non-blocking system. This allows you to be more efficient with the resources you use in your application.
It also leverages the publisher-subscriber pipeline instead of the more traditional producer-consumer model. The former requires very little configuration and thus has little overhead. This makes this model much simpler to implement, allowing you to have a more elegant solution. One of the greatest advantage of reactive programming is that you can focus mainly on business logic instead of having to worry about implementation and configuration.
The usage of reactive streams brings with it tools, among others, to handle data flow, backpressure, thread safety and error propagation. +
Reactive programming is an extension to asynchronous programming and a paradigm concerned with data streams and the propagation of change. In other words, execution advances when new information becomes available. Changes, once they occur, are propagated via data streams and this makes it generally easier to implement an asynchronous and non-blocking system.

== What you will learn
You will learn how the default reactive JAX-RS client accesses remote RESTful services using asynchronous method calls. You will then learn how to update the client so that it uses custom objects form third-party libraries such as http://cxf.apache.org/docs/jax-rs.html[CXF^] or in this case https://eclipse-ee4j.github.io/jersey/[Jersey^]. You will also use this library to invoke your new client using Jersey’s custom invoker `RxObservableInvoker`.
Reactive programming brings with it many advantages. One of the greatest advantage of reactive programming is that you can focus mainly on business logic instead of having to worry about implementation and configuration. It is alot simpler to do asynchronous work and setup complex threading because reactive programming avoids the use of callbacks that asynchronous programming brings with itself. This also brings with it a much faster runtime execution of your application, which allows for a much better user experience. Lastly, it makes your code much more readable and easier to fix. You can add or remove blocks of code from individual data streams which means you can easily amend a specific, distressed stream. The usage of reactive streams brings with it tools, among others, to handle data flow, backpressure, thread safety and error propagation. +

== What you'll learn
You will learn how the default reactive JAX-RS client accesses remote RESTful services using asynchronous method calls. You will then learn how to update the client so that it uses custom objects from third-party libraries such as http://cxf.apache.org/docs/jax-rs.html[CXF^] or in this case https://eclipse-ee4j.github.io/jersey/[Jersey^]. You will also use this library to invoke your new client using Jersey’s custom invoker `RxObservableInvoker`.
You will then update your resources to accommodate the new client. Instead of using elementary, single-use asynchronous solutions, you will use asynchronous data streams to transport data. Doing so, you will learn the benefits of using the `Observable` class over the more traditional `CompletionStage` interface, like more scalable implementation. You will also gain exposure to the rich set of operators that the former includes.

The application you will be working with is a manager that maintains an inventory of available systems.
Expand All @@ -64,11 +64,7 @@ https://openliberty.io/guides/microprofile-rest-client.html[Consuming RESTful se

== Additional Prerequisites

- *Docker*: Docker is a tool that you can use to deploy and run applications with containers. You can think of Docker like a virtual machine that runs various applications. However, unlike a typical virtual machine, you can run these applications simultaneously on a single system and independent of one another.

Install Docker by following the instructions in the https://docs.docker.com/engine/installation[official Docker documentation^].

Learn more about containers on the https://www.docker.com/resources/what-container[official Docker website^]
You need to have Docker installed. For installation instructions, refer to the official https://docs.docker.com/get-docker/[Docker documentation^]. You will build and run the microservices in Docker containers. An installation of Apache Kafka is provided in another Docker container.

// =================================================================================================
// Getting started
Expand All @@ -93,24 +89,17 @@ include::start/query/src/main/java/io/openliberty/guides/query/QueryResource.jav

Navigate to the `start` directory to begin.

An implementation of the default reactive JAX-RS client is already provided for you. The classes changed are [hotspot file=0]`InventoryClient` and [hotspot file=1]`QueryResource` which now implement the default JAX-RS reactive client.
In the client, notice how the [hotspot=getSystem file=0]`getSystem()` returns the `CompletionStage<T>` interface. This interface represents a unit or stage of a computation, and once the associated computation completes, the value contained can be retrieved. Alternatively, the `CompletionStage` interface can be chained with additional stages using the [hotspot=thenAcceptAsync file=1]`thenAcceptAsync()` method for further processing. Exceptions can be handled in a callback provided to the [hotspot=exceptionally file=1]`exceptionally()` method. The [hotspot=thenAcceptAsync file=1]`thenAcceptAsync()` and [hotspot=exceptionally file=1]`exceptionally()` methods together behave like an asynchronous try-catch block. It is important to note that when you return a `CompletionStage` in the resource, it doesn't guarantee that computation is complete and that the response has been built. Rather, it just means that the service will respond to the caller once the `CompletionStage` has completed.

In the InventoryClient class, note how the [hotspot=getSystem file=0]`getSystem()` method contains an [hotspot=rx file=0]`rx()` call. This is the default reactive invoker for the `CompletionStage` interface and this method is what retrieves the `CompletionStageRxInvoker` class and allows for these methods to function correctly with the `CompletionStage` interface return type.
An implementation of the default reactive JAX-RS client is already provided for you. The classes you will change are [hotspot file=0]`InventoryClient` and [hotspot file=1]`QueryResource` which now implement the default JAX-RS reactive client.

In the `QueryResource` class, whatever object you want to return is returned in the [hotspot=thenAcceptAsync file=1]`thenAcceptAsync()` method.
The [hotspot file=0]`InventoryClient` class is used to retrieve the inventory data. In the [hotspot file=0]`InventoryClient` class, notice how the [hotspot=getSystem file=0]`getSystem()` returns the `CompletionStage<T>` interface. This interface represents a unit or stage of a computation, and once the associated computation completes, the value contained can be retrieved. Notice how it also contains an [hotspot=rx file=0]`rx()` call. This is the default reactive invoker for the `CompletionStage` interface and this method is what retrieves the `CompletionStageRxInvoker` class and allows for these methods to function correctly with the `CompletionStage` interface return type.

Similarly to the synchronous approach, if we successfully get the systems from the inventory microservice, then the resource will respond with an HTTP status of 200 and the body will contain a list of systems.
The [hotspot file=1]`QueryResource` class is use to process the data coming in from the data streams, which is accessed through the hotspot file=0]`InventoryClient`, and serve it when all the services have completed execution. The [hotspot=thenAcceptAsync file=1]`thenAcceptAsync()` and [hotspot=exceptionally file=1]`exceptionally()` methods together behave like an asynchronous try-catch block. The data is retrieved from the `CompletionStage` interface and processed in the [hotspot=thenAcceptAsync file=1]`thenAcceptAsync()` method. It is important to note that when you return a `CompletionStage` in the resource, it doesn't guarantee that computation is complete and that the response has been built.

== Updating the client to use third-party libraries

== Updating the client to use third-party objects
JAX-RS supports the usage of third-party libraries like https://eclipse-ee4j.github.io/jersey[Jersey^]. Jersey provides a client-builder interface which allows users to make a request for the data. It allows the use of specific invokers as configuration so that the client can support https://github.com/ReactiveX/RxJava[RxJava^] objects instead of only the `CompletionStage` interface. These custom objects are useful for covering use cases that the `CompletionStage` interface cannot.

JAX-RS supports the usage of third-party libraries like https://eclipse-ee4j.github.io/jersey[Jersey^], and can allow for client configurations that are different from the default one.

By using Jersey, the client can be updated to support https://github.com/ReactiveX/RxJava[RxJava^] objects instead of only the `CompletionStage` interface. These custom objects are useful for covering use cases that the `CompletionStage` interface cannot.


The ReactiveX API and the required Jersey libraries are included as dependencies to your [hotspot file=0]`query/pom.xml` file. Look for the dependencies with `artifactIDs` of [hotspot=67-71 file=0]`rxjava`, [hotspot=72-76 file=0]`jersey-client`, [hotspot=77-81 file=0]`jersey-rx-client-rxjava` and [hotspot=82-86 file=0]`jersey-rx-client-rxjava2`.
The https://github.com/ReactiveX/RxJava[RxJava^] and required Jersey libraries are included as dependencies to your [hotspot file=0]`query/pom.xml` file. [hotspot=67-71 file=0]`rxjava` allows the application to use the https://github.com/ReactiveX/RxJava[RxJava^] API. [hotspot=77-81 file=0]`jersey-rx-client-rxjava` and [hotspot=82-86 file=0]`jersey-rx-client-rxjava2` provide the Rx Invoker provider classes which are then registered to the [hotspot=72-76 file=0]`jersey-client` client-builder.

pom.xml
[source,xml,linenums,role="code_column"]
Expand All @@ -131,13 +120,12 @@ InventoryClient.java
include::finish/query/src/main/java/io/openliberty/guides/query/client/InventoryClient.java[]
----

The changes involve changing the return types of the [hotspot=getSystem file=1]`getSystem()` method from a `CompletionStage<T>` interface to an `Observable<T>` object. `http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html[Observable^]` is a class that is part of ReactiveX and, as will be seen later, is a more flexible data type than the `CompletionStage` interface.
The [hotspot=rx file=1]`rx()` invoker also must contain `RxObservableInvoker.class` as an argument. This is to invoke the specific invoker for the `Observable` class provided by Jersey. The invoker depends on the type of object that you wish to return. If you wanted to return a `http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html[Flowable^]` object instead, you would need to pass `RxFlowableInvoker.class`. For this to work however, the invoker would need to have a provider that is registered with the client. Notice in the [hotspot=webTarget file=1]`webTarget()` method, the [hotspot=register file=1]`register(RxObservableInvokerProvider)` call that allows for the client to recognize the `RxObservableInvoker` class. Without registering the invoker provider, the invoker will not be recognized.
The changes involve changing the return types of the [hotspot=getSystem file=1]`getSystem()` method from a `CompletionStage<T>` interface to an `Observable<T>` object. `http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html[Observable^]` is a collection of data that awaits to be subscribed to before it can release any data and is part of https://github.com/ReactiveX/RxJava[RxJava^]. The [hotspot=rx file=1]`rx()` invoker also must contain `RxObservableInvoker.class` as an argument. This is to invoke the specific invoker for the `Observable` class provided by Jersey. Notice in the [hotspot=webTarget file=1]`webTarget()` method, the [hotspot=register file=1]`register(RxObservableInvokerProvider)` call that allows for the client to recognize the `RxObservableInvoker` class. Without registering the invoker provider, the invoker will not be recognized.

Sometimes there are scenarios where a producer will generate more data than the consumers and handle. In cases like these, JAX-RS can deal with this issue using `backpressure` and `Flowables`. You can learn more with this post about https://openliberty.io/blog/2019/04/10/jaxrs-reactive-extensions.html?fbclid=IwAR00IgA2lwRsp0_lRTOVTibTw7oZTwkHr_pCDLXKxIxwA8EgE_xrxk22r5A[JAX-RS reactive extensions with RxJava Backpressure^]


== Updating the JAX resources
== Updating the client to use the reactive model

Now that the client methods return the `Observable<T>` class, you must update the resource to accommodate these changes.

Expand All @@ -152,7 +140,9 @@ QueryResource.java
include::finish/query/src/main/java/io/openliberty/guides/query/QueryResource.java[]
----

Instead of using `thenAcceptAsync()`, observables use the [hotspot=subscribe file=0]`subscribe()` method to asynchronously process data. Thus any required processing of data will be performed in the [hotspot=subscribe file=0]`subscribe()` call. In this case, it is simply saving the data in the temporary [hotspot=holder hotspot=holderClass file=0]`holder`. The temporary holder is used to store the value returned from the client because values cannot be returned inside [hotspot=subscribe file=0]`subscribe()`. The [hotspot=await file=0]`countdownLatch.await()` will ensure that the function will only return a value once the required operation is complete. The `CountdownLatch` will countdown towards 0 after the completion of a thread via the [hotspot=countdown file=0]`countdownLatch.countdown()` call. This means that the value will only return once the thread that's retrieving the value is complete. While waiting for this countdown to complete, the main thread is free to perform other tasks. In this case, no such task is present, but a simple task can be given to occupy the thread.
Instead of using `thenAcceptAsync()`, observables use the [hotspot=subscribe file=0]`subscribe()` method to asynchronously process data. Thus any required processing of data will be performed in the [hotspot=subscribe file=0]`subscribe()` call. In this case, it is simply saving the data in the temporary [hotspot=holder hotspot=holderClass file=0]`holder`. The temporary holder is used to store the value returned from the client because values cannot be returned inside [hotspot=subscribe file=0]`subscribe()`.

A [hotspot=countdownlatch file=0]`CountDownLatch` is used to keep track of how many asynchronous requests we are waiting on. The [hotspot=countdownlatch file=0]`CountDownLatch` will countdown towards 0 after the completion of a thread via the [hotspot=countdown file=0]`countdown` method call. This means that the value will only return once the thread that's retrieving the value is complete. Using the [hotspot=await file=0]`await` method, we stop and wait until all our requests are complete. While waiting for this countdown to complete, the main thread is free to perform other tasks. In this case, no such task is present, but a simple task can be given to occupy the thread.

// =================================================================================================
// Building the application
Expand Down Expand Up @@ -180,7 +170,7 @@ Run the following commands to containerize the microservices:
```
docker build -t system:1.0-SNAPSHOT system/.
docker build -t inventory:1.0-SNAPSHOT inventory/.
docker build -t inventory:1.0-SNAPSHOT query/.
docker build -t query:1.0-SNAPSHOT query/.
```

Next, use the provided script to start the application in Docker containers. The script creates a network for the containers to communicate with each other. It also creates containers for Kafka, Zookeeper, and all of the microservices in the project.
Expand Down Expand Up @@ -226,7 +216,7 @@ You can also use `curl -X GET \http://localhost:9080/query/systemLoad` command i

The JSON output has a `highest` attribute that represents the system with the highest load. Similarly, the `lowest` attribute represents the system with the lowest load. The JSON output for each of these feature the respective hostname and systemLoad of the system.

Switching to an asynchronous programming model has freed up the thread handling your request to `/query/systemLoad`. While the client request is being handled, the thread can handle other work.
Switching to an reactive programming model has freed up the thread handling your request to `/query/systemLoad`. While the client request is being handled, the thread can handle other work.

When you are done checking out the application, run the following script to stop the application:

Expand Down
Binary file modified assets/QueryService.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ public class QueryResource {
@Produces(MediaType.APPLICATION_JSON)
public Map<String, Properties> systemLoad() {
List<String> systems = inventoryClient.getSystems();
// tag::countdownlatch[]
CountDownLatch remainingSystems = new CountDownLatch(systems.size());
// end::countdownlatch[]
// tag::holder[]
final Holder systemLoads = new Holder();
// end::holder[]
Expand Down
Loading

0 comments on commit a399349

Please sign in to comment.