변환 연산자

filter / map / concatMap / flatMap / switchMap / reduce / scan

변환 연산자

filter()


Observable 의 발행 데이터에서 원하는 데이터만 걸러내는 연산자이다.

fun main() {
    val items = arrayOf("Item 1", "Item 2", "Value 3", "Item 4", "Value 5")
		
    Observable.fromArray(*items)
        .filter { it.startsWith("Value") }
        .subscribe { println("$it") }
}


// Value 3
// Value 5



map()


입력값을 어떤 함수에 넣어서 원하는 값으로 변환하는 연산자이다.

fun main() {
    val numbers = arrayOf(1, 2, 3, 4, 5)

    Observable.fromArray(*numbers)
        .map { it * 10 }
        .subscribe { println("$it")}
}


// 10
// 20
// 30
// 40
// 50



concatMap()


map() 과 동일하게 입력값을 어떤 함수에 넣어서 원하는 값으로 변환하는 연산자로, 들어오는 데이터를 순서대로 처리한다.

fun main() {
    val numbers = arrayOf("1", "2", "3")

    Observable.interval(100L, TimeUnit.MILLISECONDS)
        .map { it.toInt() }				
        .map { numbers[it] }	
        .take(numbers.size.toLong())	
        .concatMap {
            Observable.interval(200L, TimeUnit.MILLISECONDS)
                .map { "<${it}>" }
                .take(2)
		}
        .subscribe(System.out::println)

    sleep(2000)
}


// <1>
// <1>
// <2>
// <2>
// <3>
// <3>



flatMap()


map() 과 동일하게 입력값을 어떤 함수에 넣어서 원하는 값으로 변환하는 연산자로, 들어오는 데이터의 순서를 보장하지 않고 처리한다.

fun main() {
    val numbers = arrayOf("1", "2", "3")

    Observable.interval(100L, TimeUnit.MILLISECONDS)
        .map { it.toInt() }				
        .map { numbers[it] }	
        .take(numbers.size.toLong())	
        .flatMap {
            Observable.interval(200L, TimeUnit.MILLISECONDS)
                .map { "<${it}>" }
                .take(2)
        }
        .subscribe(System.out::println)

    sleep(2000)
}


// <1>
// <2>
// <3>
// <1>
// <2>
// <3>



switchMap()


map() 과 동일하게 입력값을 어떤 함수에 넣어서 원하는 값으로 변환하는 연산자로, concatMap() 과 동일하게 데이터의 순서를 보장하지만 중간에 새로운 데이터가 발행되면 기존에 진행중이던 작업을 중단하고 중간에 발행된 데이터를 처리한다.

fun main() {
    val numbers = arrayOf("1", "2", "3")

    Observable.interval(100L, TimeUnit.MILLISECONDS)
        .map { it.toInt() }				
        .map { numbers[it] }	
        .take(numbers.size.toLong())	
        .switchMap {
            Observable.interval(200L, TimeUnit.MILLISECONDS)
                .map { "<${it}>" }
                .take(2)
        }
        .subscribe(System.out::println)

    sleep(2000)
}


// <3>
// <3>
  • Main Observable 이 0.1초 간격으로 3개의 Sub Observable 을 발행하며, 각 Sub Observable 은 0.2 초 간격으로 값을 발행한다.
  • 값 1에 해당하는 첫 번째 Sub Observable 이 값을 발행할 때까지 0.2초 동안 대기 중에 값 2에 해당하는 두 번째 Sub Observable 이 생성되었기 때문에 값 1에 해당하는 첫 번째 Sub Observable 이 중단된다.
  • 값 2에 해당하는 두 번째 Sub Observable 이 값 발행할 때까지 0.2초 동안 대기 중에 값 3에 해당하는 세 번째 Sub Observable 이 생성되었기 때문에 값 2에 해당하는 두 번째 Sub Observable 은 중단된다.
  • 값 3에 해당하는 세 번째 Sub Observable 이 값 발행할 때까지 0.2초 동안 대기 중에 다른 값의 발행이 없어서 값 3에 해당하는 세 번째 Sub Observable 은 값을 발행할 수 있게 된다.


reduce()


발행한 데이터를 모두 사용하여 최종 결과 데이터를 합칠 때 사용하는 연산자로, 모든 데이터가 입력된 후 모두 종합하여 마지막 1개의 데이터를 발행한다.

fun main() {
    val numbers = arrayOf("1", "2", "3", "4", "5")

    Observable.fromArray(*numbers)
        .reduce { prev, next ->
            "$next($prev)"
        }				
        .subscribe(System.out::println)
}


// 5(4(3(2(1))))



scan()


실행할 때 마다 입력값에 맞는 중간 결과 및 최종 결과의 데이터를 발행하는 연산자이다.

fun main() {
    val numbers = arrayOf("1", "2", "3", "4", "5")

    Observable.fromArray(*numbers)
        .reduce { prev, next ->
            "$next($prev)"
        }				
        .subscribe(System.out::println)
}


// 1
// 2(1)
// 3(2(1))
// 4(3(2(1)))
// 5(4(3(2(1))))



filter() 와 비슷하게 사용할 수 있는 함수


first(default)

Observable 의 첫 번째 값만 발행하고, 값이 없으면 default 값 발행한다.

last(default)

Observable 의 마지막 값만 발행하고, 값이 없으면 default 값 발행한다.

take(N)

Observable 의 최초 N개의 값을 발행한다.

takeLast(default)

Observable 의 마지막 N개의 값을 발행한다.

skip(N)

Observable 의 최초 N개 값을 제외하고 발행한다.

skipLast(N)

Observable 의 마지막 N개 값을 제외하고 발행한다.

essential