Flow

  • Un flow és un flux de dades que pot ser processat de forma asíncrona.

    Introducció

    Una corutina centrada en simples funcions de suspensió funciona molt bé quan:

    • Necessites fer treball asíncron, però no necessites rebre cap mena de “resultat” d’aquest treball

    • Necessites fer treball asíncron, i esperes un únic objecte que serveix com a resultat

    No obstant això, hi ha moltes ocasions en programació on necessites un flux de resultats, no només un únic resultat. Una funció de suspensió ordinària no ofereix això. En canvi, el sistema de corutines de Kotlin ofereix canals i flows per a fluxos de resultats. Dels dos, els flows són l’API principal de streaming per a corutines, encara que els canals tenen alguns usos especialitzats.

    Bastant treball asíncron es pot modelar com a operacions sense resultat o d’un sol resultat:

    • Transaccions de base de dades
    • Crides a serveis web
    • Descàrrega o lectura d’un fitxer d’imatge
    • I així successivament

    Bàsicament, qualsevol cosa que sigui de naturalesa transaccional — on cada resultat és desencadenat per una petició diferent — es pot modelar com una operació sense resultat o d’un sol resultat. Aquests funcionen molt bé amb funcions de suspensió ordinàries.

    No obstant això, també és comú tenir una única rutina que necessita retornar una sèrie de resultats al llarg del temps:

    • Connexions de xarxa duradores, com WebSockets o XMPP, on el servidor pot enviar contingut sense una nova petició del client
    • Lectures de GPS
    • Lectures de sensors d’acceleròmetres, termòmetres, etc.
    • Dades rebudes de dispositius externs via USB, Bluetooth, etc.
    • I així successivament

    Alguns entorns o frameworks de programació poden tenir els seus propis fluxos. Per exemple, podem obtenir diversos resultats al llarg del temps de Room (a mesura que canviem el contingut d’una base de dades).

    Conceptes bàsics

    Crea un projecte “JVM console application” amb Amper

    Modifica el fitxer module.yaml:

    dependencies:
      - org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.2
    test-dependencies:
      - io.kotest:kotest-assertions-core:5.9.0
      - org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.2

    Els flows simples estan representats en forma d’un objecte Flow.

    Una manera típica de crear un Flow és utilitzant la funció de nivell superior flow().

    flow() és bastant simple: proporciones una expressió lambda, i aquesta expressió crida a emit() per cada element que vols publicar en el flux.

    Una manera típica de consumir un Flow és cridar collect() sobre ell. Això pren una altra expressió lambda, i se li passa cada element que el Flow emet al seu flux.

    collect() és una funció de suspensió, i per tant necessitem cridar-la des de dins d’una altra funció de suspensió o des d’un constructor de corutines com launch().

    Modifica el fitxer main.kt per tal d’imprimir alguns nombres aleatoris:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    import kotlin.random.Random
    
    fun main() = runBlocking {
        launch {
            randomPercentages(10, 200).collect { println(it) }
            println("That's all folks!")
        }
    
        println("...and we're off!")
    }
    
    fun randomPercentages(count: Int, delayMs: Long) = flow {
        for (i in 0 until count) {
            delay(delayMs)
            emit(Random.nextInt(1, 100))
        }
    }

    Aquí, randomPercentages() crea un Flow utilitzant flow(). Fa un bucle un nombre específic de vegades, espera un nombre específic de mil·lisegons en cada passada, i per cada passada emet un nombre aleatori.

    Cal destacar:

    • emit() és una funció de suspensió. flow() configura una corutina perquè la puguis utilitzar, així que no t’has de preocupar de fer-ho tu mateix. Però això significa que emit() pot provocar un canvi a una altra corutina, i que emit() podria bloquejar-se durant una estona.

    • Quan surts de l’expressió lambda, el flux es considera tancat.

    Després, dins d’una corutina llançada, cridem collect() sobre aquest Flow, imprimint cada número. Això ens donarà quelcom com:

    ...and we're off!
    30
    84
    62
    79
    19
    83
    83
    6
    81
    59
    That's all folks!

    Així doncs, el nostre Flow emet objectes, i la nostra funció collect() — que configura un FlowCollector — els rep.

    Obtenint un Flow

    Les tres maneres principals d’obtenir un Flow són:

    • Cridar una funció de nivell superior del sistema de corutines de Kotlin,
    • Convertir un Channel a Flow, o
    • Obtenir un Flow d’una biblioteca de tercers

    flowOf()

    flowOf() accepta un nombre variable de paràmetres i els emet un per un. Només has de proporcionar els objectes:

        @Test
        fun flow_of() = runTest {
            flowOf(1, 2, 3).toList() shouldBe listOf(1, 2, 3)
        }

    Aquí, generem tres nombres, creem un Flow d’aquests mitjançant flowOf().

    Pot semblar que flowOf() no té sentit. Al cap i a la fi, si ja tenim els objectes, per què ens hem de molestar amb un Flow? No obstant això, hi ha alguns casos on això és exactament el que volem, com ara:

    • Proves, on necessitem proporcionar un Flow i ja tenim les nostres dades de prova per proporcionar

    • Condicions d’error, on ja tenim el nostre objecte d’error, però l’API que estem implementant requereix un Flow (potser d’una classe segellada que representi estats de càrrega/contingut/error)

    emptyFlow()

    emptyFlow() és bàsicament flowOf() sense paràmetres. Retorna un flux que ja està tancat, així que una crida a collect() retornarà immediatament.

    Això és una mica més explícit que flowOf(), per casos on necessites un Flow per propòsits d’API, però mai emetràs cap objecte. Però, com flowOf(), principalment és per escenaris com proves.

    Flows proporcionats per biblioteques

    En molts casos, no crearàs un Flow tu mateix. En lloc d’això, l’obtindràs d’alguna biblioteca que ofereix una API basada en Flow.

    Per exemple, Room suporta Flow com a tipus de retorn per a una funció anotada amb @Query, etc.

    Consumint un Flow

    Hem vist que una manera de consumir el Flow és cridar collect(). Tot i que això serà bastant comú, no és la teva única opció.

    Funcions com collect() que consumeixen un flow s’anomenen “operadors terminals” — veurem operadors no terminals més endavant. Així que, mirem alguns altres operadors terminals.

    single() i singleOrNull()

    De vegades, pots acabar amb un Flow on només esperes un objecte a ser emès… i potser ni això. Aquest seria el cas en què potser una biblioteca sempre retorna un Flow, però saps que la situació mai resultarà en un flux de múltiples objectes.

    single() retorna el primer objecte emès pel Flow.

    class FlowTest {
    
        @Test
        fun single() = runTest {
            randomPercentages(1).single() should beBetween(0, 100)
        }
    
        suspend fun randomPercentages(count: Int, delayMs: Long = 200) = flow {
            for (i in 0 until count) {
                delay(200)
                val value = Random.nextInt(1, 100)
                emit(value)
            }
        }
    }

    Si el Flow es tanca abans d’emetre un objecte, o si el Flow no es tanca després d’emetre l’únic objecte esperat, single() llança una excepció.

        @Test
        fun single_exception() = runTest {
            // Flow has more than one element
            assertFailsWith<IllegalArgumentException> { randomPercentages(10).single() }
        }

    Una versió una mica més segura de single() és singleOrNull().

    • Retorna null per a un Flow buit (un que es tanca sense emetre res)
    • Retorna l’objecte emès per a un Flow d’un sol objecte
    • Llança una excepció si el Flow no es tanca després d’emetre el seu primer objecte

    No obstant això, singleOrNull() retornarà un tipus nullable. Si tens un Flow<String>, singleOrNull() retorna String?, no String.

        @Test
        fun single_or_null() = runTest {
    
            randomPercentages(0).singleOrNull() should beNull()
            randomPercentages(1).singleOrNull()!! should beBetween(0, 100)
        }

    first()

    first() és similar a single() i singleOrNull(), ja que retorna un objecte del Flow. No obstant això, després deixa d’observar el Flow, així que és segur utilitzar-lo amb un Flow que podria retornar més d’un valor.

        @Test
        fun first() = runTest {
            randomPercentages(10).first() should beBetween(0, 100)
        }

    Aquí, demanem 10 nombres aleatoris… però només en prenem un mitjançant first(), després abandonem el Flow.

    Quan deixem d’observar el Flow, el Flow es considera tancat, així que cancel·la el job que s’executa en la nostra expressió lambda flow(), i sortim de manera neta.

    emit() només s’acaba cridant una vegada.

    toList() and toSet()

    Cridar toList() o toSet() recull tots els objectes emesos pel Flow i els retorna en una List o Set.

    Aquí, recollim els nostres deu nombres aleatoris en una List:

        @Test
        fun to_list() = runTest {
            val list = randomPercentages(10).toList()
            assertEquals(10, list.size)
            list.forEach { it should beBetween(0, 100) }
        }

    Aquestes funcions gestionen tots els fluxos acotats: zero elements, un element o diversos elements. No obstant això, només funcionen per a fluxos que es tancaran per si mateixos basant-se en algun criteri intern. Aquestes funcions no funcionaran bé per a fluxos que puguin emetre objectes indefinidament.

    Com amb single() i els altres operadors terminals, toList() i toSet() són funcions de suspensió.

    launchIn()

    launchIn és un operador terminal que llança la col·lecció del flux en el CoroutineScope donat. S’utilitza per iniciar el flux i mantenir la recol·lecció de les seves emissions dins de l’àmbit especificat fins que l’àmbit es cancel·li o el flux es completi.

    Aquests són els propòsits i l’ús de launchIn:

    • Control del Cicle de Vida del Flux: launchIn permet controlar el cicle de vida de la col·lecció del flux associant-lo amb un CoroutineScope. Això és particularment útil en escenaris d’UI on vols que el flux estigui actiu mentre un component particular (per exemple, un ViewModel o Activity) estigui actiu.

    • Evitar el Llançament Manual: Proporciona una manera convenient de recollir fluxos sense haver de llançar manualment una corutina. Això simplifica el codi i el fa més llegible.

    • Cancel·lació Automàtica: La col·lecció del flux es cancel·larà automàticament quan l’àmbit es cancel·li, ajudant a evitar fuites de memòria i altres problemes relacionats amb corutines no gestionades.

    Aquí tens l’exemple de com s’utilitza launchIn:

        @Test
        fun launch_in() = runTest {
    
            var sum = 0
    
            val scope = CoroutineScope(Dispatchers.Default)
            randomPercentages(10, 500)
                .onEach { value -> sum += value }
                .launchIn(this)
    
            delay(200)
            scope.cancel()
    
            sum shouldBe 0
        }

    En aquest exemple:

    • L’operador onEach s’utilitza per imprimir els valors rebuts.
    • launchIn(scope) comença a recollir el flux en l’àmbit especificat.
    • L’àmbit es cancel·la després d’un retard, aturant la recol·lecció del flux.

    TODO