brunch

You can make anything
by writing

C.S.Lewis

by 에디의 기술블로그 Feb 16. 2019

Project Reactor 4. Flux, Mono

4. Flux, Mono 를 생성하는 방법

"Project Reactor" 시리즈의 네 번째 글입니다. 드디어, "Reactor" 에 대해서 공부하기 시작합니다. 이 글에서는 Flux, Mono 의 기본 개념에 대해서 정리합니다. 필자가 시간이 없어서 대충 글을 작성했고 잘못된 내용이 포함되어있을 수 있습니다. Reactor 개발 경험이 있으신 훌륭한 개발자분들께서는 이 글에 피드백을 남겨주시길 바랍니다. 필자의 글이 읽기 싫으시다면 공식 레퍼런스를 참고하시길 바랍니다.

https://projectreactor.io/docs/core/release/reference/#getting-started-introducing-reactor


전체 목차


Reactive Spring

첫번째 주제는, Project Reactor

1. 리액티브 프로그래밍

2. Async VS Non-Blocking

3. Reactive Streams

4. Project Reactor Flux, Mono Basic (현재 글)

5. Project Reactor Subscriber

6. Project Reactor Data Processing

7. Project Reactor Create, Generator

8. 미정


두번째 주제는, Spring Webflux

목차 미정


세번째 주제는, Spring Reactive Data

목차 미정


추후에 목차는 변경될 수 있습니다.

 

4. Project Reactor Flux, Mono Basic


Reactor 는 "Reactive Streams"의 구현체로서, JVM을 위한 논블록킹 리액티브 프로그래밍 라이브러리이다. 스프링 5 에서 제공하는 리액티브 프로그래밍은 모두 Reactor 를 기반으로 구현되어있다.


4.1 reactor-core


Reactor 3 의 핵심 모듈인 "reactor-core" 는 2019년1월 현재 3.2.5.RELEASE 버전이 최신이다.

https://mvnrepository.com/artifact/io.projectreactor/reactor-core

https://github.com/reactor/reactor-core

reactor 3 버전은 최소 JDK 1.8 ... 즉, Java 8 이상에서 실행 된다. 만약, Java 7 이하 버전에서 사용하고 싶다면 reactor 1.X, 2.X 버전을 사용하면 되는데, 왠만하면 JDK 8 이상에서 reactor 3 버전을 사용하길 바란다.


dependencies


필자는 Gradle 환경에서 샘플 코드를 작성할 것이다. 아래와 같이 디펜던시를 추가하자. 최신 버전인 3.2.5.RELEASE 버전을 추가하였다. 참고로, 필자는 스프링 부트 디펜던시를 추가하였지만, 굳이 스프링 부트 디펜던시는 추가하지 않아도 된다.

Maven 환경에서는 아래와 같이 디펜던시 추가를 하면 된다.


만약, 스프링 웹플럭스(Webflux) 환경으로 개발을 한다면, reactor-core 를 추가할 필요가 없다. 웹플럭스 디펜던시에, reactor-core 가 포함되어 있다.  이 글의 주제와는 좀 멀지만, 궁금하니깐 웹플럭스 Maven 도 확인해보자.

https://mvnrepository.com/artifact/org.springframework/spring-webflux/5.1.4.RELEASE

웹플럭스의 컴파일 디펜던시를 확인해보면, reactor-core 가 포함되어 있는 것을 확인할 수 있다.

필자가, 스프링 5 의 모든 프로젝트를 확인은 못했지만, 아마도 거의 모든 프로젝트에서 Reactor 를 디펜던시로 사용할 것이다.


4.2 Publisher 구현체, Flux 와 Mono


Reactor 의 핵심인 Flux 와 Mono 에 대해서 알아보자. 일단, 이 글을 읽는 개발자 중에서 Reactive Streams 에 대해서 잘 모른다면, 반드시 "Reactive Streams" 를 이해하고 오길 바란다.

https://brunch.co.kr/@springboot/153


Flux 와 Mono 는 "Reactive Streams" 인터페이스 중에서 데이터(시퀀스)를 제공하는 발행자 역할을 하는 Publisher 의 구현체이다. reactor.core.publisher 패키지에서  Flux 클래스를 까서 보자.

Publisher 를 구현하고 있고, Flux 는 추상클래스로 정의되어있다. Flux  추상클래스를 상속받는 클래스는 20개가 넘는다. reactor-core 내부 로직을 당장 전부 이해하기는 힘들 것 같다.  "Flux 와 Mono는 Publisher 의 구현체이다" 라고 이해하고 넘어가자. 그럼 Flux 와 Mono 는 어떤 차이가 있는가?  데이터를 전송하는 갯수의 차이가 있다.

Flux : 0-N 개의 데이터를 전달

Mono : 0-1 개의 데이터를 전달


0-N 개의 데이터를 전달하는 Flux


Flux 는 Reactive Streams 에서 정의한 Publisher 의 구현체로서, 0-N 개의 데이터를 발행(전달,방출)할 수 있다. 하나의 데이터를 전달할 때마다 onNext 이벤트를 발생한다. Flux 내의 모든 데이터의 전달 처리가 완료 되면 onComplete 이벤트가 발생하며, 데이터를 전달하는 과정에서 오류가 발생하면 onError 이벤트가 발생한다.




0-1 개의 데이터를 전달하는 Mono


Mono 역시, Reactive Streams 의 Publisher 인터페이스를 구현하는 구현체인데, Flux 와의 차이점은, Flux 는 0-N 개의 데이터를 처리하지만, Mono 는 0-1 개의 데이터를 처리한다.




4.3 Flux 를 생성하는 방법


Flux 를 생성하는 가장 간단한 방법은, Flux 클래스에서 제공하는 팩토리 메서드 를 사용하는 것이다. 대표적으로 아래와 같은 팩토리 메서드를 제공한다.


just

range

fromArray, fromIterable, fromStream

empty


just


just 메서드를 사용해서 String 데이터를 전달하는 Flux 를 생성해보자. Flux.just() 메서드를 사용해서, "에디킴","아이린" 이라는 String 을 포함하는 Publisher 의 구현체인 Flux 를 생성한다.

자!! 여기서 중요한 점은, Flux 는 subscribe 가 실행하기 전까지는 어떤 일도 발생하지 않는다는 점이다. Publisher (발행자)는 구독이 되었을 경우에만 데이터를 Subscriber(구독자)에게 전달한다. Subscriber(구독자)가 Publisher 에 구독을 하는 과정은, Publisher(발행자)에 정의된 subscribe() 메서드를 사용한다. 이때, 매개변수로 Consumer 함수를 전달할 수 있는데, Consumer 함수는 데이터 전달을 해서 Subscriber의 onNext 이벤트가 발생을 했을때 실행되는 함수이다. 필자는, Consumer함수에 데이터를 하나씩 받을 때마다 System.out.print 를 실행해서 데이터 수신을 확인하고, 전달 받은 데이터를 List<String> 에 add하는 로직을 추가하였다. 참고로, 람다식을 개선한다면, Consumer 함수는 아래와 같이 한줄로 줄일 수 있다.

log() 메서드를 추가하면 아래와 같이 로그를 확인할 수 있다.

이벤트는 아래의 순서로 실행 될 것이다.

onSubscribe --> request --> onNext --> onNext --> onComplete

필자는 subscribe() 를 실행하면서 매개변수가 Consumer 함수 하나인 메서드를 실행하였다. 이런 경우에는, request(unbounded) 가 실행 된다. unbounded 로 전달되면, 내부적으로 requext(MAX)로 적용이 된다. 즉, 모든 데이터를 전달하라고 요청하는 것이다. subscribe() 에 request 를 명시적으로 지정하는 방법은 5장에서 다시 검토하기로 하자.

5장에서 공부할 내용 예습)

request(N)를 명시적으로 지정하고 싶다면, subscribe 메서드 중에서 4개의 매개변수를 받는 아래 메서드를 실행하면 된다.

subscribe(
   Consumer<? super T> consumer,
   Consumer<? super Throwable> errorConsumer,
   Runnable completeConsumer,
   Consumer<? super Subscription> subscriptionConsumer);


