조건 연산자

amb / takeUntil / skipUntil / all

조건 연산자

amb()


여러 개의 Observable 중에서 가장 먼저 데이터를 발행하는 Observable 을 선택하는 연산자이다.

fun main() {
    val data1 = arrayOf("A", "B", "C")
    val data2 = arrayOf("1", "2", "3")
		
    val stream = lisfOf(
        Observable.fromIterable(data1)
            .delay(200L, TimeUnit.MILLSECONDS)
            .doOnComplete {
                println("First Observable onComplete()")
            },
        Observable.fromItemable(data2)
            .delay(100L, TimeUnit.MILLSECONDS)
            .doOnComplete {
                println("Second Observable onComplete()")
            }
	)

    Observable.amb<String>(stream)
        .doOnComplete {
            println("Result onComplete()")
        }
        .subscribe(System.out:println)
}


// 1
// 2
// 3
// Second Observable onComplete()
// Result onComlete()



takeUntil()


파라미터로 받은 Observable 에서 어떤 값을 발행하면 현재 Observable 의 데이터 발행을 중단하고 즉시 완료하는 연산자로, take() 와 같이 일정 개수만 값을 발행하지만 완료 기준을 다른 Observable 이 값을 발행하는지로 결정한다.

fun main() {
    val data = listOf("1", "2", "3", "4", "5")
		
    Observable.fromIterable(data)
        .zipWith(
            Observable.interval(100L, TimeUnit.MILLISECONDS),
            BiFunction<String, Long, String> { data, _ -> data }
        )
        .takeUntil(
            // 발행 중지의 기준이 되는 Observable
            Observable.timer(500L, TimeUnit.MILLISECONDS)
        )
        .subscribe(System.out::println)
		
    sleep(1000)
}


// 1
// 2
// 3
// 4



skipUntil()


takeUntil() 과 반대로 현재 Observable 의 발행을 무시하다가 설정한 Observable 이 발행되면 그 시점부터 발행을 시작하는 연산자이다.

fun main() {
    val data = listOf("1", "2", "3", "4", "5")
		
    Observable.fromIterable(data)
        .zipWith(
            Observable.interval(100L, TimeUnit.MILLISECONDS),
            BiFunction<String, Long, String> { data, _ -> data }
        )
        .skipUntil(
            // 발행 시작의 기준이 되는 Observable
            Observable.timer(500L, TimeUnit.MILLISECONDS)
        )
        .subscribe(System.out::println)
		
    sleep(1000)
}


// 5



all()


주어진 조건이 모두 만족할 때 데이터를 발행하는 연산자이다.

fun main() {
    val data = listOf("A", "B", "C", "D", "E")
		
    Observable.fromIterable(data)
        .map(
            when(it) {
                "1" -> {
                    return@map "One"
                }
                "2" -> {
                    return@map "two"
                }
                "3" -> {
                    return@map "Three"
                }
                "4" -> {
                    return@map "Four"
                }
                "5" -> {
                    return@map "Five"
                }
                else -> {
                    return@map "Unknown"
                }
            }				
        )
        .all(
            it == "Unknown"
        )
        .subscribe(println(it)
		
    sleep(1000)
}


// true
essential