봄날은 갔다. 이제 그 정신으로 공부하자

2. Reactive - Observable 본문

Reactive

2. Reactive - Observable

길재의 그 정신으로 공부하자 2021. 6. 3. 11:11

이번 글에서는 리액티브 주요 구성 요소인 Observable에 대해 심도있게 설명해보도록 하겠습니다.

옵저버블이 무엇인지와 옵저버를 객체를 생성하는 다양한 방법 그리고 구독과 해지 방법에 대해 설명하도록하겠습니다.

 

Observable

Observable은 데이터의 흐름을 관장하는 클래스로 데이터의 흐름에 맞게 알림을 보내 옵저버가 데이터 처리를 할 수 있도록 만드는 역할을 수행합니다. 따라서 옵저버블은 일련의 연산자를 거친 아이템을 최종 옵저버로 내보내는 방출 기반의 조합 가능한 Iterator라고 할 수 있습니다. 이 부분을 조금 더 자세히 설명하면 아래와 같습니다.

  - 옵저버는 옵저버블을 구독합니다.

  - 옵저버블이 그 내부의 아이템들을 방출하기 시작합니다.

  - 옵저버는 옵저버블에서 내보내는 모든 아이템에 반응합니다.

 

옵저버블이 onNext, onComplete, onError 같은 이벤트 함수를 통해 방출을 제어합니다.

이 함수들이 호출되는 시점은 옵저버가 구독을 시작한 시점입니다.

  - onNext: 옵저버블은 모든 아이템을 하나씩 이 함수에 전달합니다.

  - onComplete: 모든 아이템이 onNext 함수를 통과하면 옵저버블은 onComplete 함수를 호출합니다.

  - onError: 옵저버블에서 에러가 발생하면 onError 함수가 호출돼 정의된 대로 에러를 처리합니다.

 

이 내용을 다이어그램으로 표현하면 아래와 같습니다.

 

위 내용을 예제로 살펴보면 아래와 같습니다.

// 리스트 이터레이터를 Observable 객체로 만들어줍니다.
var list: List<Any> = listOf("One", “Two”, "Three", "Four", "Five")
var observable: Observable<String> = list.toObservable()

// Observable에서 방출하는 데이터를 수신할 Observer 객체를 생성합니다.
val observer: Observer<String> = object: Observer<String>{
    override fun onComplete(){
        println("All Complete")
    }

    override fun onNext(item: Any){
        println("Next $item")
    }

    override fun onError(e: Throwable){
        println("Error Occured $e")
    }

    override fun onSubscribe(e: Disposable){
        println("New Subscription”)
    }
}

// 마지막으로 옵저버블에 옵저버를 연결하면서 동시성을 관리하는 스케줄러를 정의합니다.
observable
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer)

 

 

위 코드의 실행하면 옵저버가 옵저버블 구독을 시작하면 구독이 시작되었음을 옵저버에 알려주는 onSubscribe() 함수가 호출되고

이후 방출이 시작되면 onNext() 함수가 호출되고 모든 아이템 방출이 완료되면 onComplete() 함수가 호출됩니다.

실행 결과는 아래와 같습니다.

New Subscription
Next One
Next Two
Next Three
Next Four
Next Five
All Complete

 

Creating Observable

Observable 객체를 생성하는 방법은 create, from, just 등 다양한 방법이 있습니다.

이번 절에서는 옵저버블 객체를 생성하는 다양한 방법에 대해 설명하도록 하겠습니다.

 

create()

Create()는 직접적인 코드 구현을 통해 옵저버 메서드를 호출하여 옵저버블을 생성합니다. 

create() 함수는 사용자가 지정한 데이터구조를 사용하거나 내보내는 값을 제어하려고 할 때 유용합니다.

 

Create() 함수를 사용해 옵저버블 객체를 생성하는 예제 코드는 아래와 같습니다.

fun funExam(){
    val observer: Observer<String> = object: Observer<String>{
        override fun onComplete(){
            println("All Complete")
        }

        override fun onNext(item: String){
            println("Next $item")
        }

        override fun onError(e: Throwable){
            println("Error Occured $e")
        }

        override fun onSubscribe(e: Disposable){
            println("New Subscription")
        }
    }
    
    val observable: Observable<String> = Observable.create<String>{
        it.onNext(“One”)
        it.onNext(“Two”)
        it.onNext(“Three”)
        it.onNext(“Four”)
        it.onNext(“Five”)
        it.onComplete()
    }
    observable.subscribe(observer)
}

 

