Davide Cerbo

Dec 1, 2019

4 min read

Back-pressure in Kotlin Flows

With back-pressure, your tasks become polite and they wait for their turn.

Resistance or force opposing the desired flow of fluid through pipes

So, in our domain, it means to control the data flow to be sure that the load doesn’t kill our application. Traditionally reactive programming libraries have some dedicate feature but with Flows the back-pressure management is achieved using Kotlin suspending functions. All things in Flows are marked with suspend modifier, so if there is a high load, the execution will be suspended without blocking any thread, until the application is ready to accept more elements.

fun currTime() = System.currentTimeMillis()
fun threadName() = Thread.currentThread().namevar start: Long = 0

fun emitter(): Flow<Int> =
(1..5)
.asFlow()
.onStart { start = currTime() }
.onEach {
delay(1000)
print("Emit $it (${currTime() - start}ms) ")
}
@InternalCoroutinesApi
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
emitter()
.collect {
print("\nCollect $it starts (${currTime() - start}ms) ")
delay(3000)
println("Collect $it ends (${currTime() - start}ms) ")
}
}
print("\nCollected in $time ms")
}
@InternalCoroutinesApi
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
emitter()
.flowOn(Dispatchers.Default)
.collect {
...
@InternalCoroutinesApi
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
emitter()
.buffer()
.collect {
print("\nCollect $it starts (${currTime() - start}ms) ")
delay(3000)
println("Collect $it ends (${currTime() - start}ms) ")
}
}
print("\nCollected in $time ms")
}
@InternalCoroutinesApi
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
emitter()
.conflate()
.collect {
print("\nCollect $it starts (${currTime() - start}ms) ")
delay(3000)
println("Collect $it ends (${currTime() - start}ms) ")
}
}
print("\nCollected in $time ms")
}

More on Kotlin Coroutine