Android/RxJava

ReactiveX - Schdulers 도대체 뭘 사용해야 할까?

donghune 2019. 6. 18. 10:17

보라색 뱀장어!

Rx에서 Observer와 Subscribe의 스케쥴 즉, 동작하는 Thread를 정해줄 수 있다.

Android의 View을 건들이는 부분이면 AndroidSchedulers.mainThread()를 사용하면 된다지만 Observer는?


아무것도 지정해주지 않았을때는 뭐가 작동되는데?
이러한 궁금증이 생겨 포스팅을 시작한다.

먼저 RxJava2의 Schedulers을 디컴해보면 아래와 같이 나온다.

    static final Scheduler SINGLE; 

    static final Scheduler COMPUTATION; 

    static final Scheduler IO; 

    static final Scheduler TRAMPOLINE; 

    static final Scheduler NEW_THREAD; 


총 5가지의 Schduler가 있고 어떻게 사용해야 하는지 하나씩 살펴보도록 하자


1. computation
- 계산 작업을 위한 기본 인스턴스 반환
- 이벤트 루프, 콜백 처리 및 기타 계산 작업에 사용할 수 있다.
- 단, 이 스케쥴러에서는 차단, 입출력 바인딩 작업을 수행하지 않는 것이 좋다.

 

val observable = Observable.just(1, 3, 5) 

observable 
    .map { number -> "++ $number ++" } 
    .subscribeOn(Schedulers.computation()) 
    .subscribe { num -> Log.d("reactiveX", "${Thread.currentThread().name} | $num") } 

observable 
    .map { number -> "-- $number --" } 
    .subscribeOn(Schedulers.computation()) 
    .subscribe { num -> Log.d("reactiveX", "${Thread.currentThread().name} | $num") } 
D/reactiveX: RxComputationThreadPool-1 | ++ 1 ++ 
D/reactiveX: RxComputationThreadPool-2 | -- 1 -- 
D/reactiveX: RxComputationThreadPool-1 | ++ 3 ++ 
D/reactiveX: RxComputationThreadPool-2 | -- 3 -- 
D/reactiveX: RxComputationThreadPool-1 | ++ 5 ++ 
D/reactiveX: RxComputationThreadPool-2 | -- 5 -- 

2. io
- IO-bound 작업을 위한 기본 공유 인스턴스 반환
- Blocking IO를 비동기적으로 수행할 때 사용할 수 있다.
- 이 스케줄러에서는 연산 작업을 수행하지 않는 것이 좋다.

 

val observable = Observable.just(1, 3, 5) 

observable 
    .map { number -> "++ $number ++" } 
    .subscribeOn(Schedulers.io()) 
    .subscribe { num -> Log.d("reactiveX", "${Thread.currentThread().name} | $num") } 

observable 
    .map { number -> "-- $number --" } 
    .subscribeOn(Schedulers.io()) 
    .subscribe { num -> Log.d("reactiveX", "${Thread.currentThread().name} | $num") } 
D/reactiveX: RxCachedThreadScheduler-1 | ++ 1 ++ 
D/reactiveX: RxCachedThreadScheduler-2 | -- 1 -- 
D/reactiveX: RxCachedThreadScheduler-1 | ++ 3 ++ 
D/reactiveX: RxCachedThreadScheduler-2 | -- 3 -- 
D/reactiveX: RxCachedThreadScheduler-1 | ++ 5 ++ 
D/reactiveX: RxCachedThreadScheduler-2 | -- 5 -- 

설명상 계산을 수행하였지만 위에 설명에 있듯이 파일 입출력으로 사용하는것이 좋다.


3. trampoline
- 인스턴스(instance)가 작동하여 참여 스레드 중 하나에서 FIFO 방식으로 실행
- 기본 구현의 방법 현재 스레드에서 작업 실행

val observable = Observable.just(1, 3, 5) 

observable 
    .map { number -> "++ $number ++" } 
    .subscribeOn(Schedulers.trampoline()) 
    .subscribe { num -> Log.d("reactiveX", "${Thread.currentThread().name} | $num") } 

observable 
    .map { number -> "-- $number --" } 
    .subscribeOn(Schedulers.trampoline()) 
    .subscribe { num -> Log.d("reactiveX", "${Thread.currentThread().name} | $num") } 
D/reactiveX: main | ++ 1 ++ 
D/reactiveX: main | ++ 3 ++ 
D/reactiveX: main | ++ 5 ++ 
D/reactiveX: main | -- 1 -- 
D/reactiveX: main | -- 3 -- 
D/reactiveX: main | -- 5 -- 

4. newThread
각 작업 단위에 대해 새스레드 을(를) 생성하는 기본 공유 인스턴스 반환
이 스케줄러의 기본 구현으로 인해 새로운 단일 스레드 생성
- 무제한의 작업자 스레드가 생성 될수 있음으로 시스템 속도가 느려지거나
OutOfMemoryError가 발생할 수 있다.

val observable = Observable.just(1, 3, 5) 

observable 
    .map { number -> "++ $number ++" } 
    .subscribeOn(Schedulers.newThread()) 
    .subscribe { num -> Log.d("reactiveX", "${Thread.currentThread().name} | $num") } 

observable 
    .map { number -> "-- $number --" } 
    .subscribeOn(Schedulers.newThread()) 
    .subscribe { num -> Log.d("reactiveX", "${Thread.currentThread().name} | $num") } 
D/reactiveX: RxNewThreadScheduler-1 | ++ 1 ++ 
D/reactiveX: RxNewThreadScheduler-2 | -- 1 -- 
D/reactiveX: RxNewThreadScheduler-1 | ++ 3 ++ 
D/reactiveX: RxNewThreadScheduler-2 | -- 3 -- 
D/reactiveX: RxNewThreadScheduler-1 | ++ 5 ++ 
D/reactiveX: RxNewThreadScheduler-2 | -- 5 -- 

5. single
작업에 대한 기본 공유 단일 스레드 지원
동일한 백드라운드 스레드에 대해 강력한 순차적 실행 필요

val observable = Observable.just(1, 3, 5) 

observable 
    .map { number -> "++ $number ++" } 
    .subscribeOn(Schedulers.single()) 
    .subscribe { num -> Log.d("reactiveX", "${Thread.currentThread().name} | $num") } 

observable 
    .map { number -> "-- $number --" } 
    .subscribeOn(Schedulers.single()) 
    .subscribe { num -> Log.d("reactiveX", "${Thread.currentThread().name} | $num") } 
D/reactiveX: RxSingleScheduler-1 | ++ 1 ++ 
D/reactiveX: RxSingleScheduler-1 | ++ 3 ++ 
D/reactiveX: RxSingleScheduler-1 | ++ 5 ++ 
D/reactiveX: RxSingleScheduler-1 | -- 1 -- 
D/reactiveX: RxSingleScheduler-1 | -- 3 -- 
D/reactiveX: RxSingleScheduler-1 | -- 5 -- 

각각의 특성에 맞추어서 잘 사용하면 될 것 같다.