brunch

You can make anything
by writing

C.S.Lewis

by 에디의 기술블로그 Feb 23. 2019

Reactor 6. Data Processing

Filtering, Converting

"Reactive Spring" 시리즈의 첫번 째 주제인 "Project Reactor"의 6장입니다. 이번 글에서는 Filter, Map 등 Reactor 의 데이터&스트림 프로세싱에 대해서 공부합니다.


전체 목차


"Reactive Spring"

첫번째 주제는, Project Reactor

1. 리액티브 프로그래밍

2. Async VS Non-Blocking

3. Reactive Streams

4. Project Reactor Flux, Mono Basic

5. Project Reactor Subscriber 

6. Project Reactor Data Processing(현재 글)

7. Project Reactor Create, Generator

8. Project Reactor Testing

9. 미정


두번째 주제는, Spring Webflux

목차 미정


세번째 주제는, Spring Reactive Data

목차 미정


추후에 목차는 변경될 수 있습니다.



6. Flux Data, Stream Processing


6.1 Filtering



6.1.1 filter()


filter() 는, 주어진 조건에 맞는 데이터만 전달할 수 있다. 아래 그림을 보면, "원" 모양의 데이터만 통과 시키는 것을 볼 수 있다.


샘플 코드를 바로 작성해 보자.  

아주 심플한 Flux<String> 를 생성해서 subscribe를 한다. 이때, filter 를 추가해서 "blue"가 아닌 데이터만 전달하도록 구현하였다. Flux<String> 는 "blue","green","orange","purple"  4개의 데이터이지만, subscribe 한 이후에 List<String> 에 추가된 데이터는 3개 일 것이다. 테스트 코드를 통해서 검증해보자. 근데, 여기서 필자의 의문이 생겼다...


[필자의 의문]

필자는 onNext 이벤트가 발생하는 로직에서, blue 를 빼고 전달할거라고 생각했다. Publisher 의 구현체인 Flux에서 필터링을 하는 것으로 생각했지만... log 를 보니, onNext(blue)를 실행하고 request(1) 를 수행하는 것을 확인할 수 있다.


자, 그렇다면 샘플코드를 하나 더 짜보자. 위에서는 not-equals 를 했는데, 이번에는 equals 를 수행해보자.

"orange" 데이터만 전달하는 코드이다. 딱 하나의 데이터만 전달하기 때문에 List<String> colors 의 사이즈도 "1" 이다. 로그를 확인해보면 아래와 같다.

역시, 4개의 String 데이터를 모두 onNext(color) 를 수행하는 것을 확인할 수 있다. filter 조건에 맞지 않는 "blue,green,purple"는 onNext 에 데이터를 전달하지 않을 것이라고 생각했는데... 그럼 저 onNext() 는 뭐지?


필자가 예상했던 log

request(unbounded) --> onNext(orange) --> onComplete()


실제로 찍힌 log

request(unbounded) --> onNext(blue) --> request(1) --> onNext(green) --> request(1) --> onNext(orange) --> onNext(purple) --> request(1) --> onComplete()


그럼, Subscriber 에서 onNext 가 발생하는지 확인해보자. "reactor.core" 의 LambdaSubscriber 클래스에서 디버깅을 해보면 아래와 같이 onNext 는 "orange"데이터만 전달 되는것을 확인할 수 있다.

로그에는 onNext 가 찍혔지만, 실제로 Subscriber 에서는 필터링 된 데이터를 전달받게 된다.


그럼, filter 조건에 의해서 전달되지 않는 데이터는, 굳이 onNext 로그를 남길 필요가 있을까???

필자의 의문에 대해서 아는 개발자가 있다면 알려주길 바란다.



6.1.2 take()


시퀀스 순서대로 특정 개수의 데이터만 전달하고 싶다면, take() 를 사용하면 된다.

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#take

샘플 코드를 먼저 보자.

Flux<String> 데이터 중 2개만 전달하고 cancel 이벤트가 발생한다.


[필자의 의문]

근데, 로그를 보면, onComplete 가 없다. 그렇다면 혹시 onComplete 메서드는 수행되지 않는것인가?? 즉, Subscriber 의 onComplete 는 실행이 안되나?? 테스트 코드를 작성해서 다시 확인해보자.

비록 log 는 찍히진 않았지만 Subscriber 의 onComplete 는 정상적으로 실행을 하였다.


나는 지금, 어느 포인트에서 이해를 잘못하고 있는건가?  


추측으로는 Flux 에 체인으로 거는 log 는 Flux 입장에서의 로그인 듯 싶다. 자세한 내용은 더 공부하고 코멘트를 남기겠다.



6.1.3 skip()


설명은 생략한다.


skip() 을 실행하면, 역시 로그가 필자가 이해하기 어렵게 찍혀서 나온다...


6.1.4 repeat()


설명은 생략한다.



6.1.5 정리


일단, 구현은 심플하고 어렵지 않지만 log 에 대해서 필자가 이해를 지금 못하고 있다. 조금 더 공부를 해보고 추가 의견을 작성할 예정이다.




6.2 Converting



6.2.1 Map


map() 은 각각의 데이터를 변환해주는데, 아래 그림을 보면 이해가 빠를 것이다.


https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html

Flux<String> 에서 푸쉬하는 문자열을 소문자로 변경해보자.

코드를 보면 이해가 쉽게 될 것이다. 1 대 1 방식으로 데이터를 변환해준다.


6.2.2 flatMap()


위에서 설명한대로 map()은 one-to-one 으로 변환한다. 만약 one-to-n 으로 변환하고 싶다면 flatMap() 을 사용해야 한다.

사실 flatMap() 은 필자가 100% 완벽하게 이해하지 못했다. 위에 설명한 대로 1-to-1 일때는 Map을 사용하고, 1-to-n 일때는 flatMap 을 사용하면 된다는 알겠지만, 두 Operator의 결정적인 차이는 바로 동기,비동기 여부이다.

map is for synchronous, non-blocking, 1-to-1 transformations

flatMap is for asynchronous (non-blocking) 1-to-N transformations

동기,비동기 여부에 대해서 좀 더 공부를 한 이후에 내용을 추가하겠다.


6.2.3 zip()


설명 생략한다.


6.2.4 정리


참고로, 더 많은 메서드가 존재한다... 필자는 미리 공부는 못할 듯하고, 나중에 프로젝트에서 사용하게 되면 그때그때 해보면서 공부할 예정이다.



6.3 글을 마무리하면서


이번 글에서는, Flux 의 데이터 스트림의 필터링, 변환 등에 대해서 알아보았다. 사실, 포스팅 하나로 작성하기에는 무리가 있는 방대한 내용이다. 이번 주말에도 이렇게 하나의 글을 작성해서 브런치에 발행을 하였는데, 첫번째 주제인 Reactor 를 얼릉 마무리하고 WebFlux 로 넘어가야할 듯 싶다.


https://github.com/sieunkr/reactive-spring/tree/master/06/src/test/java/com/example/demo




[7장에 대한 예습]

지금까지는 just()등과 같은 기능을 사용해서 Flux 를 생성하였지만 사실 실무에서 just()를 사용하는 일은 거의 없을 듯하다.  다음 글인 7장에서는 프로그래밍으로 시퀀스를 생성하는 방법에 대해서 알아볼 것이다.

Synchronous:  generate

Asynchronous & multi-threaded: create

Asynchronous but single-threaded: push

https://projectreactor.io/docs/core/release/reference/#producing

         

[8장에 대한 예습]

사실, 이제와서 고백하자면 필자의 테스트 코드 작성은 올바르지 않을수 있다. Reactor 는 비동기, Non-Blocking 프로그래밍인데, Assert 테스트 구문이 subscribe 이후 바로 테스트 검증을 하는 것이 맞는지 모르겠다. 아무래도 Reactor 에서의 테스트 코드 작성은 Assert 로 하는 방법보다는, Reactor 에서 제공하는 테스트 방법을 사용하는게 좋을 듯하다. Reactor는 테스트 코드 작성을 위한 라이브러리를 따로 제공하며, 대표적으로 StepVerifier 와 같은 도구를 제공한다. StepVerifier 를 포함한 테스트 코드 작성에 관련해서는 8장에서 다룰 예정이다.

https://projectreactor.io/docs/core/release/reference/#testing


브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari