Introducció

Kotlin utitliza Thread per implementar les coroutines mitjançant el mòdul kotlinx.coroutines i la seva capacitat com a llenguatge per crear DSL.

El primer que has de fer és crear un nou projecte kotlin i configurar el fitxer build.gradle.kts.

Has d'afegir la dependència corresponent:

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1")
}

I assegurar-te de fer servir la darrera versió de Kotlin:

plugins {
    kotlin("jvm") version "2.0.0"    
}

I tenir mavenCentral() a la llista de repositoris:

repositories {
    mavenCentral()
}

Coroutine

Una corrutina és una instància que té un bloc de codi que es pot executar de manera independent en qualsevol moment, de la mateixa manera que es fa amb un Thread.

La diferència és que una corrutina s'executa dins d'un thread, i que es pot executar per qualsevol thread, inclús executar-se en threads diferents.

Un Thread és un recurs car de crear i gestionar,´i mitjançant les corrutines tenim un grup de threads que es dediquen a processar blocs de codi independents.

Modifica el fitxer Main.kt:

import kotlinx.coroutines.*

fun main() = runBlocking { // this: CoroutineScope
    launch { // launch a new coroutine and continue
        delay(1_000) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello") // main coroutine continues while a previous one is delayed
}

Si executes el codi pots veure que Hello apareix abans que World encara que la sentència println("World!") s'ha declarat abans.

Hello
World!

Això vol dir que el bloc de codi no s'executa linealment, sinò que una part del codi s'executa a part (en una altre corrutina).

Anem a analitzar què fa aquest codi.

  • runBlocking crea un context d'execució de corrutines i una corrutina que executarà el codi declarat dins el bloc runBlocking { ... }.

  • launch crea i llança una nova corrutina (es posa en una cua d'execució) que s'executarà de manera simultànea amb la resta de les corrutines, i la corrutina que ha creat la nova coroutina segueix funcionant de manera independent.

  • delay és una funció que suspen l'execució de la corrutina durant un temps determinat.

    Suspendre l'execució de la corrutina no vol dir bloquejar el thread que està executant la corrutina, sinò que la corrutina es posa en una cua d'espera mentres el thread continua executant altres corrutines.

Si elimines runBlocking del codi, pots veure que el compilador indica un error en l'ús de la funció launch perquè la funció és declara a CoroutineScope, i no s'ha creat.

Modifica el fitxer Main.kt i elimina runBlocking:

import kotlinx.coroutines.*

fun main()  {
    launch { // launch a new coroutine and continue
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello") // main coroutine continues while a previous one is delayed
}

La IDE t'indicarà dos errors de compilació:

Unresolved reference: launch
Suspend function 'suspend fun delay(timeMillis: Long): Unit' should be called only from a coroutine or another suspend function.

El nom de runBlocking significa que el thread que l'executa (en aquest cas, el thread principal) es bloqueja a l'executar la funció, fins que totes les corrutines dins runBlocking { ... } finalitzin la seva execució.

import kotlinx.coroutines.*

fun main() {
    println("${Thread.currentThread().name}: before runBlocking")
    runBlocking {
        println("${Thread.currentThread().name}: before launch")
        launch {
            delay(300)
            println("${Thread.currentThread().name}: launch")
        }
        println("${Thread.currentThread().name}: after launch")
    }
    println("${Thread.currentThread().name}: after runBlocking")
}

Pots veure que tot el codi l'executa el thread main i que el thread espera que s'hagin completat totes les corrutines del bloc runBlocking abans d'executar l'últim println:

main: before runBlocking
main: before launch
main: after launch
main: launch
main: after runBlocking

Normalment runBlocking es declara tal com hem vist al principi i no es fa servir en altre part del codi perquè precisament fem servir corrutines per evitar tenir threads bloquejats.

Concurrència estructurada

Les corrutines segueixen un principi de concurrència estructurada que significa que les noves corrutines només es poden llançar en un CoroutineScope específic que delimita la vida útil de les corrutines.

Tal com hem explicat abans runBlocking crea una CorutineScope que gestiona totes les corrutines que es generan en aquest àmbit.

D'aquesta manera, es pot saber en quin moment es pot proseguir amb l'execució del codi que ha executat la funció runBlocking perquè la CorutineScope que s'ha creat indica que totes les corrutines que s'han llançant dins del seu àmbit s'han completat.

Refactorització de la funció d'extracció

Extreurem el bloc de codi que hi ha dins launch { ... } en una funció separada.

Selecciona el bloc de codi, botó dret ratolí i "Refactor" > "Function..."

Al realitzar la refactorització obtens una funció nova amb el modificador suspend.

Les funcions de suspensió es poden utilitzar dins les corrutines igual que les funcions normals, però la seva característica addicional és que poden, al seu torn, utilitzar altres funcions de suspensió (com ara delay en aquest exemple) per suspendre l'execució d'una corrutina.

import kotlinx.coroutines.*

fun main()  = runBlocking { // this: CoroutineScope
    launch { // launch a new coroutine and continue
        doWorld()
    }
    println("Hello") // main coroutine continues while a previous one is delayed
}

private suspend fun doWorld() {
    delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
    println("World!") // print after delay
}

Àmbit del constructor

Els constructor com runBlocking creen un àmbit d'execució, però dins d'un àmbit d'execució pots crear un nou àmbit d'execució amb la funció coroutineScope.

(TODO: revisar) Els constructors runBlocking i coroutineScope poden semblar semblants perquè tots dos esperen que el seu cos i tots els seus fills s'acabin. La diferència principal és que el runBlocking mètode bloqueja el fil actual per esperar, mentre que coroutineScope només se suspèn, alliberant el fil subjacent per a altres usos. A causa d'aquesta diferència, runBlocking és una funció normal i coroutineScope és una funció de suspensió.

Pots fer servir coroutineScope en qualsevol funció de suspensió.

Per exemple, podeu moure la impressió simultània de Hello i World a la funció suspend fun doWorld():

import kotlinx.coroutines.*

fun main()  = runBlocking {
    doWorld()
}

private suspend fun doWorld() = coroutineScope{ // this: CoroutineScope
    launch {
        delay(1000L)
        println("World!")
    }
    println("Hello")
}

El resulta d'aquest codi és el mateix:

Hello
World!

La funció constructora coroutineScope es pot utilitzar dins de qualsevol funció de suspensió per realitzar múltiples operacions simultàniament.

A continuació llançarem dues corrutines concurrents dins de la funció de suspensió doGet:

// Sequentially executes doWorld followed by "Done"
fun main() = runBlocking {
    doGet()
    println("Done")
}

// Concurrently executes both sections
suspend fun doGet() = coroutineScope { // this: CoroutineScope
    launch {
        delay(2000L)
        println("Get resource 1")
    }
    launch {
        delay(1000L)
        println("Get resource 2")
    }
    println("Waiting response")
}

L'execució del codi serà:

Waiting response
Get resource 2
Get resource 1
Done

Pots veure que:

  • El codi dins dels blocs launch { ... } s'executa de manera simultània, i com que el segon launch ha d'esperar menys temps s'imprimirà primer "Get resouce 2".

  • La coroutineScope de doGet només finalitza després de que totes les corrutines dins del seu àmbit estiguin finalitzades.

  • Un cop finalitzada, la funcioó doGet torna i permet la impressió de l'string "Done".

Job

El constructor de corrutines launch retorna un objecte Job que és un identificador de la corrutina llançada i es pot utilitzar per esperar explícitament que es completi.

Per exemple, pots esperar que s'hagi completat la corrutina secundària i després imprimir l'string "Done":

import kotlinx.coroutines.*

fun main() = runBlocking {

    val job = launch { // launch a new coroutine and keep a reference to its Job
        delay(1000L)
        println("World!")
    }
    println("Hello")
    job.join() // wait until child coroutine completes
    println("Done")
}

El resultat d'executar aquest codi és:

Hello
World!
Done

Les coroutines són lleugeres

Les corrutines necessiten menys recursos que els threads JVM.

Per exemple, el següent codi llança 50.000 coroutines diferents consumint molt poca memòria, on cada un espera 5 segons i després imprimeix un període ('.'):

import kotlinx.coroutines.*

fun main() = runBlocking {
    repeat(50_000) { // launch a lot of coroutines
        launch {
            delay(5_000)
            print(".")
        }
    }
}

Si escrius el mateix programa utilitzant threads, consumirà molta memòria.

import kotlin.concurrent.thread

fun main()  {
    repeat(50_000) { // launch a lot of coroutines
        thread {
            Thread.sleep(5_000)
            print(".")
        }
    }
}

Depenent del seu sistema operatiu, versió JDK, i la seva configuració, llançarà un error per haver exhaurit la memòria o arrencarà threads a poc a poc de manera que mai hi ha massa threads funcionament concurrentment.

Cancel·lació i time-outs

En moltes aplicacions és necessari poder controlar les corrutines que s'estan executant.

Per exemple,si un usuari tanca una pàgina que ha llançat una corrutina per obtenir uns resultats que s'han de mostrar a la pàgina, ja no té cap sentit que la corrutina continui executant-se.

La funció launch retorna un Job que es pot utilitzar per cancel·lar la coroutina que s'està executant:

import kotlinx.coroutines.*

fun main() = runBlocking {

    val job = launch {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1_300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    job.join() // waits for job's completion 
    println("main: Now I can quit.")
}

Aquí tens el resultat de l'execució:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

Pots veure que tant aviat com s'executa job.cancel() ja no veiem cap més sortida de la corrutina creada per launch ja que ha estat cancelada.

La cancel.lació no és immediata, i per aquest motiu després de cancel has de fer un join.

Com que fer un cancel i després un join és habitual, tens la funció cancelAndJoin que combina cancel i join.

La cancel·lació és cooperativa

Per poder cancelar una corrutina aquesta ha de cooperar.

Totes les funcions que es poden suspendre del mòdul kotlinx.coroutines es poden cancel.lar de manera cooperativa perquè comproven si s'han de cancelar mentres s'estan executant i llencen una CancellationException quan es cancel·len.

Però si escrius una funció que es pot supendre i no comproves que si s'ha de cancel.lar mentres s'està executant, llavors no es pot cancel.lar.

A continuació tens un exemple que ho demostra:

import kotlinx.coroutines.*

fun main() = runBlocking {

    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (i < 5) { // computation loop, just wastes CPU
            // print a message twice a second
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1_300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
    
}

Si executes el codi pots verificar que segueix imprimint "I'm sleeping" fins i tot després que s'ha cancelat el Job, finst que s'han completat les cinc iteracions.

El mateix passa si captures una una CancellationException i no rellançes:

import kotlinx.coroutines.*

fun main() = runBlocking {

    val job = launch(Dispatchers.Default) {
        repeat(5) { i ->
            try {
                // print a message twice a second
                println("job: I'm sleeping $i ...")
                delay(500)
            } catch (e: Exception) {
                // log the exception
                println(e)
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.") 
}

Pots veure que mai has d'escriture un codi com aquest que atrapa excepcions genèriques i no les rellança.

Però aquest problema pot aparèixer de maneres més subtils, com quan s'utilitza la funció runCatching, que no rellança CancellationException.

Fer que el codi de càlcul sigui cancel·lable

Quan una funció està realitzant un càlcul que necessita molt de temps ha de verificar de manera periodica si ha de cancel.lar la seva execució.

La funció pot anar invocant una funció de suspensió que verifica si s'ha de cancel.lar l'execució; per fer això tens la funció yield.

Una altra manera és comprovar de manera explícita si la corrutina està activa tal com es mostra a continuació.

Substitueix while (i < 5) en l’exemple anterior amb while (isActive):

import kotlinx.coroutines.*

fun main() = runBlocking {

    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (isActive) { // cancellable computation loop
            // print a message twice a second
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
    
}

Torna a executar el codi i verifica que la coroutina es cancel.la.

isActive és una propietat d'extensió disponible a l'interior de la coroutina a través de l'objecte CoroutineScope.

Tancar els recursos amb finally

Les funcions cancel·lables de suspensió llancen CancelationException al ser cancelades cancel·lació, que es pot manejar de la manera habitual. Per exemple, l'expressió try {...} finally {...} i la de funció use de Kotlin executa les seves accions de finalització amb normalitat quan es cancel·la una corutina.

import kotlinx.coroutines.*

fun main() = runBlocking {

    val job = launch {
        try {
            repeat(1000) { i ->
                println("job: I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            println("job: I'm running finally")
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.") 
}

Tant join com cancel esperen que totes les accions de finalització es completin, de manera que l'exemple anterior produeix la següent sortida:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm running finally
main: Now I can quit.

Executr un bloc no cancel·lable

Qualsevol intent d'utilitzar una funció de suspensió en el bloc finally de l'exemple anterior provoca una CancelationException, perquè la coroutina que executa aquest codi es cancel·la. Normalment, això no és un problema, ja que totes les operacions de tancament de bon comportament (tancant un arxiu, cancel·lant un treball, o tancant qualsevol tipus de canal de comunicació) solen ser no bloquejos i no impliquen cap funció de suspensió. No obstant això, en el cas rar en què cal suspendre en un corutí cancel·lat es pot embolicar el codi corresponent en withContext(NonCancellable) {...} utilitzant la funció withContext i el context NonCancellable com a exemple següent mostra:

import kotlinx.coroutines.*

fun main() = runBlocking {

    val job = launch {
        try {
            repeat(1000) { i ->
                println("job: I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            withContext(NonCancellable) {
                println("job: I'm running finally")
                delay(1000L)
                println("job: And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
    
}

Timeout

La raó pràctica més òbvia per cancel·lar l'execució d'una coroutina és perquè el seu temps d'execució ha superat algun temps. Si bé es pot fer un seguiment manual de la referència al Job corresponent i llançar una coroutina separada per cancel·lar la rastrejada després del retard, hi ha una funció preparada per utilitzar withTimeout que ho fa. Mireu el següent exemple:

import kotlinx.coroutines.*

fun main() = runBlocking {

    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }

}

Produeix la següent producció:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms

El TimeoutCancellationException que és llançat per withTimeout és una subclasse de CancellationException. No hem vist el seu rastre de pila imprès a la consola abans. Això és perquè dins d'una coroutine cancel·lada CancellationException es considera una raó normal per a la finalització de la coroutina. En aquest exemple hem utilitzat withTimeout a l'interior de la funció main.

Com que la cancel·lació és només una excepció, tots els recursos estan tancats de la manera habitual. Podeu embolicar el codi amb Timeout en un bloc try {...} catch (e: TimeoutCancellationException) {...} si necessiteu fer alguna acció addicional específicament sobre qualsevol tipus de temps d'oportatge o utilitzeu la funció withTimeoutOrNull que és similar a withTimeout però torna null si Timeout en lloc de llançar una excepció:

import kotlinx.coroutines.*

fun main() = runBlocking {

    val result = withTimeoutOrNull(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
        "Done" // will get cancelled before it produces this result
    }
    println("Result is $result")

}

Ja no hi ha una excepció a l'hora d'executar aquest codi:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null

Timeout asíncron i recursos

L'esdeveniment de timeout en ``withTimeout` és asíncron pel que fa al codi que s'executa en el seu bloc i pot ocórrer en qualsevol moment, fins i tot just abans de la tornada des de l'interior del bloc de temps. Tingues-ho en compte si obris o adquireixes algun recurs dins del bloc que necessiti tancar o alliberar-te fora del bloc.

Per exemple, aquí imitem un recurs més apropable amb la classe Resource que simplement fa un seguiment de quantes vegades va ser creada mitjançant l'augment del contador acquired i decrement el comptador en la seva funció close. Ara fem una gran quantitat de coroutines, cadascuna de les quals crea un Resource al final del bloc withTimeout i alliberar el recurs fora del bloc. Afegim un petit retard perquè sigui més probable que el timeout es produeixi just quan el withTimeout El bloc ja està acabat, la qual cosa provocarà una fuita de recursos.

import kotlinx.coroutines.*


var acquired = 0

class Resource {
    init { acquired++ } // Acquire the resource
    fun close() { acquired-- } // Release the resource
}

fun main() {
    runBlocking {
        repeat(10_000) { // Launch 10K coroutines
            launch { 
                val resource = withTimeout(60) { // Timeout of 60 ms
                    delay(50) // Delay for 50 ms
                    Resource() // Acquire a resource and return it from withTimeout block     
                }
                resource.close() // Release the resource
            }
        }
    }
    // Outside of runBlocking all coroutines have completed
    println(acquired) // Print the number of resources still acquired
}

Si executes el codi anterior, veuràs que no sempre s'imprimeix zero, encara que pot dependre dels temps de la teva màquina. És possible que hagis d'ajustar el timeout en aquest exemple per veure realment els valors no nuls.

Fixeu-vos que incrementant i decrementant acquired compta aquí des de 10K coroutines és completament de filferro segur, ja que sempre passa des del mateix fil, el que s'utilitza per runBlocking. (TODO: enllaç) Més sobre això s'explicarà en el capítol sobre el context coroutítí.

Per treballar al voltant d'aquest problema es pot emmagatzemar una referència al recurs en una variable en lloc de tornar-lo des del bloc de withTimeout.

import kotlinx.coroutines.*

var acquired = 0

class Resource {
    init { acquired++ } // Acquire the resource
    fun close() { acquired-- } // Release the resource
}

fun main() {

    runBlocking {
        repeat(10_000) { // Launch 10K coroutines
            launch { 
                var resource: Resource? = null // Not acquired yet
                try {
                    withTimeout(60) { // Timeout of 60 ms
                        delay(50) // Delay for 50 ms
                        resource = Resource() // Store a resource to the variable if acquired      
                    }
                    // We can do something else with the resource here
                } finally {  
                    resource?.close() // Release the resource if it was acquired
                }
            }
        }
    }
    // Outside of runBlocking all coroutines have completed
    println(acquired) // Print the number of resources still acquired

}

Aquest exemple sempre s'imprimeix zero. Els recursos no es fugen.