Flow 변환

flatMapConcat / flatMapLatest / flatMapMerge

Flow 변환

flatMapConcat


flatMapConcat 은 여러 Flow 를 연결하는 연산자로, Flow 를 연결해서 새로운 Flow 를 만든다.

public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()


public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
    return@transform emit(transform(value))
}

public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
    collect { value -> emitAll(value) }
}
  • transform 람다식을 받아 map 을 수행해 Flow 에서 발행된 데이터를 변환하여 새로 생성한다.
  • 생성된 Flow 들을 합쳐서 하나의 Flow 로 만든다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val flow = flow<Int> {
    emit(10)
    emit(100)
}

fun main() = runBlocking {
    flow.flatMapConcat {
        flow { // flow 로부터 emit 된 데이터를 다시 flow 에서 받아서 emit
            emit(it + 1)
            emit(it + 2)
            emit(it + 3)
        }
    }.collect {
        println("$it")
    }
}


// 11
// 12
// 13
// 101
// 102
// 103
  • 10에 대한 변환 수행이 된 후 100에 대한 변환이 순차적으로 처리된다.
  • 순차적으로 처리되기 때문에 만약 데이터 변환에 시간이 오래 지연되는 경우에 변환된 데이터의 발행 시점이 계속해서 밀리는 문제가 생길 수 있다.


flatMapLatest


flatMapLatest 는 최신의 데이터만을 이용해 새로운 Flow 로 변환해주는 연산자로 collectLatest 와 유사한 동작을 하는데, 발행된 데이터를 변환하는 도중에 새로운 데이타가 발행되는 경우에 이전 데이터의 변환을 취소하고 새로운 데이터를 받아 변환을 한다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val flow = flow<Int> {
    emit(10)
    emit(100)
}

fun main() = runBlocking {
    flow.flatMapLatest {
        flow {
            emit(it + 1)
            delay(1000)
            emit(it + 2)
            emit(it + 3)
        }
    }.collect {
        println("$it")
    }
}


// 11
// 101
// 102
// 103
  • 이전 데이터가 변환되기 전에 새로운 데이터가 들어오면 이전 데이터의 변환이 취소되기 때문에 첫 번째 데이터 10의 +1은 처리되고 +2와 +3은 취소된다.


flatMapMerge


flatMapMerge 는 발행된 데이터를 순차적으로 처리하는 flatMapConcat, flatMapLatest 와는 다르게 데이터의 변환을 병렬로 수행하는 연산자이다.

대부분의 연산은 순차 처리를 하지만 순서가 중요하지 않은 경우에는 발행된 데이터들을 동시에 수집하여 수집한 데이터를 빠르게 방출되도록 병렬로 처리하는 것이 훨씬 효율적이다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val flow = flow<Int> {
    emit(10)
    emit(100)
}

fun main() = runBlocking {
    flow.flatMapMerge {
        flow {
            emit(it + 1)
            delay(1000)
            emit(it + 2)
            emit(it + 3)
        }
    }.collect {
        println("$it")
    }
}


// 11
// 101
// 12
// 13
// 102
// 103
  • 첫 번째 데이터 10의 변환이 완료되는 것을 기다리지 않고 두 번째 데이터 100도 변환을 시작한다.
  • 첫 번째 데이터 10의 +1이 처리된 후 첫 번째 데이터 10에 대한 변환을 1초간 지연하고, 두 번째 데이터 100의 +1이 처리된 후 두 번째 데이터 1000에 대한 변환을 1초간 지연한다.
  • 1초 후에 첫 번째 데이터 10에 대한 변환을 수행하여 12와 13을 발행하고, 두 번째 데이터 100에 대한 변환을 수행하여 102와 103을 발행한다.
essential