brunch

You can make anything
by writing

C.S.Lewis

by 에디의 기술블로그 Feb 16. 2019

Project Reactor 5. Subscriber

5. Subscriber

글 작성할 시간이 없고 귀찮아서 급하게 마무리하고 발행하였습니다.

그냥 공식 레퍼런스를 읽으시는 걸 추천합니다. 저도 여기 링크 참고해서 작성한거랍니다.

https://projectreactor.io/docs/core/release/reference/




"Reactive Spring" 시리즈의 첫번 째 주제인 "Project Reactor"의 5장입니다. 지난 번 글에서는 Flux,Mono 에 대해서 정리하였는데, 이번 글에서는 Subscriber 에 대해서 공부합니다. 참고로 이 글은 Flux,Mono 의 기본 개념이 없으시다면 이해할 수 없습니다. 반드시 사전 기본 개념을 이해해야 합니다. 별로 도움은 안되실 수 있습니다만, 그래도 필자의 이전 글을 미리 읽고 오시길 바랍니다.

https://brunch.co.kr/@springboot/152

https://brunch.co.kr/@springboot/158

https://brunch.co.kr/@springboot/153

https://brunch.co.kr/@springboot/154


전체 목차


"Reactive Spring"

첫번째 주제는, Project Reactor

1. 리액티브 프로그래밍

2. Async VS Non-Blocking

3. Reactive Streams

4. Project Reactor Flux, Mono Basic

5. Project Reactor Subscriber (현재 글)

6. Project Reactor Data Processing

7. Project Reactor Create, Generator

8. 미정


두번째 주제는, Spring Webflux

목차 미정


세번째 주제는, Spring Reactive Data

목차 미정


추후에 목차는 변경될 수 있습니다.



5.1 subscriber





5.1 Flux.subscribe()


지난 글에서 설명한 내용을 잠시 복습을 하겠다. Flux 는 Publisher 의 구현체이다. 일반적인 Reactive Streams 에서의 Publisher 는 subscribe 를 실행할 때 subscriber 를 등록해준다. Flux 에서 제공하는 팩토리 메서드는 subscriber 를 등록해주는 메서드도 있는 반면에, subscriber 를 매개변수로 등록하지 않는 함수도 존재한다. subscriber 가 필요없어서가 아니라, 내부 로직에서 자동으로 subscriber 를 만들어 준다. 개발자가 따로 subscriber 를 등록하지 않아도 된다. Flux 에서 제공하는 subscribe()메서드 를 전부 확인해보자.  



메서드 하나씩 확인해보자.  


subscribe(Consumer<? super T> consumer)


일단 처음으로 볼 함수는, Consumer 함수 하나만 매개변수로 받는 메서드이다. 아래와 같은 Flux 의 팩토리 메서드를 호출할 것이다.

여기서 Consumer 함수에는, Publisher 가 Subscriber 에 데이터를 전송하면... 즉,  Publisher 에서 Subscriber의 onNext 메서드를 실행시킬 때 Subscriber 측에서 데이터를 받을때 실행하는 구문이다.  필자가 작성한 샘플 코드를 보자. Subscriber가 데이터를 전달 받으면 따로 정의한 List 에 데이터를 추가하는 구문이다.  

로그를 보면 아래와 같다.

그동안 수차례 설명했듯이, subscribe(Consumer..) 메서드를 호출할 때, 별도로 개발자가 Subscriber 를 등록하지 않아도 된다. 내부 로직에서 Subscriber 를 생성할 것이다.



subscribe(consumer, errorConsumer)


생략



subscribe(consumer, errorConsumer, completeConsumer)


subscriber 의 onComplete 메서드를 정의할려면, subscribe 메서드에 아래와 같이 Runnable 함수를 매개변수로 정의해서 넘겨줘야 한다.

필자가 작성한 샘플 코드를 보자.

실행을 하면 로그를 보면..아래와 같다.

onComplete() 메서드가 실행되면, Runnable 함수로 정의한 작업이 수행된다.


subscribe(consumer, errorConsumer, completeConsumer, subscriptionConsumer)


위에 작성한 샘플 코드는, request(unbounded) 를 호출한다. unbounded 는 내부 로직에서 MAX 값을 설정하게 될 것이다. 즉, Back-Pressure 기능을 활용하지 않고, 모든 데이터를 전송해 달라고 요청하는 것이다. 만약에, Back-Pressure 를 사용하기 위해서 request(N) 에서 N 개수를 변경하고 싶다면, 아래 메서드를 호출하면서 파라미터에 subscriptionConsumer 를 넘겨줘야 한다.

