zip()
2개 이상의 Observable
을 결합할 때 사용하는 연산자로, 만약 한쪽의 Observable
에서 처리가 안된다면 모두 처리가 될 때까지 발행을 대기한다.
fun main() {
val items = arrayOf("A Item", "B Item", "C Item")
val numbers = arrayOf("One", "Two", "Three")
Observable.zip(
Observable.fromIterable(items).map {
when (it) {
"A Item" -> {
return@map "A = "
},
"B Item" -> {
return@map "B = "
},
"C Item" -> {
return@map "C = "
},
else -> {
return@map "Unknow = "
}
}
},
Observable.fromIterable(items).map {
when (it) {
"One" -> {
return@map "1"
},
"Two" -> {
return@map "2"
},
"Three" -> {
return@map "3"
},
else -> {
return@map "0"
}
}
},
// zip 으로 새로 발행할 값을 생성하는 함수
BiFunction<String, String, String> { item, number ->
return@BiFunction item + number
}
).subscribe(System.out::println)
}
// A = 1
// B = 2
// C = 3
zipWith()
zip()
이 발행하는 Observable
의 값을 추가적으로 다시 조합하여 발행하는 연산자이다.
fun main() {
Observable.zip(
Observable.just(1, 2, 3),
Observable.just(10, 20, 30),
BiFunction<Int, Int, Int> { a, b -> a + b }
).zipWith(
Observable.just(100, 200, 300),
BiFunction<Int, Int, Int> { ab, c -> ab + c }
).subscribe(System.out::println)
}
// 111
// 222
// 333
combineLatest()
2개 이상의 Observable
을 기반으로 Observable
각각의 값이 변경되었을 때 갱신해주는 연산자이다.
fun main() {
val items = arrayOf("A Item", "B Item", "C Item", "D Item")
val numbers = arrayOf("One", "Two", "Three")
Observable.combineLatest(
Observable.fromIterable(items).zipWith(
// 0.15초 시작, 0.2초 간격
Observable.interval(150L, 200L, TimeUni.MILLISECONDS),
// 2번째 파라미터 Long 은 interval 값
BiFunction<String, Long, String> { item, _ ->
when (item) {
"A Item" -> {
return@map "A="
},
"B Item" -> {
return@map "B="
},
"C Item" -> {
return@map "C="
},
else -> {
return@map "Unknow="
}
}
}
),
Observable.fromIterable(numbers).zipWith(
Observable.interval(200L, TimeUnit.MILLISECONDS),
BiFunction<String, Long, String> { number, _ ->
when (it) {
"One" -> {
return@map "1"
},
"Two" -> {
return@map "2"
},
"Three" -> {
return@map "3"
},
else -> {
return@map "0"
}
}
}
),
BiFunction<String, String, String> { item, number ->
return@BiFunction item + number
}
).subscribe(System.out::println)
sleep(1000)
}
// A=1
// B=1
// B=2
// C=2
// C=3
// Unknown=3
- 첫 번째
Observable
은 0.15초 후에 시작하고 이후 0.2초 마다 발행하며, 두 번째Observable
은 0.2초마다 발행하기 때문에 두Observable
의 발행 간격이 서로 다르다. - 첫 번째
Observable
발행값이A=
, 두 번째Observable
발행값이 1인 상태에서 첫 번째Observable
이 새로운 값B=
를 발행하면 두 번째Observable
의 값 1을 새로 갱신하여 출력한다.
merge()
Observable
에서 먼저 입력되는 데이터를 그대로 발행해주는 가장 단순한 결합 연산자로, Observable
의 발행값을 발행 즉시 전달하며 Observable
들의 발행 개수는 서로 달라도 상관이 없다.
fun main() {
val data1 = arrayOf("A", "B", "C")
val data2 = arrayOf("1", "2", "3", "4")
val stream1 = Observable.interval(0, 100, TimeUnit.MILLISECONDS)
.map { it.toInit() }
.map { data1[it] }
.take(data1.size)
val stream2 = Observable.interval(50, TimeUnit.MILLISECONDS)
.map { it.toInit() }
.map { data2[it] }
.take(data2.size)
Observable.merge(stream1, stream2)
.subscribe(System.out::println)
sleep(1000)
}
// A
// 1
// B
// 2
// C
// 3
// 4
concat()
2개 이상의 Observable
을 차례대로 결합해주는 연산자로 onComplete()
가 호출되야 그 다음 Observable
을 구독하며, 만약 앞의 Observable
이 onComplete()
를 호출하지 않는다면 뒤의 Observable
은 결합되지 않는다.
fun main() {
val data1 = arrayOf("A", "B", "C")
val data2 = arrayOf("1", "2", "3")
val stream1 = Observable.from(data1)
.doOnComplete { // onComplete() 호출 시 Callback 되는 함수
println("stream1 onComplete()")
}
val stream2 = Observable.interval(100L, TimeUnit.MILLISECONDS)
.map { it.toInit() }
.map { data2[it] }
.take(data2.size)
.doOnComplete {
println("stream2 onComplete()")
}
Observable.concat(stream1, stream2)
.doOnComplete {
println("concat onComplete()")
}
.subscribe(System.out::println)
sleep(1000)
}
// A
// B
// C
// stream1 onComplete()
// 1
// 2
// 3
// stream2 onComplete()
// concat onComplete()