봄날은 갔다. 이제 그 정신으로 공부하자

6. Reactive - 변환 연산자 본문

Reactive

6. Reactive - 변환 연산자

길재의 그 정신으로 공부하자 2021. 6. 14. 11:11

이번 글에서는 변환 연산자에 대해 설명하도록 하겠습니다.

변환 연산자는 데이터의 흐름을 개발자가 원하는대로 변형할 수 있도록 지원해주는 기능을 수행하는 연산자로 아래와 같이 많은 연산자들이 있습니다.

   - buffer()

   - window()

   - throttle()

   - map()

   - flatMap()

   - concatMap()

   - switchMap()

   - cast()

   - defaultIfEmpty()

   - switchIfEmpty()

   - startWith()

   - sort()

   - scan()

   - groupBy()

 

많네요... ㅠ_ㅠ  그래도 알아야하니까 하나 하나 설명하도록 하겠습니다. ^___^

 

buffer

buffer() 연산자는 방출되는 아이템을 모아서 리스트나 다른 컬렉션 유형으로 전달합니다.

이전 글에서 설명한 onBackPressureBuffer() 연산자와는 차이점은 구독자가 방출받은 아이템을 소비할 때까지 방출을 무시하는 하는 것이 아닌 모아서 한꺼번에 전달한다는 점입니다.

사용 방법은 아래와 같습니다.

fun funExam(){
    val flowable = Flowable.interval(100, TimeUnit.MILLISECONDS)
    flowable.buffer(1, TimeUnit.SECONDS)
            .subscribe{println(it)}
}

 

flowable이 100 ms 단위로 아이템을 방출하지만 buffer가 1sec로 설정되어 있으므로 1 sec 동안 방출되는 것을 모아서 한번에 방출하므로 아래와 같은 출력 결과를 보여주게 됩니다.

[0, 1, 2, 3, 4, 5, 6, 7, 8]
[9, 10, 11, 12, 13, 14, 15, 16, 17, 18]
[19, 20, 21, 22, 23, 24, 25, 26, 27, 28]
[29, 30, 31, 32, 33, 34, 35, 36, 37, 38]
[39, 40, 41, 42, 43, 44, 45, 46, 47, 48]

 

buffer() 연산자의 특징은 다른 생산자를 강제로 취할 수 있다는 것으로 아래와 같이 사용하는 것도 가능합니다.

fun funExam(){
    val boundaryFlowable = Flowable.interval(350, TimeUnit.MILLISECONDS)
    val flowable = Flowable.interval(100, TimeUnit.MILLISECONDS)
    flowable.buffer(boundaryFlowable)
            .subscribe{println(it)}
}

 

window

window() 연산자는 아이템을 컬렉션 형태로 버퍼링하는 대신 다른 프로듀서 형태로 버퍼링한다는 점만 빼면 buffer() 연산자와 유사합니다. 사용 방법은 아래와 같습니다.

fun funExam(){
    val flowable = Flowable.range(1, 111)
    flowable.window(10)
            .subscribe{
                it.subscribe {
                    print("$it, ")
                }
                println()
            }
}

 

출력 결과는 다음과 같습니다.

1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 
11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 
21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 
…

 

throttle

buffer() & window() 연산자는 방출되는 아이템들을 수집하지만 throttle() 연산자는 방출을 생략합니다.

사용 방법은 아래와 같습니다.

fun funExam(){
    val flowable = Flowable.interval(100, TimeUnit.MILLISECONDS)
    flowable.throttleFirst(200, TimeUnit.MILLISECONDS)
            .subscribe{ println(it)}

    runBlocking { delay(1000) }
}

 

throttle() 연산자를 사용해 200ms마다 발생하는 첫번째 방출을 무시하도록 해놓았으므로 아래와 같은 결과가 출력됩니다.

0
3
5
7

 

map

map 연산자는 방출된 각 아이템에 주어진 작업을 수행하고 이를 구독자에게 내보내는 역할을 수행합니다. 

