Coroutine Testing Patterns: Effective Strategies for Testing Asynchronous Kotlin Code
Testing asynchronous code has always been challenging, and Kotlin’s coroutines and flows are no exception. However, the Kotlin team has provided powerful testing utilities that make this process more manageable and reliable. In this blog post, we’ll explore effective patterns for testing coroutines and flows, from basic unit tests to complex integration scenarios.
The Foundation: kotlinx-coroutines-test
Before diving into specific patterns, let’s establish the foundation. The kotlinx-coroutines-test
library provides essential tools for testing coroutines:
1
2
3
| dependencies {
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.7.3")
}
|
This library offers several key components:
TestCoroutineScheduler
: Controls virtual time for coroutinesStandardTestDispatcher
: A dispatcher that uses the test schedulerUnconfinedTestDispatcher
: A dispatcher that executes coroutines eagerlyTestScope
: A coroutine scope with test-specific functionality
Let’s see how to set up a basic test environment:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| import kotlinx.coroutines.test.*
import org.junit.jupiter.api.Test
import kotlin.test.assertEquals
class BasicCoroutineTest {
@Test
fun `basic coroutine test`() = runTest {
// runTest creates a TestScope with a StandardTestDispatcher
val result = fetchData() // suspend function called within a coroutine scope
assertEquals("Data", result)
}
private suspend fun fetchData(): String {
// Simulate network delay
delay(1000)
return "Data"
}
}
|
The runTest
function creates a coroutine test environment that:
- Runs your test in a
TestScope
- Uses a
StandardTestDispatcher
by default - Automatically advances virtual time to complete suspended coroutines
- Fails the test if any coroutine throws an exception
Testing Custom Flow Operators
Custom Flow operators are a powerful way to encapsulate reusable stream processing logic. Testing them thoroughly is essential to ensure they behave as expected under various conditions.
Let’s consider a custom operator that filters and transforms items:
1
2
3
4
5
6
7
8
9
10
| fun <T, R> Flow<T>.filterAndMap(
predicate: suspend (T) -> Boolean,
transform: suspend (T) -> R
): Flow<R> = flow<R> {
collect { value ->
if (predicate(value)) {
emit(transform(value))
}
}
}
|
Here’s how to test this operator:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| @Test
fun `filterAndMap should filter and transform items`() = runTest {
// Given
val sourceFlow = flowOf(1, 2, 3, 4, 5)
val isEven: suspend (Int) -> Boolean = { it % 2 == 0 }
val double: suspend (Int) -> Int = { it * 2 }
// When
val resultFlow = sourceFlow.filterAndMap(isEven, double)
// Then
val result = resultFlow.toList()
assertEquals(listOf(4, 8), result)
}
|
For more complex operators, test different edge cases:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| @Test
fun `filterAndMap should handle empty flows`() = runTest {
// Given
val emptyFlow = emptyFlow<Int>()
// When
val resultFlow = emptyFlow.filterAndMap(
predicate = { true },
transform = { it * 2 }
)
// Then
val result = resultFlow.toList()
assertEquals(emptyList(), result)
}
@Test
fun `filterAndMap should propagate exceptions from predicate`() = runTest {
// Given
val sourceFlow = flowOf(1, 2, 3)
val throwingPredicate: suspend (Int) -> Boolean = {
if (it == 2) throw RuntimeException("Test exception")
true
}
// When/Then
assertThrows<RuntimeException> {
runBlocking {
sourceFlow.filterAndMap(
predicate = throwingPredicate,
transform = { it }
).toList()
}
}
}
|
When testing operators that involve timing, use the test scheduler to control virtual time:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| @Test
fun `debounce operator should emit only after specified delay`() = runTest {
// Given
val testScope = this
val flow = flow<String> {
emit("A")
testScope.advanceTimeBy(90)
emit("B")
testScope.advanceTimeBy(90)
emit("C")
testScope.advanceTimeBy(200)
emit("D")
}
// When
val results = mutableListOf<String>()
val job = launch {
flow.debounce(100).collect { results.add(it) }
}
// Advance time to complete all operations
advanceUntilIdle()
job.cancel()
// Then
assertEquals(listOf("C", "D"), results)
}
|
Testing Timeout and Cancellation
Proper handling of timeouts and cancellation is crucial for robust coroutine code. Here’s how to test these scenarios:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| class TimeoutService {
suspend fun fetchWithTimeout(id: String, api: Api): Result<Data> {
return try {
// Use withTimeout to limit execution time
val data = withTimeout(1000) {
api.fetchData(id)
}
Result.success(data)
} catch (e: TimeoutCancellationException) {
Result.failure(e)
}
}
fun processWithCancellationCheck(input: Flow<Int>): Flow<Int> = input
.map<Int, Int> { value ->
ensureActive() // Check for cancellation
value * 2
}
.onCompletion<Int> { cause ->
if (cause is CancellationException) {
// Log or handle cancellation
}
}
}
|
Testing timeout behavior:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| @Test
fun `fetchWithTimeout should return success when API responds in time`() = runTest {
// Given
val mockApi = mock<Api> {
onBlocking { fetchData("123") } doAnswer {
delay(500) // Respond within timeout
Data("test")
}
}
val service = TimeoutService()
// When
val result = service.fetchWithTimeout("123", mockApi)
// Then
assertTrue(result.isSuccess)
assertEquals(Data("test"), result.getOrNull())
}
@Test
fun `fetchWithTimeout should return failure when API times out`() = runTest {
// Given
val mockApi = mock<Api> {
onBlocking { fetchData("123") } doAnswer {
delay(2000) // Exceed timeout
Data("test")
}
}
val service = TimeoutService()
// When
val result = service.fetchWithTimeout("123", mockApi)
// Then
assertTrue(result.isFailure)
assertTrue(result.exceptionOrNull() is TimeoutCancellationException)
}
|
Testing cancellation handling:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| @Test
fun `processWithCancellationCheck should handle cancellation properly`() = runTest {
// Given
val service = TimeoutService()
val flow = flow {
repeat(10) {
emit(it)
delay(100)
}
}
// When
val results = mutableListOf<Int>()
val job = launch {
service.processWithCancellationCheck(flow).collect {
results.add(it)
if (results.size >= 5) {
cancel() // Cancel after collecting 5 items
}
}
}
// Then
advanceUntilIdle()
assertEquals(5, results.size)
assertEquals(listOf(0, 2, 4, 6, 8), results)
}
|
Conclusion
Testing coroutines and flows effectively requires understanding both the testing utilities provided by the Kotlin team and the patterns that work best for different scenarios. By using the techniques outlined in this post, you can create reliable tests for even the most complex asynchronous code:
- Use
kotlinx-coroutines-test
as your foundation for testing coroutines - Test custom Flow operators thoroughly with different inputs and edge cases
- Simulate various dispatch scenarios to ensure your code works across different threading models
- Verify proper handling of timeouts and cancellation
- Create custom test dispatchers when you need more control
- Build comprehensive integration tests that verify the entire flow of data
Remember that good tests not only verify that your code works correctly but also serve as documentation for how it should be used. By investing time in writing thorough tests for your coroutine code, you’ll create more robust applications and make future maintenance easier.
As coroutines and flows continue to evolve, stay updated with the latest testing utilities and best practices. The Kotlin team regularly improves the testing libraries to make our lives as developers easier and our code more reliable.