일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- node.js
- Reactive
- mvvm
- PagingLib
- MotionLayout
- MediaPlayer
- Koin
- Kotlin
- paging
- MediaSession
- SWIFTUI
- 테스트 자동화
- databinding
- 동영상
- GCP
- list
- rx
- 인앱결제
- Observable
- Android
- node
- Animation
- Android 13
- junit
- android13
- mysql
- RxKotlin
- google play
- SwiftUI Tutorial
- liveData
- Today
- Total
봄날은 갔다. 이제 그 정신으로 공부하자
8. Reactive - 결합 연산자 본문
이번 글에서는 겹합 연산자에 대해 설명하도록 하겠습니다.
결합연산자는 다수의 Observable을 하나로 합하는 방법을 제공하는 연산자 입니다. 결합과 관련된 연산자로는 변환 연산자에서 설명한 flatMap() or groupBy() 연산자들이 있지만 이 연산자들은 1개의 Observable을 확장해주는 연산자인 반면 결합 연산자는 여러 개의 Observable을 개발자가 원하는 Observable로 결합해 준다는 점에서 서로 다르다고 할 수 있습니다.
Reactive에서 지원하는 결합 연산자의 종류는 다음과 같습니다.
- startWith
- merge
- mergeArray
- mergeWith
- mergeDelayError
- concat
- zip
- zipWith
- combineLatest
- withLatestFrom
startWith
startWith() 연산자는 Observable의 기존 아이템 맨 위에 다른 아이템을 추가하는 연산자 입니다.
간단한 예제 코드는 아래와 같습니다.
fun funExam(){
listOf("C", "C++", "Python", "Kotlin", "Java")
.toObservable()
.startWith("Programming Languages")
.subscribe{
println("Next: $it")
}
}
Observable이 구독자에게 값을 방출하기 전에 startWith의 값이 먼저 방출되고 난 이후 원래 Observable의 값이 방출되게 됩니다.
실행 결과는 아래와 같습니다.
Next: Programming Languages
Next: C
Next: C++
Next: Python
Next: Kotlin
Next: Java
merge
merge() 연산자는 연산자의 파라미터로 전달받은 모든 Observable의 방출 아이템들을 합쳐서 방출하는 연산자 입니다.
사용 방법은 아래와 같습니다.
fun funExam(){
val observable1 = listOf("Kotlin", "UML", "Basic").toObservable()
val observable2 = listOf("Python", "Java", "C++", "C").toObservable()
Observable.merge(observable1, observable2)
.subscribe{
println("Next: $it")
}
}
위 코드는 merge 연산자를 사용해 두 개의 Observable(observable1 & observable2)을 병합한 뒤 하나로서 아이템 방출을 시작하게 되므로 아래와 같이 출력됩니다.
Next: Kotlin
Next: UML
Next: Basic
Next: Python
Next: Java
Next: C++
Next: C
위 예제 코드는 두 Observable 모두 즉시 방출 방식을 사용하므로 merge 연산자에 입력된 순서대로 아이템을 방출하지만 merge 연산자의 병합 작업은 입력 순서를 유지하지는 않습니다. 병합된 아이템의 방출은 옵저버블의 아이템 방출 순서를 따릅니다. 이를 확인하기 위한 예제 코드는 아래와 같습니다.
fun funExam(){
val observable1 = Observable
.interval(500, TimeUnit.MILLISECONDS)
.map { "Observable 1 $it" }
val observable2 = Observable
.interval(100, TimeUnit.MILLISECONDS)
.map { "Observable 2 $it"}
Observable.merge(observable1, observable2)
.subscribe{
println("Next: $it")
}
}
아래 출력 결과에서 알 수 있듯이 observable1이 먼저 병합 연산자에 입력이 됐음에도 불구하고 observable2가 먼저 방출된 것을 알 수 있습니다.
Next: Observable 2 0
Next: Observable 2 1
Next: Observable 2 2
Next: Observable 2 3
Next: Observable 1 0
Next: Observable 2 4
Next: Observable 2 5
Next: Observable 2 6
Next: Observable 2 7
…
mergeArray
mergeArray() 연산자는 merge 연산자는 최대 네 개의 매개 변수를 지원하므로 다섯 개 이상이 필요한 경우를 대비한 연산자입니다.
mergeArray 연산자 사용 방법은 아래와 같습니다.
fun funExam(){
val observable1 = listOf("A", "B", "C").toObservable()
val observable2 = listOf("D", "E", "F", "G").toObservable()
val observable3 = listOf("H", "I", "J", "K").toObservable()
val observable4 = listOf("L", "M", "N", "O").toObservable()
val observable5 = listOf("P", "Q", "R").toObservable()
val observable6 = listOf("S", "T", "U", "V").toObservable()
val observable7= listOf("W", "X", "Y", "Z").toObservable()
Observable.mergeArray(observable1,
observable2,
observable3,
observable4,
observable5,
observable6,
observable7)
.subscribe{
println("Next: $it")
}
}
출력 결과는 아래와 같습니다.
Next: A
Next: B
Next: C
…
Next: Z
mergeWith
mergeWith() 연산자는 Observable의 인스턴스 함수 버전 연산자로 호출당한 Observable과 호출한 Observable을 병합합니다.
위 언급한 것과 마찬가지로 mergeWith 연산자 또한 merge계열 연산자이므로 병합된 아이템의 방출은 Observable의 아이템 방출 순서를 따릅니다. 사용 방법은 아래와 같습니다.
fun funExam(){
val observable1 = listOf("Kotlin", "UML", "Basic").toObservable()
val observable2 = listOf("Python", "Java", "C++", "C").toObservable()
observable1.mergeWith(observable2)
.subscribe{
println("Next: $it")
}
}
출력 결과는 다음과 같습니다.
Next: Kotlin
Next: UML
Next: Basic
Next: Python
Next: Java
Next: C++
Next: C
mergeDelayError
병합한 Observable 중 하나에서 error 발생 시, merge 연산자도 onError() 이벤트를 발생시키고 아이템 방출을 중단 시킵니다.
onError() 이벤트로 인한 아이템 방출 중단을 미루고자 할 때 사용하는 연산자가 mergeDelayError 연산자 입니다.
즉, mergeDelayError() 연산자를 사용하면 두 개의 Observable 중 한 개에서 에러가 발생해도 나머지 Observable은 아이템 방출을 계속 수행합니다. 사용 방법은 아래와 같습니다.
fun funExam(){
val observable1 = Observable.just(1,2,3)
val observable2 = Observable.just(3).map { it/(3-it) }
val observable3 = Observable.just(4,5,6)
Observable
.mergeDelayError(observable1, observable2, observable3)
.subscribeBy(onError={
println("Error: $it")
}, onNext={
println("Next: $it")
}, onComplete={
println("Completed")
})
}
위 예제 코드는 의도적으로 두 번째 Observable이 에러가 발생하게하였습니다.
mergeDelayError 연산자가 아닌 merge 연산자를 사용하였다면 3까지만 출력이되고 onError 이벤트가 발생하고 방출이 종료되지만
mergeDelayError 연산자를 사용해 병합을 하면 아래 출력과 같이 두 번째 Observable이 에러를 발생 시켜도 나머지 Observable의 방출은 계속 진행됩니다.
Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Error: java.lang.ArithmeticException: divide by zero
concat
concat() 연산자는 merge(병합) 연산자와 거의 동일하지만 병합 연산자와는 달리 지정된 순서를 유지합니다.
즉 제공된 모든 Observable을 한번에 방출하는 대신 입력된 Observable의 순서대로 아이템을 방출하고 onComplete 이벤트는 한번만 호출됩니다.
위 merge 연산자의 예제 코드 중 interval을 사용한 예제를 concat 연산자로 수정하면 다음과 같습니다.
fun funExam(){
val observable1 = Observable
.interval(500, TimeUnit.MILLISECONDS)
.take(2)
.map { "Observable 1 $it" }
val observable2 = Observable
.interval(100, TimeUnit.MILLISECONDS)
.map { "Observable 2 $it"}
Observable.concat(observable1, observable2)
.subscribe{
println("Next: $it")
}
}
concat 연산자는 입력된 Observable의 순서대로 아이템을 방출하므로 observable1이 500 ms의 시간 간격으로 2개의 아이템을 방출한 후, observable2의 아이템을 방출합니다.
출력 결과는 아래와 같습니다.
Next: Observable 1 0
Next: Observable 1 1
Next: Observable 2 0
Next: Observable 2 1
Next: Observable 2 2
…
zip
zip() 연산자는 여러 개의 Observable이나 Flowable을 사용하고 있는 상황에서 각 프로듀서에서 발생하는 방출에 특정 연산을 적용해야 하는 상황에 사용되는 연산자 입니다.
즉, zip() 연산자는 여러 Observable의 방출을 통합해서 지정된 함수를 거치게 해 새로운 방출 아이템을 생성할 수 있습니다.
이를 그림으로 설명하면 아래와 같습니다.
위 그림에서 알 수 있듯이 zip 연산자는 여러 프로듀서의 방출 아이템들을 하나의 아이템으로 누적시켜 방출 시킵니다.
fun funExam(){
val observable1 = Observable.range(1, 10)
val observable2 = Observable.range(11, 10)
Observable
.zip(observable1,observable2,
io.reactivex.functions.BiFunction<Int, Int, Int> {emission1, emission2 ->
emission1 + emission2 }
).subscribe{
println("Next: $it")
}
}
zip 연산자는 Observable 클래스의 companion object에 정의되어 있으므로 인스턴스 생성 없이도 Observable.zip을 입력해 직접 접근 할 수 있습니다. 출력 결과는 다음과 같습니다.
Next: 12
Next: 14
Next: 16
…
Next: 30
zip 연산자를 더 잘 이해하고 사용하려면 다음 사항에 유의해야 합니다.
- zip 연산자는 제공된 프로듀서가 아이템을 방출할 때마다 작동합니다.
예를 들어 zip 연산자에 세 개의 프로듀서 x,y,z를 전달했다면 x의 n번째 push와 y와 z의 n번째 push로 누적시킵니다.
- zip 운영자는 함수를 적용하기 전에 각 프로듀서가 push할 때까지 대기합니다.
예를 들어 zip 연산자의 프로듀서로 Observable.interval을 사용하면 각 push를 기다렸다가 지정된 간격으로 누적값을 push합니다.
- 어떤 프로듀서가 기다리는 아이템을 push하지 않고 onComplete 또는 onError를 알리면 다른 프로듀서의 push를 포함해 이후
모든 push를 폐기합니다.
zipWith
zipWith() 연산자는 zip 연산자의 인스턴스 버전으로 zip 연산자와 달리 Observable 인스턴스에서 호출해야 합니다.
zipWith 연산자는 두 개의 옵저버블만 zipping이 가능하므로 세 개 이상의 Observable 인스턴스의 zipping이 필요하면 zipWith 대신 zip 연산자를 사용해야 합니다.
fun funExam(){
val observable1 = Observable.range(1, 10)
val observable2 = listOf("String 1", "String 2", "String 3",
"String 4", "String 5", "String 6",
"String 7", "String 8", "String 9",
"String 10").toObservable()
observable1.zipWith(observable2, {e1: Int, e2: String ->
"$e2 $e1"
}).subscribe{
println("Next: $it")
}
}
위 코드 실행 결과는 다음과 같습니다.
Next: String 1 1
Next: String 2 2
Next: String 3 3
…
Next: String 10 10
combineLatest
combineLatest() 연산자는 제공된 Observable의 방출을 누적하는데 zip 연산자와 비슷한 방식으로 동작합니다.
두 연산자 간의 차이점은 zip 연산자는 새로운 방출을 처리하기 전에 원천 생성자 각각이 아이템을 방출하기를 기다리는데 반해,
combineLatest 연산자는 원천 프로듀서에서 배출을 받자마자 처리를 시작한다는 것 입니다.
이 연산자를 더 잘 이해하기 위해 zip, combineLatest 연산자 모두 사용하는 예제를 살펴보도록 하겠습니다.
fun funExam(){
val observable1 = Observable.interval(500, TimeUnit.MILLISECONDS)
val observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
Observable.zip(observable1, observable2,
BiFunction { t1, t2 ->
"t1: $t1, t2 $t2"
}).subscribe{
println("Next: $it")
}
}
위 예제 코드에서 observable1은 500 ms 간격으로 아이템을 방출하고 observable2은 1,000ms 간격으로 아이템을 방출합니다.
zip 연산자는 zipping에 필요한 Observable 들이 모두 방출을 완료된 시점인 1 sec 아이템을 방출하므로 아이템이 방출되는 속도는 제일 느린 두 번째 Observable의 방출 속도를 따릅니다.
Next: t1: 0, t2: 0
Next: t1: 1, t2: 1
Next: t1: 2, t2: 2
…
combineLatest를 사용해 위 코드를 동일하게 만들면 아래와 같습니다.
fun funExam(){
val observable1 = Observable.interval(500, TimeUnit.MILLISECONDS)
val observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
Observable.combineLatest(observable1, observable2,
BiFunction { t1, t2 ->
"t1: $t1, t2 $t2"
}).subscribe{
println("Next: $it")
}
}
combineLatest는 다른 모든 원천 프로듀서에 대해 마지막으로 생성된 값을 사용해 방출된 값을 즉시 처리하고 출력하는 방식을 사용하므로 500 ms 간격으로 아래와 같은 결과를 출력합니다.
Next t1: 1, t2: 0
Next t1: 1, t2: 1
Next t1: 2, t2: 1
Next t1: 2, t2: 2
Next t1: 3, t2: 2
Next t1: 3, t2: 3
Next t1: 4, t2: 3
Next t1: 4, t2: 4
…
withLatestFrom
withLatestFrom() 연산자는 주체가 되는 Observable의 방출이 있을 때에만 zipping해서 방출하는 연산자 입니다.
위 combineLatest() 연산자에서 사용한 예제 코드를 withLatestFrom 연산자로 바꾸면 아래와 같습니다.
단, withLatestFrom 연산자는 주체가 되는 Observable이 있어야 하므로 observable1을 주체로 했습니다.
fun funExam(){
val observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS)
val observable2 = Observable.interval(500, TimeUnit.MILLISECONDS)
observable1.withLatestFrom(observable2,
BiFunction<Long, Long, String> {t1, t2 ->
"t1: $t1, t2 $t2"
}).subscribe{ it ->
println("Next: $it")
}
}
observable1은 1,000ms 간격으로 아이템을 방출하고 observable2은 500 ms 간격으로 아이템을 방출하지만 주체가 되는 Observable이 1,000ms 간격으로 아이템을 방출하는 observable1이므로 구독자에게 아이템 방출은 1,000ms 간격으로 이루어지며 observable2의 방출은 중간 중간 무시됩니다.
출력 결과는 아래와 같습니다.
Next: t1: 0, t2 1
Next: t1: 1, t2 3
Next: t1: 2, t2 5
Next: t1: 3, t2 7
Next: t1: 4, t2 9
Next: t1: 5, t2 11
…
정리
이번 글에서는 Reactive에서 제공하는 다양한 결합 연산자에 대해 살펴보았습니다.
결합 연산자에는 입력 순서에 관계 없이 방출을 하는 merge 계열 연산자와 입력 순서에 맞게 방출하는 concat 연산자 그리고 Observable의 방출 값을 연산하여 새롭게 정의된 값으로 방출하는 zip 계열 연산자와 combineLatest, withLatestFrom 연산자들에 대해 알아보았습니다.
다음 글에서는 조건 연산자에 대해 설명하도록 하겠습니다.
'Reactive' 카테고리의 다른 글
10. Reactive - 오류 처리 연산자 (0) | 2021.06.25 |
---|---|
9. Reactive - 조건 연산자 (0) | 2021.06.22 |
7. Reactive - 필터 연산자 (0) | 2021.06.16 |
6. Reactive - 변환 연산자 (0) | 2021.06.14 |
5. Reactive - 연산자 구분 (0) | 2021.06.12 |