amb()
여러 개의 Observable
중에서 가장 먼저 데이터를 발행하는 Observable
을 선택하는 연산자이다.
fun main() {
val data1 = arrayOf("A", "B", "C")
val data2 = arrayOf("1", "2", "3")
val stream = lisfOf(
Observable.fromIterable(data1)
.delay(200L, TimeUnit.MILLSECONDS)
.doOnComplete {
println("First Observable onComplete()")
},
Observable.fromItemable(data2)
.delay(100L, TimeUnit.MILLSECONDS)
.doOnComplete {
println("Second Observable onComplete()")
}
)
Observable.amb<String>(stream)
.doOnComplete {
println("Result onComplete()")
}
.subscribe(System.out:println)
}
// 1
// 2
// 3
// Second Observable onComplete()
// Result onComlete()
takeUntil()
파라미터로 받은 Observable
에서 어떤 값을 발행하면 현재 Observable
의 데이터 발행을 중단하고 즉시 완료하는 연산자로, take()
와 같이 일정 개수만 값을 발행하지만 완료 기준을 다른 Observable
이 값을 발행하는지로 결정한다.
fun main() {
val data = listOf("1", "2", "3", "4", "5")
Observable.fromIterable(data)
.zipWith(
Observable.interval(100L, TimeUnit.MILLISECONDS),
BiFunction<String, Long, String> { data, _ -> data }
)
.takeUntil(
// 발행 중지의 기준이 되는 Observable
Observable.timer(500L, TimeUnit.MILLISECONDS)
)
.subscribe(System.out::println)
sleep(1000)
}
// 1
// 2
// 3
// 4
skipUntil()
takeUntil()
과 반대로 현재 Observable
의 발행을 무시하다가 설정한 Observable
이 발행되면 그 시점부터 발행을 시작하는 연산자이다.
fun main() {
val data = listOf("1", "2", "3", "4", "5")
Observable.fromIterable(data)
.zipWith(
Observable.interval(100L, TimeUnit.MILLISECONDS),
BiFunction<String, Long, String> { data, _ -> data }
)
.skipUntil(
// 발행 시작의 기준이 되는 Observable
Observable.timer(500L, TimeUnit.MILLISECONDS)
)
.subscribe(System.out::println)
sleep(1000)
}
// 5
all()
주어진 조건이 모두 만족할 때 데이터를 발행하는 연산자이다.
fun main() {
val data = listOf("A", "B", "C", "D", "E")
Observable.fromIterable(data)
.map(
when(it) {
"1" -> {
return@map "One"
}
"2" -> {
return@map "two"
}
"3" -> {
return@map "Three"
}
"4" -> {
return@map "Four"
}
"5" -> {
return@map "Five"
}
else -> {
return@map "Unknown"
}
}
)
.all(
it == "Unknown"
)
.subscribe(println(it)
sleep(1000)
}
// true