봄날은 갔다. 이제 그 정신으로 공부하자

11. Reactive - 유틸리티 연산자 본문

Reactive

11. Reactive - 유틸리티 연산자

길재의 그 정신으로 공부하자 2021. 6. 29. 11:11

지금까지 여러 타입의 연산자에 대해 알아보았습니다.

이번 글은 연산자 타입의 마지막으로 유틸리티 작업을 수행하는데 도움을 주는 유틸리티 연산자에 대해 설명하도록 하겠습니다.

유틸리티 작업을 수행하는데 도움을 주는 연산자들 입니다. 이번 글에서 설명할 유틸리티 연산자의 종류는 다음과 같습니다.

   - reduce()

   - count()

   - any()

   - all()

   - contains()

   - doOnXXX()

   - serialize

   - cache()

 

reduce

reduce() 연산자는 누적 연산자로 프로듀서의 모든 방출들을 누적해서 onComplete 이벤트 호출 시 누적한 값을 내보냅니다.

reduce 연산자는 앞서 설명한 scan 연산자(변환 연산자)와 유사한데 scan 연산자가 아이템 방출시마다 누적했다가 방출하는 반면 reduce 연산자는 onComplete 이벤트 호출 시  누적한 값을 방출한다는 차이가 있습니다.

자세한 예시는 아래와 같습니다.

