코드로 파헤치는 Project Reactor (1편)
Spring Framework의 리액티브(reactive) 라이브러리의 기반이 되는 모듈은 reactor-core(이하 Reactor)이다. Mono, Flux는 Reactor에서 제공하는 핵심 인터페이스를 제공하는 두 클래스이다.
Reactor 코드를 구체적으로 파보기에 앞서 그 구조를 단순화해보자.
Subscriber(이하 구독자)가 구독(subcribe)하면 Publisher(공급자)는 구독자가 요청한 데이터(이벤트, 신호 등등)를 제공한다. 어떤 데이터를 언제 제공할지는 주로 서비스 공급자의 정한 규칙(구현)에 따른다. 하지만 구독에 관한 내용을 모두 공급자가 정하는 것은 아니다.
구독자가 공급자를 구독하면 바로 구독이 시작되지 않는다. 구독자가 공급자에게 구독을 신청(1) 하면 공급자는 구독자에게 '구독 신청서'를 넘긴다(2). 구독 신청서에는 구독자가 원하는 구독 내용을 정해서 요청할 수도 있다. 구독 신청서를 가지고 구독을 취소할 수도 있다(3). 구독자가 구독 신청서를 가지고 구독을 확정하면 그제야 공급자는 구독 신청 내용에 따라서 구독자에게 공급을 시작한다(4). 구독자는 구독 신청서로 언제든 구독 취소를 할 수 있다.
여기서 공급자는 Mono와 Flux를 의미한다. Mono와 Flux 모두 CorePublisher라는 공급자 인터페이스를 구현하고 있다.
public abstract class Flux<T> implements CorePublisher<T>
public abstract class Mono<T> implements CorePublisher<T>
public interface CorePublisher<T> extends Publisher<T> {
void subscribe(CoreSubscriber<? super T> subscriber);
}
public abstract class Mono<T> implements CorePublisher<T> {
public final void subscribe(Subscriber<? super T> actual) {
...
Mono와 Flux가 구현한 Publisher 인터페이스의 subscribe 메서드에 파라미터로 넘긴 Subscriber 객체가 구독자를 의미한다.
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
org.reactivestreams.Subscriber의 인터페이스 문서를 읽어보면 Reactor의 구독과 공급의 라이프사이클 이해할 수 있다. 순서대로 정리하면 다음과 같다.
1. Publisher.subcribe로 Subscriber 인스턴스를 넘기며 구독을 요청한다
2. Publisher는 Subscriber.onSubscriber(subscription)을 호출한다.
3. Subscriber가 Subscription.request(long)를 호출하기 전까지 Publisher는 Subscriber에게 어떠한 이벤트도 전달하지 않는다.
4. Subscription.request(long)이 호출된 후 Publisher는 request로 (최대) 전달받은 개수만큼 Subscriber.onNext(data: T)를 통해 데이터를 공급한다.
5. Subscriber.onError/onComplete를 호출해 '오류로 인한 종료' 또는 '공급 완료' 이벤트를 Subcriber에 전달한다.
위에서 설명한 구독과 공급의 상세 플로우를 호출되는 메서드로 바뀐 거 말고 달라진 게 없다.
위 라이프 사이클 중에서 2번에 대해서 코드를 보며 얘기해 보자. 아래 코드는 Flux 타입의 Publisher를 구독하는 코드이다. 하지만 Subscriber는 onSubscribe 로그만 출력하고 Publisher에게 데이터를 공급받지 않는다. onNext가 호출되지 않는다는 뜻이다. 무엇이 문제가 무엇일까?
Flux<Long> publisher = (FluxSource<Long>) Flux.from(publisher);
publisher.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) { System.out.println("onSubscribe"); }
@Override
public void onNext(Long item) { System.out.println("onNext:" + item); }
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() { System.out.println("onComplete"); }
});
onNext가 호출되지 않는 이유는 MyPublisher 타입의 Publisher 인스턴스가 위에서 명시된 라이프사이클대로 구현됐기 때문이다. MyPublisher는 subscribe가 호출되면 MySubscription이라는 Subscription 클래스의 인스턴스를 생성해 Subscriber에게 넘기고 종료한다. 더 이상의 동작은 없다.
class MyPublisher implements Publisher<Long> {
@Override
public void subscribe(Subscriber<? super Long> subscriber) {
MySubscription subscription = new MySubscription(this, subscriber);
subscriber.onSubscribe(subscription);
}
...
대신 MySubscription의 request()를 보면 파라미터로 넘어온 1~n까지의 숫자를 subscriber.onNext로 넘겨주고 있다. 데이터를 Subscriber에 넘기는 동작은 MySubscription의 request에 정의하여 위임하였다. 하지만 Subscriber는 Subscription의 request를 호출하지 않았다. 따라서 발행된 데이터를 전달받지 않은 것이다.
@Override
public void request(long n) {
if (done) return;
for (long i = 1; i <= n; i++) {
subscriber.onNext(i);
}
done = true;
subscriber.onComplete();
}
Reactor Publisher 만들기 실습 코드가 궁금하다면 링크를 따라가보자
Subscription의 requet 메서드에 첨부된 주석을 보면 다음과 같다. "해당 메서드가 호출되기 전까지 Publisher는 어떠한 이벤트도 발행하지 않는다."라고 돼있다.
/**
* No events will be sent by a {@link Publisher} until demand is signaled via this method.
...(후략)
*/
public void request(long n);
해당 라이프사이클에 맞춰서 Publisher는 Subscription.request를 통한 명시적인 요청이 있어야만 이벤트 발행을 시작해야 하는 것이다.
하나의 Publisher는 여러 개의 Subscriber로부터 구독을 받을 수 있다. 즉 Publisher.subscribe를 여러 번 호출할 수 있다는 뜻이다.
Mono<Integer> publisher = Mono.just(1);
publisher.subscribe(i -> { System.out.println("Subscriber#1 consume item#" + i); });
publisher.subscribe(i -> { System.out.println("Subscriber#2 consume item#" + i); });
이때 각 Subscriber는 동일한 Publisher에게 구독을 요청하지만 서로 격리된 채 구독 서비스를 제공받는다. Publisher는 모든 구독 요청에 고유한 Subscription를 Subcriber에게 전달하고 Subscriber는 서로 다른 Subscription을 통해 Publisher에게 발행을 요청한다.
각 Subscriber는 고유한 Subscription을 제공받아 독립적으로 구독을 요청하고 취소한다.
public interface Subscription {
public void request(long n); // 구독 요청
public void cancel(); // 구독 취소
}
Subscritpion은 하나의 구독 요청에 최대 하나만 존재한다. 또한 재사용될 수 없다. 동일한 Subscriber가 다시 구독을 요청한 경우에도 새로운 Subscription을 전달해야 한다. 즉 각각의 Subscription이 고유하다는 의미다.
Subscription represents a one-to-one lifecycle of a Subscriber subscribing to a Publisher. It can only be used once by a single Subscriber.
Reactor에 정의된 ScalarSubscription<T> 클래스를 살펴보자. ScalarSubspscription은 Subscription 구현체로 단일 상수 값을 공급하도록 구현됐다. Mono.just()로 생성되는 Publisher 클래스인 MonoJust는 ScalarSubspscription을 통해 Subscriber와 상호작용한다. MonoJust는 ScalarSubscription를 재사용하지 않고 매번 신규로 생성해 Subscriber에게 전달하는 것을 확인할 수 있다.
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
actual.onSubscribe(Operators.scalarSubscription(actual, value));
}
만약 Subscription을 재사용한다면 어떻게 될까? MonoJust는 개별 Subscriber에게 철저하게 한 개의 아이템만 발행해야 한다. MonoJust가 넘긴 ScalarSubspscription도 Subscriber에게 몇 개의 아이템 요청이 오더라도 하나의 아이템만 발행하도록 구현돼야 한다.
@Override
public void request(long n) {
if (validate(n)) {
if (ONCE.compareAndSet(this, 0, 1)) { <----- once 값이 0이면 1로 업데이트하고 true를 반환한다.
Subscriber<? super T> a = actual;
a.onNext(value);
if (once != 2) { <---- 2는 Subscription.cancel이 호출돼 구독 취소된 상태이다.
a.onComplete();
}
}
ScalarSubspscription은 once 필드 값을 참조하여 Subscribe의 request에 대응한다. once의 현재 값이 0인 경우에 값을 1로 수정하고 Subscriber에게 데이터를 공급한다(onNext 호출). ScalarSubscription이 다른 Subscriber에게 재호출(재사용) 됐을 때 once의 값이 1이기 때문에 Subscriber의 onNext(value)를 호출하지 않고 종료한다. 따라서 ScalarSubscription.request(n)이 1번 이상 호출될 수 없기 때문에 재사용될 수도 없다.
1. reactor-core의 기본 뼈대는 Publisher-Subscriber 패턴이다
2. Mono, Flux는 Publisher다
3. 세 가지 핵심 컴포넌트는 Publisher, Subscriber, Subscription이다.
4. Publisher와 Subscriber는 1:N의 관계다
5. Publisher, Subscriber는 Subscription를 통해 상호 작용한다.
6. Publisher.subcribe -> Subscriber.onSubsribe -> Subscription.request -> onNext -> onComplete/onError 라이프 사이클을 갖는다.
7. 라이프 사이클:Subscription은 1;1 관계이며 라이프 사이클 당 고유한 Subscription을 갖는다.