doOn, ObserveOn, timeout, using 등
* 이 포스트는 RxSwift 4.3.1, swift 4.2 버전을 기준으로 작성되었습니다.
RxSwift 의 Observable 의 여러 이벤트 메서드 관련 utility 함수들에 대해 알아보자.
Observable 의 이벤트( next 등)가 발생할때 이벤트 핸들러를 등록 할 수 있다.
subscribe 시점이 아닌 이벤트 발생시점에 처리할 작업이 있을때 사용한다.
예제
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable.do(onNext: { event in
print("doOn next: \(event)")
}).subscribe{ event in
print("subscribe: \(event)")
}.disposed(by: disposeBag)
결과
doOn next: 0
subscribe: next(0)
doOn next: 1
subscribe: next(1)
doOn next: 2
subscribe: next(2)
RxSwift 내 Do.swift 의 함수 구현부이다.
func on(_ event: Event<Element>) {
do {
try _parent._eventHandler(event) // doOn next
forwardOn(event) // subscribe next
if event.isStopEvent {
dispose()
}
}
catch let error {
forwardOn(.error(error))
dispose()
}
}
ObserveOn 이 호출된 다음 메서드가 수행될 스케쥴러를 지정할 수 있다.
예제
doOn next 는 메인 스레드, subscribe next 는 background 스레드로 지정하는 예제이다.
let observeOnTest = ["rabbit","fox","fish","fox","cat"]
Observable.from(observeOnTest).observeOn(MainScheduler.instance)
.do(onNext: { _ in
print("doOnNext \(Thread.isMainThread)")
}).observeOn(ConcurrentDispatchQueueScheduler(qos: .background))
.subscribe(onNext: { _ in
print("subscribeNext \(Thread.isMainThread)")
}).addDisposableTo(disposeBag)
결과
doOnNext true
doOnNext true
doOnNext true
doOnNext true
doOnNext true
subscribeNext false
subscribeNext false
subscribeNext false
subscribeNext false
subscribeNext false
do(OnNext 는 메인스레드, subscribe(onNext 는 백그라운드 스레드에서 불려진 것을 볼수 있다.
Observable 이 수행될 스케쥴러를 지정한다.
reactivex 에서 설명한 위와 같은 이벤트 체인을 예제로 구성했다.
예제
let test = Observable<String>.create { observer in
for count in 1...3 {
print("observable \(Thread.isMainThread)")
observer.on(.next("\(count)"))
}
observer.on(.completed)
return Disposables.create {
print("dispose")
}
}
test.observeOn(MainScheduler.instance)
.map{ (intValue) -> String in
print("map \(Thread.isMainThread)")
return "\(intValue)"
}
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))
.do(onNext: { event in
print("doOn next \(Thread.isMainThread)")
})
.observeOn(ConcurrentDispatchQueueScheduler(qos: .background))
.subscribe{ event in
print("subscribe next \(Thread.isMainThread)")
}.addDisposableTo(disposeBag)
결과
- obsableble 은 subscribeOn에 의해 background thread 에서 동작한다.
observable false
observable false
observable false
dispose
- map 은 observeOn 에 의해 main thread 에서 이벤트를 가공한다. -> doOn 에 체인
map true
doOn next true
map true
doOn next true
map true
doOn next true
- subscribe 는 observeOn에 의해 background thread 에서 이벤트를 수신한다.
subscribe false next(1)
subscribe false next(2)
subscribe false next(3)
subscribe false completed
복잡해 보이지만 SubscribeOn은 Observable 이 어디서 동작할지 결정한다는 것이 중요 포인트이다.
지정된 시간동안 이벤트가 발생하지 않으면 error 를 방출한다.
예제
let observable = Observable<Int>.interval(0.5, scheduler: MainScheduler.instance).filter { $0 < 3 }
observable.timeout(1, scheduler: MainScheduler.instance)
.do(onNext: { item in
print(item)
}, onError: { _ in
print("error")
}, onCompleted:nil).subscribe( onNext: { event in
print(event)
}).addDisposableTo(disposeBag)
결과
0
0
1
1
2
2
error
이벤트가 전달될때 어떤 이벤트인지도 같이 전달된다.
maerialize 된 이벤트를 다시 일반 이벤트 발생형태로 변경한다.
Dematerialize.swift 파일에 정의된 것과 같이 EventConvertible 일때 사용가능하다.
extension ObservableType where E: EventConvertible {
public func dematerialize() -> Observable<E.ElementType> {
return Dematerialize(source: self.asObservable())
}
}
예제
let timer = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
timer.materialize().subscribe{ event in
print(event)
}.disposed(by: disposeBag)
결과
next(next(0))
next(next(1))
next(next(2))
next(next(3))
Observable 과 생명주기를 함께하는 일회용 disposable (resource)을 지정할수 있다.
매개 변수
resourceFactory : 리소스 객체를 얻기위한 팩토리 함수
observableFactory: resouce에 연결된 Observable 을 반환하는 팩토리 함수
함수 원형
func using<Resource: Disposable>
(_ resourceFactory: @escaping () throws -> Resource,
observableFactory: @escaping (Resource) throws -> Observable<E>)
-> Observable<E>
예제
class ResouceDisposable: Disposable {
func dispose() {
print("dispose")
}
}
Observable.using({ () -> ResouceDisposable in
return ResouceDisposable()
}) { (disposable) in
return Observable<Int>.interval(1, scheduler: MainScheduler.instance)
}.debug().take(3).subscribe().disposed(by: disposeBag)
(usingTest()) -> subscribed
(usingTest()) -> Event next(0)
(usingTest()) -> Event next(1)
(usingTest()) -> Event next(2)
(usingTest()) -> isDisposed
dispose
Observable 이 dispose될때 resource 로 지정한 disposable 이 dispose 된 것을 확인할수 있다.
개인적인 의견 : ReactiveX 의 장점인 "이벤트를 발생하는 Observable 과 이것을 관찰하는 Observer 를 통해 비동기 이벤트들을 쉽고 안전하게 처리할 수 있는 라이브러리" 의 설명처럼 위의 내용중 스케쥴러 지정은 중요한 내용이고, 빈번하게 사용하게 될 것이다.