- 실제 서비스 연동 사례
스프링 클라우드(Spring Cloud), MQ(Message Queuing) 를 연동한 서비스 도입 사례를 공유합니다.
관련 연구 3 - Spring Cloud Stream, RabbitMQ 연동
Spring Cloud Stream, RabbitMQ 실제 서비스 연동(현재글)
예상했던, 예상하지 못했던 추가 작업
부록 A - RabbitMQ 연동하는 다양한 방법
부록 B - Spring Cloud Stream 버전 이슈
정리 - Event-driven microservices
사내 서비스 개선사항이기 때문에 공개하기 어려운 대외비는 내용에서 제외하였습니다. 서비스 아키텍처는 외부에 공유해도 되겠다는 판단으로 공유합니다. 개선사항 의견을 말씀해 주시면 감사드리겠습니다.
이전 포스팅 참고부탁드립니다.
https://brunch.co.kr/@springboot/2
https://brunch.co.kr/@springboot/3
https://brunch.co.kr/@springboot/6
https://brunch.co.kr/@springboot/9
드디어 이번 글에서는 Spring Cloud Stream, RabbitMQ 서비스 연동 사례를 공유합니다. Spring Cloud Stream 에 대해서는 지난 관련연구 글에서 간략하게 정리하였습니다. 이번 글에서는 서비스에 적용하는 과정에서 고민했던 내용 중심으로 글을 작성하고자 합니다. 상세 기술에 대해서는 Spring Cloud Stream 공식 레퍼런스에서 확인하시면 됩니다.
https://cloud.spring.io/spring-cloud-stream/
RabbitMQ 를 연동하는 방법은 매우 많습니다만, 제가 아는 방법은 아래 3가지 입니다.
1. RabbitMQ 제공 라이브러리
2. Spring AMQP Project
3. Spring Cloud Stream
1번 방법은 RabbitMQ 제공 라이브러리를 그대로 사용하는 방법입니다. 2번과 3번 방법은 Spring Framework 프로젝트를 연동하는 방법입니다. 참고로 다양한 환경에서의 연동 방법은 RabbitMQ 공식 홈페이지에서 상세하게 확인 가능합니다.
https://www.rabbitmq.com/devtools.html
아무튼 처음 검토 시점에서부터 Spring Cloud Stream 을 연동할 계획으로 시작하였습니다. 지난 스프링캠프 2017에서 인상깊게 들었던 Spring Cloud Stream & Kafka 기반의 실시간 배포시스템 구축 사례를 보고, 제가 담당하고 있는 서비스에도 같은 방법으로 도입을 하면 되겠다고 판단했기 때문입니다. 하지만, 실제 적용 과정에서는 어려움이 있었습니다. 스프링 부트(Spring Boot) 버전 으로 인해서 초반 검토시 많은 고민을 하였습니다. 적용할 프로젝트의 스프링부트 버전이 매우 낮은 1.1.3 버전이었습니다. Spring Cloud Stream 를 적용할려면 기본적으로 스프링부트 디펜던시 적용이 필요하기 때문에 스프링부트 버전을 올려야 하는데... 스프링 부트 버전을 너무 높이 올리면 다른 서비스에서 장애가 날 수도 있다는 생각에, 리스크를 줄이기 위해서 스프링 부트 버전을 1.3.7 정도까지만 올리고 Spring Cloud Stream Binder Rabbit 1.0.3 RELEASE 를 적용하기로 초반에 검토하였습니다. 도입사례가 많지 않았고 1.0.3.RELEASE 버전에 대해서 상세하게 알지 못한 상황이었습니다. 다른 업무로 인해서 매우 바쁜 상황이라서 Spring Cloud Stream 를 연동하면 아주 심플하게 구현할 수 있겠다는 기대감(?)이 있었기 때문에 희망을 갖고 그대로 진행을 하였습니다만, 아래와 같은 이슈가 발견 되었습니다.
RabbitMQ 연동할 때 Exchange Type 를 지정하지 못함. 오직 Topic 타입만 사용
라우팅키 설정이 매우 제한적임
[관련 연구 2 - RabbitMQ]에서도 정리 했듯이, RabbitMQ 에서 Topic 타입은 바인딩키에 따라서 Direct 처럼, 또는 Fanout 처럼 사용은 가능합니다. Spring Cloud Stream 1.0.3.RELEASE 에서도 마찬가지로 Topic 타입만 제공하지만 바인딩키 설정에 따라서는 Direct 처럼, Fanout 처럼 사용은 가능합니다만 너무 제한적이었습니다. 바인딩키 설정 조차 라이브러리가 강제로 설정을 해버립니다. RabbitMQ 를 쓰는 이유가 없을 정도로 의미가 없는 연동이라는 판단이었고, 커스터마이징 하기 매우 어렵다고 생각해서 좌절했습니다. 혹시 이 글을 보시는 분이 있을지 모르겠지만, 혹시라도 보신 분중에 1.0.3.RELEASE 를 적용하신 분이 있다면 의견 좀 부탁드리겠습니다. 제가 잘못 알고 있는건지... 주변에 써본 사람이 없어서 조금 답답했습니다. Pivotal 에 문의를 하는게 빠를수 있겠네요. 암튼 그래서 저는 좀 더 고민이 필요했습니다.
Spring Cloud Stream 1.0.3.RELEASE 를 그냥 사용할까?
Spring Cloud Stream 를 쓰지 말고 그냥 Spring AMQP Project 를 사용할까?
Spring Cloud Stream 버전을 더 올려볼까?
사실 지금 아키텍처에서는 간단한 Message 전송하기 위한 MQ 도입이기 때문에 Exchange Type 및 바인딩 키 설정이 결정적으로 중요하지는 않았습니다만 대충개발하고 싶지는 않았고... 추후에 확장성 측면에서 봤을 때는 너무 좋지 않다는 판단이었습니다. 그래서 Spring Cloud Stream 버전을 올려본 이후에 여전히 같은 문제가 발생을 한다면 Spring Cloud 도입을 포기하고, Spring AMQP Project 를 사용하기로 하였습니다. 다행히고 검토 결과 Spring Cloud Stream 1.2.0.RELEASE 에서는 많은 부분이 개선이 되었습니다. 물론, 1.2.0.RELEASE 로 올리는 과정으로 인해서 스프링부트의 버전을 1.5.2 까지 올려야 했습니다. 스프링 부트 버전 업그레이드로 인한 소스 수정이 많았기 때문에 기획팀 협조하에 많은 테스트를 수행하였습니다. 리스크를 최소화 할 수 있도록 꼼꼼히 진행하였고 다행히 서비스 도입 결과 문제가 전혀 없이 깔끔하게 서비스를 하고 있는 상황입니다. 해당 내용을 연동사례에 대한 정리가 끝나고 [부록 A - RabbitMQ 연동하는 다양한 방법], [부록B - Spring Cloud Stream 버전 이슈] 에서 정리할 예정입니다.
[부록 A - RabbitMQ 연동하는 다양한 방법] 에서는 세가지 연동 방법에 대해서 정리할 예정입니다. RabbitMQ 제공 라이브러리, Spring AMQP Project, Spring Cloud Stream... 입니다. 사실 Spring Framework 는 RabbitMQ 를 연동을 쉽고 심플하게 구현하기 위해서 Annotation 등의 기술을 제공해줄 뿐 내부 로직은 결국 RabbitMQ 에서 제공해주는 라이브러리를 사용합니다. [부록B - Spring Cloud Stream 버전 이슈] 에서는 Spring Cloud Stream 의 버전 이슈에 대한 내용을 작성할려고 합니다. 참고로 해당 프로젝트를 진행했던 당시가 올해7월이었고 당시에 1.2.0.RELEASE 가 나름 최신 버전이었습니다만, 지금 글을 쓰는 12월에는 2.0.0.M3 버전까지 나온 상황입니다.
주저리주저리 글을 너무 지루하게 쓰고 있습니다만, 어차피 자세히 보는 사람도 많지 않을테고, 그냥 저를 위한 글이기는 합니다. 까먹기 전에 기록을 남기고 싶었습니다!!
이전 글에 작성한 연동 방법을 참고 부탁드리며, Consumer 입장에서 메시지를 구독하는 방법 위주로 작성을 합니다. @EnableBinding , @StreamListener 등의 아주 간단한 Annotation 및 소스 몇줄만 추가하면 연동이 가능합니다. 그래서 실제로 백단에서 어떤 프로세스가 돌아가는지는 라이브러리를 보기 전에는 알기 어렵습니다. 라이브러리에서 실제로 어떻게 연동이 되는지 궁금하시다면 아래 시퀀스 다이어그램을 바탕으로 라이브러리 소스를 직접 까보시면 됩니다.
제가 그린 그림이라서 이해하기 어려울 수 있습니다만... ConnectionFactory –> Connection –> Channel 생성의 순서로 연동이 된다는 것만 아셔도 됩니다.
RabbitMQ에 커넥션을 맺고 채널을 생성하는 과정입니다.
커넥션을 맺고 채널을 생성한 이후에는 Exchange 생성, Queue 생성, Exchange 와 Queue 바인딩 등의 주요 과정을 수행합니다. 물론 이 과정도 라이브러리를 까보기 전에는 모릅니다.
주요 로직은 RabbitExchangeQueueProvisioner -> provisionConsumerDestination() 에서 확인 가능합니다.
org.springframework.cloud.stream.binder.AbstractMessageChannelBinder
public final Binding<MessageChannel> doBindConsumer(
...
final ConsumerDestination destination = this.provisioningProvider.provisionConsumerDestination(name, group, properties);
...
org.springframework.cloud.stream.binder.rabbit.provisioning.RabbitExchangeQueueProvisioner
...........
근데, 브런치로 개발블로그를 하면 안된다는 사실을 이제 깨달았습니다. 많이 불편하네요. 일단, RabbitExchangeQueueProvisioner 소스를 보면 Exchange Type 를 지정하는 구문도 있고, 바인딩키를 설정하는 로직도 있습니다. 물론 해당 설정 값은 Property 파일에서 설정 가능합니다.
서비스 프로젝트에 Spring Cloud Stream 와 RabbitMQ 를 연동해봤습니다. 일단, Gradle 에 디펜던시 설정을 추가합니다. 스프링부트 버전도 업그레이드 했기 때문에 boot 버전도 변경하였습니다.
build.gradle
dependencies {
. . .
compile('org.springframework.cloud:spring-cloud-starter-stream-rabbit:1.2.0.RELEASE')
Properties 설정을 추가합니다. 참고로 설정 관련해서는 반드시 레퍼런스 참고 부탁드립니다. 해당 설정은 저희 프로젝트 환경에 맞게 변경된 내용입니다.
application-dev.properties
spring.cloud.stream.bindings.SINK-INPUT-TEST.destination=news
spring.cloud.stream.bindings.SINK-INPUT-TEST.group=서버의 유니크한 값 설정
spring.cloud.stream.rabbit.bindings.SINK-TEST.consumer.bindingRoutingKey=news.update.test
spring.cloud.stream.rabbit.bindings.SINK-TEST.consumer.exchangeType=direct
spring.cloud.stream.rabbit.bindings.SINK-TEST.consumer.durableSubscription=false
spring.cloud.stream.binders.rabbit.type=rabbit
//참고로 rabbitMQ 연결 설정 정보를 따로 properties 에 분리했습니다.
spring.rabbitmq.host=${rabbitmq.host}
spring.rabbitmq.virtual-host=testhost
spring.rabbitmq.username=${rabbitmq.username}
spring.rabbitmq.password=${rabbitmq.password}
RabbitMQ 연결 설정 정보는 따로 물리적인 Property 파일로 분리하였습니다. QA 및 실서버 환경이 다르기 때문입니다. 참고로 주제와는 벗어나지만 Spring Cloud Config 를 활용한다면 더 유연하게 서비스 운영이 가능할 것 같습니다. 그리고 리스너 구현은 아래와 같이 구현합니다.
listener.java
listener.java @EnableBinding(*****Listener.Sink.class)
public class *****Listener {
@Autowired
private News*****Dao news*****Dao;
@StreamListener(Sink.inboundTest)
public void subscribe(PojoTest pojotest) {
if(pojotest.is***()){ //비즈니스 로직
}
}
public interface Sink {
String inboundTest = "SINK-TEST";
@Input(inboundTest)
SubscribableChannel *****Listener();
}
}
application-dev.properties 파일을 보면 바인딩키 설정, Exchange Type 설정 등 중요 설정이 있습니다. 그리고 subscribe 에서 메시지를 리스닝 할때는 Pojo 형식에 맞는 메시지만 수신이 됩니다. Pojo 형식에 맞지 않으면 해당 이벤트 리스너를 실행하지 않습니다. 물론, 이 방법 말고 다른 리스닝 방법이 매우 많고 버전마다 많이 다릅니다. 반드시, Spring Cloud Stream 레퍼런스를 확인 부탁드립니다.
https://docs.spring.io/spring-cloud-stream/docs/
이 작업에서 제가 제일 고민했던 내용은 RabbitMQ 에서 커넥션이 끊겼을 때 이벤트를 발생시킬 수 있는지에 대해서입니다. 첫번째 글[ https://brunch.co.kr/@springboot/2] 보시면 아셨겠지만 해당 프로젝트는 주기적인 API 데이터 연동 로직을 이벤트 중심의 실시간 데이터 연동으로 개선하는 사항입니다. A라는 기존 로직이 있고, B 라는 신규 로직이 있다고 가정하면!! 즉, A는 API 주기 호출, B 는 실시간 MQ 연동입니다. 만약...B 라는 로직이 문제가 발생을 하면 어떻게 될까? B로직을 구현해서 적용했는데 100% 완벽하게 적용될 것이라는 보장은 없기 때문에 반드시 A 라는 방어 로직을 남겨두고 B 로직이 실패했을 때는 A 가 실행이 되어야 하는 것입니다. 물론 서비스가 안정화 되면 A로직을 빼도 됩니다. 하지만 배포 단계에서는 예측하지 못한 에러가 발생할 수 있기 때문에 완벽하게 준비를 해야합니다.
즉, 다시 정리를 하면 RabbitMQ 와의 커넥션이 끊기거나, 서버가 죽거나 했을 때 ShutDown 이벤트를 발시켜야 하고, 해당 이벤트 리스너에 방어로직을 구현해야 합니다.
다음 글에서는 [예상했던, 예상하지 못했던 추가 작업]이라는 주제로 작성할 예정입니다. Spring Cloud Stream 의 ShutDown 이벤트를 구현한 방법에 대해서 정리할 예정이며, 커넥션 연결에 대한 테스트 및 HA 구성, RabbitMQ 관리툴 보는 법 등 다양한 내용을 정리할 예정입니다.
https://brunch.co.kr/@springboot/20