Testing Coroutines & Flows

Testing coroutine-based code is one of the most important skills for Android and Kotlin developers. Coroutines introduce asynchronous execution, background work, Flow streams, delays, and lifecycle concerns, all of which make testing more complex than testing simple synchronous functions.

Fortunately, the Kotlin team provides powerful test utilities that allow you to:

  • control coroutine execution
  • eliminate randomness
  • fast-forward time (without real delays)
  • capture Flow emissions
  • test ViewModels deterministically
  • ensure code behaves as expected under concurrency

This article explains everything you need to know about testing coroutine and Flow code effectively and cleanly.

We cover:

  1. Standard coroutine test utilities
  2. TestCoroutineScheduler
  3. Testing suspend functions
  4. Testing Flow emissions
  5. Controlling time with advanceTimeBy
  6. Example: Unit testing a ViewModel with StateFlow

This article assumes you have a basic understanding of coroutines and flows.


Standard Coroutine Test Utilities

Kotlin provides a dedicated library for testing coroutine code:

testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:<version>"

This gives access to:

  • runTest
  • TestScope
  • StandardTestDispatcher (the default dispatcher inside runTest)
  • TestCoroutineDispatcher (deprecated; found only in older code)
  • TestCoroutineScheduler
  • virtual time manipulation (advanceTimeBy, advanceUntilIdle)
  • UnconfinedTestDispatcher
  • functions for replacing Dispatchers.Main

runTest: the recommended entry point

You should use runTest to run nearly all coroutine tests.

Example:

@Test
fun testSomething() = runTest {
    val result = mySuspendFunction()
    assertEquals(42, result)
}

runTest creates:

  • a deterministic scheduler
  • a coroutine scope
  • a test dispatcher
  • safe handling for delays, timeouts, and scheduling
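The test dispatcher that runTest creates by default is a StandardTestDispatcher, which queues new coroutines instead of starting them right away. The sketch below contrasts it with UnconfinedTestDispatcher, assuming kotlinx-coroutines-test 1.6 or later and JUnit 4; the class and test names are made up for illustration.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import org.junit.Assert.assertFalse
import org.junit.Assert.assertTrue
import org.junit.Test

@OptIn(ExperimentalCoroutinesApi::class)
class DispatcherBehaviorTest { // hypothetical test class

    @Test
    fun standardDispatcherQueuesNewCoroutines() = runTest {
        var started = false
        launch { started = true }

        assertFalse(started) // the coroutine is only queued so far
        runCurrent()         // run everything due at the current virtual time
        assertTrue(started)
    }

    @Test
    fun unconfinedDispatcherStartsCoroutinesEagerly() = runTest(UnconfinedTestDispatcher()) {
        var started = false
        launch { started = true }

        assertTrue(started)  // the body ran before launch returned
    }
}
```

The queued behavior gives precise control over ordering, while the eager behavior is convenient for simple collectors that must not miss early values.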

Important behavior

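Inside runTest, delays do not cause real waiting. A call such as

delay(5000)

returns almost immediately in wall-clock terms: the test scheduler advances its virtual clock by 5000 ms instead of sleeping. This applies only to code running on a test dispatcher; a delay on a real dispatcher such as Dispatchers.IO still waits.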

This is critical for fast tests.
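One way to see the virtual clock at work is to read currentTime before and after a delay. This is a minimal sketch, assuming kotlinx-coroutines-test 1.6 or later and JUnit 4; the class name is hypothetical.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import org.junit.Assert.assertEquals
import org.junit.Test

@OptIn(ExperimentalCoroutinesApi::class)
class VirtualTimeTest { // hypothetical test class

    @Test
    fun delayAdvancesVirtualClockOnly() = runTest {
        assertEquals(0L, currentTime)

        delay(5_000) // skipped: no real 5-second wait happens here

        // The virtual clock moved forward by exactly the delayed amount.
        assertEquals(5_000L, currentTime)
    }
}
```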


Replacing Dispatchers.Main

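For ViewModel or UI code, you usually need to replace the Main dispatcher, because Dispatchers.Main is backed by the Android main looper, which is not available in local unit tests.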

At test start:

private val testDispatcher = UnconfinedTestDispatcher()

@Before
fun setup() {
    Dispatchers.setMain(testDispatcher)
}

@After
fun tearDown() {
    Dispatchers.resetMain()
}

This ensures:

  • UI-bound code works without an Android environment
  • Coroutine scheduling is deterministic
  • No accidental use of real main thread
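To avoid repeating the setup and teardown in every test class, the same two calls can be wrapped in a JUnit 4 rule. The sketch below follows a common convention; MainDispatcherRule is a name chosen here for illustration and is not part of the coroutine test library.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.TestDispatcher
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.setMain
import org.junit.rules.TestWatcher
import org.junit.runner.Description

// Hypothetical helper: swaps Dispatchers.Main for a test dispatcher around each test.
@OptIn(ExperimentalCoroutinesApi::class)
class MainDispatcherRule(
    val testDispatcher: TestDispatcher = UnconfinedTestDispatcher(),
) : TestWatcher() {

    override fun starting(description: Description) {
        Dispatchers.setMain(testDispatcher)
    }

    override fun finished(description: Description) {
        Dispatchers.resetMain()
    }
}
```

A test class would then declare `@get:Rule val mainDispatcherRule = MainDispatcherRule()` and drop its own @Before and @After methods.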

TestCoroutineScheduler

TestCoroutineScheduler is the brains behind the testing environment.

It controls:

  • virtual time
  • delays
  • execution order
  • scheduling of tasks

In older coroutine test APIs you had to manage dispatchers manually.
Now the scheduler is built directly into runTest.


Why the scheduler matters

Real coroutines rely on the system clock.
In tests, relying on real time leads to:

  • slow tests
  • flaky behavior
  • timing bugs

Virtual time removes all of this.

Example:

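@Test
fun testTime() = runTest {
    var count = 0

    launch {
        delay(1000)
        count++
    }

    assertEquals(0, count)

    // advanceTimeBy runs only tasks scheduled strictly before the new time,
    // so runCurrent() is needed to execute the task due at exactly 1000 ms.
    advanceTimeBy(1000)
    runCurrent()

    assertEquals(1, count)
}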

The scheduler executes the delayed coroutine instantly when we advance time.
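The boundary behavior of the virtual clock is easy to get wrong, so it helps to see it step by step. The sketch below assumes kotlinx-coroutines-test 1.6 or later, where advanceTimeBy does not run tasks due at exactly the target time; the class name is hypothetical.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import org.junit.Assert.assertEquals
import org.junit.Test

@OptIn(ExperimentalCoroutinesApi::class)
class SchedulerBoundaryTest { // hypothetical test class

    @Test
    fun tasksDueExactlyAtTheTargetTimeNeedRunCurrent() = runTest {
        var count = 0
        launch {
            delay(1_000)
            count++
        }

        advanceTimeBy(999)
        assertEquals(999L, currentTime)
        assertEquals(0, count) // deadline not reached yet

        advanceTimeBy(1)
        assertEquals(1_000L, currentTime)
        assertEquals(0, count) // clock is at the deadline, task still pending

        runCurrent()           // executes tasks due at the current time
        assertEquals(1, count)
    }
}
```

Calling advanceTimeBy(1_001) in one step would have the same end result for this coroutine.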


Testing Suspend Functions

Suspend functions may contain:

  • delay
  • network calls
  • database queries
  • dispatcher switching (withContext)
  • Flow operators

Testing suspend functions is straightforward.


Basic test

suspend fun loadUser(): User {
    delay(1000)
    return User("Alice")
}

@Test
fun testSuspendFunction() = runTest {
    val user = loadUser()
    assertEquals("Alice", user.name)
}

Why is this easy?

Because:

  • delay(1000) does not block real time
  • everything runs synchronously within virtual time
  • test remains fast

Testing suspend functions using IO/Default dispatchers

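This pattern works, with a caveat:

@Test
fun testWithDispatcher() = runTest {
    val result = withContext(Dispatchers.IO) {
        computeStuff()
    }
    assertEquals(expected, result)
}

Inside runTest:

  • Dispatchers.IO and Dispatchers.Default are not replaced; only Dispatchers.Main can be swapped, via Dispatchers.setMain
  • the block runs on a real background thread, and runTest waits for its result
  • any delay inside that block takes real time, because virtual time applies only to test dispatchers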

However, if you want stricter control, you can inject dispatchers via constructor.
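Constructor injection could look like the sketch below. PriceCalculator and its 500 ms delay are invented for illustration; the point is that the test passes a StandardTestDispatcher bound to runTest's own scheduler, so the delay runs on virtual time.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import org.junit.Assert.assertEquals
import org.junit.Test

// Hypothetical class under test: production code uses Dispatchers.Default.
class PriceCalculator(
    private val dispatcher: CoroutineDispatcher = Dispatchers.Default,
) {
    suspend fun total(prices: List<Int>): Int = withContext(dispatcher) {
        delay(500) // stands in for slow work
        prices.sum()
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
class PriceCalculatorTest {

    @Test
    fun totalRunsOnVirtualTime() = runTest {
        // Share runTest's scheduler so both dispatchers use one virtual clock.
        val calculator = PriceCalculator(StandardTestDispatcher(testScheduler))

        assertEquals(6, calculator.total(listOf(1, 2, 3)))
        assertEquals(500L, currentTime) // the delay was virtual, not real
    }
}
```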


Testing Flow Emissions

Flow testing is one of the most important parts of coroutine testing because:

  • flows emit many values
  • emissions may depend on time
  • they may collect indefinitely
  • collectors can suspend
  • debounce/timeout operators must be tested

Kotlin provides multiple ways to test flows.


1. Test Flow with toList()

@Test
fun testSimpleFlow() = runTest {
    val flow = flow {
        emit(1)
        emit(2)
    }

    val results = flow.toList()

    assertEquals(listOf(1, 2), results)
}
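toList() only returns once the flow completes, so a flow that emits forever needs to be bounded first. A minimal sketch, assuming a hypothetical ticker function and using take to cut the stream off after three values:

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import org.junit.Assert.assertEquals
import org.junit.Test

// Hypothetical infinite flow: emits 0, 1, 2, ... with a pause between values.
fun ticker(periodMillis: Long): Flow<Int> = flow {
    var tick = 0
    while (true) {
        emit(tick++)
        delay(periodMillis)
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
class TickerFlowTest {

    @Test
    fun takesFirstThreeTicks() = runTest {
        // take(3) cancels the upstream flow after the third value.
        val results = ticker(1_000).take(3).toList()

        assertEquals(listOf(0, 1, 2), results)
        assertEquals(2_000L, currentTime) // two delays elapsed on the virtual clock
    }
}
```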


2. Testing time-based Flows (debounce, delay, etc.)

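@Test
fun testDebounceFlow() = runTest {
    val stateFlow = MutableStateFlow("a")

    val results = mutableListOf<String>()

    val job = launch {
        stateFlow
            .debounce(300)
            .collect { results += it }
    }

    stateFlow.value = "b"
    // advanceTimeBy stops just short of tasks due at exactly 300 ms;
    // runCurrent() then executes the pending debounce emission.
    advanceTimeBy(300)
    runCurrent()

    assertEquals(listOf("b"), results)
    job.cancel()
}

Key points

  • advanceTimeBy moves the virtual clock to the debounce deadline, and runCurrent executes the emission due at that instant
  • the collector starts only when the scheduler runs, so it sees "b" and never "a"
  • no real waiting
  • test remains deterministic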

3. Using Turbine (HIGHLY recommended)

Turbine is a Flow testing library:

testImplementation "app.cash.turbine:turbine:<version>"

Example:

@Test
fun testUsingTurbine() = runTest {
    flowOf(1, 2, 3).test {
        assertEquals(1, awaitItem())
        assertEquals(2, awaitItem())
        assertEquals(3, awaitItem())
        awaitComplete()
    }
}

Turbine makes it easier to test:

  • long streams
  • event-based flows
  • cancellations
  • Flow completion
  • time-based emissions
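For event-based flows that never complete, Turbine's test block can be ended explicitly. The sketch below uses a MutableSharedFlow as a stand-in for an event stream; the class name and event strings are illustrative, and it assumes a Turbine version that provides cancelAndIgnoreRemainingEvents.

```kotlin
import app.cash.turbine.test
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.test.runTest
import org.junit.Assert.assertEquals
import org.junit.Test

class EventFlowTest { // hypothetical test class

    @Test
    fun collectsEventsFromSharedFlow() = runTest {
        val events = MutableSharedFlow<String>()

        events.test {
            // Turbine is already collecting when this block starts.
            events.emit("clicked")
            assertEquals("clicked", awaitItem())

            events.emit("closed")
            assertEquals("closed", awaitItem())

            // A SharedFlow never completes, so stop collecting explicitly.
            cancelAndIgnoreRemainingEvents()
        }
    }
}
```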

Controlling Time with advanceTimeBy

One of the most powerful coroutine test features is virtual time.

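Suppose the code under test calls:

delay(5000)

Against the real clock, that adds 5 seconds to the test.

Under runTest, the test moves the virtual clock instead:

advanceTimeBy(5000)

which returns immediately and runs every task scheduled before the 5000 ms mark. A task due at exactly that mark needs a following runCurrent() or one extra millisecond.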


Example: testing debounce logic

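@Test
fun testDebounce() = runTest {
    val input = MutableStateFlow("")

    val results = mutableListOf<String>()

    val job = launch {
        input.debounce(400).collect { results += it }
    }

    // Space the updates 100 ms apart so the collector sees each one
    // and debounce has to discard the earlier values.
    input.value = "h"
    advanceTimeBy(100)
    input.value = "he"
    advanceTimeBy(100)
    input.value = "hel"

    // Reach the 400 ms quiet period, then run the emission due at that instant.
    advanceTimeBy(400)
    runCurrent()

    assertEquals(listOf("hel"), results)

    job.cancel()
}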

Explanation

debounce(400) only emits after 400 ms of no new values.
Virtual time makes this trivial.


advanceUntilIdle

This runs all scheduled tasks until no more are pending.

advanceUntilIdle()

Works well with:

  • channels
  • flows
  • large asynchronous systems
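A minimal sketch of advanceUntilIdle with two coroutines that finish at different virtual times; the class name and log strings are illustrative.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import org.junit.Assert.assertEquals
import org.junit.Test

@OptIn(ExperimentalCoroutinesApi::class)
class AdvanceUntilIdleTest { // hypothetical test class

    @Test
    fun runsAllPendingWorkInTimeOrder() = runTest {
        val log = mutableListOf<String>()

        launch { delay(2_000); log += "slow" }
        launch { delay(500); log += "fast" }

        // Runs queued tasks in virtual-time order until none remain.
        advanceUntilIdle()

        assertEquals(listOf("fast", "slow"), log)
        assertEquals(2_000L, currentTime) // clock stops at the last task
    }
}
```

Unlike advanceTimeBy, no deadline has to be known up front, which is why it suits larger systems where the exact delays are an implementation detail.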

Example: Unit Testing ViewModel with StateFlow

Now let’s integrate everything into a real-world test.


ViewModel to test

data class UiState(
    val loading: Boolean = false,
    val data: List<String> = emptyList(),
    val error: String? = null
)

class MyViewModel(
    private val repo: Repository
) : ViewModel() {

    private val _state = MutableStateFlow(UiState())
    val state = _state.asStateFlow()

    fun load() {
        viewModelScope.launch {
            _state.value = UiState(loading = true)

            try {
                val items = repo.fetchItems()
                _state.value = UiState(data = items)
            } catch (e: Exception) {
                _state.value = UiState(error = e.message)
            }
        }
    }
}


Fake repository

Using a hand-written fake implementation:

class FakeRepo : Repository {
    override suspend fun fetchItems(): List<String> {
        delay(1000)
        return listOf("A", "B", "C")
    }
}


Test

@OptIn(ExperimentalCoroutinesApi::class)
class MyViewModelTest {

    private val dispatcher = UnconfinedTestDispatcher()
    private lateinit var vm: MyViewModel

    @Before
    fun setup() {
        Dispatchers.setMain(dispatcher)
        vm = MyViewModel(FakeRepo())
    }

    @After
    fun tearDown() {
        Dispatchers.resetMain()
    }

    @Test
    fun testViewModelStateFlow() = runTest {
        val states = mutableListOf<UiState>()

        val job = launch {
            vm.state.toList(states)
        }

        vm.load()

        assertEquals(true, states[1].loading)  // first update

        // simulate delay() inside repo
        advanceTimeBy(1000)

        assertEquals(listOf("A", "B", "C"), states.last().data)

        job.cancel()
    }
}


Explanation (important)

1. Replacing Dispatchers.Main

The ViewModel launches its work in viewModelScope, which runs on Dispatchers.Main.immediate by default.
That dispatcher needs the Android main looper, so we replace it with a test dispatcher.

2. Collecting StateFlow

We use:

vm.state.toList(states)

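This appends every value the collector observes to a MutableList. A StateFlow never completes, so the collecting job has to be cancelled at the end of the test.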

3. Triggering load

vm.load() updates state twice:

  • first: loading = true
  • second: after fetch: data = [...]

4. Virtual time

Advancing virtual time past the 1000 ms delay in the repository lets fetchItems() return without any real waiting.

5. Assertions

We assert that the states arrive in the expected order with the expected content.


Conclusion

Testing coroutines and flows is essential for writing reliable asynchronous code in Android and Kotlin. The coroutine test library makes it possible to run tests:

  • deterministically
  • instantly
  • without real delays
  • without real threads
  • without an Android environment

You learned how to:

  • use standard coroutine test utilities
  • control execution with TestCoroutineScheduler
  • test suspend functions cleanly
  • test Flow emissions (manually or with Turbine)
  • manipulate virtual time via advanceTimeBy
  • unit test a ViewModel using StateFlow

Key takeaways

  • Always use runTest for coroutine testing
  • Replace Dispatchers.Main in ViewModel tests
  • Flow testing is easier with Turbine
  • Virtual time allows instant delay testing
  • StateFlow makes view models predictable and testable
