봄날은 갔다. 이제 그 정신으로 공부하자

4. Reactive - BackPressure & Flowable 본문

Reactive

4. Reactive - BackPressure & Flowable

길재의 그 정신으로 공부하자 2021. 6. 10. 11:11

지난 글 까지는 Observable의 종류와 사용 방법에 설명했습니다. 이번 글에서는 비동기 처리 상황에서 발생할 수 있는 문제, 

비동기로 동작할 때 Observable이 처리 할 수 있는 처리량보다 더 많은 아이템을 방출하는 경우에 대한 대응, 흐름제어 방법 대해 설명하도록 하겠습니다.

 

이번 글에서는 리액티브에서 흐름을 제어하는 역할을 하는 BackPressure와 Flowable에 대해 자세히 설명하도록 하겠습니다.

 

backpressure란?

Observable은 기본적으로 아이템을 동기적으로 Observer에서 하나씩 방출하도록 동작합니다.

그러나 Observer가 시간을 필요하는 작업을 처리해야 하고 그 처리 시간이 Observable의 방출 시간보다 간격이 긴 경우 문제가 발생할 수 있습니다.

 

이러한 문제의 해결방법은 Observer와 Observable간에 피드백을 주고 받을 수 있는 채널 일 것 입니다.

채널을 통해 Observer가 Observable에게 처리가 완료 될때까지 기다려야 한다고 전달할 수 있으면 이러한 문제를 해결할 수 있습니다.

이러한 피드백 채널을 BackPressure라고 합니다. Observable과 Observer는 BackPressure를 지원하지 않으므로 그 대신 Flowable과 Subscriber를 사용해야 합니다.

 

아래 코드는 Observer가 Observable의 처리 속도를 따라가지 못하는 상황에 대한 예시입니다.

val observable = Observable.just(1,2,3,4,5)
val subject = BehaviorSubject.create<Int>()
subject.observeOn(Schedulers.computation())
        .subscribe({
            println("[S1] Next: $it")
            runBlocking { delay(100) }
        })
