Introducció

In the simplest terms, a coroutine is a way to write asynchronous code in a sequential manner. It’s a lightweight thread that doesn’t require the overhead of context switching.

In the world of Kotlin, a coroutine is a piece of code that can be suspended and resumed without blocking the executing thread.

Coroutine Basics: Launch, Async and RunBlocking

To work with coroutines, Kotlin provides three basic building blocks: launch, async, and runBlocking.

Launch

launch is used to fire and forget coroutine. It's perfect for cases where you don't need to compute any result.

Here's a simple example:

import kotlinx.coroutines.*

fun main() {
    GlobalScope.launch { 
        delay(1000L) 
        println("Hello from Coroutine!")
    }
    println("Hello from Main Thread!")
    Thread.sleep(2000L)
}

In this code, we’re launching a new coroutine using GlobalScope.launch.

Inside this coroutine, we're delaying for one second (1000 milliseconds), and then printing a message.

Async

async is used when you need a result computed in a coroutine.

It starts a new coroutine and returns a Deferred<T>, which is a non-blocking future that represents a promise to provide a result later.

Here's an example:

import kotlinx.coroutines.*

fun main() {
    GlobalScope.launch {
        val result = async { 
            computeResult() 
        }
        println("Computed result: ${result.await()}")
    }
    Thread.sleep(2000L)
}

suspend fun computeResult(): Int {
    delay(1000L)
    return 42
}

In this code, we’re launching a new coroutine and starting a computation inside it using async.

This computation is a suspend function computeResult, which delays for one second and then returns the number 42.

After the computation, we print the result using await.

RunBlocking

runBlocking is a bridge between non-coroutine world and coroutine world.

It's a way to start top-level main coroutine.

Here's how to use it:

import kotlinx.coroutines.*

fun main() = runBlocking { 
    launch { 
        delay(1000L) 
        println("Hello from Coroutine!")
    }
    println("Hello from Main Thread!")
}

In this code, we’re starting a main coroutine using runBlocking, and inside this coroutine, we're launching a new coroutine.

Coroutine Context and Dispatchers

Every coroutine in Kotlin has a context associated with it, which is a set of various elements.

The key elements in this set are Job of the coroutine and its dispatcher.

TODO moure dispatcher i excepcions més endavant.

Dispatchers

In simple words, coroutine dispatchers determine what thread or threads the corresponding coroutine uses for its execution.

Kotlin provides three main dispatchers:

  1. Dispatchers.Main — for UI-related tasks.
  2. Dispatchers.IO — for input/output tasks, like reading or writing from/to a database, making network calls, or reading/writing files.
  3. Dispatchers.Default — for CPU-intensive tasks, like sorting large lists or doing complex computations.

Here’s an example of using dispatchers:

import kotlinx.coroutines.*

