생성 연산자

create / just / range / empty / interval / timer / fromArray /fromIterable / fromCallable / fromFuture

생성 연산자

create()


Observable 을 구독하는 시점에 create 블록이 실행되는 연산자로 onNext(), onError(), onComplete() 를 직접 호출해야 한다.

fun main() {
    Observable.create<Int> {
        it.onNext(1)              
        it.onNext(2)
        it.onComplete()          
    }.subscribeBy(
        onNext = { println("onNext() $it") },
        onComplete = { println("onComplete()") }
    )

    Observable.create<Int> {
        it.onNext(10)
        it.onNext(20)
        it.onError(Exception("Error!!!"))
    }.subscribeBy(
        onNext = { println("onNext() $it") },
        onError = { println("onError() $it") },
        onComplete = { println("onComplete()") }
    )
}
    

// onNext() 1
// onNext() 2
// onComplete()
// onNext() 10
// onNext() 20
// onError() java.lang.Exception: Error!!!



just()


전달 받은 파라미터를 순서대로 발행하는 Observable 을 생성하는 연산자로, 전달 받은 데이터를 그대로 전달하며 최대 갯수는 10인데 10개 이상을 선언하면 예외가 발생한다.

fun main() {
    val observer: Observer<Any> = object: Observer<Any> {
        override fun onSubscribe(d: Disposable) = Unit
        override fun onNext(item: Any) = println("onNext() $item")
        override fun onError(e: Throwable) = println("onError() ${e.message}")
        override fun onComplete() = println("onComplete()")
    }

    val list = listOf(1, 2, 3, 4, 5)
    val num = 100
    val str = "String"
    val map = mapOf(1 to "One", 2 to "Two", 3 to "Three")

    Observable
        .just(list, num, str, map)
        .subscribe(observer)			
}


// onNext() [1, 2, 3, 4, 5]
// onNext() 100
// onNext() String
// onNext() {1=One, 2=Two, 3=Three}
// onComplete()



range()


주어진 값(n)부터 m 개의 Integer 객체를 발행하는 연산자이다.

fun main() {
    Observable
        .range(5, 5)
        .subscribeBy(
            onNext = { println("onNext() $it") },
            onComplete = { println("onComplete()")}
        )		
}


// onNext() 5
// onNext() 6
// onNext() 7
// onNext() 8
// onNext() 9
// onComplete()



empty()


아무 값도 전달하지 않고 onComplete() 를 마지막에 호출하는 연산자이다.

fun main() {
    Observable
        .empty<Unit>()
        .subscribeBy(
            onNext = { println("onNext() $it") },
            onComplete = { println("onComplete()")}
        )		
}


// onComplete()



interval()


주어진 시간 간격으로 0부터 1씩 증가하는 Long 객체를 생성하는 연산자이다.

fun main() {
    Observable
        .interval(100, TimeUnit.MILLISECONDS)
        .take(5) // 5개까지 발행
        .subscribeBy(
            onNext = { println("onNext() $it") },
            onComplete = { println("onComplete()")}
        )		

    // Observable 은 별도의 스레드에서 작동하기 때문에 동작이 완료할 때 까지 메인 스레드 대기
    sleep(1000)
}


// onNext() 0
// onNext() 1
// onNext() 2
// onNext() 3
// onNext() 4
// onComplete()



timer()


주어진 시간이 지나면 1개의 데이터를 발행하고 onComplete() 를 호출하는 연산자이다.

fun main() {
    println(SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(Date()))

    Observable
        .timer(5000L, TimeUnit.MILLISECONDS)
        .map {
            SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(Date())
        }
        .subscribeBy(
            onNext = { println("onNext() $it") },
            onComplete = { println("onComplete()")}
        )		

    // Observable 은 별도의 스레드에서 작동하기 때문에 동작이 완료할 때 까지 메인 스레드 대기
    sleep(5000)
}


// 2022/01/01 10:00:00
// onNext() 2022/01/01 10:00:05
// onComplete()



fromArray()


Array 데이터를 1개씩 발행하는 Observable 을 반환하는 연산자이다.

fun main() {
    val arr = arrayOf("one", "two", "three")
    
    Observable
        .fromArray(*arr)
        .subscribe {
            println(it)
        }
}


// one
// two
// three



fromIterable()


Iterable 객체를 Observable 로 변환하는 연산자이다.

fun main() {
    val list = listOf("one", "two", "three")
		
    Observable
        .fromIterable(list)
        .subscribeBy(
            onNext = { println("onNext() $it") },
            onComplete = { println("onComplete()")}
        )
}


// onNext() one
// onNext() two
// onNext() three
// onComplete()



fromCallable()


Callable 객체를 Observable 로 변환하는 연산자이다.

fun main() {
    val callable = {
        sleep(1000)
        "Sleep"
    }
	
    Observable
        .fromCallable(callable)
        .subscribeBy(
            onNext = { println("onNext() $it") },
            onComplete = { println("onComplete()")}
        )
}


// onNext() Sleep
// onComplete()



fromFuture()


Future 객체를 Observable 로 변환하는 연산자로, Futureget() 함수의 반환값이 전달된다.

fun main() {
    val future = object: Future<String> {
        override fun get() = "Future"
    
        override fun get(timeout: Long, unit: TimeUnit) = "Future TimeOut"
	  
        override fun isDone() = true
    
        override fun cancel(mayInterruptIfRunning: Boolean) = false
    
        override fun isCancelled() = false
    }

    Observable
        .fromFuture(future)
        .subscribeBy (
            onNext = { println("onNext() $it") },
            onComplete = { println("onComplete()")}
        )
}


// onNext() Future
// onComplete()
essential