Corutina - Basic

Una corutina és un bloc de codi que es pot executar de manera independent en qualsevol moment.

Introducció

Una corutina és un bloc de codi que es pot executar de manera independent en qualsevol moment.

Una corutina s’executa dins d’un Kotlin - Thread, es pot executar per qualsevol thread, i inclús executar-se en threads diferents en diferents instants d’execució.

Entorn de treball

Crea un projecte “JVM console application” amb Amper

Encara que la paraula clau suspend forma part del llenguatge necessites instal·lar biblioteques per poder utilitzar les corutines.

Modifica el fitxer module.yaml afegint una dependència amb kotlinx-coroutines-core:

module.yaml
dependencies:
- org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.2

Corutina

A continuació tens un codi que crea dos corutines amb launch i runBlocking:

Cada corutina s’executa al moment de crear-se, però pot suspendre la seva execució si executa una funció suspend que decideix esperar algun esdeveniment i no continuar la seva execució.

Si una corutina suspen la seva execució, es posa en una cua d’execució del thread a l’espera de continuar sent executada, i el thread continua executant altres corutines.

Si executes el codi, pots veure que World! apareix abans que Hello, perquè delay és una funció suspend que atura la seva execució fins que no ha passat el temps que li passes com argument.

Terminal window
World!
Hello,
Nota

Amb les funcions suspend pots escriure codi de manera lineal, però el codi no s’executa de manera lineal sinó concurrent.

Les coroutines són lleugeres

Les corutines necessiten molts menys recursos que els threads.

Per exemple, el següent codi llança 50.000 coroutines diferents consumint molt poca memòria, on cada corutina espera 5 segons i després imprimeix un període (’.’):

import kotlinx.coroutines.*
fun main() = runBlocking {
repeat(50_000) { // launch a lot of coroutines
launch {
delay(5_000)
print(".")
}
}
}

Si escrius el mateix programa utilitzant threads, consumirà molta memòria:

import kotlin.concurrent.thread
fun main() {
repeat(50_000) { // launch a lot of coroutines
thread {
Thread.sleep(5_000)
print(".")
}
}
}

Depenent del teu sistema operatiu, versió JDK, i la seva configuració, llançarà un error per haver exhaurit la memòria o arrencarà threads a poc a poc de manera que mai hi ha massa threads funcionament concurrentment.

CoroutineScope

La diferència entre launch i runBlocking, és que runBlocking, a més de crear una corutina, crea una CoroutineScope on s’executarà aquesta corutina.

Totes les altres corutines que es creen a partir d’aquesta corutina pertanyen a aquest “scope”, i es poden gestionar com un conjunt.

Si elimines runBlocking del codi, pots veure que el compilador dona error perquè launch no es pot cridar sense un àmbit de corutina corresponent.

src/main.kt
import kotlinx.coroutines.*
fun main() {
launch {
delay(1_000)
println("Hello")
}
println("World")
}
'launch' can not be called without the corresponding coroutine scope. Consider wrapping 'launch' in 'coroutineScope { }', using 'runBlocking { }', or using some other 'CoroutineScope'.

runBlocking no permet que el thread executi el codi que ve a continuació fins que no hagi finalitzi l’execució de totes les corutines creades dins de l‘“scope” creat per runBlocking (“bloqueja” el thread).

Per tant, si executes aquest codi:

fun main() {
runBlocking {
launch {
delay(1_000)
println("Hello")
}
println("World")
}
println("Bye")
}

Pots veure que println("Bye") només s’executa un cop han finalitzat totes les corutines creades dins de l‘“scope” creat per runBlocking.

World
Hello
Bye

suspend

Les funcions suspend es poden utilitzar dins les corutines igual que les funcions normals.

