brunch

You can make anything
by writing

C.S.Lewis

by 에디의 기술블로그 Dec 08. 2019

Drinking from the Stream

Pivotal Summit 2019 Seoul 발표 후기(2)

이 글은 Pivotal Summit 2019 Seoul 세미나 중 일부 세션에 대한 후기이다.


1. Diving Into Reactive (지난 글)

2. Drinking from the Stream (이번 글)


공식 발표 자료 및 영상은 아래에서 확인하길 바란다. 

https://connect.pivotal.io/summit_2019_seoul

https://www.youtube.com/watch?v=hzRZYItNhc4&feature=youtu.be&list=PLAdzTan_eSPSu1R7OvXBc631BI87Gv_Ud


해당 세션은 스프링 환경에서 메시지 스트림 기술에 대한 내용인데, SPring Cloud Stream 에 대한 내용이 핵심이다. 단, 발표자인 "마크헤클러"님은 소스코드를 RC2 및 Build-Snapshot 버전을 혼합해서 사용하였다. 필자는 "마크 헤클러"의 코드를 RELEASE 버전으로 변경하였다는 점을 이해해주길 바란다. 또한, 동시통역을 지원하지 않아서 영어를 잘 못하는 필자는 매우 이해하기 어려웠다. 


필자가 영어를 잘 못하기 때문에, 필자의 글에 잘못된 내용이 포함되어있을 가능성이 매우 높다. 반드시 공식 발표 영상을 찾아보길 바란다. "마크 헤클러"님은 자바 챔피언이며, 발표를 정말 재미있게 잘 한다. 문제는 영어를 잘 못하는 필자가 문제이다.


이 글은, 영어를 잘 못하는 필자가, 발표자(마크 헤클러)님의 소스코드를 하나씩 분석하면서, 필자 나름대로 재해석하였습니다. 필자의 글을 보기 전에 발표 영상을 보는 것을 강력 추천합니다. 



Spring Cloud Stream


해당 세미나 발표는 "Spring Cloud Stream"를 사용해서 분산 환경에서의 메시지 스트림 애플리케이션 구축에 대한 내용이다. 발표 데모 소스코드를 살펴보기 전에, 이해를 돕기 위한 관련 내용을 정리하였다. 메시지 패턴 기반의 통합 패턴에 대해서 전혀 모르는 개발자는 꼭 한번 읽어보길 바란다. 


사례

필자가 2년전에 구축했던 사례를 공유한다. 포털서비스에서 분산 시스템에서 실시간 데이터 전송을 위한 아키텍처 개선 사례이다. 

https://brunch.co.kr/@springboot/2

하지만, 해당 사례는 2년 전의 소스 코드이며, 당시 스프링 부트 버전이 1.5.X 라서 현재 사용하기에는 무리가 있다. 


메시지 통신하는 경우 무조건 Spring Cloud Stream(이하 SCTs) 를 사용해야 하는가? 

그렇지 않다. 특정 기술이 모든 것을 해결해 주지 않는다. 필자가 경험했던 SCTs 는 애플리케이션 통합을 심플하게 구축할 수 있도록 도와주지만, 반면에 몇기자 기술적인 제약이 존재한다. 메시지 브로커에서 제공하는 다양한 기능을 폭넓게 사용하고 싶다면 SCTs 를 사용하는 것은 바람직하지 않을 것이다. 그래서, 만약 메시지 브로커로 RabbitMQ 를 심도있게 사용하고 싶다면, SCTs를 사용하기 보다는 Spring AMQP 모듈을 직접 구현해서 사용하는게 좋을 수도 있다. Spring AMQP 관련해서는 아래 링크를 참고하길 바란다. 필자는 2년 전에 SCTs 를 선택하였지만, 각자의 선택은 여러분의 몫이다. 시스템을 맡고 있는 스스로 잘 판단해서 지혜롭게 결정하길 바란다. 

https://brunch.co.kr/@springboot/298


Spring Cloud Dataflow(이하 SCDF) 에서도 사용할 수 있는가? 

물론이다. SCTs 애플리케이션은 SCDF 에서 실행시킬 수 있으며, 쿠버네티스 같은 모던애플리케이션 런타임 환경에서 실행시킬 수 있다. 필자가 아무것도 모를 때 작성했던 글이라서 그닥 추천하고 싶지 않지만, SCDF 관련 한글 문서가 거의 없기 때문이 궁금한 개발자는 이 글이라도 읽어보길 바란다.

