Esta pagina se ve mejor con JavaScript habilitado

Kotlin Mutex: Concurrencia Segura para Coroutines

 ·  ☕ 12 minutos lectura  ·  ✍️ Ignacio Carrión

Kotlin Mutex: Concurrencia Segura para Coroutines

Al construir aplicaciones concurrentes con coroutines de Kotlin, proteger el estado mutable compartido es esencial. Aunque las herramientas tradicionales de sincronización de Java como bloques synchronized y ReentrantLock funcionan, bloquean hilos y no se integran bien con el modelo de suspensión de coroutines. Aquí es donde entra Mutex — una primitiva de sincronización compatible con coroutines que proporciona exclusión mutua sin bloquear hilos.

Esta guía explora cuándo usar Mutex, mejores prácticas y cómo se compara con otros mecanismos de control de concurrencia.


TL;DR: Recomendaciones Rápidas

  • Usa Mutex cuando necesites proteger estado mutable compartido al que acceden múltiples coroutines.
  • Prefiere Mutex sobre synchronized en código con coroutines para evitar bloquear hilos.
  • Usa mutex.withLock { } para adquisición y liberación automática del bloqueo.
  • Considera Actor o StateFlow para escenarios de gestión de estado más complejos.
  • Para contadores simples, usa AtomicInteger o AtomicReference en su lugar.
  • Usa Semaphore cuando necesites limitar el acceso concurrente a múltiples permisos.
  • Siempre libera los bloqueos en bloques finally si no usas withLock.

¿Qué es Mutex?

Mutex (exclusión mutua) es una primitiva de sincronización de kotlinx.coroutines que asegura que solo una coroutine pueda ejecutar una sección crítica a la vez. A diferencia de los bloqueos tradicionales que bloquean hilos, Mutex suspende coroutines, manteniendo los hilos libres para hacer otro trabajo.

Estructura básica:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

val mutex = Mutex()

suspend fun protectedOperation() {
    mutex.withLock {
        // Sección crítica - solo una coroutine a la vez
        // Modifica el estado compartido de forma segura aquí
    }
}

Características clave:

  • No bloqueante: Suspende coroutines en lugar de bloquear hilos
  • Justo: Otorga acceso en orden FIFO por defecto
  • No reentrante: Una coroutine que mantiene el bloqueo no puede adquirirlo nuevamente (previene deadlock)
  • Ligero: Más eficiente que los bloqueos que bloquean hilos

Casos de Uso Principales para Mutex

1. Proteger Estado Mutable Compartido

El caso de uso más común — asegurar acceso seguro a variables compartidas:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
class CounterService {
    private var counter = 0
    private val mutex = Mutex()
    
    suspend fun increment() {
        mutex.withLock {
            counter++
        }
    }
    
    suspend fun getCount(): Int {
        return mutex.withLock {
            counter
        }
    }
}
2. Coordinar Acceso a Recursos

Cuando múltiples coroutines necesitan acceso exclusivo a un recurso:

1
2
3
4
5
6
7
8
9
class FileWriter(private val file: File) {
    private val mutex = Mutex()
    
    suspend fun appendLine(line: String) {
        mutex.withLock {
            file.appendText("$line\n")
        }
    }
}
3. Asegurar Ejecución Secuencial

Cuando las operaciones deben ocurrir en orden, incluso si se activan concurrentemente:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
class OrderProcessor {
    private val mutex = Mutex()
    private val orders = mutableListOf<Order>()
    
    suspend fun processOrder(order: Order) {
        mutex.withLock {
            // Asegurar que los pedidos se procesan secuencialmente
            orders.add(order)
            validateOrder(order)
            persistOrder(order)
        }
    }
}
4. Inicialización Perezosa con Seguridad de Hilos

Inicialización perezosa segura para hilos en contextos suspendidos:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
class DatabaseConnection {
    private var connection: Connection? = null
    private val mutex = Mutex()
    
    suspend fun getConnection(): Connection {
        if (connection != null) return connection!!
        
        return mutex.withLock {
            // Verificación doble dentro del bloqueo
            connection ?: createConnection().also { connection = it }
        }
    }
    
    private suspend fun createConnection(): Connection {
        delay(1000) // Simular configuración de conexión
        return Connection()
    }
}

Mejores Prácticas

1. Siempre Usa withLock

