Combine observable, Thread, Subject
* 이 포스트는 RxSwift 5.0.1, swift 5.1 버전을 기준으로 작성되었습니다.
Scheduler 관련 RxSwift 코드를 살펴보는 중에 다음과 같은 의문이 생겼다.
서로 다른 scheduler 에서 수행되는 Observable의 이벤트는 합성될때 어떤 Scheduler를 기준으로 이벤트가 발생될까?
몇가지 실험을 해보았고 해당 내용으로 글을 작성해봤다.
(왜 이런걸 궁금하게 되었는지 시발점은 최하단에 별첨)
주) 이벤트를 구독하는 쪽에서 특정 스레드에서 동작해야 한다면 observeOn 를 사용하면 된다.
두개의 옵저버블에서 발생한 마지막 이벤트를 합성해주는 combineLatest 는 위와 같이 동작한다.
합성될 마지막 이벤트가 발생한 스레드에서 이벤트가 발생된다.
예제
let scheduler = SerialDispatchQueueScheduler(qos: .userInitiated)
let ob1 = Observable<Int>.create { observer -> Disposable in
usleep(100)
observer.on(.next(1))
usleep(100)
observer.on(.next(2))
return Disposables.create()
}.subscribeOn(self.scheduler)
.do(onNext: { print("ob1 \($0) \(Thread.current)") })
let ob2 = Observable<String>.create { observer -> Disposable in
usleep(150)
observer.on(.next("a"))
usleep(150)
observer.on(.next("b"))
return Disposables.create()
}.subscribeOn(MainScheduler.instance)
.do(onNext: { print("ob2 \($0) \(Thread.current)") })
Observable.combineLatest(ob1, ob2)
.subscribe(onNext: { (e1, e2) in
print("\(e1) \(e2) \ncombineLatest subscribe \(Thread.current)")
}).disposed(by: self.disposeBag)
결과
ob1 1 <NSThread: 0x600001832440>{number = 5, name = (null)}
ob2 a <NSThread: 0x60000184ac00>{number = 1, name = main}
1 a
combineLatest subscribe <NSThread: 0x60000184ac00>{number = 1, name = main}
ob1 2 <NSThread: 0x600001832440>{number = 5, name = (null)}
2 a
combineLatest subscribe <NSThread: 0x600001832440>{number = 5, name = (null)}
ob2 b <NSThread: 0x60000184ac00>{number = 1, name = main}
2 b
combineLatest subscribe <NSThread: 0x60000184ac00>{number = 1, name = main}
로그에서 보이는 것처럼 마지막 합성 이벤트를 발생시킨 observable 의 thread 와 동일한 것을 알수 있다.
이건 아마 다들 예상 했을 것으로 보인다.
메인 옵저버블의 이벤트가 발생한 스레드에서 이벤트가 발생한다.
예제는 생략
합성될 마지막 이벤트가 발생한 스레드에서 이벤트가 발생한다.
예제
let ob1 = Observable<Int>.create { observer -> Disposable in
usleep(100)
observer.on(.next(1))
usleep(200)
observer.on(.next(2))
return Disposables.create()
}.subscribeOn(self.scheduler)
.do(onNext: { print("ob1 \($0) \(Thread.current)") })
let ob2 = Observable<String>.create { observer -> Disposable in
usleep(300)
observer.on(.next("a"))
usleep(600)
observer.on(.next("b"))
return Disposables.create()
}.subscribeOn(MainScheduler.instance)
.do(onNext: { print("ob2 \($0) \(Thread.current)") })
Observable.zip(ob1, ob2)
.subscribe(onNext: { (e1, e2) in
print("\(e1) \(e2) \nzip-1 ob1,ob2 subscribe \(Thread.current)")
}).disposed(by: self.disposeBag)
결과
ob2 a <NSThread: 0x600001c0ea80>{number = 1, name = main}
ob1 1 <NSThread: 0x600001c12180>{number = 6, name = (null)}
1 a
zip-1 ob1,ob2 subscribe <NSThread: 0x600001c12180>{number = 6, name = (null)}
ob1 2 <NSThread: 0x600001c12180>{number = 6, name = (null)}
ob2 b <NSThread: 0x600001c0ea80>{number = 1, name = main}
2 b
zip-1 ob1,ob2 subscribe <NSThread: 0x600001c0ea80>{number = 1, name = main}
어느정도 규칙을 눈치챘으리라, 합성될 기준이 되는 이벤트가 발생한 스레드에서 이벤트가 발생한다는 것
더이상의 실험은 무의미 할것으로 생각하고, 사실 이건 중요한 내용은 아니다.
구독받는 쪽에서 스레드를 특정해야 한다면 observeOn을 사용하는 것이 일반적이니까..
Synchronization anomaly was detected.
어느날 이 warnings 메시지를 보게되었다.
이 메시지는
https://github.com/ReactiveX/RxSwift/blob/master/RxSwift/Rx.swift
링크에 있는 Rx.swift 파일에서 감지되어 보여지는 warnings 이다.
이 메시지를 접한 경로는 다음과 같았다.
Hot Observable 인 Subject 에 onNext 를 통해 이벤트를 발생시키려고 할때,
이전 onNext 가 끝나기 전에 다른 onNext 이벤트를 발생시키려고 했을때.
즉 서로 다른 스레드에서 Subject 에 onNext 를 동시에 수행하려고 했을때 발생한 문제였다.
위 링크에 있는 warnings 메시지에 세부 내용을 보면, 위와 같은 상황에 대한 내용이 적혀있고,
의도한 상황이라면 시퀀스 동기화를 수행해줘야 한다는 내용이 적혀있다.
결론적으로 Subject 에 이벤트를 발생시키려고 할때 서로 다른 스레드에서 발생시키는 상황이라면 동기화가 필요하다는것. (동기화 하는 방법은 보편적인 방법들이라서 예는 생략)
다른 언어쪽에 이런 것들이 필요한지 찾아봤지만 ( RxJava ) 이런 내용 ( 스레드 카운팅 ) 은 찾아보진 못했다.
짧게 찾아본것이라 못찾은 것일수도...
마침.
주) 이 글은 추후 업데이트 될수 있음. ( 생략한 실험들을 더 한다거나 등등)