스프링부트 환경에서 카프카 배치 대량 메시지 수신
이번 글에서는 스프링부트 환경에서 카프카에 메시지를 발행, 수신 및 대량으로 수신할 수 있는 Batch Listener 를 검토해서 소개한다. 글의 주제가 "Spring Kafka Batch Listner"인데, 생각나는대로 주저리주저리 글을 작성하다보니 내용이 많이 길어지고 지루한 글이 되었다.
지루한 글이므로,
스프링부트에 익숙하고 카프카를 이미 사용해 본 개발자는 3장부터 읽는 것을 추천한다.
"Apache Kafka(이하 카프카)"의 기본 개념 및 설치 방법에 대해서 간략하게 소개한다. 카프카 설치/운영 경험이 있는 개발자는 이번 장은 넘어가도 된다. 실무 상용 환경에서의 카프카 브로커는 최소 3개 이상으로 구축해야 하지만, 간단하게 설명하기 위해서 단 1개의 브로커 환경으로 구축하였다.
아파치 카프카는 대규모 메시지 데이터를 빠르게 처리할 수 있는 메시지 플랫폼이다. 링크드인에서 처음 개발했고, 현재는 거의 모든 회사에서 사용 중인 필수 메시지 플랫폼으로 성장하였다. 카프카는 주로 아래와 같은 용도로 사용하게 된다.
Messaging
Website Activity Tracking
Metrics
Log Aggregation
Stream Processing
Event Sourcing
자세한 내용은 공식 레퍼런스를 참고하길 바란다.
https://kafka.apache.org/intro
카프카는 다른 메시지 브로커와는 다르게 Consumer 에게 메시지를 PUSH 하는 방식이 아니다. Consumer 가 브로커로부터 메시지를 직접 PULL 하는 방식으로 동작한다. 구글링하면 볼만한 자료가 많으니 찾아보길 바란다.
https://www.youtube.com/watch?v=XQJ9NoX_yyU
https://www.cloudkarafka.com/blog/2016-11-30-part1-kafka-for-beginners-what-is-apache-kafka.html
나중에 카프카에 대해서 각잡고 글을 작성하겠다.
(방법1)리눅스에 직접 설치
우분투에서의 설치 방법을 소개하지만, CentOS 에서도 동일하게 설치가 가능하다. 편의를 위해서 root 계정으로 설치를 진행하였고, 단일노드, 단일 브로커에 설치한다.
(상용에서는 최소 3개의 브로커를 설치 운영해야 한다.)
먼저 주키퍼를 설치하기 위해서, 주키퍼 3.4.14 버전을 다운로드하였다.
압축을 푼다.
심볼릭 링크를 설정한다.
??
주키퍼 설정 파일을 생성하고..
기본 설정을 추가한다.
zkserver.sh start 를 실행하면 주키퍼가 실행을 한다.
주키퍼가 정상적으로 실행이 되면 2181 포트가 바인딩 될 것이다.
주키퍼 설치 끝!
자...
이제 카프카를 설치하자. 카프카를 다운로드 한다. 필자는 2.3.1 버전을 다운로드 하였다.
파일 압축을 풀고, 심볼릭 링크를 걸어준다.
카프카 컨피그 파일을 열고,
아래와 같이 주키퍼 호스트 정보를 입력한다. 필자는, 로컬 서버에 주키퍼 와 카프카를 같이 실행할 예정이라서 별도의 수정은 하지 않고, localhost:2181 설정을 유지하였다.
(상용 서비스에서는 카프카 와 주키퍼 노드는 다른 물리 서버에 운영하는게 좋다.)
카프카를 실행한다.
아래와 같이 9092 포트가 바인딩이 되면 정상적으로 실행한 것이다.
카프카를 설치하면 Producer, Consumer 를 수행할 수 있는 스크립트를 함께 제공한다. 아래와 같이 메시지를 발행했을 때, Consumer 에서 메시지를 수신하는 것을 확인할 수 있다.
토픽의 파티션, 리플리카 정보를 확인할 수 있다.
아주 심플한 방법으로 카프카를 설치하였다.
(방법2) : Mac OS 에서 homebrew 로 설치
생략
(방법3) : Docker 사용
생략
필자는 회사 로컬 개발환경에서는 Mac OS 에 homebrew를 사용해서 카프카를 설치해서 사용한다. 다른 팀원들은 대부분 도커 이미지를 실행해서 개발한다.
자바 환경에서 카프카를 연동하기 위한 클라이언트 라이브러리에 대해서 알아보자.
자세한 설명은 생략한다.
https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/2.3.1
Gradle 환경의 자바 애플리케이션을 시작한다. (스프링 아님..)
아래 샘플 코드와 같이, 메시지를 발행할 수 있다.
샘플 코드는 github 을 참고하길 바라며...
https://github.com/sieunkr/spring-kafka/tree/master/java-kafka
해당 글에서는, 스프링 프레임워크에 환경 기반으로 설명할 예정이기 때문에 자바 클라이언트 라이브러리에 대해서는 이정도로 짧게 소개하게 넘어가겠다.
스프링부트 환경에서 Kafka에 메시지를 발행하는 애플리케이션을 구현한다.
스프링부트 환경에서 실행하면 KafkaTempalte 는 자동 구성이 된다. 스프링부트에서 제공하는 KafkaAutoConfiguration 이라는 클래스에서 해당 역할을 수행한다. 스프링부트에서 제공하는 소스 코드를 반드시 찾아서 확인하길 바란다.
KafkaTemplate Bean 을 생성하는 구문이 @ConditionalOnMissingBean 어노테이션과 함께 선언되어 있다.
코드를 자세히 보면 KafkaTemplate<?, ?> 로 정의가 되어 있는데, KafkaTempalte<key, value> 에 모든 타입이 적용될 수 있도록 구성되었다. 또한, 주입 받는 KafkaProducerFactory 역시 @ConditionalOnMissingBean 으로 선언되어 있다.
소스 코드를 차분히 따라가보면 keySerializer 과 valueSerializer 모두 StringSerializer 로 설정이 되는 것을 확인할 수 있다.
소스코드를 정리해서 다시 설명해보면,
스프링부트 환경에서 Spring Kafka 를 사용하면 key와 value 모두 StringSerializer 를 사용하는 KafkaTemplate bean 이 자동으로 구성된다. Property 설정은 KafkaAutoConfiguration 클래스 상단에 @EnableConfigurationProperties 가 선언되었고, spring.kafka 로 시작하는 애플리케이션 프로퍼티 속성 값을 주입하게 된다.
필자는 아래와 같이 application.yml 파일에 카프카 브로커 속성 정보를 설정하였다. (IP 는 필자의 개인 VM 환경에서 실행한 카프카서버의 사설IP 및 포트 정보이다.)
@ConditionalOnMissingBean 어노테이션은 중요하기 때문에 한번 더 설명하겠다.
KafkaTempalte 스프링 빈이 이미 정의가 되어있다면 빈생성을 하지 않고, 반드시 스프링빈이 없을 때만 수행하도록 한다. 즉, @ConditionalOnMissingBean 을 통해서 스프링 빈 충돌을 해결할 수 있으며, 개발자가 직접 커스터마이징해서 Bean 생성을 할 수 있는 것이다. 위에서 설명한 필자의 샘플 코드는 개발자가 별도로 커스터마이징을 하지 않은 상황이며, 스프링 부트에서 제공하는 AutoConfiguration 을 통해서 생성된 KafkaTemplate를 그대로 사용하는 코드이다. 애플리케이션에서는 스프링부트가 자동으로 생성해준KafkaTemplate 빈을 주입해서 사용하면 된다. 아래 샘플 코드를 보자.
스프링부트의 마법으로 인해서, 애플리케이션 컨피그 설정은 최소화할 수 있고 개발자는 비즈니스 로직만 구현하면 된다. 하지만, 해당 코드에서 단순 문자열을 전송하는 기능은 정상적으로 잘 동작을 하지만, CoffeeDTO 를 전송하는 메시지는 오류가 발생한다.
직렬화 과정에서 문제가 발생하였는데, 이유는 기본으로 사용하는 KafkaTemplate Bean이 StringSerializer 를 사용하기 때문이다. DTO 메시지를 발행,수신 하기 위해서는 스프링부트에서 제공하는 KafkaTempalte 를 그대로 사용하는 것은 무리가 있어 보인다.
그렇다면...
KafkaTempalte Bean을 직접 구현해보자.
ProducerConfiguration이라는 컨피그 클래스를 신규로 생성한다. 해당 클래스에는 Configuration Property 설정을 불러오기 위해서 @EnableConfigurationProperties 어노테이션을 활용하겠다.
(참고로 @Value 어노테이션으로 프로퍼티 속성을 가져올수도 있다.)
필자가 임의로 정의한 CoffeeDTO 객체를 직렬화할 수 있도록, JsonSerializer를 설정하였다. 하지만, 별도의 커스텀하게 Bean 을 구성할 때 주의할 사항이 있다. 위에서 수차례 설명했지만, 스프링부트에서 제공하는 KafkaAutoConfiguration 클래스에서의 KafkaTemplate 빈 정의는 @ConditionalOnMissingBean 어노테이션으로 구성되어 있다. 즉, 필자가 애플리케이션에서 KafkaTempalte Bean을 정의하면, 스프링부트에서 제공하는 기본 KafkaTemplate<?, ?> 생성 구문을 수행하지 않는다. 그래서 KafkaTemplate<String, String> 타입으로 빈을 주입받을 수 없게 되는데, 아래와 같이 Bean 주입 시 오류가 발생한다.
KafkaTemplate Bean 빈을 직접 구현할 예정이라면, 주입받는 모든 Bean 타입에 대해서 정의를 해줘야 한다. 그래서, KafkaTemplate<String, String> 타입도 주입받을 수 있도록 컨피그 설정에 코드를 추가하였다.
모든 설정이 끝나면, 메시지를 발행해보자.
String 문자열 메시지 및 Coffee DTO 메시지 모두 정상적으로 메시지 발행된다. 아직 Consumer 코드를 작성하지 않았기 때문에 카프카 서버에서 직접 메시지를 수신해서 확인하자. 필자는 샘플코드에서 단순 문자열 메시지는 "coffee" 라는 이름의 토픽을 구성하였는데, 아래와 같이 메시지를 수신한다.
DTO 메시지는 "coffee-dto" 라는 이름의 토픽으로 수신하도록 하였다.
아래와 같이 정상적으로 메시지를 수신하는 것을 확인할 수 있다.
참고로, 두개의 토픽 모두 하나의 파티션을 사용하고 있는 상태이다.
(4장에서 파티션 개수를 늘려볼 예정이지만, 일단 아직은 한개인 상태이다. 한번 늘린 파티션은 다시 줄일 수 없다.)
샘플 코드는 github 에서 확인 가능하다.
https://github.com/sieunkr/spring-kafka/tree/master/spring-kafka-producer
스프링부트 환경에서, 카프카에 메시지를 발행하는 애플리케이션을 구현하였다.
이제!! 메시지를 수신하는 애플리케이션도 만들어보자.
스프링 환경에서 Kafka 메시지를 수신하는 애플리케이션을 구축한다.
중요한 내용이지만, 생략한다. 각자 공부하길 바란다.
1천 개의 메시지를 수신하는 애플리케이션을 만들어보자. 필자가 임의로 가정한 상황은, Producer 에서 메시지를 카프카에 전달하는 시간은 아주 짧지만, 브로커로부터 메시지를 가져와서 처리하는 Consumer 에서 지연시간이 발생한다고 가정하였다. @KafkaListener 에서 수행하는 로직에 10미리세컨드(0.01초) 지연시간을 추가하였다. 메시지를 수신해서 저장소에 저장하는 시간이 0.01초 정도가 된다는 가정이다.
컨슈머 개수를 별도로 설정하지 않았기 때문에 1개의 컨슈머로 동작을 하게 되는데 단일 쓰레드로 동작하게 된다. 그래서, 1천개의 메시지를 수신하는 성능은 매우 나쁘다.
0.01 초는 매우 짧은 시간이지만 0.01초가 1천개 작업이 쌓이게 된다면, 모든 작업을 수행하는데 걸리는 총 시간은 10초 이상이 될 것이다. 이런 나쁜 성능은 카프카를 사용하는 우리가 기대하는 성능이 아니다. 카프카는 대량 메시지를 미친 속도로 처리할 수 있는 엄청난 성능을 제공한다.
1천개의 메시지를 1초 전후로 작업이 끝낼 수 있도록 개선해보자.
파티션은 그대로 유지하면서 컨슈머 개수만 늘리면 큰 의미가 없다. 파티션 개수도 함께 늘려줘야 한다. 단, 파티션은 한번 늘리면 다시 줄일 수 없으므로 주의가 필요하다. 처음에는 작은 파티션으로 시작해서 점진적으로 늘리는 방향이 바람직하다는 개인적인 생각이다. 어쨋든 이 글에서는 파티션 5개, 컨슈머 5개로 늘려보겠다. 카프카 서버에서 아래와 같이 명령어를 실행해보면 토픽의 파티션을 5개로 설정할 수 있다. 단순 메시지를 발행/수신하는 "coffee" 이라는 이름의 토픽의 파티션 개수를 5개로 설정하였다.
애플리케이션에서는 KafkaListenerContainerFactory 설정에서 setConcurrency 로 컨슈머의 개수를 설정할 수 있다.
1천개의 대량의 메시지를 순간적으로 발행해보자.
컨슈머는 5개로 잘 동작하며, 1개의 컨슈머 및 토픽 파티션을 사용했던 처음 상황보다는 훨씬 빠르게 처리하는 것을 알 수 있다. 당연한 결과이지만 1개의 파티션에서 분배하던 메시지를 5개의 파티션에서 분배하기 때문에 당연히 훨씬 빨라지는 것이다. 하지만, 메시지 수신 결과는 뒤죽박죽이다. 제일 마지막으로 수신한 메시지가 1000이 아니라는 것을 로그를 통해서 확인할 수 있다. 파티션 분배로 메시지를 나눠서 수신하게 되면, 메시지의 순서 보장은 되지 않는다는 사실을 깨닫게 된다.
주의!! 이글에서는 단일노드,단일브로커 에서의 검토이며, 리플리카 설정 역시 1개로 테스트 중이다. 노드 및 브로커의 개수, 리플리케이션 설정, 메시지 전송방법, 컨슈머 그룹 등 다양한 변수에 의해서 성능은 크게 차이가 날 수 있다. 상용 서비스를 위한 인프라 설정이 아니라, 카프카를 공부하는 과정에서 검토했던 내용이라는 점을 이해해주길 바란다.
이벤트 메시지에 대한 순서보장을 해야한다면, 파티션키를 지정해서 메시지를 보내는 방법이 있다. 혹시, 다른 방법이 있다면 댓글로 알려주길 바란다. 필자의 경험이 부족하여, 파티션키를 지정하는 방법 외에 더 좋은 방법이 생각이 나질 않는다.
https://github.com/sieunkr/spring-kafka/tree/master/spring-kafka-consumer
이제, 드디어... 이 글의 핵심 주제인 Batch Listener 에 대해서 살펴보도록 하자.
(글을 반나절 정도 쓰다가 지쳐서 더이상 자세히 쓸 기운이 없다. 글이 너무 길어지고 있어서 매우 피곤해서 대충 글을 빠르게 마무리를 해야겠다. 혹시라도 1장부터 여기까지 정독해서 읽은 개발자가 있다면...(거의 없겠지만) 지루하고 재미 없는 이글을 여기까지 읽어 준것에 대해서 진심으로 감사하게 생각한다. 남은 내용은 대충 빠르게 마무리하겠다.)
위 샘플 코드를 그대로 활용해서 Batch Listener 에 대해서 소개하겠다.
그림을 그려서 멋지게 설명하고 싶었지만, 생략한다.
KafkaListenerContainerFactory 빈 정의시에 setBatchListener 설정을 true 로 설정하면 Batch 방식으로 메시지를 수신할 수 있다.
필자는 한번에 가져올 수 있는 메시지 최대 사이즈를 500으로 설정하였다.
아래와 같이 코드를 변경하였다. DefaultKafkaConsumerFactory 를 넘겨줄 때 JsonDeserializer 를 반드시 넘겨줘야 한다. 그래서, Config 설정에서 Deserializer 구문은 주석처리를...
어쩃든, 참고 소스는 아래 github 을 참고하길 바란다.
리스너 함수는 아래와 같이 작성한다. 반드시, 애플리케이션에서 커스텀하게 정의한 containerFactory 를 지정해야한다.
Producer 에서 1000 개의 데이터를 대량으로 Coffee DTO 메시지를 보내보자. 해당 토픽은 현재 파티션과 컨슈머의 개수는 1개인 상황인데, 매우 빠르게 메시지를 수신처리하는 것을 확인할 수 있다. 마지막으로 받은 메시지의 개수는 500개 정도 된다.
한 번에 메시지를 대량으로 처리하기 때문에, 매우 뛰어난 성능을 발휘한다.
생략
생략
생략
생략
메시지 손실 가능성이 높지만 빠른 전송이 필요한 경우
메시지 손실 가능성이 적고 적당한 속도의 전송이 필요한 경우
전송 속도는 느리지만 메시지 손실이 없어야 하는 경우
생략
이 글 하나로 카프카에 대해서 모든 것을 설명하기에는 무리가 있다. 끝이 없이 작성하다보면 거의 책을 쓰는 수준이 될것 같다. 더 작성하고 싶지만, 상세한 내용은 생략하겠다.
나중에 기회가 되면, 그때 다시 글을 작성하는 것으로..
드디어 마지막 장이다. Spring Cloud Stream 에서의 Batch Consume 에 대해서 검토해보자.
자세한 내용은 생략한다.
https://brunch.co.kr/@springboot/305
https://brunch.co.kr/@springboot/2
스프링부트 2.2.2.RELEASE , Spring Cloud Hoxton.SR1 디펜던시를 추가한다.
핵신 모듈인 Spring Cloud Stream Kafka 는 3.0.1.RELEASE 버전으로 추가된다.
메시지 수신 메서드를 함수형 인터페이스로 구성할 예정이며, application.yml 파일에서 batch-mode 설정을 true, ackEachRecord 는 false 로 설정하면 Batch Listener 를 작동할 수 있다.
아래 코드와 같이 Consumer 함수형 인터페이스를 사용하였다.
혹시, Consumer 가 어떻게 동작되는지 모르겠다면 필자의 글을 읽어보길 바란다.
https://brunch.co.kr/@springboot/305
Producer 에서 1천개의 메시지를 순간적으로 발행해보자. 아래와 같이 Batch 로 메시지를 대량으로 수신하는 것을 확인할 수 있고, 마지막 메시지는 497건의 메시지를 받으면서 끝난다.
@EnableBinding(Sink.class), @StreamListener 등의 어노테이션을 사용하는 기존 방식에서도 Batch Mode로 메시지를 수신하는 것을 확인하였다. 하지만, DTO 매핑이 정상적으로 되지 않고 Byte 코드 그대로 전달된다. 필자가 잘못한건지 아니면 스프링에서 지원하지 않는지 정확히 확인을 하지 못하였다.
Spring Cloud Stream 3.X버전에서 레거시(EnableBinding, StreamListener)코드의 Batch Mode 를 정상적으로 사용할 수 있는 방법을 아는 개발자는 필자에게 알려주길 바란다.
또한, Spring Cloud Stream 2.X 포함 그 이전 버전에서 Kafka Batch Mode 를 사용할 수 있는지 아는 개발자는 알려주길 바란다.
필자의 샘플 코드는 github 에서..
https://github.com/sieunkr/spring-kafka/tree/master/scst-kafka-consumer
이번 글에서는 스프링 부트 환경에서 Kafka 연동 및 Batch 방식으로 대량의 메시지를 가져오는 방법에 대해서 검토하였다. 사실, 그동안 RabbitMQ를 주로 사용하였는데, 이번에 Kafka 를 살펴보면서 RabbitMQ 와 Kafka 의 장단점에 대해서 좀 더 상세하게 공부를 하고 있다.
메시지 시스템에 대한 내공을 더 쌓고 언제가 될지 모르겠지만 RabbitMQ vs Kafka 라는 주제로 글을 작성할 예정이다.