RxJava Observable

Introduction

In my previous post, we looked at an introduction to RxJava: what it is and what it offers. In this post, we will dive deep into RxJava Observables and Subscribers (or Observers): what they are, how to create them, and some RxJava Observable examples.

Observables and Subscribers

An RxJava Observable supports emitting or pushing a sequence of items of type T. This implies that Observable is a generic type (Observable<T>). These items can optionally pass through multiple operators (like filter, map).

An Observer (or subscriber) subscribes to an Observable. The items emitted by the Observable end up at these subscribers, which consume them. In other words, the Observer reacts to whatever item or sequence of items the Observable emits.

Here’s a nice graphic (marble diagram) from the ReactiveX page:

[Image: RxJava Observable marble diagram]

RxJava Observable

Let us look at how an Observable pushes items to its Observers. The Observable class has many factory methods for creating an Observable, and create is one of them. It takes an ObservableOnSubscribe<T> as a parameter.

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ... }

ObservableOnSubscribe is a functional interface with a single method, subscribe, that takes an ObservableEmitter<T>. An ObservableEmitter is an Emitter.

public interface ObservableOnSubscribe<T> {
    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

public interface ObservableEmitter<T> extends Emitter<T> { ... }

public interface Emitter<T> {
    void onNext(@NonNull T value);
    void onError(@NonNull Throwable error);
    void onComplete();
}

An Observable works by passing three types of events: 

  • onNext(): This method receives one item at a time in the emission sequence.
  • onComplete(): This is called once all the items are emitted (hence this is applicable only for finite Observable sources).
  • onError(): If an error is encountered anywhere in the chain, onError is called. The error is communicated down the chain to the Observer, which can define error handling logic to handle it.

Once onComplete() or onError() is called, no more emissions will happen.
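
The rule above can be sketched in a few lines. This is only an illustration, assuming RxJava 2.x (the io.reactivex package) is on the classpath; the class name TerminalEventDemo and the item values are made up. The source deliberately misbehaves by pushing an item after onComplete(), and the emitter handed to Observable.create drops it.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; assumes RxJava 2.x on the classpath.
class TerminalEventDemo {

    // Subscribes to a source that emits after completion and
    // records every event the observer actually sees.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        Observable<String> source = Observable.create(emitter -> {
            emitter.onNext("A");
            emitter.onNext("B");
            emitter.onComplete();
            emitter.onNext("C"); // pushed after the terminal event, so it is dropped
        });

        source.subscribe(
                item -> events.add("next:" + item),
                error -> events.add("error"),
                () -> events.add("complete"));

        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The observer should record the two items followed by the completion event, and nothing for "C".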

RxJava Observer

The Observer interface is similar to the Emitter interface.

public interface Observer<T> {

    void onSubscribe(@NonNull Disposable d);

    void onNext(@NonNull T t);

    void onError(@NonNull Throwable e);

    void onComplete();

}

The onSubscribe method is extra; it is not part of the Emitter interface. When we subscribe to an Observable by passing an Observer, the Observer’s onSubscribe method is called with a Disposable object. The observer can use this disposable to cancel (dispose) the connection (channel) with the Observable. This is useful when the observer is no longer interested in the events or when the Observable source is infinite.
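
Here is a small sketch of an observer that loses interest partway through. It assumes RxJava 2.x; the class name DisposeDemo and the cut-off value 2 are made up for illustration. It uses Observable.range (covered later in this post) as the source and disposes after the second item.

```java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; assumes RxJava 2.x on the classpath.
class DisposeDemo {

    // Subscribes to ten numbers but cancels after seeing the value 2.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        Observable.range(1, 10).subscribe(new Observer<Integer>() {
            private Disposable disposable;

            @Override
            public void onSubscribe(Disposable d) {
                disposable = d; // keep the handle so we can cancel later
            }

            @Override
            public void onNext(Integer value) {
                events.add("next:" + value);
                if (value == 2) {
                    disposable.dispose(); // not interested in the rest
                }
            }

            @Override
            public void onError(Throwable e) {
                events.add("error");
            }

            @Override
            public void onComplete() {
                events.add("complete");
            }
        });

        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

After the dispose call, the observer should see neither the remaining numbers nor onComplete().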

I have organized the remainder of this post as:

  1. Look at the various ways to create an Observable
  2. Hot and Cold Observable
  3. Look at the various ways to add a subscriber (overloads of the subscribe method)

Creating an RxJava Observable

We will look at some of the most common options available to create an Observable.

Observable.create

As we saw earlier, the create method expects an ObservableOnSubscribe. Since it is a functional interface, we can define one using a lambda expression. We will emit a limited set of items using the emitter.

Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("Dog");   
    emitter.onNext("Cat");
    emitter.onNext("Lion");
    emitter.onNext("Elephant");
    emitter.onComplete();
});
source.subscribe(item -> System.out.println("I have received a: " + item));

outputs

I have received a: Dog
I have received a: Cat
I have received a: Lion
I have received a: Elephant

The emitter emits one item at a time, and there are no intermediate operators here. We have subscribed an observer by passing a Consumer<T> to the subscribe method. The subscribe method has several overloads, which are discussed towards the end of the post.

RxJava Operators

We can use operators to transform or filter the items emitted. Say we want to keep only those animals whose names are longer than three letters. Here, filter returns a new Observable<String> that ignores emissions that fail to match the passed predicate.

Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("Dog");
    emitter.onNext("Cat");
    emitter.onNext("Lion");
    emitter.onNext("Elephant");
    emitter.onComplete();
});
source.filter(item -> item.length() > 3)
      .subscribe(item -> System.out.println("I have received a: " + item));

This prints

I have received a: Lion
I have received a: Elephant
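
Operators can also be chained. The following sketch, assuming RxJava 2.x, adds a map step after the same filter. The class name OperatorChainDemo and the upper-casing step are only for illustration, and Observable.just (covered in the next section) is used to keep the source short.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; assumes RxJava 2.x on the classpath.
class OperatorChainDemo {

    // Keeps names longer than three letters, then upper-cases them.
    static List<String> run() {
        List<String> received = new ArrayList<>();

        Observable.just("Dog", "Cat", "Lion", "Elephant")
                .filter(name -> name.length() > 3)   // drops "Dog" and "Cat"
                .map(name -> name.toUpperCase())     // transforms what is left
                .subscribe(item -> received.add(item));

        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each operator returns a new Observable, so the order of the calls is the order in which an item passes through them.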

Observable.just

Observable.just emits the items passed to it, in order, and then calls onComplete().

Observable.just("Dog", "Cat", "Lion", "Elephant")
          .subscribe(item -> System.out.println("I have received a: " + item));

Observable.fromIterable

If you have an Iterable like a List or a Set, you can easily create an Observable out of it using fromIterable.

List<String> animals = Arrays.asList("Dog", "Cat", "Lion", "Elephant");
Observable.fromIterable(animals)
    .subscribe(item -> System.out.println("I have received a: " + item));

Observable.range and Observable.rangeLong

You can generate a sequence of Integer (or Long) values by using range (or rangeLong). It takes the first number in the range and the number of elements to emit.

Observable.range(4, 3)
          .subscribe(System.out::println);

prints

4
5
6

Observable.interval

This is a time-based Observable. It emits a sequence of numbers starting at 0L, one after each specified period or time interval. By default, it runs on a computation scheduler thread, so the main thread will not wait for the emissions. To see the output, I have added a sleep delay of 3 seconds. We get two emissions per second since we have configured the emission interval as 500 milliseconds (thus it prints 0 up to 5, though the final value may not appear because its emission coincides with the end of the sleep).

Observable.interval(500, TimeUnit.MILLISECONDS)
          .subscribe(System.out::println);
// Introduce a sleep delay to notice the emissions
Thread.sleep(3000);
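
As an alternative to sleeping, the calling thread can be made to wait for the emissions. This is only a sketch, assuming RxJava 2.x; take and blockingSubscribe are not otherwise covered in this post, and the class name IntervalDemo and the 100 millisecond period are made up.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; assumes RxJava 2.x on the classpath.
class IntervalDemo {

    // Collects the first three ticks of an interval source.
    static List<Long> run() {
        List<Long> ticks = new ArrayList<>();

        Observable.interval(100, TimeUnit.MILLISECONDS)
                .take(3)                               // make the infinite source finite
                .blockingSubscribe(tick -> ticks.add(tick)); // blocks until onComplete()

        return ticks;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because take(3) completes the stream after three items, blockingSubscribe returns and the program can move on without a hard-coded sleep.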

Observable.fromCallable

This allows us to convert the result of a Callable (a computation) into an Observable. This is helpful when the source computation can throw an exception. The exception then propagates to the subscriber’s onError logic (more on this in a minute) rather than just blowing up and terminating.
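
A minimal sketch of this, assuming RxJava 2.x. The class name FromCallableDemo, the describe helper and the division are hypothetical; the division simply stands in for a computation that may throw.

```java
import io.reactivex.Observable;

// Hypothetical demo class; assumes RxJava 2.x on the classpath.
class FromCallableDemo {

    // Runs a computation that throws ArithmeticException when divisor is 0
    // and reports which observer callback received the outcome.
    static String describe(int divisor) {
        StringBuilder result = new StringBuilder();

        Observable.fromCallable(() -> 100 / divisor)
                .subscribe(
                        value -> result.append("value:").append(value),
                        error -> result.append("error:")
                                       .append(error.getClass().getSimpleName()));

        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(4));
        System.out.println(describe(0));
    }
}
```

With Observable.just(100 / divisor), the division would instead run before the Observable even exists, so the exception would be thrown at the call site and never reach onError.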

Observable.defer()

In the ways of creating an Observable we have seen so far, the arguments are evaluated once, when the Observable is created, so the source does not pick up later changes in state.

Example: Say we have an Observable.range that begins at the value of a start variable. If we change start and then subscribe again, the second Observer does not see this change.

private static int start = 4;
public static void main(String[] args) {
    Observable<Integer> rangeObservable = Observable.range(start, 3);
    rangeObservable.subscribe(System.out::println);
    //Change start
    start = 5;
    rangeObservable.subscribe(System.out::println);
}

This emits 4, 5, 6 for both the subscribers.

If we need the source to be created afresh for each Observer, based on the state at subscription time, we can use Observable.defer().

Observable<Integer> rangeObservable = Observable.defer(() -> Observable.range(start, 3));
rangeObservable.subscribe(System.out::println);

//modify start
start = 5;
rangeObservable.subscribe(System.out::println);

This emits 4, 5, 6 for the first subscriber and 5, 6, 7 for the second subscriber.

Observable error(), never() and empty()

Observable.error creates an Observable that immediately signals an error with the specified exception. This ends up at the Observer’s onError().

Observable.never never emits anything, leaving observers waiting forever; it does not even call onComplete().

Observable.empty() represents an empty dataset. It emits nothing and calls onComplete(). This is essentially RxJava’s concept of null.

All these are useful for testing purposes.
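
The difference between the three is easiest to see by recording which callbacks fire. The sketch below assumes RxJava 2.x; the class name SpecialSourcesDemo, the eventsOf helper and the "boom" message are made up for illustration.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; assumes RxJava 2.x on the classpath.
class SpecialSourcesDemo {

    // Subscribes to the given source and records which callbacks fire.
    static List<String> eventsOf(Observable<String> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(
                item -> events.add("next:" + item),
                error -> events.add("error:" + error.getMessage()),
                () -> events.add("complete"));
        return events;
    }

    public static void main(String[] args) {
        // Only onError fires
        System.out.println(eventsOf(Observable.<String>error(new IllegalStateException("boom"))));
        // Only onComplete fires
        System.out.println(eventsOf(Observable.<String>empty()));
        // No callback fires at all
        System.out.println(eventsOf(Observable.<String>never()));
    }
}
```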

Hot and Cold Observable

Whether an Observable is hot or cold defines how it behaves when multiple Observers are present. In other words, it describes the relationship between an Observable and its Observers, which depends on how the Observable is implemented.

A cold Observable is one that replays the emissions to each new observer that subscribes. Thus, new observers are guaranteed to see the whole emission even if they subscribe late. Each subscriber gets its own stream of emissions. The Observable will first play all the emissions to the first Observer and then call onComplete(). Then, it will play all the emissions again to the second Observer and call onComplete(). All of the Observable creation methods that we saw result in a cold Observable.

In contrast, a hot Observable broadcasts the same emissions to all its observers at the same time. If a new observer comes in late, it receives only the emissions from that point on and misses the old ones. Logically, hot Observables often represent events rather than finite datasets. Example: the events from a Twitter stream. Hence, the source is usually infinite.
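
The contrast can be sketched as follows, assuming RxJava 2.x. This post does not cover how to build a hot source, so the example borrows PublishSubject (from io.reactivex.subjects) as one simple way to get one; the class name HotColdDemo and the items are made up.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; assumes RxJava 2.x on the classpath.
class HotColdDemo {

    // Cold: every observer gets the full sequence, one observer after the other.
    static List<String> cold() {
        List<String> log = new ArrayList<>();
        Observable<String> source = Observable.just("A", "B", "C");
        source.subscribe(item -> log.add("first:" + item));
        source.subscribe(item -> log.add("second:" + item));
        return log;
    }

    // Hot: observers only see what is emitted while they are subscribed.
    static List<String> hot() {
        List<String> log = new ArrayList<>();
        PublishSubject<String> source = PublishSubject.create();
        source.subscribe(item -> log.add("first:" + item));
        source.onNext("A");                                  // only the first observer is listening
        source.subscribe(item -> log.add("second:" + item)); // joins late and misses "A"
        source.onNext("B");                                  // broadcast to both observers
        source.onComplete();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(cold());
        System.out.println(hot());
    }
}
```

In the cold case the second observer still receives "A", "B" and "C"; in the hot case it only receives "B".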

Observable subscribe

We will have a look at the various overloaded methods of the subscribe method. We already saw the one that takes a Consumer<T> where we printed the received item from the emission.

There is one that takes a Consumer<T> and a Consumer<? super Throwable>. The latter one is the logic that is run if there is an error encountered anywhere in the chain.

The third overload, in addition to the above parameters, takes an Action instance. Action is a functional interface in RxJava with a single method called run(). This logic is executed when the source observable calls onComplete().

The last overload, in addition to the above, takes a Consumer<? super Disposable> that receives the upstream’s Disposable. As stated earlier, the observer can use this to stop receiving further items when the source observable hasn’t completed yet.

boolean shouldThrow = false;
Observable<String> sourceObservable = Observable.create(emitter -> {
    emitter.onNext("Dog");
    emitter.onNext("Cat");
    emitter.onNext("Lion");
    emitter.onNext("Elephant");
    if (shouldThrow) {
        emitter.onError(new RuntimeException("Throwing an exception"));
    } else {
        emitter.onComplete();
    }
});
sourceObservable.filter(item -> item.length() > 3)
        .subscribe(item -> System.out.println("I have received a: " + item),
                throwable -> System.out.println(throwable.getMessage()),
                () -> System.out.println("DONE"),               
                d -> System.out.println("Got the disposable"));

When shouldThrow is false, this prints

Got the disposable
I have received a: Lion
I have received a: Elephant
DONE

When shouldThrow is true, this prints

Got the disposable
I have received a: Lion
I have received a: Elephant
Throwing an exception

You may have noticed that the parameters of these subscribe overloads mirror the methods of the Observer interface we saw earlier. Thus, we can write the above subscription part as

sourceObservable
    .filter(item -> item.length() > 3)
    .subscribe(new Observer<String>() {
        private Disposable disposable;
        @Override
        public void onSubscribe(Disposable d) {
            this.disposable = d; //Setting it so we can use later
            System.out.println("Got the disposable");
        }

        @Override
        public void onNext(String s) {
            System.out.println("I have received a: " + s);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println(e.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("DONE");
        }
    });

Conclusion

That was a good introduction to RxJava Observables and Observers (a.k.a. subscribers). We looked at the various ways to create Observables and Observers, and at how to subscribe to an Observable, with examples. We also looked at cold and hot Observables. This should give you a good place to start playing with the APIs. To get started, have a look at the Importing RxJava section of my previous post to see how to get the RxJava library into your project.

Happy Coding 🙂

References

  1. ReactiveX Observable
  2. RxJava Observable Javadoc
