- 예상했던, 예상하지 못헀던 추가 작업
스프링 클라우드(Spring Cloud), MQ(Message Queuing) 를 연동한 서비스 도입 사례를 공유합니다.
관련 연구 3 - Spring Cloud Stream, RabbitMQ 연동
Spring Cloud Stream, RabbitMQ 실제 서비스 연동
예상했던, 예상하지 못했던 추가 작업(현재글)
부록 A - RabbitMQ 연동하는 다양한 방법
부록 B - Spring Cloud Stream 버전 이슈
정리 - Event-driven microservices
사내 서비스 개선사항이기 때문에 공개하기 어려운 대외비는 내용에서 제외하였습니다. 서비스 아키텍처는 외부에 공유해도 되겠다는 판단으로 공유합니다. 개선사항 의견을 말씀해 주시면 감사드리겠습니다.
작업을 하면서 예상했던, 예상하지 못했던 작업들에 대해서 정리를 해볼려고 합니다. 품질 높은 서비스를 제공하기 위해서는 다양한 이슈 상황에 대한 검토가 필요합니다.
연결이 끊겼을 때에 대한 방어로직은 반드시 구현해야 했습니다. 스프링 클라우드는 알수 없는 커넥션 연결이 끊기면 자동으로 재연결을 시도하기 때문에 어느정도 안정성있는 서비스 운영이 가능했습니다. 즉, 스프링 클라우드 1.2.0.RELEASE 에서의 셧다운이벤트 발생시에는 연결이 끊겼다는 로그를 남기고 재연결을 시도합니다. 하지만 이번 작업은 A 라는 아키텍처를 B라는 아키텍처로 바꾸는 과정이고, B 라는 아키텍처에 문제가 발생을 하면 A 라는 아키텍처로 임시로 돌리는 방어로직이 필요합니다. B 아키텍처는 MQ 연동을 하여 개선하는 작업인데, 만약 MQ 연동이 문제가 발생을 한다면 자동으로 A 라는 아키텍처로 돌려야 합니다. MQ 연동이 문제가 발생했다는 이벤트를 어떻게 받을 수 있을까요? 스프링 클라우드에서 제공해주는 셧다운이벤트리스너 즉, shutdownCompleted 메소드를 활용해야할 것 같은데... 고민을 해봤지만 쉽게 답은 나오지 않았습니다. 커넥션 생성 로직, 채널 생성 로직 등 커스터마이징 할 수 있는지 전부 확인을 해봤지만 쉽게 답을 찾지는 못했습니다. 만약 해답을 찾지 못했다면 스프링클라우드 연동은 아마 포기했을지도 모릅니다.
해결방법 - 지난 글에서 잠시 설명했습니다만, 스프링 클라우드는 MQ 연동시에 ConnectionFactory –> Connection –> Channel 생성의 순서로 RabbitMQ 를 연동합니다. org.springframework.boot. autoconfigure.amqp 패키지에는 RabbitMQ 연동의 커넥션팩토리를 생성해주는 로직이 있습니다. 아래의 소스에서 확인 가능합니다.
package org.springframework.boot.autoconfigure.amqp
...생략
public class RabbitAutoConfiguration {
@Configuration
@ConditionalOnMissingBean(ConnectionFactory.class)
protected static class RabbitConnectionFactoryCreator {
@Bean
public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config)
...생략
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
factory.getObject());
...생략
여기서 중요한 것은 @ConditionalOnMissingBean , new CachingConnectionFactory 입니다. 일단 new CachingConnectionFactory 의 내부 로직을 들어가보면 셧다운 이벤트 리스너를 구현하고 있고, 셧다운 이벤트 리스너에는 shutdownCompleted 메소드를 구현해야 합니다. 즉, CachingConnectionFactory 에서는 shutdownCompleted 를 반드시 구현해야 하고, 아래와 같이 소스를 확인할 수 있습니다.
package org.springframework.amqp.rabbit.connection;
...생략
public class CachingConnectionFactory extends AbstractConnectionFactory
implements InitializingBean, ShutdownListener
...생략
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
this.closeExceptionLogger.log(logger, "Channel shutdown", cause);
}
...생략
소스를 보면 shutdownCompleted 메소드에서는 로그만 찍고 끝입니다. 이 메소드를 커스터마이징 해야합니다. 어떻게 하면 좋을까요? 일단 다시 위에 커넥션팩토리 생성 소스를 보니 @ConditionalOnMissingBean 어노테이션이 선언되어있습니다. Bean 이 존재하지 않을때 실행되는 어노테이션 입니다. 음... 해당 구문을 프로젝트에 먼저 선언을 하고 빈을 생성하면, 해당 구문이 실행되지 않을테니, 자연스럽게 커스터마이징 할 수 있겠다는 판단이었습니다. 프로젝트에 RabbitConfig 라는 클래스를 선언하였고, 아래와 같이 소스를 구현해봤습니다.
@Configuration
public class RabbitConfig {
@Bean
@ConfigurationProperties(prefix = "spring.rabbitmq")
public CachingConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory() {
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
// 로그 남기기
// 셧다운시 대응하기 위한 방어로직 구현
}
};
return connectionFactory;
}
이렇게 구현하니, RabbitMQ 에서 연결이 끊겼을 때 셧다운이벤트리스너의 shutdownCompleted가 실행되어 방어로직이 구현하는 것을 확인하였습니다. RabbitMQ 가 끊기는 경우는 다양합니다. 네트워크가 끊기는 경우, 서버가 갑자기 다운되는 경우, RabbitMQ 관리툴에서 강제로 커넥션을 Close 한 경우 등 입니다.
6.1에서 RabbitMQ 가 끊겼을 때에 대한 내용을 설명하였습니다. 근데 만약 RabbitMQ 서버가 단 한대만 있다면 어떻게 될까요? RabbitMQ 서버가 다시 살아 돌아올때까지 우리는 계속 기존 A 로직으로만 서비스를 해야 합니다. RabbitMQ 서버가 죽어도 품질 높고, 안정적인 서비스 운영을 해야합니다. 서버 구성은 시스템담당자가 진행하기 때문에 저는, HA 구성에 대한 업무를 요청하였습니다. 1번 RabbitMQ 서버가 죽거나, 네트워크가 끊기면 2번 RabbitMQ 서버로 전환할 수 있도록 시스템 설정을 진행하였습니다. 자 그럼, 1번 RabbitMQ 서버가 죽었을 때 일어나는 순서는 아래와 같습니다.
1번 RabbitMQ 서버 Down --> 연결이 끊기면 자동으로 MQ의 Queue 삭제 됨 --> 셧다운 이벤트 발생 --> 서비스에서는 A 방어로직 실행 --> 서비스(스프링 클라우드) 에서 MQ로의 자동 재연결을 시도 --> HA 구성에 의한 2번 RabbitMQ 서버로의 연결 시도 --> 연결 후 Queue 생성 --> 메시지를 수신받으면 B 로직 적용 --> 서비스 정상화
RabbitMQ 를 운영해보신 분이라면 아마 이상한 점을 느끼실 수 있을 것입니다. 여기서 보면, Queue 를 AutoDelete 설정을 한 이유와, 클러스터링을 하지 않은 이유 입니다. 일단, 해당 아키텍처는 단순한 메시지 신호만 전달하는 로직입니다. 유실되어도 되는 데이터 이기 때문에 굳이 클러스터링이 필요없다고 판단하였고, 연결이 끊기면 삭제되어도 된다는 판단하에 AutoDelete 설정을 하였습니다. 만약 정말 중요한 데이터이고 유실되면 안된다면 RabbitMQ 두 대의 서버를 클러스터링으로 묶는것이 좋을 듯 합니다.
스프링 클라우드의 버전을 올리면서 디펜던시로 인해서 스프링 부트 버전이 많이 올라갔습니다. 그 덕분에 소스 수정을 많이 하게 되었습니다. 해당 내용은 나중에 시간이 된다면... 정리를 해보겠습니다.
배포 이후 다른작업을 하면서 고민했던 내용입니다. 실제 서비스 적용은 하지 않았습니다. 소비자는 RabbitMQ 로 부터 메시지를 받은 이후, 다음 메시지를 받을 준비가 되었다면 메시지에 대한 Acknowledge 를 전송합니다. 만약 Acknowledge 를 받지 못하면 다음 Queue 는 Ready 상태로 대기하게 됩니다. 아래 링크가 도움이 될지는 모르겠습니다만, 한번 읽어보시길 바랍니다. (저는 시간이 없어서 안읽어봤습니다...)
https://www.rabbitmq.com/confirms.html
암튼, 스프링 클라우드에서는 acknowledge-mode 를 AUTO 로 설정합니다. 서비스 프로젝트의 메시지 리스너 이벤트가 끝나면 자동으로 ACK 를 날리고, Queue 를 받을 수 있다는 신호를 회신합니다.
spring.cloud.stream.rabbit.bindings.SINK-INPUT-NEWS-NEWSBOX-UPDATE-SIGNAL.consumer.acknowledge-mode=AUTO
하지만, 소비자가 수동으로 신호를 날리고 싶다면... 예를 들어서 특정 프로세스가 끝난 이후에 Queue 를 순차적으로 받고 싶다면 AUTO 를 MANUAL 로 바꾸고 아래와 같이 설정하여 구현할 수 있습니다.
@StreamListener(Sink.inboundNewsboxSign)
public void subscribe(PojoTest pojotest, @Header(AmqpHeaders.CHANNEL) Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) {
생략...
try{
channel.basicAck(deliveryTag, false);
}
catch(Exception e){
생략...
참고로 acknowledge-mode 이 AUTO 인 경우에는 헤더에 Channel 이 넘어오질 않습니다. MANUAL 인 경우에는 넘어오는 것을 디버깅을 통해서 확인하였습니다. 하지만, 해당 내용은 실제 서비스에 적용하지는 않았습니다. 하지만, 팀내에서 다른 작업에서는 활용할 가능성이 많게 되어서 한번 정리를 해봤습니다.
6번의 포스팅을 통해서 "스프링 클라우드, MQ 도입 사례" 에 대해서 공유를 해봤습니다. 많이 부족한 실력이라서 미흡한 점이 많습니다. 혹시라도 의견 있으시면 댓글 부탁드립니다. 추후에 시간이 된다면 [부록A - RabbitMQ 연동하는 다양한 방법], [부록B - Spring Cloud Stream 버전 이슈], [정리 - Event Driven Microservices] 에 대해서 글을 쓸 예정입니다만, 다른 공부로 인해서 아마 많이 늦어질 것 같습니다. 이상 스프링 클라우드, MQ 도입 사례를 마치겠습니다.