봄날은 갔다. 이제 그 정신으로 공부하자

7. Reactive - 필터 연산자 본문

Reactive

7. Reactive - 필터 연산자

길재의 그 정신으로 공부하자 2021. 6. 16. 11:11

이번글에서는 업스트림이 방출한 아이템을 “채택” or “무시” 에 초점을 맞춘 필터 연산자에 대해 설명하도록 하겠습니다.

필터 연산자의 종류는 아래와 같습니다.

  - debounce

  - distinct

  - distinctUntilChanged

  - elementAt

  - Filter

  - first

  - last

  - ignoreElements

  - skipXXX

  - taskXXX

 

debounce

debounce() 연산자는 아이템 방출량이 급격히 증가하는 상황에서 충분히 시간이 지난 뒤에 마지막 항목을 가져오기를 원할 때 사용하는 연산자로 사용자가 실제로 원하는 키워드와 일치하는 쿼리를 얻을 수 있을 때까지 입력을 중단하기를 잠시 기다렸다가 다운스트림 연산자에게 전달하는 역할을 수행합니다.

fun funExam(){
    createObservable()
            .debounce(200, TimeUnit.MILLISECONDS)
            .subscribe{
                println(it)
            }
}

inline fun createObservable(): Observable<String> =
        Observable.create<String>{
            it.onNext("R")
            runBlocking { delay(100) }
            it.onNext("Re")
            it.onNext("Reac")
            runBlocking { delay(130) }
            it.onNext("Reactiv")
            runBlocking { delay(140) }
            it.onNext("Reactive")
            runBlocking { delay(250) }
            it.onNext("Reactive Prog")
            runBlocking { delay(130) }
            it.onNext("Reactive Program")
            runBlocking { delay(130) }
            it.onNext("Reactive Programming")
            runBlocking { delay(220) }
            it.onNext("Reactive Programming i")
            runBlocking { delay(130) }
            it.onNext("Reactive Programming in")
            runBlocking { delay(130) }
            it.onNext("Reactive Programming in k")
            runBlocking { delay(130) }
            it.onNext("Reactive Programming in kot")
            runBlocking { delay(130) }
            it.onNext("Reactive Programming in kotli")
            runBlocking { delay(130) }
            it.onNext("Reactive Programming in kotlin")
            runBlocking { delay(270) }
            it.onComplete()
        }

 

위 예제 코드는 debounce() 연산자를 사용하였습니다. debounce() 연산자에서 debounce(200, TimeUnit.MILLISECONDS)로 설정하였는데 의미는 대기하고 있다가 200 ms 동안 아무런 방출이 없을 때에만 아이템을 방출하는 다운스트림을 생성합니다.

위 예제 코드에서는 200 ms 가 넘는 delay가 3군데이므로 아래와 같은 결과를 출격하게 됩니다.

Reactive
Reactive Programming
Reactive Programming kotlin

 

distinct

distinct 연산자는 아주 간단합니다. 업스트림에서 중복 아이템 방출을 필터링할 수 있도록 도와주는 연산자입니다.

사용 방법은 아래와 같습니다.

fun funExam(){
    listOf(1,2,2,3,4,4,5,5,5,6,7,7,7,8,8,9,9,3,10)
            .toObservable()
            .distinct()
            .subscribe{
                println("Next: $it")
            }
}

 

distinct() 연산자에 의해 중복이 삭제되어 다운스트림에 push되므로 출력 결과는 아래와 같습니다.

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Next: 7
Next: 8
Next: 9
Next: 10

 

distinctUntilChanged

distinctUntilChanged() 연산자는 distinct 연산자와는 다르게 모든 중복된 방출을 폐기하는 대신 연속적인 중복 방출만 폐기하고 나머지는 그대로 유지합니다.

fun funExam(){
    listOf(1,2,2,3,4,4,5,5,5,6,7,7,7,8,8,9,9,3,10)
            .toObservable()
            .distinctUntilChanged()
            .subscribe{
                println("Next: $it")
            }
}

 

위 예제 코드의 출력 결과를 보면 distinct 연산자와는 다르게 3이 두번 출력된 것을 확인할 수 있습니다.

distinctUntilChanged 연산자는 연속적 중복에 대해서만 폐기 처분하므로 3이 두번 출력됩니다.

Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Next: 7
Next: 8
Next: 9
Next: 3
Next: 10

 

elementAt

일반적으로 모든 배열이나 목록의 n번째 요소에 액세스할 수 있는 기능이 있는데 이는 일반적인 요구사항 입니다. 이 경우에 사용하는 연산자가 elementAt 연산자입니다.

fun funExam(){
    val observable = listOf(1,2,3,4,5,6,7,8,9,10)
            .toObservable()

    observable.elementAt(5)
            .subscribe{
                println("Next 1: $it")
            }

    observable.elementAt(50)
            .subscribe{
                println("Next 2: $it")
            }
}

 

위 예제 코드를 살펴보면 첫번째 구독에서 elementAt 연산자를 사용해서 6번째(0부터 시작하므로) 아이템을 지정하고 두번째 구독에서 51번째 아이템을 지정한 것을 알 수 있습니다. 하지만 51번째 아이템은 없으므로 onNext()함수가 호출되지 않고 6번째 아이템에 대해서만 단독으로 아래와 같이 호출되는 출력을 가지게 됩니다.