withLock maneja automáticamente la adquisición y liberación del bloqueo, incluso si ocurren excepciones:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// ✅ Bueno: Limpieza automática
mutex.withLock {
    dangerousOperation()
}

// ❌ Malo: Gestión manual, propenso a errores
mutex.lock()
try {
    dangerousOperation()
} finally {
    mutex.unlock()
}
2. Mantén las Secciones Críticas Pequeñas

Minimiza el tiempo manteniendo el bloqueo para reducir la contención:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// ✅ Bueno: Bloqueo solo para la sección crítica
suspend fun updateUser(userId: String, name: String) {
    val validated = validateName(name) // Fuera del bloqueo
    
    mutex.withLock {
        userCache[userId] = validated // Solo esto necesita protección
    }
    
    notifyObservers(userId) // Fuera del bloqueo
}

// ❌ Malo: Mantener bloqueo durante operaciones lentas
suspend fun updateUserSlow(userId: String, name: String) {
    mutex.withLock {
        val validated = validateName(name) // Operación lenta dentro del bloqueo
        userCache[userId] = validated
        notifyObservers(userId) // I/O dentro del bloqueo
    }
}
3. Evita Bloqueos Anidados

Mutex no es reentrante. Evita adquirir el mismo bloqueo dos veces:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// ❌ Malo: ¡Deadlock!
suspend fun problematic() {
    mutex.withLock {
        helperFunction() // Intenta adquirir mutex nuevamente
    }
}

suspend fun helperFunction() {
    mutex.withLock {
        // Se suspenderá para siempre
    }
}

// ✅ Bueno: Reestructurar para evitar anidación
suspend fun better() {
    mutex.withLock {
        helperFunctionUnsafe() // Sin adquisición de bloqueo
    }
}

fun helperFunctionUnsafe() {
    // Asume que el llamador mantiene el bloqueo
}
4. Considera Alternativas Sin Bloqueo Primero

Para operaciones simples, los tipos atómicos son más rápidos:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// ✅ Mejor para contadores simples
class AtomicCounter {
    private val counter = AtomicInteger(0)
    
    fun increment() = counter.incrementAndGet()
    fun get() = counter.get()
}

// ❌ Excesivo para un contador simple
class MutexCounter {
    private var counter = 0
    private val mutex = Mutex()
    
    suspend fun increment() {
        mutex.withLock { counter++ }
    }
}
5. Documenta las Invariantes del Bloqueo

Deja claro qué protege el bloqueo:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
class UserCache {
    private val mutex = Mutex() // Protege userMap y lastUpdate
    private val userMap = mutableMapOf<String, User>()
    private var lastUpdate = 0L
    
    suspend fun updateUser(id: String, user: User) {
        mutex.withLock {
            userMap[id] = user
            lastUpdate = System.currentTimeMillis()
        }
    }
}

Mutex vs. Otros Métodos de Sincronización

Mutex vs. synchronized
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// synchronized tradicional (bloquea hilo)
class SynchronizedCounter {
    private var count = 0
    
    @Synchronized
    fun increment() {
        count++ // Hilo bloqueado mientras espera
    }
}

// Mutex (suspende coroutine)
class MutexCounter {
    private var count = 0
    private val mutex = Mutex()
    
    suspend fun increment() {
        mutex.withLock {
            count++ // Coroutine suspendida, hilo libre
        }
    }
}

Cuándo usar cuál:

  • Usa synchronized para código no suspendido e interoperabilidad con Java legado
  • Usa Mutex para funciones suspendidas y código basado en coroutines
  • Mutex es más eficiente en contextos de coroutines porque los hilos no se bloquean
Mutex vs. Semaphore
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Mutex: Solo una coroutine a la vez
val mutex = Mutex()

// Semaphore: N coroutines a la vez
val semaphore = Semaphore(permits = 3)

// Ejemplo: Limitación de velocidad de llamadas API
class ApiClient {
    private val semaphore = Semaphore(5) // Máximo 5 peticiones concurrentes
    
    suspend fun makeRequest(endpoint: String): Response {
        semaphore.withPermit {
            return httpClient.get(endpoint)
        }
    }
}

Cuándo usar cuál:

  • Usa Mutex cuando necesites acceso exclusivo (un solo permiso)
  • Usa Semaphore cuando necesites limitar la concurrencia a N operaciones
