Introducció
Kotlin utitliza Thread per implementar les coroutines mitjançant el mòdul kotlinx.coroutines
i la seva capacitat com a llenguatge per crear DSL.
El primer que has de fer és crear un nou projecte kotlin i configurar el fitxer build.gradle.kts
.
Has d'afegir la dependència corresponent:
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1")
}
I assegurar-te de fer servir la darrera versió de Kotlin:
plugins {
kotlin("jvm") version "2.0.0"
}
I tenir mavenCentral()
a la llista de repositoris:
repositories {
mavenCentral()
}
Coroutine
Una corrutina és una instància que té un bloc de codi que es pot executar de manera independent en qualsevol moment, de la mateixa manera que es fa amb un Thread
.
La diferència és que una corrutina s'executa dins d'un thread, i que es pot executar per qualsevol thread, inclús executar-se en threads diferents.
Un Thread
és un recurs car de crear i gestionar,´i mitjançant les corrutines tenim un grup de threads que es dediquen a processar blocs de codi independents.
Modifica el fitxer Main.kt
:
import kotlinx.coroutines.*
fun main() = runBlocking { // this: CoroutineScope
launch { // launch a new coroutine and continue
delay(1_000) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello") // main coroutine continues while a previous one is delayed
}
Si executes el codi pots veure que Hello
apareix abans que World
encara que la sentència println("World!")
s'ha declarat abans.
Hello
World!
Això vol dir que el bloc de codi no s'executa linealment, sinò que una part del codi s'executa a part (en una altre corrutina).
Anem a analitzar què fa aquest codi.
-
runBlocking
crea un context d'execució de corrutines i una corrutina que executarà el codi declarat dins el blocrunBlocking { ... }
. -
launch
crea i llança una nova corrutina (es posa en una cua d'execució) que s'executarà de manera simultànea amb la resta de les corrutines, i la corrutina que ha creat la nova coroutina segueix funcionant de manera independent. -
delay
és una funció que suspen l'execució de la corrutina durant un temps determinat.Suspendre l'execució de la corrutina no vol dir bloquejar el thread que està executant la corrutina, sinò que la corrutina es posa en una cua d'espera mentres el thread continua executant altres corrutines.
Si elimines runBlocking
del codi, pots veure que el compilador indica un error en l'ús de la funció launch
perquè la funció és declara a CoroutineScope
, i no s'ha creat.
Modifica el fitxer Main.kt
i elimina runBlocking
:
import kotlinx.coroutines.*
fun main() {
launch { // launch a new coroutine and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello") // main coroutine continues while a previous one is delayed
}
La IDE t'indicarà dos errors de compilació:
Unresolved reference: launch
Suspend function 'suspend fun delay(timeMillis: Long): Unit' should be called only from a coroutine or another suspend function.
El nom de runBlocking
significa que el thread que l'executa (en aquest cas, el thread principal) es bloqueja a l'executar la funció, fins que totes les corrutines dins runBlocking { ... }
finalitzin la seva execució.
import kotlinx.coroutines.*
fun main() {
println("${Thread.currentThread().name}: before runBlocking")
runBlocking {
println("${Thread.currentThread().name}: before launch")
launch {
delay(300)
println("${Thread.currentThread().name}: launch")
}
println("${Thread.currentThread().name}: after launch")
}
println("${Thread.currentThread().name}: after runBlocking")
}
Pots veure que tot el codi l'executa el thread main
i que el thread espera que s'hagin completat totes les corrutines del bloc runBlocking
abans d'executar l'últim println
:
main: before runBlocking
main: before launch
main: after launch
main: launch
main: after runBlocking
Normalment runBlocking
es declara tal com hem vist al principi i no es fa servir en altre part del codi perquè precisament fem servir corrutines per evitar tenir threads bloquejats.
Concurrència estructurada
Les corrutines segueixen un principi de concurrència estructurada que significa que les noves corrutines només es poden llançar en un CoroutineScope
específic que delimita la vida útil de les corrutines.
Tal com hem explicat abans runBlocking
crea una CorutineScope
que gestiona totes les corrutines que es generan en aquest àmbit.
D'aquesta manera, es pot saber en quin moment es pot proseguir amb l'execució del codi que ha executat la funció runBlocking
perquè la CorutineScope
que s'ha creat indica que totes les corrutines que s'han llançant dins del seu àmbit s'han completat.
Refactorització de la funció d'extracció
Extreurem el bloc de codi que hi ha dins launch { ... }
en una funció separada.
Selecciona el bloc de codi, botó dret ratolí i "Refactor" > "Function..."
Al realitzar la refactorització obtens una funció nova amb el modificador suspend
.
Les funcions de suspensió es poden utilitzar dins les corrutines igual que les funcions normals, però la seva característica addicional és que poden, al seu torn, utilitzar altres funcions de suspensió (com ara delay en aquest exemple) per suspendre l'execució d'una corrutina.
import kotlinx.coroutines.*
fun main() = runBlocking { // this: CoroutineScope
launch { // launch a new coroutine and continue
doWorld()
}
println("Hello") // main coroutine continues while a previous one is delayed
}
private suspend fun doWorld() {
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
Àmbit del constructor
Els constructor com runBlocking
creen un àmbit d'execució, però dins d'un àmbit d'execució pots crear un nou àmbit d'execució amb la funció coroutineScope
.
(TODO: revisar) Els constructors runBlocking i coroutineScope poden semblar semblants perquè tots dos esperen que el seu cos i tots els seus fills s'acabin. La diferència principal és que el runBlocking mètode bloqueja el fil actual per esperar, mentre que coroutineScope només se suspèn, alliberant el fil subjacent per a altres usos. A causa d'aquesta diferència, runBlocking
és una funció normal i coroutineScope
és una funció de suspensió.
Pots fer servir coroutineScope
en qualsevol funció de suspensió.
Per exemple, podeu moure la impressió simultània de Hello
i World
a la funció suspend fun doWorld()
:
import kotlinx.coroutines.*
fun main() = runBlocking {
doWorld()
}
private suspend fun doWorld() = coroutineScope{ // this: CoroutineScope
launch {
delay(1000L)
println("World!")
}
println("Hello")
}
El resulta d'aquest codi és el mateix:
Hello
World!
La funció constructora coroutineScope
es pot utilitzar dins de qualsevol funció de suspensió per realitzar múltiples operacions simultàniament.
A continuació llançarem dues corrutines concurrents dins de la funció de suspensió doGet
:
// Sequentially executes doWorld followed by "Done"
fun main() = runBlocking {
doGet()
println("Done")
}
// Concurrently executes both sections
suspend fun doGet() = coroutineScope { // this: CoroutineScope
launch {
delay(2000L)
println("Get resource 1")
}
launch {
delay(1000L)
println("Get resource 2")
}
println("Waiting response")
}
L'execució del codi serà:
Waiting response
Get resource 2
Get resource 1
Done
Pots veure que:
-
El codi dins dels blocs
launch { ... }
s'executa de manera simultània, i com que el segonlaunch
ha d'esperar menys temps s'imprimirà primer "Get resouce 2". -
La
coroutineScope
dedoGet
només finalitza després de que totes les corrutines dins del seu àmbit estiguin finalitzades. -
Un cop finalitzada, la funcioó
doGet
torna i permet la impressió de l'string "Done".
Job
El constructor de corrutines launch
retorna un objecte Job
que és un identificador de la corrutina llançada i es pot utilitzar per esperar explícitament que es completi.
Per exemple, pots esperar que s'hagi completat la corrutina secundària i després imprimir l'string "Done":
import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch { // launch a new coroutine and keep a reference to its Job
delay(1000L)
println("World!")
}
println("Hello")
job.join() // wait until child coroutine completes
println("Done")
}
El resultat d'executar aquest codi és:
Hello
World!
Done
Les coroutines són lleugeres
Les corrutines necessiten menys recursos que els threads JVM.
Per exemple, el següent codi llança 50.000 coroutines diferents consumint molt poca memòria, on cada un espera 5 segons i després imprimeix un període ('.'):
import kotlinx.coroutines.*
fun main() = runBlocking {
repeat(50_000) { // launch a lot of coroutines
launch {
delay(5_000)
print(".")
}
}
}
Si escrius el mateix programa utilitzant threads, consumirà molta memòria.
import kotlin.concurrent.thread
fun main() {
repeat(50_000) { // launch a lot of coroutines
thread {
Thread.sleep(5_000)
print(".")
}
}
}
Depenent del seu sistema operatiu, versió JDK, i la seva configuració, llançarà un error per haver exhaurit la memòria o arrencarà threads a poc a poc de manera que mai hi ha massa threads funcionament concurrentment.
Cancel·lació i time-outs
En moltes aplicacions és necessari poder controlar les corrutines que s'estan executant.
Per exemple,si un usuari tanca una pàgina que ha llançat una corrutina per obtenir uns resultats que s'han de mostrar a la pàgina, ja no té cap sentit que la corrutina continui executant-se.
La funció launch
retorna un Job
que es pot utilitzar per cancel·lar la coroutina que s'està executant:
import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
}
delay(1_300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
job.join() // waits for job's completion
println("main: Now I can quit.")
}
Aquí tens el resultat de l'execució:
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
Pots veure que tant aviat com s'executa job.cancel()
ja no veiem cap més sortida de la corrutina creada per launch
ja que ha estat cancelada.
La cancel.lació no és immediata, i per aquest motiu després de cancel
has de fer un join
.
Com que fer un cancel
i després un join
és habitual, tens la funció cancelAndJoin
que combina cancel
i join
.
La cancel·lació és cooperativa
Per poder cancelar una corrutina aquesta ha de cooperar.
Totes les funcions que es poden suspendre del mòdul kotlinx.coroutines
es poden cancel.lar de manera cooperativa perquè comproven si s'han de cancelar mentres s'estan executant i llencen una CancellationException
quan es cancel·len.
Però si escrius una funció que es pot supendre i no comproves que si s'ha de cancel.lar mentres s'està executant, llavors no es pot cancel.lar.
A continuació tens un exemple que ho demostra:
import kotlinx.coroutines.*
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // computation loop, just wastes CPU
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1_300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
Si executes el codi pots verificar que segueix imprimint "I'm sleeping" fins i tot després que s'ha cancelat el Job
, finst que s'han completat les cinc iteracions.
El mateix passa si captures una una CancellationException
i no rellançes:
import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch(Dispatchers.Default) {
repeat(5) { i ->
try {
// print a message twice a second
println("job: I'm sleeping $i ...")
delay(500)
} catch (e: Exception) {
// log the exception
println(e)
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
Pots veure que mai has d'escriture un codi com aquest que atrapa excepcions genèriques i no les rellança.
Però aquest problema pot aparèixer de maneres més subtils, com quan s'utilitza la funció runCatching
, que no rellança CancellationException
.
Fer que el codi de càlcul sigui cancel·lable
Quan una funció està realitzant un càlcul que necessita molt de temps ha de verificar de manera periodica si ha de cancel.lar la seva execució.
La funció pot anar invocant una funció de suspensió que verifica si s'ha de cancel.lar l'execució; per fer això tens la funció yield
.
Una altra manera és comprovar de manera explícita si la corrutina està activa tal com es mostra a continuació.
Substitueix while (i < 5)
en l’exemple anterior amb while (isActive)
:
import kotlinx.coroutines.*
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (isActive) { // cancellable computation loop
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
Torna a executar el codi i verifica que la coroutina es cancel.la.
isActive
és una propietat d'extensió disponible a l'interior de la coroutina a través de l'objecte CoroutineScope
.
Tancar els recursos amb finally
Les funcions cancel·lables de suspensió llancen CancelationException
al ser cancelades cancel·lació, que es pot manejar de la manera habitual. Per exemple, l'expressió try {...} finally {...}
i la de funció use
de Kotlin executa les seves accions de finalització amb normalitat quan es cancel·la una corutina.
import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch {
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
println("job: I'm running finally")
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
Tant join
com cancel
esperen que totes les accions de finalització es completin, de manera que l'exemple anterior produeix la següent sortida:
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm running finally
main: Now I can quit.
Executr un bloc no cancel·lable
Qualsevol intent d'utilitzar una funció de suspensió en el bloc finally
de l'exemple anterior provoca una CancelationException
, perquè la coroutina que executa aquest codi es cancel·la. Normalment, això no és un problema, ja que totes les operacions de tancament de bon comportament (tancant un arxiu, cancel·lant un treball, o tancant qualsevol tipus de canal de comunicació) solen ser no bloquejos i no impliquen cap funció de suspensió. No obstant això, en el cas rar en què cal suspendre en un corutí cancel·lat es pot embolicar el codi corresponent en withContext(NonCancellable) {...}
utilitzant la funció withContext
i el context NonCancellable
com a exemple següent mostra:
import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch {
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
println("job: I'm running finally")
delay(1000L)
println("job: And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
Timeout
La raó pràctica més òbvia per cancel·lar l'execució d'una coroutina és perquè el seu temps d'execució ha superat algun temps. Si bé es pot fer un seguiment manual de la referència al Job
corresponent i llançar una coroutina separada per cancel·lar la rastrejada després del retard, hi ha una funció preparada per utilitzar withTimeout
que ho fa. Mireu el següent exemple:
import kotlinx.coroutines.*
fun main() = runBlocking {
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
}
Produeix la següent producció:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
El TimeoutCancellationException
que és llançat per withTimeout
és una subclasse de CancellationException
. No hem vist el seu rastre de pila imprès a la consola abans. Això és perquè dins d'una coroutine cancel·lada CancellationException
es considera una raó normal per a la finalització de la coroutina. En aquest exemple hem utilitzat withTimeout
a l'interior de la funció main
.
Com que la cancel·lació és només una excepció, tots els recursos estan tancats de la manera habitual. Podeu embolicar el codi amb Timeout en un bloc try {...} catch (e: TimeoutCancellationException) {...}
si necessiteu fer alguna acció addicional específicament sobre qualsevol tipus de temps d'oportatge o utilitzeu la funció withTimeoutOrNull
que és similar a withTimeout
però torna null
si Timeout en lloc de llançar una excepció:
import kotlinx.coroutines.*
fun main() = runBlocking {
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done" // will get cancelled before it produces this result
}
println("Result is $result")
}
Ja no hi ha una excepció a l'hora d'executar aquest codi:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null
Timeout asíncron i recursos
L'esdeveniment de timeout en ``withTimeout` és asíncron pel que fa al codi que s'executa en el seu bloc i pot ocórrer en qualsevol moment, fins i tot just abans de la tornada des de l'interior del bloc de temps. Tingues-ho en compte si obris o adquireixes algun recurs dins del bloc que necessiti tancar o alliberar-te fora del bloc.
Per exemple, aquí imitem un recurs més apropable amb la classe Resource
que simplement fa un seguiment de quantes vegades va ser creada mitjançant l'augment del contador acquired
i decrement el comptador en la seva funció close
. Ara fem una gran quantitat de coroutines, cadascuna de les quals crea un Resource
al final del bloc withTimeout
i alliberar el recurs fora del bloc. Afegim un petit retard perquè sigui més probable que el timeout es produeixi just quan el withTimeout
El bloc ja està acabat, la qual cosa provocarà una fuita de recursos.
import kotlinx.coroutines.*
var acquired = 0
class Resource {
init { acquired++ } // Acquire the resource
fun close() { acquired-- } // Release the resource
}
fun main() {
runBlocking {
repeat(10_000) { // Launch 10K coroutines
launch {
val resource = withTimeout(60) { // Timeout of 60 ms
delay(50) // Delay for 50 ms
Resource() // Acquire a resource and return it from withTimeout block
}
resource.close() // Release the resource
}
}
}
// Outside of runBlocking all coroutines have completed
println(acquired) // Print the number of resources still acquired
}
Si executes el codi anterior, veuràs que no sempre s'imprimeix zero, encara que pot dependre dels temps de la teva màquina. És possible que hagis d'ajustar el timeout en aquest exemple per veure realment els valors no nuls.
Fixeu-vos que incrementant i decrementant acquired
compta aquí des de 10K coroutines és completament de filferro segur, ja que sempre passa des del mateix fil, el que s'utilitza per runBlocking. (TODO: enllaç) Més sobre això s'explicarà en el capítol sobre el context coroutítí.
Per treballar al voltant d'aquest problema es pot emmagatzemar una referència al recurs en una variable en lloc de tornar-lo des del bloc de withTimeout
.
import kotlinx.coroutines.*
var acquired = 0
class Resource {
init { acquired++ } // Acquire the resource
fun close() { acquired-- } // Release the resource
}
fun main() {
runBlocking {
repeat(10_000) { // Launch 10K coroutines
launch {
var resource: Resource? = null // Not acquired yet
try {
withTimeout(60) { // Timeout of 60 ms
delay(50) // Delay for 50 ms
resource = Resource() // Store a resource to the variable if acquired
}
// We can do something else with the resource here
} finally {
resource?.close() // Release the resource if it was acquired
}
}
}
}
// Outside of runBlocking all coroutines have completed
println(acquired) // Print the number of resources still acquired
}
Aquest exemple sempre s'imprimeix zero. Els recursos no es fugen.