brunch
매거진 ReactiveX

RxSwift, Observable Utillity

doOn, ObserveOn, timeout, using 등

by Tilltue

* 이 포스트는 RxSwift 4.3.1, swift 4.2 버전을 기준으로 작성되었습니다.

RxSwift 의 Observable 의 여러 이벤트 메서드 관련 utility 함수들에 대해 알아보자.



1. doOn ( doOnNext , doOnError, doOnCompleted )

Observable 의 이벤트( next 등)가 발생할때 이벤트 핸들러를 등록 할 수 있다.

subscribe 시점이 아닌 이벤트 발생시점에 처리할 작업이 있을때 사용한다.


예제

let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)

observable.do(onNext: { event in

print("doOn next: \(event)")

}).subscribe{ event in

print("subscribe: \(event)")

}.disposed(by: disposeBag)

결과

doOn next: 0

subscribe: next(0)

doOn next: 1

subscribe: next(1)

doOn next: 2

subscribe: next(2)


RxSwift 내 Do.swift 의 함수 구현부이다.

func on(_ event: Event<Element>) {

do {

try _parent._eventHandler(event) // doOn next

forwardOn(event) // subscribe next

if event.isStopEvent {

dispose()

}

}

catch let error {

forwardOn(.error(error))

dispose()

}

}



2. ObserveOn

ObserveOn 이 호출된 다음 메서드가 수행될 스케쥴러를 지정할 수 있다.



예제

doOn next 는 메인 스레드, subscribe next 는 background 스레드로 지정하는 예제이다.

let observeOnTest = ["rabbit","fox","fish","fox","cat"]

Observable.from(observeOnTest).observeOn(MainScheduler.instance)

.do(onNext: { _ in

print("doOnNext \(Thread.isMainThread)")

}).observeOn(ConcurrentDispatchQueueScheduler(qos: .background))

.subscribe(onNext: { _ in

print("subscribeNext \(Thread.isMainThread)")

}).addDisposableTo(disposeBag)



결과

doOnNext true

doOnNext true

doOnNext true

doOnNext true

doOnNext true

subscribeNext false

subscribeNext false

subscribeNext false

subscribeNext false

subscribeNext false


do(OnNext 는 메인스레드, subscribe(onNext 는 백그라운드 스레드에서 불려진 것을 볼수 있다.


3. SubscribeOn

Observable 이 수행될 스케쥴러를 지정한다.

schedulers.png

reactivex 에서 설명한 위와 같은 이벤트 체인을 예제로 구성했다.

예제

let test = Observable<String>.create { observer in

for count in 1...3 {

print("observable \(Thread.isMainThread)")

observer.on(.next("\(count)"))

}

observer.on(.completed)

return Disposables.create {

print("dispose")

}

}

test.observeOn(MainScheduler.instance)

.map{ (intValue) -> String in

print("map \(Thread.isMainThread)")

return "\(intValue)"

}

.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))

.do(onNext: { event in

print("doOn next \(Thread.isMainThread)")

})

.observeOn(ConcurrentDispatchQueueScheduler(qos: .background))

.subscribe{ event in

print("subscribe next \(Thread.isMainThread)")

}.addDisposableTo(disposeBag)



결과

- obsableble 은 subscribeOn에 의해 background thread 에서 동작한다.

observable false

observable false

observable false

dispose

- map 은 observeOn 에 의해 main thread 에서 이벤트를 가공한다. -> doOn 에 체인

map true

doOn next true

map true

doOn next true

map true

doOn next true

- subscribe 는 observeOn에 의해 background thread 에서 이벤트를 수신한다.

subscribe false next(1)

subscribe false next(2)

subscribe false next(3)

subscribe false completed


복잡해 보이지만 SubscribeOn은 Observable 이 어디서 동작할지 결정한다는 것이 중요 포인트이다.


4. timeout

지정된 시간동안 이벤트가 발생하지 않으면 error 를 방출한다.


예제

let observable = Observable<Int>.interval(0.5, scheduler: MainScheduler.instance).filter { $0 < 3 }

observable.timeout(1, scheduler: MainScheduler.instance)

.do(onNext: { item in

print(item)

}, onError: { _ in

print("error")

}, onCompleted:nil).subscribe( onNext: { event in

print(event)

}).addDisposableTo(disposeBag)



결과

0

0

1

1

2

2

error


5. materialize, dematerialize


이벤트가 전달될때 어떤 이벤트인지도 같이 전달된다.

materialize.c.png



maerialize 된 이벤트를 다시 일반 이벤트 발생형태로 변경한다.

Dematerialize.swift 파일에 정의된 것과 같이 EventConvertible 일때 사용가능하다.


extension ObservableType where E: EventConvertible {

public func dematerialize() -> Observable<E.ElementType> {

return Dematerialize(source: self.asObservable())

}

}


dematerialize.c.png


예제

let timer = Observable<Int>.interval(1, scheduler: MainScheduler.instance)

timer.materialize().subscribe{ event in

print(event)

}.disposed(by: disposeBag)


결과

next(next(0))

next(next(1))

next(next(2))

next(next(3))


6. using

Observable 과 생명주기를 함께하는 일회용 disposable (resource)을 지정할수 있다.


매개 변수

resourceFactory : 리소스 객체를 얻기위한 팩토리 함수

observableFactory: resouce에 연결된 Observable 을 반환하는 팩토리 함수


함수 원형

func using<Resource: Disposable>

(_ resourceFactory: @escaping () throws -> Resource,

observableFactory: @escaping (Resource) throws -> Observable<E>)

-> Observable<E>


예제

class ResouceDisposable: Disposable {

func dispose() {

print("dispose")

}

}

Observable.using({ () -> ResouceDisposable in

return ResouceDisposable()

}) { (disposable) in

return Observable<Int>.interval(1, scheduler: MainScheduler.instance)

}.debug().take(3).subscribe().disposed(by: disposeBag)


(usingTest()) -> subscribed

(usingTest()) -> Event next(0)

(usingTest()) -> Event next(1)

(usingTest()) -> Event next(2)

(usingTest()) -> isDisposed

dispose


Observable 이 dispose될때 resource 로 지정한 disposable 이 dispose 된 것을 확인할수 있다.



개인적인 의견 : ReactiveX 의 장점인 "이벤트를 발생하는 Observable 과 이것을 관찰하는 Observer 를 통해 비동기 이벤트들을 쉽고 안전하게 처리할 수 있는 라이브러리" 의 설명처럼 위의 내용중 스케쥴러 지정은 중요한 내용이고, 빈번하게 사용하게 될 것이다.

keyword
매거진의 이전글RxSwift, 둘 중 하나만 Subscribe 받기