Observable 생성 및 subscribe 의 이해
* 이 포스트는 RxSwift 4.3.1, swift 4.2 버전을 기준으로 작성되었습니다.
* RxSwift 라이브러리의 Observable+Creation.swift 에 관련된 내용
* interval 은 이전 글에서 다루었기에 생략한다.
가장 기본적인 Observable 생성메서드이다.
함수 원형
func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E>
let createTest = Observable<String>.create { observer -> Disposable in
observer.on(.next("event emit"))
observer.on(.completed)
return Disposables.create()
}.debug()
어떤 이벤트를 발생할지 Generic Type을 받는다.
observer.on(.next("event emit")) 는 이벤트를 발생시키는 구문이다.
observer.on(.completed) 는 observable 을 completed 시킨다.
observer.on(.error(error)) 는 error 를 발생시킨다.
Disposables.create() 를 통해 disposable 을 리턴한다.
Cold Observable로 Observable은 subscribe 되면 이벤트를 발생하기 시작한다.
on(.error(error)) 와 on(completed) 는 상호 베타적인 관계이므로 같이 사용하지 않는다.
* debug() 를 통해 Observable 의 동작과정을 로그로 볼수 있다.
Disposable 에도 여러 종류가 있으며 아래의 글에 정리했다.
RxSwift 의 ObservableType.swift ObservableType+Extensions.swift 파일에 정의 되어있다.
1. func subscribe(onNext: ((Self.E) -> Void)? = default, onError: ((Error) -> Void)? = default, onCompleted: (() -> Void)? = default, onDisposed: (() -> Void)? = default) -> Disposable
2. func subscribe(_ on: @escaping (RxSwift.Event<Self.E>) -> Void) -> Disposable
3. func subscribe<O>(_ observer: O) -> Disposable where O : ObserverType, Self.E == O.E
첫번째
createTest.subscribe(onNext: ((String) -> Void)?,
onError: ((Error) -> Void)?,
onCompleted: (() -> Void)?,
onDisposed: (() -> Void)?)
처리하고자 하는 이벤트를 골라 closure 를 설정할수 있다.
* closure 에서 self 에 접근할 경우에는 순환참조가 걸리지 않게 특히 유의 하자.
[weak self] 등을 통해 순환참조를 막자.
첫번째 함수 원형을 사용한 예)
createTest.subscribe(onNext: { [weak self] event in
print(event)
}).disposed(by: disposeBag)
결과
(createTest) -> subscribed
(createTest) -> Event next(event emit)
event emit
(createTest) -> Event completed
(createTest) -> isDisposed
Observable의 debug() 로그가 빨간색이고, subscribe 에서 print 한 내용이 녹색이다.
subscribe 가 되자 이벤트를 발생하기 시작했고, subscribe(onNext: 가 호출되는 것을 확인할수 있다.
이후 complete 되고 Observable 은 dispose 되었다.
두번째
createTest.subscribe(_ on: ((Event<String>) -> Void))
이벤트를 전달받는다.
createTest.subscribe { (event) in
print(event)
switch event {
case .next(let string):
break
case .completed:
break
case .error(let error):
break
}
}.disposed(by: disposeBag)
세번째
class Observer: ObserverType {
typealias E = String
func on(_ event: Event<String>) {
print(event)
}
}
let observer = Observer()
createTest.subscribe(observer)
observer 를 만들어서 구독할수도 있다. 이경우 ObserverType 프로토콜을 따라야 한다.
종료를 결정하는 조건식 과 반복문을 가진 Observable 생성 함수이다.
함수 원형
func generate(initialState: Self.E,
condition: @escaping (Self.E) throws -> Bool,
scheduler: ImmediateSchedulerType = default,
iterate: @escaping (Self.E) throws -> Self.E) -> RxSwift.Observable<Self.E>
let generateTest = Observable.generate(initialState: 1, condition: { $0 < 30 }, iterate: { $0 + 10 })
1. 초기값을 가지고 첫 이벤트를 발생시킨다.
2. 조건식을 통해 Observable 의 종료(completed)를 결정한다.
3. 이전 이벤트 값으로 다음 이벤트 값을 가공하는 함수를 가진다.
세번째 parameter 로 scheduler 를 전달할 수 있는데 기본적으로는 현재 thread 를 사용한다.
( scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance )
별도의 thread 에서 동작하도록 하고 싶다면 scheduler를 전달하자.
결과
(generateTest) -> subscribed
(generateTest) -> Event next(1)
(generateTest) -> Event next(11)
(generateTest) -> Event next(21)
(generateTest) -> Event completed
(generateTest) -> isDisposed
31 이벤트는 조건식에서 false 가 되므로 completed 된것을 볼 수 있다.
단일 이벤트를 발생하는 Observable 을 생성한다.
함수 원형
public static func just(_ element: E) -> Observable<E>
public static func just(_ element: E, scheduler: ImmediateSchedulerType) -> Observable<E>
스케쥴러를 설정할수 있다.
let justTest = Observable<String>.just("just one").debug()
justTest.subscribe { event in
print(event)
}.disposed(by: disposeBag)
결과
(justTest) -> subscribed
(justTest) -> Event next(just one)
next(just one)
(justTest) -> Event completed
(justTest) -> isDisposed
하나의 이벤트만 발생하고 종료된것을 확인할수 있다.
각각 이벤트가 없이 종료되거나, 종료되지 않거나, error 를 리턴하며 종료되는 Observable 이다.
예를 들자면, 조건을 가지고 Observable 을 생성하는 경우에, 생성 조건이 맞지 않아 Observable 을 nil 등으로 리턴하여 에러처리 하는 것 보다 Observable 생성이 되고 빈 이벤트 전달 후에 complete 되거나, 종료되지 않거나, 에러를 리턴하고 싶을때 사용 된다. ( 구조상 이벤트 처리의 의미가 에러 처리보다 더 분명하게 사용될때 )
TBD: "조건에 따라 Observable 생성"을 설명할때 다시 설명
순차적으로 이벤트를 발생한다.
함수 원형
func of(_ elements: E ...,
scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance)
-> Observable<E>
func from(_ array: [E],
scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance)
-> Observable<E>
let ofTest = Observable<String>.of("My","name","is","tom").debug()
ofTest.subscribe { event in
print(event)
}.disposed(by: disposeBag)
결과
(ofTest) -> subscribed
(ofTest) -> Event next(My)
next(My)
(ofTest) -> Event next(name)
next(name)
(ofTest) -> Event next(is)
next(is)
(ofTest) -> Event next(tom)
next(tom)
(ofTest) -> Event completed
completed
(ofTest) -> isDisposed
이 예제 만으로는 딱히 of 가 필요한지 잘 이해되지 않을 수 있다.
이해를 돕자면 Subject 두개를 merge() 할때 쓰이곤 한다.
let array = ["My","name","is","tom"]
let fromTest = Observable<String>.from(array)
fromTest.subscribe { event in
print(event)
}.disposed(by: disposeBag)
결과
of 와 동일하다.
lazy initialize Observable 생성자 이다.
subscribe 가 발생할때 Observable이 생성되므로, 메모리관리에 효율적이다.
Observable 생성이 메모리를 많이 차지하는 등, lazy 한 생성이 효율적일때 사용한다.
함수 원형
func deferred(_ observableFactory: @escaping () throws -> Observable<E>)
-> Observable<E>
let deferTest = Observable<String>.deferred({Observable.just("defer")}).debug()
deferTest.subscribe { event in
print(event)
}.disposed(by: disposeBag)
설정한 element 로 반복적으로 이벤트를 발생.
설정한 Range 내의 이벤트를 발생. ( Int 타입만 가능 )
함수 원형
func repeatElement(_ element: E,
scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance)
-> Observable<E>
func range(start: E,
count: E,
scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance)
-> Observable<E>
let repeatTest = Observable<String>.repeatElement("repeat")
repeatTest.subscribe { event in
print(event)
}.disposed(by: disposeBag)
결과
next(repeat)
next(repeat)
next(repeat)
...
let rangeTest = Observable<Int>.range(start: 0, count: 3)
rangeTest.subscribe { event in
print(event)
}.disposed(by: disposeBag)
결과
Next(3)
Next(4)
Next(5)
Completed
다음 매거진 : "Subject 알아보기"