출력 결과는 이전과 동일하지만 이전 코드와의 차이점을 살펴보면 이전 코드에서는 toObservable() 함수를 사용해 list의 이터레이터를 옵저버블 객체로 변환해주었다면 위 코드에서는 Observable class에서 지원하는 create() 함수를 사용해 직접 옵저버블 객체를 생성하고 있습니다. create() 함수의 람다식 내부에서 onNext 함수를 사용해 옵저버에 방출할 아이템을 추가해주고 마지막에 onComplete() 함수를 호출해 방출을 완료합니다.

 

즉, create() 함수의 람다식 내부에서 ObservableEmitter의 onNext() 함수에 방출할 아이템을 추가하면 옵저버의 onNext() 함수에 아이템이 방출됩니다. 또한 onComplete() 함수 호출 후에는 어떠한 방출도 옵저버에 전달되지 않습니다.

 

위 예제 코드의 onComplete() 함수 다음에 onNext() 함수를 사용해 방출을 추가해주어도 옵저버에는 방출되지 않습니다.

val observable: Observable<String> = Observable.create<String>{
        it.onNext(“One”)
        it.onNext(“Two”)
        it.onNext(“Three”)
        it.onNext(“Four”)
        it.onNext(“Five”)
        it.onComplete()
        it.onNext(“Six”)
    }
observable.subscribe(observer)

 

from()

from 함수는 create()함수에 비해 상대적으로 간단히 옵저버블 객체를 만들수 있는 방식으로 배열에 포함된 요소를 하나씩 순서대로 방출하는 업저버블 객체를 생성합니다.

from() 계열 함수는 다음과 같습니다.

   - fromArray()

   - fromIterable()

   - fromCallable()

   - fromFuture()

   - fromPublisher()

 

from()의 동작 매커니즘을 그림으로 살펴보면 아래와 같습니다

 

 

fromArray()

Array를 Observable객체를 생성하는 것으로 fromArray() 함수를 통해 배열을 옵저버블 객체로 변환합니다.

 

fromIterable()

Iterable 인터페이스를 구현한 클래스에서 Observable객체를 생성하는 것으로 Iterable 인터페이스는 반복자(iterator)를 반환합니다.

 

fromCallable()

Java 5에 추가된 동시성 API인 Callable인터페이스로, 비동기 실행 후 결과를 반환하는 call()메소드를 정의합니다.

Runnable 인터페이스처럼 메소드가 하나이고 인자가 없다는 점에서 비슷하지만, 실행 결과를 리턴한다는 점에서 차이가 있습니다. 

또한 Executor 인터페이스의 인자로 활용되기 때문에 잠재적으로 다른 스레드에서 실행되는 것을 의미하기도 합니다.

 

fromFuture()

Future 인터페이스 역시 자바 5에서 추가된 동시성 API로 비동기 계산의 결과를 구할 때 사용합니다. 

보통 Executor 인터페이스를 구현한 클래스에 Callable 객체를 인자로 넣어 Future객체를 반환합니다. 

get()메소드를 호출하면 Callable 객체에서 구현한 계산 결과가 나올 때 까지 블로킹 됩니다.

 

fromPublisher()

Publisher는 자바 9 의 표준인 Flow API의 일부로 Observable.create()와 마찬가지로 onNext()와 onComplete()함수를 호출할 수 있습니다.

 

from() 계열 함수의 예제 코드는 아래와 같습니다.

fun funExam(){
    val observer: Observer<String> = object: Observer<String>{
        override fun onComplete(){
            println("All Complete")
        }

        override fun onNext(item: String){
            println("Next $item")
        }

        override fun onError(e: Throwable){
            println("Error Occured $e")
        }

        override fun onSubscribe(e: Disposable){
            println("New Subscription")
        }
    }

    // 1
    val strArray = arrayOf<String>("One", "Two", "Three")
    val observableFromArray: Observable<Array<String>> = Observable.fromArray(strArray)
    observableFromArray.subscribe(observer)

    // 2
    val list = listOf("String 1", "String 2", "String 3")
    val observableFromIterable: Observable<String> = Observable.fromIterable(list)
    observableFromIterable.subscribe(observer)

    // 3
    val callable = object: Callable<String> {
        override fun call(): String {
            return "From Callable"
        }
    }
    val observableFromCallable: Observable<String> = Observable.fromCallable(callable)
    observableFromCallable.subscribe(observer)

    // 4
    val future: Future<String> = object: Future<String>{
        override fun isDone(): Boolean = true
        override fun get(): String = "Hello From Future"
        override fun get(timeout: Long, unit: TimeUnit?): String = "Hello From Future"
        override fun cancel(mayInterruptIfRunning: Boolean): Boolean = false
        override fun isCancelled(): Boolean = false
    }
    val observableFromFuture: Observable<String> = Observable.fromFuture(future)
    observableFromFuture.subscribe(observer)

    // 5
    val publisher: Publisher<String> = object: Publisher<String>{
        override fun subscribe(s: Subscriber<in String>?) {
            s?.let {
                it.onNext("Publisher.onNext")
                it.onComplete()
            }
        }
    }
    val observableFromPublisher: Observable<String> = Observable.fromPublisher(publisher)
    observableFromPublisher.subscribe(observer)
}

 

