Observable

소비할 데이터를 발행하는 역할을 하는 구독 대상자

Observable

Observable


소비할 데이터를 발행하는 역할을 하는 구독 대상자로 Observable 이 데이터를 발행하고 이벤트를 보내면 소비자는 소비자는 Observable 에 구독(Subscribe)해 데이터를 소비(Consume)한다.


Observable Callback


Observable 이 데이터를 발행한 후 보내는 이벤트는 4가지 종류가 있으며, Callback 함수를 구현하여 Observable 에 등록하면 Observable 이 전달하는 이벤트를 받을 수 있다.

val observer: Observer<T> = object: Observer<T> {
    override fun onSubscribe(d :Disposable) = Unit
    override fun onNext(item :T) = Unit
    override fun onError(e :Throwable) = Unit
    override fun onComplete() = Unit
}
  • onSubscribe 는 구독을 하면 호출이 되며, 파라미터의 Disposable 객체는 구독을 해제할 때 사용되며 dispose() 를 호출하면 된다.
  • onNext 는 데이터를 발행할 때 호출되며 데이터를 전달한다.
  • onError 는 에러가 발생하였을 경우에 호출되며 onError, onComplete 가 발생하지 않는다.
  • onComplete 는 데이터의 발행이 완료되었을 때 호출되며 딱 한 번만 발생한다.


Observable 구독


구독은 발행한 데이터를 수신하여 무엇을 할 것인지 행동을 정의하는 것으로 subscribe() 로 구독을 신청할 수 있다.

public final Disposable subscribe() {
    /* ... */
}

public final Disposable subscribe(Consumer<? super T> onNext) {
    /* ... */
}

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
    /* ... */
}

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {
    /* ... */
}

public final void subscribe(Observer<? super T> observer) {
    /* ... */
}
  • 여러 메소드가 오버로딩 되어있는데 파라미터가 없는 subscribe() 는 주로 테스트 혹은 디버깅 용도로 사용되며 onError 가 호출되면 onErrorNotImplementedExceptionthrow 하고 끝난다.
  • Disposable 객체는 구독을 해제할 때 사용되는데 onComplete 가 호출되면 dispose() 를 호출해 Observable 이 더 이상 데이터를 발행하지 않도록 구독을 해지해야 한다.

Observable 의 구독 방법은 사용자 필요 이벤트만 정의해서 등록하는 방법과 Observable 객체를 등록하는 방법 2가지가 있다.

// 사용자 필요한 이벤트 정의
fun main() {
    // 4개 정의 (onNext / onError / onComplete / onSubscribe) 
    Observable.just(1)
        .subscribe(
            { println("onNext() - $it") },     // onNext()
            { println("onError() - $it") },	   // onError()
            { println("onComplete()") },	   // onComplete()
            { println("onSubscribe() - $it") } // onSubscribe()
        )

    // 2개 정의 (onNext / onError)
    Observable.just(1)
        .subscribe(
            { println("onNext() - $it") }, // onNext()
            { println("onError() - $it") } // onError()
        )
}
// Observer 객체 등록
val observer: Observer<Any> = object: Observer<Any> {
    override fun onSubscribe(d: Disposable) {
        println("onSubscribe() - $d")
    }
        
    override fun onNext(item: Any) {
        println("onNext() - $item")
    }
        
    override fun onError(e: Throwable) {
        println("onError() - ${e.message}")
    }
        
    override fun onComplete() {
        println("onComplete()\n")
    }
}



Observable 구독 해지


subscribe() 로 구독을 하였다면 onSubscribe(d: Disposable) 호출로 Disposable 객체를 전달하여 구독을 해지할 수 있다.

public interface Disposable {
    // 구독 해지 요청
    void dispose();
    
    // 구독 해지 상태 확인
    boolean isDisposed();   
}


Observer 를 통한 구독 해지

fun main() {
    val observer: Observer<Long> = object: Observer<Long> {
        lateinit var disposable: Disposable

        override fun onSubscribe(d: Disposable) {
            println("onSubscribe() $d")
            disposable = d
        }

        override fun onNext(item: Long) {
            println("onNext() $item")
            if (item >= 10 && disposable.isDisposed == false) {
                disposable.dispose()
                println("Subscribe Dispose")
            }
        }

        override fun onError(e: Throwable) { 
            println("onError() ${e.message}") 
        }

        override fun onComplete() { 
            println("onComplete()") 
        }
    }

    Observable
        .interval(100, TimeUnit.MILLISECONDS)
        .subscribe(observer)

    Thread() {
        sleep(1000)
    }.apply {
        start()
        join()
    }
}


// onSubscribe() = null
// onNext() 0
// onNext() 1
// onNext() 2
// onNext() 3
// onNext() 4
// onNext() 5
// onNext() 6
// onNext() 7
// onNext() 8
// onNext() 9
// onNext() 10
// Subscribe Dispose


Disposable 를 통한 구독 해지

fun main() {
    val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
    
    val disposable: Disposable = observable.subscribe(
        { println("onNext() $it")},			
        { println("onError() ${it.message}")},
        { println("onComplete()")}			
    )

    Thread() {
        Thread.sleep(1000)
        disposable.dispose()
    }.apply {
        start()
        join()
    }
}


// onNext() 0
// onNext() 1
// onNext() 2
// onNext() 3
// onNext() 4
// onNext() 5
// onNext() 6
// onNext() 7
// onNext() 8
// onNext() 9
// onNext() 10
// onNext() 11
essential