Un flow és un flux de dades que pot ser processat de forma asíncrona.
Introducció
Una corutina centrada en simples funcions de suspensió funciona molt bé quan:
-
Necessites fer treball asíncron, però no necessites rebre cap mena de “resultat” d’aquest treball
-
Necessites fer treball asíncron, i esperes un únic objecte que serveix com a resultat
No obstant això, hi ha moltes ocasions en programació on necessites un flux de resultats, no només un únic resultat. Una funció de suspensió ordinària no ofereix això. En canvi, el sistema de corutines de Kotlin ofereix canals i flows per a fluxos de resultats. Dels dos, els flows són l’API principal de streaming per a corutines, encara que els canals tenen alguns usos especialitzats.
Bastant treball asíncron es pot modelar com a operacions sense resultat o d’un sol resultat:
- Transaccions de base de dades
- Crides a serveis web
- Descàrrega o lectura d’un fitxer d’imatge
- I així successivament
Bàsicament, qualsevol cosa que sigui de naturalesa transaccional — on cada resultat és desencadenat per una petició diferent — es pot modelar com una operació sense resultat o d’un sol resultat. Aquests funcionen molt bé amb funcions de suspensió ordinàries.
No obstant això, també és comú tenir una única rutina que necessita retornar una sèrie de resultats al llarg del temps:
- Connexions de xarxa duradores, com WebSockets o XMPP, on el servidor pot enviar contingut sense una nova petició del client
- Lectures de GPS
- Lectures de sensors d’acceleròmetres, termòmetres, etc.
- Dades rebudes de dispositius externs via USB, Bluetooth, etc.
- I així successivament
Alguns entorns o frameworks de programació poden tenir els seus propis fluxos. Per exemple, podem obtenir diversos resultats al llarg del temps de Room (a mesura que canviem el contingut d’una base de dades).
Conceptes bàsics
Crea un projecte “JVM console application” amb Amper
Modifica el fitxer module.yaml
:
dependencies:
- org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.2
test-dependencies:
- io.kotest:kotest-assertions-core:5.9.0
- org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.2
Els flows simples estan representats en forma d’un objecte Flow
.
Una manera típica de crear un Flow
és utilitzant la funció de nivell superior flow()
.
flow()
és bastant simple: proporciones una expressió lambda, i aquesta expressió crida a emit()
per cada element que vols publicar en el flux.
Una manera típica de consumir un Flow
és cridar collect()
sobre ell. Això pren una altra expressió lambda, i se li passa cada element que el Flow
emet al seu flux.
collect()
és una funció de suspensió, i per tant necessitem cridar-la des de dins d’una altra funció de suspensió o des d’un constructor de corutines com launch()
.
Modifica el fitxer main.kt
per tal d’imprimir alguns nombres aleatoris:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.random.Random
fun main() = runBlocking {
launch {
randomPercentages(10, 200).collect { println(it) }
println("That's all folks!")
}
println("...and we're off!")
}
fun randomPercentages(count: Int, delayMs: Long) = flow {
for (i in 0 until count) {
delay(delayMs)
emit(Random.nextInt(1, 100))
}
}
Aquí, randomPercentages()
crea un Flow
utilitzant flow()
. Fa un bucle un nombre específic de vegades, espera un nombre específic de mil·lisegons en cada passada, i per cada passada emet un nombre aleatori.
Cal destacar:
-
emit()
és una funció de suspensió.flow()
configura una corutina perquè la puguis utilitzar, així que no t’has de preocupar de fer-ho tu mateix. Però això significa queemit()
pot provocar un canvi a una altra corutina, i queemit()
podria bloquejar-se durant una estona. -
Quan surts de l’expressió lambda, el flux es considera tancat.
Després, dins d’una corutina llançada, cridem collect()
sobre aquest Flow
, imprimint cada número. Això ens donarà quelcom com:
...and we're off!
30
84
62
79
19
83
83
6
81
59
That's all folks!
Així doncs, el nostre Flow
emet objectes, i la nostra funció collect()
— que configura un FlowCollector
— els rep.
Obtenint un Flow
Les tres maneres principals d’obtenir un Flow
són:
- Cridar una funció de nivell superior del sistema de corutines de Kotlin,
- Convertir un Channel a Flow, o
- Obtenir un Flow d’una biblioteca de tercers
flowOf()
flowOf()
accepta un nombre variable de paràmetres i els emet un per un. Només has de proporcionar els objectes:
@Test
fun flow_of() = runTest {
flowOf(1, 2, 3).toList() shouldBe listOf(1, 2, 3)
}
Aquí, generem tres nombres, creem un Flow
d’aquests mitjançant flowOf()
.
Pot semblar que flowOf()
no té sentit. Al cap i a la fi, si ja tenim els objectes, per què ens hem de molestar amb un Flow? No obstant això, hi ha alguns casos on això és exactament el que volem, com ara:
-
Proves, on necessitem proporcionar un
Flow
i ja tenim les nostres dades de prova per proporcionar -
Condicions d’error, on ja tenim el nostre objecte d’error, però l’API que estem implementant requereix un
Flow
(potser d’una classe segellada que representi estats de càrrega/contingut/error)
emptyFlow()
emptyFlow()
és bàsicament flowOf()
sense paràmetres. Retorna un flux que ja està tancat, així que una crida a collect()
retornarà immediatament.
Això és una mica més explícit que flowOf()
, per casos on necessites un Flow per propòsits d’API, però mai emetràs cap objecte. Però, com flowOf()
, principalment és per escenaris com proves.
Flows proporcionats per biblioteques
En molts casos, no crearàs un Flow tu mateix. En lloc d’això, l’obtindràs d’alguna biblioteca que ofereix una API basada en Flow.
Per exemple, Room suporta Flow
com a tipus de retorn per a una funció anotada amb @Query
, etc.
Consumint un Flow
Hem vist que una manera de consumir el Flow és cridar collect()
. Tot i que això serà bastant comú, no és la teva única opció.
Funcions com collect()
que consumeixen un flow s’anomenen “operadors terminals” — veurem operadors no terminals més endavant. Així que, mirem alguns altres operadors terminals.
single()
i singleOrNull()
De vegades, pots acabar amb un Flow
on només esperes un objecte a ser emès… i potser ni això. Aquest seria el cas en què potser una biblioteca sempre retorna un Flow
, però saps que la situació mai resultarà en un flux de múltiples objectes.
single()
retorna el primer objecte emès pel Flow.
class FlowTest {
@Test
fun single() = runTest {
randomPercentages(1).single() should beBetween(0, 100)
}
suspend fun randomPercentages(count: Int, delayMs: Long = 200) = flow {
for (i in 0 until count) {
delay(200)
val value = Random.nextInt(1, 100)
emit(value)
}
}
}
Si el Flow
es tanca abans d’emetre un objecte, o si el Flow
no es tanca després d’emetre l’únic objecte esperat, single()
llança una excepció.
@Test
fun single_exception() = runTest {
// Flow has more than one element
assertFailsWith<IllegalArgumentException> { randomPercentages(10).single() }
}
Una versió una mica més segura de single()
és singleOrNull()
.
- Retorna null per a un
Flow
buit (un que es tanca sense emetre res) - Retorna l’objecte emès per a un
Flow
d’un sol objecte - Llança una excepció si el
Flow
no es tanca després d’emetre el seu primer objecte
No obstant això, singleOrNull()
retornarà un tipus nullable. Si tens un Flow<String>
, singleOrNull()
retorna String?
, no String
.
@Test
fun single_or_null() = runTest {
randomPercentages(0).singleOrNull() should beNull()
randomPercentages(1).singleOrNull()!! should beBetween(0, 100)
}
first()
first()
és similar a single()
i singleOrNull()
, ja que retorna un objecte del Flow
. No obstant això, després deixa d’observar el Flow
, així que és segur utilitzar-lo amb un Flow
que podria retornar més d’un valor.
@Test
fun first() = runTest {
randomPercentages(10).first() should beBetween(0, 100)
}
Aquí, demanem 10 nombres aleatoris… però només en prenem un mitjançant first()
, després abandonem el Flow
.
Quan deixem d’observar el Flow
, el Flow
es considera tancat, així que cancel·la el job que s’executa en la nostra expressió lambda flow()
, i sortim de manera neta.
emit()
només s’acaba cridant una vegada.
toList()
and toSet()
Cridar toList()
o toSet()
recull tots els objectes emesos pel Flow
i els retorna en una List
o Set
.
Aquí, recollim els nostres deu nombres aleatoris en una List
:
@Test
fun to_list() = runTest {
val list = randomPercentages(10).toList()
assertEquals(10, list.size)
list.forEach { it should beBetween(0, 100) }
}
Aquestes funcions gestionen tots els fluxos acotats: zero elements, un element o diversos elements. No obstant això, només funcionen per a fluxos que es tancaran per si mateixos basant-se en algun criteri intern. Aquestes funcions no funcionaran bé per a fluxos que puguin emetre objectes indefinidament.
Com amb single()
i els altres operadors terminals, toList()
i toSet()
són funcions de suspensió.
launchIn()
launchIn
és un operador terminal que llança la col·lecció del flux en el CoroutineScope
donat. S’utilitza per iniciar el flux i mantenir la recol·lecció de les seves emissions dins de l’àmbit especificat fins que l’àmbit es cancel·li o el flux es completi.
Aquests són els propòsits i l’ús de launchIn
:
-
Control del Cicle de Vida del Flux: launchIn permet controlar el cicle de vida de la col·lecció del flux associant-lo amb un CoroutineScope. Això és particularment útil en escenaris d’UI on vols que el flux estigui actiu mentre un component particular (per exemple, un ViewModel o Activity) estigui actiu.
-
Evitar el Llançament Manual: Proporciona una manera convenient de recollir fluxos sense haver de llançar manualment una corutina. Això simplifica el codi i el fa més llegible.
-
Cancel·lació Automàtica: La col·lecció del flux es cancel·larà automàticament quan l’àmbit es cancel·li, ajudant a evitar fuites de memòria i altres problemes relacionats amb corutines no gestionades.
Aquí tens l’exemple de com s’utilitza launchIn
:
@Test
fun launch_in() = runTest {
var sum = 0
val scope = CoroutineScope(Dispatchers.Default)
randomPercentages(10, 500)
.onEach { value -> sum += value }
.launchIn(this)
delay(200)
scope.cancel()
sum shouldBe 0
}
En aquest exemple:
- L’operador
onEach
s’utilitza per imprimir els valors rebuts. launchIn(scope)
comença a recollir el flux en l’àmbit especificat.- L’àmbit es cancel·la després d’un retard, aturant la recol·lecció del flux.