Flow

데이터 스트림

Flow

Flow


데이터의 소비자가 구독을 요청하면, 데이터의 발행자는 새로운 데이터가 들어오면 데이터의 소비자에게 지속적으로 데이터를 전달하는 역할을 하는데, 이것을 데이터 스트림이라고 한다.

이러한 데이터 스트림을 코루틴에서 지원하기 위한 요소가 Flow 이다.

데이터 스트림의 구성 요소는 생산자(Producer), 중간 연산자(Intermediary), 소비자(Consumer) 순서로 구성된다. (ProducerIntermediaryConsumer)

생산자 (Producer)

생산자는 데이터를 생산하는 역할로 Flow 에서 생산자는 flow { } 블록 내부에서 emit() 을 통해 데이터를 생산한다.

fun countUp(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(1000)
        emit(it)
    }
}


중간 연산자 (Intermediary)

중간 연산자는 생성자가 생성한 데이터를 변환하는 역할을 하며 Flow 에서 map(), filter() 등의 중간 연산자를 사용할 수 있다.

Collection, Sequence 에서 사용하는 중간 연산자와는 다르게 Flow 의 중간 연산자 블록 내부에서는 중단 함수(suspend 함수)를 사용할 수 있다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    (1..3).asFlow()
        .map {
            request(it) // flow 를 매핑하여 새로운 flow 반환
        }
        .collect {
            println(it) // 매핑된 새로운 flow 를 수집
        }
}

suspend fun request(request: Int): String {
    delay(1000)
    return "Response $request"
}

// Response 1
// Response 2
// Response 3


소비자 (Consumer)

소비자는 중간 연산자가 변환한 데이터를 소비하는 역할로 Flow 에서 소비자는 collect 를 이용해 전달된 데이터를 소비할 수 있다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    countUp() // Not Working
    countUp().collect { // Working
        println(it)
    }
}

fun countUp(): Flow<Int> = flow {
    println("Count Up")
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}


// Count Up
// 1
// 2
// 3
essential