filter()
Observable
의 발행 데이터에서 원하는 데이터만 걸러내는 연산자이다.
fun main() {
val items = arrayOf("Item 1", "Item 2", "Value 3", "Item 4", "Value 5")
Observable.fromArray(*items)
.filter { it.startsWith("Value") }
.subscribe { println("$it") }
}
// Value 3
// Value 5
map()
입력값을 어떤 함수에 넣어서 원하는 값으로 변환하는 연산자이다.
fun main() {
val numbers = arrayOf(1, 2, 3, 4, 5)
Observable.fromArray(*numbers)
.map { it * 10 }
.subscribe { println("$it")}
}
// 10
// 20
// 30
// 40
// 50
concatMap()
map()
과 동일하게 입력값을 어떤 함수에 넣어서 원하는 값으로 변환하는 연산자로, 들어오는 데이터를 순서대로 처리한다.
fun main() {
val numbers = arrayOf("1", "2", "3")
Observable.interval(100L, TimeUnit.MILLISECONDS)
.map { it.toInt() }
.map { numbers[it] }
.take(numbers.size.toLong())
.concatMap {
Observable.interval(200L, TimeUnit.MILLISECONDS)
.map { "<${it}>" }
.take(2)
}
.subscribe(System.out::println)
sleep(2000)
}
// <1>
// <1>
// <2>
// <2>
// <3>
// <3>
flatMap()
map()
과 동일하게 입력값을 어떤 함수에 넣어서 원하는 값으로 변환하는 연산자로, 들어오는 데이터의 순서를 보장하지 않고 처리한다.
fun main() {
val numbers = arrayOf("1", "2", "3")
Observable.interval(100L, TimeUnit.MILLISECONDS)
.map { it.toInt() }
.map { numbers[it] }
.take(numbers.size.toLong())
.flatMap {
Observable.interval(200L, TimeUnit.MILLISECONDS)
.map { "<${it}>" }
.take(2)
}
.subscribe(System.out::println)
sleep(2000)
}
// <1>
// <2>
// <3>
// <1>
// <2>
// <3>
switchMap()
map()
과 동일하게 입력값을 어떤 함수에 넣어서 원하는 값으로 변환하는 연산자로, concatMap()
과 동일하게 데이터의 순서를 보장하지만 중간에 새로운 데이터가 발행되면 기존에 진행중이던 작업을 중단하고 중간에 발행된 데이터를 처리한다.
fun main() {
val numbers = arrayOf("1", "2", "3")
Observable.interval(100L, TimeUnit.MILLISECONDS)
.map { it.toInt() }
.map { numbers[it] }
.take(numbers.size.toLong())
.switchMap {
Observable.interval(200L, TimeUnit.MILLISECONDS)
.map { "<${it}>" }
.take(2)
}
.subscribe(System.out::println)
sleep(2000)
}
// <3>
// <3>
Main
Observable
이 0.1초 간격으로 3개의Sub
Observable
을 발행하며, 각Sub
Observable
은 0.2 초 간격으로 값을 발행한다.- 값 1에 해당하는 첫 번째
Sub
Observable
이 값을 발행할 때까지 0.2초 동안 대기 중에 값 2에 해당하는 두 번째Sub
Observable
이 생성되었기 때문에 값 1에 해당하는 첫 번째Sub
Observable
이 중단된다. - 값 2에 해당하는 두 번째
Sub
Observable
이 값 발행할 때까지 0.2초 동안 대기 중에 값 3에 해당하는 세 번째Sub
Observable
이 생성되었기 때문에 값 2에 해당하는 두 번째Sub
Observable
은 중단된다. - 값 3에 해당하는 세 번째
Sub
Observable
이 값 발행할 때까지 0.2초 동안 대기 중에 다른 값의 발행이 없어서 값 3에 해당하는 세 번째Sub
Observable
은 값을 발행할 수 있게 된다.
reduce()
발행한 데이터를 모두 사용하여 최종 결과 데이터를 합칠 때 사용하는 연산자로, 모든 데이터가 입력된 후 모두 종합하여 마지막 1개의 데이터를 발행한다.
fun main() {
val numbers = arrayOf("1", "2", "3", "4", "5")
Observable.fromArray(*numbers)
.reduce { prev, next ->
"$next($prev)"
}
.subscribe(System.out::println)
}
// 5(4(3(2(1))))
scan()
실행할 때 마다 입력값에 맞는 중간 결과 및 최종 결과의 데이터를 발행하는 연산자이다.
fun main() {
val numbers = arrayOf("1", "2", "3", "4", "5")
Observable.fromArray(*numbers)
.reduce { prev, next ->
"$next($prev)"
}
.subscribe(System.out::println)
}
// 1
// 2(1)
// 3(2(1))
// 4(3(2(1)))
// 5(4(3(2(1))))
filter() 와 비슷하게 사용할 수 있는 함수
first(default)
Observable
의 첫 번째 값만 발행하고, 값이 없으면 default
값 발행한다.
last(default)
Observable
의 마지막 값만 발행하고, 값이 없으면 default
값 발행한다.
take(N)
Observable
의 최초 N
개의 값을 발행한다.
takeLast(default)
Observable
의 마지막 N
개의 값을 발행한다.
skip(N)
Observable
의 최초 N
개 값을 제외하고 발행한다.
skipLast(N)
Observable
의 마지막 N
개 값을 제외하고 발행한다.