It is possible to wade through the complexities of reactive data types and APIs, and to embrace the full power and elegance of functional reactive programming, by appreciating a few concepts and watching out for pitfalls. In this article, we’ll explore a few tips to keep in mind while programming with a reactive framework. We’ll use the Reactor framework to illustrate the concepts; similar ones apply to the other popular framework out there, Rx.
Although many examples use a rather hard-coded publisher such as Flux.just(1, 2, 3, 4), it is important to note that many real-world publishers represent I/O operations such as outbound HTTP calls or database calls. To depict true I/O, some examples introduce artificial delays.
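A minimal sketch of such a simulated I/O publisher, assuming reactor-core is on the classpath. The class name, the method name and the 100 ms delay are illustrative choices, not taken from the original article.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

class SimulatedIo {
    // Hypothetical stand-in for an I/O-backed publisher: every element
    // is delayed by 100 ms before it is emitted.
    static Flux<Integer> slowNumbers() {
        return Flux.just(1, 2, 3, 4).delayElements(Duration.ofMillis(100));
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Blocking is acceptable in a main method, not on a non-blocking server thread.
        List<Integer> numbers = slowNumbers().collectList().block();
        long elapsed = System.currentTimeMillis() - start;
        System.out.println(numbers + " after about " + elapsed + " ms");
    }
}
```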
Tip 1 - Make sure there are no orphaned publishers
Reactive streams are lazy, i.e. publishers do not evaluate unless there is at least one subscription to them.
A .block() (after a collectList()) can be used to evaluate a transformed publisher, although this is a bad idea if the code is executed in the context of a non-blocking server such as Netty. Functions in real-world applications using reactive streams keep returning them to their caller, eventually bubbling the publishers up to the end consumer. For example, a REST API built using Spring Boot 2.x would most likely return Mono reactive types from the Controller methods, which in turn get consumed by HTTP response streams.
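The laziness can be observed with a counter, as in the sketch below. It assumes reactor-core is available; the class, field and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

class LazyPublisher {
    // Counts how many elements actually flowed through the pipeline.
    static final AtomicInteger evaluated = new AtomicInteger();

    static Flux<Integer> doubled() {
        return Flux.just(1, 2, 3, 4)
                .doOnNext(n -> evaluated.incrementAndGet())
                .map(n -> n * 2);
    }

    public static void main(String[] args) {
        // Orphaned publisher: assembled but never subscribed, so nothing runs.
        Flux<Integer> orphan = doubled();
        System.out.println("before any subscription: " + evaluated.get()); // 0

        // block() subscribes and waits for the result.
        List<Integer> result = doubled().collectList().block();
        System.out.println("after block(): " + evaluated.get() + " -> " + result); // 4 -> [2, 4, 6, 8]
    }
}
```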
A better way of consuming a publisher is via the subscribe method. However, to keep things as simple as possible (i.e. to run the examples from a main method), we’ll stick to printing with blocking calls, relying on convenience functions that consume and print a publisher.
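Such convenience functions could look like the sketch below. The original helpers are not available, so the class name and the two print overloads are assumptions made for illustration; they also return the consumed value so that it can be inspected.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class Printer {
    // Blocks until the Mono completes; the value is null if it completes empty.
    static <T> T print(Mono<T> mono) {
        T value = mono.block();
        System.out.println(value);
        return value;
    }

    // Collects every element of the Flux, blocking until it completes.
    static <T> List<T> print(Flux<T> flux) {
        List<T> values = flux.collectList().block();
        System.out.println(values);
        return values;
    }

    public static void main(String[] args) {
        print(Mono.just("hello"));
        print(Flux.just(1, 2, 3, 4));
        // Non-blocking alternative: the callback runs as each element arrives.
        Flux.just(1, 2, 3, 4).subscribe(System.out::println);
    }
}
```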
A very subtle albeit important illustration involves transformation functions that return publishers.
When such a function is applied with map, the resulting Monos are silently discarded, since map is not a good enough consumer: it never subscribes to them. For a ‘deep consumption’, use flatMap.
In chained method invocations on reactive streams, an accidental map instead of flatMap can introduce bugs that are hard to troubleshoot.
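The difference between the two operators can be made visible by counting side effects. In this sketch, save is a hypothetical I/O call and the counter exists only for demonstration; reactor-core is assumed to be on the classpath.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class MapVersusFlatMap {
    static final AtomicInteger saved = new AtomicInteger();

    // Hypothetical I/O call: the side effect happens only on subscription.
    static Mono<Integer> save(int id) {
        return Mono.fromCallable(() -> {
            saved.incrementAndGet();
            return id;
        });
    }

    // Bug: produces a Flux<Mono<Integer>> whose inner Monos nobody subscribes to.
    static int savedWithMap() {
        saved.set(0);
        Flux.just(1, 2, 3).map(MapVersusFlatMap::save).collectList().block();
        return saved.get();
    }

    // flatMap subscribes to each inner Mono and merges the results.
    static int savedWithFlatMap() {
        saved.set(0);
        Flux.just(1, 2, 3).flatMap(MapVersusFlatMap::save).collectList().block();
        return saved.get();
    }

    public static void main(String[] args) {
        System.out.println("map:     " + savedWithMap());     // 0
        System.out.println("flatMap: " + savedWithFlatMap()); // 3
    }
}
```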
Tip 2 - Do not over-subscribe a publisher
Multiple subscriptions to a publisher, especially one representing I/O, can significantly degrade the performance of your application. The same I/O operation gets executed as many times as there are subscriptions, so two subscriptions mean two executions.
The antidote to this problem is to perform all the required transformations within the same subscription, so that the I/O operation runs only once.
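A sketch of both the problem and the remedy, assuming reactor-core is available. fetchPrice is a hypothetical remote call and the transformations are made up for illustration; the counter records how often the call is actually executed.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

class OverSubscription {
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical remote call; every subscription triggers it again.
    static Mono<Integer> fetchPrice() {
        return Mono.fromCallable(() -> {
            calls.incrementAndGet();
            return 100;
        });
    }

    // Two derived publishers, two subscriptions, two remote calls.
    static int callsWithTwoSubscriptions() {
        calls.set(0);
        Mono<Integer> price = fetchPrice();
        Mono<Integer> withTax = price.map(p -> p + 20);
        Mono<String> label = price.map(p -> "Price: " + p);
        withTax.block();
        label.block();
        return calls.get();
    }

    // Both transformations run inside one chain, so there is one remote call.
    static int callsWithOneSubscription() {
        calls.set(0);
        fetchPrice()
                .map(p -> "Price: " + p + ", with tax: " + (p + 20))
                .block();
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("two subscriptions: " + callsWithTwoSubscriptions()); // 2
        System.out.println("one subscription:  " + callsWithOneSubscription());  // 1
    }
}
```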
Tip 3 - Parallelize generously
If there are two or more publishers that can execute in parallel (for example, publishers that talk to different downstream systems), leverage Reactive APIs such as mergeWith so that consumption doesn’t happen serially and the total time doesn’t keep adding up.
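The effect on total time can be sketched by comparing mergeWith with concatWith, which consumes its sources one after the other. The call method and its 300 ms delay are hypothetical stand-ins for two independent downstream calls.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

class ParallelCalls {
    // Hypothetical downstream call that takes about 300 ms.
    static Mono<String> call(String name) {
        return Mono.just(name).delayElement(Duration.ofMillis(300));
    }

    // concatWith subscribes to the second call only after the first completes.
    static long serialMillis() {
        long start = System.currentTimeMillis();
        call("A").concatWith(call("B")).collectList().block();
        return System.currentTimeMillis() - start;
    }

    // mergeWith subscribes to both calls eagerly, so the delays overlap.
    static long parallelMillis() {
        long start = System.currentTimeMillis();
        call("A").mergeWith(call("B")).collectList().block();
        return System.currentTimeMillis() - start;
    }

    public static void main(String[] args) {
        System.out.println("serial:   about " + serialMillis() + " ms");
        System.out.println("parallel: about " + parallelMillis() + " ms");
    }
}
```

With these delays, the serial variant should take roughly twice as long as the merged one.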
The cardinal sin of
zip functions are not only useful to execute operations in parallel, but they also help to assemble results of the operations into an aggregate object.
Beware of zipped streams: they complete as soon as the shortest stream finishes. It is therefore typical to zip Monos, since their cardinality can be at most 1. Even here there is a catch: if one of the participating streams returns a Mono.empty(), the entire operation short-circuits, which means some of the Monos may not even be evaluated.
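The following sketch illustrates the three behaviours described above: assembling an aggregate, truncation to the shortest stream, and the empty short-circuit. The Profile type and the method names are hypothetical; reactor-core is assumed to be on the classpath.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ZipExamples {
    // Illustrative aggregate assembled from two independent results.
    static final class Profile {
        final String name;
        final int orderCount;

        Profile(String name, int orderCount) {
            this.name = name;
            this.orderCount = orderCount;
        }

        @Override
        public String toString() {
            return name + ":" + orderCount;
        }
    }

    // Zips two Monos into a tuple and maps the tuple to the aggregate.
    static Mono<Profile> profile(Mono<String> name, Mono<Integer> orderCount) {
        return Mono.zip(name, orderCount)
                .map(t -> new Profile(t.getT1(), t.getT2()));
    }

    // Zipping stops as soon as the shorter source completes: 3 is dropped.
    static List<String> zipUnequal() {
        return Flux.just(1, 2, 3)
                .zipWith(Flux.just("a", "b"), (n, s) -> n + s)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(profile(Mono.just("alice"), Mono.just(3)).block()); // alice:3
        System.out.println(zipUnequal());                                      // [1a, 2b]
        // An empty participant makes the zipped Mono complete empty, so block() returns null.
        System.out.println(profile(Mono.just("bob"), Mono.empty()).block());   // null
    }
}
```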
flatMap may also be used to fire off calls in parallel, but the streams resulting from each operation are merged, i.e. the result of the quickest operation gets emitted first to the output stream. Be prepared to re-arrange the elements in the stream if the use-case demands that ordering be preserved.
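A sketch of the reordering, assuming reactor-core is available. slowEcho is a hypothetical call whose latency grows with its input. As one option that the text above does not mention, flatMapSequential also runs the inner publishers concurrently but emits results in source order.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class FlatMapOrdering {
    // Hypothetical call whose latency grows with the input value.
    static Mono<Integer> slowEcho(int n) {
        return Mono.just(n).delayElement(Duration.ofMillis(n * 100L));
    }

    // Results arrive in completion order, not in source order.
    static List<Integer> merged() {
        return Flux.just(3, 1, 2)
                .flatMap(FlatMapOrdering::slowEcho)
                .collectList()
                .block();
    }

    // Still concurrent, but results are replayed in source order.
    static List<Integer> sequential() {
        return Flux.just(3, 1, 2)
                .flatMapSequential(FlatMapOrdering::slowEcho)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println("flatMap:           " + merged());     // [1, 2, 3]
        System.out.println("flatMapSequential: " + sequential()); // [3, 1, 2]
    }
}
```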
Tip 4 - Beware of nested reactive data types
While on a chained method invocation spree, it is easy to get caught in a mire of nested reactive and/or stream data types such as Flux<Mono<List<T>>>. Such types are recipes for over-subscribed publishers, and you must quickly quell the situation by using flattening operators.
Case 1: Flatten a publisher of lists
You can also apply transformations if required. For instance, if you want to double the numbers, apply a mapping function such as (x -> x * 2).
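One way to flatten a publisher of lists is flatMapIterable, sketched below under the assumption that reactor-core is on the classpath. The original example is not available, so the operator choice and all names here are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

class FlattenLists {
    // Flux<List<Integer>> -> Flux<Integer>, collected for printing.
    static List<Integer> flatten(Flux<List<Integer>> lists) {
        return lists.flatMapIterable(list -> list).collectList().block();
    }

    // Same flattening, followed by a transformation of each element.
    static List<Integer> flattenAndDouble(Flux<List<Integer>> lists) {
        return lists.flatMapIterable(list -> list)
                .map(x -> x * 2)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        Flux<List<Integer>> lists = Flux.just(Arrays.asList(1, 2), Arrays.asList(3, 4));
        System.out.println(flatten(lists));          // [1, 2, 3, 4]
        System.out.println(flattenAndDouble(lists)); // [2, 4, 6, 8]
    }
}
```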
Case 2: Flatten a list of publishers
flatMap after a
This is also applicable for converting a
Flux<T>. You can even apply transformations as shown in Case 1.
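A possible shape for this case is Flux.fromIterable followed by flatMap, as sketched below. This pairing is an assumption (the original snippet is not available), and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class FlattenPublisherList {
    // List<Mono<T>> -> Flux<T>: turn the list into a Flux of Monos,
    // then let flatMap subscribe to each Mono and merge the results.
    static <T> Flux<T> flatten(List<Mono<T>> monos) {
        return Flux.fromIterable(monos).flatMap(mono -> mono);
    }

    public static void main(String[] args) {
        List<Mono<Integer>> monos = Arrays.asList(Mono.just(1), Mono.just(2), Mono.just(3));
        System.out.println(flatten(monos).collectList().block()); // [1, 2, 3]
    }
}
```

Because flatMap merges, results from Monos that complete asynchronously may arrive in a different order than the list.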
Case 3: Flatten nested publishers
You can flatten other publisher combinations too, for example Mono<Flux<T>>, double monos (Mono<Mono<T>>) and double fluxes (Flux<Flux<T>>). Here too, you can apply transformations as shown in Case 1.
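The combinations listed above can each be flattened with an identity function, as in this sketch. It assumes reactor-core is available; the helper names are hypothetical.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class FlattenNested {
    // Mono<Flux<T>> -> Flux<T>
    static <T> Flux<T> flattenMonoOfFlux(Mono<Flux<T>> nested) {
        return nested.flatMapMany(flux -> flux);
    }

    // Mono<Mono<T>> -> Mono<T>
    static <T> Mono<T> flattenMonoOfMono(Mono<Mono<T>> nested) {
        return nested.flatMap(mono -> mono);
    }

    // Flux<Flux<T>> -> Flux<T>; inner streams are merged as they emit.
    static <T> Flux<T> flattenFluxOfFlux(Flux<Flux<T>> nested) {
        return nested.flatMap(flux -> flux);
    }

    public static void main(String[] args) {
        System.out.println(flattenMonoOfFlux(Mono.just(Flux.just(1, 2, 3))).collectList().block()); // [1, 2, 3]
        System.out.println(flattenMonoOfMono(Mono.just(Mono.just("a"))).block());                   // a
        System.out.println(flattenFluxOfFlux(Flux.just(Flux.just(1, 2), Flux.just(3)))
                .collectList().block());                                                            // [1, 2, 3]
    }
}
```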
Tip 5 - Convert blocking calls to reactive ones
On an enterprise-y project, you occasionally bump into that annoying database driver or that little library method which does not support asynchronous programming and streams. Fortunately, we can convert synchronous calls to reactive ones by wrapping them with Mono.fromCallable. Make sure that you dedicate a separate thread pool to executing this operation.
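A minimal sketch of the wrapping, assuming Reactor 3.3 or later, where Schedulers.boundedElastic() provides a pool intended for blocking work. blockingLookup is a hypothetical legacy call; a dedicated scheduler built from your own executor is an equally valid choice.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class BlockingToReactive {
    // Hypothetical legacy call that blocks the calling thread.
    static String blockingLookup(String key) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return key.toUpperCase() + " on " + Thread.currentThread().getName();
    }

    // The callable runs only on subscription, on a thread of the
    // scheduler passed to subscribeOn rather than on the caller's thread.
    static Mono<String> reactiveLookup(String key) {
        return Mono.fromCallable(() -> blockingLookup(key))
                .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        System.out.println("caller thread: " + Thread.currentThread().getName());
        System.out.println(reactiveLookup("abc").block());
    }
}
```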