리액티브 스트림 기반의 RSocket 를 스프링 환경에서 구현하기
최근 몇일 전에 스프링 프레임워크 5.2 버전이 공식 발표 되었는데, 해당 버전에서는 신규 기능이 많이 추가 되었다. 신규 기능 중 주목해야할 내용은 바로, RSocket 에 대한 지원이다. 비록, 스프링부트 에서는 아직 공식적으로 지원하지 않지만, Pivotal 에서 열심히 개발중인 2.2.X 버전부터 RSocket를 지원할 예정이고, 10월안에는 정식 버전이 나올 것으로 추측된다. (참고로 현재 RC1 버전까지 나온 상황이다.) 어쨋든, 이번 글에서는 RSocket 에 대해서 간단하게 알아보고, 스프링 환경에서 RSocket 을 어떻게 사용할 수 있는지 알아보자.
RSocket 은 리액티브 스트림을 지원하는 통신 프로토콜이다. 넷플릭스에서 처음 개발하였고, 마이크로서비스 환경에서 주로 사용하며, 기존의 HTTP 전송을 대체할 수 있다. 이 글에서 RSocket 에 대한 모든 개념을 설명하지는 않는다. 이 글에서는 간단하게 소개만 하고, 상세한 내용은 각자 알아서 공부하면서 찾아보길 바란다.
웹서비스는 HTTP 라는 프로토콜을 사용하여 서버간 통신을 한다. HTTP 는 클라이언트와 서버의 역할이 명확하게 나누어져있다. 클라이언트 측에서 요청(Request)를 보내고, 서버 측에서 응답(Response)이 되돌아 오는 구조이다. 이 글에서는 HTTP 에 대해서 자세하게 다루지는 않지만, 가볍게 알아보고 싶은 개발자는 필자의 허접한 예전 글을 읽어보자.
https://brunch.co.kr/@springboot/16
https://brunch.co.kr/@springboot/19
상세하게 공부하길 원한다면 "HTTP 완벽 가이드" 라는 책을 꼭 읽어라.
HTTP 는 폴링 방식으로 통신을 하는데, 폴링 방식은 클라이언트가 요청하면 서버가 응답하는 방식이다. HTTP를 사용하면서 상호작용을 위한 기능을 제공하기 위해서는, 다양한 방법(Ajax 등)의 기술을 사용할 수 있는데, 복잡하게 기능을 구현을 해야 하기 때문에 웹서비스 구현 시 가장 어려운 부분이다.
최근 마이크로서비스 아키텍처가 일반화 되면서 다양한 서버 통신 방법이 사용되고 있다. 오래전부터 사용하던 HTTP 도 실무에서 여전히 많이 사용하며, 최근 기술인 GRpc, AMQP, 웹소켓 등 다양한 기술이 혼합해서 사용되고 있다. HTTP 가 비록 MSA에 완벽하게 적합하지는 않지만, Rest API 라는 심플하고 익숙한 기술은 여전히 장점이 많은 기술이다. RSocket 에 대해서 논하기 전에 HTTP 기반의 폴링, 롱폴링 및 웹소켓 등 전반적인 시스템 간의 통신 기술에 대해서 이해를 해야 한다. 나중에 시간이 되면 HTTP 에서부터 RSocket 에 이르는 기술에 대해서 상세하게 글을 작성하겠다.
HTTP, 웹소켓, AMQP 등 통신 프로토콜에 대해서 전반적으로 공부를 하길 바란다. 필자도 틈틈히 공부를 진행해서 다음에 글을 꼭 남기도록 하겠다. 약속!!
RSocket 은 4가지 방식으로 통신을 한다.
Fire-And-Forget (no response)
Request-Response (stream of 1)
Request-Stream (finite stream of many)
Channel (event subscription, infinite stream of many)
생략
생략
생략
생략
자세한 내용은 아래 링크를 읽어보길 바란다.
https://github.com/rsocket/rsocket/blob/master/Protocol.md
자바 라이브러리로 간단하게 RSocket 통신을 구현해보자. RSocket 를 이해하기 위해서는, 반드시 리액티브 스트림에 대해서 사전에 이해를 해야 한다.
https://brunch.co.kr/@springboot/153
또한, RSocket Java 라이브러리는 Project Reactor 를 사용한다. 그래서, Flux 또는 Mono 와 같은 Reactor의 Publisher 구현체가 사용된다. Flux, Mono에 대해서 전혀 모르는 개발자는 반드시 Project Reactor에 대해서 공부하고 다시 돌아오길 바란다.
https://brunch.co.kr/@springboot/154
이 글은 리액티브 스트림, Flux, Mono 등 Reactor 에 대해서 반드시 이해를 하고 읽어야 한다.
RSocket 라이브러리의 RSocket 인터페이스에는 RSocket 통신을 위한 메서드가 정의되어있다. 개발자는 RSOcket 인터페이스를 구현해서 리액티브 시스템을 구축하면 된다.
fireAndForget 메서드는 단방향 푸쉬 알림에 주로 사용하는데, 반환 타입은 Mono<Void> 로 정의되어 있다. Mono<Void> 로 반환한다는 의미는, fireAndForget 메서드로 부터 어떤 데이터도 응답받지 않는다는 의미다.
requestResponse 메서드는 Mono<Payload> 를 반환한다. Mono 는 리액티브 스트림 Publisher의 구현체인데, 단일 데이터 또는 스트림을 전달받는 역할을 수행한다. 우리에게 익숙한 HTTP 통신과 유사한 방식이라고 생각해도 된다.
반면에 requestStream 은 Mono 가 아닌 Flux 로 응답받는다. 즉, 단일 데이터가 아니라 다수의 데이터 또는 스트림을 전달받는다.
마지막으로 requestChannel 은 ... 설명을 생략하겠다. 나중에 다시 공부하자. (가장 중요한 내용이지만...)
RSocket 는 인터페이스이다. 인터페이스만으로 우리는 아무것도 할 수 없다. 가치있는 하기 위해서는 인터페이스를 구현하는 구현체를 만들어야 한다. Java 라이브러리에서는 RSOcket 를 구현하는 AbstractRSocket 라는 추상 클래스를 제공하는데, 역시 해당 클래스만으로도 할 수 있는게 없다.
RSocket를 구현하는 AbstractRSocket 추상 클래스를 상속받아서, 필요한 기능을 구현하자.
아직까지는, 이해하기 어려울 수 있다. 샘플 코드를 차분히 보면서 함께 보도록...
간단한 Gradle 기반 자바 애플리케이션을 생성한다.
샘플코드이기 때문에, 편의상 RSocket 서버, 클라이언트 코드를 동일한 애플리케이션에서 실행한다.
#서버
서버 코드에서는 AbstractRSocket 를 상속하는 findAndForget 메서드를 오버라이딩 해서 필요한 기능을 구현하였다. findAndForget 모델은 서버에서 데이터를 반환하지 않는다. 그래서 Mono<Void> 를 리턴하는데, return Mono.empty 를 선언해주면 된다. 그리고, transport 에 TcpClientTransport 를 사용하였다. RSocket 는 TCP 또는 WebSocket 를 유연하게 선택해서 사용할 수 있다. 만약, WebSocket 기반에서 동작하고 싶다면, WebsocketClientTransport 를 사용하면 된다.
#클라이언트
클라이언트에서는 RSocketFactory 클래스를 사용해서 RSocket 에 연동할 수 있다. 애플리케이션을 실행해보자.
클라이언트에서 "Hello WOrld" 라는 메시지를 findAndForget 모델로 전송하였다. 서버에서는 전달 받은 메시지를 잘 출력하였다. 아주 심플한 예제이다.
https://github.com/sieunkr/spring-boot-rsocket/tree/master/rsocket-basic/fire-forget
Request-Response 모델은 우리에게 친숙한 HTTP 방식과 유사하다.(HTTP와 유사하지만 다른 점도 많다.) 단일 데이터 또는 단일 객체를 응답받는 모델이며, HTTP와는 다르게 완벽하게 논블록킹 방식으로 동작시킬 수 있다. (참고로... RSocket 를 사용한다고 무조건 논블록킹을 보장하는 것은 아니다.... 개발자가 개발을 그지같이 하면 예상하지 못한 부분에서 블록킹이 발생할수 있다.) 어쩃든, Request-Response 모델을 완성하기 위해서는 AbstractRSocket 의 메서드 중에서 requestResponse 메서드를 오버라이딩한다. 그리고, Mono<Payload> 또는 Mono<데이터타입> 를 반환해야 한다. fire-and-forget 에서는 Mono<Void> 를 반환했지만, RequestResponse 메서드는 단일 객체를 클라이언트에 전달해야 하기 때문에 Mono<Payload> 를 반환한다. 하지만, 단일 데이터 응답이기때문에 Flux 를 사용하지는 않는다.
#서버
#클라이언트
클라이언트에서는 Mono<Payload> 를 전달받는데, getDataUtf8 메서드를 사용해서 데이터를 꺼내면 된다.
코드는 아주 심플하고 간단하지만 만약 코드가 눈에 잘 안들어오는 개발자는 이 글을 그만 읽어라. 사실, 필자도 잘 모른다. (그만 읽으라고 해서 진짜 그만 읽으면 서운하다.) 필자의 성의를 생각해서라도 억지로라도 읽어보고 꼭 피드백을 해주길 바란다. 암튼, 애플리케이션을 실행하면 아래와 같다.
https://github.com/sieunkr/spring-boot-rsocket/tree/master/rsocket-basic/request-response-model
Request Stream 모델은 단일 객체를 응답하는 request-response 모델과는 다르게,
2개 이상의 데이터 또는 객체를 스트림 방식으로 응답하는 모델이다.
#서버
#클라이언트
subscribe 메서드를 유심히 살펴보자.
requestStream 메서드를 실행하면 Flux<Payload> 를 반환한다.
응답받은 Flux<Payload> 를 subscribe 메서드를 사용해서 원하는 방식으로 데이터를 조작할 수 있다. subscribe 메서드는 다양한 오버로딩 메서드를 제공하는데, 필자의 샘플 코드에서는 데이터 처리 및 에러처리, 완료 처리가 모두 가능한 메서드를 사용했다.
Flux 또는 Mono 의 subscribe 에 대해서 잘 모르는 개발자는 아래 글을 참고하길 바란다.
https://brunch.co.kr/@springboot/155
애플리케이션을 실행해보자. subscribe 의 complete 리스너에 "completed"를 출력하도록 작성하였는데, 아래 샘플과 같이 정상적으로 완료 된 것을 확인할 수 있다.
https://github.com/sieunkr/spring-boot-rsocket/tree/master/rsocket-basic/request-stream
가장 중요하지만... Channel 모델에 대한 자세한 설명은 생략한다. 나중에 다시 공부해서 작성하겠다.
https://github.com/sieunkr/spring-boot-rsocket/tree/master/rsocket-basic/channel
지금까지 자바 라이브러리를 사용해서 RSocket 서버, 클라이언트 통신을 구현하였다. 필자는 실무에서 스프링을 주로 사용하기 때문에, 위와 같은 방식으로 구현할 일은 없을 것 같다. 이제 본격적으로 스프링 환경에서 검토를 해보자.
단, 스프링부트 버전에 따라서 연동 방식에 차이가 있다. 2.1 버전과 2.2 버전에서의 방식이 다르기 때문에 주의 깊게 이 글을 읽어보길 바란다.
최근에 스프링 프레임워크 5.2.RELEASE 가 발표 되었는데, RSocket 에 대한 공식 지원이 포함되었고, 스프링 부트에는 10월 중에 발표 되는 2.2.0.RELEASE 버전에서 RSocket 이 포함되어 나올 예정이다. 마일스톤 버전이지만, 스프링 부트 2.2.0.M6 버전에서 RSocket 을 연동한 스프링 부트를 사용할 수 있다. 2.2.X 에서는 스프링부트에서 RSocket 의 AutoConfiguration 을 제공하기 때문에 RSocket 을 좀 더 심플하게 사용할 수 있다. 또한 스프링 기존 Messaging 모듈과 통합할 수 있어서 추상화된 메시지 컴포넌트 구현이 가능하다. 그래서, 스프링 부트 버전에 따라서 RSocket 를 구현하는 방식이 차이가 있는데 먼저, 자체적으로 RSocket 컨피그를 정의해서 사용하는 방식을 알아보자. (필자가 실무에서 해본 방식이 아니라서, 확신이 없다. 해본 개발자가 있다면 피드백을 좀 해주길 바란다.)
10/3(목) 기준으로 스프링 부트의 2.1.X 릴리스 노트는, 2.1.9.RELEASE 버전까지 발표되었다. 수차례 설명했지만 해당 버전에는 RSocket 에 대한 지원이 되지 않는다. 간단하게 심플한 스프링 부트 애플리케이션을 생성하고, 스프링 부트 웹 디펜던시는 추가하지 않는다. 웹 디펜던시를 추가하지 않는 대신 우리는, io.rsocket. 로 시작하는 RSocket 모듈 디펜던시를 추가해야 한다.
컨피그 구성은 아래와 같다. 스프링 부트 웹을 실행하는 것이 아니라, RSocket 서버를 실행하고 block 를 걸어서 애플리케이션이 계속 실행하도록 하였다. 필자가 RSocket 서버를 구성해본 경험이 없기 때문에, 사실 해당 방법이 맞느지 확신은 없다.
AbstractRSocket 를 상속받아서 SimpleRSocketImpl 이라는 컴포넌트 클래스를 생성하였다. 각 메서드의 리턴 타입인 아래와 같다.
fireAndForget : Mono<Void>
requestResponse : Mono<Payload>
requestStream : Flux<Payload>
requestChannel : Flux<Payload>
필자가 만든 SimpleRSocketImpl 클래스에 필요한 기능을 구현하였다.
https://github.com/sieunkr/spring-boot-rsocket/tree/master/spring-boot-rsocket-basic/server
RSOcket 클라이언트 코드는 아래와 같다.
자세한 내용은 생략한다. 이런 느낌으로 코드를 작성하면 될 것 같은데... 혹시 스프링 부트 2.1 이하 버전에서 RSocket 경험이 있는 개발자는 댓글로 피드백을 남겨주길 바란다.
https://github.com/sieunkr/spring-boot-rsocket/tree/master/spring-boot-rsocket-basic/client
이제, 드디어!!!!
스프링 5.2, 스프링부트 2.2.X 버전에서의 RSocket 에 대해서 알아보자.
지난 주에 스프링 프레임워크 5.2 버전이 정식으로 발표되었는데 자세한 내용은 공식 블로그에 방문해서 읽어봐라.
https://spring.io/blog/2019/09/30/spring-framework-5-2-goes-ga
스프링 5.2에 꽤 많은 변화가 있는데, 그 중 눈에 띄는 내용은 Spring Messaging 에 RSocket 이 지원된다는 내용이다.
스프링부트 2.2.0.RC1 버전에서 테스트를 진행하겠다. 참고로, 2.2.0.M1~M6 마일스톤 버전이 업그레이드되면서 RSocket 메서드가 일부 변경이 되었다. 이 글을 읽는 개발자 중 RSocket 스프링부트 샘플 코드를 구글링해서 참고하는 개발자는 반드시 부트 버전을 확인하길 바란다. 또한, 필자의 코드는 RC1 버전이고, 2.2.0.RELEASE 공식 버전에서 바뀔 가능성도 있으니 주의가 필요하다. RC1 버전은 상용에서 사용하기 아직 미흡하니 좀 더 지켜보도록 하자.
#. server 셋팅
스프링부트 버전은 2.2.0.RC1 기반이고, spring-boot-starter-rsocket 디펜던시를 추가해준다. 참고로 부트 2.2.0.X 부터는 JUnit 5 테스트가 공식적으로 지원한다. 스프링부트에 의해서 RSocket 서버는 자동으로 컨피그가 생성이 된다. 그래서, 2.1.X 에서 했던 소스처럼 RSocket 서버를 실행하는 코드를 작성하지 않아도 된다. 대신, RSOcket 관련 설정을 애플리케이션 프로퍼티 설정에 추가해야 한다. application 프로퍼티 파일에 RSocket 서버의 포트를 지정해주자. 필자는 7000 으로 설정하였다.
애플리케이션을 실행하면 RSocket 서버가 잘 실행이 된다.
스프링부트에 연동된 RSocket 서버는 @MessageMapping 어노테이션으로 심플하게 엔드포인트 구성이 가능하다. 즉, AbstractRSocket 클래스를 상속받아서 구현할 필요가 없다.
단, 리턴 타입을 RSocket 의 통신모델에 맞게 잘 정의해줘야 한다. RSocket 통신 모델은 아래와 같이 동작한다.
HTTP 와 유사한 Request-Response 방식을 사용한다면, 전달 받을 응답 데이터는 한개의 객체가 되어야 한다. 즉, 리턴 타입이 Mono<Object> 이 되어야 한다. Request-Stream 방식을 사용한다면 응답 데이터는 1개 이상의 스트림 데이터이기 때문에, 응답 타입은 Flux 를 사용해야 한다.
#. client 셋팅
클라이언트는 서버와 디펜던시가 거의 같게 구성하였다. 단, 클라이언트는 또다른 컴포넌트에 데이터를 제공하기 위해서 webflux API 엔드포인트를 구성하기 위해서 starter-webflux 디펜던시를 추가하였다.
web MVC 모델을 사용해서 코드를 작성해도 되지만, 애플리케이션의 전체 구조를 모두 비동기&논블록킹 환경으로 구성을 하기 위해서 webflux 를 사용했다. web MVC 를 사용하면 서블릿 단계에서 블록킹 구조로 동작한다.
RSocketRequester 의 builder 또는 wrap 메서드를 사용해서 RSocketRequester Bean 을 정의해줘야 한다. 단, 위에서도 설명했지만, 스프링부트 마일스톤 버전에 따라서 사용하는 메서드가 다르다. 스프링부트 2.2.0.M2 초반 마일스톤 버전에서 사용하던 RSocketRequester.create 메서드는 사라졌다. 아마도 개발하는 과정에서 wrap 메서드로 변경이 된 것으로 추측된다. 스프링 프레임워크 5.2 버전이 정식 출시 되었지만, 스프링 부트 2.2.0은 아직 RC1 버전이기 때문에 좀 더 지켜봐야 한다. 스프링 5.2가 공식 출시되었기 때문에 왠만해서는 안바뀔거 같지만, 혹시 모르니 나중에 부트 2.2.0.RELEASE 버전이 출시되면 그때 다시 확인하길 바란다.
1개의 객체를 전달하는 모델이다. HTTP 방식과 유사하지만, HTTP 와는 다르게 논블록킹 방식으로 동작한다.
#. server
서버에서는 @MessageMapping 어노테이션을 사용해서 엔드포인트를 구성한다. 그리고, 리턴 타입은 Mono<Coffee> 로 하였다. 내부 비즈니스 로직은 이글에서 중요하지 않으니 Repository 내부 코드의 설명은 생략하겠다.
#. client
클라이언트에서는 RSocketRequester 를 의존성 주입해서 사용한다.
.route 에 RSocket 서버에서 @MessageMapping 으로 정의한 경로를 선언해준다. 참고로, 커피의 이름을 전달하면 이름에 맞는 커피를 조회하는 역할을 수행한다. 그래서, RSocket 통신에서 클라이언트는 커피의 이름을 전달한다. 최종적으로 응답받은 데이터는 retrieveMono 메서드를 사용해서 Mono<Coffee> 로 변환해준다.
#. server
1개 이상의 데이터 스트림을 전달해야 하기 때문에, Flux를 사용해야 한다. (Mono 는 단일 데이터 스트림에서 사용한다._)
#. client
클라이언트에서는 FLux 로 데이터를 받으면 된다.
#. server
서버에서는 클라이언트로 받은 요청을 처리만 하고, 그 어떤 데이터를 응답하지 않는다. 그래서 Mono.empty() 를 리턴해버리면 된다.
#. client
GetMapping 이 아니라, PostMapping 으로 작성해봤다.
또한, requester 의 최종 메서드는 send() 메서드를 실행한다. send 메서드는 Mono<Void> 를 반환한다. 스프링에서 제공하는 클래스로 이동해서 확인해보면 아래와 같다.
RSOcket 에서 제공하는 통신 모델 중에서 가장 중요한 Channel 에 대해서 공부를 해야하는데...
#. server
ㅇ..
나중에 작성하겠다... 가장 중요한 기능이기는 한데, 좀 더 공부해서 잘 정리해서 글을 남기는게 좋겠다는 생각이다. 필자가 대충 이해는 하고 있지만, 좀 더 공부해서 정리해서 남기는게 좋겠다는 생각이다. 코드는 필자의 github 을 확인하길 바라낟.
https://github.com/sieunkr/spring-boot-rsocket/tree/master/spring-boot-rsocket/server
https://github.com/sieunkr/spring-boot-rsocket/tree/master/spring-boot-rsocket/client
집중력이 떨어져서... 급히 마무리해야겠다....
사실, 필자는 RSocket 를 실무에서 사용해본적이 없기 때문에 RSocket에 대해서 상세하게 작성할 능력이 안된다. 그래서, 중요한 내용, 예를 들어서 Channel 모델 등에 대한 내용은 이 글에서 생략되었다. 그리고, 예외 처리에 대한 내용 역시 매우 중요한데 이 글에서는 생략하였다. 전반적으로 부족한 글이지만, RSocket 가 무엇인지 이해할 수 있는 수준으로 간략하게 정리하였다. 많이 부족한 글이지만, 이 글을 읽는 개발자에게 조금이라도 도움이 되기를 바라며, 나중에 RSocket 심화 내용은 실무에서 사용할 일이 생기면 그때 다시 공부해서 작성할 예정이다.