https://brunch.co.kr/@springboot/61


SCDF(Spring Cloud Dataflow) 관련해서는 나중에 각잡고 글을 작성하겠다. 


Message Broker

SCTs 에서는 공식적으로 두개의 메시지 브로커 연동을 지원하는데, Kafka 와 RabbitMQ 이다. SCTs 에서는 어떤 솔루션을 선택하든지 크게 상관없이 유연하게 구축할 수 있다. SCTs 는 확장성이 뛰어나며, 유연하게 메시지 기반으로 시스템을 통합할 수 있다. SCTs 에서는 아래와 같이 Source, Processor, Sink 애플리케이션으로 구분할 수 있다. 

병렬처리를 할 수 있도록 지원하며, 

유연하게 메시지 브로커를 변경할 수도 있다. RabbitMQ 에서 Kafka 로 변경한다면 아래와 같이 구성할수 있다. 


하지만, 레거시가 되어버린... Source, Processor, Sink

최근 버전의 SCTs 에서는 Source, Processor, Sink 개념은 레거시가 되었다...

레거시를 사용하는 대신에, 함수형 인터페이스를 사용해서 코드를 더 심플하게 작성할 수 있도록 제공한다. 

혹시라도... Java8 사용 경험이 없는 개발자는 아래 글을 먼저 읽어보길 바란다. 

https://brunch.co.kr/@springboot/276


이 글은 Pivotal Summit 발표에 대한 후기 및 소스코드 분석이다. 그래서, Spring Cloud Stream 에 대한 자세한 내용을 설명하지는 않겠다. 나중에 각잡고 글을 다시 쓰겠다. 이글은 세미나에 참석하지 못한 개발자 중에서 관심있는 분들에게 제공하는 글이며... 편하게 읽어주길 바란다.


샘플 코드, Demo


"마크 헤클러"님이 발표한 샘플 코드를 필자가 조금 수정하였는데, 발표자의 코드는 Maven 으로 작성하였지만, 필자는 Gradle 기반으로 다시 작성하였다. 또한, 발표자의 코드는 멀티모듈로 구성되었지만, 필자는 개별 애플리케이션으로 나누었다. 그리고 "마크 헤클러"님의 코드는 Docker, 쿠버네티스에서 실행할 수 있도록 구성되었지만 필자의 코드는 쿠버네티스 환경에 대한 설정은 전부 뺐다. 필자의 샘플 코드에 여러가지 수정사항이 있지만 필자가 설명하기 편한 방법으로 선택했을 뿐, Spring Cloud Stream 를 설명하는데 차이는 없다. 


"마크 헤클러"는 발표에서 "Source-Processor-Sink"를 사용하지 않고, 함수형 인터페이스를 사용하였다. 근데, 세미나 당일(11월중순) 기준으로 SCTs 의 버전이 Hoxton.RC2 까지 나왔었고 정식 발표가 되지 않은 상황이었다. "마크 헤클러"는 Hoxton.RC2 와 Hoxton.SNAPSHOT 버전을 섞어서 데모를 진행하였는데, 11월28일 Hoxton.RELEASE 버전이 공식 발표 되었기 때문에 이 글에서는 RELEASE 버전으로 다시 코드를 작성했다는 점을 이해해주길 바란다. 

https://spring.io/blog/2019/11/28/spring-cloud-hoxton-released


"마크 헤클러"님의 원본 소스를 보고 싶은 개발자는 아래 github 에서 확인할 수 있다. 

https://github.com/mkheck/drinking-from-the-stream


이 글은 발표자의 소스코드를 일부 수정하였고, Spring Cloud Stream 기술 디펜던시를 RELEASE 버전으로 개선하였다.


source, processor, sink 라는 이름으로, 세 개의 애플리케이션을 구현할 것이다. 


디펜던시

Source, Processor, Sink 각각의 애플리케이션에서 사용하는 디펜던시는 동일하며, 필자는 메시지 브로커로 RabbitMQ 를 사용할 것이다.

스프링부트 버전은 2.2.1 이다. Spring Cloud 버전은 Hoxton.RELEASE 이고 Spring Cloud Stream 버전은 Horsham 3.0.X 이다. 


기존 방법(레거시) 