just()

just() 함수는 넘겨진 인자 만을 배출하는 옵저버블을 생성합니다.

Iterable 인스턴스를 Observable.just에 단일인자로 넘기면 전체 목록을 하나의 아이템으로 배출하는데 이는 Iterable 내부의 각각의 아이템을 Observable로 생성하는 from() 함수와는 다른 점 입니다. just() 함수를 호출하면 아래와 같은 일이 일어납니다.

  - 인자와 함께 just() 함수 호출

  - just() 함수는 옵저버블을 생성

  - onNext 알림을 통해 각각의 아이템을 내보냄

  - 모든 인자의 방출이 완료되면 onComplete 메서드 호출

 

예제 코드는 다음과 같습니다.

fun funExam() {
    val observer: Observer<String> = object : Observer<String> {
        override fun onComplete() {
            println("All Completed")
        }

        override fun onNext(item: String) {
            println("Next $item")
        }

        override fun onError(e: Throwable) {
            println("Error Occured $e")
        }

        override fun onSubscribe(e: Disposable) {
            println("New Subscription")
        }
    }

    // 1
    Observable.just(“One”).subscribe(observer)
    Observable.just(“Two”).subscribe(observer)
    
    // 2
    Observable.just("One", "Two", "Three").subscribe(observer)
}

 

 

위 코드를 살펴보면 주석 1에서는 just()함수를 사용해 각각의 옵저버블을 생성했으므로  주석 1의 실행 결과로 아래와 같이 출력되고 

New Subscription
Next One
All Completed
New Subscription
Next Two
All Completed

 

주석 2에서는 just() 함수 내부에서 매개변수로 받은 emitter들을 옵저버블로 전환하므로 리스트 아이템의 각각의 인자가 별개의 아이템으로 받아들여저 아래와 같이 출력됩니다.

New Subscription
Next One
Next Two
Next Three
All Completed

 

Observable 객체를 생성하는 다른 함수들

Observable 객체를 생성하는 다른 함수로는 아래와 같은 함수들이 있습니다.

   - range()

   - empty()

   - interval()

   - timer()

 

range()

range() 함수는 주어진 값(n)부터 m개의 integer 아이템을 방출합니다.

아래 코드는 1부터 10까지 열 개의 integer 아이템을 방출합니다.

Observable.range(1, 10).subscribe(observer)

 

empty()

empty() 함수는 옵저버의 onNext 호출 없이 바로 onComplete가 호출됩니다.

사용 방법은 아래와 같습니다.

Observable.empty<String>().subscribe(observer)

 

interval()

interval() 함수는 일정 시간 간격으로 데이터 흐름을 생성합니다.

interval() 함수는 subscribe한 시간부터 주어진 시간 간격으로 0부터 1씩 증가하는 Long 객체를 방출합니다. 주로 사용하는 함수 원형은 아래와 같이 두 가지가 있습니다.

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
@NonNull
public static Observable<Long> interval(long period, @NonNull TimeUnit unit) {
    return interval(period, period, unit, Schedulers.computation());
}