Next 1: 6

 

filter

filter() 연산자는 확실히 가장 많이 사용되는 필터링 연산자입니다. 이를 통해 사용자 정의 로직으로 방출을 필터링할 수 있습니다.

아래 코드는 필터 연산자의 간단한 예시 입니다.

fun funExam(){
    Observable
            .range(1,20)
            .filter{
                it%2 == 0
            }.subscribe {
                println("Next: $it")
            }
}

 

위 예시 코드에서 filter 연산자를 사용해 짝수인 경우에만 다운스트림으로 방출하도록 하였기 때문에 아래와 같은 출력을 가지게 됩니다.

Next: 2
Next: 4
Next: 6 
Next: 8
Next: 10
Next: 12
Next: 14
Next: 16
Next: 18
Next: 20

 

first

first() 연산자는 단어 의미 그대로 첫번째 방출만 유지하고 나머지는 폐기합니다.

아래 코드는 first 연산자의 간단한 예시 입니다.

fun funExam() {
    val observable = Observable.range(1, 20)
    observable
            .first(2)
            .subscribeBy {
                println("Next: $it")
            }
}

 

위 예제 코드를 살펴보면 first() 연산자 호출 시 2라는 값을 매개변수로 넣었는데 의미는 해당 위치에 액세스할 수 없는 경우에 매개 변수로 넣은 값을 방출하라는 의미 입니다. first 위치에 액세스 가능하므로 아래와 같은 출력 결과를 가집니다.

Next: 1

 

last

last() 연산자는 first 연산자와 반대로 마지막 방출만 유지하고 나머지는 폐기하는 연산자 입니다.

아래 코드는 last 연산자의 간단한 예시 입니다.

fun funExam() {
    val observable = Observable.range(1, 20)
    observable
            .last(2)
            .subscribeBy {
                println("Next: $it")
            }
}

 

위 예제 코드를 살펴보면 last() 연산자 호출 시 2라는 값을 매개변수로 넣었는데 의미는 해당 위치에 액세스할 수 없는 경우에 매개 변수로 넣은 값을 방출하라는 의미 입니다. last 위치에 액세스 가능하므로 아래와 같은 출력 결과를 가집니다.

Next: 20

 

ignoreElements

때로는 프로듀서의 onComplete에만 관심이 있을 수 있는데 이럴때 사용하는 연산자가 ignoreElements 연산자 입니다.

해당 연산자를 사용하면 아이템을 다운스트림으로 방출하지 않고 onComplete만 방출합니다.

fun funExam(){
    val observable = Observable.range(1, 20)
    observable
            .ignoreElements()
            .subscribe {
                println("Completed")
            }
}

 

skip & take

개발하다보면 원하는 방출 몇개만 취하고 나머지는 무시하고 싶을 경우가 있는데 skip과 take 연산자는 이런 경우 사용할 수 있는 필터링 연산자입니다.

 

skip 계열 연산자

특정 조건이 충족된 경우 또는 무조건 선두의 방출 아이템 일부를 무시하고자 하는 경우가 있을 수 있습니다.

또한 아이템 방출을 취하기 전에 다른 프로듀서들을 기다리면서 나머지는 모두 건너뛰어야 할 수도 있습니다.

아래 연산자들은 이와 같은 시나리오를 염두에 두고 설계 되었습니다.

  - skip

  - skipLast

  - skipWhile

  - skipUntil

 

take 계열 연산자

take 연산자는 skip 연산자와 정확히 반대로 동작하는 연산자라고 생각하면 됩니다.

take 계열 연산자의 종류는 아래와 같습니다.

   - take

   - takeLast

   - takeWhile

   - takeUntil

 

skip

먼저 skip() 연산자를 알아보도록 하겠습니다.

fun funExam(){
    val observable1 = Observable.range(1, 20)
    observable1
            .skip(5)
            .subscribe {
                println("Next 1: $it")
            }

    val observable2 = Observable.interval(100, TimeUnit.MILLISECONDS)
    observable2
            .skip(400, TimeUnit.MILLISECONDS)
            .subscribe {
                println("Next 2: $it")
            }
}

 

첫 번째 Observable의 skip 연산자는 skip(count: Long)를 사용해 처음 5개의 방출을 건너뛰도록 되어 있고

두 번째 Observable의 skip 연산자는 skip(time: Long, unit: TimeUnit)를 사용해 처음 400ms 동안 발생한 모든 방출을 건너 뛰도록 되어 있습니다. 출력 결과는 다음과 같습니다.

Next 1: 6
Next 1: 7
Next 1: 8
…
Next 1: 20
Next 2: 3
Next 2: 4
Next 2: 5
…

 

skipLast

skipLast() 연산자는 skip 연산자와 유사하게 동작하지만 차이점은 방출을 앞이 아닌 뒤에서부터 건너뛰도록 되어 있습니다.

fun funExam() {
    val observable = Observable.range(1, 20)
    observable
            .skipLast(5)
            .subscribe {
                println("Next 1: $it")
            }
}

 