필자의 테스트 샘플 코드를 보자. subscription.request(1) 을 호출하였다. Flux 는 "에디킴","아이린"이라는 String 데이터를 생성하는 Flux 인데, 테스트 코드 검증에서는 assertNotEquals 를 통과하였다. 즉, 데이터가 모두 전송이 되지 않은 것이다.  

request(1) 를 호출하였기 때문에, 한개의 시퀀스(데이터)만 전달하고 대기하게 되는 상황이다. 로그를 확인해보면 아래와 같다.

onNext(에디킴) 이후에 어떤 작업도 않고 멈춰있는다. 만약 이 상황에서 추가 데이터를 받아야 한다면 request를 다시 요청해야 한다. Back-Pressure 관련해서 더 상세한 내용은 추후에 좀 더 상세하게 다룰 예정이다. 어쩃든 매우 중요한 개념이니 꼭 이해하길 바란다.


정리


Flux 의 subscribe() 팩토리 메서드 중, subscriber 를 등록하지 않는 메서드를 검토하였다. subscriber 를 등록하는 방법은, subscribe() 에 대해서 알아본 이후에 이 글의 마지막 부분에 추가하겠다.



5.2 subscribe() 는 어떻게 동작하는가?


subscribe 에 대해서 좀 더 자세히 알아보자.


subscribe() 내부 코드를 보면...


다시, 기본으로 돌아가서, Reactive Streams 를 생각해보자. Reactive Streams 인터페이스는 publisher, subscriber, subscription, proccor(?). 가 정의되어 있다. Reactor 의 Flux 와 Mono 는 publisher 의 구현체이다. 필자가 위에서 작성한 코드들은 subscriber 를 등록하지 않았다. Flux 내부적으로 subscriber 를 등록해줄 것이다. 아래와 같은 코드가 있다고 가정하자.

지금 실행한 메서드는.. subscribe 메서드 중에서 Consumer 를 변수로 받는 메서드 이다.

해당 메서드를 디버깅해본다. reactive.streams 의 Subscriber 를 구현하는 LambdaSubscriber 라는 애를 등록해주는 것을 알 수 있다. LambdaSubscriber 클래스 다이어그램은 간단하게 참고만 하자. 외울 필요는 없다.  


내부 로직에서, LambdaSubscriber 의 onSubscribe 메서드를 실행할 것이고, Subscription 의 구현체를 넘겨준다. 이때, subscriptionConsumer 이 null 이면, request 를 MAX_VALUE 값으로 호출해서 모든 데이터를 조회될 것이다.

reactor.core.publisher.LambdaSubscriber

reactor.core.publisher

여기까지 코드를 보면, 필자가 추측했듯이 Back-Pressure 를 따로 활용하지 않고, Max 데이터를 전송하라는 요청을 보내는 것이다.  더 나아가서 LambdaSubscriber의 onNext 를 보자. onNext 에서는 Consumer Function Interface의 accept 메서드를 실행한다.

reactor.core.publisher.LambdaSubscriber


필자는 조금씩 퍼즐이 맞춰지고 이해가 되기 시작했다. 혹시... 이 글이 이해가 잘 안된다면 필자가 사과하겠다. 설명을 잘 하고 싶지만, 직접 디버깅을 해보면서 확인해보길 바란다.


5.3 subscriber 등록


이제, subscriber 를 직접 등록하는 함수를 알아보자.


subscriber 직접 구현(비추천)


subscriber 를 등록하는 메서드는. Flux 클래스의 아래 메서드를 호출한다.

Subscriber 를 생성해서 매개변수로 호출해야 하는데, org.reactivestreams의 Subscriber 를 구현해서 추가해야 하기 때문에, 인터페이스에 정의된 모든 메서드를 구현해야 한다. 리액티브 스트림의 Subscriber 인터페이스의 메서드는 아래와 같다.

onSubscribe

onNext

onError

onComplete

필자의 샘플 코드를 보자. Subscriber 를 생성하면서 모든 메서드를 구현하였다. 메서드 하나라도 구현하지 않으면 오류가 발생할 것이다.

하지만, 피보탈의 Reactor 팀에서는 이런 방식을 추천하지 않는다. BaseSubscriber 클래스를 사용하는 것을 권장한다.


BaseSubscriber 구현


글 작성 중...



5.4 마무리


급하게 글을 마무리한다.... 그냥 샘플 코드를 참고하길 바란다.


https://github.com/sieunkr/reactive-spring/tree/master/05


다음 글에서는 Flux 와 Mono 의 데이터 프로세싱에 대해서 공부합니다!!

브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari