A detailed guide to refactoring code from RxJava to coroutines. This post assumes you are familiar with RxJava and MVVM architecture and have a basic knowledge of coroutines.
Let's say you have a project that uses RxJava extensively. I mean, why not! It's a great way to handle asynchronicity and write event-based logic. There are numerous operators to choose from, threading becomes simpler and more abstracted, and handling errors at various layers of the application does not take much effort. There are hot and cold observables, delays, intervals, overflow strategies and much more, all of which help us solve complex use cases. Two years ago, I for sure would not have started developing an enterprise-level Android app without introducing RxJava!
My goal here is not to convince you to switch to coroutines. I am assuming you are already looking forward to using them in your project and are considering refactoring the relevant parts of your RxJava codebase to coroutines, which became stable in Kotlin 1.3.
Coroutines are a concurrency design pattern that makes it easier to write asynchronous and non-blocking code.
They are lightweight and performant.
They can communicate with each other.
They support structured concurrency.
They support cancellation.
They integrate seamlessly with other members of the Jetpack family.
They are supported by major libraries.
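A few of these properties can be illustrated with a small sketch. It assumes the kotlinx.coroutines library is on the classpath; the function names `runManyCoroutines` and `cancelLongRunningWork` are hypothetical and exist only for this illustration.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Lightweight + structured concurrency: coroutineScope only returns
// after every child coroutine launched inside it has finished.
suspend fun runManyCoroutines(count: Int): Int {
    val finished = AtomicInteger(0)
    coroutineScope {
        repeat(count) {
            launch {
                delay(10) // suspends the coroutine without blocking a thread
                finished.incrementAndGet()
            }
        }
    }
    return finished.get()
}

// Cancellation: the child is cancelled while suspended in delay(),
// so the line after delay() never runs.
suspend fun cancelLongRunningWork(): Boolean = coroutineScope {
    var completed = false
    val job = launch {
        delay(60_000)
        completed = true
    }
    job.cancelAndJoin()
    job.isCancelled && !completed
}

fun main(): Unit = runBlocking {
    println("Finished coroutines: ${runManyCoroutines(10_000)}")
    println("Cancelled before completion: ${cancelLongRunningWork()}")
}
```

Launching thousands of coroutines this way is cheap because a suspended coroutine does not hold on to a thread while it waits.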
We will walk through refactoring an RxJava call into a coroutine-based one. You will also see how much less verbose and more expressive the code becomes. I will first show you the RxJava classes and then their coroutine equivalents.
The project uses the following architecture
Our business logic includes:
Fetch data from API.
Cache the latest data to file system.
Return the latest data.
If there is no network, return cached data - if available.
If no cached data is available - show error.
Code walkthrough:
Let us take a look at a snippet from the viewModel, with RxJava and LiveData, where we request the data from the useCase class, observe it, and then notify the UI with an appropriate state to render:
    override fun getListOfCountries() {
        compositeDisposable.add(
            countryListUseCase.subscribeForData()
                .doOnSubscribe { countryListViewStates.postValue(ViewStates.CountryListStates.ShowLoading) }
                .subscribeOn(schedulerProvider.io())
                .subscribe({ countryListDataWrapper ->
                    when (countryListDataWrapper) {
                        is UseCaseWrapper.Success -> {
                            // create a state when data is available to display
                            val currentState = ViewStates.CountryListStates.ShowContent(
                                isLoading = false,
                                hasError = false,
                                errorMessage = "",
                                showList = true,
                                countryList = countryListDataWrapper.data.list
                            )
                            // post the current state to liveData
                            countryListViewStates.postValue(currentState)
                        }
                        is UseCaseWrapper.Failed -> {
                            // create an error state when no data is available to display
                            val currentState = ViewStates.CountryListStates.ShowContent(
                                isLoading = false,
                                hasError = true,
                                errorMessage = NO_NETWORK,
                                showList = false,
                                countryList = ArrayList()
                            )
                            // post the current state to liveData
                            countryListViewStates.postValue(currentState)
                        }
                    }
                }, {
                    // create a state for handling generic errors
                    val currentState = ViewStates.CountryListStates.ShowContent(
                        isLoading = false,
                        hasError = true,
                        errorMessage = SOMETHING_WENT_WRONG,
                        showList = false,
                        countryList = ArrayList()
                    )
                    // post the current state to liveData
                    countryListViewStates.postValue(currentState)
                })
        )
    }
Now let us look at the snippet for the same logic which uses coroutines:
    override fun getListOfCountries() {
        viewModelScope.launch {
            when (val useCaseData = countryListUseCase.requestForData()) {
                is UseCaseWrapper.Success -> {
                    // retrieve data received for success
                    val listOfCountries = useCaseData.data.list
                    val currentViewState = CountryListStates.ShowContent(
                        isLoading = false,
                        hasError = false,
                        errorMessage = "",
                        showList = true,
                        countryList = listOfCountries
                    )
                    // setting this view state as the current state of the stateFlow
                    countryListViewStates.value = currentViewState
                }
                is UseCaseWrapper.Failed -> {
                    // creating a state for failed events
                    val currentViewState = CountryListStates.ShowContent(
                        isLoading = false,
                        hasError = true,
                        errorMessage = useCaseData.reason.value,
                        showList = false,
                        countryList = ArrayList()
                    )
                    // setting this view state as the current state of the stateFlow
                    countryListViewStates.value = currentViewState
                }
            }
        }
    }
There is almost no change to the part where we observe the data in the viewModel, except:
The onError block of RxJava, which is now taken care of in the useCase class.
The way suspend functions are called, compared to observing an Rx observable.
And there is one other change, which we discuss at the end.
Now let us look at the useCase class which did the heavy lifting and kept the viewModel code more readable and relatively cleaner.
Below is the getter method in the RxJava version of the useCase class, where we write our logic to fetch data and emit it via a PublishSubject:
    override fun subscribeForData(): Observable<UseCaseWrapper<DataWrapper>> {
        compositeDisposable.add(
            countryApiRepo.getData()
                // don't block the main thread
                .subscribeOn(schedulerProvider.computation())
                // store API response to local storage
                .flatMap { return@flatMap syncDataToLocal(it) }
                // convert network layer models to domain models for UI consumption
                .compose(transformCountryObjects())
                // put data inside a wrapper for viewModel usage
                .map { return@map mapToDataWrapper(Source.NETWORK, it) }
                // get data from local storage in case api call throws error
                .onErrorResumeNext(getDataFromLocal())
                .subscribe({ wrappedData ->
                    // There is no internet and cached data is unavailable
                    if (wrappedData.source == Source.LOCAL && wrappedData.list.isEmpty()) {
                        dataRepo.onNext(UseCaseWrapper.Failed(ReasonToFail.NO_NETWORK_AVAILABLE))
                    } else {
                        // Data is available, either from cached or from network
                        dataRepo.onNext(UseCaseWrapper.Success(wrappedData))
                    }
                }, {
                    // Handle generic exceptions
                    dataRepo.onNext(UseCaseWrapper.Failed(ReasonToFail.SOMETHING_WENT_WRONG))
                })
        )
        return dataRepo
    }
The useCase class encapsulates the business logic of the application. In the above RxJava implementation we made the subscribeForData() function reactive. If you look at the syntax, it's pretty neat and understandable, to be honest, if you are familiar with RxJava.
But if you take a minute and think about it - do we really need reactivity to solve the use case? Caching an API response and giving it back when there is no network is all we are trying to do. In my humble opinion, we are making the code reactive just because we are using RxJava. Hence the tool that we are using is defining our paradigm.
Let us take a look at the coroutines version of it:
    override suspend fun requestForData(): UseCaseWrapper<DataWrapper> {
        return try {
            // get data from either network or local
            val (dataResponse, dataSource) = getDataFromAvailableSource()
            if (dataSource == Source.NETWORK) {
                // sync to file storage if response is from network
                syncDataToLocal(dataResponse)
            }
            // convert network layer model to domain layer model for UI consumption
            val mappedApiResponse = mapRawDataToModelClass(dataResponse)
            val dataWrapper = DataWrapper(mappedApiResponse, dataSource)
            // return data successfully on completion
            UseCaseWrapper.Success(dataWrapper)
        } catch (thrownExceptions: UseCaseException) {
            /*
            handle exception caused by any of the local
            functions and relay the same to UI layer
            */
            UseCaseWrapper.Failed(thrownExceptions.reason)
        } catch (genericExceptions: Exception) {
            // generic exception handling
            UseCaseWrapper.Failed(ReasonToFail.SOMETHING_WENT_WRONG)
        }
    }
Looking at the coroutines useCase class - you can see some significant changes, as compared to the rxJava counterpart shared above.
In the coroutine implementation we have made separate suspend functions for each individual logical block of code. Each of these functions runs its body inside a withContext() block, which helps us run these tasks in a serial fashion, off the main thread. This simplifies threading and does not force reactiveness. For example, this function below is responsible for writing the API response to file storage, and likewise there are others:
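As a minimal sketch of what such helper functions might look like, assume the raw API response is a plain String and the cache is a single file. The names `CountryDataSource`, `cacheFile` and `fetchFromNetwork` are hypothetical, and the simplified `Source`, `ReasonToFail` and `UseCaseException` types only mirror the names used in the snippet above; the project's actual functions may differ.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.io.File
import java.io.IOException

// Simplified stand-ins for the project's types (assumed shapes).
enum class Source { NETWORK, LOCAL }
enum class ReasonToFail { NO_NETWORK_AVAILABLE, SOMETHING_WENT_WRONG }
class UseCaseException(val reason: ReasonToFail) : Exception(reason.name)

class CountryDataSource(
    private val cacheFile: File,                     // hypothetical cache location
    private val fetchFromNetwork: suspend () -> String // hypothetical stand-in for the API call
) {
    // Writes the raw API response to the cache file on the IO dispatcher.
    suspend fun syncDataToLocal(rawResponse: String) = withContext(Dispatchers.IO) {
        cacheFile.writeText(rawResponse)
    }

    // Tries the network first; on an I/O failure falls back to the cached copy,
    // and fails with NO_NETWORK_AVAILABLE when the cache is empty or missing.
    suspend fun getDataFromAvailableSource(): Pair<String, Source> =
        withContext(Dispatchers.IO) {
            try {
                fetchFromNetwork() to Source.NETWORK
            } catch (e: IOException) {
                val cached = if (cacheFile.exists()) cacheFile.readText() else ""
                if (cached.isEmpty()) {
                    throw UseCaseException(ReasonToFail.NO_NETWORK_AVAILABLE)
                }
                cached to Source.LOCAL
            }
        }
}

fun main(): Unit = runBlocking {
    val cache = File.createTempFile("countries", ".json")

    // Online: fetch from the (fake) network and cache the response.
    val online = CountryDataSource(cache) { "[\"India\", \"Japan\"]" }
    val (response, source) = online.getDataFromAvailableSource()
    if (source == Source.NETWORK) online.syncDataToLocal(response)

    // Offline: the network call fails, so the cached copy is returned.
    val offline = CountryDataSource(cache) { throw IOException("no network") }
    println(offline.getDataFromAvailableSource())

    cache.delete()
}
```

Because each helper switches dispatchers internally with withContext(), the caller can invoke them one after another from any coroutine, including one running on the main thread.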
Now let us look at one other significant part (which we said we would discuss later) - how we notify the UI of changes. We used LiveData to communicate between viewModel and view in the RxJava code, where we observe the LiveData from the view (Fragment in this case):
In the coroutine version, we have used StateFlow (a hot flow) in the viewModel to maintain the state of the UI, and it is collected by the view.
This is how I have initialized the stateFlow in viewModel:
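A minimal sketch of such an initialization, assuming kotlinx.coroutines 1.4 or later. `CountryListStateHolder` is a hypothetical stand-in for the viewModel, the country list is simplified to a list of Strings, and the private backing property is a common convention rather than necessarily what the project does.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow

// Simplified view states mirroring the names used in the snippets above.
sealed class CountryListStates {
    object ShowLoading : CountryListStates()
    data class ShowContent(
        val isLoading: Boolean,
        val hasError: Boolean,
        val errorMessage: String,
        val showList: Boolean,
        val countryList: List<String>
    ) : CountryListStates()
}

// Hypothetical stand-in for the viewModel, so the sketch runs without Android.
class CountryListStateHolder {
    // A StateFlow must be created with an initial value.
    private val _countryListViewStates =
        MutableStateFlow<CountryListStates>(CountryListStates.ShowLoading)

    // Read-only view exposed to the UI layer.
    val countryListViewStates: StateFlow<CountryListStates> =
        _countryListViewStates.asStateFlow()

    fun showCountries(countries: List<String>) {
        _countryListViewStates.value = CountryListStates.ShowContent(
            isLoading = false,
            hasError = false,
            errorMessage = "",
            showList = true,
            countryList = countries
        )
    }
}

fun main() {
    val holder = CountryListStateHolder()
    println(holder.countryListViewStates.value) // the initial loading state
    holder.showCountries(listOf("India", "Japan"))
    println(holder.countryListViewStates.value) // the content state
}
```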
Now here is the thing between StateFlow and LiveData - StateFlow has some advantages over LiveData:
You don't need to worry what thread you are on to update the value of StateFlow.
You always have a defined initial state.
The value you observe is guaranteed to be non-null, as long as the StateFlow is declared with a non-nullable type, because you cannot set a null value on it.
But it comes at an additional cost:
StateFlow is not lifecycle aware like liveData.
In order to make StateFlow lifecycle aware, we are using launchWhenStarted of LifecycleCoroutineScope. This ensures UI events won't be observed when the view is no longer active.
However, we need to cancel the coroutine started by launchWhenStarted explicitly in onStop, because otherwise the older coroutines will keep receiving updates when the UI becomes active again.
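The idea of keeping a handle on the collecting coroutine and cancelling it in onStop can be sketched without Android classes. This is only an illustration under assumptions: `FakeView` is a hypothetical stand-in for the Fragment, its onStart() and onStop() are called manually instead of by the framework, and a plain launch on a supplied scope stands in for launchWhenStarted. It assumes kotlinx.coroutines 1.6 or later.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical stand-in for a Fragment that renders states from a StateFlow.
class FakeView(
    private val scope: CoroutineScope,
    private val states: StateFlow<String>
) {
    val rendered = mutableListOf<String>()
    private var collectJob: Job? = null

    fun onStart() {
        // Keep the Job so the collection can be cancelled later.
        collectJob = scope.launch {
            states.collect { state -> rendered.add(state) }
        }
    }

    fun onStop() {
        // Without this, the old collector would keep receiving updates.
        collectJob?.cancel()
        collectJob = null
    }
}

fun main(): Unit = runBlocking {
    val states = MutableStateFlow("loading")
    val view = FakeView(this, states)

    view.onStart()
    yield()                   // let the collector receive the initial state
    states.value = "content"
    yield()                   // let the collector receive the update

    view.onStop()
    states.value = "updated while stopped"
    yield()                   // nothing is collecting any more

    println(view.rendered)
}
```

Newer versions of the AndroidX lifecycle library also provide repeatOnLifecycle, which cancels the collection when the lifecycle drops below the given state and restarts it when the state is reached again, so the Job does not have to be tracked by hand.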
So it's completely up to you whether you want lifecycle-aware LiveData or StateFlow for the UI layer, depending upon how you want to handle the logic.
RxJava had quite a learning curve. Once you get accustomed to it, it's a life saver, no doubt. But for the parts where we just need to move some work off the main thread and there is no real need for reactivity, we can easily switch to coroutines and get the job done effectively.
Here is the link to the project on GitHub. The master branch has the RxJava code and the branch branch_coroutine has the code migrated to coroutines.
An Android app that fetches a list of countries from an open API and displays it in a recycler view
The application uses MVVM architecture, and below is the structure
We fetch a list of countries from an open API and cache it in the file system
When there is no network we return the cached data
If there is no data available in the cache we show the user an error.
The app uses RxJava and Retrofit, along with Dagger Android, as its tech stack.