Flows — Cold Streams in Kotlin

Kotlin Flow is often introduced as “RxJava but simpler” or “a LiveData replacement.” That framing is convenient, but it hides what actually makes Flow powerful:

  • Flow is a coroutine-native stream abstraction with structured concurrency semantics.
  • It is cold by default, and “hot” streams are explicit types with explicit guarantees.
  • It has backpressure built-in through suspension, with additional tools for buffering and conflation.
  • Operators are not magic: they are suspending transformations that obey cancellation, context, and sequentiality rules.

This article targets mid-level to senior Android/Kotlin developers. The goal is not to list APIs, but to build a mental model you can rely on when flows behave unexpectedly in production.


What Flow Is

Flow is a protocol: “how values are produced and consumed over time”

The Flow<T> interface is small:

public interface Flow<out T> {
    public suspend fun collect(collector: FlowCollector<T>)
}

That looks trivial, but the semantics are where the value is:

A Flow is a description of an asynchronous computation that can emit multiple values sequentially, suspending between emissions, and that runs under coroutine cancellation.

“Cold” is the default execution model

A cold flow does not run until someone collects it. That implies:

  • No work is done (no network, no DB, no CPU) until collection begins.
  • Each collector triggers a new run of the upstream code.
  • Errors and cancellation are tied to the collector’s coroutine.

Example:

val f = flow {
    println("Starting upstream work")
    emit(fetchFromNetwork()) // suspends
}

f.collect { println("A: $it") }
f.collect { println("B: $it") }

You will see “Starting upstream work” twice.

Why this matters in Android: cold flows are ideal for “do work when UI needs it,” and structured cancellation means you don’t leak when UI stops collecting.

Flow is sequential by default (not single-threaded)

When you write:

flow.map { ... }.filter { ... }.collect { ... }

you get a single logical pipeline by default: upstream emits → operator transforms → downstream consumes, in order.

  • Sequential means each element is processed in order.
  • It does not mean “runs on one thread.” Threads come from coroutine dispatchers (flowOn, collectors’ context, etc.).

Flow’s default sequentiality is a feature: it makes reasoning about ordering and cancellation much easier than implicit concurrency.
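To make the ordering concrete, here is a minimal sketch that records what runs when. The function name sequentialTrace is made up for this illustration; the point is that emit returns only after the element has passed through map and collect.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Records the order in which producer, operator, and collector run.
fun sequentialTrace(): List<String> = runBlocking {
    val trace = mutableListOf<String>()
    flow {
        for (i in 1..2) {
            trace += "emit $i"
            emit(i) // returns only after downstream has finished with i
        }
    }
        .map { trace += "map $it"; it * 10 }
        .collect { trace += "collect $it" }
    trace
}
```

Printing sequentialTrace() from a main function gives [emit 1, map 1, collect 10, emit 2, map 2, collect 20]: each element travels the whole pipeline before the next one is produced.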

Flow vs suspend: “one result” vs “a timeline of results”

A suspend function returns a single value (or throws). A Flow models a timeline:

  • It can emit 0…N values.
  • It can emit values over time (e.g., DB updates).
  • It can suspend between emissions (backpressure-friendly).
  • It can be cancelled mid-stream in a structured way.

Cold vs Hot Flow

This is the most common conceptual gap. Many bugs come from treating a cold flow like a hot stream, or vice versa.

Cold Flow — “execution per collector”

Properties:

  • Lazy: does nothing until collected.
  • Re-executed: each collector runs upstream from scratch.
  • No shared state by default: collectors don’t influence each other.

Typical use cases:

  • One-shot network calls (flow { emit(api.get()) })
  • DB query streams (Room Flow<List<T>> is cold but reactive)
  • Computations you want to restart per screen / per subscription

Hot streams — “producer exists independently of collectors”

Hot streams produce values regardless of collectors. In Kotlin Flow world you mostly use:

  • StateFlow for state
  • SharedFlow for events / broadcasts
  • Channel for one-to-one communication (lower level)

Hot streams introduce questions cold flows avoid:

  • What happens when there are no subscribers?
  • Do new subscribers get past values?
  • Can values be dropped under pressure?

Those questions are answered by replay, buffer, and overflow policy.


StateFlow vs SharedFlow

If you only remember one thing:

StateFlow = state holder. SharedFlow = event/broadcast stream.

But let’s make it actionable.

StateFlow: “the latest state, always available”

Properties

  • Always has a current value (value)
  • New collectors immediately receive the latest value
  • Conflated by nature: only the latest value is kept
  • distinctUntilChanged() behavior is built in: a new value that equals the current one (compared with ==) is not emitted, so applying distinctUntilChanged() to a StateFlow changes nothing

Example:

data class UiState(val loading: Boolean, val items: List<String>)

val _state = MutableStateFlow(UiState(loading = true, items = emptyList()))
val state: StateFlow<UiState> = _state
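The properties above can be observed in a small, self-contained sketch. The function name stateFlowEmissions is an assumption for this example, and runBlocking with yield() is used only to get a deterministic order on a single thread.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Returns every value a collector observes while the state is updated.
fun stateFlowEmissions(): List<Int> = runBlocking {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    val job = launch { state.collect { seen += it } }
    yield()          // collector starts and immediately receives the current value 0
    state.value = 1
    yield()          // collector receives 1
    state.value = 1  // equal to the current value: nothing is emitted
    yield()
    state.value = 2
    yield()          // collector receives 2
    job.cancel()     // a StateFlow never completes, so the collector must be cancelled
    seen
}
```

The collector sees 0, 1, 2: the current value arrives on subscription, and the repeated 1 is suppressed.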

When to use StateFlow (Android)

  • UI state in ViewModel
  • Any “source of truth” that the UI can render from at any time
  • Long-lived state that should survive collectors starting/stopping (e.g., configuration changes)

Why StateFlow fits UI
UI needs state, not history. When the screen re-subscribes, it wants:

  • “What is the current state now?”
    not:
  • “Replay the last 50 transitions.”

Pitfall
Using StateFlow for one-time events leads to repeated events on re-collection:

  • Navigation event in state → screen rotates → new collector sees same state → navigates again.

Solution: handle events differently (SharedFlow, Channel, or event wrappers).


SharedFlow: “a hot broadcast with configurable replay/buffer”

SharedFlow is essentially a coroutine-native “hot observable” where you decide:

  • replay: how many past values to replay to new subscribers
  • extraBufferCapacity: additional buffering beyond replay
  • onBufferOverflow: what to do when buffer is full

Example:

val _events = MutableSharedFlow<UiEvent>(
    replay = 0,
    extraBufferCapacity = 64,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
val events: SharedFlow<UiEvent> = _events

When to use SharedFlow

  • Multiple collectors should see the same stream (broadcast)
  • You need “events” rather than “state”
  • You want to tune replay/buffering explicitly

SharedFlow patterns in Android

  1. UI events (snackbar, navigation, toasts):
    • Usually replay = 0 so new collectors don’t re-trigger old events.
  2. Shared data stream (e.g., shared repository updates):
    • Might use replay = 1 to provide latest cached emission to newcomers.
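The difference between the two replay settings shows up as soon as a subscriber arrives late. A minimal sketch, with lateSubscriberSees as a hypothetical helper name:

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Returns what a subscriber receives if it arrives after the emission,
// or null if nothing shows up within 100 ms.
fun lateSubscriberSees(replay: Int): String? = runBlocking {
    val shared = MutableSharedFlow<String>(replay = replay)
    shared.emit("cached") // no subscribers yet, so emit returns immediately
    withTimeoutOrNull(100) { shared.first() }
}
```

lateSubscriberSees(0) returns null because the event is gone by the time the subscriber arrives, while lateSubscriberSees(1) returns "cached".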

StateFlow vs SharedFlow decision table

Use this as a practical rule set:

  • Choose StateFlow if:
    • There is a meaningful “current value” at all times.
    • UI can render from it immediately.
    • Losing intermediate updates is acceptable because only the latest matters.
  • Choose SharedFlow if:
    • You are modeling a stream of events (there may be no “current”).
    • You need broadcast to multiple collectors.
    • You need control over replay/buffer/drop behavior.

If you are uncertain, ask:

“If a new subscriber arrives now, should it instantly receive something?”

  • If yes, likely StateFlow (or SharedFlow with replay=1, but then ask why it isn’t state).
  • If no, likely SharedFlow (replay=0).

Channel vs SharedFlow for events

  • Channel is one-to-one (a send is received by a single receiver unless you fan out manually).
  • SharedFlow is one-to-many broadcast.

For UI events from ViewModel to UI:

  • If you want broadcast (rare): SharedFlow
  • If you want single consumption semantics: Channel (often used as Channel + receiveAsFlow())

However, SharedFlow can emulate many event patterns if configured correctly. The real choice depends on whether you want fan-out and how you handle collectors.
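Here is a rough sketch of the single-consumption behavior of Channel + receiveAsFlow(). The name channelFanOut is invented for this example. How the elements are split between the two collectors is not guaranteed; what is guaranteed is that no element is delivered twice.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Two collectors share one channel-backed flow; each element goes to exactly one of them.
fun channelFanOut(): Pair<List<Int>, List<Int>> = runBlocking {
    val channel = Channel<Int>(Channel.BUFFERED)
    val events = channel.receiveAsFlow()
    val a = async { events.toList() }
    val b = async { events.toList() }
    for (i in 1..6) channel.send(i)
    channel.close() // lets both collectors complete
    a.await() to b.await()
}
```

With a SharedFlow in place of the channel, both collectors would receive all six values instead.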


Flow Builders (with real-world intent)

flow {} — “build a cold flow with suspending emits”

Use when you want a cold flow and your upstream logic is suspending:

fun userFlow(id: String): Flow<User> = flow {
    emit(api.getUser(id))
}

Key property: emit suspends → producer cooperates with consumer (backpressure-friendly).


flowOf, asFlow — “simple sources”

  • flowOf(a, b, c) for known values.
  • list.asFlow() for turning iterables into flows.

Use them for operator pipelines, testing, and quick prototypes.


callbackFlow / channelFlow — “bridge callback/concurrent producers”

Use when values come from:

  • callback APIs
  • listeners (location updates, sensors)
  • multiple coroutines producing concurrently

Example bridging a callback:

fun locationUpdates(): Flow<Location> = callbackFlow {
    val callback = object : LocationCallback() {
        override fun onLocationResult(result: LocationResult) {
            result.lastLocation?.let { trySend(it) } // lastLocation is nullable; trySend is non-suspending
        }
    }
    client.requestLocationUpdates(request, callback, Looper.getMainLooper())

    awaitClose { client.removeLocationUpdates(callback) }
}

Why callbackFlow exists:

  • it gives you a safe lifecycle boundary (awaitClose)
  • it gives you a channel under the hood so you can emit from non-suspending callbacks
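The same bridge can be tried without Android. In the sketch below, TemperatureSensor is a hypothetical listener-based API invented for this example, and readings() and callbackFlowDemo() are illustrative names.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical listener-based API, not a real library class.
class TemperatureSensor {
    private var listener: ((Double) -> Unit)? = null
    val hasListener: Boolean get() = listener != null
    fun register(l: (Double) -> Unit) { listener = l }
    fun unregister() { listener = null }
    fun push(value: Double) { listener?.invoke(value) }
}

fun TemperatureSensor.readings(): Flow<Double> = callbackFlow {
    register { value -> trySend(value) } // non-suspending, safe inside a callback
    awaitClose { unregister() }          // runs when the collector stops or the flow closes
}

fun callbackFlowDemo(): Pair<List<Double>, Boolean> = runBlocking {
    val sensor = TemperatureSensor()
    val collected = async { sensor.readings().take(2).toList() }
    while (!sensor.hasListener) yield() // wait until collection has registered the listener
    sensor.push(20.5)
    sensor.push(21.0)
    collected.await() to sensor.hasListener
}
```

callbackFlowDemo() returns the two readings together with false: once take(2) has what it needs, the flow is cancelled and the awaitClose block unregisters the listener.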

Flow Operators

Instead of listing operators alphabetically, we’ll group them by intent, and for each operator we’ll discuss:

  • What it guarantees
  • What it does not
  • Where it’s typically used

Transforming values

map

Property: 1-to-1 transformation, preserves order, adds no buffering or concurrency (a slow map lambda slows the whole pipeline).
Use when: you want to convert domain → UI, parse, or compute derived values.

val uiModels = usersFlow.map { it.toUiModel() }


transform

Property: can emit 0…N values per input; you control emission.
Use when: mapping isn’t enough—e.g., emit intermediate states or conditionally emit.

flowOf("a", "bb").transform { s ->
    emit(s.length)
    if (s.length > 1) emit(-1)
}


scan (a.k.a. runningFold)

Property: emits the initial value, then the accumulated state after every element.
Use when: build state progressively (great for reducers / MVI).

val runningTotal = flowOf(1, 2, 3).scan(0) { acc, v -> acc + v }
// emits: 0, 1, 3, 6


runningReduce

Same idea as scan but without an initial value: the first element is emitted as-is and seeds the accumulator.
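A quick side-by-side sketch of the two operators (the function names are made up for this example):

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.runningReduce
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// scan starts from the seed, so it emits one more value than its input has.
fun scanTotals(): List<Int> = runBlocking {
    flowOf(1, 2, 3).scan(0) { acc, v -> acc + v }.toList()
}

// runningReduce has no seed: the first element is the first accumulator.
fun reduceTotals(): List<Int> = runBlocking {
    flowOf(1, 2, 3).runningReduce { acc, v -> acc + v }.toList()
}
```

scanTotals() yields [0, 1, 3, 6] and reduceTotals() yields [1, 3, 6].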


Filtering and shaping the stream

filter

Property: drops elements that don’t match predicate.
Use when: remove noise, enforce constraints.

flow.filter { it.isValid }


filterNotNull

Property: removes nulls and narrows type.
Use when: you have nullable upstream but want a clean downstream.


distinctUntilChanged

Property: suppresses consecutive duplicates.
Use when: UI updates should not re-render for same value.

Note: StateFlow already does this on its own: it skips any update that equals its current value.


take, drop, takeWhile, dropWhile

Property: controls how many items pass through.
Use when:

  • take(1) for “first result wins”
  • takeWhile for “until condition breaks”
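The filtering operators compose naturally. A small sketch, with shapedReadings as an illustrative name and made-up input values:

```kotlin
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun shapedReadings(): List<Int> = runBlocking {
    flowOf(1, null, 1, 2, 2, null, 3, 99, 4)
        .filterNotNull()        // Flow<Int?> becomes Flow<Int>
        .distinctUntilChanged() // drops consecutive repeats
        .takeWhile { it < 10 }  // completes the flow at the first element that fails
        .toList()
}
```

The result is [1, 2, 3]. Note that the trailing 4 never appears: takeWhile ends the flow at 99 rather than skipping it.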

Side effects

onEach

Property: performs side effect and passes original element through unchanged.
Use when: logging, metrics, caching, debug traces.

flow.onEach { Timber.d("value=$it") }

If the side effect is expensive or suspending, remember it can slow the pipeline (and that might be good—backpressure).


onStart

Property: runs when collection begins; can emit.
Use when: emit initial loading state, start analytics span.


onCompletion

Property: runs when the flow completes normally, fails, or is cancelled; cause is null only on normal completion. It observes a failure but does not handle it.
Use when: cleanup, emit terminal UI state, stop timers.
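A minimal sketch of both hooks, assuming plain strings stand in for UI states (lifecycleTrace and failureCause are illustrative names):

```kotlin
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.runBlocking

// onStart emits before the first upstream value; onCompletion runs at the very end.
fun lifecycleTrace(): List<String> = runBlocking {
    val trace = mutableListOf<String>()
    flowOf("a", "b")
        .onStart { emit("loading") }
        .onCompletion { cause -> trace += "done (cause=$cause)" }
        .collect { trace += it }
    trace
}

// On failure, onCompletion sees the exception, which still reaches the caller.
fun failureCause(): String? = runBlocking {
    var seen: Throwable? = null
    try {
        flow<Int> { error("boom") }.onCompletion { seen = it }.collect()
    } catch (e: IllegalStateException) {
        // the exception was observed by onCompletion, not handled by it
    }
    seen?.message
}
```

lifecycleTrace() gives [loading, a, b, done (cause=null)], and failureCause() gives "boom".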


Terminal operators (collection and materialization)

collect

Property: suspends until flow completes or coroutine is cancelled.
Use when: you want to react to every element.


collectLatest

Property: cancels previous collector block when a new value arrives.
Use when: UI rendering should always show latest, and previous work is obsolete.

Example: rendering expensive UI or processing search results:

searchResults.collectLatest { results ->
    render(results) // if new results arrive, previous render is cancelled
}

Important: collectLatest is effectively a backpressure strategy: “drop/cancel ongoing work for previous values.”


first, firstOrNull, single, toList

Use when: you need to bridge flow → synchronous-ish result.

Be careful: toList() collects everything → may be unbounded memory if flow is infinite.
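One way to keep terminal operators safe on an endless source is to bound the flow first. A sketch under that assumption, with hypothetical names:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// An endless cold source: calling toList() on it directly would never return.
fun counter(): Flow<Int> = flow {
    var i = 0
    while (true) emit(i++)
}

fun firstThree(): List<Int> = runBlocking { counter().take(3).toList() }

// first() cancels the upstream as soon as it has one value.
fun firstOnly(): Int = runBlocking { counter().first() }
```

firstThree() returns [0, 1, 2] and firstOnly() returns 0; in both cases the infinite loop is cancelled once enough values have arrived.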


Combining flows

combine

Property: emits whenever any upstream emits, using latest values from all.
Use when: deriving UI state from multiple sources (user + settings + cache).

val ui = combine(userFlow, settingsFlow) { user, settings ->
    UiState(user, settings)
}

Key point: combine needs each upstream to emit at least once before it emits.


zip

Property: pairs elements by index; waits for both, and completes as soon as either flow completes.
Use when: you truly need 1-to-1 pairing (rare in UI, more in data pipelines).


merge

Property: interleaves emissions from multiple flows (no pairing).
Use when: multiple sources are independent events.
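To contrast pairing with interleaving, here is a short sketch (function names are illustrative). The merged result is sorted before it is returned because merge makes no promise about the order between sources.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// zip pairs by position and stops when the shorter flow ends.
fun zipped(): List<String> = runBlocking {
    flowOf(1, 2, 3).zip(flowOf("a", "b")) { n, s -> "$n$s" }.toList()
}

// merge forwards every element from every source, with no pairing.
fun mergedSorted(): List<Int> = runBlocking {
    merge(flowOf(1, 2), flowOf(3, 4)).toList().sorted()
}
```

zipped() returns [1a, 2b] (the unpaired 3 is dropped), and mergedSorted() returns [1, 2, 3, 4].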


Flattening (Flow-of-Flow)

These operators define what happens when each upstream value triggers an async sub-stream.

flatMapConcat

Property: sequential; waits for each inner flow to finish before starting next.
Use when: order must be preserved and operations must not overlap (e.g., queue writes).


flatMapMerge

Property: concurrent; inner flows run in parallel up to concurrency.
Use when: independent requests can be parallelized (batch loading, prefetch).

ids.asFlow().flatMapMerge(concurrency = 4) { id ->
    flow { emit(api.get(id)) }
}

Trade-off: results can arrive out of order.


flatMapLatest

Property: cancels previous inner flow when new upstream value arrives.
Use when: “latest input wins” patterns (search queries, text changes).

This is one of the most common Flow tools in Android UI.
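A small sketch contrasts it with flatMapConcat. The fakeSearch function is a stand-in for a real request, and the opt-in annotation covers both operators because they are still marked experimental or preview, depending on the library version.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Stand-in for a slow search request.
fun fakeSearch(query: String): Flow<String> = flow {
    delay(50)
    emit("results for $query")
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun latestQueryWins(): List<String> = runBlocking {
    // Each new query cancels the search that is still in flight.
    flowOf("k", "ko", "kot").flatMapLatest { fakeSearch(it) }.toList()
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun everyQueryRuns(): List<String> = runBlocking {
    // Each search finishes before the next one starts.
    flowOf("k", "ko", "kot").flatMapConcat { fakeSearch(it) }.toList()
}
```

latestQueryWins() returns only [results for kot], while everyQueryRuns() returns all three results in order and takes roughly three times as long.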


Timing and rate control

debounce

Property: waits for quiet period before emitting.
Use when: user typing, reducing spam events.
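A rough sketch of debounce on simulated keystrokes. The timings are arbitrary values chosen for this example, and debounce requires opting in to FlowPreview.

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

@OptIn(FlowPreview::class)
fun debouncedQueries(): List<String> = runBlocking {
    flow {
        emit("k")
        delay(10)   // the next keystroke arrives inside the 100 ms window
        emit("ko")
        delay(300)  // a quiet period longer than the window
        emit("kot")
    }
        .debounce(100)
        .toList()
}
```

The result is [ko, kot]: "k" is replaced before the quiet period elapses, and the final value is delivered when the flow completes.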


sample

Property: emits latest value at a fixed interval.
Use when: you want regular updates (e.g., sensor stream) without processing everything.


timeout

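(Recent kotlinx.coroutines releases include a timeout(Duration) operator, marked @FlowPreview at the time of writing, which fails the flow with a TimeoutCancellationException when upstream stays silent for longer than the given duration. Otherwise, wrap the collection in withTimeout or withTimeoutOrNull.)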
Use when: fail fast for slow sources.
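One such pattern, sketched with withTimeoutOrNull around a terminal operator (firstOrGiveUp and the delays are illustrative):

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Returns the first value, or null if the source is too slow.
fun firstOrGiveUp(): Int? = runBlocking {
    val slow = flow {
        delay(1_000) // simulated slow source
        emit(42)
    }
    withTimeoutOrNull(100) { slow.first() } // cancels the collection after 100 ms
}
```

firstOrGiveUp() returns null here. Because the timeout cancels the collecting coroutine, the upstream delay is cancelled too instead of running to completion in the background.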


Context and concurrency control

flowOn

Property: changes upstream context (where upstream runs).
Use when: upstream does heavy work and should not run on Main.

val f = flow { emit(cpuHeavy()) }.flowOn(Dispatchers.Default)

Common pitfall: flowOn affects upstream only, not downstream.
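The upstream-only rule is easy to verify by recording thread names. A minimal sketch (threadsUsed is a made-up name; the exact worker thread name depends on the runtime):

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// Returns (thread that ran the flow body, thread that ran the collector).
fun threadsUsed(): Pair<String, String> = runBlocking {
    var upstreamThread = ""
    var downstreamThread = ""
    flow {
        upstreamThread = Thread.currentThread().name
        emit(1)
    }
        .flowOn(Dispatchers.Default) // applies to everything above this line
        .collect { downstreamThread = Thread.currentThread().name }
    upstreamThread to downstreamThread
}
```

The first name belongs to a Dispatchers.Default worker, while the second is the thread that called runBlocking: the collector stays in its own context.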


buffer

Property: decouples producer and consumer using a buffer; can improve throughput.
Use when: upstream produces fast, downstream occasionally slow, and you can tolerate buffering.


conflate

Property: keeps only latest value when collector is slow (drops intermediate).
Use when: only latest matters (UI state refresh), and intermediate values aren’t meaningful.


Exceptions in Flow

How exceptions propagate

In Flow, exceptions follow normal coroutine rules:

  • If upstream throws, collection fails unless handled.
  • Cancellation is not an “error” in the same way; it’s a control signal.

flow {
    emit(1)
    error("boom")
}.collect { println(it) } // throws

catch catches upstream, not downstream

This is subtle but critical:

flow
  .map { riskyMap(it) }
  .catch { e -> emit(fallbackValue) }  // catches exceptions from upstream of catch
  .collect { riskyCollect(it) }        // exceptions here are NOT caught by catch

If riskyCollect throws, you need try/catch around collect (or handle inside collector).
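An alternative worth knowing is to move the collector body into onEach above catch and finish with a no-argument collect(), so the body becomes upstream of catch. A sketch, with illustrative names and messages:

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// The consuming logic sits in onEach, so catch sees its exceptions.
fun handledInPipeline(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flowOf(1, 2, 3)
        .onEach { value ->
            check(value < 3) { "cannot handle $value" }
            log += "handled $value"
        }
        .catch { e -> log += "caught: ${e.message}" }
        .collect()
    log
}

// The same failure inside collect { } is downstream of catch and escapes it.
fun escapesCatch(): Boolean = runBlocking {
    try {
        flowOf(1).catch { }.collect { error("thrown in collect") }
        false
    } catch (e: IllegalStateException) {
        true
    }
}
```

handledInPipeline() returns [handled 1, handled 2, caught: cannot handle 3], and escapesCatch() returns true.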

Correct patterns

Pattern A: fallback value

val safe = upstream.catch { emit(default) }

Pattern B: map exception into domain error

val resultFlow = upstream
    .map { Result.success(it) }
    .catch { emit(Result.failure(it)) }

This is great for UI: a single stream of “success or error state” instead of exceptions bubbling into the UI layer.

retry and retryWhen

When to use:

  • transient errors (network)
  • backoff strategies
  • rate-limited APIs

upstream.retryWhen { cause, attempt ->
    if (cause is IOException && attempt < 3) {
        delay(200L * (attempt + 1))
        true
    } else false
}

Important: Don’t retry on logical errors (HTTP 4xx) unless you have a reason.
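For the simpler case without backoff, retry with a predicate is often enough. In this sketch the failing source is simulated with a counter; retriedFetch and the messages are made up for the example.

```kotlin
import java.io.IOException
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retry
import kotlinx.coroutines.runBlocking

// Returns the payload and how many times the flow body ran.
fun retriedFetch(): Pair<String, Int> = runBlocking {
    var attempts = 0
    val result = flow {
        attempts++
        if (attempts < 3) throw IOException("transient failure #$attempts")
        emit("payload")
    }
        .retry(retries = 3) { cause -> cause is IOException } // other exceptions are not retried
        .first()
    result to attempts
}
```

retriedFetch() returns ("payload", 3): because the flow is cold, every retry re-runs the body from the top.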

Cancellation vs failure

Coroutines use CancellationException to stop work.
Treating them as errors often causes weird bugs (e.g., “show error toast when user navigated away”).

Good practice:

  • Don’t swallow cancellations unintentionally.
  • When catching broad exception types, rethrow CancellationException unless you are certain that swallowing it is intended.
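A possible helper for the second point, shown as a sketch. The name resultOf is an assumption, not a library function; it converts failures into a Result but lets cancellation pass through.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: failures become Result.failure, cancellation is rethrown.
suspend fun <T> resultOf(block: suspend () -> T): Result<T> =
    try {
        Result.success(block())
    } catch (e: CancellationException) {
        throw e // never turn cancellation into an error state
    } catch (e: Exception) {
        Result.failure(e)
    }

fun failureIsCaptured(): Boolean = runBlocking {
    resultOf<Int> { error("boom") }.isFailure
}

fun cancellationIsNotReported(): Boolean = runBlocking {
    var reportedAsError = false
    val job = launch {
        val result = resultOf { delay(1_000) }
        reportedAsError = result.isFailure // not reached when the job is cancelled
    }
    yield()             // let the job start and suspend in delay
    job.cancelAndJoin()
    !reportedAsError
}
```

Both functions return true: a real failure ends up in the Result, while cancelling the job never produces an error for the UI to show.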

Flow’s Built-in Backpressure

Backpressure is where Flow’s coroutine-native design becomes obvious.

Backpressure problem statement

If a producer emits values faster than the consumer can handle, you get one of these failure modes:

  • Memory growth (buffer grows unbounded)
  • Latency growth (consumer is always behind)
  • Dropped values (sometimes okay, sometimes catastrophic)

RxJava solved this with explicit backpressure-aware types and complex operator contracts.

Flow solves it with a simpler rule:

By default, Flow backpressure is handled by suspension: the producer suspends when the consumer is slow.

Why “suspension = backpressure” actually works

In a basic flow:

val f = flow {
    repeat(1_000) {
        emit(it)          // suspends if downstream is not ready
    }
}

f.collect {
    delay(50)             // slow consumer
}

Because emit is suspending, the producer cannot outpace the consumer indefinitely. There is no implicit unbounded queue.

This is the safest default:

  • stable memory usage
  • predictable throughput
  • natural cancellation

Throughput vs responsiveness trade-off

Suspension-based backpressure is safe, but sometimes you want different behavior:

  • UI rendering: you may want “latest only” to stay responsive.
  • Telemetry: you may want to drop old events under load.
  • Batch processing: you may want buffering to maximize throughput.

Flow gives you explicit tools to choose.


buffer: decouple producer and consumer

upstream
    .buffer(capacity = 64)
    .collect { slowWork(it) }

What it does

  • Producer can run ahead up to the buffer size.
  • Consumer drains buffer at its own pace.

When to use

  • Upstream is bursty (fast bursts), downstream is occasionally slow.
  • You can tolerate some queued values.

Risk

  • Buffers add memory usage.
  • Buffers increase latency (you may process stale values).
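The throughput effect can be measured with a rough sketch like this one. The 50 ms delays and function names are arbitrary choices for illustration, and the exact timings will vary by machine.

```kotlin
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Producer needs 50 ms per element.
fun slowSource(): Flow<Int> = flow {
    repeat(5) {
        delay(50)
        emit(it)
    }
}

// Without a buffer, producing and consuming alternate: about 5 * (50 + 50) ms.
fun unbufferedMillis(): Long = runBlocking {
    measureTimeMillis { slowSource().collect { delay(50) } }
}

// With a buffer, the producer keeps working while the consumer is busy.
fun bufferedMillis(): Long = runBlocking {
    measureTimeMillis { slowSource().buffer(capacity = 64).collect { delay(50) } }
}
```

Expect roughly 500 ms for the unbuffered run and roughly 300 ms for the buffered one, since producer and consumer delays overlap once they run in separate coroutines.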

Buffer overflow strategies (important for hot streams)

For SharedFlow (and channel-based flows), you can choose overflow behavior:

  • SUSPEND: backpressure via suspension (default)
  • DROP_OLDEST: keep new values, drop old
  • DROP_LATEST: keep old values, drop new

Example:

val events = MutableSharedFlow<Event>(
    replay = 0,
    extraBufferCapacity = 64,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)

When to DROP_OLDEST

  • “Latest matters” (UI events like progress updates)
  • You want responsiveness over completeness

When to DROP_LATEST

  • You want to ensure earlier queued items are processed (rare in UI)
  • You prefer consistency of earlier events over accepting new ones
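The two drop policies are easiest to see on a small buffer. This sketch uses a Channel rather than a SharedFlow because the overflow is then observable without any subscriber; MutableSharedFlow takes the same BufferOverflow values. The name survivors is made up for the example.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Sends 1..5 into a buffer of size 2 and returns what is left to consume.
fun survivors(overflow: BufferOverflow): List<Int> = runBlocking {
    val channel = Channel<Int>(capacity = 2, onBufferOverflow = overflow)
    for (i in 1..5) channel.trySend(i) // never suspends; the policy decides what is kept
    channel.close()
    channel.receiveAsFlow().toList()
}
```

survivors(BufferOverflow.DROP_OLDEST) returns [4, 5], and survivors(BufferOverflow.DROP_LATEST) returns [1, 2].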

conflate: keep only the latest when collector is slow

upstream
    .conflate()
    .collect { render(it) }

What it does

  • If collector is slow, intermediate values are dropped.
  • Collector always gets the most recent available.

When to use

  • UI state updates where intermediate states are not meaningful
  • streams like “current progress percent” where only latest matters

Pitfall

  • If every intermediate value matters (e.g., financial ticks), conflation breaks correctness.
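A short sketch shows what gets lost. Exactly which intermediate values survive depends on timing, so the example only relies on two things: some values are skipped, and the final value is always delivered.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Fast producer (10 ms per value), slow collector (100 ms per value).
fun conflatedValues(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    flow {
        for (i in 1..20) {
            delay(10)
            emit(i)
        }
    }
        .conflate()
        .collect {
            delay(100) // while this runs, newer values overwrite each other
            seen += it
        }
    seen
}
```

The returned list is much shorter than 20 elements and ends with 20.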

collectLatest: cancellation-based backpressure

collectLatest is not exactly the same as conflate, but it is often the better UI tool.

upstream.collectLatest { value ->
    expensiveRender(value) // cancelled if a new value arrives
}

What it gives you

  • You don’t just drop values—you also cancel stale work.
  • Prevents wasted CPU when new values arrive frequently.

When to use

  • search results rendering
  • diff calculations
  • large recompositions or expensive UI pipelines
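A minimal runnable sketch of the cancellation effect, where delay(50) stands in for expensive rendering and renderedValues is an illustrative name:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Returns the values whose processing was allowed to finish.
fun renderedValues(): List<Int> = runBlocking {
    val rendered = mutableListOf<Int>()
    flowOf(1, 2, 3).collectLatest { value ->
        delay(50)         // cancelled here if a newer value arrives
        rendered += value // reached only for work that was not superseded
    }
    rendered
}
```

renderedValues() returns [3]: the blocks for 1 and 2 are cancelled at the delay, so they never do any work that would be thrown away.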

Backpressure with flatMapMerge and concurrency

flatMapMerge(concurrency = N) is a throughput lever—and also a backpressure control lever.

  • Too low concurrency: slow throughput.
  • Too high concurrency: overloads network/CPU → the pipeline itself becomes the source of overload.

Use it intentionally.

A common pattern:

ids.asFlow()
  .flatMapMerge(concurrency = 4) { id -> flow { emit(api.get(id)) } }
  .buffer(16)
  .collect { consume(it) }

Here:

  • concurrency limits parallel work
  • buffer smooths bursts
  • collector remains stable

Hot stream backpressure in UI (StateFlow/SharedFlow)

  • StateFlow is conflated: latest wins.
    • Great for UI state.
    • It naturally avoids overload from rapid updates.
  • SharedFlow depends on configuration:
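    • if onBufferOverflow=SUSPEND and no buffer → emit suspends until current subscribers have taken the value (backpressure); with no subscribers, emit returns immediately and the value is kept only in the replay cache, if any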
    • if you drop values → you’re choosing responsiveness over completeness

Putting it together: a realistic Android ViewModel pattern

State with StateFlow, Events with SharedFlow

data class UiState(
    val query: String = "",
    val loading: Boolean = false,
    val results: List<String> = emptyList(),
    val error: String? = null
)

sealed interface UiEvent {
    data class ShowSnackbar(val message: String) : UiEvent
    data object NavigateToDetails : UiEvent
}

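@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class) // debounce and flatMapLatest are opt-in APIs
class SearchViewModel(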
    private val repo: Repo
) : ViewModel() {

    private val _state = MutableStateFlow(UiState())
    val state: StateFlow<UiState> = _state

    private val _events = MutableSharedFlow<UiEvent>(
        replay = 0,
        extraBufferCapacity = 16,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    val events: SharedFlow<UiEvent> = _events

    private val queryFlow = _state
        .map { it.query }
        .distinctUntilChanged()
        .debounce(300)

    init {
        queryFlow
            .flatMapLatest { query ->
                flow {
                    emit(repo.search(query)) // suspend call
                }
                .onStart { _state.update { it.copy(loading = true, error = null) } }
                .catch { e ->
                    _state.update { it.copy(loading = false, error = e.message) }
                    _events.tryEmit(UiEvent.ShowSnackbar("Search failed"))
                    emit(emptyList())
                }
            }
            .onEach { results ->
                _state.update { it.copy(loading = false, results = results) }
            }
            .launchIn(viewModelScope)
    }

    fun onQueryChanged(q: String) {
        _state.update { it.copy(query = q) }
    }
}

Why this is a good model

  • StateFlow holds current UI state (safe for re-collection).
  • SharedFlow emits transient events without replay.
  • flatMapLatest ensures latest query wins (cancels previous work).
  • Error handling maps exceptions into UI state + event, not crashes.

Closing Thoughts

If you treat Flow as “Rx but smaller,” you’ll fight it.
If you treat Flow as “structured concurrency + streams,” it becomes predictable.

Here are the mental models I want you to leave with:

  • Cold Flow: “execution per collector”
  • StateFlow: “always-current state”
  • SharedFlow: “broadcast with replay/buffer/drop policies”
  • Backpressure: “suspension by default; buffering/conflation/cancellation when you choose”
  • Operators: “explicit semantics; choose concat/merge/latest based on ordering and cancellation needs”
