brunch

You can make anything
by writing

- C.S.Lewis -

by Eddy Kim Feb 06. 2019

Project Reactor 3. 리액티브 스트림

3. Reactive Streams Interface

"Project Reactor" 시리즈의 세 번째 글입니다. 하지만, 이 글에서도 "Reactor" 에 대해서는 거의 다루지 않을 예정입니다. 이 글에서는, "Reactive Streams"에 대해서 공부합니다. 급하게 작성했기 때문에 맞춤법이 틀렸거나 , 문맥상 어색한 문장이 많습니다. 글 쓰는데 너무 시간이 오래 걸려서 힘드네요. 나중에, 시간이 되면 그때 다시 수정 할 예정입니다. 


전체 목차


Reactive Spring

첫 번째 주제는, Project Reactor

1. 리액티브 프로그래밍 

2. Async VS Non-Blocking 

3. Reactive Streams (현재 글)

4. Project Reactor Flux, Mono Basic

5. Project Reactor Subscriber

6. Project Reactor Data Processing

7. Project Reactor Create, Generator

8. 미정


두 번째 주제는, Spring Webflux

목차 미정


세 번째 주제는, Spring Reactive Data

목차 미정


추후에 목차는 변경될 수 있습니다.


3. Reactive Streams


"Reactive Streams"는 2013년 Netflix, Pivotal, Typesafe 의 개발자들이 발의해서 처음 만들어졌는데,"리액티브 프로그래밍"을 위한 명세(specification) 이다. 비즈니스 시스템 개발자는 "Reactive Streams" 에 정의 된 인터페이스를 구현하면 "리액티브 프로그래밍"을 구현할 수 있다. 


필자는 이 글에서, Reactive Streams 를 직접 구현하여, 구현체를 만들 것이다. 


3.1 Reactive Streams Interface


 "Reactive Streams" 인터페이스는 아래와 같다. 

Processor

Publisher

Subscriber

Subscription


"Reactive Streams" 는 Maven 에서 다운로드 할 수 있다. 현재 최신 버전은 1.0.2 이다. 

https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams

사실, Reactive Streams 에 정의된 코드는 몇줄 안된다. 


public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}


public interface Publisher<T> {

    public void subscribe(Subscriber<? super T> s);

}


public interface Subscriber<T> {

    public void onSubscribe(Subscription s);

    public void onNext(T t);

    public void onError(Throwable t);

    public void onComplete();

}


public interface Subscription {

    public void request(long n);

    public void cancel();

}


클래스 다이어그램으로 표현하면 아래와 같다. 


"Reactive Streams"는 명세를 정의할 뿐, 실제로 구현체를 만들어서 사용해야 한다. 기본적인 흐름을 이해하기 위해서 아래 그림을 참고해보자.

Subscriber 가 Publisher 에게 subscribe 하면 Publisher 가 데이터 또는 시퀀스를 전달하게 된다. 전달하기 전에 Publisher 는 Subscribe 에 정의된 onSubscribe()를 호출하고, Subscriber 는 request(n)를 호출하여 몇개의 데이터를 보내달라고 요청하게 된다. 이때 Subscription 을 사용하는데, Request(n)를 호출하여 데이터 전송 요청을 하게 되면 Publisher 에서는 0에서 N개의 데이터 또는 시퀀스를 Subscriber에 전달하게 된다. 이 과정에서 에러가 발생하면 onError()를 호출하고, 데이터(시퀀스) 전달이 완료가 되면 onComplete()를 호출한다. Subscriber 가 Publisher 에 Request 하는 과정을 보통 Back-Pressure 라고 표현하는데, Push 하는 데이터(시퀀스) 의 흐름을 제어할 수 있다. Request(1) 을 호출하면 1개만 보내도록 요청할 수 있고, Request(MAX) 를 호출하면 최대값에 해당하는 데이터를 요청하게 된다. BackPressure 를 번역하면, "역압" 이라는 단어로 번역할 수 있는데, 그냥 이 글에서는 BackPressure 라고 표현하겠다. 

이 글에서는 Prossor 에 대해서는 자세하게 다루지 않을 예정이다.


3.2 Reactive Streams 구현체


위에서 설명했듯이, Reactive Streams 는 리액티브 프로그래밍을 위해 정의한 스펙(명세,정의)이다. 실제로 리액티브 프로그래밍을 위해서는, "Reactive Streams"를 구현하는 구현체를 만들어야 한다. 필자가 알고 있는 상식으로 가장 많이 사용되는 구현체 라이브러리는 RxJava 이다. 그 외에 필자가 알고 이는 "Reactive Streams"의 구현체는 아래와 같다. 

RxJava 1.x or RxJava 2.x

Project Reactor

Vert.x

Akka Streams

Slick


참고로, "Reactive Streams" 스펙은 서로 호환된다고 한다.(필자가 검증을 하지는 않았다.) 

https://blog.redelastic.com/a-journey-into-reactive-streams-5ee2a9cd7e29


기회가 된다면 RxJava 를 공부해보고 싶지만, 이 글에서는 Reactor 에서 자세하게 다룰 예정이다. 


3.3 Reactive Streams 샘플 구현


Reactive Streams 를 구현하여, 간단한 리액티브 프로그래밍을 만들어보자. 


디펜던시


필자는 스프링 부트 2.0.7 , Java, Gradle 환경에서 개발하겠다. 

org.reactivestreams:reactive-streams:1.0.2