skipLast를 사용했으므로 위 예제 코드의 출력은 맨 뒤의 방출 5개가 제거된 아래와 같은 출력 결과를 가집니다.

Next 1: 1
Next 1: 2
Next 1: 3
…
Next 1: 15

 

skipWhile

skipWhile() 연산자는 논리 표현식을 기반으로 건너 뛰는 연산자 입니다.

필터 연산자와 마찬가지로 논리 표현식을 skipWhile 연산자에 전달합니다.

논리 표현식이 참이라고 평가되면 방출을 건너뛰고 거짓이라고 반환되는 순간부터 모든 방출을 다운스트림에 전달하기 시작합니다.

fun funExam() {
    val observable = Observable.range(1, 20)
    observable
            .skipWhile { item -> 
                item<10
            }.subscribe {
                println("Next 1: $it")
            }
}

 

skipWhile 연산자를 사용한 위 예제는 10보다 작은 경우에 방출을 건너뛰도록 되어 있으므로 아래와 같이 출력합니다.

Next 1: 10
Next 1: 11
Next 1: 12
…
Next 1: 20

 

skipUntil

두 개의 Observable이 동시에 작업하는 상황에서 두 번째 Observable이 방출을 시작하자마자 첫 번째 Observable이 방출을 처리하고자할 때 사용하는 연산자가 skipUntil 연산자 입니다.

fun funExam(){
    val observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
    val observable2 = Observable.interval(500, TimeUnit.MILLISECONDS)

    observable1
            .skipUntil(observable2)
            .subscribe {
                println("Next: $it")
            }
}

 

skipUntil 연산자에 observable2를 사용했는데 observable2는 500ms 뒤에 아이템 방출을 시작하도록 되어 있으므로 observable2가 방출을 시작하기 전까지 observable1의 모든 아이템 방출은 무시되므로 아래와 같이 출력됩니다.

Next: 4
Next: 5
Next: 6
Next: 7
Next: 8
…

 

take

take() 연산자는 count로 지정한 만큼의 아이템만 방출합니다. 지정한 수의 방출이 완료되면 곧바로 completed가 발생합니다. 

다만 원본 Observable이 error나 complete가 나온다면 그 즉시 종료됩니다.

사용 방법은 아래 코드와 같습니다.

fun funExam(){
    val observable1 = Observable.range(1, 20)
    observable1
            .take(5)
            .subscribe {
                println("Next 1: $it")
            }

    val observable2 = Observable.interval(100, TimeUnit.MILLISECONDS)
    observable2
            .take(400, TimeUnit.MILLISECONDS)
            .subscribe {
                println("Next 2: $it")
            }
}

 

위 예제 코드는 skip 연산자의 예제 코드와 동일합니다. 다른 부분은 skip 연산자를 take 연산자로 변경한 것 뿐 입니다.

take 연산자는 skip 연산자와 반대이므로 아래와 같이 출력됩니다.

Next 1: 1
Next 1: 2
Next 1: 3
Next 1: 4
Next 1: 5
Next 2: 0
Next 2: 1
Next 2: 2

 

takeLast

takeLast() 연산자는 count로 지정한 만큼의 마지막 아이템들만 방출합니다.

사용 방법은 다음과 같습니다.

fun funExam() {
    val observable = Observable.range(1, 20)
    observable
            .takeLast(5)
            .subscribe {
                println("Next: $it")
            }
}

 

즉, 위 예제 코드에서 takeLast(5)를 사용했으므로 마지막 출력 다섯개만 방출됩니다.

Next: 16
Next: 17
Next: 18
Next: 19
Next: 20

 

takeWhile

takeWhile() 연산자는 skipWhile 연산자와 반대로 동작하는 연산자로  논리 표현식이 참인 경우에만 아이템이 방출되고 나머지는 건너뜁니다.

fun funExam() {
    val observable = Observable.range(1, 20)
    observable
            .takeWhile { item ->
                item<10
            }.subscribe {
                println("Next: $it")
            }
}

논리 표현식이 참인 경우에만 방출되므로 출력 결과는 아래와 같습니다.

Next: 1
Next: 2
Next: 3
…
Next: 9

 

정리

이번 글에서는 Observable이 방출한 아이템을 “채택” or “무시” 에 초점을 맞춘 필터 연산자에 대해 살펴보았습니다.

다수 개의 필터 연산자가 존재하므로 상황에 맞는 적절한 필터 연산자를 사용하는게 중요합니다.

이번 글에서 설명한 필터 연산자를 다시한번 간략히 정리하면 아래 표와 같습니다.

다음 글에서는 Observable의 결합을 지원하는 결합 연산자에 대해 설명하도록 하겠습니다.

'Reactive' 카테고리의 다른 글

9. Reactive - 조건 연산자  (0) 2021.06.22
8. Reactive - 결합 연산자  (0) 2021.06.18
6. Reactive - 변환 연산자  (0) 2021.06.14
5. Reactive - 연산자 구분  (0) 2021.06.12
4. Reactive - BackPressure & Flowable  (0) 2021.06.10
Comments