@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.CUSTOM)
public static Observable<Long> interval(long initialDelay, long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
    Objects.requireNonNull(unit, "unit is null");
    Objects.requireNonNull(scheduler, "scheduler is null");

    return RxJavaPlugins.onAssembly(new ObservableInterval(Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
}

 

아래와 같이 사용하면 0부터 시작하여 300 ms 단위로 값을 방출하기 시작합니다.

Observable.interval(300, TimeUnit.MILLISECONDS).subscribe(observer)

 

timer()

timer() 함수는 interval() 함수와 유사하지만 한번만 실행하는 함수입니다. 일정 시간 지난 후 한번만 방출하고 onComplete 이벤트를 발생시킵니다. 

timer() 함수 원형은 다음과 같습니다.

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
@NonNull
public static Observable<Long> timer(long delay, @NonNull TimeUnit unit) {
    return timer(delay, unit, Schedulers.computation());
}

 

아래 코드는 400 ms 후에 값을 한번만 방출하고 onComplete가 호출되는 예제 입니다.

Observable.timer(400, TimeUnit.MILLISECONDS).subscribe(observer)

 

Observable 구독과 해지

subscribe() : 구독

Observable과 Observer(구독자)를 연결하기 위해서는 매개체가 필요한데 이 매개체 역할을 하는 것이 바로 subscribe() 함수 입니다.

옵저버블 객체의 subscribe()를 호출 해 옵저버(구독자)를 옵저버블과 연결 할 수 있습니다.

subscribe() 함수에 1~3개의 메서드(onNext, onComplete, onError)를 전달할 수도 있고 옵저버 인터페이스의 인스턴스를 연산자에 전달해 연결할 수도 있습니다.

// 1
val observable: Observable<Int> = Observable.range(1, 5)
observable.subscribe({
    // onNext
    println("Next $it")
},{
    // onError
    println("Error ${it.message}")
},{
    // onComplete
    println("Done")
})

// 2
val observable2: Observable<Int> = Observable.range(1, 5)
observable2.subscribe{
    // onNext
    println("Next $it")
}

 

이전까지의 코드와 다른 부분은 이전 코드에서는 별도의 옵저버 인스턴스를 만든 후에 subscribe 메서드를 호출했지만 위 코드에서는 subscribe 메서드에서 직접 람다식으로 옵저버 인스턴스를 생성했습니다.

주석 1은 subscribe() 함수에서 각 함수를 람다식으로 정의하였고 주석 2는 subscribe() 함수에서 onNext만 호출 받는 예시 입니다.

 

구독 해지

구독 해지를 위해서는 onSubscribe()함수의 매개 변수인 Disposable인스턴스를 사용해야 합니다.

onSubscribe()함수는 구독 시 호출되는 함수로 자세한 사용 방법은 아래와 같습니다.

    val observable: Observable<Long> = Observable.interval(100, TimeUnit.MILLISECONDS)
    val observer: Observer<Long> = object: Observer<Long>{
        lateinit var disposable: Disposable
        override fun onComplete() {
            println("All Completed")
        }

        override fun onNext(item: Long) {
            println("Next $item")
            if(item >= 10 && !disposable.isDisposed){
                disposable.dispose()
                println("Disposed")
            }
        }

        override fun onError(e: Throwable) {
            println("Error Occured $e")
        }

        override fun onSubscribe(d: Disposable) {
            println("New Subscription")
            disposable = d
        }
    }
    
    observable.subscribe(observer)

 

위 코드는 100 ms 단위로 onNext가 호출됩니다.

방출된 item의 값이 10보다 크거나 같은 상태에서 아직 구독 중인 상태이면 disposable.dispose() 함수를 호출해 구독을 해지하도록 되어 있습니다. 구독 해지에 사용된 disposable 객체는 onSubscribe()함수를 통해 전달 받았음을 알 수 있습니다.

위 코드 실행 시 아래와 같이 10까지만 로그를 출력하고 구독을 해지하게 됩니다.

New Subscription
Next 0
Next 1
Next 2
…
Next 10
All Completed

 

정리

이번 글에서는 리액티브 주요 구성 요소인 Observable이 무엇인지와 옵저버블을 객체를 생성하는 다양한 방법 그리고 구독과 해지 방법에 대해 설명하였습니다.  다시한번 정리해보면 아래와 같습니다.

 

Observable이란?

Observable은 데이터의 흐름을 관장하는 클래스로 데이터의 흐름에 맞게 알림을 보내 옵저버가 데이터 처리를 할 수 있도록 만드는 역할을 수행합니다.

   - 옵저버는 옵저버블을 구독합니다.

   - 옵저버블이 그 내부의 아이템들을 방출하기 시작합니다.

   - 옵저버는 옵저버블에서 내보내는 모든 아이템에 반응합니다.

 

옵저버블 객체를 생성하는 다양한 함수

   - create()

   - fromXXX()

   - just()

   - range()

   - empty()

   - interval()

   - timer()

 

옵저버블 구독과 해지

옵저버블 객체의 subscribe()를 호출 해 옵저버(구독자)를 옵저버블과 연결 할 수 있으며 구독 해지를 위해서는 onSubscribe()함수의 매개 변수인 Disposable인스턴스를 사용해야 합니다.

 

다음 글에서는 동작 방식에 따른 옵저버블의 구분에 대해 설명하도록 하겠습니다.

'Reactive' 카테고리의 다른 글

6. Reactive - 변환 연산자  (0) 2021.06.14
5. Reactive - 연산자 구분  (0) 2021.06.12
4. Reactive - BackPressure & Flowable  (0) 2021.06.10
3. Reactive - Hot & Cold Observable, Subject  (0) 2021.06.07
1. Reactive 프로그래밍이란?  (0) 2021.06.01
Comments