Flows — Cold Streams in Kotlin

Flow, part of the kotlinx.coroutines library, is a coroutine-friendly way to handle streams of asynchronous data. It is now central to modern Android development, Kotlin backend services, and reactive-style data processing, especially when combined with StateFlow, SharedFlow, and Jetpack libraries.

But what exactly is a Flow? How does it differ from LiveData or RxJava Observables? What makes it “cold”? And how do you build, transform, and consume flows safely?

This article will guide you through:

  1. What Flow is
  2. Cold vs hot streams
  3. Flow builders (flow, flowOf, asFlow)
  4. Flow operators (map, filter, the flatMap family, etc.)
  5. Exceptions in Flow and how to handle them
  6. Flow’s built-in backpressure solution
  7. A real example: processing real-time data streams

Let’s start with the basics.


What Is Flow?

A Flow is a type that represents a stream of asynchronously produced values.
Think of it as:

A sequence that emits values over time.

Where a List<Int> gives all values at once, a Flow<Int> gives them one by one, asynchronously.

Key characteristics of Flow

  • Cold: nothing happens until someone collects it
  • Asynchronous: built on coroutines, so waiting for a value suspends instead of blocking a thread
  • Suspending: both the producer and the collector can suspend between emissions
  • Declarative: supports many operators (map, filter, etc.)

Flow is defined as:

interface Flow<out T> {
    suspend fun collect(collector: FlowCollector<T>)
}

Simple example

val flow = flow {
    emit(1)
    emit(2)
    emit(3)
}

Collecting it:

flow.collect { value ->
    println(value)
}

Output:

1
2
3

This is the simplest example of a cold flow.
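
Because collect is a suspending function, the snippet above has to run inside a coroutine. A minimal runnable sketch, assuming kotlinx-coroutines-core is on the classpath (the function name simpleFlow is only for illustration):

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Illustrative name: builds the same three-value flow as above.
fun simpleFlow(): Flow<Int> = flow {
    emit(1)
    emit(2)
    emit(3)
}

fun main() = runBlocking {
    // collect suspends, so it is called from the runBlocking coroutine.
    simpleFlow().collect { value -> println(value) }

    // toList() is another terminal operator; it gathers every emission.
    println(simpleFlow().toList()) // prints [1, 2, 3]
}
```

runBlocking is used here only to bridge from main into coroutine code; in an application you would normally collect from an existing scope.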


Cold vs Hot Streams

To understand Flow properly, you must understand the difference between cold and hot streams.


Cold Flow

Cold flows behave like:

  • a function call
  • a generator
  • a fresh stream for every collector

Characteristics:

  • Each collector starts from the beginning
  • Work is executed once per collector
  • Flows do not run until a terminal operator such as collect() is called

Analogy:

A cold water tap that only starts running when you turn it on.

Example: cold Flow

val flow = flow {
    println("Start producing")
    emit(1)
    emit(2)
}

flow.collect { println("Collector A: $it") }
flow.collect { println("Collector B: $it") }

Output:

Start producing
Collector A: 1
Collector A: 2
Start producing
Collector B: 1
Collector B: 2

The side effect (Start producing) happens twice — once per collector.


Hot Stream

Hot streams emit values regardless of collectors.
Examples:

  • BroadcastChannel (now deprecated in favor of SharedFlow)
  • SharedFlow
  • StateFlow
  • RxJava Subjects
  • LiveData

Example analogy:

A radio transmitter broadcasting continuously—listeners either tune in or miss data.
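
To make the "tune in or miss data" point concrete, here is a small sketch using MutableSharedFlow with its default settings (no replay). The function name lateSubscriberSees is made up for this example; the value emitted before anyone subscribes is simply lost.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Emits 1 before anyone listens, then 2 and 3 after a collector has subscribed.
suspend fun lateSubscriberSees(): List<Int> = coroutineScope {
    val broadcasts = MutableSharedFlow<Int>() // replay = 0: no history is kept

    broadcasts.emit(1) // nobody is subscribed yet, so this value is lost

    // The collector stops by itself after two values.
    val received = async { broadcasts.take(2).toList() }

    // Wait until the collector has actually subscribed.
    broadcasts.subscriptionCount.first { it > 0 }

    broadcasts.emit(2)
    broadcasts.emit(3)
    received.await()
}

fun main() = runBlocking {
    println(lateSubscriberSees()) // prints [2, 3]
}
```

Compare this with the cold example earlier, where every collector saw every value from the start.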


Flow = cold stream by default

This makes Flow predictable and safe.
You always know:

  • when it starts
  • who controls it
  • when it stops

Hot flows exist too (SharedFlow, StateFlow), but classical Flow is cold.


Flow Builders: flow, flowOf, asFlow

These builders help you turn values or collections into Flows.


1. flow {} — the most flexible builder

Allows suspending calls, loops, and custom asynchronous logic.

val numbers = flow {
    for (i in 1..3) {
        delay(500)
        emit(i)
    }
}

Explanation

  • delay() works because the flow { } block is a suspending lambda
  • emit() sends a value downstream
  • This flow emits 1, 2, 3 with half-second delays

2. flowOf() — simple static flow

Creates a flow of predefined values.

val flow = flowOf(1, 2, 3)

Useful for testing.


3. asFlow() — convert collections or ranges

Examples:

listOf(1,2,3).asFlow()

(1..5).asFlow()

sequenceOf("A","B").asFlow()

Behind the scenes, asFlow() simply loops over the source and calls emit for each element; emit is the suspension point.
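
That loop can be sketched by hand. The extension below, asFlowSketch, is a hypothetical re-implementation written for illustration only; it is not the library source, and in real code you would call asFlow() itself.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical re-implementation for illustration; prefer the library's asFlow().
fun <T> Iterable<T>.asFlowSketch(): Flow<T> = flow {
    for (element in this@asFlowSketch) {
        emit(element) // suspends until the collector has handled the element
    }
}

fun main() = runBlocking {
    println(listOf(1, 2, 3).asFlowSketch().toList()) // prints [1, 2, 3]
}
```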


Flow Operators (map, filter, flatMap variants)

Operators transform or combine flows.
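Intermediate operators are not suspending functions themselves: each call returns a new Flow immediately, and the lambdas you pass to them may call suspending functions. Nothing runs until a terminal operator collects the flow.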


map

Transforms each value:

val doubled = flow.map { it * 2 }


filter

Keeps only values that meet a condition:

val evens = flow.filter { it % 2 == 0 }
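
Operators chain, and the chain stays lazy until it is collected. The sketch below records a log entry for every value that passes through, to show that building the pipeline does no work; evenTimesTen and the log list are names invented for this example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Builds (but does not run) a pipeline: log each value, keep evens, multiply by 10.
fun evenTimesTen(source: Flow<Int>, log: MutableList<String>): Flow<Int> =
    source
        .onEach { log += "saw $it" }
        .filter { it % 2 == 0 }
        .map { it * 10 }

fun main() = runBlocking {
    val log = mutableListOf<String>()
    val pipeline = evenTimesTen((1..5).asFlow(), log)

    println(log.isEmpty())     // prints true: nothing has run yet
    println(pipeline.toList()) // prints [20, 40]
    println(log.size)          // prints 5: every upstream value was seen once
}
```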


flatMapConcat / flatMapMerge / flatMapLatest

These transform each value into a new Flow.

flatMapConcat

Sequential: waits for each inner flow.

flowOf(1,2).flatMapConcat {
    flowOf("$it A", "$it B")
}

Output:

1 A
1 B
2 A
2 B


flatMapMerge

Collects the inner flows concurrently (up to 16 at a time by default), so results may arrive out of upstream order.

flowOf(1,2).flatMapMerge {
    flow {
        delay(200)
        emit("$it done")
    }
}

Both inner flows wait at the same time, so the pipeline finishes in about 200 ms instead of 400 ms.
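
The ordering difference between the two operators is easiest to see when the first inner flow is the slower one. In this sketch, slowEcho is a made-up helper; the @OptIn line is there because these operators have carried opt-in markers in kotlinx.coroutines, and which marker applies depends on the library version.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Made-up helper: waits `ms` milliseconds, then emits its own input.
fun slowEcho(ms: Long): Flow<Long> = flow {
    delay(ms)
    emit(ms)
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun concatOrder(): List<Long> =
    flowOf(300L, 100L).flatMapConcat { slowEcho(it) }.toList()

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun mergeOrder(): List<Long> =
    flowOf(300L, 100L).flatMapMerge { slowEcho(it) }.toList()

fun main() = runBlocking {
    println(concatOrder()) // prints [300, 100]: upstream order is preserved
    println(mergeOrder())  // usually [100, 300]: the faster inner flow finishes first
}
```

If the order of results matters, flatMapConcat is the safer choice; flatMapMerge trades ordering for throughput.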


flatMapLatest

Cancels the previous inner flow when a new value arrives.
Perfect for search/filter UI.

Example:

searchFlow
    .flatMapLatest { query ->
        api.search(query) // must return a Flow; a new query cancels the previous search
    }
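
Since api.search is not defined in this article, here is a runnable sketch with a hypothetical fakeSearch standing in for it. The three queries arrive back to back, so the first two searches are cancelled before their 100 ms delay finishes.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for api.search(query): answers after 100 ms.
fun fakeSearch(query: String): Flow<String> = flow {
    delay(100)
    emit("results for '$query'")
}

@OptIn(ExperimentalCoroutinesApi::class)
suspend fun latestResults(queries: Flow<String>): List<String> =
    queries.flatMapLatest { fakeSearch(it) }.toList()

fun main() = runBlocking {
    // Only the last query survives; earlier searches are cancelled mid-delay.
    println(latestResults(flowOf("k", "ko", "kot"))) // prints [results for 'kot']
}
```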


collect

Terminal operator that starts the whole pipeline.

flow.collect {
    println(it)
}


Exceptions in Flow

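Flows handle exceptions using ordinary coroutine rules:

  • An exception thrown upstream terminates the flow and propagates to the collector
  • try/catch around collect catches exceptions from anywhere in the pipeline, including the collector itself
  • The catch operator intercepts upstream errors only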

Option 1: try/catch

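try {
    flow.collect { println(it) }
} catch (e: CancellationException) { // kotlinx.coroutines.CancellationException
    throw e // never swallow cancellation
} catch (e: Exception) {
    println("Error: $e")
}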


Option 2: catch operator

More idiomatic:

flow
    .map { process(it) }
    .catch { e -> emit(-1) }
    .collect { println(it) }

Behavior:

  • catches upstream errors
  • does NOT catch downstream consumer exceptions
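
Both behaviors can be checked with a short sketch. The names failingFlow, upstreamCase, and downstreamCase are invented for this example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Illustrative upstream that fails after two values.
fun failingFlow(): Flow<Int> = flow {
    emit(1)
    emit(2)
    throw IllegalStateException("upstream failed")
}

// Upstream failure: catch intercepts it and emits a fallback value.
suspend fun upstreamCase(): List<Int> =
    failingFlow().catch { emit(-1) }.toList()

// Downstream failure: the exception is thrown in collect, below catch, so it escapes.
suspend fun downstreamCase(): String =
    try {
        flowOf(1)
            .catch { emit(-1) }
            .collect { value -> error("collector failed on $value") }
        "no exception"
    } catch (e: IllegalStateException) {
        "escaped: ${e.message}"
    }

fun main() = runBlocking {
    println(upstreamCase())   // prints [1, 2, -1]
    println(downstreamCase()) // prints escaped: collector failed on 1
}
```

A common way to cover collector errors as well is to move the collector logic into onEach above the catch and finish with a plain collect().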

Finally with Flow

For finally-style cleanup, use onCompletion. Its lambda receives the failure cause, or null if the flow completed normally:

flow
    .onCompletion { println("Done") }
    .collect()
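
A sketch of how onCompletion reports success and failure. completionLog is a hypothetical helper that records what happened; note that onCompletion only observes the exception, so a catch placed after it still has to handle it.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: runs the flow and records values, completion, and errors.
suspend fun completionLog(source: Flow<Int>): List<String> {
    val log = mutableListOf<String>()
    source
        .onEach { log += "value $it" }
        .onCompletion { cause ->
            // cause is null on normal completion, the exception otherwise
            log += if (cause == null) "completed" else "failed: ${cause.message}"
        }
        .catch { log += "caught" } // onCompletion does not swallow the error
        .collect()
    return log
}

fun main() = runBlocking {
    println(completionLog(flowOf(1, 2)))
    // prints [value 1, value 2, completed]

    println(completionLog(flow { emit(1); throw RuntimeException("boom") }))
    // prints [value 1, failed: boom, caught]
}
```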


Backpressure Mechanism

Backpressure occurs when:

The producer emits values faster than the consumer can process them.

Flow handles backpressure naturally because it is suspending:

  • The producer suspends if the consumer is slow
  • No buffering unless you request it with buffer() or use an operator that buffers internally, such as flowOn or flatMapMerge
  • No dropped, overwritten, or queued values by default

Example:

flow {
    repeat(5) {
        println("Emit $it")
        emit(it)
    }
}.collect {
    delay(1000) // slow consumer
    println("Received $it")
}

Output:

Emit 0
Received 0
Emit 1
Received 1
...

The producer waits for the consumer—problem solved.

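Special cases: buffers and channels

When suspending the producer is not what you want, Flow provides:

  • buffer(): lets the producer run ahead by queueing values in a channel
  • conflate(): keeps only the most recent value when the collector falls behind
  • collectLatest(): cancels the collector's work on the old value when a new one arrives

The default behavior remains safe and backpressure-aware.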
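
The three options can be compared side by side with a fast producer and a collector that needs 30 ms per value. This is a sketch with made-up function names. The buffer() and collectLatest results are stable here; for conflate() only the final value is guaranteed, and which earlier values survive depends on timing.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// buffer(): the producer runs ahead, the collector still sees every value.
suspend fun buffered(): List<Int> {
    val seen = mutableListOf<Int>()
    (1..5).asFlow().buffer().collect { delay(30); seen += it }
    return seen
}

// conflate(): values that arrive while the collector is busy are overwritten.
suspend fun conflated(): List<Int> {
    val seen = mutableListOf<Int>()
    (1..5).asFlow().conflate().collect { delay(30); seen += it }
    return seen
}

// collectLatest: the collector body is cancelled when a newer value shows up.
suspend fun latestOnly(): List<Int> {
    val seen = mutableListOf<Int>()
    (1..5).asFlow().collectLatest { delay(30); seen += it }
    return seen
}

fun main() = runBlocking {
    println(buffered())   // prints [1, 2, 3, 4, 5]
    println(conflated())  // always ends with 5; earlier values depend on timing
    println(latestOnly()) // prints [5]
}
```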


Example: Processing Real-Time Data Streams

Let’s build a practical example using Flow for real-time processing.

Scenario

You receive sensor data from a device every 100ms:

  • readings can be noisy
  • you only want readings above a threshold
  • you want to calculate a moving average
  • you want to stop processing when cancelled

Perfect use case for Flow.


Step 1: Create a flow that emits sensor data

fun sensorFlow(): Flow<Int> = flow {
    while (true) {
        val reading = readSensor()   // imaginary function
        emit(reading)
        delay(100)
    }
}

Explanation

  • This is a cold stream: starts when collected
  • emit sends sensor values
  • delay(100) paces the loop at roughly one reading every 100 ms and doubles as a cancellation point

Step 2: Process the data

val processedFlow = sensorFlow()
    .filter { it > 50 }         // keep readings above the threshold
    .map { it / 2 }             // scale values (integer division)
    .onEach { println("Reading: $it") }


Step 3: Compute a moving average

fun Flow<Int>.movingAverage(window: Int): Flow<Double> = flow {
    val buffer = ArrayDeque<Int>()

    collect { value ->
        buffer += value
        if (buffer.size > window) buffer.removeFirst()

        val avg = buffer.average()
        emit(avg)
    }
}

How it works

  • collect consumes upstream values
  • buffer stores the latest N readings
  • emit(avg) sends moving average downstream

Step 4: Combine everything

val averaged = processedFlow.movingAverage(window = 5)


Step 5: Collect in a coroutine

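val scope = CoroutineScope(Dispatchers.Default)

// Keep the Job: the sensor flow is endless, so cancelling is the only way to stop it
val job = scope.launch {
    averaged.collect { avg ->
        println("Average: $avg")
    }
}

// Later, when the readings are no longer needed: job.cancel()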
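
Since readSensor() is imaginary, the steps above cannot be run as written. The sketch below swaps in a hypothetical fakeSensorFlow that cycles through fixed readings, and uses take() to stop the otherwise endless stream, so the arithmetic can be checked by hand.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the sensor: cycles through fixed readings forever.
fun fakeSensorFlow(readings: List<Int>, periodMs: Long = 10): Flow<Int> = flow {
    var i = 0
    while (true) {
        emit(readings[i++ % readings.size])
        delay(periodMs)
    }
}

// Same operator as in Step 3.
fun Flow<Int>.movingAverage(window: Int): Flow<Double> = flow {
    val buffer = ArrayDeque<Int>()
    collect { value ->
        buffer += value
        if (buffer.size > window) buffer.removeFirst()
        emit(buffer.average())
    }
}

// Steps 1 to 4 combined, limited to the first `count` averages.
suspend fun firstAverages(count: Int): List<Double> =
    fakeSensorFlow(listOf(40, 60, 80, 100))
        .filter { it > 50 }  // 40 is dropped
        .map { it / 2 }      // 60, 80, 100 become 30, 40, 50
        .movingAverage(window = 2)
        .take(count)         // cancels the endless upstream once satisfied
        .toList()

fun main() = runBlocking {
    // Windows: [30], [30, 40], [40, 50]
    println(firstAverages(3)) // prints [30.0, 35.0, 45.0]
}
```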


Cancellation behavior

If the coroutine collecting the flow is cancelled:

  • sensor loop stops
  • map/filter stop
  • movingAverage stops
  • delays stop
  • everything cleans up

Cancellation is cooperative: the flow stops at its next suspension point (here emit or delay), so no manual cleanup code is needed.
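
One way to observe this is to put a finally block inside the producer and cancel the collecting job. The sketch below assumes nothing beyond kotlinx-coroutines-core; producerCleansUpOnCancel is a name made up for the example.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Starts an endless flow, cancels its collector, and reports whether the
// producer's finally block ran.
suspend fun producerCleansUpOnCancel(): Boolean = coroutineScope {
    var cleanedUp = false

    val ticks = flow {
        try {
            var i = 0
            while (true) {
                emit(i++)
                delay(10) // suspension point: cancellation is detected here
            }
        } finally {
            cleanedUp = true // runs when the collector is cancelled
        }
    }

    val job = launch { ticks.map { it * 2 }.collect() }
    delay(50)
    job.cancelAndJoin() // returns only after the flow has fully unwound
    cleanedUp
}

fun main() = runBlocking {
    println(producerCleansUpOnCancel()) // prints true
}
```

A loop that never suspends would not notice cancellation, which is why the delay (or emit) inside the loop matters.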


Conclusion

Kotlin Flow is a powerful, flexible, coroutine-friendly solution for handling asynchronous streams. It’s safe, backpressure-aware, cold by default, and highly composable.

You learned:

  • What Flow is and why it exists
  • Cold vs hot streams
  • Flow builders (flow, flowOf, asFlow)
  • Operators (map, filter, flatMapLatest, etc.)
  • Exception handling (catch, onCompletion, try/catch)
  • Flow’s natural backpressure solution
  • A real-world example for real-time data

Key takeaways

  • Flow is cold: nothing runs until collected
  • Operators are declarative and suspend-friendly
  • Backpressure is handled automatically
  • Flows respond to coroutine cancellation
  • You can build highly complex pipelines with minimal code
