collectLatest 의 한계
collectLatest
는 새로운 데이터가 발행되었을 때 이전 데이터의 처리를 강제로 종료시키고 새로운 데이터를 처리하여 항상 최신의 데아터를 소비한다.
이로 인해 collectLatest
는 데이터를 발행하는 시간보다 데이터를 처리하는 시간이 오래 걸릴 경우에 새로운 들어오는 중간의 데이터는 계속해서 소비되지 못하고 마지막 데이터만 소비하게 된다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
countUp().collectLatest {
delay(1000)
println("Number: $it")
}
}
fun countUp(): Flow<Int> = flow {
for (i in 0..5) {
emit(i)
delay(100)
}
}
// Number: 5
- 데이터를 발행하는데 0.1초, 데이터를 소비하는데 1초가 걸려서 중간의 데이터들은 소비되지 못하고 마지막 데이터만 소비된다.
conflate
conflate
는 한 번 시작된 데이터의 소비를 끝날 때 까지 진행하고 데이터의 소비가 완료되는 시점에 가장 최신의 데이터를 다시 소비하는 것이다.
이로 인해 collectLatest
의 문제를 해결할 수 있다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
countUp()
.onEach {
println("Number-Emit: $it")
}
.conflate()
.collect {
delay(3000)
println("Number-Consume: $it")
}
}
fun countUp(): Flow<Int> = flow {
for (i in 0..5) {
emit(i)
delay(1000)
}
}
// Number-Emit: 0
// Number-Emit: 1
// Number-Emit: 2
// Number-Consume: 0
// Number-Emit: 3
// Number-Emit: 4
// Number-Emit: 5
// Number-Consume: 2
// Number-Consume: 5
- 0이 발행된 후 0에 대한 소모가 3초 후에 완료되며, 추가적으로 1과 2가 발행되고 3초 시점에서 마지막으로 발행된 데이터는 2가 된다.
- 2에 대한 소모가 시작 시점으로부터 6초 후에 완료되며, 추가적으로 3부터 5까지 발행되고 6초 시점에서 마지막으로 발행된 데이터는 5가 된다.
- 5에 대한 소모가 시작 시점으로부터 9초 시점에 완료된다.