La seva característica addicional és que poden, al seu torn, utilitzar altres funcions suspend (com ara `delay) per suspendre l’execució d’una corutina.

Extreu el bloc de codi que hi ha dins launch { ... } en una funció separada.

import kotlinx.coroutines.*
fun main() = runBlocking {
launch {
doHello()
}
println("World")
}
suspend fun doHello() {
delay(1_000)
println("Hello")
}
Nota

Si vols, amb la IDE pots seleccionar el bloc de codi, botó dret ratolí i Refactor > Function…

coroutineScope

runBlocking crea un àmbit d’execució, però dins d’un àmbit d’execució pots crear un nou àmbit d’execució amb la funció coroutineScope.

coroutineScope no bloqueja el thread perquè pugui continuar processant el codi que ve a continuació, sinó que es comporta com qualsevol funció “suspend”.

Pots fer servir coroutineScope en qualsevol funció de suspensió.

Per exemple, pots moure el print de Hello i World a la funció suspend fun doWorld():

src/main.kt
import kotlinx.coroutines.*
fun main() = runBlocking {
doWorld()
}
private suspend fun doHello() = coroutineScope {
launch {
delay(1000L)
println("Hello")
}
println("World")
}

El resulta d’aquest codi és el mateix:

Terminal window
World
Hello

La funció coroutineScope es pot utilitzar dins de qualsevol funció de suspensió per efectuar múltiples operacions de manera simultània.

src/main.kt
fun main() = runBlocking {
doGet()
println("Done")
}
suspend fun doGet() = coroutineScope {
launch {
delay(2000L)
println("Get Elephant Picture")
}
launch {
delay(1000L)
println("Get Quixote Book")
}
println("Waiting response")
}

Si executes el codi:

Terminal window
Waiting response
Get Quixote Book
Get Elephant Picture
Done

Pots veure que:

  • El codi dins dels blocs launch { ... } s’executa de manera simultània, i com que el segon launch ha d’esperar menys temps s’imprimeix primer Get Quixote Book.

  • La coroutineScope de doGet només finalitza després que totes les corutines dins del seu àmbit estan finalitzades.

  • Un cop finalitzada, la funció doGet torna i permet la impressió de Done.

Job

La funció launch retorna un objecte Job que identifica la corutina que s’ha creat.

join

Amb la funció join pots esperar que una corutina estigui finalitzada:

import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch {
delay(1000L)
println("Hello")
}
println("World")
job.join()
println("Done")
}

El resultat d’executar aquest codi és que el thread espera que s’hagi completat la corutina secundària per imprimir “Done”:

Terminal window
World
Hello
Done
Activitat

Elimina el join i mira que passa.

Cancel·lació

En moltes aplicacions és necessari poder controlar les corutines que s’estan executant.

Per exemple, si un usuari tanca una pàgina que ha llançat una corutina per obtenir uns resultats que s’han de mostrar a la pàgina, ja no té cap sentit que la corutina continui executant-se.

El Job que retorna la funció launch es pot utilitzar per cancel·lar la corutina que s’està executant:

import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
}
delay(1_300L)
println("main: I'm tired of waiting!")
job.cancel()
job.join()
println("main: Now I can quit.")
}

Si executes el codi:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

Pots veure que tan aviat com s’executa job.cancel() ja no veiem cap més sortida de la corutina creada per launch ja que ha estat cancel·lada.

Nota

La cancel.lació no és immediata perquè les corutines han de cooperar.

Fins que una corutina no executa una funció suspend i es posa a la cua, el thread seguirà executant el codi de la corutina.

Per aquest motiu després de cancel has de fer un join.

Com que fer un cancel i després un join és habitual, tens la funció cancelAndJoin que combina cancel i join.

yield

Com hem dit abans, per poder cancel·lar una corutina aquesta ha de cooperar.

Si escrius una funció que es pot suspendre, i no comproves si s’ha de cancel·lar mentre s’està executant, llavors no es pot cancel·lar.

A continuació tens un exemple que ho demostra:

import kotlinx.coroutines.*
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch {
var nextPrintTime = startTime
var i = 0
while (i < 5) {
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping $i ... 😴")
i += 1
nextPrintTime += 1000L
}
}
}
delay(1_300L)
println("I'm tired of waiting! 🥱")
job.cancelAndJoin()
println("Now I can quit. ")
}

Si executes el codi, pots verificar que segueix imprimint “I’m sleeping” fins que no s’han completat les cinc iteracions encara que s’ha cancel·lat el Job,

Quan una funció està realitzant un càlcul que necessita molt temps, ha de verificar de manera periodica si ha de cancel·lar la seva execució.

La funció pot anar invocant la funció de suspensió yield que verifica si s’ha de cancel·lar l’execució.

import kotlinx.coroutines.*
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch {
var nextPrintTime = startTime
var i = 0
while (i < 5) {
if (System.currentTimeMillis() >= nextPrintTime) {
yield()
println("I'm sleeping $i ... 😴")
i += 1
nextPrintTime += 1000L
}
}
}
delay(1_300L)
println("I'm tired of waiting! 🥱")
job.cancel()
println("Job is ${job.isActive}")
job.join()
println("Now I can quit. ")
}

Torna a executar el codi i verifica que la corutina es cancel·la.

CancellationException

Una corutina cancel·la la seva execució llançant una CancellationException.

Aquestes excepcions es poden gestionar com qualsevol altre excepció.

Per exemple, l’expressió try {...} finally {...} i la funció use executen les seves accions de finalització amb normalitat quan es cancel·la una corutina.

import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch {
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
println("job: I'm running finally")
}
}
delay(1300L)
println("main: I'm tired of waiting!")
job.cancelAndJoin()
println("main: Now I can quit.")
}

Pots verificar que el bloc finally s’executa en el moment que es cancel·la l’execució de la corutina.

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm running finally
main: Now I can quit.

Si captures una CancellationException i no l’eleves cap amunt, la corutina mai es cancel·la:

import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch(Dispatchers.Default) {
repeat(5) { i ->
try {
println("job: I'm sleeping $i ...")
delay(500)
} catch (e: Exception) {
println(e.message)
}
}
}
delay(1300L)
println("main: I'm tired of waiting!")
job.cancelAndJoin()
println("main: Now I can quit.")
}

Mai has d’escriure un codi com aquest que atrapa excepcions genèriques i no les rellança.

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
StandaloneCoroutine was cancelled
job: I'm sleeping 3 ...
StandaloneCoroutine was cancelled
job: I'm sleeping 4 ...
StandaloneCoroutine was cancelled
main: Now I can quit.

Però aquest problema pot aparèixer de maneres més subtils, com quan s’utilitza la funció runCatching, que no rellança CancellationException.

Executar un bloc no cancel·lable

Qualsevol intent d’utilitzar una funció de suspensió en el bloc finally provoca una CancelationException, perquè la corutina que executa aquest codi es cancel·la.

Normalment, això no és un problema, ja que totes les operacions de tancament que es porten bé (tancar un arxiu, cancel·lar un treball, o tancar qualsevol mena de canal de comunicació) no acostumen a bloquejar el thread d’execució i no impliquen cap funció de suspensió.

En el cas que calgues utilitzar una funció suspend en un bloc finally has de niuar el codi amb la funció withContext i el context NonCancellable tal com es mostra a continuació:

import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch {
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
println("job: I'm running finally")
delay(1000L)
println("job: And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L)
println("main: I'm tired of waiting!")
job.cancelAndJoin()
println("main: Now I can quit.")
}
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm running finally
job: And I've just delayed for 1 sec because I'm non-cancellable
main: Now I can quit.

Timeout

La raó pràctica més òbvia per cancel·lar l’execució d’una corutina és perquè el seu temps d’execució ha superat algun temps prefixat.

Per aquestes situacions tens la funció withTimeout :

import kotlinx.coroutines.*
fun main() = runBlocking {
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
}

El resultat és:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms

Pots veure que withTimeout llança una TimeoutCancellationException que és una subclasse de CancellationException.

Per tant, és habitual capturar aquesta excepció i gestionar-la:

import kotlinx.coroutines.*
fun main() = runBlocking {
try {
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
} catch (e: TimeoutCancellationException) {
println("I'm timed out!")
}
}
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
I'm timed out!

En lloc de capturar l’excepció, pots utilitzar la funció withTimeoutOrNull que és similar a withTimeout, però torna null en lloc de llançar una excepció:

import kotlinx.coroutines.*
fun main() = runBlocking {
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done"
}
println("Result is $result")
}

Ja no hi ha una excepció a l’hora d’executar aquest codi:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null

Timeout asíncron i recursos

PENDENT DE REVISAR

L’esdeveniment de timeout en withTimeout és asíncron pel que fa al codi que s’executa en el seu bloc i pot ocórrer en qualsevol moment, fins i tot just abans de la tornada des de l’interior del bloc de temps.

Tingues-ho en compte si obres o adquireixes algun recurs dins del bloc que necessiti tancar o alliberar-te fora del bloc.

Per exemple, aquí imitem un recurs més apropable amb la classe Resource que simplement fa un seguiment de quantes vegades va ser creada mitjançant l’augment del contador acquired i decrement el comptador en la seva funció close. Ara fem una gran quantitat de coroutines, cadascuna de les quals crea un Resource al final del bloc withTimeout i alliberar el recurs fora del bloc. Afegim un petit retard perquè sigui més probable que el timeout es produeixi just quan el withTimeout El bloc ja està acabat, la qual cosa provocarà una fuita de recursos.

import kotlinx.coroutines.*
var acquired = 0
class Resource {
init { acquired++ } // Acquire the resource
fun close() { acquired-- } // Release the resource
}
fun main() {
runBlocking {
repeat(10_000) {
launch {
val resource = withTimeout(60) {
delay(50)
Resource()
}
resource.close()
}
}
}
// Outside of runBlocking all coroutines have completed
println(acquired) // Print the number of resources still acquired
}

Si executes el codi anterior, veuràs que no sempre s’imprimeix zero, encara que pot dependre dels temps de la teva màquina. És possible que hagis d’ajustar el timeout en aquest exemple per veure realment els valors no nuls.

Fixeu-vos que incrementant i decrementant acquired compta aquí des de 10K coroutines és completament de filferro segur, ja que sempre passa des del mateix fil, el que s’utilitza per runBlocking. (TODO: enllaç) Més sobre això s’explicarà en el capítol sobre el context coroutítí.

Per treballar al voltant d’aquest problema es pot emmagatzemar una referència al recurs en una variable en lloc de tornar-lo des del bloc de withTimeout.

import kotlinx.coroutines.*
var acquired = 0
class Resource {
init { acquired++ } // Acquire the resource
fun close() { acquired-- } // Release the resource
}
fun main() {
runBlocking {
repeat(10_000) { // Launch 10K coroutines
launch {
var resource: Resource? = null // Not acquired yet
try {
withTimeout(60) { // Timeout of 60 ms
delay(50) // Delay for 50 ms
resource = Resource() // Store a resource to the variable if acquired
}
// We can do something else with the resource here
} finally {
resource?.close() // Release the resource if it was acquired
}
}
}
}
// Outside of runBlocking all coroutines have completed
println(acquired) // Print the number of resources still acquired
}

Aquest exemple sempre s’imprimeix zero. Els recursos no es fugen.

TODO