봄날은 갔다. 이제 그 정신으로 공부하자

3. Reactive - Hot & Cold Observable, Subject 본문

Reactive

3. Reactive - Hot & Cold Observable, Subject

길재의 그 정신으로 공부하자 2021. 6. 7. 11:11

이번 글에서는 동작 방식에 따른 옵저버블의 구분과 Subject에 대해 설명하도록 하겠습니다.

 

Hot & Cold Observable

Observable은 그 행동에 따라 두 가지 범주(Hot, Cold)로 구분할 수 있습니다.

 

Cold Observable

Cold Observable은 구독 시에 실행을 시작하고 subscribe가 호출되면 아이템을 푸시하기 시작하는데 각 구독에서 동일한 순서로 아이템을 방출 합니다. 즉, 콜드 옵저버블은 수동적이며 subscribe가 호출될 때까지 아무것도 내보내지 않습니다.

이전에 사용한 예제 코드들은 모두 Cold Observable 방식이라고 생각하시면 됩니다.

 

Hot Observable

Hot Observable은 Cold Observable과는 반대로 방출을 시작하기 위해 구독을 할 필요가 없습니다.

Hot Observable은 구독 여부에 관계 없이 아이템을 계속 방출합니다.

이후 구독이 이루어지게되면 구독자들은 중간부터 아이템을 받게 됩니다.

 

ConnectableObservable

Hot Observable을 구현하기 위한 방법에는 여러 가지가 있는데 그 중 하나가 ConnectableObservable 이라는 특수한 유형의 Observable 입니다. ConnectableObservable은 Cold Observable을 Hot Observable로 바꿀 수 있으며 앞서 설명한 옵저버블들과는 달리 subscribe() 함수의 호출로 아이템 방출이 시작되는 대신 connect() 함수를 호출해 방출되는 아이템을 구독합니다.

 

아래 코드는 ConnectableObservable을 사용하는 예시 입니다.

val connectableObservable ConnectableObservable = Observable.interval(1, TimeUnit.SECONDS).publish()
connectableObservable.connect()

connectableObservable.subscribe{it -> println("First: " + it)}

Thread.sleep(3000)