org.springframework.boot:spring-boot-starter-web


클래스 다이어그램


Publisher, Subscriber, Subscription 의 구현체를 만들자. 조금 유치하지만, 클래스 네이밍을 아래와 정했다. 

EddyPublisher

EddySubscriber

EddySubscription



Publisher


Publisher 인터페이스는 다음과 같이 단 하나의 메서드를 정의한다. 

subscribe


Publisher 를 구현하는 EddyPublisher 클래스는 subscribe 메서드를 구현해야 한다. subscribe메서드는 Subscriber 객체를 매개변수로 받고, 매개변수로 받은 subscriber 의 onSubscribe() 메서드를 실행한다. 

다음 장에서 다루겠지만, Reactor 의 Flux 와 Mono 는 Publisher 의 구현체이다. 즉, Flux 와 Mono 에는 subscribe() 메서드가 구현이 되어있다. Flux 를 생성하여 데이터 또는 시퀀스를 전송하기 위해서는 Flux 객체의 subscriber() 를 실행해야 한다. 물론, 내부 로직은 필자가 구현한 코드 처럼 간단하지는 않다. 필자의 코드에서는 Subscriber 를 매개변수로 전달하지만, Flux 에서는 Subscriber 를 전달받는 메서드도 있지만, Subscriber 를 따로 전달받지 않고, 내부 로직에서 알아서 자동으로 Subscriber 를 생성하기도 한다. 자세한 내용은 다음 장에서 다룰 예정이다. 지금 내용이 이해가 안된다면, 일단 넘어가도 좋다. Flux 를 자세히 공부하기 전에, Publisher 와 Subscriber 가 어떻게 통신하는지에 대해서 이해를 하는 것이 훨씬 중요하다. 



Subscriber


Subscriber 인터페이스는 다음과 같은 메서드를 정의한다. 

onSubscribe

onNext

onError

onComplete


onSubscribe()메서드는 Subscriber 가 구독을 하게 되면, 발행자인 Publisher 가 제일 처음 호출하는 Subscriber 의 메서드이다. 이때, Publisher 는 onSubscribe를 호출하면서, Subscription 를 매개변수로 전달한다. Subscriber 는 Publisher 로 부터 전달받은 Subscription 를 통해서, 데이터(또는 시퀀스)를 전달해달라고 요청할 수 있다. 이때, request(N) 메서드를 실행하는데, N 은 전달을 요청하는 갯수가 된다. 필자의 샘플 코드에서는 3개의 데이터를 요청하고 있다. Subscriber 에서 request(N)를 수행하면, Publisher 에서는 N 개의 데이터를 전달하게 되는데, 이때 전달하기 위해서 사용하는 메서드가 바로 onNext(data) 이다. 매우 중요한 메서드이다. 물론, 해당 메서드에 대한 정의는 Subscriber 에서 정의하고, 호출은 Publisher 에서 호출한다. 필자는... 데이터(시퀀스) 전달 타입을 Interger 로 정의하였고, 3개의 데이터를 요청할 것이다. 만약, request(3)를 요청한 이후에 3개의 데이터를 받았다고 가정하자. request 를 더이상 호출하지 않으면 어떻게 될까? publisher 의 데이터(시퀀스) 전달은 완료하지 못할 것이다. 즉, onComplete 메서드를 호출하지 않는다. 필자는 아래 샘플코드에서, 3개의 데이터(시퀀스)를 전달받은 이후에, 다시 request(3)호출하도록 로직을 구현하였다. 


Subscription


Subscription 인터페이스는 다음과 같은 메서드를 정의한다. 

request

cancel


위에서 구현한 Subscriber 에서는, 데이터(시퀀스)를 받았을 때에 대한 로직을 구현한 것이다. 어떻게 데이터를 전송하는지에 대해서는, Subscription 에 정의가 되어있는데, Publisher 에서 SUbscriber의  onSubscribe 메서드를 호출하면서 매개변수로 SUbscription 을 전달하였다. 전달받은 Subscriber 는 SUbscription 객체의 request 를 호출할 것이고, request 메서드에서 Subscriber 에 데이터를 전다하기 위해서 onNext 를 실행할 것이다. 아래 샘플 코드를 보자.  1000개의 데이터만 전달하고 Push 가 종료되도록 코드를 구현하였다. 


샘플소스는 github에서 확인하길 바란다. 

https://github.com/sieunkr/reactive-spring/tree/master/03


3.4 마무리


이 글에서는, "Reactive Streams"의 구현체를 직접 만들어서 리액티브 프로그래밍 샘플을 작성하였다. 사실, Java9 에서 Flow 패키지를 사용하면 위와 같은 유사한 프로그래밍을 구현할 수 있다. Reactor 를 공부하기 전에 해당 코드를 짠 이유는, 어쩃든, Reactor 이 Reactive Streams 의 구현체이기 때문이다. "Reactive Streams"에 대한 기본 개념을 이해한 이후에, Reactor 를 공부하는게 더 효율적이라고 생각했다. 이제, 드디어 본격적으로 다음 장부터는 "Project Reactor"에 대해서 공부할 예정이다.  


[1] http://www.reactive-streams.org/

[2] https://blog.redelastic.com/a-journey-into-reactive-streams-5ee2a9cd7e29


매거진의 이전글 Reactor 2.Async & Non-Blocking

매거진 선택

키워드 선택 0 / 3 0
브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari
;