Back-pressure in Kotlin Flows

With back-pressure, your tasks become polite and they wait for their turn.

Do you want more information about Kotlin Flow? Read this first!

This is one of the best features of reactive programming and it is borrowed from fluid dynamics. Wikipedia defines it as:

Resistance or force opposing the desired flow of fluid through pipes

So, in our domain, it means to control the data flow to be sure that the load doesn’t kill our application. Traditionally reactive programming libraries have some dedicate feature but with Flows the back-pressure management is achieved using Kotlin suspending functions. All things in Flows are marked with suspend modifier, so if there is a high load, the execution will be suspended without blocking any thread, until the application is ready to accept more elements.

Let’s stop thinking about this way too serious definition and let’s start thinking about a funnier definition! Let’s imagine that you have an oven and you would like to bake your muffin. If you have only 4 spots in your oven, you can bake only 4 muffins at once. If you try to bake all muffins together, you need more electricity and, if you are lucky, you will burn your sweet, or if you are unlucky, you can break your oven. Any way you think that you are going faster, but it isn’t true and it is very risky. Now think that your oven is a software installed in a server with 4 CPUs, electricity is memory usage, and think what you are doing to your poor application!

Not so clear? Let’s go with a little example! For instance, look at the simple flow below that emits one element every second:

fun currTime() = System.currentTimeMillis()
fun threadName() = Thread.currentThread().namevar start: Long = 0

fun emitter(): Flow<Int> =
(1..5)
.asFlow()
.onStart { start = currTime() }
.onEach {
delay(1000)
print("Emit $it (${currTime() - start}ms) ")
}

and collects values in 3 seconds using the code below:

@InternalCoroutinesApi
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
emitter()
.collect {
print("\nCollect $it starts (${currTime() - start}ms) ")
delay(3000)
println("Collect $it ends (${currTime() - start}ms) ")
}
}
print("\nCollected in $time ms")
}

The example needs around 20 seconds (3s * 5 + 1s * 5) to end, especially because the element collecting is very slow:

The example above is a clear simulation of a situation where the collecting is slower than emission and so we need a back-pressure mechanism, and as we can see, this is inside the Flow nature.

The first idea that could come in our mind can be to move the fast emit operation in another thread, so the slow collect phase can take its time:

@InternalCoroutinesApi
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
emitter()
.flowOn(Dispatchers.Default)
.collect {
...

We can do better? Of course, for instance, we can decide to buffer the collecting, so we don’t rely on a new thread instance, with all downsides about memory and context switching:

@InternalCoroutinesApi
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
emitter()
.buffer()
.collect {
print("\nCollect $it starts (${currTime() - start}ms) ")
delay(3000)
println("Collect $it ends (${currTime() - start}ms) ")
}
}
print("\nCollected in $time ms")
}

So the new output will be:

As you can see above, when the collector is busy, the program starts the buffering and the collecting starts again after two emissions when the first collect method invocation ends. But, more importantly, the flow needs only ~16 seconds to be executed, like the solution that relies on a new thread instance.

In some scenarios, we can decide to lose data to be even faster. In this case, we can conflate our emission by missing some elements.

@InternalCoroutinesApi
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
emitter()
.conflate()
.collect {
print("\nCollect $it starts (${currTime() - start}ms) ")
delay(3000)
println("Collect $it ends (${currTime() - start}ms) ")
}
}
print("\nCollected in $time ms")
}

As you can see below, only the last element emitted before that the collector became free it was collected. So, we miss emissions 2 and 4, but the flow needs only ~10 seconds to be executed.

More on Kotlin Coroutine

--

--

--

Open-space invader, bug hunter and software writer

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

THE LAST DAY OF THE 7 DAYS TOP RACING EVENT AT “DRAGON 8”

Industry Use Case for Kubernetes — Booking.com

What’s left before the Hatch!

Setting short Periodic and Absolute timeouts in AWS & Serverless

Dropshipping Without Oberlo: 15 Awesome Oberlo Alternatives

ELA Punks 1.0

This is GREAT NEWS #WSGARMY!🎊

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store

Davide Cerbo

Open-space invader, bug hunter and software writer

More from Medium

Kotlin Multiplatform library with Kotest and Gradle Version Catalog

Kotlin generic type inference doesn’t support super constructor call’s argument type.

Filter your dependencies artifacts by repository

Gradle Task Ordering