- 멀티 쓰레드 및 이벤트 기반 배치 연동
by sieun
이번 글에서는, Spring Batch와 RabbitMQ를 연동하여 이벤트 기반 배치 프로세스를 구축한다. 일반적으로 배치 프로그램의 종류는 아래와 같다.
정기 배치 : 정해진 시간에 자동으로 실행
이벤트 배치 : 정해진 조건(이벤트)에 맞으면 자동 또는 수동으로 실행
이벤트 배치는 정해진 조건에 맞으면, 즉 Event-Driven(이벤트 기반)으로 배치가 실행이 된다. 이번 글에서는 RabbitMQ를 연동하여 Spring Batch의 Job을 이벤트 기반으로 실행하게 할 것이다.
(필자는 Spring Batch 개발은 이번이 처음이라서 미흡한 점이 많을 것이다.
잘못된 점 피드백 부탁드립니다...
개발 요구사항을 정리하면 아래와 같다.
간단한 Spring Batch(스프링 배치) 프로젝트를 구축한다
Spring Batch 프로젝트는 Spring Boot(스프링 부트) 기반으로 구현하며, 웹으로 운영한다.
Event-Driven(이벤트 기반)으로 Batch Job(배치 잡) 이 실행된다.
이벤트 메시지는 RabbitMQ를 브로커로 활용한다.
Spring Cloud Stream를 활용하여 RabbitMQ에 연동한다.
Batch Job 은 싱글 쓰레드가 아닌, 멀티 쓰레드로 동작한다.
멀티 쓰레드로 실행되는 Job은, Job 성격에 맞게 쓰레드 풀(Pool) 관리를 Job별로 따로 한다.
개발을 위해서 억지로 예시를 만들었다. 카페가 있다고 가정하자. 카페에서는 커피를 만들 수 있고, 브런치를 만들 수도 있다. 근데 이 카페는 커피를 만드는 사람과 브런치를 만드는 사람은 서로 작업을 도와주지 않는다. 커피 만드는 사람은 커피만 만들어야 하고, 브런치를 만드는 사람은 브런치만 만들어야 한다. 커피 만드는 사람은 6명이고, 브런치를 만드는 사람은 2명이다. 커피 또는 브런치를 만드는 과정이 하나의 Job이다. Job에는 정해진 Step(과정)을 순차적으로 수행한다. (참고로, 스프링 배치에서 Step은 Reader, Processor, Writer의 과정을 수행한다.) 주문받는 종업원의 역할은, 주문을 전달하는, 즉 메시지를 전달하는 MQ, 메시지 브로커라고 가정하자. MQ는 주문을 받아서 어떤 작업을 할지 전달하고, 스프링 배치 프로젝트의 커피 리스너와 브런치 리스너가 메시지를 받아서 Job을 실행한다. 이때 리스너는 멀티스레드로 Job을 실행시킨다. 커피는 6개의 쓰레드이고, 브런치는 2개의 쓰레드이다. 커피를 만드는 바리스타와 브런치를 만드는 제빵사는 각자의 역할이 나누어 있기 때문에, 서로의 일을 도와주지 않는다. 만약 커피를 만드는 과정에서 문제가 발생해서, 커피 만드는 일이 정체되고 있어도, 브런치는 계속 만들어 낼 수 있다. 각자의 쓰레드풀에서 동작하기 때문이다. 만약 이런 경우에, 특정 Job(예를 들어서 브런치 만드는 일 또는 커피 만드는 일)이 너무 많이 몰려있을 때는, 해당 Job 에 대한 주문을 더 이상 받지 않도록 할 수도 있다. 누가? 주문을 받는 사람이, 즉, 메시지 브로커가 수행한다. RabbitMQ에서 더 이상 메시지를 보내지 않도록 대기(Ready) 하는 것이다.
사실 현실 세계에서는 변수가 많기 때문에, 이 예시는 매우 억지스럽다. 혹시라도 위 예시가 전혀 이해가 되지 않는다면 이 글은 패스해도 된다.
기본적으로 스프링 배치를 한 번이라도 사용해 본 개발자에 한해서 이 글을 읽어주시길 바란다. Spring Batch에 대해서 전혀 모른다면 이 글을 읽는 것이 어려울 수는 있다. 왜냐면 필자의 설명이 매우 불친절하기 때문이다. 혹시라도 Spring Batch의 전문가가 있다면, 이 글을 보고 잘못된 점을 지적해주길 바란다. 일단, 스프링 배치에 대한 설명은 깔끔하게 생략한다. 스프링 배치를 몇일 공부하고 몇 가지 의문점이 생겼다.
첫 번째 의문은, DB에 대한 의존성을 줄이는 방법이 있는가?
일단, 테이블의 의존성으로 인해서 배치가 실행이 안 되는 경우가 종종 있다. 예를 들어서 마지막 배치 실행이 정상적으로 종료가 안되었을 때 다음 배치가 실행이 안된다. DB에 저장된 값 때문에 배치에 문제가 있다면 해당 값을 수정하여 정상적으로 배치 실행을 할 수 있다. 의존성을 분리하는 방법에 대해서는 명확하게 답을 찾지는 못했다.
(참고로 DB를 인메모리 DB로 구현이 가능하다. 배치 실행 로그를 따로 저장/관리/모니터링하고 있다면, 배치 테이블의 로그는 휘발성이어도 상관없을 것 같다. H2 등의 자바 인메모리 DB로 구축을 하면 된다.)
두 번째 의문은, 스프링 배치 프로젝트에서 Job을 병렬로 실행할 수 있도록 지원하는 기능이 있는가?
역시 명확하게 찾지 못했다. 스프링 레퍼런스에서는 아주 간략하게 설명이 되어있다. Step 병렬 실행과 Partitioner로 병렬 실행에 대해서는 상세한 가이드가 되어있지만, Job 병렬 실행에 대해서는 사례를 찾기가 쉽지 않다.
오해하면 안 된다. 내가 하고 싶은 것은 Job 병렬 실행이다. Step 병렬 실행 및 Partitioner 병렬 실행은 이미 알고 있는 내용이다.
메시지 패턴은 매우 중요한데, 특히 최근 클라우드 환경 또는 MSA(마이크로 서비스 아키텍처)에서 반드시 필요한 개념이다. 개인적으로는 MSA 에 대한 관심이 많아서 자연스럽게 메시지 패턴 기반의 Event-Driven(이벤트 기반) 아키텍처 구축에 대해서 공부 중인데, MSDN을 많이 참고하고 있다. 아래 링크를 참고하자.
https://docs.microsoft.com/ko-kr/azure/architecture/patterns/category/messaging
https://msdn.microsoft.com/library/dn589781.aspx
Spring Batch와 RabbitMQ를 연동하는 방법은 여러 가지인데, 이번 글에서는 Spring Cloud Stream 라이브러리를 활용하여 연동 작업을 하였다. 아래 링크에서 연동 방법을 간단하게 확인 가능하다.
https://brunch.co.kr/@springboot/17
Spring Cloud Stream을 활용하여 리스너 클래스를 두 개 생성한다. 커피 리스너와 브런치 리스너를 만든다. 각각의 리스너 클래스는 각각의 스레드로 실행이 된다. 지금까지의 작업을 간단하게 쉽게 설명하면 커피 만드는 사람이 한 명, 브런치 만드는 사람이 한 명이고, 각자의 역할을 수행하는 것으로 이해하면 된다. 또한 각각의 쓰레드는 동기 프로세스로 구현된다. 즉, 하나의 커피를 완성하기 전에는, 다른 커피를 동시에 만들 수가 없다.
각 작업에 대한 Queue 가 있고, 큐는 durable 설정을 true로 설정하였다.
spring.cloud.stream.rabbit.bindings.SINK-BRUNCH.consumer.durableSubscription=true
각 큐에 대한 바인딩키는 coffee.update.test, brunch.update.test로 설정하였는데 Exchange 타입이 direct 이기 때문에 라우팅 키가 바인 딩키와 완벽하게 일치해야 메시지가 전송이 된다. 위에 그림에서는 Spring Batch가 두 개인데, 하나의 스프링 배치로 운영이 가능하다면 하나로 구성해도 된다. 일단, 이번 글에서는 Spring Batch 프로젝트가 한 개라는 가정하에 글을 작성해본다. 만약 두대로 운영을 한다면, 큐에 연동된 배치 프로젝트에서 group 설정을 하면, 구독하고 있는 두대의 배치가 번갈아가면서 메시지를 전송받을 것이다. 일단 이번 글에서는 헷갈리니깐 한대의 스프링 배치에 메시지를 전송한다고 가정하고 글을 작성한다. 샘플 소스는 아래 github에서 확인 가능하다.
https://github.com/sieunkr/spring-batch/tree/master/demon-single-thread
커피 만드는 작업, 브런치 만드는 작업에 대한 각각의 리스너를 통해서, 각각의 쓰레드에서 이벤트 기반으로 배치를 수행하는 작업을 구현하였다. 하지만, 내가 원하는 멀티쓰레드 환경은 아니다. 위 상황은 커피 만드는 사람이 한 명, 브런치 만드는 사람이 한 명인 상황이다.
나는 커피는 6명, 브런치는 2명이 (멀티쓰레드)환경으로 만들고 싶다.
Spring Batch에서 기본적으로 Job 에 대한 멀티쓰레드 지원에 대해서 명확하게 설명을 못 찾았다. 레퍼런스에서는 아래와 같이 나와있기는 하다.
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> <property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor">
<bean class="org.springframework.core.task.SimpleAsyncTaskExecutor" /> </property>
</bean>
"Any implementation of the spring
TaskExecutor interface can be used to control how jobs are asynchronously executed."
XML 설정 방법을 아래와 같디 Java로 변경해서 구현한다. DefaultBatchConfigurer를 상속받아서 override 하면 될 것 같다. (내 생각이다, 개발팀에 다른 팀원에게 물어봤는데, 다들 잘 모른다고 한다.) 일단 ThreadPoolTaskExecutor 빈을 선언한다.
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(6);
pool.setMaxPoolSize(30);
pool.setWaitForTasksToCompleteOnShutdown(false);
return pool;
}
그리고, DefaultBatchConfigurer를 상속받고 createJobLauncher() 메소드를 Override 한다.
@Component
@EnableBatchProcessing
public class CustomBatchConfigurer extends DefaultBatchConfigurer {
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Override
protected JobLauncher createJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(super.getJobRepository());
jobLauncher.afterPropertiesSet();
jobLauncher.setTaskExecutor(taskExecutor);
return jobLauncher;
}
}
이때 중요한 것은 위에서 선언한 taskExecutor 빈을 setTaskExecutor 메서드에 넘겨준다.
jobLauncher.setTaskExecutor(taskExecutor);
(항상 느끼지만, 카카오 브런치로 개발 블로그를 하면 안 될 것 같다. 코드를 이쁘게 붙여 넣기가 너무 어렵다. 조만간 다른 블로그로 넘어갈지도 모르겠다....)
암튼, 이렇게 선언하면 리스너 클래스에서 오토와이어링한 JobLauncher는 멀티쓰레드에 의해서 실행이 된다. 위 소스에서는 6개의 쓰레드가 실행이 된다. (사실 6개 이상의 쓰레드로 동작한다. setMaxPoolSize 설정도 바꿔야 하고, 예비 큐 사이즈도 0으로 설정해야한다. 일단 그러려니 하고 넘어가자..) 추가로 들어오는 요청 중에서 대기 중인 경우에는 쓰레드풀의 큐에 쌓이고, 큐의 용량이 꽉 차면 Spring Cloud Stream 의 리스너 이벤트에서 더 이상 메시지를 수신하지 않는다. 이 경우 메시지 브로커에 메시지가 발행이 되면, 구독하고 있는 스프링 배치에서 더 이상 메시지를 못 가져오기 때문에 RabbitMQ의 Ready 카운트가 증가하게 된다.
하지만, 이 방법도 뭔가 내가 생각하는 방향이 아닌 것이었다........
각각의 Job 에 대한 쓰레드 관리를 같이 하다. 만약, 커피 만드는 작업은 금방 처리가 되는데, 브런치 만드는 작업이 오래 걸리는 작업이라면 어떻게 될까? 6개의 쓰레드가 모두 브런치만 만들고 있다면 커피 만드는 작업은 못하고 대기해야 한다. 이 카페의 종업원은 커피를 만들 수도 있고, 브런치를 만들 수도 있고 아주 멀티 플레이어이다. 하지만 내가 원했던 방향은, 커피 만드는 일과, 브런치 만드는 일을 확실하게 구분하여 각자의 역할을 따로 수행을 한다는 가정이다. (현실세계는 그렇지 않지만) 지금 구현한 내용은 아래 소스에서 확인 가능하다.
https://github.com/sieunkr/spring-batch/tree/master/demon-multi-thread
CustomBatchConfigurer에서의 createJobLauncher()는 공통 선언 메소드인데 이 부분을 각각의 Job 에 따라서 나누면 될 것 같은데, 솔직히 잘 모르겠다. 일단 포기, GG
(필자는 스프링 배치 개발이 처음이다. github 의 소스는 오류 투성이니 그냥 패스해도 좋다.)
위 방법으로는 잘 생각이 나지 않아서, @Async 어노테이션을 활용해보려고 한다. 일단, 2개의 ThreadPoolTaskExecutor 빈을 선언한다. 명시적으로 빈 이름을 설정한다.
@Bean(name = "threadPoolTaskExecutorByCoffee")
public ThreadPoolTaskExecutor taskExecutorByCoffee() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(6);
pool.setMaxPoolSize(30);
pool.setWaitForTasksToCompleteOnShutdown(false);
return pool;
}
@Bean(name = "threadPoolTaskExecutorByBrunch")
public ThreadPoolTaskExecutor taskExecutorByBrunch() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(2);
pool.setMaxPoolSize(2);
pool.setWaitForTasksToCompleteOnShutdown(false);
return pool;
}
그리고, 리스너 클래스 상단에 @EnableAsync를 선언하고, @StreamListener 메소드에 @Async 메소드를 선언한다. 이때 중요한 것은 빈의 이름을 명시적으로 선언한다.
@EnableAsync
@EnableBinding(BrunchListener.Sink.class)
public class BrunchListener {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job brunchProcessJob;
@Async("threadPoolTaskExecutorByBrunch")
@StreamListener(Sink.inboundTest)
public void subscribe() {
생략...
}
생략...
}
실행하면 cutorByCoffee라는 쓰레드와, cutorByBrunch라는 쓰레드가 생성이 된다. 이렇게 구현하면, 멀티 쓰레드로 구현이 가능하며, 각 Job 에 대한 쓰레드를 따로 관리할 수 있다. 브런치 만드는 쓰레드가 모두 실행 중이더라도, 커피 만드는 일은 커피 쓰레드를 통해서 개별적으로 Job을 수행할 수 있다.
https://github.com/sieunkr/spring-batch/tree/master/demon-multi-thread-async
요구사항에 맞게 나름 구현이 된 것 같지만, 의문점이 많다.
사실 업무 때문에 한달전에 고민을 했던 내용인데, 까먹기 전에 글로 남긴다. 부서 이동으로 인해서 실제로 적용은 하지 못했다. 해당 배치는 명백하게 이벤트 기반으로 작동을 해야 하기 때문에 메시지 브로커를 앞에 둔다. 메시지 브로커에 연동되어 있는 배치는 확장성이 뛰어나고, RabbitMQ 의 라우팅 기능으로 인해서 유연한 메시지 전송이 가능하다. 메시지 패턴을 적용했을 때 프로젝트 간의 통합은 느슨하게 연결되는데, 이 느슨한 연결이 핵심이다. 배치 Job 에 포함되어 있는 Step 은 파티셔너 기능으로 인해서 병렬 실행을 한다. 하지만, 동일한 Job에 이벤트가 몰렸을 경우 Job을 멀티쓰레드로 실행을 해야 한다. 단, 특정 Job 의 경우에는 실행 시간이 오래 걸릴 수도 있기 때문에, Job 을 관리하는 쓰레드를 따로 관리하고, 한쪽(특정 Job) 에서 문제가 발생을 해도, 다른 Job 은 정상적으로 실행이 되어야 하는 아키텍처를 검토했던 상황이었다.
생각한 요구사항에 맞게 빠르게 반나절 정도 걸려서, 개발을 해봤지만 여러 가지 의문점이 남는다.
과연 이 방법이 최선일까? (왠지 아닐 거 같은데..)
Thread Safe 한가? 쓰레드 간에 충돌이 나지는 않을까?
비동기(Async)와 멀티 쓰레드는 개념은 다른데, 나는 지금 이 두 개념을 모두 이해하고 개발을 한 건가?
사실 주니어 개발자들이 많이 헷갈리는 부분이 바로 비동기와 멀티쓰레드에 대한 것이다. 비동기와 멀티쓰레드를 같은 개념이라고 생각하는 개발자가 꽤 많다. 하지만 아니다. 나는 주니어 개발자가 아닌데도 헷갈린다.
싱글 쓰레드, 동기
싱글 쓰레드, 비동기
멀티 쓰레드, 비동기
예를 들어서 정리하면 이렇다.
싱글 쓰레드, 동기 :
카페에 한 명의 종업원이 있다. 근데, 이 종업원은 한 번에 하나의 일만 가능하다. 커피 1잔과, 브런치 1개를 만들어야 하는 상황이라면 커피 1잔을 먼저 만들고 그다음에 브런치를 순차적으로 만든다.
싱글 쓰레드, 비동기 :
카페에 역시 한 명의 종업원이 있다. 근데 커피머신, 브런치 만드는 장비에는 타이머 설정이 가능하다. 커피 머신에 커피 1잔을 올려놓고, 다른 일을 한다. 커피가 완성되면 알람이 울리고 그때 가서 커피 완성을 마무리하면 된다. 어쨌든 이 종업원은 커피 1잔을 만들어서 타이머를 걸어 놓고 브런치를 만들기 위해 전자레인지에 브런치 재료를 넣어놓고, 설거지를 하다가 알람이 울리면 다시 돌아와서 커피와 브런치를 완성한다.
멀티 쓰레드, 비동기 :
카페에 종업원이 여러 명이다. 비동기로 진행하는 업무를 여러 명에서 동시에 작업이 가능하다. 단, 해당 종업원 간에 충돌이 나지 않도록 잘 관리해야 한다. 예를 들어서, 종업원 A가 만드는 커피는 아메리카노, 종업원 B가 만드는 커피는 카페모카인데 서로가 만드는 커피가 섞이면 안 된다. 아메리카노에 우유를 넣으면 안 되는 것이다.
멀티 쓰레드 + 비동기 가 맞나?
@Async 가 비동기 개념인지, 멀티쓰레드 개념인지 헷깔린다. 망했다....
내가 개발한 작업은, 혹시 멀티 쓰레드 이면서 비동기는 아닌거 같은데...
(멀티 쓰레드, 동기인가??????;;; )
과연 나는 비동기, 멀티쓰레드 개념을 제대로 이해하고 있는가??
주변에 물어봐도 제대로 대답해주는 개발자가 없다.
의문점만을 남긴 채 글을 마무리한다. 그래도 이번 기회에 Spring Batch에 대해서 조금은 알게 되었다.
부족한 실력에 반성하고 또 반성하면서 이 글을 마치려고 한다.