subject.observeOn(Schedulers.computation())
        .subscribe({
            println(“[S2] Next: $it")
        })
observable.subscribe(subject)

 

위 코드는 첫번째 구독에서는 100 ms의 대기시간을 두고 두번째 구독에서는 대기시간 없이 처리하고 있습니다.

해당 코드를 동작시키면 옵저버블의 Push를 전달받은 첫번째 구독에서 “[S1] Next: 1“을 출력하고 100 ms대기 시간을 갖는 동안 두번째 구독에서 1~5까지 결과를 출력합니다. 그 이후에 100 ms 간격으로 첫번째 구독의 결과가 출력 됩니다.

즉, 출력되는 값은 아래와 같습니다.

[S1] Next: 1
[S2] Next: 1
[S2] Next: 2
[S2] Next: 3
[S2] Next: 4
[S2] Next: 5
[S1] Next: 2
[S1] Next: 3
[S1] Next: 4
[S1] Next: 5

 

이렇게 처리된 이유는 첫번째 구독이 처리에서 시간이 오래 걸리면서 Observable의 방출 대기열로 들어가게되었기 때문이다.

위 코드에서는 대기열로 들어간 값이 정수값 몇개라서 큰 문제가 되지는 않지만 대기열로 들어가는 값이 정수값이 아닌 더 큰 값 이거나 대기열의 갯수가 무한이 늘어나는 경우, OutOfMemoryError 예외를 포함해 많은 문제를 일으킬 수 있는 위험한 코드 입니다.

 

Flowable

Flowable은 옵저버블의 BackPressure 버전이라고 부를 수 있습니다.

Flowable은 내부적으로 buffer(최대 128개)를 지원하며 Observer가 시간이 걸리는 작업을 실행하게 되면 방출된 아이템이 buffer에서 대기 할 수 있습니다. 동일한 작업을 수행하는 Observable 코드와 Flowable 코드를 만들어 비교해보도록 하겠습니다.

먼저 Observable 코드 입니다.

fun funExam(){
    val observable = Observable.range(1, 1000)
    observable
            .map{MyItem(it)}
            .observeOn(Schedulers.computation())
            .subscribe({
                println("Next: $it")
                runBlocking { delay(100) }
            })
    runBlocking { delay(100000) }
}

data class MyItem(val id: Int){
    init{
        println("MyItem Created $id")
    }
}

 

위 코드는 map을 통해 1000개의 MyItem 인스턴스가 생성되고 이후 100 ms 단위로 onNext 메서드가 호출되게 됩니다.

1000개의 인스턴스를 생성하고 이후 한개씩 순차적으로 전달하는 비효율적인 방식 입니다.

이를 Flowable 코드로 변경하면 아래와 같습니다.

fun funExam(){
    Flowable.range(1, 1000)
            .map{MyItem(it)}
            .observeOn(Schedulers.computation())
            .subscribe({
                println("Next: $it")
                runBlocking { delay(100) }
            })
    runBlocking { delay(100000) }
}

data class MyItem(val id: Int){
    init{
        println("MyItem Created $id")
    }
}

 

위 코드를 실행해보면 플로어블은 옵저버블 과는 다르게  map에서 한번에 1000개의 MyItem 인스턴스를 모두 생성하는게 아니라

적당한 갯수의 인스턴스를 생성해놓고 Oberver가 처리하기를 기다렸다가 Oberver가 처리할 수 있을 때 방출하고 어느정도 방출하면 다시 MyItem 인스턴스를 생성하는 방식을 반복 하는 것을 알 수 있습니다.

 

Flowable과 Observable 장단점

Flowable과 Observable은 아래 표와 같이 각각의 장단점이 있습니다.

 

Flowable과 Subscriber

Flowable은 Observer 대신 Backpressure 호환이 가능한 Subscriber를 사용합니다. Observer 대신 Subscriber를 사용해야 하는 이유는 Subscriber가 일부 추가 기능과 Backpressure를 동시에 지원하기 때문 입니다.

예를 들어, 얼마나 많은 아이템을 받기를 윈하는지 메시지로 전달할 수 있고 Subscriber를 사용하는 동안 업스트림에서 수신하고자하는 항목의 수를 지정할 수도 있습니다.

 

Subscriber는 람다를 사용하는 Oberver와 유사합니다. 

차이점은 Subscriber는 request() 함수를 이용하여 수신 할 데이터의 개수를 요청해야 한다는 점 입니다.

 

Subscriber를 사용하는 예시 코드는 아래와 같습니다.

fun funExam(){
    Flowable.range(1, 15)
            .map { MyItem(it) }
            .observeOn(Schedulers.io())
            .subscribe(object: Subscriber<MyItem> {
                lateinit var subscription: Subscription

                override fun onSubscribe(subscription: Subscription?) {
                    this.subscription = subscription!!
                    this.subscription.request(5)
                }

                override fun onNext(item: MyItem?) {
                    runBlocking { delay(50) }
                    println("Subscriber received: " + item!!)
                    if(item.id == 5){
                        println("Req Two more")
                        subscription.request(2)
                    }
                }

                override fun onComplete() {
                    println("Done!!")
                }

                override fun onError(throwable: Throwable?) {
                    throwable?.printStackTrace()
                }
            })
    runBlocking { delay(10000) }
}

data class MyItem(val id: Int){
    init{
        println("MyItem Created $id")
    }
}

 

위 코드는 Flowable 기존 설명한 코드와는 약간 다른 부분이 있는데 바로 map() 함수를 사용하는 부분인데요. map()함수에 대해서는 이후 자세히 설명할 예정이므로 넘어가고,  위 코드에서 Flowable은 1부터 15개의 Integer 아이템이 map에 전달이되고 map에서는 넘겨받은 아이템을 사용해 MyItem 객체를 생성한 후, Subscriber 방출합니다.

 

여기서 유념해서 살펴봐야 부분은 onSubscribe() 함수에서 subscription.request(5)를 호출하는 부분인데요. 

subscription.request(5)는 아이템을 5개 방출하라는 의미로 이 코드로 인해 Flowable은 Subscriber 5개의 아이템을 방출 합니다.

이후 onNext()에서 아이템 ID가 5이면 2개를 더 방출하라고 요청하는 부분을 볼 수 있습니다.

 

그 이후로는 request()를 호출하는 부분이 없으므로 buffer에서 대기 중인 나머지 8~15까지의 MyItem 인스턴스는 호출되지 않습니다.

즉, 전체 출력 결과는 다음과 같습니다.

MyItem Created 1
…
MyItem Created 15
Subsctiber received: MyItem(id=1)
Subsctiber received: MyItem(id=2)
Subsctiber received: MyItem(id=3)
Subsctiber received: MyItem(id=4)
Subsctiber received: MyItem(id=5)
Req Two more
Subsctiber received: MyItem(id=6)
Subsctiber received: MyItem(id=7)

 

Backpressure Strategy

에제 코드는 Flowable 객체 생성 별다른 옵션을 추가하지 않았지만 Flowable.create()함수를 통해 Backpressure Strategy를 지정할 수 있습니다. Backpressure Strategy는 아래와 같이 5가지 옵션이 있습니다.

 

BackpressureStrategy를 사용해 Flowable을 생성하는 방법은 아래 코드와 같습니다.

fun funExam(){
    … 

    val flowable: Flowable<Int> = Flowable.create<Int>({
        for(i in 1..10){
            it.onNext(i)
        }
        it.onComplete()
    }, BackpressureStrategy.BUFFER)
    
    flowable.observeOn(Schedulers.io())
            .subscribe(subscriber)
}

 

Observable to Flowable

Observable.toFlowable() 연산자는 백프레셔를 지원하지 않는 원천에서 BackpressureStrategy 구현하는 방법을 제공합니다.

연산자는 Observable Flowable으로 바꿔줍니다. 연산자를 사용하기 위해 먼저 Observable 버퍼링 전략을 사용하도록 변환합니다.

fun funExam(){
    // 1
    val source = Observable.range(1, 1000)

    // 2
    source.toFlowable(BackpressureStrategy.BUFFER)
            .map { MyItem(it) }
            .observeOn(Schedulers.io())
            .subscribe{
                println("Next: $it")
                runBlocking { delay(100) }
            }
}

 

코드는 주석 1에서 Observable 인스턴스를 만들고 주석 2에서 Observable 인스턴스를 BackpressureStrategy.BUFFER인 Flowable 변환했습니다.

 

원천에서 backpressure를 지원하는 flowable 생성

지금까지 다운스트림에서 backpressure를 처리하는 방법을 설명했습니다. 

Flowable.generate() Flowable.create() 유사하지만 약간의 차이점이 있습니다. 

fun funExam(){
    val flowable = Flowable.generate<Int> {
        it.onNext(GenerateFlowableItem.item)
    }

    flowable
            .map { MyItem(it) }
            .observeOn(Schedulers.io())
            .subscribe {
                runBlocking { delay(100) }
                println("Next: $it")
            }
    runBlocking { delay(10000) }

}

object GenerateFlowableItem{
    var item: Int = 0
    get() {
        field += 1
        return field
    }
}

 

예제코드에서는 Flowable.generate() 함수로 flowable을 생성했습니다. flowable이 아이템을 방출하고 구독자는 수신/대기/버퍼링/삭제하는 Flowable.create()와는 달리  Flowable.generate()는 요청 시 아이템을 생성하고 이를 방출합니다. Flowable.generate()는 원천으로 사용할 람다를 허용하는데 이 람다는 Flowable.create()와 유사하게 보일 수 있으며, Flowable.create()와 달리 아이템을 요청할 때마다 이를 방출합니다.

방출 순서로 다시 설명하면 MyItem 인스턴스가 생성되기 전에 generate()메서드에서 onNext의 파라미터로 입력한 GenerateFlowableItem의 get() 메서드가 호출되고 반환값으로 MyItem 인스턴스가 생성되게 됩니다.

 

ConnectableFlowable

Observable처럼 ConnectableFlowable은 flowable과 유사하지만 구독 시점에 아이템 방출을 시작하지 않고 connect() 함수가 호출될 때 시작합니다. 이렇게 하면 flowable이 아이템을 방출하기 전에 의도한 모든 구독자가 Flowable.subscribe()를 기다리도록 할 수 있습니다.

fun funExam(){
    val connectableFlowable = listOf("String 1", "String 2", "String 3", "String 4")
            .toFlowable()
            .publish()

    connectableFlowable.subscribe({
        println("Subscription 1: $it")
        runBlocking { delay(1000) }
        println("Subscription 1 delay")
    })

    connectableFlowable.subscribe({
        println("Subscription 2: $it")
    })

    connectableFlowable.connect()
}

 

위 코드를 보면 리스트 이터러블을 flowable로 변경한 후 publish() 메소드를 사용해  Hot Observable인 ConnectableFlowable 객체를 생성했습니다.

후에 첫번째 구독을 시작하는데 첫번째 구독은 onNext() 메소드 호출 시 1 sec의 지연 시간을 가집니다.

다시 두번째 구독을 시작하는데 두번째 구독은 지연 시간이 없습니다.

 

이 코드를 실행하면 첫번째 구독의 onNext()가 한번 호출 된 후 두번째 구독이 모두 실행될 것 처럼 보이지만 Hot Observable 방식이므로  아래와 같이 모든 구독자가 순차적으로 호출됩니다.

즉, ConnectableFlowable은 모든 구독자가 방출되는 아이템을 받는 것이 완료한 후에 다음 아이템 방출을 진행합니다.

Subscription 1: String 1
Subscription 1 delay
Subscription 2: String 1
Subscription 1: String 2
Subscription 1 delay
Subscription 2: String 2
Subscription 1: String 3
Subscription 1 delay
Subscription 2: String 3
Subscription 1: String 4
Subscription 1 delay
Subscription 2: String 4

 

Processor

Processor는 flowable의 subject에 해당합니다. 모든 subject 유형은 backpressure를 지원하는 타입이 있습니다.

아래는 PublishProcessor의 사용 예시 입니다.

fun funExam(){
    val flowable = listOf("String 1", "String 2", "String 3", "String 4")
            .toFlowable()

    val processor = PublishProcessor.create<String>()

    processor.subscribe({
        println("Subscription 1: $it")
        runBlocking { delay(1000) }
        println("Subscription 1 delay")
    })

    processor.subscribe({
        println("Subscription 2: $it")
    })

    flowable.subscribe(processor)
}

 

위 코드의 실행 결과는 ConnectableFlowable과 동일합니다.

processor도 ConnectableFlowable과 마찬가지로 모든 구독자가 완료될 때까지 다음 아이템 방출을 대기합니다.

 

아이템 방출 속도를 유지하면서 흐름을 제어하는데 도움을 주는 연산자들

다운스트림에서 backpressure를 처리하기 위해 원천에서 아이템 방출을 느리게 만드는 것은 항상 좋은 방법은 아닙니다.

원천의 아이템 방출을 느리게 하지 않고 처리하는데 도움을 주는 3개의 연산자는 다음과 같습니다.

   - buffer

   - window

   - throttle

해당 연산자에 대한 자세한 설명은 다른 글을 통해 자세히 설명하도록 하겠습니다.

 

정리

이번 글에서는 비동기 처리 상황에서 아이템 방출을 제어할 수 있는 흐름 제어 방법에 대해 설명하였습니다.

 

Observable와 Flowable의 장단점에 대해 설명하였으며,  흐름을 제어하는 역할을 하는 BackPressure와 Flowable에 대해 자세히 설명하였고, BackPressure의 Strategy 옵션 5가지에 대해 설명하였습니다.

   - BackpressureStrategy.MISSING

   - BackpressureStrategy.ERROR

   - BackpressureStrategy.BUFFER

   - BackpressureStrategy.DROP

   - BackpressureStrategy.LATEST

 

마지막으로 다운스트림에서 backpressure를 처리하기 위해 원천에서 아이템 방출을 느리게 만드는 것은 항상 좋은 방법은 아니므로 방출 속도를 유지하면서 흐름을 제어하는 연산자도 있다는 것을 가볍게 언급하였습니다.

 

다음 글부터는 Reactive에서 지원하는 연산자를 지원 기능에 따라 그 종류를 구분해보고 각 종류에 따른 설명을 진행해보도록 하겠습니다.

'Reactive' 카테고리의 다른 글

6. Reactive - 변환 연산자  (0) 2021.06.14
5. Reactive - 연산자 구분  (0) 2021.06.12
3. Reactive - Hot & Cold Observable, Subject  (0) 2021.06.07
2. Reactive - Observable  (0) 2021.06.03
1. Reactive 프로그래밍이란?  (0) 2021.06.01
Comments