배압
배압이란 데이터의 생산과 소비가 불균형적일 때 발생하는 현상을 말한다.
예를 들어 10,000개의 데이터를 0.1초마다 발행하고 소비는 10초마다 한다면 소비와 관계 없는 데이터는 스트림에 계속 쌓이게 될텐데 Observable
이 데이터를 발행하는 속도를 소비 속도가 따라가지 못해서 발생하는 것이다.
결국 이 같은 현상이 발생하면 메모리가 overflow
되어 OutOfMemoryError
가 발생하여 에러가 발생할 것이다.
// 10,000개의 데이터를 발행하면서, 소비는 0.1초 delay
fun main() {
Observable.range(1, 10000)
.doOnNext {
println("Emit Data : $it")
}
.observeOn(Schedulers.io())
.subscribe {
println("Consume Data : $it")
Thread.sleep(100)
}
Thread.sleep(100 * 10000)
}
// Emit Data : 1
// Emit Data : 2
// Emit Data : 3
// ...
// Emit Data : 10
// Consume Data : 1
// ...
// Emit Data : 9998
// Emit Data : 9999
// Emit Data : 10000
// Consume Data : 2
// Consume Data : 3
// Consume Data : 4
// ...
Observable
을 사용한 경우에는 데이터의 발행과 소비가 불균형적으로 발생해서 데이터는 소비와 상관 없이 스트림에 계속 쌓이게 된다.
// 10,000개의 데이터를 발행하면서, 소비는 0.1초 delay
fun main() {
Flowable.range(1, 10000)
.doOnNext {
println("Emit Data : $it")
}
.observeOn(Schedulers.io())
.subscribe {
println("Consume Data : $it")
Thread.sleep(100)
}
Thread.sleep(100 * 10000)
}
// Emit Data : 1
// Emit Data : 2
// Emit Data : 3
// ...
// Emit Data : 128
// Consume Data : 1
// Consume Data : 2
// Consume Data : 3
// ...
Flowable
을 사용한 경우에는 데이터가 일정량 이상 누적이 되면 더 이상의 데이터를 발행하지 않게 된다.
배압 전략
Flowable
을 사용하여도 배압 현상을 제어하지 못해 MissingBackpressureException
이 발생할 수 있는 예외가 존재하는데 Flowable
에 배압 전략을 명시하여 이를 제어할 수 있다.
BackpressureStrategy.MISSING
배압 전략을 구현하지 않을 경우에 해당한다.
BackpressureStrategy.ERROR
소비 속도가 발행 속도를 따라가지 못하는 경우에 해당하며 MissingBackpressureException
발생한다.
BackpressureStrategy.BUFFER
데이터를 소비할 때까지 데이터를 버퍼에 넣어두는 전략으로 무한한 크기의 큐이지만 OutOfMemoryError
이 발생할 수 있다.
BackpressureStrategy.DROP
데이터의 발행 속도를 데이터의 속도가 따라가지 못하는 경우 발행되는 데이터를 모두 버리는 전략이다.
BackpressureStrategy.LATEST
소비자가 데이터를 받을 준비가 될 때까지 최신의 데이터만 유지하고 나머지 데이터는 모두 전략이다.
배압 제어 연산자
Flowable
은 onBackPressureBuffer()
, onBackPressureDrop()
, onBackPressureLatest()
3가지 연산자를 사용하여 배압 전략을 적용할 수 있다.
fun main() {
Flowable.range(1, 10000)
.onBackpressureLatest()
.doOnNext {
println("Emit Data : $it")
}
.observeOn(Schedulers.io())
.subscribe {
println("Consume Data : $it")
Thread.sleep(100)
}
Thread.sleep(100 * 10000)
}
// Emit Data : 1
// Emit Data : 2
// Emit Data : 3
// Consume Data : 1
// Emit Data : 4
// Emit Data : 5
// ...
// Emit Data : 128
// Consume Data : 2
// Consume Data : 3
// ...
// Consume Data : 96
// Emit Data : 1000
// Consume Data : 97
// Consume Data : 98
// ...
// Consume Data : 128
// Consume Data : 1000
onBackPressureBuffer()
BackpressureStrategy.BUFFER
전략을 적용하는 연산자로 파라미터로 버퍼의 용량, 버퍼 overflow
발생 시의 동작 등을 전달할 수 있다.
onBackPressureDrop()
BackpressureStrategy.DROP
전략을 적용하는 연산자로 파라미터로 데이터를 버릴 때의 동작을 전달할 수 있다.
onBackPressureLatest()
BackpressureStrategy.LATEST
전략을 적용하는 연산자이다.