
Getting Started With RxJava and RxAndroid

By Karthikeya Koppuravuri

In the current market environment, Reactive is still something of a niche word. It is a concept in which changes to UI elements are driven by changes in the underlying data: the UI binds to, observes, or schedules work against that data and reacts when it changes. We had long wondered what Reactive really is, so we got our hands dirty with Reactive programming on mobile. It turned out to be really interesting, and we found that it can be of great help when used the right way.

This blog explains some basic use cases and integration logic for Reactive programming in mobile app development. Reading on will give you the gist of what reactive programming is and how it can be leveraged in mobile technologies, along with the basics of observables, observers and reactive operators.

Enter RxJava:

RxJava is an adaptation of Reactive programming for Java. Its most straightforward use case is asynchronous calls, where you subscribe to an observable that eventually emits data and then completes. A typical example is an observable returned by a Retrofit web call. It behaves like a callback on the web call, except that we control which thread the call executes on and can combine it with a wide variety of powerful operators to integrate or transform the data. This is one of the best ways to use RxJava, but it is not the only one. Suppose we have a stock market application whose feed has to be updated whenever a stock price changes. Here the data stream from the server never ends, so we can subscribe to it just once and keep receiving updates until the observer decides to unsubscribe.
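To make the stock feed idea concrete, here is a minimal plain-Java sketch of "subscribe once, keep receiving, then unsubscribe". It deliberately uses only the JDK and is not RxJava's API: the PriceFeed class and its subscribe() and publish() methods are hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical never-ending price feed; an illustration only, not part of RxJava. */
class PriceFeed {
    private final List<Consumer<Double>> observers = new CopyOnWriteArrayList<>();

    /** Registers an observer and returns a handle that unsubscribes it. */
    Runnable subscribe(Consumer<Double> observer) {
        observers.add(observer);
        return () -> observers.remove(observer);
    }

    /** Pushes a new price to every observer that is still subscribed. */
    void publish(double price) {
        for (Consumer<Double> observer : observers) {
            observer.accept(price);
        }
    }
}

class PriceFeedDemo {
    /** Subscribes once, receives two updates, unsubscribes, and misses the third. */
    static List<Double> run() {
        PriceFeed feed = new PriceFeed();
        List<Double> received = new ArrayList<>();
        Runnable unsubscribe = feed.subscribe(received::add);
        feed.publish(101.5);
        feed.publish(102.0);
        unsubscribe.run();
        feed.publish(99.0); // not delivered: the observer has unsubscribed
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [101.5, 102.0]
    }
}
```

In RxJava the same roles are played by an Observable (the feed), an Observer (the consumer) and a Disposable (the unsubscribe handle).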

RxAndroid:

RxAndroid is an extension of RxJava with a few added classes specific to Android. Specifically, RxAndroid introduces schedulers that play a major role in supporting multi-threaded operations on Android. Schedulers decide whether a block of code runs on a worker thread or on the main thread. Beyond this, it is more or less the same as RxJava.

Even though there are many schedulers, the ones used most often are Schedulers.io() and AndroidSchedulers.mainThread(). Below is the list of available schedulers and their basic usage.

  • Schedulers.io(): Used for I/O-bound, non-CPU-intensive operations such as network calls and database operations. It maintains a pool of threads.
  • AndroidSchedulers.mainThread(): Runs on the main (UI) thread. Be careful not to run long operations here, as they may cause an ANR (Application Not Responding) error.
  • Schedulers.newThread(): Creates a new thread every time it is used. It is good practice to avoid it unless you have a very long-running operation, because the threads it creates are not reused.
  • Schedulers.computation(): Used for CPU-intensive operations such as bitmap processing or processing large amounts of data. The number of threads depends on the number of cores the CPU has.
  • Schedulers.from(): Creates a scheduler from an executor that you supply, which lets you limit the number of threads. When every thread in the pool is busy, tasks wait in a queue.
  • Schedulers.trampoline(): Executes tasks in First In, First Out order. Scheduled tasks are queued and run one by one on the current thread.
  • Schedulers.immediate(): Executes the task immediately and synchronously, blocking the calling thread. This scheduler belongs to RxJava 1.x; RxJava 2 removed it in favour of Schedulers.trampoline().
  • Schedulers.single(): Executes all tasks sequentially, in the order they are added, on a single thread. Use it when sequential execution is required.
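The thread-pool behaviour described in the list above can be observed with plain JDK executors, which is the kind of object Schedulers.from() accepts. The sketch below is an analogy under that assumption, not RxJava code: ExecutorSketch and its method names are made up for this example.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** JDK-only analogy for bounded and single-threaded schedulers; names are hypothetical. */
class ExecutorSketch {
    /** Runs `tasks` jobs on a pool of `poolSize` threads and counts the distinct threads used. */
    static int distinctThreads(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            // Extra tasks wait in the executor's queue while all threads are busy
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        awaitQuietly(pool);
        return names.size();
    }

    /** Runs jobs on one worker thread and returns the order in which they ran. */
    static List<Integer> sequentialOrder(int tasks) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        List<Integer> order = new CopyOnWriteArrayList<>();
        for (int i = 1; i <= tasks; i++) {
            final int id = i;
            single.execute(() -> order.add(id));
        }
        awaitQuietly(single);
        return order;
    }

    /** Stops accepting work and waits for the queued tasks to finish. */
    private static void awaitQuietly(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("threads used: " + distinctThreads(2, 8));
        System.out.println("order: " + sequentialOrder(4));
    }
}
```

However many tasks are submitted, the fixed pool never uses more than poolSize threads, and the single-thread executor runs tasks in submission order, which mirrors the descriptions of Schedulers.from() and Schedulers.single().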

Now that we know the basics of Reactive and RxJava, we will dive into other basics.

RxJava is all about two key components: Observers and Observables. In addition to those, there are components such as schedulers, operators and subscriptions.

Observable: A data stream that emits data.
Observer: The counterpart of an observable; it receives the data the observable emits.
Subscription: The bond between an observable and an observer. One observable can have multiple observers.
Operator: Modifies the data emitted by an observable before the observer receives it.
Scheduler: Decides the thread on which the observable emits data and the thread on which the observer receives it.

Let's get into basic coding on how to implement this.

  1. We need an observable that can emit data, so let's create one.

    Java Code

    Observable<String> mobileObservable = Observable.just("Samsung", "Apple", "Oppo", "Motorola", "Xiomi");
  2. Now let's create an observer that listens to the above observable. The observer has the following interface methods for tracking the observable's status.
    • onSubscribe(): Called when an observer subscribes to the observable.
    • onNext(): Called each time the observable emits an item.
    • onError(): Called when an error occurs while observing.
    • onComplete(): Called when the observable has finished emitting all items.

    Example for Observer pattern:

    Java Code

    Observer<String> mobileObserver = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(String s) {
            Log.d(TAG, "Name: " + s);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "onError: " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "All items are emitted!");
        }
    };
  3. Now that we have an observer and an observable, we should make the observer subscribe to the observable. Two more methods come into play here: observeOn() and subscribeOn().
    • observeOn(AndroidSchedulers.mainThread()): Tells the observer to receive the data on the Android UI thread so that you can take any UI-related actions.
    • subscribeOn(Schedulers.io()): Tells the observable to run its task on a background thread.

    Java Code

    mobileObservable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(mobileObserver);


    When we run this program, we see the following output:

    onSubscribe
    Name: Samsung
    Name: Apple
    Name: Oppo
    Name: Motorola
    Name: Xiomi
    All items are emitted!
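The thread hand-off performed by subscribeOn() and observeOn() can be imitated with the JDK's CompletableFuture, which helps show which part of the chain runs where. This is an analogy rather than RxJava itself: the "io-worker" and "fake-main" thread names are assumptions for this sketch, and a real Android app would use AndroidSchedulers.mainThread() for the second hop.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** JDK-only analogy for subscribeOn/observeOn; class and thread names are hypothetical. */
class ThreadHopSketch {
    /** Returns {thread that produced the value, thread that consumed it}. */
    static String[] run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));
        String[] threads = new String[2];
        try {
            CompletableFuture
                    .supplyAsync(() -> {
                        // Producing side: comparable to the work moved by subscribeOn()
                        threads[0] = Thread.currentThread().getName();
                        return "Samsung";
                    }, io)
                    .thenAcceptAsync(name -> {
                        // Consuming side: comparable to the callbacks moved by observeOn()
                        threads[1] = Thread.currentThread().getName();
                    }, ui)
                    .join(); // wait until the value has been consumed
        } finally {
            io.shutdown();
            ui.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [io-worker, fake-main]
    }
}
```

The value is produced on one thread and delivered on another, which is the same division of labour the subscribe call above sets up between Schedulers.io() and the UI thread.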

Introducing Disposable

Once we have subscribed to an observable, there may come a point where the observer no longer wants to listen to its events. A Disposable is the handle used to unsubscribe, that is, to stop listening to events. On Android, disposables are useful for avoiding memory leaks. Suppose you have multiple observers and observables and need to dispose of all of them when the activity reaches onDestroy. Handling every Disposable object individually is a real pain for developers. Here comes our savior: CompositeDisposable. You can call compositeDisposable.clear() in the activity's onDestroy to dispose of all of them at once.

Below is sample code that adds observers to a CompositeDisposable:

Java Code

// Holds every subscription so that they can all be disposed of together
// (typically a field of the activity)
CompositeDisposable compositeDisposable = new CompositeDisposable();

compositeDisposable.add(
        mobileObservable
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .filter(new Predicate<String>() {
                    @Override
                    public boolean test(String s) throws Exception {
                        return s.toLowerCase().startsWith("s");
                    }
                })
                .subscribeWith(new DisposableObserver<String>() {
                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "Name: " + s);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "All items are emitted!");
                    }
                }));
compositeDisposable.add(
        mobileObservable
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .filter(new Predicate<String>() {
                    @Override
                    public boolean test(String s) throws Exception {
                        return s.toLowerCase().startsWith("s");
                    }
                })
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        return s.toUpperCase();
                    }
                })
                .subscribeWith(new DisposableObserver<String>() {
                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "Name: " + s);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "All items are emitted!");
                    }
                }));
    

In Android, these disposables can be easily disposed of in onDestroy, as shown below:

Java Code

@Override
protected void onDestroy() {
    super.onDestroy();

    // don't send events once the activity is destroyed
    compositeDisposable.clear();
}
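To see why one clear() call is enough, it can help to picture the composite as a bag of cancel actions. The following is a simplified plain-Java sketch of that idea only; CancelBag is a hypothetical class written for this post and is not how RxJava implements CompositeDisposable.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the idea behind CompositeDisposable; not RxJava's implementation. */
class CancelBag {
    private final List<Runnable> cancelActions = new ArrayList<>();

    /** Remembers one cancel action, for example "stop this subscription". */
    synchronized void add(Runnable cancelAction) {
        cancelActions.add(cancelAction);
    }

    /** Runs every cancel action once and empties the bag so that it can be reused. */
    synchronized void clear() {
        for (Runnable cancelAction : cancelActions) {
            cancelAction.run();
        }
        cancelActions.clear();
    }

    synchronized int size() {
        return cancelActions.size();
    }
}

class CancelBagDemo {
    /** Adds two subscriptions, clears twice, and reports how many cancellations ran. */
    static int run() {
        CancelBag bag = new CancelBag();
        int[] cancelled = {0};
        bag.add(() -> cancelled[0]++);
        bag.add(() -> cancelled[0]++);
        bag.clear();
        bag.clear(); // second call finds nothing left to cancel
        return cancelled[0];
    }

    public static void main(String[] args) {
        System.out.println("cancelled: " + run()); // prints cancelled: 2
    }
}
```

In RxJava itself, CompositeDisposable.clear() disposes of the contained disposables and leaves the container reusable, whereas dispose() also marks the container as disposed, so anything added afterwards is disposed of immediately.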
         

We used filter and map in the above code; the next section explains them.

Introduction to Operators
Operators transform the data emitted by observables before observers receive it. They mainly help turn the received data into a form that can be used directly on the UI. Sorting and filtering are two of the operations commonly performed. In the above code snippet, we used the filter() operator, which filters the data by applying a condition. Data that meets the condition is emitted, and the rest is ignored. In the above example, only the mobile names that start with the letter s pass the filter.

Java Code

.filter(new Predicate<String>() {
        @Override
        public boolean test(String s) throws Exception {
            return s.toLowerCase().startsWith("s");
        }
})

and the resulting output would be:

onSubscribe
Name: Samsung
All items are emitted!

Types of Operators

Operators are categorized by the type of work they perform. Some operators, such as create, just, fromArray and range, are used to create observables, while others, such as debounce, filter, skip and last, are used to filter the data emitted by an observable. There are many other categories that handle mathematical operations, error handling and other utility operations. Detailed information on these operators is available in a separate article here.

Chaining of Operators

Sometimes the desired data stream can't be achieved using a single operator. In that case, you can use multiple operators together. When operators are chained, each one works on the result of the previous operator.

Let's take an example of emitting numbers from 1 to 20. In this case, we want to keep only the even numbers and append a string to the end of each one.

Java Code

Observable.range(1, 20)
               .subscribeOn(Schedulers.io())
               .observeOn(AndroidSchedulers.mainThread())
               .filter(new Predicate<Integer>() {
                   @Override
                   public boolean test(Integer integer) throws Exception {
                       return integer % 2 == 0;
                   }
               })
               .map(new Function<Integer, String>() {
                   @Override
                   public String apply(Integer integer) throws Exception {
                       return integer + " is even number";
                   }
               })
               .subscribe(new Observer<String>() {
                   @Override
                   public void onSubscribe(Disposable d) {   
                   }

                   @Override
                   public void onNext(String s) {
                       Log.d(TAG, "onNext: " + s);
                   }

                   @Override
                   public void onError(Throwable e) {
                   }

                   @Override
                   public void onComplete() {
                       Log.d(TAG, "All numbers emitted!");
                   }
               });
           

OUTPUT

onNext: 2 is even number
onNext: 4 is even number
onNext: 6 is even number
onNext: 8 is even number
onNext: 10 is even number
onNext: 12 is even number
onNext: 14 is even number
onNext: 16 is even number
onNext: 18 is even number
onNext: 20 is even number
All numbers emitted!

Here the range operator generates the numbers from 1 to 20, filter() applies a condition to each number, and map() transforms each emission from an integer to a string by appending text to it.
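If you want to check the expected result of such a chain without an Android device, the same three steps can be written as a java.util.stream pipeline. Keep in mind that this is only an analogy: Java streams are pull-based and synchronous, unlike RxJava observables, and EvenNumbersSketch is a name chosen for this sketch.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** JDK-only analogy for the range -> filter -> map chain; not RxJava code. */
class EvenNumbersSketch {
    /** Produces 1..20, keeps the even numbers, and turns each one into a labelled string. */
    static List<String> evenLabels() {
        return IntStream.rangeClosed(1, 20)             // 1..20, like Observable.range(1, 20)
                .filter(n -> n % 2 == 0)                // keep even numbers only
                .mapToObj(n -> n + " is even number")   // int -> String
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        evenLabels().forEach(System.out::println);
    }
}
```

Each step receives only what the previous step let through, so the mapping function never sees an odd number.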

Types of Observables & Observers

Observables differ from one another in the way they produce data and in the number of emissions each one makes. We must choose the best-suited observable for the use case.

Observable     Observer               Number of emissions

Observable     Observer               Multiple or none
Single         SingleObserver         One
Maybe          MaybeObserver          One or none
Flowable       Subscriber             Multiple or none
Completable    CompletableObserver    None


1. Observable and Observer

Observable is probably the most used type. It can emit one or more items.

// emitting a single object
Observable<Object>

// emitting a list of objects at once; in this case, Single is usually the better option
Observable<List<Object>>

2. Single and SingleObserver

Single emits one value or throws an error. A use case may be a network call to get a response from the server.

3. Maybe and MaybeObserver

A Maybe may or may not emit a value. It can be used when the item you are expecting is optional.

4. Completable and CompletableObserver

A Completable won't emit any data; instead, it reports the status of the task, either success or failure. It can be used when you want to perform some task and do not expect any value back. A use case would be updating some data on the server by making a PUT request.

5. Flowable and Subscriber

Flowable should be used when an observable generates more events or data than the observer can handle. As per the official documentation, Flowable can be used when the source generates 10k+ events and the subscriber can't consume them all.
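The idea behind Flowable is backpressure: the consumer tells the producer how many items it is ready for. The JDK (version 9 and later) ships the java.util.concurrent.Flow interfaces, which follow the same Reactive Streams model that Flowable is built on, so the mechanism can be sketched without RxJava. The class names below are assumptions for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** JDK-only backpressure sketch using java.util.concurrent.Flow; not RxJava's Flowable. */
class BackpressureSketch {
    /** Subscriber that asks for exactly one item at a time. */
    static final class OneAtATime implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // signal demand for the first item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ask for the next item only when ready
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 1..count and returns what the slow-paced subscriber received. */
    static List<Integer> run(int count) {
        OneAtATime subscriber = new OneAtATime();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // blocks if the subscriber's buffer is full
            }
        } // close() signals onComplete once the submitted items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

Because the subscriber requests one item per call, the publisher can never push faster than the subscriber consumes, which is the guarantee a plain Observable does not give.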

Networking Through RxJava

No matter how many examples are given for managing, observing, filtering or operating on data, in a real application most of the data comes from a web service call or a REST API. Operators can be applied smoothly to the response received. We will combine Retrofit with RxJava, which makes a great combination.

Please note that the Retrofit dependencies must be added to the app-level Gradle file. We will not cover the setup of the Retrofit builder and the API service class here.

A Single is used for the fetchData endpoint, as a single response is emitted.

Java Code

// Fetch all notes
    @GET("data/all")
    Single<List<Note>> fetchData();

Once the API service instance is created, we can make the network call using RxJava:

Java Code

apiService.fetchData()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeWith(new DisposableSingleObserver<List<Note>>() {
        @Override
        public void onSuccess(List<Note> notes) {
            // Received the data.
        }
        @Override
        public void onError(Throwable e) {
            // Network error
        }
    });

A Completable can be used for operations like update or delete, as no response body needs to be sent back. It still reports the status, which can be captured in the onComplete callback method.

Java Code

@FormUrlEncoded
   @PUT("data/{id}")
   Completable updateData(@Path("id") int noteId, @Field("data") String data);
 

Java Code

compositeDisposable.add(
    apiService.updateData(datumId, data)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new DisposableCompletableObserver() {
            @Override
            public void onComplete() {
                Log.d(TAG, "Note updated!");

                Note n = notesList.get(position);
                n.setNote(data);

                // Update item and notify adapter
                notesList.set(position, n);
                mAdapter.notifyItemChanged(position);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
                showError(e);
            }
        }));

Please note that we have used onComplete instead of onSuccess, as the call does not return any response. Depending on the type of call, your result arrives in either onSuccess or onComplete. Use the appropriate callback to complete the UI update.
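The difference between a call that returns a value and a call that only reports completion can be tried out without a server. The sketch below uses the JDK's CompletableFuture as a rough stand-in: CompletableFuture<List<String>> plays the role of Single and CompletableFuture<Void> plays the role of Completable. FakeApiSketch, its methods and the sample data are all hypothetical and involve no Retrofit or RxJava code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** JDK-only stand-in for a Single-style and a Completable-style call; names are hypothetical. */
class FakeApiSketch {
    /** Stands in for a call that returns one value (compare Single). */
    static CompletableFuture<List<String>> fetchData() {
        return CompletableFuture.supplyAsync(() -> List.of("note-1", "note-2"));
    }

    /** Stands in for a call that only reports success or failure (compare Completable). */
    static CompletableFuture<Void> updateData(int id, String data) {
        return CompletableFuture.runAsync(() -> {
            if (data.isEmpty()) {
                throw new IllegalArgumentException("data must not be empty");
            }
        });
    }

    /** Waits for the call and names the callback that would have fired. */
    static String describe(CompletableFuture<?> call) {
        return call.handle((value, error) -> {
            if (error != null) {
                // Async failures may arrive wrapped, so prefer the underlying cause
                Throwable cause = error.getCause() != null ? error.getCause() : error;
                return "onError: " + cause.getMessage();
            }
            return value != null ? "onSuccess: " + value : "onComplete";
        }).join();
    }

    public static void main(String[] args) {
        System.out.println(describe(fetchData()));
        System.out.println(describe(updateData(1, "new text")));
        System.out.println(describe(updateData(1, "")));
    }
}
```

A value-returning call ends in the success branch with data, a completion-only call ends with no data at all, and both share the same error path, which matches the callback pairs used in the Retrofit examples above.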

Conclusion

These are some of the basic terms and methods for using RxJava within your project. Although the classes to use depend on each case, the overall concept is based on subscribing to, observing and operating on a stream of data. Please refer to the official RxAndroid repo for more details and updates.

Innominds with over two decades of experience in Enterprise Mobile Application Development can help its clients optimize budget and get the best deal for the investment. Innominds offers comprehensive packaged Mobile Development Solutions for your organization’s digital workforce transformation.

Innominds’ expertise in all stages of development from design to release to an app store helps its clients build an engaging enterprise grade mobile application. Innominds Mobility Center of Excellence (CoE) includes a strong team with deep experience across multiple technologies like Native, Hybrid, Cross-Platform & Cross-Platform Native Frameworks, along with reusable components and processes in place to strategize, design and develop your mobile application.

Interested? For demos or project discussions, please write to us at marketing@innominds.com to learn more about our offerings.

Topics: Mobility

Karthikeya Koppuravuri - Senior Engineer - Software Engineering
