Playing with Kotlin Flows

Davide Cerbo
Oct 22, 2019

Nowadays we hear the words "reactive programming" every day, and many projects, such as Spring, are adding reactive support to their libraries.

If you are a Java or Kotlin programmer you have probably heard of frameworks like RxJava, Reactive Streams, and Reactor, but Kotlin has a new alternative, provided by kotlinx.coroutines, its official coroutines library: Flows!

Flows are basically asynchronous streams that rely on coroutines for their execution. When we talk about asynchronous streams, we can have:

  • hot stream: the stream is always active, and a subscriber receives only the data emitted from the moment it subscribes. Think of a movie on television: you watch it from the point where you turn on the TV, not from the beginning.
  • cold stream: the stream always starts from the beginning. Coming back to the movie, a cold stream is like watching it on Netflix (or your favorite streaming platform): it starts from the beginning for every viewer. Generally, we can say that a cold stream is lazy.

Flows are cold streams, very similar to sequences, but they allow suspending and, as I said before, they are asynchronous.
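To make the cold behaviour concrete, here is a minimal sketch, assuming kotlinx-coroutines-core is on the classpath. The names numbers and builderRuns are hypothetical and exist only for this illustration. The counter shows that the builder block runs once per collector, and not at all until someone collects.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical counter: how many times the builder block has been executed.
var builderRuns = 0

// Hypothetical cold flow used only for this illustration.
fun numbers(): Flow<Int> = flow {
    builderRuns++   // executed once per collector
    emit(1)
    emit(2)
}

fun main() = runBlocking<Unit> {
    val cold = numbers()
    println(builderRuns)    // 0: creating the flow runs nothing
    println(cold.toList())  // [1, 2]
    println(cold.toList())  // [1, 2] again, restarted from the beginning
    println(builderRuns)    // 2: one execution per terminal operation
}
```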

For those who don’t know coroutines, they provide an alternative to thread blocking: suspension. This means that the calling thread is not blocked while waiting for a resource; the coroutine is suspended instead, without blocking anything (e.g. our UI), and it resumes when the resource is ready.
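As a rough illustration of suspension, the sketch below starts two coroutines on the single thread used by runBlocking. The function name interleavedEvents and the delay values are assumptions chosen for the example. While coroutine A is suspended in delay, the same thread is free to run coroutine B.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which two coroutines make progress.
fun interleavedEvents(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    coroutineScope {
        // Both coroutines share the runBlocking thread.
        launch { events += "A start"; delay(200); events += "A end" }
        launch { events += "B start"; delay(50); events += "B end" }
    } // coroutineScope returns only after both children have finished
    events
}

fun main() {
    println(interleavedEvents()) // [A start, B start, B end, A end]
}
```

If delay blocked the thread, B could not start until A had finished.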

Flows anatomy

The anatomy of a flow statement is very easy:

flow {
    emit("Hello world")
}.collect(...)

The first line creates the flow (builder), the second line emits something on the flow, and the third line starts the flow and collects its elements (terminal operation). Because flows are lazy, they don’t start until the terminal operation is invoked.
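The three parts and their laziness can be observed with a small runnable sketch. The helper name anatomyEvents and the recorded strings are made up for this example. It logs an event when the flow object is created, when the builder body starts, and when a value is collected.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records when each part of the flow actually runs.
fun anatomyEvents(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val greeting = flow {             // builder: only describes the flow
        events += "builder started"
        emit("Hello world")           // emission
    }
    events += "flow created"          // recorded before the builder body runs
    greeting.collect { value ->       // terminal operation: starts the flow
        events += "collected $value"
    }
    events
}

fun main() {
    println(anatomyEvents())
    // [flow created, builder started, collected Hello world]
}
```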

Flows lifecycle

The Flow API provides some operators that hook into the lifecycle of a flow:

flow {
    (1..5).forEach {
        delay(1000) // delay the emission
        emit(it)
        if (it == 2) throw RuntimeException("Error on $it")
    }
}
    .onEach { println("On each $it") }
    .onStart { println("Starting flow") }
    .onCompletion { println("Flow completed") }
    .catch { ex -> println("Exception message: ${ex.message}") }
    .toList()

In the example above I build a flow that is meant to emit the numbers from 1 to 5, one per second, but throws an exception after emitting 2. The expected output is:

Starting flow
On each 1
On each 2
Flow completed
Exception message: Error on 2

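As you can see, I print something to the console at the beginning and at the end of the flow and for each emission, and I catch the error thrown after the second emission. The flow completes as soon as the exception is thrown, so 3, 4, and 5 are never emitted. onCompletion is declared before catch, which is why its message is printed first.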

toList is a terminal operator that collects all emitted values into a list.

Intermediate flow operators

As for other types of streams, I can transform the flow using the map method:

(1..5)
    .asFlow()
    .map { it * 2 }
    .onEach { println("On each $it") }
    .toList()

The returned list will be 2, 4, 6, 8, 10.

Or flatMapConcat, if I want to flatten a flow of flows:

(1..5)
    .asFlow()
    .flatMapConcat {
        flow {
            emit(it)
            emit(it * it)
        }
    }
    .onEach { println("On each $it") }
    .toList()

The result will be 1, 1, 2, 4, 3, 9, 4, 16, 5, 25.

These operators transform the flow, but there are also operators that limit the number of emitted elements, such as drop, which skips the first elements emitted:

(1..5)
    .asFlow()
    .drop(2) // skips the first two emitted integers
    .toList() // the result will be [3, 4, 5]

Or take, which terminates the flow after a given number of elements has been emitted:

(1..5)
    .asFlow()
    .take(2) // takes the first two emitted integers
    .toList() // the result will be [1, 2]

Error handling

To handle errors in Flows, we can use the catch operator to log a message, as in the lifecycle example above, or we can recover by emitting replacement values: emit sends a single element, and emitAll sends a whole flow:

(1..5).asFlow().onEach {
    if (it == 3) throw RuntimeException("Error on $it")
}.catch {
    emitAll((99..100).asFlow())
}.toList()

The example above will return [1, 2, 99, 100].

To handle errors we can also use the retry or retryWhen operators:

(1..5).asFlow().onEach {
    if (it == 3) throw RuntimeException("Error on $it")
}
    .onEach { println("Emitting $it") }
    .retryWhen { cause, attempt ->
        println("cause $cause, attempt $attempt")
        attempt < 1 // retry only after the first failure
    }.toList()

In the example above in the console we will see:

Emitting 1
Emitting 2
cause java.lang.RuntimeException: Error on 3, attempt 0
Emitting 1
Emitting 2
cause java.lang.RuntimeException: Error on 3, attempt 1

And finally an exception with the message “Error on 3” will be thrown.

So, the flow runs twice (the first attempt plus one retry) before failing. Because the flow is cold, every retry executes it from the beginning.
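The simpler retry operator takes a maximum number of retries. The sketch below is only an illustration: the helper retryThenRecover and its failing-twice flow are invented for the example. The flow fails on its first two runs and succeeds on the third, so retry(2) is just enough to let it finish.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns how many times the flow ran and what was collected.
fun retryThenRecover(): Pair<Int, List<Int>> = runBlocking {
    var runs = 0
    val result = flow {
        runs++                 // the cold flow restarts from here on every retry
        emit(runs)
        if (runs < 3) throw RuntimeException("Failure on run $runs")
        emit(100)              // reached only on the third run
    }
        .retry(retries = 2)    // at most two retries after the first failure
        .toList()
    runs to result
}

fun main() {
    println(retryThenRecover()) // (3, [1, 2, 3, 100])
}
```

Values emitted before each failure are already delivered downstream, which is why 1 and 2 appear in the final list.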

Compared to traditional reactive frameworks, there is a big difference: in a Flow, an unhandled exception is thrown from the terminal operation, just like in ordinary code. In Project Reactor, for instance, the error is delivered as a signal, so your code handles it only if you explicitly define an onError function.
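On the Flow side, this means a plain try/catch around the terminal operation is enough. Here is a minimal sketch, where collectOrMessage is a hypothetical helper name:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the collected list as text, or the error message.
fun collectOrMessage(): String = runBlocking {
    try {
        (1..5).asFlow()
            .onEach { if (it == 3) throw RuntimeException("Error on $it") }
            .toList()          // the exception surfaces here, at the terminal operation
            .toString()
    } catch (ex: RuntimeException) {
        "Caught: ${ex.message}"
    }
}

fun main() {
    println(collectOrMessage()) // Caught: Error on 3
}
```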

Backpressure

Backpressure is one of the best features of reactive programming, and the term is borrowed from fluid dynamics. Wikipedia defines it as:

Resistance or force opposing the desired flow of fluid through pipes

Want to know more? Read this post!
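In a Flow, the simplest form of backpressure comes from suspension itself: emit does not return until the collector has processed the value, so a slow consumer automatically slows down the producer. The sketch below, with the hypothetical helper backpressureEvents, records the order of events to show this.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order of producer and consumer steps.
fun backpressureEvents(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flow {
        for (i in 1..3) {
            events += "emit $i"
            emit(i)            // suspends until the collector is done with i
        }
    }.collect { value ->
        delay(100)             // simulated slow consumer
        events += "collected $value"
    }
    events
}

fun main() {
    println(backpressureEvents())
    // [emit 1, collected 1, emit 2, collected 2, emit 3, collected 3]
}
```

Operators such as buffer and conflate change this coupling between producer and consumer.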

Testing

Testing is a crucial practice in our daily work. To try the examples above I wrote this test class using AssertK, an assertion library for Kotlin inspired by AssertJ.

The use of this library is very simple:

@Test
fun `map value test`() = runBlocking {
    assertThat {
        (1..5)
            .asFlow()
            .map { it * 2 }
            .onEach { println("On each $it") }
            .toList()
    }.isSuccess().containsExactly(2, 4, 6, 8, 10)
}

First of all, I need runBlocking: toList is a suspending function, so it can only be called from a coroutine, and runBlocking makes the test wait until the flow has completed instead of ending before the code has executed. Inside it, I use assertThat to wrap the code I’m testing, and then I verify that the result is what I’m expecting. In the official documentation and in the example code that I published on GitHub, you can find more examples.

Integration with existing libraries

Today there are a lot of libraries that provide a reactive implementation: some people use Reactor, others use RxJava, and not so many use Kotlin Flows. This could be an issue but, fortunately, Kotlin's power comes to our aid! Using extension functions, it is possible to convert a Flow to a Reactor Flux or to an RxJava Flowable, and vice versa:

// Project Reactor
val reactorFlux: Flux<String> = flow { ... }.asFlux()
val fluxToFlow: Flow<String> = reactorFlux.asFlow()
// RxJava
val rxjavaFlowable: Flowable<String> = flow { ... }.asFlowable()
val flowableToFlow: Flow<String> = rxjavaFlowable.asFlow()
// Reactive Streams
val rStreamPublisher: Publisher<String> = flow { ... }.asPublisher()
val publisherToFlow: Flow<String> = rStreamPublisher.asFlow()

This could be very useful in order to reuse existing code and libraries.

Just remember to add the right dependency:

<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-reactor</artifactId>
    <version>${kotlinx-coroutines.version}</version>
</dependency>
<!-- OR -->
<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-rx2</artifactId>
    <version>${kotlinx-coroutines.version}</version>
</dependency>
<!-- OR -->
<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-reactive</artifactId>
    <version>${kotlinx-coroutines.version}</version>
</dependency>
