Kotlin Asynchronous Flow

[Fuente: https://kotlinlang.org/docs/flow.html]

A suspending function asynchronously returns a single value, but how can we return multiple asynchronously computed values? This is where Kotlin Flows come in.

Representing multiple values

Multiple values can be represented in Kotlin using collections. For example, we can have a simple function that returns a List of three numbers and then print them all using forEach:

fun simple(): List<Int> = listOf(1, 2, 3)

fun main() {
    simple().forEach { value -> println(value) }
}

Open in Playground →

Target: JVMRunning on v.1.9.22

You can get the full code from here.

This code outputs:

1
2
3

Sequences

If we are computing the numbers with some CPU-consuming blocking code (each computation taking 100ms), then we can represent the numbers using a Sequence:

fun simple(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    simple().forEach { value -> println(value) } 
}

Open in Playground →

Target: JVMRunning on v.1.9.22

You can get the full code from here.

This code outputs the same numbers, but it waits 100ms before printing each one.

Suspending functions

However, this computation blocks the main thread that is running the code. When these values are computed by asynchronous code we can mark the simple function with a suspend modifier, so that it can perform its work without blocking and return the result as a list:

suspend fun simple(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) } 
}

Open in Playground →

Target: JVMRunning on v.1.9.22

You can get the full code from here.

This code prints the numbers after waiting for a second.

Flows

Using the List<Int> result type, means we can only return all the values at once. To represent the stream of values that are being computed asynchronously, we can use a Flow<Int> type just like we would use a Sequence<Int> type for synchronously computed values:

              
fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    simple().collect { value -> println(value) } 
}

Open in Playground →

Target: JVMRunning on v.1.9.22

You can get the full code from here.

This code waits 100ms before printing each number without blocking the main thread. This is verified by printing “I’m not blocked” every 100ms from a separate coroutine that is running in the main thread:

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

Notice the following differences in the code with the Flow from the earlier examples:

  • A builder function of Flow type is called flow.
  • Code inside a flow { ... } builder block can suspend.
  • The simple function is no longer marked with a suspend modifier.
  • Values are emitted from the flow using an emit function.
  • Values are collected from the flow using a collect function.

We can replace delay with Thread.sleep in the body of simple‘s flow { ... } and see that the main thread is blocked in this case.

Flows are cold

Flows are cold streams similar to sequences — the code inside a flow builder does not run until the flow is collected. This becomes clear in the following example:

fun simple(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}

Open in Playground →

Target: JVMRunning on v.1.9.22

You can get the full code from here.

Which prints:

Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

This is a key reason the simple function (which returns a flow) is not marked with suspend modifier. The simple() call itself returns quickly and does not wait for anything. The flow starts afresh every time it is collected and that is why we see “Flow started” every time we call collect again.

Flow cancellation basics

Flows adhere to the general cooperative cancellation of coroutines. As usual, flow collection can be cancelled when the flow is suspended in a cancellable suspending function (like delay). The following example shows how the flow gets cancelled on a timeout when running in a withTimeoutOrNull block and stops executing its code:

fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms 
        simple().collect { value -> println(value) } 
    }
    println("Done")
}

Open in Playground →

Target: JVMRunning on v.1.9.22

Notice how only two numbers get emitted by the flow in the simple function, producing the following output:

Emitting 1
1
Emitting 2
2
Done

See Flow cancellation checks section for more details.

Flow builders

The flow { ... } builder from the previous examples is the most basic one. There are other builders that allow flows to be declared:

  • The flowOf builder defines a flow that emits a fixed set of values.
  • Various collections and sequences can be converted to flows using the .asFlow() extension function.

For example, the snippet that prints the numbers 1 to 3 from a flow can be rewritten as follows:

// Convert an integer range to a flow
(1..3).asFlow().collect { value -> println(value) }

Open in Playground →

Target: JVMRunning on v.1.9.22

You can get the full code from here.