fun funExam(){
    Observable
            .range(1, 10)
            .reduce { t1: Int?, t2: Int? ->
                t1!! + t2!!
            }.subscribe {
                println("Next: $it”)
            }
}

 

reduce 연산자를 사용해 1부터 10까지의 숫자를 누적했으므로 아래와 같이 출력됩니다.

Next: 55

 

count

count() 연산자는 프로듀서의 아이템 방출량을 계산하며 onComplete 이벤트 발생 시 onSuccess 이벤트를 사용하여 계산한 방출량을 구독자에게 방출합니다.

fun funExam(){
    listOf(2,5,6,4,7,1,9,8,3)
            .toObservable()
            .count()
            .subscribeBy { 
                println("count: $it")
            }
}

 

위 예제 코드의 count 연산자는 프로듀서의 방출량을 계산하고 onComplete 이벤트 발생 시 계산한 값을 구독자애게 전달합니다.

출력되는 값은 아래와 같습니다.

Count 9

 

all

all() 연산자는 프로듀서에서 방출하는 아이템을 논리 표현식으로 비교하여 모든 아이템이 참인 경우 onComplete 이벤트 발생 시 onSuccess 이벤트를 사용하여 결과(true or false)를 방출합니다.

fun funExam(){
    Observable
        .range(1, 10)
        .all {
            it < 11
        }.subscribeBy(onSuccess = {
            println("onSuccess: $it")
        }, onError = {
            println("onError: $it")
        } )
}

 

위 예제 코드에서 Observable이 방출하는 아이템은 모두 논리 표현식을 만족(11보다 작으므로)시키므로 onSuccess 이벤트에 true를 전달하므로 출력 결과는 아래와 같습니다.

onSuccess: true

 

any

any() 연산자는 프로듀서에서 방출하는 아이템 중 논리 표현식을 만족하는 아이템이 한개라도 있으면 onComplete 이벤트 발생 시 onSuccess 이벤트를 사용하여 결과(true or false)를 방출합니다.

fun funExam(){
    Observable
        .range(1, 10)
        .any {
            it == 11
        }.subscribeBy(onSuccess = {
            println("onSuccess: $it")
        }, onError = {
            println("onError: $it")
        } )
}

 

위 예제 코드에서 Observable이 방출하는 아이템 중 논리 표현식을 만족하는 아이템이 없어(11은 없음.) onSuccess 이벤트에 false를 전달하므로 출력 결과는 아래와 같습니다.

onSuccess: false

 

contains

contains() 연산자는 프로듀서에서 방출하는 아이템 중 일치하는 아이템이 한개라도 있으면 onComplete 이벤트 발생 시 onSuccess 이벤트를 사용하여 결과(true or false)를 방출합니다.  any 연산자와 유사하지만 any 연산자는 연산자 내부에서 일치하는 아이템을 논리 표현식으로 처리 가능한데 비해 contains 연산자는 아이템의 일치 여부만 확인 가능합니다.

fun funExam(){
    Observable
        .range(1, 10)
        .contains(11)
        .subscribeBy(onSuccess = {
            println("onSuccess: $it")
        }, onError = {
            println("onError: $it")
        } )
}

 

위 예제 코드에서 Observable이 방출하는 아이템 중 일치하는 아이템(11은 없음.)이 없어 onSuccess 이벤트에 false를 전달하므로 출력 결과는 아래와 같습니다.

onSuccess: false

 

doOnNext

doOnNext() 연산자는 Observable의 구독자의 onNext 이벤트 호출 전에 미리 그 정보를 받는 구독자를 등록하는 연산자로 항상 Observable이 아이템을 방출하기 전에 호출 됩니다.

예를 들어 작업 전 프로그래스를 보여주거나 갱신할 때 유용하게 사용되는 연산자로 사용 방법은 다음과 같습니다.

fun funExam() {
    Observable
        .range(1, 10)
        .doOnNext {
            println("doOnNext: $it")
        }.subscribe {
            println("Next: $it")
        }
}

 

doOnNext 연산자에 등록된 구독자가  Observable의 onNext 이벤트 호출 전에 호출되므로 아래와 같이 출력됩니다.

doOnNext: 1
Next: 1
doOnNext: 2
Next: 2
doOnNext: 3
Next: 3
…
doOnNext: 10
Next: 10

 

doOnComplete

doOnComplete() 연산자는 Observable의 구독자의 onComplete 이벤트 호출 전에 미리 그 정보를 받는 구독자를 등록하는 연산자로 Observable이 아이템 방출을 완료하고 onComplete 이벤트를 호출하기 전에 호출 됩니다.

예를 들어 작업 완료 후 진행률을 표시한 프로그래스나 팝업을 종료 시킬 때 유용하게 사용되는 연산자로 사용 방법은 다음과 같습니다.

fun funExam() {
    Observable
        .range(1, 10)
        .doOnComplete {
            println("doOnComplete")
        }.subscribeBy(onNext = {
            println("Next: $it")
        }, onComplete = {
            println("Completed")
        })
}

 

doOnComplete 연산자에 등록된 구독자가  Observable의 onComplete 이벤트 호출 전에 호출되므로 아래와 같이 출력됩니다.

Next: 1
Next: 2
Next: 3
…
Next: 10
doOnComplete
Completed

 

doOnError

doOnError() 연산자는 Observable의 구독자의 onError 이벤트 호출 전에 미리 그 정보를 받는 구독자를 등록하는 연산자로 Observable에서 오류 상황 발생 시 onError 이벤트를 호출하기 전에 호출 됩니다. 사용 방법은 다음과 같습니다.

fun funExam() {
    Observable
        .range(1, 10)
        .map {
            it/(3-it)
        }.doOnError {
            println("doOnError")
        }.subscribeBy(onNext = {
            println("Next: $it")
        }, onError = {
            println("Completed")
        })
}

 

doOnError 연산자에 등록된 구독자가  Observable의 onError 이벤트 호출 전에 호출되므로 아래와 같이 출력됩니다.

Next: 0
Next: 2
doOnError
Completed

 

doOnSubscribe

doOnSubscribe() 연산자는 Observer가 구독을 시작하는 이벤트인  onSubscribe를 받기 전에 그 정보를 받는 구독자를 등록하는 연산자로 Observable에서 구독 이벤트 발생 시 onSubscribe 이벤트를 호출하기 전에 호출 됩니다. 사용 방법은 다음과 같습니다.

fun funExam() {
    Observable
        .range(1, 10)
        .doOnSubscribe {
            println("doOnSubscribe")
        }.subscribe(object: Observer<Int>{
                override fun onComplete() {
                }

                override fun onSubscribe(d: Disposable?) {
                    println("onSubscribe")
                }

                override fun onNext(t: Int?) {
                    println(" Next: $t")
                }

                override fun onError(e: Throwable?) {

                }
            }
        )
}

 

doOnSubscribe 연산자에 등록된 구독자가  onSubscribe이벤트 호출 전에 호출되므로 아래와 같이 출력됩니다.

doOnSubscribe
onSubscribe
Next: 1
Next: 2
…

 

doOnDispose

doOnDispose() 연산자는 Observer가 구독을 시작하는 이벤트인  doOnSubscribe 연산자와는 반대로 구독 취소 이벤트를 받기 전에 그 정보를 받는 구독자를 등록하는 연산자로 구독 취소 이벤트인 disposable.dispose() 함수 호출 시 구독이 취소되기 전에 호출 됩니다. 사용 방법은 다음과 같습니다.

fun funExam() {
    lateinit var disposable: Disposable

    Observable
        .range(1, 10)
        .doOnDispose {
            println("doOnDispose")
        }.subscribe(object: Observer<Int>{
            override fun onComplete() {
                println("completed")
            }

            override fun onSubscribe(d: Disposable?) {
                println("onSubscribe")
                d?.let {
                    disposable = d
                }
            }

            override fun onNext(t: Int?) {
                println(" Next: $t")
                if(t == 3){
                    disposable.dispose()
                }
            }

            override fun onError(e: Throwable?) {
                println(" Error: $e")
            }
        })
}

 

구독 이벤트 발생 시  전달 받은 Disposable 인스턴스를 사용해 3번째 onNext 이벤트 발생 시 구독을 취소하도록 되어 있고 구독 취소 함수인 dispose()를 명시적으로 호출했으므로 onError() or onComplete() 이벤트는 전달되지 않습니다.

출력 결과는 다음과 같습니다.

onSubscribe
Next: 1
Next: 2
Next: 3
doOnDispose

 

doOnLifecycle

doOnLifecycle() 연산자는 doOnSubscribe 연산자와 doOnDispose 연산자를 합한 연산자라고 보시면 됩니다.

연산자의 이름 그대로 구독 시작 시와 구독 종료 시 각각 이벤트를 호출합니다.

사용 방법은 아래와 같습니다.

fun funExam(){
    lateinit var disposable: Disposable

    Observable
        .range(1, 10)
        .doOnLifecycle({
            println("doOnSubscribe")
        },{
            println("doOnDispose")
        }).subscribe(object: Observer<Int>{
            override fun onComplete() {
                println("completed")
            }

            override fun onSubscribe(d: Disposable?) {
                println("onSubscribe")
                d?.let {
                    disposable = d
                }
            }

            override fun onNext(t: Int?) {
                println(" Next: $t")
                if(t == 3){
                    disposable.dispose()
                }
            }

            override fun onError(e: Throwable?) {
                println(" Error: $e")
            }
        })
}

 

doOnLifecycle 연산자는 이름 그대로 구독 시작 시와 구독 종료 시 각각 이벤트를 호출하므로 아래와 같이 출력됩니다.

doOnSubscribe
onSubscribe
Next: 1
Next: 2
Next: 3
doOnDispose

 

doOnTerminate

doOnTerminate() 연산자는 doOnComplete 연산자와 비슷한 동작을하는 연산자 입니다. 차이점이라고 한다면 doOnComplete 연산자는 모든 아이템을 정상적으로 방출을 완료하고 onComplete 이벤트가 호출되기 전에 호출되는 반면 doOnTerminate 연산자는 오류 발생 상황에서도 호출 된다는 점입니다.

그러므로 오류가 발생할 수도 있는 상황에서는 doOnComplete 연산자보다는 doOnTerminate 연산자를 사용하는 것이 더 좋습니다.

사용 방법은 아래와 같습니다.

fun funExam(){
    Observable
        .range(1, 10)
        .map {
            it/(3-it)
        }.doOnTerminate {
            println("doOnTerminate")
        }.doOnComplete {
            println("doOnComplete")
        }.subscribe(object: Observer<Int>{
            override fun onComplete() {
                println("completed")
            }

            override fun onSubscribe(d: Disposable?) {
                println("onSubscribe")
            }

            override fun onNext(t: Int?) {
                println(" Next: $t")
            }

            override fun onError(e: Throwable?) {
                println(" Error: $e")
            }
        })
}

 

위 예제 코드는 3번째 아이템 방출에서 오류가 발생하도록 되어있으므로 doOnComplete와 onComplete 이벤트는 호출되지 않으므로 아래와 같이 출력됩니다.

onSubscribe
Next: 0
Next: 2
doOnTerminate
Error: java.lang.ArithmeticException: divide by zero

 

doOnEach

doOnEach() 연산자는 doOnNext(), doOnComplete(), doOnError() 이벤트를 한번에 처리하는 연산자로 각각의 이벤트 발생 시 doOnEach 이벤트가 호출되며 매개변수로 Notification 방출되는 아이템과 함께 전달되 어떤 이벤트가 호출되는지 구분할 수 있도록 지원됩니다. 사용 방법은 아래와 같습니다.

fun funExam(){
    Observable
        .range(1, 10)
        .doOnEach {
            println("doOnEach: $it")
        }.subscribe(object: Observer<Int>{
            override fun onComplete() {
                println("onComplete")
            }

            override fun onSubscribe(d: Disposable?) {
                println("onSubscribe")
            }

            override fun onNext(t: Int?) {
                println(" Next: $t")
            }

            override fun onError(e: Throwable?) {

            }
        })
}

 

출력 결과는 아래와 같습니다.

onSubscribe
doOnEach: OnNextNotification[1]
Next: 1
doOnEach: OnNextNotification[2]
Next: 2
…
doOnEach: OnNextNotification[10]
Next: 10
doOnEach: OnCompleteNotification
onComplete

 

serialize

serialize() 연산자는 Observable은 서로 다른 스레드에서 비동기적으로 구독자의 메서드를 호출하는 상황이 발생할 수 있는데 이로 인해 Observable이 아이템을 방출하기 위해 onNext 이벤트를 호출하는 중에 onCompleted 또는 onError 이벤트를 보내려고하거나 두 개의 다른 스레드에서 동시에 onNext 이벤트를 발생 시킬 수 있다는 점에서 Observable 계약을 위반할 수 있습니다. serialize 연산자를 적용하면 Observable이 제대로 작동하고 동기되도록 강제 할 수 있습니다.

 

cache

cache() 연산자는 단어의 뜻 그대로 방출한 아이템을 cache하고 있다가 새로운 구독자가 추가되면 cache하고 있던 모든 값을 방출한 후 이후 아이템을 방출하는 연산자 입니다. 

 

cache 연산자가 없는 경우부터 살펴보면 아래 코드의 경우,

fun funExam(){
        val observable = Observable.interval(1, TimeUnit.SECONDS)

		// 1
        observable.subscribe {
            println("next1: $it")
        }

		// 2
        runBlocking{
            delay(5000)
            observable.subscribe {
                println("next2: $it")
            }
        }
    }

 

첫번째 구독자가 0 ~ 4까지 아이템을 받고 5초 후에,

두번째 구독자가 0부터 시작하는 아이템을 방출 받으므로 아래와 같은 출력 결과를 가지지만,

next1: 0
next1: 1
next1: 2
next1: 3
next1: 4
next1: 5
next2: 0
next1: 6
next2: 1
…

 

Observable에 아래와 같이 cache 연산자를 추가하는 경우,

fun funExam(){
        val observable = Observable.interval(1, TimeUnit.SECONDS).cache()

		// 1
        observable.subscribe {
            println("next1: $it")
        }

		// 2
        runBlocking{
            delay(5000)
            observable.subscribe {
                println("next2: $it")
            }
        }
    }

 

첫번째 구독자가 0~4까지의 이벤트를 받고 5초 후에,

두번째 구독자의 구독이 시작되면 Observable은 cache하고 있던 모든 아이템을 두번째 구독자에게 방출한 후 다음 아이템을 방출하므로 아래와 같은 출력 결과를 가집니다.

next1: 0
next1: 1
next1: 2
next1: 3
next1: 4
next2: 0
next2: 1
next2: 2
next2: 3
next2: 4
next1: 5
next2: 5
next1: 6
next2: 6
next1: 7
next2: 7
...

 

정리

이번 글에서는 다양한 유틸리티 연산자에 대해 알아보았습니다.

방출하는 값을 누적하는 reduce 연산자와 count 연산자

그리고 논리 표현식으로 방출하는 아이템의 값을 비교하여 결과를 전달하는 all, any, contains 연산자와  

구독, 구독 취소, 오류, 아이템 방출 등의 이벤트 호출 전에 미리 이벤트를 받을 수 있는 doOnXXX 계열 연산자,

서로 다른 쓰레드에서 아이템을 방출할 때 발생할 수 있는 동기화문제를 해결할 수 있는 serialize 연산자,

마지막으로 방출하는 값을 기억하고 있다가 방출 할 수 있는 cache 연산자까지

다양한 연산자에 대해 살펴보았습니다.

 

이번 글에서 알아본 연산자를 간단한 설명과 함께 정리하면 아래 표와 같습니다.

'Reactive' 카테고리의 다른 글

10. Reactive - 오류 처리 연산자  (0) 2021.06.25
9. Reactive - 조건 연산자  (0) 2021.06.22
8. Reactive - 결합 연산자  (0) 2021.06.18
7. Reactive - 필터 연산자  (0) 2021.06.16
6. Reactive - 변환 연산자  (0) 2021.06.14
Comments