Mutex vs. Actor
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Mutex: Sincronización manual
class MutexBasedCache {
    private val cache = mutableMapOf<String, Data>()
    private val mutex = Mutex()
    
    suspend fun get(key: String) = mutex.withLock { cache[key] }
    suspend fun put(key: String, value: Data) = mutex.withLock { cache[key] = value }
}

// Actor: Sincronización basada en mensajes
sealed class CacheMessage
data class Get(val key: String, val response: CompletableDeferred<Data?>) : CacheMessage()
data class Put(val key: String, val value: Data) : CacheMessage()

fun CoroutineScope.cacheActor() = actor<CacheMessage> {
    val cache = mutableMapOf<String, Data>()
    
    for (msg in channel) {
        when (msg) {
            is Get -> msg.response.complete(cache[msg.key])
            is Put -> cache[msg.key] = msg.value
        }
    }
}

Cuándo usar cuál:

  • Usa Mutex para sincronización simple con llamadas directas a métodos
  • Usa Actor para máquinas de estado complejas o cuando necesites encolamiento de mensajes
  • Los actores proporcionan mejor encapsulación y pueden manejar contrapresión
Mutex vs. StateFlow
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// Mutex: Gestión de estado imperativa
class MutexState {
    private var state = 0
    private val mutex = Mutex()
    
    suspend fun updateState(transform: (Int) -> Int) {
        mutex.withLock {
            state = transform(state)
        }
    }
}

// StateFlow: Gestión de estado reactiva
class FlowState {
    private val _state = MutableStateFlow(0)
    val state: StateFlow<Int> = _state.asStateFlow()
    
    fun updateState(transform: (Int) -> Int) {
        _state.update(transform) // Seguridad de hilos integrada
    }
}

Cuándo usar cuál:

  • Usa Mutex cuando necesites lógica de sincronización personalizada
  • Usa StateFlow para estado observable con seguridad de hilos integrada
  • StateFlow es mejor para estado de UI y arquitecturas reactivas
Mutex vs. Tipos Atómicos
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// AtomicInteger: Sin bloqueo para operaciones simples
class AtomicCounter {
    private val counter = AtomicInteger(0)
    
    fun increment() = counter.incrementAndGet()
    fun addAndGet(delta: Int) = counter.addAndGet(delta)
}

// Mutex: Para operaciones complejas
class ComplexCounter {
    private var counter = 0
    private var history = mutableListOf<Int>()
    private val mutex = Mutex()
    
    suspend fun increment() {
        mutex.withLock {
            counter++
            history.add(counter) // Múltiples operaciones
        }
    }
}

Cuándo usar cuál:

  • Usa tipos atómicos para operaciones de una sola variable (contadores, flags)
  • Usa Mutex cuando necesites coordinar múltiples variables
  • Los atómicos son más rápidos pero limitados a operaciones específicas

Errores Comunes

1. Olvidar Usar suspend

Las operaciones de Mutex requieren suspensión:

1
2
3
4
5
6
7
8
9
// ❌ No compilará
fun broken() {
    mutex.withLock { } // Error: función suspend llamada en contexto no suspend
}

// ✅ Correcto
suspend fun correct() {
    mutex.withLock { }
}
2. Mantener el Bloqueo Durante Operaciones Largas
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// ❌ Malo: Mantener bloqueo durante I/O
suspend fun bad(url: String) {
    mutex.withLock {
        val data = httpClient.get(url) // Llamada de red dentro del bloqueo
        cache[url] = data
    }
}

// ✅ Bueno: Obtener fuera del bloqueo
suspend fun good(url: String) {
    val data = httpClient.get(url)
    mutex.withLock {
        cache[url] = data
    }
}
3. Asumir Reentrancia
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// ❌ Deadlock: Mutex no es reentrante
suspend fun outer() {
    mutex.withLock {
        inner() // ¡Deadlock!
    }
}

suspend fun inner() {
    mutex.withLock {
        // Nunca se alcanza
    }
}
4. No Manejar la Cancelación

Siempre considera la cancelación al mantener bloqueos:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// ✅ Bueno: withLock maneja la cancelación
suspend fun proper() {
    mutex.withLock {
        doWork()
    } // Bloqueo liberado incluso en cancelación
}

// ❌ Riesgoso: Gestión manual de bloqueos
suspend fun risky() {
    mutex.lock()
    try {
        doWork() // Si se cancela aquí, el bloqueo permanece adquirido
    } finally {
        mutex.unlock()
    }
}