fun main() = runBlocking {
    launch(Dispatchers.IO) { 
        println("IO: ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Default) { 
        println("Default: ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Main) { 
        println("Main: ${Thread.currentThread().name}")
    }
}

Exception Handling in Coroutines

Exceptions in coroutines are propagated in a way similar to regular exceptions, with some differences related to the asynchronous nature of coroutines.

Here’s a simple example:

import kotlinx.coroutines.*

fun main() = runBlocking {
    val job = GlobalScope.launch {
        println("Throwing exception from coroutine")
        throw IllegalArgumentException()
    }
    job.join()
    println("Joined failed job")
    val deferred = GlobalScope.async {
        println("Throwing exception from async")
        throw ArithmeticException() 
        42 
    }
    try {
        deferred.await()
        println("Unreached")
    } catch (e: ArithmeticException) {
        println("Caught ArithmeticException")
    }
}

This code will output:

Throwing exception from coroutine
Joined failed job
Throwing exception from async
Caught ArithmeticException

Coroutine Exception Handling Best Practices

When dealing with exceptions in Kotlin Coroutines, there are some best practices you should follow:

  • Prefer using structured concurrency: When a coroutine is launched in a CoroutineScope, it’s automatically cancelled when the scope is cancelled. This makes cleanup easier and helps avoid leaking resources.

  • Use CoroutineExceptionHandler sparingly: Exceptions in coroutines are propagated to their parent coroutine or the thread that started them, so you usually don't need to use a CoroutineExceptionHandler. It's mostly useful for top-level coroutines that you launch and don't have a parent coroutine.

  • Use catch operator with Flows: When working with Flows, exceptions can occur in the operators applied to the Flow. You can catch these exceptions using the catch operator.

  • Don’t catch CancellationException: If a coroutine is cancelled, it throws a CancellationException. This is a normal operation and should not be treated as an error, so you usually shouldn't catch it.

Structured Concurrency

A diferència dels threads que operen de manera completament independent uns dels altres, una coroutine funciona dins d'un scope.

Per exemple, si cancelem un scope totes les coroutines vinculades a aquest scope que estan en execució és cancelen.

La funció runBlocking crea una coroutine i un scope, i totes les coroutines que creas a partir d'aquesta coroutine - per exemple, amb la funció launch - queden vinculades a aquest scope.

Però si vols crear una coroutine dins un nou scope, pots utilitzar la funció coroutineScope enlloc de launch, i totes les noves coroutines creades a partir d'aquesta nova coroutine estaran vinculades al nou scope.

Però tingues en compte que al crear una nova coroutine amb coroutineScope es bloqueja la coroutine que s'està executant fins que totes les coroutines que s'executen dins del nou scope hagin finalitzat.

A continuació tens un exemple:

import kotlinx.coroutines.*

fun main() = runBlocking {
    launch { 
        delay(1000L) 
        println("Task from runBlocking")
    }
    
    coroutineScope { 
        launch {
            delay(2000L) 
            println("Task from nested launch")
        }
    
        delay(500L)
        println("Task from coroutine scope") 
    }
    
    println("Coroutine scope is over")
}

Quina és l'ordre en que s'imprimiran les linies per pantalla?

  1. runBlocking crea un nou scope de coroutines.

  2. El primer launch s'executa dins de l'scope creat per runBlocking.

TODO

Task from corou
...
Coroutine scope is over

coroutineScope bloqueja la coroutine que s'està executant i ha cridat aquesta funció, però no pas la primera coroutine que s'ha creat amb launch perquè s'executa de manera independent.

Suspending Functions

Suspending functions are the functions that can be paused and resumed at a later time.

To define a suspending function, you use the suspend modifier.

Here's a simple example:

import kotlinx.coroutines.*

suspend fun doSomething() {
    delay(1000L)
    println("Doing something")
}

fun main() = runBlocking {
    launch {
        doSomething()
    }
}

In this code, doSomething is a suspending function.

Inside doSomething, we're delaying for one second and then printing a message.

We're calling doSomething from a coroutine, because suspending functions can only be called from another suspending function or a coroutine.

Non-blocking Delays

Si treballes amb threads pots suspendre l'execució d'un thread amb Thread.sleep().

Però quan treballes amb coroutines tens complemtament prohibit suspendre l'execució d'un Thread perquè les coroutines comparteixen el mateix Thread !!.

Per suspendre l'execució d'una coroutine tens a funció delay.

Here's a simple example:

import kotlinx.coroutines.*

fun main() = runBlocking {
    launch {
        delay(1000L)
        println("Hello from Coroutine!")
    }
    println("Hello from Main Thread!")
}

In this code, we’re delaying the execution of the coroutine for one second using delay.

Despite the delay, the main thread continues its execution, demonstrating the non-blocking nature of delay.

Job Hierarchy

Un scope ens permet controlar de manera conjunta un grup de coroutines.

Però quan es crea una coroutine amb launcho async ens torna un objecte `Job,, which can be used to control the lifecycle of the coroutine.

If a parent job is cancelled, all its children are cancelled too.

This forms a hierarchy of jobs.

Here's a simple example:

import kotlinx.coroutines.*

fun main() = runBlocking {
    val parentJob = launch {
        val childJob = launch {
            while (true) {
                println("Child is running")
                delay(500L)
            }
        }
        delay(2000L)
        println("Cancelling child job")
        childJob.cancel()
    }
    parentJob.join()
}

In this code,

  • We’re launching a parent job, and inside the parent job, we’re launching a child job.

  • Since the child job is running an infinite loop, it will continue to run until cancelled.

  • After some delay, we cancel the child job.

Stream

Channels

Channels provide a way to transfer a stream of values between coroutines.

Here’s a simple example:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() 
    }
    repeat(5) { println(channel.receive()) }
    println("Done!")
}

In this code,

  • We’re creating a channel of Integers.

  • Inside a coroutine, we’re sending squares of numbers from 1 to 5 to the channel, and then closing it.

  • Outside the coroutine, we’re receiving the values from the channel and printing them.

Flow

Flow is a type in Kotlin that can emit multiple values sequentially, as opposed to suspend functions which return only a single value.

It’s a cold stream, meaning the code inside a flow builder does not run until the flow is collected.

Here’s a simple example:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun numbers(): Flow<Int> = flow { 
    for (i in 1..5) {
        delay(1000L) 
        emit(i) 
    }
}

fun main() = runBlocking {
   val numbers = numbers() 
   launch {
        for (k in 1..5) {
            println("I'm not blocked $k")
            delay(1000L)
        }
    }
    delay(2000L)
    numbers.collect { value -> println(value) } 
}

In this code,

  • Numbers is a Flow that emits numbers from 1 to 5, with a delay of one second between each number.

  • Cridem la funció numbers() que torna un Flow<Int> que guardem en la variable numbers.

  • Inside main, we're launching a new coroutine that prints messages without being blocked by the Flow, demonstrating the non-blocking nature of Flows.

  • After that, we're collecting the values emitted by the Flow and printing them.

Advanced Flow Operators

Kotlin’s Flow API provides a variety of operators that you can use to transform and manipulate streams of data.

These operators include common ones like map and filter, as well as more advanced ones like debounce, buffer, combineLatest, and more.

Here's a simple example of using debounce:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000L) 
    return "response $request"
}

fun main() = runBlocking {
    val flow = (1..5).asFlow().onEach { delay(300L) }
    flow.debounce(500L)
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

In this code, we’re creating a Flow of numbers from 1 to 5, with a delay of 300 milliseconds between each number. Then, we’re applying debounce operator to the Flow to only emit a value if 500 milliseconds have passed without it emitting another value. After that, we're mapping the numbers to their responses using performRequest, and collecting the result.

Combining Multiple Coroutines

In real-world applications, you often need to run multiple coroutines simultaneously and combine their results.

Kotlin Coroutines provide several ways to achieve this, like zip, combine, select, and more.

Here's a simple example of using zip:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000L)
    return "response $request"
}

fun main() = runBlocking {
    val nums = (1..5).asFlow()
    val strs = nums.map { performRequest(it) }
    nums.zip(strs) { a, b -> "$a -> $b" }
        .collect { println(it) }
}

In this code, performRequest is a suspending function that delays for one second and then returns a string. Inside main, we're creating a Flow of numbers from 1 to 5, and then mapping these numbers to their responses using performRequest. After that, we're using zip to combine the original numbers and their responses, and then collecting the result.

Callbacks and Coroutines

Sometimes, you may need to interact with APIs or libraries that use callbacks. Converting these callbacks to Coroutines can make your code cleaner and easier to understand.

Kotlin Coroutines provides suspendCancellableCoroutine for this purpose.

Here's a simple example:

suspend fun fetchUser(id: String): User = suspendCancellableCoroutine { continuation ->
    api.getUser(id, object : Callback<User> {
        override fun onResponse(call: Call<User>, response: Response<User>) {
            continuation.resume(response.body())
        }
        
        override fun onFailure(call: Call<User>, t: Throwable) {
            continuation.resumeWithException(t)
        }
    })
}

In this code, fetchUser is a suspending function that fetches a user with a specific ID using a callback-based API. Inside fetchUser, we're using suspendCancellableCoroutine to convert the callback to a Coroutine.

Coroutine Timeouts

Occasionally, it’s necessary to set a timeout for a particular operation or task performed by a coroutine. Kotlin Coroutines provides the withTimeout function for this purpose. When the timeout is reached, withTimeout throws a TimeoutCancellationException and the corresponding coroutine is cancelled.

Here's a simple example:

import kotlinx.coroutines.*

suspend fun doSomething() {
    delay(3000L) // simulate a long-running task
}

fun main() = runBlocking {
    try {
        withTimeout(2000L) {
            doSomething()
        }
    } catch (e: TimeoutCancellationException) {
        println("The task exceeded the timeout limit.")
    }
}

In this code, doSomething is a suspending function that simulates a long-running task with a delay. Inside main, we're using withTimeout to set a timeout of 2000 milliseconds for doSomething. If doSomething doesn't complete within the timeout, withTimeout throws a TimeoutCancellationException which we catch and handle.

Coroutine Scopes and Supervision

In structured concurrency, when a parent coroutine is cancelled, all its child coroutines are also cancelled. However, sometimes you want to control the cancellation of child coroutines independently.

For this purpose, Kotlin provides the concept of coroutine supervision. You can create a SupervisorJob or use supervisorScope.

Here's a simple example:

import kotlinx.coroutines.*

suspend fun doSomething() {
    delay(1000L)
    throw Exception("Something went wrong.")
}

fun main() = runBlocking {
    val supervisor = SupervisorJob()
    with(CoroutineScope(coroutineContext + supervisor)) {
        val child1 = launch {
            doSomething()
        }
        val child2 = launch {
            delay(2000L)
            println("Coroutine 2 completed.")
        }
    }
    delay(3000L)
}

In this code, doSomething is a suspending function that simulates a task and throws an exception. Inside main, we're creating a SupervisorJob and two child coroutines: child1 and child2. child1 calls doSomething and catches any exceptions it throws. child2 simply delays for 2000 milliseconds and then prints a message. Because we're using a supervisor job, child2 isn't cancelled when child1 fails.

Shared Mutable State and Concurrency

In concurrent programming, shared mutable state is a common source of bugs and problems. Kotlin Coroutines provide several tools to handle shared mutable state safely.

One of these tools is Mutex, which is a mutual exclusion lock that can be used to protect shared state.

Here's a simple example:

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*

var counter = 0
val mutex = Mutex()

suspend fun increment() {
    withContext(Dispatchers.Default) {
        repeat(1000) {
            mutex.withLock {
                counter++
            }
        }
    }
}

fun main() = runBlocking {
    val job1 = launch { increment() }
    val job2 = launch { increment() }
    job1.join()
    job2.join()
    println(counter) 
}

In this code, counter is a shared variable that's incremented by two concurrent coroutines. We're using mutex to ensure that only one coroutine can increment the counter at a time, avoiding race conditions.

Another tool provided by Kotlin Coroutines for handling shared mutable state is sharedFlow.

A SharedFlow is a hot Flow that emits updates to its collectors. It's useful when you have state that's updated by one coroutine and read by one or more other coroutines.

Here's a simple example:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val sharedFlow = MutableSharedFlow<Int>()

suspend fun producer() {
    var counter = 0
    while (true) {
        delay(1000L)
        counter++
        sharedFlow.emit(counter)
    }
}

suspend fun consumer(id: Int) {
    sharedFlow.collect { value ->
        println("Consumer $id received $value")
    }
}

fun main() = runBlocking {
    val job1 = launch { producer() }
    val job2 = launch { consumer(1) }
    val job3 = launch { consumer(2) }
    delay(5000L)
    job1.cancel()
    job2.cancel()
    job3.cancel()
}

In this code, producer is a suspending function that emits increasing numbers to sharedFlow every second. consumer is another suspending function that collects these numbers from sharedFlow. Inside main, we're launching one producer coroutine and two consumer coroutines. After 5 seconds, we're cancelling all the coroutines.