ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • ReactiveX - Schdulers 도대체 뭘 사용해야 할까?
    Android/RxJava 2019. 6. 18. 10:17

    보라색 뱀장어!

    Rx에서 Observer와 Subscribe의 스케쥴 즉, 동작하는 Thread를 정해줄 수 있다.

    Android의 View을 건들이는 부분이면 AndroidSchedulers.mainThread()를 사용하면 된다지만 Observer는?


    아무것도 지정해주지 않았을때는 뭐가 작동되는데?
    이러한 궁금증이 생겨 포스팅을 시작한다.

    먼저 RxJava2의 Schedulers을 디컴해보면 아래와 같이 나온다.

        static final Scheduler SINGLE; 
    
        static final Scheduler COMPUTATION; 
    
        static final Scheduler IO; 
    
        static final Scheduler TRAMPOLINE; 
    
        static final Scheduler NEW_THREAD; 


    총 5가지의 Schduler가 있고 어떻게 사용해야 하는지 하나씩 살펴보도록 하자


    1. computation
    - 계산 작업을 위한 기본 인스턴스 반환
    - 이벤트 루프, 콜백 처리 및 기타 계산 작업에 사용할 수 있다.
    - 단, 이 스케쥴러에서는 차단, 입출력 바인딩 작업을 수행하지 않는 것이 좋다.

     

    val observable = Observable.just(1, 3, 5) 
    
    observable 
        .map { number -> "++ $number ++" } 
        .subscribeOn(Schedulers.computation()) 
        .subscribe { num -> Log.d("reactiveX", "${Thread.currentThread().name} | $num") } 
    
    observable 
        .map { number -> "-- $number --" } 
        .subscribeOn(Schedulers.computation()) 
        .subscribe { num -> Log.d("reactiveX", "${Thread.currentThread().name} | $num") } 
    D/reactiveX: RxComputationThreadPool-1 | ++ 1 ++ 
    D/reactiveX: RxComputationThreadPool-2 | -- 1 -- 
    D/reactiveX: RxComputationThreadPool-1 | ++ 3 ++ 
    D/reactiveX: RxComputationThreadPool-2 | -- 3 -- 
    D/reactiveX: RxComputationThreadPool-1 | ++ 5 ++ 
    D/reactiveX: RxComputationThreadPool-2 | -- 5 -- 

    2. io
    - IO-bound 작업을 위한 기본 공유 인스턴스 반환
    - Blocking IO를 비동기적으로 수행할 때 사용할 수 있다.
    - 이 스케줄러에서는 연산 작업을 수행하지 않는 것이 좋다.

     

    val observable = Observable.just(1, 3, 5) 
    
    observable 
        .map { number -> "++ $number ++" } 
        .subscribeOn(Schedulers.io()) 
        .subscribe { num -> Log.d("reactiveX", "${Thread.currentThread().name} | $num") } 
    
    observable 
        .map { number -> "-- $number --" } 
        .subscribeOn(Schedulers.io()) 
        .subscribe { num -> Log.d("reactiveX", "${Thread.currentThread().name} | $num") } 
    D/reactiveX: RxCachedThreadScheduler-1 | ++ 1 ++ 
    D/reactiveX: RxCachedThreadScheduler-2 | -- 1 -- 
    D/reactiveX: RxCachedThreadScheduler-1 | ++ 3 ++ 
    D/reactiveX: RxCachedThreadScheduler-2 | -- 3 -- 
    D/reactiveX: RxCachedThreadScheduler-1 | ++ 5 ++ 
    D/reactiveX: RxCachedThreadScheduler-2 | -- 5 -- 

    설명상 계산을 수행하였지만 위에 설명에 있듯이 파일 입출력으로 사용하는것이 좋다.


    3. trampoline
    - 인스턴스(instance)가 작동하여 참여 스레드 중 하나에서 FIFO 방식으로 실행
    - 기본 구현의 방법 현재 스레드에서 작업 실행

    val observable = Observable.just(1, 3, 5) 
    
    observable 
        .map { number -> "++ $number ++" } 
        .subscribeOn(Schedulers.trampoline()) 
        .subscribe { num -> Log.d("reactiveX", "${Thread.currentThread().name} | $num") } 
    
    observable 
        .map { number -> "-- $number --" } 
        .subscribeOn(Schedulers.trampoline()) 
        .subscribe { num -> Log.d("reactiveX", "${Thread.currentThread().name} | $num") } 
    D/reactiveX: main | ++ 1 ++ 
    D/reactiveX: main | ++ 3 ++ 
    D/reactiveX: main | ++ 5 ++ 
    D/reactiveX: main | -- 1 -- 
    D/reactiveX: main | -- 3 -- 
    D/reactiveX: main | -- 5 -- 

    4. newThread
    각 작업 단위에 대해 새스레드 을(를) 생성하는 기본 공유 인스턴스 반환
    이 스케줄러의 기본 구현으로 인해 새로운 단일 스레드 생성
    - 무제한의 작업자 스레드가 생성 될수 있음으로 시스템 속도가 느려지거나
    OutOfMemoryError가 발생할 수 있다.

    val observable = Observable.just(1, 3, 5) 
    
    observable 
        .map { number -> "++ $number ++" } 
        .subscribeOn(Schedulers.newThread()) 
        .subscribe { num -> Log.d("reactiveX", "${Thread.currentThread().name} | $num") } 
    
    observable 
        .map { number -> "-- $number --" } 
        .subscribeOn(Schedulers.newThread()) 
        .subscribe { num -> Log.d("reactiveX", "${Thread.currentThread().name} | $num") } 
    D/reactiveX: RxNewThreadScheduler-1 | ++ 1 ++ 
    D/reactiveX: RxNewThreadScheduler-2 | -- 1 -- 
    D/reactiveX: RxNewThreadScheduler-1 | ++ 3 ++ 
    D/reactiveX: RxNewThreadScheduler-2 | -- 3 -- 
    D/reactiveX: RxNewThreadScheduler-1 | ++ 5 ++ 
    D/reactiveX: RxNewThreadScheduler-2 | -- 5 -- 

    5. single
    작업에 대한 기본 공유 단일 스레드 지원
    동일한 백드라운드 스레드에 대해 강력한 순차적 실행 필요

    val observable = Observable.just(1, 3, 5) 
    
    observable 
        .map { number -> "++ $number ++" } 
        .subscribeOn(Schedulers.single()) 
        .subscribe { num -> Log.d("reactiveX", "${Thread.currentThread().name} | $num") } 
    
    observable 
        .map { number -> "-- $number --" } 
        .subscribeOn(Schedulers.single()) 
        .subscribe { num -> Log.d("reactiveX", "${Thread.currentThread().name} | $num") } 
    D/reactiveX: RxSingleScheduler-1 | ++ 1 ++ 
    D/reactiveX: RxSingleScheduler-1 | ++ 3 ++ 
    D/reactiveX: RxSingleScheduler-1 | ++ 5 ++ 
    D/reactiveX: RxSingleScheduler-1 | -- 1 -- 
    D/reactiveX: RxSingleScheduler-1 | -- 3 -- 
    D/reactiveX: RxSingleScheduler-1 | -- 5 -- 

    각각의 특성에 맞추어서 잘 사용하면 될 것 같다.

    'Android > RxJava' 카테고리의 다른 글

    RxKotlin과 RxAndroid 알아보기  (0) 2019.06.18
    ReactiveX와 Retrofit 같이 쓰기!  (0) 2019.06.18
    ReactiveX가 뭐야? ( 기본편 - RxJava )  (3) 2019.06.17

    댓글

Designed by Tistory.