The Great Migration: From Rxjava to Coroutines

mukherjeeavik

Avik Mukherjee

Posted on April 18, 2021


A detailed guide to refactoring code from rxJava to coroutines.
This post assumes you are familiar with rxJava and MVVM architecture, and have a basic knowledge of coroutines.

Let's say you have a project that uses rxJava extensively. I mean, why not! It's a great way to handle asynchronicity and write event-based logic. There are numerous operators to choose from, threading becomes simpler and more abstracted, and handling errors at the various layers of the application does not need a lot of effort. There are hot and cold observables, delays, intervals, overflow strategies and so much more, all of which help us solve complex use cases. I for sure would not have started development of an enterprise-level Android app without introducing rxJava two years ago!

My goal here is not to convince you to switch to coroutines. I am assuming you are already looking forward to using them in your project and are considering refactoring the relevant parts of your rxJava codebase to coroutines, which became stable in Kotlin 1.3.

Coroutines are a concurrency design pattern that makes it easier to write asynchronous and non-blocking code.

  • They are lightweight and performant.
  • They can communicate with each other.
  • They support structured concurrency.
  • They support cancellation.
  • They integrate seamlessly with other members of the Jetpack family.
  • They are supported by major libraries.

We will walk through refactoring an rxJava call to a coroutine one. You will also see how much less verbose and more expressive our code becomes. I will first show you the rxJava classes and then their coroutine equivalents.

The project uses the following architecture:
[Figure: project architecture]

Our business logic includes:

  • Fetch data from API.
  • Cache the latest data to file system.
  • Return the latest data.
  • If there is no network, return cached data - if available.
  • If no cached data is available - show error.

Code walkthrough:

Let us take a look at a snippet from the viewModel, with rxJava and liveData, where we request the data from the useCase class, observe the result, and then notify the UI with an appropriate state to render:



    override fun getListOfCountries() {
        compositeDisposable.add(
            countryListUseCase.subscribeForData()
                .doOnSubscribe {
                    countryListViewStates.postValue(ViewStates.CountryListStates.ShowLoading)
                }
                .subscribeOn(schedulerProvider.io())
                .subscribe({ countryListDataWrapper ->
                    when (countryListDataWrapper) {
                        is UseCaseWrapper.Success -> {
                            // create a state when data is available to display
                            val currentState = ViewStates.CountryListStates.ShowContent(
                                isLoading = false,
                                hasError = false,
                                errorMessage = "",
                                showList = true,
                                countryList = countryListDataWrapper.data.list
                            )
                            //post the current state to liveData
                            countryListViewStates.postValue(currentState)
                        }
                        is UseCaseWrapper.Failed -> {
                            //create an error state when no data is available to display
                            val currentState = ViewStates.CountryListStates.ShowContent(
                                isLoading = false,
                                hasError = true,
                                errorMessage = NO_NETWORK,
                                showList = false,
                                countryList = ArrayList()
                            )
                            //post the current state to liveData
                            countryListViewStates.postValue(currentState)
                        }
                    }
                }, {
                    //create a state for handling generic errors
                    val currentState = ViewStates.CountryListStates.ShowContent(
                        isLoading = false,
                        hasError = true,
                        errorMessage = SOMETHING_WENT_WRONG,
                        showList = false,
                        countryList = ArrayList()
                    )
                    //post the current state to liveData
                    countryListViewStates.postValue(currentState)
                })
        )
    }





Now let us look at the snippet for the same logic which uses coroutines:



    override fun getListOfCountries() {
        viewModelScope.launch {
            when (val useCaseData = countryListUseCase.requestForData()) {
                is UseCaseWrapper.Success -> {
                    //retrieve data received for success
                    val listOfCountries = useCaseData.data.list
                    val currentViewState = CountryListStates.ShowContent(
                        isLoading = false,
                        hasError = false,
                        errorMessage = "",
                        showList = true,
                        countryList = listOfCountries
                    )
                    //setting this view state as the current state of the stateFlow
                    countryListViewStates.value = currentViewState
                }
                is UseCaseWrapper.Failed -> {
                    //creating  a state for failed events
                    val currentViewState = CountryListStates.ShowContent(
                        isLoading = false,
                        hasError = true,
                        errorMessage = useCaseData.reason.value,
                        showList = false,
                        countryList = ArrayList()
                    )
                    //setting this view state as the current state of the stateFlow
                    countryListViewStates.value = currentViewState
                }
            }
        }
    }



There is almost no change to the part where we observe the data in the viewModel, except

  • onError block of rxJava, which is now taken care of, in the useCase class.
  • How differently suspend functions are called, compared to observing an rx observable.
  • And there is one other change - which we discuss at the end.
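
Both viewModel snippets branch on a UseCaseWrapper, whose definition is not shown in this post. Below is a hypothetical reconstruction of that wrapper and the types around it, inferred only from how the snippets use them (data.list, reason.value, thrownExceptions.reason). The real definitions in the project may differ, and the country list is simplified to a list of strings here:

```kotlin
// Hypothetical reconstruction; the real definitions live in the linked project.
enum class Source { NETWORK, LOCAL }

enum class ReasonToFail(val value: String) {
    NO_NETWORK_AVAILABLE("No network available"),
    SOMETHING_WENT_WRONG("Something went wrong")
}

// Countries are plain strings here to keep the sketch short
data class DataWrapper(val list: List<String>, val source: Source)

sealed class UseCaseWrapper<out T> {
    data class Success<T>(val data: T) : UseCaseWrapper<T>()
    data class Failed(val reason: ReasonToFail) : UseCaseWrapper<Nothing>()
}

// Exception that carries a failure reason from a lower layer up to the useCase
class UseCaseException(val reason: ReasonToFail) : Exception(reason.value)

// The sealed class makes this `when` exhaustive, so no else branch is needed
fun describe(result: UseCaseWrapper<DataWrapper>): String = when (result) {
    is UseCaseWrapper.Success -> "${result.data.list.size} countries from ${result.data.source}"
    is UseCaseWrapper.Failed -> result.reason.value
}

fun main() {
    val success = UseCaseWrapper.Success(DataWrapper(listOf("India", "Norway"), Source.NETWORK))
    println(describe(success))                                                  // 2 countries from NETWORK
    println(describe(UseCaseWrapper.Failed(ReasonToFail.NO_NETWORK_AVAILABLE))) // No network available
}
```

Because every outcome is a value of the same sealed type, the viewModel can handle success and failure in a single when expression, without a separate error callback.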

Now let us look at the useCase class which did the heavy lifting and kept the viewModel code more readable and relatively cleaner.

Below is the getter method in the rxJava version of the useCase class, where we write our logic to fetch data and emit it via a publishSubject:



    override fun subscribeForData(): Observable<UseCaseWrapper<DataWrapper>> {
        compositeDisposable.add(
            countryApiRepo.getData()
                // don't block the main thread    
                .subscribeOn(schedulerProvider.computation())
                // store API response to local storage
                .flatMap { return@flatMap syncDataToLocal(it) }
                // convert network layer models to domain models for UI consumption    
                .compose(transformCountryObjects())
                // put data inside a wrapper for viewModel usage
                .map { return@map mapToDataWrapper(Source.NETWORK, it) }
                // get data from local storage in case api call throws error    
                .onErrorResumeNext(getDataFromLocal())
                .subscribe({ wrappedData ->
                    // There is no internet and cached data is unavailable
                    if (wrappedData.source == Source.LOCAL && wrappedData.list.isEmpty()) {
                        dataRepo.onNext(UseCaseWrapper.Failed(ReasonToFail.NO_NETWORK_AVAILABLE))
                    } else {
                        // Data is available, either from cached or from network
                        dataRepo.onNext(UseCaseWrapper.Success(wrappedData))
                    }
                }, {
                    // Handle generic exceptions
                    dataRepo.onNext(UseCaseWrapper.Failed(ReasonToFail.SOMETHING_WENT_WRONG))
                })
        )
        return dataRepo
    }



The useCase class encapsulates the business logic of the application. In the above rxJava implementation we made the subscribeForData() function reactive. If you look at the syntax, it's pretty neat and understandable, to be honest - if you are familiar with rxJava.

But if you take a minute and think about it - do we really need reactivity to solve the use case? Caching an API response and giving it back when there is no network, is what we are trying to do. If you ask me, in my humble opinion, we are making the code reactive just because of the fact that we are using rxJava. Hence the tool that we are using is defining our paradigm.

Let us take a look at the coroutines version of it:



    override suspend fun requestForData(): UseCaseWrapper<DataWrapper> {
        return try {
            //get data from either network or local
            val (dataResponse, dataSource) = getDataFromAvailableSource()
            if (dataSource == Source.NETWORK) {
                //sync to file storage if response is from network
                syncDataToLocal(dataResponse)
            }
            //convert network layer model to domain layer model for UI consumption
            val mappedApiResponse = mapRawDataToModelClass(dataResponse)
            val dataWrapper = DataWrapper(mappedApiResponse, dataSource)
            //return data successfully on completion
            UseCaseWrapper.Success(dataWrapper)
        } catch (thrownExceptions: UseCaseException) {
            /*
            handle exception caused by any of the local
            functions and relay the same to UI layer
             */
            UseCaseWrapper.Failed(thrownExceptions.reason)
        } catch (cancellation: CancellationException) {
            // never swallow cancellation; rethrow so the coroutine can stop cooperatively
            throw cancellation
        } catch (genericExceptions: Exception) {
            // generic exception handling
            UseCaseWrapper.Failed(ReasonToFail.SOMETHING_WENT_WRONG)
        }
    }



Looking at the coroutines useCase class, you can see some significant changes compared to the rxJava counterpart shared above.
In the coroutine implementation we have made a separate suspend function for each logical block of code. Each of these functions runs its body in a withContext() block, which moves the work off the main thread and suspends the caller until the block completes, so the tasks run one after another. This simplifies threading and does not force reactiveness. For example, the function below is responsible for writing the API response to file storage, and the others follow the same pattern:



    private suspend fun syncDataToLocal(networkResponse: CountryListResponse) {
        // Dispatchers.IO is the dispatcher intended for blocking file and network work
        withContext(Dispatchers.IO) {
            val content = gson.toJson(networkResponse)
            fileRepository.saveDataToLocal(content)
        }
    }
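
The requestForData() snippet also calls getDataFromAvailableSource(), which is not shown in this post. Here is a minimal sketch of how such a function could look, assuming kotlinx-coroutines-core is on the classpath. FakeApi, FakeFileStore and NoDataException are hypothetical stand-ins for the project's repositories and its UseCaseException, and the country list is simplified to strings:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.io.IOException

enum class Source { NETWORK, LOCAL }

// Hypothetical stand-in for the API repository
class FakeApi(var online: Boolean) {
    suspend fun getData(): List<String> = withContext(Dispatchers.IO) {
        if (!online) throw IOException("no network")
        listOf("India", "Norway")
    }
}

// Hypothetical stand-in for the file repository; keeps the cache in memory
class FakeFileStore {
    private var cached: List<String>? = null

    suspend fun save(data: List<String>) {
        withContext(Dispatchers.IO) { cached = data }
    }

    suspend fun read(): List<String>? = withContext(Dispatchers.IO) { cached }
}

class NoDataException : Exception("no network and no cached data")

// Try the network first; fall back to the cache only when the call fails with an I/O error.
// Catching IOException (not Exception) lets coroutine cancellation propagate untouched.
suspend fun getDataFromAvailableSource(
    api: FakeApi,
    store: FakeFileStore
): Pair<List<String>, Source> =
    try {
        api.getData() to Source.NETWORK
    } catch (e: IOException) {
        val cached = store.read() ?: throw NoDataException()
        cached to Source.LOCAL
    }

fun main() = runBlocking {
    val api = FakeApi(online = true)
    val store = FakeFileStore()

    val (fresh, freshSource) = getDataFromAvailableSource(api, store)
    if (freshSource == Source.NETWORK) store.save(fresh)
    println("$freshSource: $fresh")   // NETWORK: [India, Norway]

    api.online = false
    val (cached, cachedSource) = getDataFromAvailableSource(api, store)
    println("$cachedSource: $cached") // LOCAL: [India, Norway]
}
```

Returning a Pair keeps the call site in requestForData() compact through destructuring, although a small data class would read more clearly if more fields were ever needed.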



Now let us look at one other significant part (which we said we would discuss later) - how we notify the changes to the UI. We used liveData to communicate between viewModel and view in the rxJava code, where we observe the liveData from the view (Fragment in this case):



    private fun observeViewSates() {
        sharedViewModel.observeViewStates().observe(viewLifecycleOwner, Observer { viewStates ->
            when (viewStates) {
                is ViewStates.CountryListStates.ShowLoading -> {
                    //show loader
                }
                is ViewStates.CountryListStates.ShowContent -> {
                    //render content state
                }
            }
        })
    }



Here is the counterpart using coroutines:



    private fun observeViewSates() {
        uiStateJob = lifecycleScope.launchWhenStarted {
            sharedViewModel.observeViewStates().collect { viewStates ->
                when (viewStates) {
                    is CountryListStates.ShowLoading -> {
                        //show loader
                    }
                    is CountryListStates.ShowContent -> {
                        //render content state
                    }
                }
            }
        }
    }



Here we have used StateFlow (a hot observable) in the viewModel to maintain the state of the UI and it is being collected by the View.
This is how I have initialized the stateFlow in viewModel:



    private val countryListViewStates = MutableStateFlow<CountryListStates>(
        CountryListStates.ShowLoading
    )



Now here is the thing about stateFlow and liveData - StateFlow has some advantages over liveData:

  • You don't need to worry what thread you are on to update the value of StateFlow.
  • You always have a defined initial state.
  • The value is guaranteed to be non-null where you observe it, as long as the flow's type is non-nullable, because you then cannot set a null value.
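
The points above can be seen in a small plain-Kotlin sketch, assuming kotlinx-coroutines-core is on the classpath. CountryStateHolder is a hypothetical stand-in for the viewModel, and the view states are simplified versions of the ones used in this post:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Simplified, hypothetical version of the view states
sealed class CountryListStates {
    object ShowLoading : CountryListStates()
    data class ShowContent(val countryList: List<String>) : CountryListStates()
}

class CountryStateHolder {
    // An initial value is mandatory, so observers always see a defined, non-null state
    private val countryListViewStates =
        MutableStateFlow<CountryListStates>(CountryListStates.ShowLoading)

    // Expose a read-only view so only this class can change the state
    fun observeViewStates(): StateFlow<CountryListStates> = countryListViewStates.asStateFlow()

    suspend fun load() {
        withContext(Dispatchers.Default) {
            // Assigning .value is safe from any thread; there is no setValue/postValue split
            countryListViewStates.value = CountryListStates.ShowContent(listOf("India", "Norway"))
        }
    }
}

fun main() = runBlocking {
    val holder = CountryStateHolder()
    val states = holder.observeViewStates()
    println(states.value is CountryListStates.ShowLoading) // true
    holder.load()
    println(states.value)                                   // ShowContent(countryList=[India, Norway])
}
```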

But it comes at an additional cost:

  • StateFlow is not lifecycle aware like liveData.
  • In order to make stateFlow lifecycle aware we are using launchWhenStarted of LifecycleCoroutineScope. This ensures UI events won't be processed while the view is not active.
  • However, launchWhenStarted only suspends the collecting coroutine while the view is stopped; it does not cancel it. We therefore need to cancel the job it returns (uiStateJob in the snippet above) explicitly in onStop, because otherwise the older coroutines resume and keep receiving updates alongside the new one when the UI becomes active again.
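
The cancel-in-onStop pattern can be simulated without Android. The sketch below assumes kotlinx-coroutines-core 1.6 or newer. FakeView is a hypothetical stand-in for a Fragment, and its onStart/onStop are called by hand rather than by a real lifecycle:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Plain-Kotlin stand-in for a Fragment that collects UI states
class FakeView(
    private val scope: CoroutineScope,
    private val states: MutableStateFlow<String>
) {
    val rendered = mutableListOf<String>()
    private var uiStateJob: Job? = null

    fun onStart() {
        // Start a fresh collector every time the view becomes active
        uiStateJob = scope.launch { states.collect { rendered += it } }
    }

    fun onStop() {
        // Cancel the old collector so it cannot receive updates later
        uiStateJob?.cancel()
        uiStateJob = null
    }
}

fun main() = runBlocking {
    val states = MutableStateFlow("loading")
    val view = FakeView(this, states)

    view.onStart()
    yield()                          // give the collector a chance to run
    states.value = "content"
    yield()

    view.onStop()
    states.value = "updated while stopped"
    yield()                          // nothing is collecting at this point

    view.onStart()                   // the new collector starts from the latest state
    yield()
    view.onStop()

    println(view.rendered)
}
```

On Android, newer versions of the lifecycle library provide repeatOnLifecycle, which does this cancel-and-relaunch for you, and launchWhenStarted has since been deprecated in its favour.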

You can read more in the official Android docs: https://developer.android.com/kotlin/flow/stateflow-and-sharedflow

So it's completely up to you whether you want lifecycle-aware liveData or StateFlow for the UI layer, depending on how you want to handle the logic.

rxJava has a steep learning curve. Once you get accustomed to it, it's a life saver, no doubt. But for the parts where we just need to move some work off the main thread and there is no real need for reactivity, we can easily switch to coroutines and get the job done effectively.

Here is the link to the project on GitHub. The master branch has the rxJava code, and the branch_coroutine branch has the migrated coroutine code.

CountryListing

An Android app that fetches a list of countries from an open API and displays it in a recycler view

The application uses MVVM architecture; its structure is shown below.

[Figure: coroutines architecture]

We fetch a list of countries from an open API and cache it in the file system. When there is no network, we return the cached data. If no data is available in the cache, we show the user an error. The app uses RxJava and Retrofit, along with Dagger Android, as its tech stack.




Cover Photo by Mariko margetson on Unsplash
