Scheduler
스케줄러의 역할은 코드가 어느 스레드에서 실행될 것인지를 지정하는 역할로 스케줄러를 통해 스레드를 분리해주어야 비동기 처리가 가능하다.
Observable
스레드를 지정할 수 있는 역할로 subscrbieOn()
, observeOn()
로 지정이 가능하며 기본적으로 스케줄러를 별도로 지정하지 않으면 호출한 스레드에서 동작한다.
subscribeOn()
Observable
이 데이터를 발행하고 연산하는 스레드를 지정할 수 있다.
fun main() {
Observable.just(1, 2, 3)
.map {
println("map: $it - ${Thread.currentThread().name}")
it
}
.subscribeOn(Schedulers.io())
.subscribe {
println("subscribe: $it - ${Thread.currentThread().name}")
}
delay(1000)
}
// map: 1 - RxCashedThreadScheduler-1
// subscribe: 1 - RxCashedThreadScheduler-1
// map: 2 - RxCashedThreadScheduler-1
// subscribe: 2 - RxCashedThreadScheduler-1
// map: 3 - RxCashedThreadScheduler-1
// subscribe: 3 - RxCashedThreadScheduler-1
- 데이터를 생산하는 스레드와 데이터를 소비하는 스레드가 동일하다.
observeOn()
Observable
이 소비자에게 알림을 보내는 스레드를 지정할 수 있다.
fun main() {
Observable.just(1)
.observeOn(Schedulers.io())
.map {
println("map[1] - ${Thread.currentThread().name}")
it
}
.observeOn(Schedulers.computation())
.map {
println("map[2] - ${Thread.currentThread().name}")
it
}
.observeOn(Schedulers.single())
.subscribe {
println("subscribe - ${Thread.currentThread().name}")
}
delay(1000)
}
// map[1] - RxCashedThreadScheduler-1
// map[2] - RxComputationThreadPool-1
// subscribe - RxSingleScheduler-1
Schedulers.newThread()
요청을 받을 때마다 새로운 스레드를 생성하여 작업하는 스케줄러이다.
fun main() {
val stream = Observable.just(1, 2, 3)
stream.subscribeOn(Schedulers.newThread())
.subscribe {
runBlocking { delay(100) }
println("$it: Observable[1] - ${Thread.currentThread().name}")
}
stream.subscribeOn(Schedulers.newThread())
.subscribe {
runBlocking { delay(100) }
println("$it: Observable[2] - ${Thread.currentThread().name}")
}
stream.subscribeOn(Schedulers.newThread())
.subscribe {
runBlocking { delay(100) }
println("$it: Observable[3] - ${Thread.currentThread().name}")
}
sleep(1000)
}
// 1: Observable[1] - RxNewThreadSchedulers-1
// 1: Observable[3] - RxNewThreadSchedulers-3
// 1: Observable[2] - RxNewThreadSchedulers-2
// 1: Observable[2] - RxNewThreadSchedulers-2
// 1: Observable[3] - RxNewThreadSchedulers-3
// 1: Observable[1] - RxNewThreadSchedulers-1
// 1: Observable[3] - RxNewThreadSchedulers-3
// 1: Observable[2] - RxNewThreadSchedulers-2
// 1: Observable[1] - RxNewThreadSchedulers-1
newThread()
로 생성된Observable
은 서로 다른 스레드이기에 비동기로 출력된다.
Scheduler.io()
파일 입출력 등의 IO
작업을 하거나 네트워크 요청 처리 시에 사용하는 스케줄러로 내부적으로 CashedPool
사용하여 스레드가 필요할 때 마다 계속 생성되며 유후 스레드가 존재하면 재활용한다.
fun main() {
val stream = Observable.just(1, 2, 3)
println("start1")
stream.subscribeOn(Schedulers.io())
.subscribe {
runBlocking { delay(100) }
println("$it: Observable[1] - ${Thread.currentThread().name}")
}
println("start2")
stream.subscribeOn(Schedulers.io())
.subscribe {
runBlocking { delay(100) }
println("$it: Observable[2] - ${Thread.currentThread().name}")
}
println("end")
delay(1000)
}
// start1
// start2
// end
// 1: Observable[2] - RxCachedThreadSchedulers-1
// 1: Observable[1] - RxCachedThreadSchedulers-1
// 2: Observable[1] - RxCachedThreadSchedulers-1
// 2: Observable[2] - RxCachedThreadSchedulers-1
// 3: Observable[1] - RxCachedThreadSchedulers-1
// 3: Observable[2] - RxCachedThreadSchedulers-1
Scheduler.computation()
별도의 IO
없고 일반적인 연산 작업을 할 때 사용하는 스케줄러로 내부적으로 스레드 풀을 생성하고 생성된 스레드를 사용하며 기본적으로 스레드의 개수는 프로세서의 개수와 같다.
interval()
,timer()
는 기본적으로 계산 스케줄러를 사용한다.
fun main() {
val stream = Observable.just(1, 2, 3)
stream.subscribeOn(Schedulers.computation())
.subscribe {
runBlocking { delay(100) }
println("$it: ${Thread.currentThread().name}")
}
delay(1000)
}
// 1: RxComputationThreadPool-1
// 2: RxComputationThreadPool-1
// 3: RxComputationThreadPool-1
Scheduler.single()
단일 스레드를 계속 재사용하는 스케줄러로 스레드를 만들어 해당 스레드 큐에 작업을 넘기는 방식으로 한 번 생성된 스레드로 여러 작업을 처리한다.
fun main() {
val stream = Observable.just(1, 2, 3)
println("start1")
stream.subscribeOn(Schedulers.single())
.subscribe {
runBlocking { delay(100) }
println("$it: Observable[1] - ${Thread.currentThread().name}")
}
println("start2")
stream.subscribeOn(Schedulers.single())
.subscribe {
runBlocking { delay(100) }
println("$it: Observable[2] - ${Thread.currentThread().name}")
}
println("end")
delay(1000)
}
// start1
// start2
// end
// 1: Observable[1] - RxSingleScheduler-1
// 2: Observable[1] - RxSingleScheduler-1
// 3: Observable[1] - RxSingleScheduler-1
// 1: Observable[2] - RxSingleScheduler-1
// 2: Observable[2] - RxSingleScheduler-1
// 3: Observable[2] - RxSingleScheduler-1
Scheduler.trampoline()
새로운 스레드를 생성하지 않고 호출한 스레드 큐에 작업을 넘겨주는 방식의 스케줄러로 Queuing
된 모든 작업이 종료되어야 다음 라인의 코드가 실행된다.
fun main() {
val stream = Observable.just(1, 2, 3)
println("start1")
stream.subscribeOn(Schedulers.trampoline())
.subscribe {
runBlocking { delay(100) }
println("$it: Observable[1] - ${Thread.currentThread().name}")
}
println("start2")
stream.subscribeOn(Schedulers.trampoline())
.subscribe {
runBlocking { delay(100) }
println("$it: Observable[2] - ${Thread.currentThread().name}")
}
println("end")
delay(1000)
}
// start1
// 1: Observable[1] - main
// 2: Observable[1] - main
// 3: Observable[1] - main
// start2
// 1: Observable[2] - main
// 2: Observable[2] - main
// 3: Observable[2] - main
// end