Consideraciones de Rendimiento

  • Mutex vs. synchronized: En código con muchas coroutines, Mutex es más eficiente porque los hilos no se bloquean
  • Contención: Alta contención degrada el rendimiento; considera fragmentación (múltiples bloqueos para diferentes claves)
  • Granularidad del bloqueo: Bloqueos de grano más fino (más bloqueos, cada uno protegiendo menos datos) reducen la contención
  • Alternativas sin bloqueo: Para operaciones simples, los tipos atómicos y StateFlow son más rápidos

Ejemplo: Fragmentación para reducir contención:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
class ShardedCache(private val shardCount: Int = 16) {
    private val mutexes = Array(shardCount) { Mutex() }
    private val caches = Array(shardCount) { mutableMapOf<String, Data>() }
    
    private fun shardIndex(key: String) = key.hashCode() and (shardCount - 1)
    
    suspend fun put(key: String, value: Data) {
        val index = shardIndex(key)
        mutexes[index].withLock {
            caches[index][key] = value
        }
    }
    
    suspend fun get(key: String): Data? {
        val index = shardIndex(key)
        return mutexes[index].withLock {
            caches[index][key]
        }
    }
}

Ejemplo del Mundo Real: Repositorio Seguro para Hilos

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class UserRepository(
    private val api: UserApi,
    private val database: UserDatabase
) {
    private val cache = mutableMapOf<String, User>()
    private val mutex = Mutex()
    
    suspend fun getUser(userId: String): User? {
        // Verificar caché primero (bloqueo de lectura)
        mutex.withLock {
            cache[userId]?.let { return it }
        }
        
        // Intentar base de datos (fuera del bloqueo)
        database.getUser(userId)?.let { user ->
            mutex.withLock {
                cache[userId] = user
            }
            return user
        }
        
        // Obtener de API (fuera del bloqueo)
        return try {
            val user = api.fetchUser(userId)
            mutex.withLock {
                cache[userId] = user
                database.insertUser(user)
            }
            user
        } catch (e: Exception) {
            null
        }
    }
    
    suspend fun updateUser(user: User) {
        mutex.withLock {
            cache[user.id] = user
            database.updateUser(user)
        }
    }
    
    suspend fun clearCache() {
        mutex.withLock {
            cache.clear()
        }
    }
}

Testeando Código Protegido con Mutex

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Test
fun `concurrent increments should be thread-safe`() = runTest {
    val counter = CounterService()
    
    // Lanzar 1000 incrementos concurrentes
    val jobs = List(1000) {
        launch {
            counter.increment()
        }
    }
    
    jobs.joinAll()
    
    // Debería ser exactamente 1000
    assertEquals(1000, counter.getCount())
}

@Test
fun `mutex prevents race conditions`() = runTest {
    val cache = mutableMapOf<String, Int>()
    val mutex = Mutex()
    
    // Simular condición de carrera
    coroutineScope {
        repeat(100) {
            launch {
                mutex.withLock {
                    val current = cache["key"] ?: 0
                    delay(1) // Simular trabajo
                    cache["key"] = current + 1
                }
            }
        }
    }
    
    assertEquals(100, cache["key"])
}

Reflexiones Finales

Mutex es una herramienta poderosa para proteger estado mutable compartido en aplicaciones basadas en coroutines. Proporciona sincronización segura para hilos sin bloquear hilos, haciéndola ideal para código concurrente con coroutines.

Puntos clave:

  • Usa withLock para gestión automática de bloqueos
  • Mantén las secciones críticas pequeñas y rápidas
  • Considera alternativas más simples (atómicos, StateFlow) cuando sea apropiado
  • Entiende cuándo usar Mutex vs. otras primitivas de sincronización
  • Siempre maneja la cancelación adecuadamente

Recuerda: la mejor sincronización es ninguna sincronización. Cuando sea posible, diseña tu sistema para evitar estado mutable compartido por completo usando estructuras de datos inmutables, paso de mensajes (Actores/Canales), o flujos reactivos (Flow/StateFlow). Pero cuando sí necesites exclusión mutua en código con coroutines, Mutex es tu mejor amigo.

compartir en

Ignacio Carrión
Escrito por
Ignacio Carrión
Android Developer