brunch

You can make anything
by writing

C.S.Lewis

[RxJava] #8 뜨거운 Observable 만들기

들어가는 글: 7화에서는 차가운 Observable과 뜨거운 Observable의 개념에 대해서 알아보았습니다. 그러면 뜨거운 Observable은 어떻게 만드는지 알아보도록 하겠습니다. 


1. ConnectableObservable 클래스 


뜨거운 Observable을 만드는 방법은 "차가운 Observable을 변환"하는 것입니다. 

가장 손쉽게 할 수 있는 방법은 ConnectableObservable 클래스를 활용하는 것입니다. 


ConnectableObservable의 javadoc 주소는 아래와 같습니다. 

http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/observables/ConnectableObservable.html 


ConnectableObservable의 마블 다이어그램

이 클래스와 연관된 주요 메서드는 아래와 같습니다.

Observable 클래스

1) publish() : Observable을 ConnectableObservable로 변환해줍니다.   

2) subscribe() : 원래 알던 메서드지만.. subscribe()를 해도 데이터가 나오지 않습니다.  


ConnectableObservable 클래스 

1) connect() : 이 메서드를 호출하면 그제야 데이터가 나옵니다. 

2) refCount() : 몇 명의 구독자가 있는지 알려줍니다. 


여기서 주목해야 할 메서드는 subscribe()와 connect() 입니다. publish()는 ConnectableObservable을 생성하는 메서드이니 상식적으로 이해를 할 수 있습니다. 


예를 들어 Observable에 {1,2,3} 이라는 데이터가 있다고 하면 

그것을 subscribe()하면 항상 1,2,3 이 차례로 발행됩니다. 심지어는 10초를 쉬고 새로운 구독자가 subscribe()를 해도 다시 1,2,3이 동일하게 나옵니다. 이것은 앞서 말씀드렸던 차가운 Observable의 성격입니다. 


하지만 ConnectableObservable로 바꿔놓으면 

1) 구독자를 동시에 대기시킬 수 있습니다. 

> 구독자 #1, #2, #3을 subscribe() 해놓고 마지막에 connect()를 하면 동시에 같은 데이터를 받을 수 있습니다. 이것이 뜨거운 Observable의 성격입니다. 


2) connect() 한 이후에 subscribe()를 하게 되면 앞서 발행되었던 데이터를 받을 수 없습니다. 

> 이것도 뜨거운 Observable의 성격입니다. 최신의 데이터만 필요하거나 앞서 발행되었던 데이터는 불필요한 경우에 활용할 수 있습니다. 


ConnectableObservable에 대한 간단한 예제는 아래의 wiki를 참고하세요 

https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators 


두번째로 뜨거운 Observable을 만드는 방법은 Subject를 이용하는 것입니다. 

여기에서는 가장 단순한 PublishSubject에 대해서 알아보도록 하겠습니다. 


2. PublishSubject 클래스 


Subject 클래스는 Observable과 Observer(구독자)를 모두 구현하고 있는 클래스입니다. 한 객체로 Observable도 되고 Observer도 된다고 생각하시면 됩니다. 


Subject 타입의 s 변수가 있다면 그것으로 

> onNext() 

> onError() 

> onComplete() 도 호출할 수 있고 한편 

> subscribe()도 호출할 수 있습니다. 


앞서 Observable 객체에 {1, 2, 3}의 데이터가 있다면 지금 subscribe()를 하던 아니면 10초 후에 subscribe()를 하던 구독자는 동일한 데이터를 받을 수 있다고 했습니다. 하지만 Subject 객체에 대해서는 그럴수가 없습니다. 이미 onNext()를 해버린 데이터는 그 이후에 subscribe()를 하면 데이터를 받을 수 없습니다. 따라서 "뜨거운" Observable의 성격을 가집니다. 


PublishSubject 클래스의 마블 다이어그램은 아래와 같습니다. 

http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/subjects/PublishSubject.html 


PublishSubject 클래스의 마블 다이어그램 

실시간으로 런타임에서 데이터를 발행하고 그때 구독한 구독자에게만 데이터가 전달됩니다. 우리가 전통적으로 알고 있는 Observer 패턴에 가장 유사한 형태입니다. 


PublishSubject의 간단한 예제는 위의 javadoc에서 발견할 수 있습니다. 

PublishSubject<Object> subject = PublishSubject.create();
// observer1 will receive all onNext and onComplete events
subject.subscribe(observer1);
subject.onNext("one");
subject.onNext("two");
// observer2 will only receive "three" and onComplete
subject.subscribe(observer2);
subject.onNext("three");
subject.onComplete();

  

Subject에는 그외에 AsyncSubject, BehaviorSubject, ReplaySubject 등의 클래스에 있습니다. 이에 대한 내용은 ReactiveX 홈페이지를 참고하세요 

http://reactivex.io/documentation/subject.html 


오늘은 "뜨거운" Observable을 만드는 방법에 대해 알아보았습니다. 

9화에서는 RxJava의 예외 처리에 대해서 알아보도록 하겠습니다. 지금까지 Java 프로그래밍할 때 접하던 예외처리(try-catch)와는 개념이 다릅니다. 


즐거운 주말 되세요 

2017.7.2 

브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari