Flowable

배압(Backpressure) 현상을 제어하는 방법

Flowable

배압


배압이란 데이터의 생산과 소비가 불균형적일 때 발생하는 현상을 말한다.

예를 들어 10,000개의 데이터를 0.1초마다 발행하고 소비는 10초마다 한다면 소비와 관계 없는 데이터는 스트림에 계속 쌓이게 될텐데 Observable 이 데이터를 발행하는 속도를 소비 속도가 따라가지 못해서 발생하는 것이다.

결국 이 같은 현상이 발생하면 메모리가 overflow 되어 OutOfMemoryError 가 발생하여 에러가 발생할 것이다.

// 10,000개의 데이터를 발행하면서, 소비는 0.1초 delay
fun main() {
    Observable.range(1, 10000)
        .doOnNext {
            println("Emit Data : $it")
        }
        .observeOn(Schedulers.io())
        .subscribe {
            println("Consume Data : $it")
            Thread.sleep(100)
        }
    
    Thread.sleep(100 * 10000)
}


// Emit Data : 1
// Emit Data : 2
// Emit Data : 3
// ...
// Emit Data : 10
// Consume Data : 1
// ...
// Emit Data : 9998
// Emit Data : 9999
// Emit Data : 10000
// Consume Data : 2
// Consume Data : 3
// Consume Data : 4
// ...
  • Observable 을 사용한 경우에는 데이터의 발행과 소비가 불균형적으로 발생해서 데이터는 소비와 상관 없이 스트림에 계속 쌓이게 된다.

// 10,000개의 데이터를 발행하면서, 소비는 0.1초 delay
fun main() {
    Flowable.range(1, 10000)
        .doOnNext {
            println("Emit Data : $it")
        }
        .observeOn(Schedulers.io())
        .subscribe {
            println("Consume Data : $it")
            Thread.sleep(100)
        }
    
    Thread.sleep(100 * 10000)
}

// Emit Data : 1
// Emit Data : 2
// Emit Data : 3
// ...
// Emit Data : 128
// Consume Data : 1
// Consume Data : 2
// Consume Data : 3
// ...
  • Flowable 을 사용한 경우에는 데이터가 일정량 이상 누적이 되면 더 이상의 데이터를 발행하지 않게 된다.


배압 전략


Flowable 을 사용하여도 배압 현상을 제어하지 못해 MissingBackpressureException 이 발생할 수 있는 예외가 존재하는데 Flowable 에 배압 전략을 명시하여 이를 제어할 수 있다.

BackpressureStrategy.MISSING

배압 전략을 구현하지 않을 경우에 해당한다.

BackpressureStrategy.ERROR

소비 속도가 발행 속도를 따라가지 못하는 경우에 해당하며 MissingBackpressureException 발생한다.

BackpressureStrategy.BUFFER

데이터를 소비할 때까지 데이터를 버퍼에 넣어두는 전략으로 무한한 크기의 큐이지만 OutOfMemoryError 이 발생할 수 있다.

BackpressureStrategy.DROP

데이터의 발행 속도를 데이터의 속도가 따라가지 못하는 경우 발행되는 데이터를 모두 버리는 전략이다.

BackpressureStrategy.LATEST

소비자가 데이터를 받을 준비가 될 때까지 최신의 데이터만 유지하고 나머지 데이터는 모두 전략이다.


배압 제어 연산자


FlowableonBackPressureBuffer(), onBackPressureDrop(), onBackPressureLatest() 3가지 연산자를 사용하여 배압 전략을 적용할 수 있다.

fun main() {
    Flowable.range(1, 10000)
        .onBackpressureLatest()
        .doOnNext {
            println("Emit Data : $it")
        }
        .observeOn(Schedulers.io())
        .subscribe {
            println("Consume Data : $it")
            Thread.sleep(100)
        }
    
    Thread.sleep(100 * 10000)
}

// Emit Data : 1
// Emit Data : 2
// Emit Data : 3
// Consume Data : 1
// Emit Data : 4
// Emit Data : 5
// ...
// Emit Data : 128
// Consume Data : 2
// Consume Data : 3
// ...
// Consume Data : 96
// Emit Data : 1000
// Consume Data : 97
// Consume Data : 98
// ...
// Consume Data : 128
// Consume Data : 1000


onBackPressureBuffer()

BackpressureStrategy.BUFFER 전략을 적용하는 연산자로 파라미터로 버퍼의 용량, 버퍼 overflow 발생 시의 동작 등을 전달할 수 있다.


onBackPressureDrop()

BackpressureStrategy.DROP 전략을 적용하는 연산자로 파라미터로 데이터를 버릴 때의 동작을 전달할 수 있다.


onBackPressureLatest()

BackpressureStrategy.LATEST 전략을 적용하는 연산자이다.

essential