결합 연산자

zip / zipWith / combineLatest / merge / concat

결합 연산자

zip()


2개 이상의 Observable 을 결합할 때 사용하는 연산자로, 만약 한쪽의 Observable 에서 처리가 안된다면 모두 처리가 될 때까지 발행을 대기한다.

fun main() {
    val items = arrayOf("A Item", "B Item", "C Item")
    val numbers = arrayOf("One", "Two", "Three")
		
    Observable.zip(
        Observable.fromIterable(items).map {
            when (it) {
                "A Item" -> {
                    return@map "A = "
                },
                "B Item" -> {
                    return@map "B = "
                },
                "C Item" -> {
                    return@map "C = "
                },
                else -> {
                    return@map "Unknow = "
                }
            }
        },
		Observable.fromIterable(items).map {
            when (it) {
                "One" -> {
                    return@map "1"
                },
                "Two" -> {
                    return@map "2"
                },
                "Three" -> {
                    return@map "3"
                },
                else -> {
                    return@map "0"
                }
            }
        },
        // zip 으로 새로 발행할 값을 생성하는 함수
        BiFunction<String, String, String> { item, number ->  
            return@BiFunction item + number
        }
    ).subscribe(System.out::println)
}


// A = 1
// B = 2
// C = 3



zipWith()


zip() 이 발행하는 Observable 의 값을 추가적으로 다시 조합하여 발행하는 연산자이다.

fun main() {		
    Observable.zip(
        Observable.just(1, 2, 3),
        Observable.just(10, 20, 30),
        BiFunction<Int, Int, Int> { a, b -> a + b	}
    ).zipWith(
        Observable.just(100, 200, 300),
        BiFunction<Int, Int, Int> { ab, c -> ab + c }
    ).subscribe(System.out::println)
}


// 111
// 222
// 333



combineLatest()


2개 이상의 Observable 을 기반으로 Observable 각각의 값이 변경되었을 때 갱신해주는 연산자이다.

fun main() {
    val items = arrayOf("A Item", "B Item", "C Item", "D Item")
    val numbers = arrayOf("One", "Two", "Three")
		
    Observable.combineLatest(
        Observable.fromIterable(items).zipWith(
            // 0.15초 시작, 0.2초 간격
            Observable.interval(150L, 200L, TimeUni.MILLISECONDS),
            // 2번째 파라미터 Long 은 interval 값
            BiFunction<String, Long, String> { item, _ ->
                when (item) {
                    "A Item" -> {
                        return@map "A="
                    },
                    "B Item" -> {
                        return@map "B="
                    },
                    "C Item" -> {
                        return@map "C="
                    },
                    else -> {
                        return@map "Unknow="
                    }
                }
            }
        ),
        Observable.fromIterable(numbers).zipWith(
            Observable.interval(200L, TimeUnit.MILLISECONDS),
            BiFunction<String, Long, String> { number, _ -> 
                when (it) {
                    "One" -> {
                        return@map "1"
                    },
                    "Two" -> {
                        return@map "2"
                    },
                    "Three" -> {
                        return@map "3"
                    },
                    else -> {
                        return@map "0"
                    }
                }
            }		
        ),
        BiFunction<String, String, String> { item, number ->  
            return@BiFunction item + number
        }
	).subscribe(System.out::println)

    sleep(1000)
}


// A=1
// B=1
// B=2
// C=2
// C=3
// Unknown=3
  • 첫 번째 Observable 은 0.15초 후에 시작하고 이후 0.2초 마다 발행하며, 두 번째 Observable 은 0.2초마다 발행하기 때문에 두 Observable 의 발행 간격이 서로 다르다.
  • 첫 번째 Observable 발행값이 A=, 두 번째 Observable 발행값이 1인 상태에서 첫 번째 Observable 이 새로운 값 B= 를 발행하면 두 번째 Observable 의 값 1을 새로 갱신하여 출력한다.


merge()


Observable 에서 먼저 입력되는 데이터를 그대로 발행해주는 가장 단순한 결합 연산자로, Observable 의 발행값을 발행 즉시 전달하며 Observable 들의 발행 개수는 서로 달라도 상관이 없다.

fun main() {
    val data1 = arrayOf("A", "B", "C")
    val data2 = arrayOf("1", "2", "3", "4")
		
    val stream1 = Observable.interval(0, 100, TimeUnit.MILLISECONDS)
        .map { it.toInit() }
        .map { data1[it] }
        .take(data1.size)

    val stream2 = Observable.interval(50, TimeUnit.MILLISECONDS)
        .map { it.toInit() }
        .map { data2[it] }
        .take(data2.size)

    Observable.merge(stream1, stream2)
        .subscribe(System.out::println)
		
    sleep(1000)
}


// A
// 1
// B
// 2
// C
// 3
// 4



concat()


2개 이상의 Observable 을 차례대로 결합해주는 연산자로 onComplete() 가 호출되야 그 다음 Observable 을 구독하며, 만약 앞의 ObservableonComplete() 를 호출하지 않는다면 뒤의 Observable 은 결합되지 않는다.

fun main() {
    val data1 = arrayOf("A", "B", "C")
    val data2 = arrayOf("1", "2", "3")
		
    val stream1 = Observable.from(data1)
        .doOnComplete { // onComplete() 호출 시 Callback 되는 함수
            println("stream1 onComplete()")
        }

    val stream2 = Observable.interval(100L, TimeUnit.MILLISECONDS)
        .map { it.toInit() }
        .map { data2[it] }
        .take(data2.size)
        .doOnComplete {
            println("stream2 onComplete()")
        }

    Observable.concat(stream1, stream2)
        .doOnComplete {
            println("concat onComplete()")
        }
        .subscribe(System.out::println)
		
    sleep(1000)
}


// A
// B
// C
// stream1 onComplete()
// 1
// 2
// 3
// stream2 onComplete()
// concat onComplete()
essential