brunch

매거진 ReactiveX

You can make anything
by writing

C.S.Lewis

by Tilltue Aug 14. 2016

RxSwift, Observable Utillity

doOn, ObserveOn, timeout, using 등

* 이 포스트는 RxSwift 4.3.1, swift 4.2 버전을 기준으로 작성되었습니다.

RxSwift 의 Observable 의 여러 이벤트 메서드 관련 utility 함수들에 대해 알아보자.



1. doOn ( doOnNext , doOnError, doOnCompleted )

Observable 의 이벤트( next 등)가 발생할때 이벤트 핸들러를 등록 할 수 있다.

subscribe 시점이 아닌 이벤트 발생시점에 처리할 작업이 있을때 사용한다.


예제

            let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)

            observable.do(onNext: { event in

                print("doOn next: \(event)")

            }).subscribe{ event in

                print("subscribe: \(event)")

            }.disposed(by: disposeBag)

결과

doOn next: 0

subscribe: next(0)

doOn next: 1

subscribe: next(1)

doOn next: 2

subscribe: next(2)


RxSwift  내 Do.swift 의 함수 구현부이다.

       func on(_ event: Event<Element>) {

        do {

            try _parent._eventHandler(event) // doOn next

            forwardOn(event)                              // subscribe next

            if event.isStopEvent {

                dispose()

            }

        }

        catch let error {

            forwardOn(.error(error))

            dispose()

        }

    }



2. ObserveOn

ObserveOn 이 호출된 다음 메서드가 수행될 스케쥴러를 지정할 수 있다.



예제

doOn next 는 메인 스레드, subscribe next 는 background 스레드로 지정하는 예제이다.

            let observeOnTest = ["rabbit","fox","fish","fox","cat"]

            Observable.from(observeOnTest).observeOn(MainScheduler.instance)

            .do(onNext: { _ in

                print("doOnNext \(Thread.isMainThread)")

            }).observeOn(ConcurrentDispatchQueueScheduler(qos: .background))

            .subscribe(onNext: { _ in

                print("subscribeNext \(Thread.isMainThread)")

            }).addDisposableTo(disposeBag)



결과

doOnNext true

doOnNext true

doOnNext true

doOnNext true

doOnNext true

subscribeNext false

subscribeNext false

subscribeNext false

subscribeNext false

subscribeNext false


do(OnNext 는 메인스레드, subscribe(onNext 는 백그라운드 스레드에서 불려진 것을 볼수 있다.


3. SubscribeOn

Observable 이 수행될 스케쥴러를 지정한다.

reactivex 에서 설명한 위와 같은 이벤트 체인을 예제로 구성했다.

예제

            let test = Observable<String>.create { observer in

                for count in 1...3 {

                    print("observable \(Thread.isMainThread)")

                    observer.on(.next("\(count)"))

                }

                observer.on(.completed)

                return Disposables.create {

                    print("dispose")

                }

            }

            test.observeOn(MainScheduler.instance)

            .map{ (intValue) -> String in

                print("map \(Thread.isMainThread)")

                return "\(intValue)"

            }

            .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))

            .do(onNext: { event in

                print("doOn next \(Thread.isMainThread)")

            })

            .observeOn(ConcurrentDispatchQueueScheduler(qos: .background))

            .subscribe{ event in

                print("subscribe next \(Thread.isMainThread)")

            }.addDisposableTo(disposeBag)



결과

- obsableble 은 subscribeOn에 의해 background thread 에서 동작한다.

observable false

observable false

observable false

dispose

- map 은 observeOn 에 의해 main thread 에서 이벤트를 가공한다. -> doOn 에 체인

map true

doOn next true

map true

doOn next true

map true

doOn next true

- subscribe 는 observeOn에 의해 background thread 에서 이벤트를 수신한다.

subscribe false next(1)

subscribe false next(2)

subscribe false next(3)

subscribe false completed


복잡해 보이지만 SubscribeOn은 Observable 이 어디서 동작할지 결정한다는 것이 중요 포인트이다.


4. timeout

지정된 시간동안 이벤트가 발생하지 않으면 error 를 방출한다.


예제

            let observable = Observable<Int>.interval(0.5, scheduler: MainScheduler.instance).filter { $0 < 3 }

            observable.timeout(1, scheduler: MainScheduler.instance)

            .do(onNext: { item in

                print(item)

            }, onError: { _ in

                print("error")

            }, onCompleted:nil).subscribe( onNext: { event in

                print(event)

            }).addDisposableTo(disposeBag)



결과

0

0

1

1

2

2

error


5. materialize, dematerialize


이벤트가 전달될때 어떤 이벤트인지도 같이 전달된다.



maerialize 된 이벤트를 다시 일반 이벤트 발생형태로 변경한다.

Dematerialize.swift 파일에 정의된 것과 같이 EventConvertible 일때 사용가능하다.


extension ObservableType where E: EventConvertible {

    public func dematerialize() -> Observable<E.ElementType> {

        return Dematerialize(source: self.asObservable())

    }

}



예제

        let timer = Observable<Int>.interval(1, scheduler: MainScheduler.instance)

        timer.materialize().subscribe{ event in

            print(event)

        }.disposed(by: disposeBag)


결과

next(next(0))

next(next(1))

next(next(2))

next(next(3))


6. using

Observable 과 생명주기를 함께하는 일회용 disposable (resource)을 지정할수 있다.


매개 변수

resourceFactory : 리소스 객체를 얻기위한 팩토리 함수

observableFactory: resouce에 연결된 Observable 을 반환하는 팩토리 함수


함수 원형

func using<Resource: Disposable>

                    (_ resourceFactory: @escaping () throws -> Resource, 

                     observableFactory: @escaping (Resource) throws -> Observable<E>) 

                    -> Observable<E>


예제

        class ResouceDisposable: Disposable {

                func dispose() {

                    print("dispose")

                }

        }        

        Observable.using({ () -> ResouceDisposable in

            return ResouceDisposable()

        }) { (disposable) in

            return Observable<Int>.interval(1, scheduler: MainScheduler.instance)

        }.debug().take(3).subscribe().disposed(by: disposeBag)


(usingTest()) -> subscribed

(usingTest()) -> Event next(0)

(usingTest()) -> Event next(1)

(usingTest()) -> Event next(2)

(usingTest()) -> isDisposed

dispose


Observable 이 dispose될때 resource 로 지정한 disposable 이 dispose 된 것을 확인할수 있다.



개인적인 의견 : ReactiveX  의 장점인 "이벤트를 발생하는 Observable 과 이것을 관찰하는 Observer 를 통해 비동기 이벤트들을 쉽고 안전하게 처리할 수 있는 라이브러리" 의 설명처럼 위의 내용중 스케쥴러 지정은 중요한 내용이고, 빈번하게 사용하게 될 것이다.

브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari