create()
Observable
을 구독하는 시점에 create
블록이 실행되는 연산자로 onNext()
, onError()
, onComplete()
를 직접 호출해야 한다.
fun main() {
Observable.create<Int> {
it.onNext(1)
it.onNext(2)
it.onComplete()
}.subscribeBy(
onNext = { println("onNext() $it") },
onComplete = { println("onComplete()") }
)
Observable.create<Int> {
it.onNext(10)
it.onNext(20)
it.onError(Exception("Error!!!"))
}.subscribeBy(
onNext = { println("onNext() $it") },
onError = { println("onError() $it") },
onComplete = { println("onComplete()") }
)
}
// onNext() 1
// onNext() 2
// onComplete()
// onNext() 10
// onNext() 20
// onError() java.lang.Exception: Error!!!
just()
전달 받은 파라미터를 순서대로 발행하는 Observable
을 생성하는 연산자로, 전달 받은 데이터를 그대로 전달하며 최대 갯수는 10인데 10개 이상을 선언하면 예외가 발생한다.
fun main() {
val observer: Observer<Any> = object: Observer<Any> {
override fun onSubscribe(d: Disposable) = Unit
override fun onNext(item: Any) = println("onNext() $item")
override fun onError(e: Throwable) = println("onError() ${e.message}")
override fun onComplete() = println("onComplete()")
}
val list = listOf(1, 2, 3, 4, 5)
val num = 100
val str = "String"
val map = mapOf(1 to "One", 2 to "Two", 3 to "Three")
Observable
.just(list, num, str, map)
.subscribe(observer)
}
// onNext() [1, 2, 3, 4, 5]
// onNext() 100
// onNext() String
// onNext() {1=One, 2=Two, 3=Three}
// onComplete()
range()
주어진 값(n
)부터 m
개의 Integer
객체를 발행하는 연산자이다.
fun main() {
Observable
.range(5, 5)
.subscribeBy(
onNext = { println("onNext() $it") },
onComplete = { println("onComplete()")}
)
}
// onNext() 5
// onNext() 6
// onNext() 7
// onNext() 8
// onNext() 9
// onComplete()
empty()
아무 값도 전달하지 않고 onComplete()
를 마지막에 호출하는 연산자이다.
fun main() {
Observable
.empty<Unit>()
.subscribeBy(
onNext = { println("onNext() $it") },
onComplete = { println("onComplete()")}
)
}
// onComplete()
interval()
주어진 시간 간격으로 0부터 1씩 증가하는 Long
객체를 생성하는 연산자이다.
fun main() {
Observable
.interval(100, TimeUnit.MILLISECONDS)
.take(5) // 5개까지 발행
.subscribeBy(
onNext = { println("onNext() $it") },
onComplete = { println("onComplete()")}
)
// Observable 은 별도의 스레드에서 작동하기 때문에 동작이 완료할 때 까지 메인 스레드 대기
sleep(1000)
}
// onNext() 0
// onNext() 1
// onNext() 2
// onNext() 3
// onNext() 4
// onComplete()
timer()
주어진 시간이 지나면 1개의 데이터를 발행하고 onComplete()
를 호출하는 연산자이다.
fun main() {
println(SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(Date()))
Observable
.timer(5000L, TimeUnit.MILLISECONDS)
.map {
SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(Date())
}
.subscribeBy(
onNext = { println("onNext() $it") },
onComplete = { println("onComplete()")}
)
// Observable 은 별도의 스레드에서 작동하기 때문에 동작이 완료할 때 까지 메인 스레드 대기
sleep(5000)
}
// 2022/01/01 10:00:00
// onNext() 2022/01/01 10:00:05
// onComplete()
fromArray()
Array
데이터를 1개씩 발행하는 Observable
을 반환하는 연산자이다.
fun main() {
val arr = arrayOf("one", "two", "three")
Observable
.fromArray(*arr)
.subscribe {
println(it)
}
}
// one
// two
// three
fromIterable()
Iterable
객체를 Observable
로 변환하는 연산자이다.
fun main() {
val list = listOf("one", "two", "three")
Observable
.fromIterable(list)
.subscribeBy(
onNext = { println("onNext() $it") },
onComplete = { println("onComplete()")}
)
}
// onNext() one
// onNext() two
// onNext() three
// onComplete()
fromCallable()
Callable
객체를 Observable
로 변환하는 연산자이다.
fun main() {
val callable = {
sleep(1000)
"Sleep"
}
Observable
.fromCallable(callable)
.subscribeBy(
onNext = { println("onNext() $it") },
onComplete = { println("onComplete()")}
)
}
// onNext() Sleep
// onComplete()
fromFuture()
Future
객체를 Observable
로 변환하는 연산자로, Future
의 get()
함수의 반환값이 전달된다.
fun main() {
val future = object: Future<String> {
override fun get() = "Future"
override fun get(timeout: Long, unit: TimeUnit) = "Future TimeOut"
override fun isDone() = true
override fun cancel(mayInterruptIfRunning: Boolean) = false
override fun isCancelled() = false
}
Observable
.fromFuture(future)
.subscribeBy (
onNext = { println("onNext() $it") },
onComplete = { println("onComplete()")}
)
}
// onNext() Future
// onComplete()