스프링 부트 환경에서 RabbitMQ 연동하여, 메시지 패턴 구현하기
이번 글은, Spring AMQP 에 대해서 간단하게 정리해서 글을 작성하였다. 스프링 부트 환경에서 RabbitMQ 를 연동하였고, 기본적인 메시지 발행, 수신에 대한 구현 샘플 코드를 포함하였다.
스프링부트, AMQP, RabbitMQ 등 기본 지식 이해가 필요하지만, 이 글에서 자세한 내용은 생략한다. 관련해서 필자의 예전 글을 읽어보길 바란다.
https://brunch.co.kr/@springboot/2
https://brunch.co.kr/@springboot/3
https://brunch.co.kr/@springboot/6
스프링 부트 환경에서 RabbitMQ 를 연동하는 방법을 알아보자.
이번 글에서는 Spring-Cloud-Stream 를 사용하지 않고, Spring AMQP 를 사용할 것이다.
메시지를 수신하는 Subscriber 역할의 Consumer 프로젝트를 신규로 작성하였다. spring-boot-starter-amqp 스타터에서 기본적인 커넥션을 연동해주기 때문에 프로퍼티 설정만 추가해주면 된다.
ConnectionFactory 를 재정의하는 방법도 있지만 자세한 내용은 생략한다. RabbitMQ 는 세가지 타입의 Exchange 가 있는데, Topic 로 설정하겠다. 스프링부트 환경에서 Configuration 설정을 통해서 심플하게 Queue, Binding 을 설정할 수 있다. 아래 샘플 코드를 보라.
리스너는 구현하는 방법은 여러가지가 있는데, 이번 글에서는 두가지 방법을 소개할 예정인데
@RabbitListner 어노테이션을 사용하는 방법
SimpleMessageListenerContainer 를 사용해서 직접 리스너를 구현하는 방법
@RabbitListener 먼저 소개하겠다. 아래 샘플과 같이 메시지 리스너를 설정할 수 있다.
queues 속성에 QUEUE 이름을 정확하게 명시해야 한다. 위에서 먼저 설명한 컨피그 설정에서 큐 를 Bean 으로 등록을 했다. 해당 리스너가 정상적으로 동작하기 위해서는 당연한 얘기지만 큐가 잘 설정이 되어있어야 한다. 만약 큐 네임을 잘못 명시했다면 아래와 같이 오류가 발생할 것이다.
참고로 @RabbitListener 에는 다양한 속성을 추가할 수 있다. 모든 내용을 설명하기는 어려우니, 알아서 찾아보길 바란다. (이 글은 친절하지 않을 예정이다...)
애플리케이션을 실행하면, RabbitMQ 에 커텍션을 맺고, 채널을 자동으로 생성하며, 큐를 리스닝할 것이다. 비록, Spring-CLoud-Stream 에서 사용하는 방법이 더 간편하지만, 이 글에서는 Cloud-Stream 에 대해서는 논하지 않겠다. 만약 큐가 RabbitMQ 에 없다면 자동으로 큐가 생성이 될 것이다.
큐는 "cafe.topic" 라는 익스체인지에 바인딩 되어있는데, 라우팅 키는 "order.coffee.#" 이다.
자, 메시지 수신을 만들어봤다. 이제 메시지를 발행하는 애플리케이션을 작성해보자. 쉽다.
신규 애플리케이션을 만들고, 동일하게 디펜던시 및 컨피그 설정을 하자. 메시지를 발행하는 기능이기 때문에 리스너를 구현할 필요는 없다. 하지만, 메시지를 보내야 하기 때문에 RabbitTemplate Bean 을 설정해야 한다. RabbitTemplate 빈 을 정의하자.
참고로, ConnectionFactory 를 별도로 정의하지는 않았다. 스프링부트에서 자동 구성을 해줄 것이다. 해당 설정을 커스텀하게 변경하기 위해서는 ConnectionFactory 빈을 재정의하면 된다. 어쩃든 RabbitTemplate 를 의존성 주입한 후, 메시지를 전송해보자.
요렇게 전송하면 된다. RabbitMQ 관리 툴에서 정상적으로 메시지가 전송된 것을 확인할 수 있다.
Consumer 의 로그에 아래와 같이 시스템 로그를 확인해보자. 메시지는 잘 전송되었다.
(Body:'"Message"' MessageProperties [headers={__TypeId__=java.lang.String}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=cafe.topic, receivedRoutingKey=order.coffee.first, deliveryTag=2, consumerTag=amq.ctag-YWg9h9R7bonuyZmbUCli2g, consumerQueue=coffee.queue])
지금까지는 메시지를 단순 문자열로 전송을 하였는데, 두가지 방법이 더 있다.
CustomMessage(직접 정의한) 별도의 커스텀 클래스 사용
MessageBuilder 사용
Custom Message 를 위한 클래스를 선언하고, 보내주면 된다.
또한 다른 방법으로는 아래와 같이 MessageBuilder 를 사용할 수 있다.
자세한 내용은 직접 구현해보길 바란다.
메시지 리스닝을 할 수 있는 두번 째 방법인 SimpleChannelMessageListener 에 대해서 알아보자.
위 샘플과 기본 환경은 동일하다. SimpleMessageListenerContainer 빈을 생성하고, 리스너를 등록해줘야 한다.
메시지 리스너는 ChannelAwareMessageListener 를 구현해야 한다.
간단한 서비스 클래스를 구현한다.
그리고, 메시지 리스너에서는 아래와 같이 서비스를 호출하자. 이때, 메시지를 CustomMessage 로 매핑해서 전달한다.
onMessage 메서드에서, 현재 쓰레드가 무엇인지 확인하는 시스템 로그를 남긴다. SimpleMessageListenerContainer 를 선언해줄때 별도의 셋팅을 하지 않는다면, 쓰레드의 디폴트 개수는 1개이다. 메시지를 여러건 동시에 발행하면, 수신하는 로그는 아래와 같이 찍히는 것을 확인할 수 있다. 쓰레드는 계속 simple..Container-1 을 사용한다.
요 얘기는 어떤 의미냐면... 위에서 필자가 accept 로직에 3초의 지연시간을 추가했었다. 즉, 메시지 처리 속도가 겁나 느릴거다. 메시지를 20개 정도를 보내보자. 메시지 발행 후 15초 정도 지난 시점의 모니터링이다. 아직도 Unacked 된 건이 14건이나 된다.
20개의 메시지를 모두 처리하는 데 1분이라는 시간이 소요 되었다. 왜이런걸까?
당연한 얘기지만, Consumer 에서 메시지를 1개의 쓰레드로 처리하고 있기 때문이다. Consumer 의 쓰레드를 증가하는 방법은 간단하다. simpleMessageListenerContainer.setConcurrency(쓰레드개수) 로 설정할 수 있다. 하지만 일단, 다른 방법으로 먼저 살펴보자. simpleMessageListenerContainer에서 실행하는 쓰레드 수는 그대로 유지하면서, 서비스에서 구현한 메서드를 비동기로 작성해보자. 비동기 메서드를 구현하는 방법은 여러가지이지만, 이 글에서는 간단하게 @Async 어노테이션을 사용해보겠다.
5건의 메시지를 보내면 총 걸리는 시간이 3초이다. 5개의 메시지를 모두 병렬로 처리할 것이다. 아래 로그를 확인해보면, simpleMessageListenerContainer-1 은 계속 동일한 쓰레드를 사용하지만, @Async 어노테이션이 붙은 메서드는 별도의 쓰레드를 사용해서 병렬처리를 한다.
해당 구현은, 메시지를 수신해서 모든 프로세스를 처리하기 전에 메시지 수신알림을 보내는 구조이다. 즉, 3초기 지나기 전에 이미 ACK 를 보내버린다는 의미다. 3초가 되기 전에 이미 unacked 가 0 이 된다.
메시지를 모두 처리하기 전에 미리 ACK 를 날려서, 수신알림을 보내는게 과연 맞을까? 다른 방법을 생각해보자. SimpleMessageListenerContainer 설정을 변경해보자. setConcurrentConsumers 에 쓰레드 개수를 설정하자. 그리고, 그 전에 설정했던 @Async 어노테이션을 제거해서 비동기 로직을 삭제하자.
그리고, 실행해보면... 10개의 쓰레드로 동작하는 것을 확인할 수 있다.
자!! 중요한 내용이다. Ack 전달을 3초가 지난 후에 각각의 처리가 끝난 후에 발송한다. @Async 메서드를 사용해서 처리했던 위 예시는 메시지를 처리하기 전에 미리 ACK 를 보냈기 때문에 합리적인 판단은 아닐수도 있다. 반면에, Consumer 쓰레드를 개수를 증가한 해당 상황은 모든 처리가 완료된 이후에 ACK 를 보낸다. 좀 더 이상적인 상황이라고 생각된다. 물론, 임의로 ACK 를 보낼수도 있다. 아래와 같이 강제로 ACK 를 보낼수도 있다.
사실 필자도 이제 공부한지 하루,이틀밖에 되지 않았고 글쓰는게 힘들어서 이정도로만 정리하겠다. 알아서 공부를 해보길 바란다. 욕심 같아서는, RabbitMQ 관련해서 더 상세한 내용을 이 글에 남기고 싶지만 여건상 그럴 수가 없다.
메시지 브로커는 메시지에 대한 큐 분산 역할을 수행한다. 그림을 통해서 이해해보자.
Producer 에서 보낸 메시지는, Consumer 에 고르게 분산되어 전달한다. 만약, Consumer 중 1대가 장애가 발생을 해도, 나머지 한대에서 남은 작업을 처리할 수 있기 때문에 시스템 가용성도 높다.
이 글에서, 메시지 패턴 관련해서 모든 내용을 설명하기는 어렵다. 필자가 개인적으로 자주 보는 링크를 소개할테니, 백엔드 개발자는 꼭 읽어보길 바란다.
https://docs.microsoft.com/ko-kr/azure/architecture/patterns/category/messaging
또한, 기회가 된다면 "기업 통합 패턴" 이라는 책도 읽을만 하다. (꽤 두껍지만, 레퍼런스 용으로도 괜찮다.)
http://www.acornpub.co.kr/book/enterprise-integration-patterns
이번 글에서 메시지 패턴 전략에 대해서 전부 소개하기는 어렵지만, 조만간 올해가 끝나기 전에 "메시지 패턴" 이라는 주제로 각잡고 글을 작성할 예정이다.
나중에 시간 되면 나중에 글을 쓰겠다. 트랜잭션, 메시지 Retry, HA 구성 등등 다양한 내용이 포함될 것이다. 다음 글을 쓸 시간이 될지 모르겠다.
짧은 검토 시간으로 상세하게 글을 쓰기는 어려움이 있었다. 하지만, 오랫만에 RabbitMQ 를 공부해보니, 더 자세히 알아보고 싶다는 생각이 들었다. 조만간 "메시지 패턴", "RabbitMQ 심화" 라는 두 개의 큰 주제로 글을 남길 예정이다.
드디어 필자의 블로그에 100번째 글을 업로드 하게 되었다. 2년동안 많은 일이 있었지만, 블로그는 개발자로서 내 자신을 지켜주는 작은 희망과 같은 역할을 하였다. 비록, 블로그를 통해서 얻은 수익은 0원이지만, 그럼에도 불구하고 해당 블로그를 통해서 많은 것을 배우게 되었고, 주변 개발자에게 조금이라도 도움이 되었다고 생각한다. 앞으로, 자주 글을 쓰지는 못하겠지만 일주일에 1개씩이라도 글을 쓰면서 꾸준히 블로그를 유지할 수 있는 성실한 개발자가 되도록 노력하겠다.