Introduction to RxJava

Introduction

When object-oriented programming emerged, we modeled real-world concepts as objects. Those objects have a set of properties (fields) and a set of methods that operate on those properties. A class represents a blueprint, and an object is an instance of that blueprint. Though this works fine in many cases, it becomes complex when a lot of objects interact with each other.

Next, the functional programming paradigm emerged and changed the way we write code. Two primary elements of functional programming are immutability and laziness (achieved, for the most part, through lambdas). Functional programming does not replace OO; it complements it.

More recently, the reactive programming paradigm has emerged. It is a functional, event-driven approach in which the world is modeled as being constantly in motion. There are two main actors: observables (analogous to producers) and observers (consumers or subscribers).

The Reactive Manifesto

The Reactive Manifesto describes Reactive Systems as having the following four properties:

  • Responsive: The system responds in a timely manner with an established upper bound or SLA thereby delivering a consistent quality of service. 
  • Resilient: The system must be resilient to failure and must be able to recover from failures.
  • Elastic: The system must stay responsive under varying workload. It must be able to balance the load by adding or removing resources dynamically.
  • Message Driven: Reactive Systems rely on asynchronous message-passing to establish a boundary between components that ensures loose coupling, isolation and location transparency. Even failures are handled via the same message passing mechanisms.

What is ReactiveX and RxJava

ReactiveX is an API for asynchronous programming with observable streams. It combines the best ideas from the Observer pattern, the Iterator pattern, and functional programming. It was created by Erik Meijer for .NET, where it was called Reactive Extensions (also known as ReactiveX or Rx). It has since been ported to several languages and platforms, including JavaScript, Java, Python and C++. As the name suggests, RxJava is the ReactiveX port for Java.

The fundamental idea of ReactiveX is that events are data and data are events.

RxJava

There are two versions of RxJava: 1.0 and 2.0. Version 1.0 is the older one; it receives only bug fixes (and probably some backports), with no major development. Version 2.0 is the newer one and is based on Reactive Streams, a specification that sets a standard for asynchronous stream processing.

Note: The Java package for RxJava classes is rx in 1.0 and io.reactivex in 2.0.

Importing RxJava

Searching for io.reactivex.rxjava2 in Maven Central Search shows the latest available version (the artifact ID is rxjava).

Using Maven

<dependency>
  <groupId>io.reactivex.rxjava2</groupId>
  <artifactId>rxjava</artifactId>
  <version>x.y.z</version>
</dependency>

Replace x.y.z with the latest version.

Using Gradle

dependencies {
   implementation 'io.reactivex.rxjava2:rxjava:x.y.z'
}

Replace x.y.z with the latest version.

Importing Dependent Jar

You need not use Maven or Gradle; you can download the RxJava and Reactive Streams jars from Maven Central and add them to the classpath.

Quick look at RxJava

We will look at a simple piece of code without going into much detail.

package com.javadevcentral.rxdemo;

import io.reactivex.Observable;

public class Demo {
    public static void main(String[] args) {
        Observable.range(1, 20)
                .filter(i -> i % 2 == 0)
                .map(i -> i * i)
                .subscribe(System.out::println);
    }
}

This outputs

4
16
36
64
100
144
196
256
324
400

This creates an Observable that emits a sequence of integers within the specified range. The first parameter to range() is the starting value and the second is the number of elements to emit. So, here it emits the numbers (events or data) from 1 to 20 inclusive. Then, we use a filter operation to drop the odd numbers and keep only the even ones. A filter operation yields a new Observable. Next, we map each number to its square. Finally, we plug in a subscriber that simply prints each item it receives.
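Because the second argument to range() is a count and not an end value, it is easy to misread. The following is a minimal sketch, assuming RxJava 2.0 on the classpath; the class name RangeDemo and the helper emitted() are made up for illustration. It collects the emissions into a list so the effect of each argument is visible.

```java
import io.reactivex.Observable;

import java.util.List;

class RangeDemo {
    // Hypothetical helper: gathers everything emitted by range(start, count).
    static List<Integer> emitted(int start, int count) {
        return Observable.range(start, count)
                .toList()        // Single<List<Integer>> holding all emissions
                .blockingGet();  // wait for the source to complete
    }

    public static void main(String[] args) {
        // Start at 5 and emit 3 items; the second argument is not an upper bound.
        System.out.println(emitted(5, 3)); // prints [5, 6, 7]
    }
}
```

So range(1, 20) happens to end at 20 only because it starts at 1.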

Each number the source produces is called an emission. Emissions start at the source Observable and, at the final stage, are consumed by an Observer that prints them. The filter and map steps each act as both an Observable and an Observer: an Observer with respect to their immediate upstream and an Observable with respect to their immediate downstream. That is, filter is an Observer subscribed to the source and an Observable whose emissions are consumed by the map step. The map step does not know whether the data it receives comes from the original source or from an intermediate operator such as filter.
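One way to see that every step is itself an Observable is to give each stage a name. This is only a sketch of the same pipeline, assuming RxJava 2.0; the class name PipelineDemo and the variable names are invented for illustration, and the results go into a list instead of being printed.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

class PipelineDemo {
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();

        // Each operator returns a new Observable wrapping its upstream.
        Observable<Integer> source = Observable.range(1, 20);
        Observable<Integer> evens = source.filter(i -> i % 2 == 0);
        Observable<Integer> squares = evens.map(i -> i * i);

        // Nothing is emitted until an Observer subscribes at the end of the chain.
        squares.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The source here is synchronous, so the list is fully populated by the time subscribe() returns.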

Relation to Java 8 Streams

The above code is in many ways similar to Java 8 streams. With Java 8 streams, code like the following yields the same result.

package com.javadevcentral;

import java.util.stream.IntStream;

public class Demo {
    public static void main(String[] args) {
        IntStream.rangeClosed(1, 20)
                .filter(i -> i % 2 == 0)
                .map(i -> i * i)
                .forEach(System.out::println);
    }
}

We start with a stream of numbers from 1 to 20 and each of the steps (filter, map) yields a new stream. The forEach is the terminal operation where they get printed.

So, how is RxJava different from Java 8 streams?

In a lot of ways.

Firstly, Observables are push-based whereas Streams are pull-based. Any number of subscribers (observers) can be plugged in, and new observers can join at any time. Depending on the nature of the Observable, a new observer either gets the emissions replayed from the beginning (a cold Observable) or gets only the data emitted after it joined (a hot Observable).
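The cold and hot behaviors can be sketched side by side. The example below assumes RxJava 2.0 and uses PublishSubject as one convenient hot source; that choice, the class name ColdHotDemo, and the "A"/"B" labels are mine for illustration and not something the text above prescribes.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

class ColdHotDemo {
    // Cold: every subscriber gets its own run of the full sequence.
    static List<String> cold() {
        List<String> log = new ArrayList<>();
        Observable<Integer> cold = Observable.range(1, 3);
        cold.subscribe(i -> log.add("A" + i));
        cold.subscribe(i -> log.add("B" + i)); // replayed from the beginning
        return log;
    }

    // Hot: a subscriber sees only what is emitted after it joined.
    static List<String> hot() {
        List<String> log = new ArrayList<>();
        PublishSubject<Integer> hot = PublishSubject.create();
        hot.subscribe(i -> log.add("A" + i));
        hot.onNext(1);                         // only A is listening
        hot.subscribe(i -> log.add("B" + i));  // B joins late and misses 1
        hot.onNext(2);                         // both A and B receive 2
        hot.onComplete();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(cold()); // prints [A1, A2, A3, B1, B2, B3]
        System.out.println(hot());  // prints [A1, A2, B2]
    }
}
```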

Secondly, subscribers need not consume all the data and can cancel their subscription at any time. This is especially important for infinite sources of emissions, and it lets the resources held by the source be cleaned up properly.
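As a rough illustration of cancelling early, the sketch below subscribes to a very long range and disposes after a few items. It assumes RxJava 2.0 and uses DisposableObserver, one of several ways to get hold of the subscription; the class name DisposeDemo and the method takeFirst() are hypothetical, and the method assumes limit is at least 1.

```java
import io.reactivex.Observable;
import io.reactivex.observers.DisposableObserver;

import java.util.ArrayList;
import java.util.List;

class DisposeDemo {
    // Hypothetical helper: consumes the first `limit` items, then cancels.
    // Assumes limit >= 1.
    static List<Integer> takeFirst(int limit) {
        List<Integer> received = new ArrayList<>();
        Observable.range(1, Integer.MAX_VALUE) // stands in for an endless source
                .subscribe(new DisposableObserver<Integer>() {
                    @Override
                    public void onNext(Integer value) {
                        received.add(value);
                        if (received.size() >= limit) {
                            dispose(); // cancel: the source stops emitting
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        // not reached here because we dispose first
                    }
                });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(takeFirst(3)); // prints [1, 2, 3]
    }
}
```

A Java 8 stream has no equivalent handle that a consumer can use to stop the producer from the outside.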

Thirdly, subscribers can supply separate logic for error handling. This adds a lot of flexibility in modeling failures.
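A small sketch of a separate error handler, assuming RxJava 2.0: the two-argument subscribe() takes one callback for items and one for errors. The class name ErrorDemo and the division example are invented for illustration.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

class ErrorDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable.just(1, 2, 0, 4)
                .map(i -> 10 / i) // throws ArithmeticException when i is 0
                .subscribe(
                        value -> log.add("value " + value),   // normal items
                        error -> log.add("error "             // failure path
                                + error.getClass().getSimpleName()));
        return log;
    }

    public static void main(String[] args) {
        // prints [value 10, value 5, error ArithmeticException]
        System.out.println(run());
    }
}
```

The error ends the sequence, so the last item (4) is never processed.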

What’s Next

We saw what reactive programming and RxJava are and what they offer, and looked at how to add the RxJava library to a project. I demonstrated RxJava with a simple example. We have only scratched the surface; there is a lot more to it, which I will explore in future posts. Stay tuned.