기존 방법에 대해서 자세하게 설명하지는 않겠다. 발표자인 "마크 헤클러"는 레거시 라고 표현하였지만, SCTs 를 사용하는 대부분 애플리케이션에서 주로 사용하는 방법일 것이다.

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.0.RELEASE/reference/html/spring-cloud-stream.html#_annotation_based_binding_names_legacy


Functional binding names

샘플 코드는 기존 방식을 사용하지 않고, Functions 를 사용해서 바인딩한다. 프로퍼티 설정은 아래와 같이 정의해서 선언해줘야 한다. 

input

functionName + "-" + in + "-" + index

output

functionName + "-" + out + "-" + index 

https://cloud.spring.io/spring-cloud-static/Hoxton.RELEASE/reference/htmlsingle/#binding-with-functions

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.0.RELEASE/reference/html/spring-cloud-stream.html#spring_cloud_function


친절하게 설명하기 어려우니... 샘플 코드를 먼저 보자...


Source = Supplier

Source 애플리케이션은 메시지 스트림 영역에서, 제일 앞단에 위치한다. 처리를 하기 위한 초기 데이터를 만들어주는 영역이다. (만들수도 있고, 외부로 부터 수집할 수도 있다.) 어쩃든, SOurce 에서는 초기 데이터를 Processor 에 전달해주기 위해서 @Output 선언 방식으로 데이터를 메시지 브로커에 전달해줘야 한다. 하지만, 기존 @Output 으로 선언하던 방식 대신, Supplier 함수형 인터페이스를 선언해서 구현할 수 있다.  

이때 중요한 점은 "함수 네이밍"이다. sendCoffee 라는 함수 이름을 명명했다면, 프로퍼티 설정에서도 함수이름을 정의해서 아래와 같이 작성해줘야 한다. 

외부로 나가는 메시지 발행의 역할을 하기 때문에 -out- 으로 정의하였다. 놀랍게도, 메시지 발행을 위한 설정은 이걸로 끝이다. destination 은 RabbitMQ 의 Exchange 경로이며, binder=rabbit 는 메시지 브로커로 RabbitMQ 를 사용하겠다는 의미이다. 만약 카프카를 사용하고 싶다면 카프카 디펜던시를 추가한 이후, binder=kafka 로 변경하면 된다.  


...binder=kafka


놀랍지 않은가? Spring Cloud Stream 에서 강조하는 유연하게 메시지 브로커를 선택할 수 있는 추상화 전략이다. 어떤 메시지 브로커를 사용해도 내부 비즈니스 로직은 전혀 변경되지 않는다. RabbitMQ를 사용하기 때문에, 기본적인 연동 정보를 application.properties 파일에 셋팅해준다. 

스프링부트 와 Spring Cloud Stream 이 만들어내는 마법으로 아주 간단하게 메시지를 발행할 수 있다. 

처음 설명한듯이 우리는 SOurce, Processor, Sink 세개의 애플리케이션을 구축할 것이다. 만약, Source 애플리케이션을 단독으로 실행하게 되면, 메시지를 메시지브로커인 RabbitMQ에 발행하기 시작한다. 좀 더 정확히는, RabbitMQ의 Processor 이라는 이름의 Exchange 에 메시지를 발행하게 된다. 아래 캡쳐 화면은 processor 이라는 이름의 Exchange 가 생성된 화면이다.

아래 캡쳐는 processor 에 메시지를 발행하고 있는 화면이다. 

하지만, Exchange 에 바인딩 되어있는 Queue 가 아직 없기 때문에 어느 누구도 메시지를 수신하지는 않는다. RabbitMQ 는 Exchange 에 메시지를 발행하게 되는데, Exchange 에 바인딩 되어있는 Queue 중에서 라우팅 키에 맞게 메시지를 전달하게 된다. Source 애플리케이션을 단독으로 실행하게 되면, Exchange 에 바인딩 되어있는 Queue 가 없기 때문에 메시지가 전달되지 않는 것이다. RabbitMQ 관리도구를 확인해보면 아래와 같이 no bindings.. 라고 표시된다.


Kafka 와 RabbitMQ 의 동작 모델은 매우 다르다. RabbitMQ에서 Kafka 로 변경하는 일은 프로퍼티 설정정 한줄로 간단하게 해결할 수 있겠지만, 사실 각각의 메시지 브로커의 내부 동작방식이 매우 다르기 때문에 좀 더 섬세하게 검토를 해야한다. "마크 헤클러"님이 세미나에서 발표한 것처럼 간단하게 프로퍼티 설정만으로 모든 것이 해결될 것이라는 생각은 조금 위험할 수 있다.  


자, 이제 메시지를 초기 생성해서 발행하는 Source 애플리케이션을 구축하였다. 이제 Source 로 부터 메시지를 전달받아서 중간에 처리하고 Sink 에 발행해주는 Processor 를 구축해보자.


Processor

Processor 애플리케이션은 Source 와 Sink 중간에 위치한다. 그래서 어떤 데이터를 전달받아야 하며, 해당 데이터를 처리해서 Sink 에 데이터를 전달(발행)해야 한다. 자바 8 에서 제공하는 함수형 인터페이스 중에서, 매개변수도 있고, 리턴도 하는 함수형 인터페이스는 뭐가 있을까?


Function !!! 함수형 인터페이스를 사용하면 된다.

Processor 에서는 Source 에서 발행한 메시지를 구독하여 수신한다. 프로퍼티 설정에서 destination 은 Exchange 이고, group 는 Queue 이름이 된다. 만약 group 를 별도로 설정하지 않으면 익명의 Queue 이름으로 생성이 될 것이다. 또한 메시지를 수신받기 때문에 out 이 아니라 in 으로 설정해야 한다. function-in-index 의 타입으로 속성을 설정하면 된다. 필자는 샘플에서 익명의 Queue 를 셋팅하겠다. 또한 binder 를 rabbit 로 설정하여 RabbitMQ 를 메시지 브로커로 사용할 것이다.

Processor 에서는 Source 에서 전달받은 메시지를 중간 작업으로 처리한 후, Sink 에 메시지를 전달해야 한다. 이때는 반대로 메시지를 Sink 애플리케이션에 발행해야 하기 때문에 in 이 아니라 out 으로 설정해야 한다. 이때 sink 라는 이름의  Exchange 에 메시지를 발행하며, Exchange 에 발행하기 때문에 Queue 이름을 따로 설정할 필요는 없다. 왜냐면, RabbitMQ 에 메시지 발행할 때는 Queue 에 발행하는 것이 아니라 Exchange 에 발행을 하기 때문이다. 

"마크 헤클러"의 샘플 코드에서는 Processor 에서 순차적으로 실행해야 하는 함수는 두개이며, processIt 와 fixIt 이다. 두 함수를 순차적으로 실행해야 하는 조건으로 processIt | fixIt 라고 설정하였다. 지금까지 설명한 프로터피 설정은 아래와 같다.

Processor 애플리케이션이 실행이 되면 자동으로 Queue 가 생성이 된다. 위에서 설명했듯이 group 설정을 따로 하지 않으면 익명이 Queue 가 셋팅된다. 

Features 에 표시되는 "AD"는 Auto-Delete 값이 true라는 의미다. 즉, Processor 애플리케이션이 다운되면 자동으로 Queue 는 삭제될 것이다. 물론, Processor 애플리케이션이 실행되면 다시 Queue 는 다시 생성이 된다. 이때 중요한 사실은 Queue 가 어떻게 Exchange 에 바인딩 되어있는지 알아야 한다. 익명의 Queue 는 processor 라는 Exchange 에 바인딩이 되어있는데, 바인딩키는 # 으로 설정이 되었다. 

심플하면서도 충격적인 설정이다. 

이게 어떤 의미인지..주저리주저리 설명을 해보겠다. Exchange 에 바인딩 되어있는 processor.anonymous Queue 는 바인딩키가 # 으로 설정이 되어있기 때문에, Exchange 에 전달되는 모든 메시지를 수신하게 된다. 즉, Fanout 처럼 동작하는 것이다. RabbitMQ 의 Exchange 타입은 direct, topic, fanout 세가지 타입이 존재하며, 애플리케이션 기능에 맞게 설정하게 되는데 Fanout 은 어떤 메시지가 들어오던 상관없이 모든 Queue 에 발행을 하게 된다. 즉, Exchange 가 topic 타입이지만 fanout 처럼 동작하게 되는 것이다. 


더이상의 자세한 설명은 패스한다. 필자의 설명이 오히려 개발자를 혼란스럽게 하고 있다. 아마 이 글을 읽다가 너무 지루해서 읽는 것을 포기했을 것이다. 필자도 잘 모르겠으니 RabbitMQ 및 AMQP 관련 모델을 각자 찾아보길 바라며...


Sink

마지막으로 Sink 애플리케이션을 실행해보자. 해당 애플리케이션은 스트림 의 마지막 과정으로 최종 데이터를 저장하는 역할을 한다. 프로퍼티 설정은 아래와 같다. 메시지를 수신하기 때문에 in 설정만 하였다.

drinkit 라는 함수를 사용할 것이며, in 으로 설정되어있기 때문에 메시지를 수신하게 된다. group 설정을 따로 하지 않았기 때문에 메시지는 익명으로 sink.anonymous... 라는 Queue 가 생성이 된다. 해당 Queue 는 sink 라는 Exchange 에 바인딩이 되며, 키는 # 로 설정이 되어서, Topic 타입의 Exchange 이지만 Fanout 처럼 동작한다.

Sink 는 Processors 에서 처리한 데이터를 전달받아서 소비한다. 즉, 매개변수는 있지만 리턴값이 없는 Consumer 함수형 인터페이스를 사용한다. 이때 필자는 application.properties 에서 "consumer.concurrency=3" 을 설정하였다. 즉, 3개의 쓰레드를 생성해서 병렬처리를 하게 된다. 

Sink 애플리케이션에서 메시지를 수신해서 처리하는 핵심 함수를 Consumer 함수형 인터페이스를 사용해서 아래와 같이 작성하였다.

Source, Processor, Sink 애플리케이션을 모두 실행해보자. Source 에서 최초로 생성된 메시지는 Processor 에서 중간 처리를 한 후, 마지막으로 Sink 에 전달되게 된다. 로그에서 쓰레드 이름을 찍어보면 아래와 같이 3개의 쓰레드가 동작하며, 쓰레드 이름은 "Queue 이름 - 쓰레드번호" 가 되는 것을 확인할 수 있다. 

샘플 코드는 아래 링크에서 확인하길 바라며...

https://github.com/sieunkr/pivotal-summit-seoul-2019/tree/master/drinking-from-the-stream

https://github.com/mkheck/drinking-from-the-stream


마무리


대충 글을 마무리하겠다...


Spring Cloud Stream

다들 잘 알고 있겠지만, RabbitMQ 와 Kafka 는 가장 유명하고 많이 사용하는 메시지 플랫폼이다. 이 글에서 RabbitMQ 와 Kafka 를 비교하거나, 장단점을 정리하지는 않겠다. 필자는 시스템 통합을 위한 메시지 플랫폼으로는 RabbitMQ 를 선호하고, 로그나 데이터 수집을 위해서는 Kafka 를 선호한다. SPring CLoud Stream 에서는 어떤 메시지 플랫폼을 사용하는지 상관없이 추상화해서 기능을 제공한다. 만약 이런 추상화 기능이 제공하지 않는다면, 각 메시지 플랫폼에 맞게 컨피그레이션 설정과 비즈니스 로직을 각 메시지 플랫폼에 맞게 개발해야 한다. 하지만 Spring Cloud Stream 을 사용한다면, 컨피그레이션 설정과 비즈니스 로직을 별도로 구분할 필요가 없다. 스프링부트 와 Spring Cloud Stream 에서 컨피그레이션 설정을 알아서 해줄것이다. 개발자는 application.properties 에 속성만 설정해주면 되기 때문에 매우 유연하며 마이크로서비스 환경에서 아주 유용하게 사용할 수 있다. 하지만, 심플한 반면에 심화 기능을 커스터마이징하기 쉽지 않을 것이며, 여러가지 개발 제약사항이 존재한다. 글 초반에도 설명했지만 Spring Cloud Stream 을 사용하지 않고, SPring AMQP 또는 Spring Kafka 모듈을 직접 사용하는 것이 오히려 개발하기 편할수도 있다. 어쩃든, Spring Cloud Stream 이 제공하는 마법은 아주 심플하고 좋지만, 때로는 너무 낮은 수준의 기능을 제공하기 때문에, 상용 서비스에서 핵심 기술로 사용하는 것은 조금은 고민해볼 필요는 있다. 물론, 필자는 적극적으로 사용하였고 크게 문제가 되지는 않았다. 각자 시스템 상황에 맞게 잘 생각하길 바란다. 

브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari