Android Unidirectional Data Flow — Kotlin Flow vs. RxJava

Recap

UDF Structure

UDF Structure

  1. The view creates events initiated by the user or Android lifecycle.
  2. The ViewModel requests data from the repository layer.
  3. The repository returns data from the network/database.
  4. The ViewModel creates/updates the view state, comprising the UI, and effects, which are one-time events like navigation or error-handling.
  5. The view observes the view states/effect changes.

:muscle|type_1_2: UDF Strengths

  • Linear flow of app logic with events initiating the data returned
  • Control UI & async events
  • Pre-defined test structure
  • Quicker debugging

Additional setups required

Photo by Jeremy Bishop on Unsplash

In addition to implementing the core libraries for Retrofit , Room , and the Paging tools below, additional setup is required for Flow and Rx.

Core functionality

:ocean:️️️ Flow — Add kotlinx.coroutines

  • When creating a new project in Android Studio select Kotlin as the language under the Configure your project step.
  • For creating/updating the view state and effects the kotlinx.coroutines library, starting with release 1.3.6 , provides StateFlow and MutableStateFlow similar to LiveData ’s immutable and mutable variants.

⛓️ Rx — Requires two additional libraries

  • RxKotlin library for standard Rx functionality including RxJava.
  • RxAndroid library for Android-specific functionality like using the main thread with AndroidSchedulers.MainThread .

Retrofit network requests

:ocean: ️️ Flow — Works out of the box

Rx — Add RxJava adapter to Retrofit.

  • There are different RxJava adapter libraries based on the RxJava version number, as outlined in this StackOverflow post .

Room SQL database and Paging library

️:ocean: Flow — Add the LiveData lifecycle .

  • This is used to convert the LiveData PagedList into a Flow using the asFlow extension method.

⛓️ Rx — Requires two additional libraries

See: Room :link: RxJava

Additional setups summary

In addition to the normal Retrofit, Room, and Paging setups, Flow requires three fewer libraries compared to Rx.

build.gradle(:someModule)

:ocean: Flow

⛓️ Rx

Data layer — Retrofit network requests

Photo by Scott Webb on Unsplash

Being able to handle both one-time requests and constant streams of information from the network are key capabilities of reactive programming.

Request return type

The type required for Flow, of a List , is simpler than using an Observable . However, Flow’s functions are slightly more complicated requiring the suspend modifier in order to be run with proper threading outlined in the Process data section below. Returning a list is useful to handle information more easily as we’ll also see in the Check response status section .

FeedService.kt

:ocean: Flow

⛓️ Rx

Backpressure

The sample in CryptoTweets showcases a simple one-time request. However, it is worth mentioning more advanced cases in which Flow and Rx have the ability to handle constant streams of data that may lead to backpressure. Backpressure is when there is an ongoing and incoming stream of data that needs to be processed. For instance, this can be from a web-socket type network request that maintains a constant connection. Managing backpressure requires processing the incoming data without causing issues such as slowing down the UI, crashing the app, or losing important information.

:ocean: Flow

Coroutines handle backpressure by default with the flow / collect pattern by managing data streams synchronously. Flow by default performs sequentially. The code runs one emission from within the source flow, then one collection is run. This process continues until all of the data is processed.

You would not see any code that explicitly handles back-pressure, because it automatically happens behind the scenes due to the support for suspension that is provided by Kotlin compiler. — Roman Elizarov (Kotlin Team Lead), Reactive Streams and Kotlin Flows

Coroutines offer further strategies to handle backpressure and improve the speed of which data is processed.

buffer
conflate
collectLatest

⛓️ Rx

By default, an Observable will not manage backpressure. If there is an existing source emitting data, and then is later observed, the data prior to the observation/subscription will not be processed. Instead, Rx handles backpressure using a Flowable which defines the strategy to handle backpressure.

  • BackpressureStrategy.BUFFER — Allows the source to hold the data until the subscriber can consume it, similar to what coroutines do by default
  • BackpressureStrategy.DROP — Discards events that cannot be consumed in time by the subscriber, similar to coroutines’ conflate .
  • BackpressureStrategy.LATEST — Overwrites previous data with new data if the subscriber is behind

See: RxJava 2 — Flowable , Baeldung

Check response status

After receiving information from a network request, doing a quick check on the status of the information is often required.

:ocean: Flow

Because the threading and lifecycle are defined at the initialization of the flow as shown in the Process data section below, the tweetsResponse may simply be checked for emptiness.

FeedRepository.kt

⛓ ️Rx

This requires subscribing to to the data and defining the threading in order to do a basic check, like whether the data is empty. subscribeOn defines the thread that will launch the process, and observeOn defines on which thread the data will be returned on. Finally, subscribe returns the data. This pattern also creates a layer of nested code.

FeedRepository.kt

Process data

:ocean: Flow

When Flow is initiated, the threading and lifecycle is defined. flowOn defines which thread the flow will be launched in and withContext defines in which thread that data will be returned. The process is started in the Dispatchers.IO thread, as a network request is being sent, and returned in Dispatchers.Main , as the response will populate the view state and UI.

launchIn defines the lifecycle. The lifecycle is automatically managed by the Android ViewModel. The coroutine Flow will be destroyed when the ViewModel is destroyed.

FeedViewModel.kt

⛓️ Rx

The threading is defined with the same methods as in Check response status.

However, the lifecycle of the Observable needs to be managed. By adding the Observable to a CompositeDisposable , all of the created Observables can be cleared by the view activity/fragment in onDestroy by calling the public method in the ViewModel clearAndDisposeDisposables .

FeedViewModel.kt

Data layer — Room database with Paging library

Photo by David Clode on Unsplash

Inserting and querying data from the DAO

:ocean:⛓️ Flow + Rx

Both use a DataSource.Factory in the data access object in order to insert and query data from the Room SQL database.

FeedDao.kt

Creating a PagedList

:ocean: Flow

The FeedDao getAllTweets method is called, and the extension function toLiveData builds LiveData of type PagedList. Then, the extension function asFlow converts this into a Flow of type PagedList. The tweetsQuery data can be collected using collect and emitted using emit to the ViewModel which initiated the repository data request.

Additional threading management could be added using withContent if further processing was required on other threads. Otherwise, the threading has already been defined when this method was initiated in the ViewModel above.

FeedRepository.kt

⛓️ Rx

The process here is similar, but using toObservable extension function to convert the query directly to an observable instead of to LiveData first. Threading is managed twice, first off in the ViewModel when building the view states and effects and initializing the repository request, and secondly here in the repository when processing the data.

FeedRepository.kt

View states — States + effects

Photo by Vino Li on Unsplash

:ocean: Flow

This is where mutable MutableStateFlow and immutable StateFlow are useful. For updating the view state data in the ViewModel a private mutable data class is used which updates the public non-mutable view state that is observed by the view, in this case a fragment. The view effects follow the same pattern.

FeedViewState.kt

FeedViewModel.kt

FeedFragment.kt

⛓️ Rx

Rx Subjects are both an Observer and an Observable, allowing their data to be mutated and observed. ReplaySubject is a great fit for the view state and effects because all values added to the Observable are emitted to all of its subscribers.

Also, in addition to clearing and disposing of the CompositeDisposable in the ViewModel above, all observers in the fragment must be cleared too.

See: Exploring RxJava in Android — Different types of Subjects , Anitaa Murthy

FeedViewState.kt

FeedViewModel.kt

FeedFragment.kt

⚖️ Summary — Advantages of Flow and Rx

Photo by 贝莉儿 DANIST on Unsplash

In cases working with legacy code or teams not yet onboard with Kotlin Flow, it is important to be able to write well structured code with Rx so that all of the developers working on common code adopt a similar strategy. Given the opportunity to create a new app/feature, Kotlin Flow has a short learning curve, built-in integrations with Android components and widely adopted open-source libraries, and expanding it’s use cases rapidly.

:ocean: Flow

  • Quick setup — Requires three fewer libraries compared to Rx.
  • Lifecycle management — Inherently controls the lifecycle of the operation upon launch.
  • Threading management — Can be handled in one place when the flow is launched rather than each time data is observed. Additional customization can be added if needed. This results in cleaner and simpler code.
  • Avoid nesting nightmares — The callback style pattern is harder to work with and creates the need to initialize instance variables within a function to retrieve information from within nested logic, like when creating the PagedList above.
  • Ease-of-use — It is easy to learn new topics via reading the good ol’ documentation . The docs provide interactive examples for almost every topic to test, iterate, break, and understand. Does anyone truly understand the marbles diagrams in Rx?

⛓️ Rx

  • Mass adoption — Most apps built pre-2018 use Rx.
  • Battle tested APIs — Kotlin Flow contains many experimental APIs, indicated by @ExperimentalCoroutinesApi , which is temporary as the libraries mature.
  • Larger selection of operators — In CryptoTweets the data stream is simple, but in more complex cases, due to the maturity of Rx, there might be more advanced operators/transformations not yet available with Kotlin coroutines and Flow. For both CryptoTweets and the beta app I’ve published to the Play Store, Coinverse , I have not run into such cases where coroutines have not been able to do the job.
我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章