ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • ReactiveX가 뭐야? ( 기본편 - RxJava )
    Android/RxJava 2019. 6. 17. 09:53

    보라색 뱀장어...!

    Reactive가 뭘까?

    네이버 사전적 의미로 reactive는 반응을 하다라는 뜻을 가지고 있다.

     

    Reactive를 알기위해서는 동기 이벤트 기반의 프로그래밍을 먼저 알아야 한다.


    비/동기 기반 프로그래밍은 sync ( 동기 ) / async ( 비동기 ) 가 있는데,

    동기적 일처리 방식은 순차적으로 일을 스스로 끝내 나가는 방식이고

    비동기적 일처리 방식은 해야할 일을 위임하고 기다리는 방식이다.

     

    아래 코드를 살펴보자 (  ※ 안드로이드 스튜디오에서 코틀린으로 작성하였습니다. )

     

    [ Tip ] 아래와 같은 로그관련 확장함수를 만들었습니다.

    fun Any.log(tag: String) {
        Log.d(tag, this.toString())
    }

     

    동기(sync)

     

    [ 코드 ]

    fun sync() {
        1.log("sync")
        2.log("sync")
        3.log("sync")
        4.log("sync")
    }

     

    [ 결과 ]

    D/ReactiveCoding-sync: 1

    D/ReactiveCoding-sync: 2

    D/ReactiveCoding-sync: 3             

    D/ReactiveCoding-sync: 4

     

    비동기(async)

    fun async() {
        1.log("async")
        GlobalScope.launch {
            2.log("async")
            3.log("async")
        }
        4.log("async")
    }
    D/ReactiveCoding-sync: 1
    D/ReactiveCoding-sync: 4
    D/ReactiveCoding-sync: 2
    D/ReactiveCoding-sync: 3

    보다시피 동기적은 1 2 3 4가 순서대로 출력되는 반면

    비동기는 중간 2 3 만 다른 쓰레드로 실행 시켰더니 출력 순서가 1 4 2 3 으로 변경되었다.


    이벤트 기반 프로그래밍

     

    이벤트 구동형 프로그래밍(EDP : Event-driven programming) 이라고도 하는데 아래의 사진

    보다시피 프로그램이 대기하고 있다가 각각의 이벤트가 발생되면 그에 맞는 함수를 실행시킨다.

     

    예를 들어 안드로이드에서 버튼을 클릭시 그에 맞는 함수가 실행되듯이 말이다.

     


    Reactive가 뭔지는 알겠다, 무엇인가 변경또는 동작이 되면 알아서 반응하는것!

    그러면 ReactiveX는 뭔데? 앞서 말한것들을 가능하게끔 도와주는 일종의 라이브러리다.

    비동기 및 이벤트 기반 프로그래밍을 위한 라이브러리 라고도 할 수 있겠다.

     

    옵저버 패턴을 기반으로 데이터를 관할하고 있다가 상황에 따라서 변경되면 그것을 알려주어

    그에 맞는 행동을 취할 수 있게 해준다.

     

    그러면 Observable ( 관찰대상 ) 과 Observer ( 관찰자 )가 있는데 Observable이 있어야 Observer을 할 수 있으니

    Observable을 먼저 만들어 보도록 하자


    Gradle Dependency


    implementation 'io.reactivex.rxjava2:rxjava:2.2.6'


    Observable Type : Observable, Single, Completable

     

    Observable 타입은 Observable, Single, Completable 이렇게 3가지가 있다.

    이거 말고도 더 있지만 이 3가지만 사용해도 충분하다.

     

    Observable

    데이터 흐름에 맞게 알림을 보내 구독자가 데이터 처리를 할 수 있도록 만든다.

     

    - onNext : 데이터의 발행을 알림

    - onComplete : 모든 데이터의 발행 완료를 알림

    - onError : 데이터의 흐름 중에 에러가 발생했음을 알림

     

    Observable 생성 방법은 아래와 같이 총 6개!

     

    1. just

    인자로 넣은 데이터를 차례로 발행하는 Observable을 생성한다.

    타입이 같은 데이터를 최소 1개 최대 10개까지 넣을 수 있다.

    Observable.just(1,2,3)
        .subscribe { Log.d("reactiveX", it.toString()) }
    D/reactiveX: 1 
    D/reactiveX: 2 
    D/reactiveX: 3 

     

    2. fromArray

    인자로 배열을 발행하는 Observable을 생성한다.

    just와 다르게 1개의 Array를 인자로 받는다.

     

    [ 코드 ]

    Observable.fromArray(arrayOf(1, 2, 3))
        .subscribe { Log.d("reactiveX", it.toString()) }
    D/reactiveX: [Ljava.lang.Integer;@6ad85 

     

    3. create

    직접 onNext, onComplete, onError을 설정할 수 있다.

    Observable
        .create<Int> {
            for (i in 1..3) {
                it.onNext(i)
            }
            it.onComplete()
        }
        .subscribe { Log.d("reactiveX", it.toString()) }
    D/reactiveX: 1 
    D/reactiveX: 2 
    D/reactiveX: 3

     

    4. fromIterable

    인자로 넣은 데이터를 차례로 발행하는 Observable을 생성한다.

    Observable.fromIterable(1..3)
        .subscribe { Log.d("reactiveX", it.toString()) }
    D/reactiveX: 1 
    D/reactiveX: 2 
    D/reactiveX: 3

     

    5. range

    해당 범위의 수를 차례로 발행

    Observable.range(1, 3)
        .subscribe { Log.d("reactiveX", it.toString()) }
    D/reactiveX: 1 
    D/reactiveX: 2 
    D/reactiveX: 3

     

    6. interval ( 기본적으로 computation Scheduler 에서 실행된다. ) 

    지정된 시간마다 숫자를 계속 발행

    Observable.interval(0, 1, TimeUnit.SECONDS)
        .subscribe { Log.d("reactiveX", it.toString()) }
    D/reactiveX: 0
    D/reactiveX: 1
    D/reactiveX: 2
    
    ... 계속 쭉 출력

    just와 fromArray는 생성과 동시에 파라미터를 onNext하고 다 전달되면 onComplete하는 반면에

    create는 직접 onNext와 onComplete, onError을 조절 할 수 있다.


    Single

    임이의 연속된 값들을 배출하는 것과는 달리, 항상 한 가지 값 또는 오류 알림 둘 중 하나만 배출

    onSuccess또는 onError만 구현

    Single.just("b")
    
    Single.create<String> {
        if (name == "홍길동") {
            it.onSuccess(name)
        }
        it.onError(Exception("올바르지 않은 이름!"))
    }

     

    Completable

    임이의 연속된 값들을 배출하는 것과는 달리, 항상 완료 알림 및 오류 알림 둘 중 하나만 배출

    onComplete또는 onError만 구현

    Complete.just("b")
    
    Complete.create<String> {
        if (name == "홍길동") {
            it.onComplete(name)
        }
        it.onError(Exception("올바르지 않은 이름!"))
    }

     

     

    Subscribe

    Observable에서 데이터를 push 하면 그것을 받을 즉, 관찰자가 필요하다

    관찰자는 데이터를 보내주는 사람을 구독! 한다고 해서 Subscribe라고 한다.

    ( ! 데이터의 흐름을 구독 ! )

     

    subscribe에서는 onNext, onComplete, onError의 알림에 대한 리스너를 설정하고

    데이터의 흐름에서 일어나는 3개의 이벤트에 대한 로직을 실행하도록 만든다.

    Single.create<String> {
        if (name == "홍길동") {
            it.onSuccess(name)
        }
        it.onError(Exception("올바르지 않은 이름!"))
    }.subscribe(
        { name ->
            name.log("subscribe next")
        },
        {
            it.message.log("subscribe error")
    })

    Single의 경우 onSuccess와 onError만 처리를 해주면 됨으로 코드를 알맞게 작성을 해주면 된다.

    보통 람다를 onSuccess -> onComplete -> onError 순으로 처리 한다.

     

    Disposable

    subscribe는 모두 Disposable 인터페이스 객체를 리턴하며 Disposable을 통해 데이터의 흐름을 중단할 수 있다.

    비동기는 백그라운드에서 실행된다고 생각하면 되는데 생명주기가 계속 진행되는 것이 있다.

     

    예를 들어 안드로이드에서 작업을 하는데 해당 액티비티가 종료되었는데도 계속 비동기 작업을 하게되면

    메모리 릭이 일어나고 결국은 펑!

     

    그래서 subscribe는 모두 disposable을 return함으로 그것들을 종료 해주어야 한다. 

    종료 하는 방법은 간단하다. 사용을 완료한 후 

    val single = Single.just("a").subscribe { word -> System.out.print(word) }
    
    single.dispose()

     

    정말 간단하게 끝난다.

    이부분은 RxAndroid에서 자세히 다루어 보도록 하겠다.


    연산자

     

    대상을 구독하기전에 보내온 데이터를 자기 입맛대로 변경이 가능한데 이부분은 나중에 추가적으로 설명을 하고 아래의 사이트를 통해서 잠깐 봐보도록 하자

    http://reactivex.io/documentation/ko/operators.html


    스트림 관리 : 스트림을 실행하는 스케줄러

     

    오우... 어려운 말이 나왔다! 스케줄러..? 배압?

    먼저 ReactiveX는 비동기라고 했다... 그러면 동작이 될 Thread를 정해주어야 한다!

     

    그냥 백그라운드에서 실행하면 안되냐는 생각이 들지만....

    subscribe에서 Android의 View를 변경한다면 Android MainThread에서 작동을 해야 하기 때문에

    데이터는 백그라운드에서 적용은 MainThread에서 작동해야 한다.

     

    SubscribeOn 연산자는 Observable이 작업할 스레드 지정한다.

    연산자 체인 중 아무 곳에서 호출해도 문제되지 않는다.

    하지만 되도록이면 처음에 선언하여 혼선이 없도록 하자

     

    ObserveOn 연산자는 연쇄되는 연산을 실행할 스레드를 지정한다.

     

    쉽게 말해 

    getData()
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.computation())
        .map{ data -> data + "A" }
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe { textview.text = it.toString() }

    위에 코드에서는 getData()는 Schedulers.io에서 실행이 되고

    map은 Schedulers.computation에서 실행

    subscribe는 Android mainThread에서 실행이 된다는 뜻이다.

     

    (1)

    Observable.just(1, 2, 3, 4, 5)
        .subscribeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.computation())
        .map { it * 2 }
        .map { it * 3 }
        .subscribe { textview.text = it.toString() }

     

    (2)

    Observable.just(1, 2, 3, 4, 5)
        .subscribeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.computation())
        .map { it * 2 }
        .map { it * 3 }
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe { textview.text = it.toString() }

    1번의 경우에는 구독을 안드로이드 메인쓰레드에서 시작하였지만 마지막으로는 연산 쓰레드로 변경되어 안드로이드 익셉션 오류가 날것이다.

    2번의 경우에는 1번과 동일하게 안드로이드 메인쓰레드에서 하고 연산쓰레드에서 연산을 한 후 마지막은 메인쓰레드로 이동하기에 별도의 오류가 없이 잘 작동된ㄷ.ㅏ


    Hot, Cold Observable 개념 : PublishSubject, BehaviorSubject Subject 소개

     

    Hot Observable

    생성과 동시에 이벤트를 방출하기 시작, 또 이후  subscribe 되는 시점과 상관없이 옵저버들에게 이벤트를 중간부터 전송

     

    Cold Observable ( 기본적인 Observable )

    옵저버가 subscribe 되는 시점부터 이벤트를 생성하여 방출하기 시작

     

    쉽게 말해 열정적으로 계속 데이터를 보내는 타입이 Hot! 구독을 당할때 까지 계속 기다려주는 인자한 Cold!

     

    Subject Observer Observable 두 역할을 수행하는 브릿지 또는 프록시 종류

    Observer 역할로, 하나 이상의 Observable을 구독하며, Observable 역할로 아이템을 내보낼 수 있습니다.

     

    PublishSubject

    PublishSubject는 구독한 뒤에 Observable이 보낸 이벤트를 전달받는다.

    val subject= PublishSubject.create<String>()
    
    subject.subscribe { "Subscription First : $it".log() }
    subject.onNext("a")
    
    subject.subscribe { "Subscription Second : $it".log() }
    subject.onNext("b")
    Subscription First : a
    Subscription First : b
    Subscription Second : b

     

    ReplaySubject

    ReplaySubject는 구독 전에 발생한 이벤트를 버퍼에 넣고, 버퍼에 있던 이벤트를 구독 후에 전달한다.

    버퍼 크기를 설정한 만큼 구독 후 이벤트를 전달한다.

    만약 버퍼 크기가 0이라면, PublishSubject와 같은 역할을 하게 된다.

    val subject = ReplaySubject.create<String>()
    
    subject.subscribe { "Subscription First : $it".log() }
    subject.onNext("a")
    
    subject.subscribe { "Subscription Second : $it".log() }
    subject.onNext("b")
    Subscription First : a
    Subscription Second : a
    Subscription First : b
    Subscription Second : b

     

    BehaviorSubject

    - BehaviorSubject는 구독 후에 가장 최근 아이템을 전달합니다.

    val subject = BehaviorSubject.create<String>()
    subject.onNext("c")
    
    subject.subscribe { "Subscription First : $it".log() }
    subject.onNext("a")
    
    subject.subscribe { "Subscription Second : $it".log() }
    subject.onNext("b")
    Subscription First : c
    Subscription First : a
    Subscription Second : a
    Subscription First : b
    Subscription Second : b

    Flowable? Observable?

    RxJava 2.0부터 등장한 Flowable! 자세한거는 좀 더 공부 후 포스팅을 하도록 하고!

    사용방법은 동일하나 선택의 기준은 아래와 같을때 적절히 섞어서 사용하도록 하자!

     

    Observable을 선택하는 기준

    - 최대 1000개 미만의 데이터 흐름, Out of Memory Exception 이 발생할 확률이 적은 경우

    - 마우스, 터치 이벤트를 다루는 GUI 프로그래밍, 초당 1000회 이하의 이벤트를 다룸

     

    Flowable을 선택하는 기준

    - 10000개 이상의 데이터를 처리하는 경우, 메서드 체인에서 데이터 소스에 데이터 개수 제한을 요청해야 함

    - 디스크에서 파일을 읽어 들일 경우

    - JDBC를 활용해 데이터베이스의 쿼리 결과를 가져오는 경우

    - 네트워크 I/O를 실행하는 경우 ( 서버에서 가져오길 원하는 만큼의 데이터양을 요청할 수 있을 때 )

    댓글

Designed by Tistory.