주어진 Observable<T> 또는 Flowable<T>에 대해 map 연산자는 제공된 Function<T, R>람다를 적용해 타입 T로 push된 아이템을 타입 R로 변환해 방출합니다.

아래 코드는 map 연산자의 간단한 예시 입니다.

fun funExam(){
    val observable = Observable.range(1, 10)
    observable
            .map {
                "Int to String $it"
            }.subscribe{
                println(“Next: $it")
            }
}

 

위 코드는 Observable<Int>를 map 연산자를 사용해 Observable<String>으로 변환해 방출하는 예제코드 입니다.

출력 결과는 아래와 같습니다.

Next: Int to String 1
Next: Int to String 2
Next: Int to String 3
…
Next: Int to String 9
Next: Int to String 10

 

flatMap

map 연산자는 방출되는 각 아이템들을 가져와서 변환하지만 flatMap 연산자는 새로운 프로듀서를 만들고 원천 프로듀서에 전달한 연산자를 방출되는 아이템에 적용합니다.

fun funExam(){
    val observable = Observable.range(1, 10)
    observable
            .flatMap {
                Observable.just("Int to String $it")
            }.subscribe{
                println(“Next $it")
            }
}

 

위 코드의 실행 결과는 이전 map의 출력 결과와 동일하지만 적용된 로직에는 차이가 있습니다. 

단순히 문자열이 아니라 문자열 형태의 Observable을 반환하기 때문 입니다.

이부분을 조금 더 자세히 보기 위해 단일 아이템 방출에서 여러 아이템을 가져와야 하는 경우를 예를 들어 설명하면 아래 코드와 같습니다.

fun funExam(){
    val observable = Observable.range(1, 10)
    observable
            .flatMap {
                Observable.create<String>{emiter ->
                     emiter.onNext("The Number $it")
                    emiter.onNext("number/2 ${it/2}")
                    emiter.onNext("number%2 ${it%2}")
                    emiter.onComplete()
                }
            }.subscribeBy(
                    onNext = {
                        println(“Next: $it")
                    },
                    onComplete = {
                        println("Completed")
                    }
            )
}

 

위 예제 코드는 flatMap 연산자 안에서 Observable이라는 새로운 인스턴스를 생성했습니다.

이 인스턴스는 세 개의 문자열을 방출 합니다.

flatMap 연산자 안의 Observable 내에서 만이라면 구독자에 3번의 onNext 호출 후 onComplete가 호출되어야 하지만 flatMap 연산자는 내부적으로 merge 연산자를 사용해 여러 Observable을 결합하므로 모든 Observable에 대해 방출한 후 onComplete가 호출되게 됩니다.

출력 결과는 아래와 같습니다.

Next: The Number 1
Next: number/2 0
Next: number%2 1
Next: The Number 2
Next: number/2 1
Next: number%2 0
Next: The Number 3
Next: number/2 1
Next: number%2 1
…
Next: The Number 10
Next: number/2 5
Next: number%2 0
Completed

 

concatMap

concatMap 연산자는 flatMap 연산자와 동일한 유사한 기능을 수행하는 연산자입니다. 

차이점은 flatMap 연산자는 내부적으로 merge 연산자를, concatMap은 concat 연산자를 사용한다는 점입니다.

이 두 개의 매핑 연산자가 서로 다른 점은 무엇일까요?

동일한 코드를 가진 예제를 flatMap과 concatMap을 사용해 차이점을 비교해보도록 하겠습니다.

fun funExam(){
    Observable.range(1, 10)
            .flatMap { 
                val randDelay = Random().nextInt(10)
                return@flatMap Observable
                        .just(it)
                        .delay(randDelay.toLong(), TimeUnit.MILLISECONDS)
            }.blockingSubscribe { 
                println(“Next: $it")
            }
}

 

위 예제를 살펴보면 Observable 인스턴스를 생성한 후 delay 연산자와 함께 flatMap 연산자를 사용해 임의의 지연 시간을 추가한 것을 알 수 있습니다.

출력 결과는 아래와 같습니다.

Next: 6
Next: 1
Next: 7
Next: 8
Next: 9
Next: 4
Next: 3
Next: 5
Next: 2
Next: 10

 

결과에서 다운스트림이 규정된 순서대로 방출되는 아이템을 얻지 못했다는 것을 알 수 있는데 이유는 바로 merge 연산자가 한 번에 모두 비동기적으로 방출을 구독하고 다시 방출함으로 순서가 유지되지 않기 때문 입니다.

 

이제 concatMap 연산자를 사용한 코드를 살펴보도록 하겠습니다.

fun funExam(){
    Observable.range(1, 10)
            .concatMap {
                val randDelay = Random().nextInt(10)
                return@concatMap Observable
                        .just(it)
                        .delay(randDelay.toLong(), TimeUnit.MILLISECONDS)
            }.blockingSubscribe {
                println("Next: $it")
            }
}

 

아래 출력 결과에서 알 수 있듯이 concatMap 연산자는 내부적으로 concat을 사용하기 때문에 규정된 아이템 방출 순서를 유지하는 것을 알 수 있습니다.

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Next: 7
Next: 8
Next: 9
Next: 10

 

flatMap 연산자가 적합한 경우

  - 페이지나 액티비티 또는 플레그먼트에서 데이터의 목록을 다룰 때 해당 목록의 아이템별로 데이터베이스나 서버에 전송하고 싶을

     경우 적합합니다.

  - 목록의 아이템에 대한 작업을 비동기적으로 비교적 짧은 기간에 수행하려는 경우에 적합합니다.

 

concatMap 연산자가 적합한 경우

  - 사용자에게 표시할 데이터 목록을 다운로드하는데, 이때는 순서가 매우 중요한 경우에 적합합니다.

  - 정렬된 목록의 순서를 그대로 유지하고 싶은 경우에 적합합니다.

 

switchMap

switchMap 연산자는 원천 프로듀서(Observable/Flowable)의 모든 방출을 비동기로 대기하지만 정해진 시간 이내의 최신 아이템만 방출합니다.  더 자세히 설명하자면 원천 Observable이 switchMap에서 이들 중 하나를 내보내기 이전에 하나 이상의 아이템을 연속적으로 방출하면 switchMap은 마지막 항목을 가져와서 그 사이에 들어온 모든 방출 아이템을 무시합니다.

fun funExam(){
    println("Without delay")
    Observable
            .range(1, 10)
            .switchMap {
                return@switchMap Observable.just(it)
            }.blockingSubscribe { 
                println("Next: $it")
            }
    
    println("With delay")
    Observable
            .range(1, 10)
            .switchMap {
                val randDelay = Random().nextInt(10)
                return@switchMap Observable
                        .just(it)
                        .delay(randDelay.toLong(), TimeUnit.MILLISECONDS)
            }.blockingSubscribe {
                println("Next: $it")
            }
}

 

결과는 다음과 같습니다.

Without delay
Next: 1
Next: 2
Next: 3
…
Next: 10
With delay
Next: 10

 

위 예제는 두 가지 접근방법을 사용했는데 한쪽에서는 delay 연산자를 사용하고 다른 한쪽에서는 사용하지 않았습니다.

결과를 보면 두 번째의 경우 switchMap이 방출 이전에 연속적으로 값을 전달 받았지만 마지막 아이템만 방출했음을 알 수 있습니다.

 

switchMap 연산자는 지연 없이 방출된 숫자들만 다시 방출합니다. 이유는 switchMap 연산자가 다음 아이템을 받기 이전에 그것들을 다시 방출하는 것이 가능하기 때문 입니다.

 

cast

cast() 연산자는 Observable에서 데이터를 캐스팅하는 경우에 사용되는 연산자 입니다.

사용 방법은 아래와 같습니다.

fun funExam(){
    val list = listOf<MyItemInherit>(MyItemInherit(1), MyItemInherit(2), MyItemInherit(3), MyItemInherit(4), MyItemInherit(5))

    // 1
    list.toObservable()
            .map {
                it as MyItem1
            }.subscribe{
                println("Next 1: $it")
            }

    // 2
    list.toObservable()
            .cast(MyItem::class.java)
            .subscribe{
                println("Next 2: $it")
            }
}

open class MyItem(val id: Int){
    override fun toString(): String {
        return "[MyItem: $id]"
    }
}

class MyItemInherit(id: Int): MyItem1(id){
    override fun toString(): String {
        return "[MyItemInherit: $id]"
    }
}

 

위 코드에서 첫번째 주석의 Observable은 map 연산자를 사용해 MyItemInherit 인스턴스를 MyItem 인스턴스로 변환해 구독자에게 방출했지만, 두번째 주석은 cast 연산자를 사용해 MyItemInherit 인스턴스를 MyItem 인스턴스로 변환해 구독자에게 방출했습니다.

 

defaultIfEmpty

연산자를 필터링하거나 복잡한 요구사항을 다루는 동안 빈 프로듀서가 나타날 수 있는데 defaultIfEmpty 연산자는 이러한 상황을 처리하는데 사용되는 연산자 입니다.

아래 코드를 먼저 살펴 보면 아래 코드는 0~10까지의 수를 push하는데 filter 연산자에 의해 15보다 큰 수만 push 되도록 필터링되므로 결과적으로 구독자에 아무런 값도 전달되지 않게 됩니다.

fun funExam(){
    Observable
            .range(0, 10)
            .filter{it>15}
            .subscribe { 
                println(“Next: $it")
            }
}

 

이러한 경우 아래와 같이 defaultIfEmpty 연산자를 사용하면 아무것도 push 되지 않는 경우에 defaultIfEmpty 연산자에 정해진 값 15를 받을 수 있게 됩니다.

만약 위 코드에서 filter{ it>5}라고 한다면 구독자에 전달되는 값이 있으므로  defaultIfEmpty 연산자에 정해진 값은 호출되지 않습니다.

fun funExam(){
    Observable
            .range(0, 10)
            .filter{it>15}
            .defaultIfEmpty(15)
            .subscribe { 
                println(“Next: $it")
            }
}

 

switchIfEmpty

switchIfEmpty 연산자는 defaultIfEmpty 연산자와 유사합니다.

차이점은 defaultIfEmpty 연산자의 경우 빈 프로듀서에 방출을 추가하지만 switchIfEmpty 연산자는 원천 프로듀서가 비어 있을 경우 대체 프로듀서를 방출하는 점이 차이점 입니다.

아이템을 전달해야 하는 defaultIfEmpty 연산자와 달리 switchIfEmpty 연산자는 대체 프로듀서를 전달해야 합니다. 원천 프로듀서가 비어 있으면 대체 프로듀서부터 방출을 시작합니다.

아래는 switchIfEmpty 연산자에 대한 간단한 예제 코드 입니다.

fun funExam(){
    Observable
            .range(0, 10)
            .filter{it>15}
            .switchIfEmpty(Observable.range(11, 10))
            .subscribe {
                println("Received $it")
            }
}

 

위 코드를 살펴보면 첫번째 Observable은 0~9까지의 수를 구독자에게 방출하려고 하지만 filter 조건으로 인해 아무런 값도 방출이 이루어지지 않습니다.

값이 empty이므로 switchIfEmpty 연산자에 정해진 Observable이 구독자에게 아이템을 push 하기 시작하고 이때 이전 Observable에 걸려있는 filter는 동작하지 않으므로 아래와 같은 값을 출력하게 됩니다.

Received 11
Received 12
Received 13
…
Received 20

 

startWith

startWith 연산자는 프로듀서의 기존 아이템의 맨 위에 다른 아이템을 추가하는 연산자 입니다.

간단한 예제 코드는 아래와 같습니다.

fun funExam(){
    listOf("Korea", "USA", "France", "England", "Japan")
            .toObservable()
            .startWith("Country name")
            .subscribe{
                println("Next: $it")
            }
}

 

위 코드를 실행 시키면 구독자에게 Observable의 값을 방출하기 전에 startWith의 값이 먼저 방출되고 이후 Observable의 값이 방출되게 됩니다. 출력 결과는 아래와 같습니다.

Next: Country name
Next: Korea
Next: USA
Next: France
Next: England
Next: Japan

 

sort

방출되는 아이템을 정렬하고 싶은 경우 사용하는 연산자가 sort 연산자 입니다. 이 연산자를 사용하면 원천 프로듀서부터 모든 방출을 내부적으로 수집한 후에 정렬해서 다시 방출하게 됩니다.

자세한 예제 코드는 아래와 같습니다.

fun funExam(){
    listOf(2,5,6,4,7,1,9,8,3)
            .toObservable()
            .sorted()
            .subscribe{
                println("Next 1: $it")
            }

    listOf(2,5,6,4,7,1,9,8,3)
            .toObservable()
            .sorted { o1, o2 ->  if(o1<o2)-1 else 1}
            .subscribe{
                println("Next 2: $it")
            }
}

 

위 예제코드의 두 개의 Observable 모두 sort 연산자로 인해 1~9까지의 값을 출력하게 됩니다.

 

scan

scan 연산자는 실행할 때마다 입력 값에 맞는 중간 결과 및 최종 결과를 구독자에게 방출합니다.

자세한 예제 코드는 아래와 같습니다.

fun funExam(){
    Observable
            .range(1, 10)
            .scan { t1: Int?, t2: Int? ->
                t1!! + t2!!
            }.subscribe {
                println(“Next: $it")
            }
}

 

1~10까지의 값을 방출하는 Observable이지만 scan 연산자에 의해 이전 값을 계속 누적 시키므로 아래와 같이 출력됩니다.

Next: 1
Next: 3
Next: 6
Next: 10
Next: 15
Next: 21
..
Next: 55

 

groupBy

groupBy() 연산자는 Reactive가 지원하는 강력한 연산입니다. groupBy 연산자를 사용하면 특정 속성을 기준으로 방출되는 아이템을 분류할 수 있습니다.

fun funExam(){
    val observable = Observable.range(1, 30)

    observable.groupBy {
        it%5
    }.blockingSubscribe {
        println("Key ${it.key}")
        it.subscribe{
            println(“Next: $it")
        }
    }
}

 

위 예제 코드에서는 5로 나눈 나머지를 기준으로 push 아이템을 그룹화 했으므로 기본적으로 5개의 그룹(0 ~ 4)이 존재해야 합니다.

groupBy 연산자를 사용해 논리 표현식의 결과를 통해 push를 그룹화 했으며, blockingSubscribe 연산자를 사용해 새롤 생성된 Observable<GroupedObservable<K,T>> 인스턴스에 구독했습니다.

groupBy 연산자는 그룹을 포함하는 GroupObservable을 배출하는 Observable을 반환합니다.

그래서 blockingSubscribe 안에서는 방출된 GroupObservable 인스턴스를 구독해야 합니다.

결과는 다음과 같습니다.

Key 1
Received 1
Received 6
Received 11
Received 21
Received 26
Key 2
Received 2
Received 7
Received 12
Received 22
Received 27
Key 3
Received 3
Received 8
Received 13
Received 23
Received 28
…

 

정리 

이번 글에서는 Reactive에서 제공하는 다양한 변환 연산자에 대해 살펴보았습니다.

다음 글에서는 필터 연산자의 종류와 기능에 대해 자세히 설명하도록 하겠습니다.

'Reactive' 카테고리의 다른 글

8. Reactive - 결합 연산자  (0) 2021.06.18
7. Reactive - 필터 연산자  (0) 2021.06.16
5. Reactive - 연산자 구분  (0) 2021.06.12
4. Reactive - BackPressure & Flowable  (0) 2021.06.10
3. Reactive - Hot & Cold Observable, Subject  (0) 2021.06.07
Comments