Android/RxJava, RxKotlin

RxJava - Observable을 제외한 다른 생산자들

최데브 2021. 6. 12. 21:46

전 포스팅에서는 Observable에 대해서 알아봤다.

기본적인 생산자였는데 

Observable

  • 0개에서 n개의 데이터를 전달하는 생산자다.
  • 기본적인 생산자로 단건(0 or 1)이 아니면 대부분 Observable을 쓴다.
  • observer 방식으로 consumer 등록시 Observer를 구현해 전달한다.
  • consumer 방식을 사용할 시 onNext, onComplete, onError와 onSubscribe가 있다.
val observer = Observable.just(11, 12, 13)
        .map {
            if (it == 12) throw IllegalStateException() // 12에 에러
            else it
        }
    observer.subscribe(
        { println("onNext $it") },
        { println("onError") },
        { println("onComplete") },
        { println("onSubscribe") })

 

 

이 생산자 말고도 상황에 맞게 사용할 수 있도록 다양한 생산자들이 Rx에는 준비되어있다. (과도한 친절이다... 알아야할게 많아..)

 

일단 언제나 그랬던거 처럼 한번 나열해보자.

 

1. Single

2. Completable

3. Maybe

4. Flowable

 

5. Subjects

 5-1. PublishSubject

 5-2. BehaviorSubject

 5-3. ReplaySubject

 5-4 AsyncSubject

 

 

 

Single

  • 오직 1개의 데이터를 전달하는 생산자다.
  • Http GET Request와 같이 결과가 1개의 데이터 or 실패인 경우 사용한다.
  • observer 방식으로 consumer 등록시 SingleObserver를 구현해 전달한다.
  • consumer 방식을 사용할 시 onSuccess와 onError만 있다.
Single.just(1)
        .subscribe(
            { println("onSuccess $it") }, 
            { println("onError") }
        )

Completable

  • 0개의 데이터를 전달하는 생산자다.
  • db에 insert, update와 같이 데이터가 필요 없이 성공 or 실패인 경우 사용한다.
  • observer 방식으로 consumer 등록시 CompletableObserver를 구현해 전달한다.
  • consumer 방식을 사용할 시 onComplete와 onError만 있다.
Completable.complete()
        .subscribe(
            { println("onComplete") },
            { println("onError") }
        )

Maybe

  • 0개 또는 1개의 데이터를 전달하는 생산자다.
  • 예 / 아니오 선택과 같이 (둘 중 하나 + 예외 경우)에 쓸 수 있다.
  • observer 방식으로 consumer 등록시 MaybeObserver를 구현해 전달한다.
  • consumer 방식을 사용할 시 onSuccess, onComplete와 onError가 있다.
Maybe.empty<Unit>()
        .subscribe(
            { println("onSuccess $it") },
            { println("onComplete") },
            { println("onError") }
        )

Flowable

  • 데이터의 발행 속도가 구독자의 처리속도보다 크게 빠를 때 사용 (BackPressure Issue)
  • observer 방식으로 consumer 등록시 FlowableSubscriber를 구현해 전달한다.
  • BackPressure Issue를 처리하는 방법을 설정할 수 있다.
  • LiveDataReactiveStreams을 사용해 AAC LiveData와 연계할 수 있다.
  • 언제 Observable을 쓰고 언제 Flowable을 써야할까?
Flowable.just(1, 2, 3, 4)
        .subscribe(
            { println("onNext $it") }, // onNext: Consumer,
            { println("onError") }, // onError: Consumer,
            { println("onComplete") }, // onComplete: Consumer,
            { println("onSubscribe") } // onSubscribe: Consumer
        )

 

Subjects

공통

  • Observable과 Observer의 성격을 둘 다 가지고 있다.
  • 즉 subscribe를 달 수 있으며 동시에 onNext, onComplete 등을 달 수 있다.
  • 다음 코드를 Subject 종류만 다르게 해서 한 결과를 보면서 이해하자.
    val xSubject = 종류별Subject.create<Int>()
    xSubject.subscribe { println("첫번째 $it") }
    xSubject.onNext(1)
    Thread.sleep(1000L)
    xSubject.subscribe { println("----두번째 $it") }
    xSubject.onNext(2)
    xSubject.onNext(3)
    Thread.sleep(1000L)
    xSubject.subscribe { println("********세번째 $it") }
    xSubject.onNext(4)
    xSubject.onComplete()

PublishSubject

  • 구독한 시점부터 새로운 데이터를 가져오는 subject

BehaviorSubject

  • 구독한 시점 직전 데이터부터 가져오는 subject
  • PublishSubject와 달리 직전 데이터도 가져온다. (두번째 2, 세번째 3)

ReplaySubject

  • 지금까지 발행된 데이터 모두들 가져오는 subject

AsyncSubject

  • complete되었을 때 가장 마지막 데이터를 받는 subject

 

 

 

출처 : https://velog.io/@cmplxn/RxJava-%EC%9E%85%EB%AC%B8%ED%95%98%EA%B8%B0-Kotlin

반응형