connectableObservable.subscribe(it -> println(“Second: " + it)}
Thread.sleep(3000)

 

첫 번째 구독 시에 3초동안 0~2 아이템을 발행하고, 3초 뒤에 추가된 구독자는 이를 수신하지 못하며 3부터 수신하는 것을 볼 수 있다.

First: 0
First: 1
First: 2
First: 3
Second: 3
First: 4
Second: 4
First: 5
Second: 5

 

autoConnect() 함수는 connect()를 호출하지 않더라도 구독 즉시 아이템을 발행할 수 있도록 도와주는 함수로 autoConnect의 매개변수는 아이템을 발행하는 구독자의 수 입니다. 만약 autoConnect(2)라고 한다면 구독자가 2개 이상 붙어야 아이템을 발행하기 시작한다.

예제 코드는 아래와 같습니다.

val observable Observable<Long> = Observable.interval(1, TimeUnit.SECONDS).publish().autoConnect(2)
observable.subscribe{it -> println("First: " + it)}
observable.subscribe(it -> println("Second: " + it)};
Thread.sleep(3000)

 

subject

subject는 Observer나 Observable처럼 행동하는 리액티브의 일부 구현체에서 사용 가능한 일종의 Proxy라고 볼 수 있는데, 그 이유는 주제는 옵저버이기 때문에 하나 이상의 Observable을 구독할 수 있으며 동시에 Observable이기도 하기 때문에 항목들을 하나 하나 거치면서 재배출하고 관찰하며 새로운 항목들을 배출할 수도 있다.

 

Hot Observable을 구현하는 또다른 방법은 Subject로 Subject는 옵저버블과 옵저버의 조합인데 두가지 모두의 공통된 동작을 갖고 있기 때문 입니다. 

 

Subject는 다음과 같은 특징을 가지고 있습니다.

   - 옵저버블이 가져야하는 모든 연산자를 가지고 있습니다.

   - 옵저버와 마찬가지로 push된 모든 값에 접근할 수 있습니다.

   - Subject가 완료 / 오류 / 구독 해지 된 후에는 재사용할 수 없습니다.

   - onNext를 사용해 값을 Sujbect(Observer)측에 전달하면 Observable에서 접근 가능합니다.

 

subject는 옵저버블과 옵저버의 성격을 동시에 가지고 있기 때문에 조금 이해하기 어려운 부분이 있는데요.

아래 subject 사용 예시를 보시면 이해하는데 조금 도움이 될꺼라 생각됩니다.

// 1
val observable = Observable.interval(100, TimeUnit.MILLISECONDS)

// 2
val subject = PublishSubject.create<Long>()
observable.subscribe(subject)

// 3
subject.subscribe({println(“Next 1: $it")})

runBlocking { delay(1100) }

// 4
subject.subscribe({println(“Next 2: $it")})
runBlocking { delay(500) }

 

위 코드를 볼때 유심히 보아야하는 부분은 subject의 옵저버블과 업저버 동시 역할 입니다.

주석 2에서는 subject가 옵저버의 역할을 수행해서 업저버블을 구독했지만

주석 3과 4에서는 subject가 옵저버블의 역할을 수행하고 있음을 확인할 수 있습니다.

 

주석 1에서 100ms 간격으로 아이템을 방출하는 옵저버블 객체를 만들었고 

주석 2에서 PublishSubject 객체를 만들어서 옵저버블에 구독을 시작했습니다.

주석 3에서 subject에 구독을 시작했고 옵저버블이 방출하는 값을 구독할 수 있게 되었습니다.

11 sec의 지연 시간이 끝난 후에, 

주석 4에서 다시 한번 subject에서 구독을 시작하고 5sec의 지연시간을 추가하였습니다.

 

첫번째 구독자가 11 sec동안 옵저버블이 방출하는 아이템을 수신하고 이후부터는 첫번째 & 두번째 구독자 모두 방출되는 아이템을 구독하게 되므로 아래와 같이 출력되게 됩니다.

Next 1: 0
Next 1: 1
Next 1: 2
..
Next 1: 11
Next 2: 11
Next 1: 12
Next 2: 12
…

 

간단한 예제 코드로 subject에 대해 설명했는데 subject는 4종류가 있습니다. 각각의 subject는 특정 상황에 맞도록 설계되었으므로 subject의 특징과 동작 방식을 이해해야만 적절할 상황에 사용할 수 있습니다.

 

AsyncSubject

AsyncSubject는 수신 대기 중인 소스 Observable로 부터 방출된 마지막 값만 방출하고 소스 Observable의 동작이 완료된 후에 동작합니다. 이 내용을 그림으로 설명하면 아래와 같습니다.

 

아래 코드와 같이 4개의 값을 호출하는 옵저버블의 경우 마지막 방출이 완료된 후 마지막 아이템을 한번만 방출하게 됩니다.

val observable = Observable.just(1,2,3,4,5)
val subject = AsyncSubject.create<Int>()
observable.subscribe(subject)
subject.subscribe({
    println(“Next $it")
  },{
  },{
    println("Completed")
  })
subject.onComplete()

 

즉, 위 코드 실행 결과는 아래와 같습니다.

Next 5
Completed

 

PublishSubject

위 에서 한번 언급한 subject로 PublishSubject는 OnNext() 함수 또는 다른 구독을 통해 값을 받았는지 여부에 관계 없이 구독 시점에 이어지는 모든 값을 방출합니다. 가장 많이 사용되는 subject 중 하나 입니다.

이 내용을 그림으로 설명하면 아래와 같습니다.

 

BehaviorSubject

BehaviorSubject는 멀티태스킹으로 동작하는데 구독 전에 마지막 아이템과 구독 후 모든 아이템을 방출합니다.

즉 내부 옵저버 목록을 유지하는데 중복 전달 없이 모든 옵저버에게 동일한 아이템을 방출합니다.

이 내용을 그림으로 설명하면 아래와 같습니다.

val subject = BehaviorSubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.onNext(3)
subject.subscribe({
    println(“[S1] Next $it")
  },{
    
  },{
    println("[S1] Completed")
  })
subject.onNext(4)
subject.subscribe({
    println("[S2] Next $it")
  },{

  },{
    println("[S2] Completed")
  })
subject.onComplete()

 

BehaviorSubject는 앞서 언급했던 AsyncSubject와 PublishSubject의 특성을 모두 가지고 있으므로 위 코드에 대해 다음과 같이 동작합니다. 첫번째 구독 S1에 대해 “[S1] Next 3”을 출력하고 두번째 구독 [S2]에서 첫번째와 두번째 onNext가 호출되어 결과를 출력하고 onComplete 메서드에 의해 두개 모두 완료 처리됩니다.

호출 결과를 아래와 같습니다.

[S1] Next 3
[S1] Next 4
[S2] Next 4
[S1] Completed
[S2] Completed

 

ReplaySubject

ReplaySubject는 갖고 있는 모든 아이템을 옵저버의 구독 시점과 상관 없이 다시 전달하는데 이는 콜드 옵저버블과 유사합니다.

다이어그램으로 동작을 표현하면 아래 그림과 같습니다.

 

위 BehaviorSubject의 예제 코드를 ReplaySubject로 변경하는 경우

val subject = ReplaySubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.onNext(3)
subject.subscribe({
    println("[S1] Next $it")
  },{

  },{
    println("[S1] Completed")
  })
subject.onNext(4)
subject.subscribe({
    println("[S2] Next $it")
  },{

  },{
    println("[S2] Completed")
  })
subject.onComplete()

 

아래와 같이 출력되게 됩니다.

[S1] Next 1
[S1] Next 2
[S1] Next 3
[S1] Next 4
[S2] Next 1
[S2] Next 2
[S2] Next 3
[S2] Next 4
[S1] Completed
[S2] Completed

 

정리

이번 글에서는 동작 방식에 따른 옵저버블의 구분과 Subject에 대해 설명했습니다.

 

Observable은 그 행동에 따라 두 가지 범주(Hot, Cold)로 구분할 수 있습니다.

   - Cold Observable

   - Hot Observable

 

Cold Observable은 수동적이며 subscribe가 호출될 때까지 아무것도 내보내지 않습니다.

Hot Observable은 Cold Observable과는 반대로 방출을 시작하기 위해 구독을 할 필요가 없습니다.

Hot Observable은 구독 여부에 관계 없이 아이템을 계속 방출합니다.

 

Cold Observable을 Hot Observable로 변환해 줄 수 있는 ConnectableObservable대해 살펴보았으며,

 

Hot Observable을 구현하는 subject 4가지에 대해서도 알아보았습니다.

1. AsyncSubject

AsyncSubject는 수신 대기 중인 소스 Observable로 부터 방출된 마지막 값만 방출하고 소스 Observable의 동작이 완료된 후에 동작합니다.

2. PublishSubject

PublishSubject는 OnNext() 함수 또는 다른 구독을 통해 값을 받았는지 여부에 관계 없이 구독 시점에 이어지는 모든 값을 방출합니다.

3. BehaviorSubject

BehaviorSubject는 멀티태스킹으로 동작하는데 구독 전에 마지막 아이템과 구독 후 모든 아이템을 방출합니다.

4. ReplaySubject

ReplaySubject는 갖고 있는 모든 아이템을 옵저버의 구독 시점과 상관 없이 다시 전달하는데 이는 콜드 옵저버블과 유사합니다.

 

이번 글까지는 옵저버블의 종류와 사용 방법에 설명했다면 다음 글에서는 비동기 처리 상황에서 발생할 있는 문제, 비동기적으로 동작할 Observable 처리 있는 처리량보다 많은 아이템을 방출하는 경우에 대한 해결 방법에 대해 설명하도록 하겠습니다.

'Reactive' 카테고리의 다른 글

6. Reactive - 변환 연산자  (0) 2021.06.14
5. Reactive - 연산자 구분  (0) 2021.06.12
4. Reactive - BackPressure & Flowable  (0) 2021.06.10
2. Reactive - Observable  (0) 2021.06.03
1. Reactive 프로그래밍이란?  (0) 2021.06.01
Comments