Kotlin introduced Flow as a powerful, coroutine-friendly way to handle streams of asynchronous data. Flow is now central to modern Android development, Kotlin backend services, and reactive-style data processing—especially when combined with StateFlow, SharedFlow, and Jetpack libraries.
But what exactly is a Flow? How does it differ from LiveData or RxJava Observables? What makes it “cold”? And how do you build, transform, and consume flows safely?
This article will guide you through:
- What Flow is
- Cold vs hot streams
- Flow builders (flow, flowOf, asFlow)
- Flow operators (map, filter, the flatMap family, etc.)
- Exceptions in Flow and how to handle them
- Flow’s built-in backpressure solution
- A real example: processing real-time data streams
Let’s start with the basics.
What Is Flow?
A Flow is a type that represents a stream of asynchronously produced values.
Think of it as:
A sequence that emits values over time.
Where a List<Int> gives all values at once, a Flow<Int> gives them one by one, asynchronously.
Key characteristics of Flow
- Cold: nothing happens until someone collects it
- Asynchronous: built on coroutines, so waiting suspends instead of blocking a thread
- Suspending: can pause between emissions
- Declarative: supports many operators (map, filter, etc.)
Flow is defined as:
interface Flow<out T> {
    suspend fun collect(collector: FlowCollector<T>)
}
Simple example
val flow = flow {
    emit(1)
    emit(2)
    emit(3)
}
Collecting it (collect is a suspending function, so it must be called from a coroutine):
flow.collect { value ->
    println(value)
}
Output:
1
2
3
This is the simplest example of a cold flow.
Cold vs Hot Streams
To understand Flow properly, you must understand the difference between cold and hot streams.
Cold Flow
Cold flows behave like:
- a function call
- a generator
- a fresh stream for every collector
Characteristics:
- Each collector starts from the beginning
- Work is executed once per collector
- Flows do not run unless collect() is called
Analogy:
A cold water tap that only starts running when you turn it on.
Example: cold Flow
val flow = flow {
    println("Start producing")
    emit(1)
    emit(2)
}
flow.collect { println("Collector A: $it") }
flow.collect { println("Collector B: $it") }
Output:
Start producing
Collector A: 1
Collector A: 2
Start producing
Collector B: 1
Collector B: 2
The side effect (Start producing) happens twice — once per collector.
Hot Stream
Hot streams emit values regardless of collectors.
Examples:
- BroadcastChannel (deprecated in favor of SharedFlow)
- SharedFlow
- StateFlow
- RxJava Subjects
- LiveData
Example analogy:
A radio transmitter broadcasting continuously—listeners either tune in or miss data.
Flow = cold stream by default
This makes Flow predictable and safe.
You always know:
- when it starts
- who controls it
- when it stops
Hot flows exist too (SharedFlow, StateFlow), but classical Flow is cold.
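To make the contrast with the cold example concrete, here is a minimal sketch of a hot stream built on MutableSharedFlow. It assumes kotlinx-coroutines-core (1.6 or newer) is on the classpath; lateSubscriberDemo is a hypothetical name used only for this illustration. A value emitted before anyone subscribes is lost, which a cold flow would never do.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns what a late subscriber actually receives.
suspend fun lateSubscriberDemo(): List<Int> = coroutineScope {
    val hot = MutableSharedFlow<Int>() // defaults: replay = 0, no extra buffer

    hot.emit(1) // no subscribers yet, so this value is lost

    // Subscribe and wait for the next two values.
    val received = async { hot.take(2).toList() }

    // Wait until the subscription is registered before emitting again.
    hot.subscriptionCount.first { it > 0 }

    hot.emit(2)
    hot.emit(3)
    received.await()
}

fun main() = runBlocking {
    println(lateSubscriberDemo()) // prints [2, 3]
}
```

The subscriber never sees 1 because the emission happened before it tuned in, just like the radio analogy.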
Flow Builders: flow, flowOf, asFlow
These builders help you turn values or collections into Flows.
1. flow {} — the most flexible builder
Allows for suspending, loops, and custom asynchronous logic.
val numbers = flow {
    for (i in 1..3) {
        delay(500)
        emit(i)
    }
}
Explanation
- delay() works because the flow { } block is a suspending lambda
- emit() sends a value downstream
- This flow emits 1, 2, 3 with half-second delays
2. flowOf() — simple static flow
Creates a flow of predefined values.
val flow = flowOf(1, 2, 3)
Useful for testing.
3. asFlow() — convert collections or ranges
Examples:
listOf(1,2,3).asFlow()
(1..5).asFlow()
sequenceOf("A","B").asFlow()
Behind the scenes, asFlow() is a thin wrapper that loops over the source and calls emit() for each element.
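The three builders can be compared side by side. The following is a small sketch, assuming kotlinx-coroutines-core is available; builderResults is a hypothetical helper name, and the delay is shortened to keep the run quick.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: builds the same stream three ways and collects each into a list.
suspend fun builderResults(): List<List<Int>> {
    val fromBuilder = flow {
        for (i in 1..3) {
            delay(10) // suspending calls are allowed inside flow { }
            emit(i)
        }
    }
    val fromValues = flowOf(1, 2, 3)
    val fromRange = (1..3).asFlow()

    // toList() is a terminal operator, so each flow only runs here.
    return listOf(fromBuilder.toList(), fromValues.toList(), fromRange.toList())
}

fun main() = runBlocking {
    println(builderResults()) // prints [[1, 2, 3], [1, 2, 3], [1, 2, 3]]
}
```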
Flow Operators (map, filter, flatMap)
Operators transform or combine flows.
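Intermediate operators such as map and filter are not suspending themselves: they return a new Flow immediately, and their lambdas (which may call suspending functions) run only once the flow is collected.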
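The laziness of intermediate operators is easy to observe by logging from inside them. This is an illustrative sketch, assuming kotlinx-coroutines-core 1.6 or newer; lazinessDemo is a hypothetical name.

```kotlin
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Hypothetical demo: records the order in which each stage runs.
suspend fun lazinessDemo(): List<String> {
    val log = mutableListOf<String>()

    val pipeline = flowOf(1, 2, 3, 4)
        .filter {
            log += "filter $it"
            it % 2 == 0
        }
        .map {
            log += "map $it"
            it * 10
        }

    log += "pipeline built" // nothing above has executed yet

    pipeline.collect { log += "collect $it" } // now every stage runs, one value at a time
    return log
}

fun main() = runBlocking {
    lazinessDemo().forEach(::println)
    // pipeline built
    // filter 1
    // filter 2
    // map 2
    // collect 20
    // filter 3
    // filter 4
    // map 4
    // collect 40
}
```

Note that each value travels through the whole chain before the next one is requested; the operators do not process the full stream stage by stage.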
map
Transforms each value:
val doubled = flow.map { it * 2 }
filter
Keeps only values that meet a condition:
val evens = flow.filter { it % 2 == 0 }
flatMapConcat / flatMapMerge / flatMapLatest
These transform each value into a new Flow.
flatMapConcat
Sequential: waits for each inner flow.
flowOf(1, 2).flatMapConcat {
    flowOf("$it A", "$it B")
}.collect { println(it) }
Output:
1 A
1 B
2 A
2 B
flatMapMerge
Runs flows concurrently.
flowOf(1, 2).flatMapMerge {
    flow {
        delay(200)
        emit("$it done")
    }
}
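With both inner flows waiting concurrently, the whole pipeline finishes in roughly 200 ms instead of 400 ms. The order of results is not guaranteed.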
flatMapLatest
Cancels previous inner flow when new value arrives.
Perfect for search/filter UI.
Example:
searchFlow
    .flatMapLatest { query ->
        api.search(query) // new search cancels previous one
    }
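Since searchFlow and api.search are not defined in the snippet above, here is a self-contained sketch of the same idea. fakeSearch is a hypothetical stand-in for a search API that returns a Flow, and the timings are arbitrary. The flatMap operators are marked experimental in kotlinx.coroutines, hence the opt-in.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for api.search(query): answers after 200 ms.
fun fakeSearch(query: String): Flow<String> = flow {
    delay(200) // simulated network latency
    emit("results for $query")
}

@OptIn(ExperimentalCoroutinesApi::class)
suspend fun latestResults(): List<String> {
    // The user types faster (every 50 ms) than a search can finish (200 ms).
    val queries = flow {
        emit("k")
        delay(50)
        emit("ko")
        delay(50)
        emit("kotlin")
    }
    // Each new query cancels the search that is still in flight.
    return queries.flatMapLatest { query -> fakeSearch(query) }.toList()
}

fun main() = runBlocking {
    println(latestResults()) // prints [results for kotlin]
}
```

Only the last query produces a result, because the two earlier searches are cancelled before their delay completes.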
collect
Terminal operator that starts the whole pipeline.
flow.collect {
    println(it)
}
Exceptions in Flow
Flows handle exceptions using coroutine rules:
- An exception thrown upstream terminates the flow and propagates to the collector
- try/catch works around the collect call
- The catch operator can intercept upstream errors
Option 1: try/catch
try {
    flow.collect { println(it) }
} catch (e: Exception) {
    println("Error: $e")
}
Option 2: catch operator
More idiomatic:
flow
    .map { process(it) }
    .catch { e -> emit(-1) }
    .collect { println(it) }
Behavior:
- catches upstream errors
- does NOT catch downstream consumer exceptions
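The upstream-only behavior of catch can be checked with a short experiment. This is a sketch under the assumption of kotlinx-coroutines-core 1.6 or newer; catchDemo and the exception messages are made up for illustration.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Hypothetical demo: logs what catch does and does not intercept.
suspend fun catchDemo(): List<String> {
    val log = mutableListOf<String>()

    // Case 1: the failure happens upstream of catch, so catch handles it.
    flow {
        emit(1)
        emit(2)
        throw IllegalStateException("upstream failed")
    }
        .catch { e ->
            log += "caught upstream: ${e.message}"
            emit(-1) // fallback value
        }
        .collect { log += "value $it" }

    // Case 2: the failure happens in the collector, downstream of catch.
    try {
        flowOf(1, 2)
            .catch { log += "never reached" }
            .collect { value ->
                if (value == 2) throw IllegalArgumentException("collector failed")
                log += "value $value"
            }
    } catch (e: IllegalArgumentException) {
        log += "escaped to try/catch: ${e.message}"
    }
    return log
}

fun main() = runBlocking {
    catchDemo().forEach(::println)
    // value 1
    // value 2
    // caught upstream: upstream failed
    // value -1
    // value 1
    // escaped to try/catch: collector failed
}
```

One common way to cover collector logic as well is to move it into onEach placed before catch, and then call collect() with no arguments.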
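Finally with Flow
You can use onCompletion, which runs when the flow finishes for any reason (normal completion, failure, or cancellation) and receives the failure cause, or null on success:
flow
    .onCompletion { println("Done") }
    .collect()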
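A brief sketch of how the cause parameter of onCompletion distinguishes success from failure follows. It assumes kotlinx-coroutines-core 1.6 or newer; completionDemo is a hypothetical name.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking

// Hypothetical demo: onCompletion observes the outcome but does not handle errors.
suspend fun completionDemo(): List<String> {
    val log = mutableListOf<String>()

    // Normal completion: cause is null.
    flowOf(1, 2)
        .onCompletion { cause -> log += "first flow done, cause=$cause" }
        .collect { log += "value $it" }

    // Failure: onCompletion sees the exception, then catch handles it.
    flow<Int> {
        emit(1)
        throw IllegalStateException("boom")
    }
        .onCompletion { cause -> log += "second flow done, failed=${cause != null}" }
        .catch { e -> log += "caught ${e.message}" }
        .collect { log += "value $it" }

    return log
}

fun main() = runBlocking {
    completionDemo().forEach(::println)
    // value 1
    // value 2
    // first flow done, cause=null
    // value 1
    // second flow done, failed=true
    // caught boom
}
```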
Backpressure Mechanism
Backpressure occurs when:
The producer emits values faster than the consumer can process them.
Flow handles backpressure naturally because it is suspending:
- The producer suspends if the consumer is slow
- No buffering unless explicitly requested
- No dropped, overwritten, or queued values by default
Example:
flow {
    repeat(5) {
        println("Emit $it")
        emit(it)
    }
}.collect {
    delay(1000) // slow consumer
    println("Received $it")
}
Output:
Emit 0
Received 0
Emit 1
Received 1
...
The producer waits for the consumer—problem solved.
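Special cases: buffers and channels
Flow provides operators for when you want a different trade-off:
- buffer(): runs the producer and collector concurrently, queuing values in between
- conflate(): keeps only the most recent value when the collector falls behind
- collectLatest(): cancels processing of the previous value when a new one arrives
But default behavior = safe and backpressure-aware.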
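To give a feel for two of these operators, here is a rough sketch assuming kotlinx-coroutines-core 1.6 or newer. latestOnly and elapsedMs are hypothetical helper names, the 100 ms timings are arbitrary, and measured durations will vary by machine. The opt-in is included in case your library version still marks collectLatest as experimental.

```kotlin
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// collectLatest: a new value cancels the block still processing the old one.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun latestOnly(): List<Int> {
    val processed = mutableListOf<Int>()
    flowOf(1, 2, 3).collectLatest { value ->
        delay(100) // slow work, cancelled if a newer value arrives
        processed += value
    }
    return processed // only the last value finishes processing
}

// buffer: lets the producer run ahead while the collector is busy.
suspend fun elapsedMs(useBuffer: Boolean): Long {
    val source = flow {
        repeat(3) {
            delay(100) // producer needs 100 ms per value
            emit(it)
        }
    }
    val upstream = if (useBuffer) source.buffer() else source
    return measureTimeMillis {
        upstream.collect { delay(100) } // collector also needs 100 ms per value
    }
}

fun main() = runBlocking {
    println(latestOnly()) // prints [3]
    println("without buffer: ${elapsedMs(useBuffer = false)} ms") // roughly 600 ms
    println("with buffer:    ${elapsedMs(useBuffer = true)} ms")  // roughly 400 ms
}
```

Without buffer() the producer and collector take turns, so their delays add up. With buffer() they overlap, at the cost of holding values in memory.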
Example: Processing Real-Time Data Streams
Let’s build a practical example using Flow for real-time processing.
Scenario
You receive sensor data from a device every 100 ms:
- readings can be noisy
- you only want readings above a threshold
- you want to calculate a moving average
- you want to stop processing when cancelled
Perfect use case for Flow.
Step 1: Create a flow that emits sensor data
fun sensorFlow(): Flow<Int> = flow {
    while (true) {
        val reading = readSensor() // imaginary function
        emit(reading)
        delay(100)
    }
}
Explanation
- This is a cold stream: starts when collected
- emit sends sensor values
- delay(100) simulates real-time input
Step 2: Process the data
val processedFlow = sensorFlow()
    .filter { it > 50 } // remove noise
    .map { it / 2 } // scale values
    .onEach { println("Reading: $it") }
Step 3: Compute a moving average
fun Flow<Int>.movingAverage(window: Int): Flow<Double> = flow {
    val buffer = ArrayDeque<Int>()
    collect { value ->
        buffer += value
        if (buffer.size > window) buffer.removeFirst()
        val avg = buffer.average()
        emit(avg)
    }
}
How it works
- collect consumes upstream values
- buffer stores the latest N readings
- emit(avg) sends moving average downstream
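A quick way to sanity-check the operator is to run it over a fixed input instead of a live sensor. The sketch below repeats the movingAverage function from this step so that it is self-contained; the input values and the window of 3 are chosen only for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Same operator as in the article, repeated here so the example compiles on its own.
fun Flow<Int>.movingAverage(window: Int): Flow<Double> = flow {
    val buffer = ArrayDeque<Int>()
    collect { value ->
        buffer += value
        if (buffer.size > window) buffer.removeFirst() // keep at most `window` readings
        emit(buffer.average())
    }
}

fun main() = runBlocking {
    val averages = flowOf(10, 20, 30, 40).movingAverage(window = 3).toList()
    // Windows: [10], [10, 20], [10, 20, 30], [20, 30, 40]
    println(averages) // prints [10.0, 15.0, 20.0, 30.0]
}
```

Note that an average is emitted even before the window is full, so the first few values are averages over fewer readings.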
Step 4: Combine everything
val averaged = processedFlow.movingAverage(window = 5)
Step 5: Collect in a coroutine
CoroutineScope(Dispatchers.Default).launch {
    averaged.collect { avg ->
        println("Average: $avg")
    }
}
Cancellation behavior
If the coroutine collecting the flow is cancelled:
- sensor loop stops
- map/filter stop
- movingAverage stops
- delays stop
- everything cleans up
Flow handles cancellation automatically.
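The cleanup claim can be verified with a small experiment. This is a sketch, assuming kotlinx-coroutines-core 1.6 or newer; cancellationDemo is a hypothetical name and the ticker stands in for the sensor loop. A finally block inside the flow runs when the collecting coroutine is cancelled.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns true if the flow's cleanup ran after cancellation.
suspend fun cancellationDemo(): Boolean = coroutineScope {
    var cleanedUp = false

    // Stand-in for the sensor loop: emits forever until cancelled.
    val ticker = flow {
        try {
            var n = 0
            while (true) {
                emit(n++)
                delay(100)
            }
        } finally {
            cleanedUp = true // reached when the collector is cancelled
        }
    }

    var received = 0
    val job = launch { ticker.collect { received++ } }

    delay(250)          // let a few values through
    job.cancelAndJoin() // cancel the collector and wait for it to finish

    println("received $received values before cancellation")
    cleanedUp
}

fun main() = runBlocking {
    println("cleaned up: ${cancellationDemo()}") // prints "cleaned up: true"
}
```

Cancellation surfaces at the suspension points inside the flow (here emit and delay), which is why the infinite loop stops without any explicit check.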
Conclusion
Kotlin Flow is a powerful, flexible, coroutine-friendly solution for handling asynchronous streams. It’s safe, backpressure-aware, cold by default, and highly composable.
You learned:
- What Flow is and why it exists
- Cold vs hot streams
- Flow builders (flow, flowOf, asFlow)
- Operators (map, filter, flatMapLatest, etc.)
- Exception handling (catch, onCompletion, try/catch)
- Flow’s natural backpressure solution
- A real-world example for real-time data
Key takeaways
- Flow is cold: nothing runs until collected
- Operators are declarative and suspend-friendly
- Backpressure is handled automatically
- Flows respond to coroutine cancellation
- You can build highly complex pipelines with minimal code