의문...

필자가 테스트 코드를 짜면서 궁금한 점이 생겼다. 테스트 검증 코드의 위치가 올바른지 잘 모르겠다.



range


range 메서드는 int 범위를 지정하여 순차적인 데이터를 생성해주는 Flux 의 메서드이다.

위 샘플 코드는 1 부터 5 의 데이터를 전달한다. 데이터 하나를 전달할떄마다 subscriber 의 onNext 메서드를 실행하는데, 아래 log를 확인해보자.


fromXXX  (ex. fromArray, fromIterable, fromStream)


fromArray, fromIterable, fromStream 메서드를 사용하면 Array, Iterable, Streams 를 사용해서 Flux 를 생성할 수 있다.


fromArray

fromArray 메서드는 이미 생성되어있는 Array 의 데이터를 사용해서 Flux 를 생성한다.

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html



fromIterable

자세한 설명은 생략한다.


fromStream

자세한 설명은 생략한다.



empty


empty 메서드를 사용하면 아무값도 전달하지 않는, Empty Flux를 만들 수 있다.



4.4 Flux, Laziness


Flux 와 Mono 는 subscribe() 가 실행하기 전까지는 그 어떤 데이터도 전달하지 않는다. 예를 들어서, 구구단을 외워서 데이터를 전달하는 상황을 생각해보자.

1. Flux 를 생성한다. 하지만, flatMap 메서드를 사용해서 구구단을 외우는 작업은 수행하지 않는다.

2. 아직 데이터를 전달하지 않는다.

3. subscribe() 메서드를 실행하면, Subscriber 가 등록되고 데이터를 전송해달라는 요청을 보낸다.

4. 구구단을 외우면서 데이터를 하나씩 전달한다. 이때 Subscriber 에 구현된 onNext 메서드를 실행시킨다.

5. subscriber 에 구현된 onNext 메서드를 실행하는데, 필자는 Consumer 함수를 정의하였다. Consumer 함수에는 데이터를 받아서 출력하는 구문이다.  이렇게 4,5 번이 구구단을 외울때마다 반복해서 한번씩 수행한다.

6. 데이터 전송이 완료되면 onComplete 메서드가 실행된다. 이때는 Runnable 함수로 정의한다.

7. 전체 과정이 완료되었다.


이 과정을 로그로 확인하고 싶다면, Flux를 생성하는 구문에서 .log() 메서드를 추가해보자.


onSubscribe --> request --> onNext --> onNext ... 반복 --> onComplete

subscribe 가 실행을 해야 구구단을 외우면서 데이터를 전달하기 시작한다.




4.5 Mono 를 생성하는 방법


4.5에서는 Mono 를 생성하는 방법에 대해서 설명한다.


just


Flux와 마찬가지로, Mono 역시 just 메서드를 사용하면 간단하게 Mono를 만들 수 있다.


empty


Mono.empty() 메서드를 를 사용하면 빈 Mono 를 만들 수도 있다.


Mono 역시 매우 중요하지만, 자세한 내용은 다음에 다시 작성하겠다.



4.6 Lifecycle hooks


설명 추가 예정


doOnSubscribe

설명 추가 예정

doOnRequest

설명 추가 예정

doOnNext

설명 추가 예정

doOnError

설명 추가 예정

doOnComplete

설명 추가 예정

doOnCancle

설명 추가 예정

doOnEach

설명 추가 예정



4.7 마무리


이번 글에서는 Flux, Mono 에 대한 기본 개념을 정리하였다. 해당 글은 공식 레퍼런스를 보고 작성하였는데 아무래도 필자의 영어 독해 실력이 뛰어나지 못해서 어려움이 많았다. 글을 더이상 자세하게 작성할 시간이 없으니 이정도로 마무리 하고, 다음 글에서는 subscriber 에 대해서 자세하게 다룰 예정이다.



https://github.com/sieunkr/reactive-spring/tree/master/04


레퍼런스

[1] https://projectreactor.io/docs/core/release/reference/#getting-started-introducing-reactor


매거진의 이전글 Project Reactor 3. 리액티브 스트림
브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari