brunch

You can make anything
by writing

C.S.Lewis

by anonymDev Feb 06. 2023

Reactor Publisher 만들기 실습 코드

코드로 파헤치는 Project Reactor (1편 실습)

reactor-core 구조와 라이프사이클 파헤치기에서 Reactor는 기본으로 하는 Publisher-Subscriber과 라이프사이클에 대해서 알아봤다. 가물가물하다면 글을 다시 보거나 아래 요약 내용을 참고하기 바란다.


1. Publisher-Subscriber 라이프사이클 요약

Publisher-Subscriber 라이프사이클


1. Publisher.subcribe로 Subscriber 인스턴스를 넘기며 구독을 요청한다
2. Publisher는 Subscriber.onSubscriber(subscription)을 호출한다.
3. Subscriber가 Subscription.request(long)를 호출하기 전까지 Publisher는 Subscriber에게 어떠한 이벤트도 전달하지 않는다.
4. Subscription.request(long)이 호출된 후 Publisher는 request로 (최대) 전달받은 개수만큼 Subscriber.onNext(data: T)를 통해 데이터를 공급한다.
5. Subscriber.onError/onComplete를 호출해 '오류로 인한 종료' 또는 '공급 완료' 이벤트를 Subcriber에 전달한다.


이번 글에서는 위 라이프사이클에 맞춰서 Publisher, Subscriber, Subscription을 실습을 통해 직접 구현해자. Flux.from() 메서드를 사용하면 직접 구현한 Publisher에 Flux API를 입힐 수 있다.

public static <T> Flux<T> from(Publisher<? extends T> source)


요구 사항은 다음과 같다.


1. 공급자(Publisher)는 0부터 n개의 숫자를 구독자에게 하나씩 발행한다.

2. 구독자는 공급자에게 n개의 숫자를 구독하여 공급받은 숫자를 로그에 출력한다.

3.  구독자가 n개의 숫자를 모두 제공받았다면 구독을 종료한다.


2. 구독자 구현하기


우선 Subscriber부터 구현해 보자. 10개의 숫자를 구독한다는 의미로 TenNumbersSubscriber로 지었다. 코드와 주석을 따라가 보자.


class TenNumbersSubscriber implements Subscriber<Long> {

/**

* 라이프사이클 2번, 3번에 해당

* 구독이 시작되면 Publisher에게 전달받은 Subscription을 통해 원하는 개수만큼 숫자를 요청한다  

**/
    @Override
    public void onSubscribe(Subscription s) {

         System.out.println("구독 시작");

        s.request(10);

    }

    @Override
    public void onNext(Long number) {

        // Publisher로부터 제공받은 숫자를 출력한다.
        System.out.println("제공받은 숫자: #" + number);
    }

    @Override
    public void onError(Throwable t) {

       // 에러 이벤트 발생 시 로그 출력한다.
        System.out.println("에러 발생");
    }

    @Override
    public void onComplete() {

       // 구독 완료 시 로그 출력한다
        System.out.println("구독을 완료합니다.");
    }
}


Subscriber 구현은 끝났다. 이제 라이프사이클에 맞춰 Subscriber의 메서드를 호출하고 데이터를 발행해 줄 Publisher가 필요하다.


3. 공급자와 구독 구현하기


구독 요청에 응답하고  N개의 숫자를 발행하는 Publisher, Subscription을 구현해 보자. 공급자는 라이프사이클에 맞춰 Subscriber의 인터페이스를 호출해야 한다.


다음의 방식으로 구현해 보자.

1. Publisher는 새로운 구독 요청에 새로운(고유한) Subscription을 전달한다.

2. 숫자 발행 및 완료/실패 이벤트 알림은 Subscription에 구현한다.


1번 구현을 위해 구독이 시작되면 Publisher는 Subscriber에게 Subscription을 생성해 전달한다. 라이프사이클 1번 2번에 해당한다.

class NumberPublisher implements Publisher<Long> {
    @Override
    public void subscribe(Subscriber<? super Long> subscriber) {

         NumberSubscription subscription = new NumberSubscription(subscriber);
        subscriber.onSubscribe(subscription);
    }

}


Subscription은 코드와 주석을 따라 내려가보며 파악해 보자. Subscription이 Subcriber와 1;1로 매칭돼 구독자의 요청과 데이터 발행, 완료, 실패 이벤트 발행을 담당한다.


class NumberSubscription implements Subscription {
    private Subscriber<? super Long> subscriber;
    private boolean done;
// 2번 구현을 위해 Subscription 초기화 시 데이터를 제공받을 Subcriber를 넘긴다.
    public NumberSubscription(Subscriber<? super Long> subscriber) {
        this.subscriber = subscriber;
    }


/**

* Subscriber에게 n개의 숫자 발행을 요청받으면 n번만큼 Subscriber.onNext를 호출해 1씩 증가하는 * * 숫자를 발행한다. 라이프사이클 4번에 해당한다

* 위에서 구현한 NumberSubscriber가 onSubscribe 메서드에서 request(10)을 호출했었다.

**/
    @Override
    public void request(long n) {

        System.out.println(n + "개의 숫자를 요청받았습니다.");
        if (done) return; // 완료된 경우 숫자를 발행하지 않고 종료한다.
        for (long i = 0; i < n; i++) {

            subscriber.onNext(i);

        }

       done = true;

// 요청받은 개수만큼 숫자를 발행한 후 Subscriber에게 완료를 알리고 종료(라이프사이클 5번)

       subscriber.onComplete();
    }

    @Override
    public void cancel() { // 취소 요청을 받은 경우 강제로 완료 처리한다.
        done = true;
    }
}


4. 실행해보기

직접 구현한 Publisher와 Subscriber로 Flux를 생성하고 구독해 보자.


Flux<Long> publisher = Flux.from(new NumberPublisher());
publisher.subscribe(new NumberSubscriber());


출력 로그

구독 시작
10개의 숫자를 요청받았습니다.
제공받은 숫자: #0
제공받은 숫자: #1
...
제공받은 숫자: #9
구독을 완료합니다.


마무리


Project Reactor의 Publisher-Subscriber 구조와 라이프사이클을 실습해 봤습니다. 백문이 불여일견이라고 reactor-core의 구독 라이프사이클을 직접 구현해 보는 거만큼 reactor를 이해하는데 도움이 되는 것은 없다. 특히 Publisher-Subscriber 라이프사이클이 reactor의 주요 개념인 만큼 reactor를 기반으로 한  Reactive Libraray들과도 친숙해질 수 있을 것이다.


reactor-core 구조와 라이프사이클 파헤치기 글을 아직 보지 않았거나 구체적인 설명을 원한다면 읽어보는 것을 추천한다.



브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari