[BATCH-1668] added check for transaction
본 글의 설명과 코드는 spring-batch의 4.3.x 버전을 기준으로 한다. 출처: spring-batch/4.3.x
spring-batch(이하 스프링 배치)의 Job을 트랜잭션 안에서 실행하게 되면 아래와 같은 에러 메시지를 보게 된다. 익셉션 없이 실행하려면 메시지에서 권고하는 바와 같이 @Transactional을 제거해야 한다.
@Transactional
public void runJob() {
jobLauncher.run(job, jobParameters);
}
Existing transaction detected in JobRepository. Please fix this and try again (e.g. remove @Transactional annotations from client).
스프링 배치의 JobRepository 기본 설정이 외부 트랜잭션을 허용하지 않기 때문이다. 스프링 배치 프로젝트 코드를 보며 JobRepository에 대해서 심도있게 알아가보자.
스프링 배치에는 JobRepository를 포함한 기본 컴포넌트를 초기화하는 클래스 DefaultBatchConfigurer가 존재한다(최근에 DefaultBatchiConfiguration으로 변경됨). 동일 타입의 빈을 정의하지 않을 경우에 기본 컴포넌트들이 빈으로 등록된다. 기본 컴포넌트에는 스프링 배치를 구성하는 핵심인 JobRepository, JobExplorer, JobLauncher, TransactionManager가 있다. DefaultBatchConfigurer의 initialize 메서드에서 앞서 언급한 네 개의 빈들이 생성된다.
@PostConstruct
public void initialize() {
try {
if(dataSource == null) {
(중략)...
} else {
this.jobRepository = createJobRepository();
this.jobExplorer = createJobExplorer();
}
this.jobLauncher = createJobLauncher();
(후략)...
JobRepositoryFactoryBean의 getObject 메서드 호출을 통해서 JobRepository를 생성하는데
JobRepositoryFactoryBean가 JobRepository의 팩토리 클래스이다.
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setTransactionManager(getTransactionManager());
factory.afterPropertiesSet();
return factory.getObject();
}
@Override
public JobRepository getObject() throws Exception {
if (proxyFactory == null) {
afterPropertiesSet();
}
return (JobRepository) proxyFactory.getProxy(getClass().getClassLoader());
}
팩토리 메서드 getObject를 살펴보면 JpaRepository를 Proxy 객체로 생성하는 것을 알 수 있다. proxyFactory를 따라가서 어떤 프락시 객체로 생성해주는지 살펴보자.
proxyFactory는 initializeProxyFactory 메서드 안에서 초기화된다.
private void initializeProxy() throws Exception {
if (proxyFactory == null) {
proxyFactory = new ProxyFactory();
(중략)...
if (validateTransactionState) {
DefaultPointcutAdvisor advisor = new DefaultPointcutAdvisor(new MethodInterceptor() {
(중략)...
});
NameMatchMethodPointcut pointcut = new NameMatchMethodPointcut();
pointcut.addMethodName("create*");
advisor.setPointcut(pointcut);
proxyFactory.addAdvisor(advisor);
}
(후략)...
}
DefaultPointCutAdvisor 타입의 advisor를 proxyFactory의 Advisor(어드바이저)로 주입해주고 있다. 프락시에서 어드바이저가 어떤 역할을 하는가? 프락시 대상이 되는 객체의 인터페이스에 부가 기능의 실행 시점과 동작을 정의하는 객체를 의미하지 않는가. 그리고 NameMatchMethodPointCut 설정을 통해서 create* 패턴을 가진 함수에서 실행되도록 정의됐다. 어드바이저의 동작(MethodInterceptor)이 어떻게 구현됐는지는 MethodInterceptor로 구현된다.
DefaultPointcutAdvisor advisor = new DefaultPointcutAdvisor(new MethodInterceptor() {
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
if (TransactionSynchronizationManager.isActualTransactionActive()) {
throw new IllegalStateException(
"Existing transaction detected in JobRepository. "
+ "Please fix this and try again (e.g. remove @Transactional annotations from client).");
}
return invocation.proceed();// <---타켓 클래스의 메소드호출
}
});
MethodInterceptor는 create* 메서드 실행(invocation) 전에 TransactionSyncronizerManager를 통해 현재 스레드에서 트랜잭션 활성화 여부(isActualTransactionActive)를 확인한다. 그리고 트랜잭션이 활성화돼있다면 IllegalStateException을 던진다. 메시지는 우리가 앞에서 봤던 텍스트와 동일한다.
throw new IllegalStateException(
"Existing transaction detected in JobRepository. "
+ "Please fix this and try again (e.g. remove @Transactional annotations from client).");
드디어 의문을 가졌던 익셉션이 발생하는 이유와 지점을 알았다.
그렇다면 create* 패턴을 가진 JobRepository의 메서드들은 어떤 것들이 있고 언제 호출될까?
JobRepository를 확인하면 바로 알 수 있다. 아래 두 개 메서드이다.
1. createJobInstance
2. createJobExecution
이름 그대로 JobInstance와 JobExecution을 생성하는 메서드들이다. 이름으로 유추해보건대 JobInstance와 JobExecution을 생성하는 시점은 분명 Job을 실행하는 시작 부분일 것이다. 그렇기 때문에 Job이 실행되면서 위 익셉션이 발생했을 것이다. 실제 사용처를 확인해보자.
프로젝트 내에 사용되는 곳을 확인해보니 JsrJobOperator의 start()와 SimpleJobLauncher의 run()에서 호출하고 있다. 두 개의 메서드 모두 Job을 시작하는 메서드다.
JsrJobOperator.java
@Override
public long start(String jobName, Properties params) throws JobStartException,
JobSecurityException {
(생략)...
JobInstance jobInstance = jobRepository.createJobInstance(jobNames[0], jobParameters);
jobExecution = jobRepository.createJobExecution(jobInstance, jobParameters, jobConfigurationLocation);
try {
...
taskExecutor.execute(new Runnable() {
@Override
public void run() {
JsrJobContextFactoryBean factoryBean = null;
try {
...
job.execute(jobExecution);
(후략)..
}
---
SimpleJobLauncher.java
@Override
public JobExecution run(final Job job, final JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, ... {
...
job.getJobParametersValidator().validate(jobParameters);
jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
try {
taskExecutor.execute(new Runnable() {
@Override
public void run() {
try {
...
job.execute(jobExecution);
(후략)...
---
JobRepository의 create* 패턴의 메서드들은 Job이 실행(job.Execute(jobExecution))되기 전에 호출된다. 즉 Job을 실행하기 전에 현재(Job을 실행하는) 스레드에 트랜잭션이 활성화됐는지를 검사하게 된다. 그리고 기존에 트랜잭션이 열려 있다면 익셉션을 던져 Job을 멈춘다.
그런데...
트랜잭션을 검사에 대한 필요성은 아래 이슈를 통해 제기돼 해결(ff15bd)됐다(아주 오래전 일이다).
[BATCH-1668] #1916
Check for existing transaction when job is started (and fail if present by default)
요약하면 내용은 대략 이렇다. TaskletStep을 @Transactional이 붙은 메서드에서 실행했는데 두 번째 청크(Chunk)를 처리하던 도중 데드락에 걸린 이슈이다.
TaskletStep은 doExecute()에서 TransactionTemplate으로 TransactionCallback 구현체 ChunkTransactionCallback을 넘겨 Tasklet(이하 테스크릿)을 실행한다(링크). 그리고 TaskletStep은 Semaphore(이하 세마포어)를 넘겨 테스크릿 실행의 동시성 제어를 한다.
@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
(중략)...
stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {
@Override
public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext
...
RepeatStatus result;
try {
result = new TransactionTemplate(transactionManager, transactionAttribute)
.execute(new ChunkTransactionCallback(chunkContext, semaphore));
}
...
세마포어는 테스크릿 실행 후 StepExecution의 상태를 업데이트할 때 획득한다. 여기서 세마포어를 획득하지 못하고 데드락이 발생했다는 것이 BATCH-1668 이슈의 내용이다.
@Override
public RepeatStatus doInTransaction(TransactionStatus status) {
TransactionSynchronizationManager.registerSynchronization(this); //TransactionSync 등록
...
try {
result = tasklet.execute(contribution, chunkContext);
...
// If the step operations are asynchronous then we need
// to synchronize changes to the step execution (at a
// minimum). Take the lock *before* changing the step
// execution.
try {
semaphore.acquire(); // 여기서 데드락이 걸렸다.
locked = true;
}
(후략)...
세마포어를 풀어주는 부분이 어디 있었길래 데드락이 걸린 걸까?
TransactionSynchronizationAdapter의 afterCompletion을 구현하여 트랜잭션 커밋 후에 세마포어를 풀도록 구현이 돼있었다. 근데 문제는 afterCompletion이 호출되지 않은 것이다. 왜 호출되지 않았을까?
@Override
public void afterCompletion(int status) {
try {
중략...
finally {
// Only release the lock if we acquired it, and release as late
// as possible
if (locked) {
semaphore.release();
}
locked = false;
...
호출되지 않은 이유는 매우 간단하다. @Transactional을 붙인 메서드에서 테스크릿을 실행했기 때문이다. 기존에는 ChunkTransactionCallback이 실행(doInTransaction)되면서 트랜잭션을 열고 종료하면서 커밋을 했다. 하지만 외부에서 열린 더 큰 단위의 기존 트랜잭션 안에서 실행되면서 해당 트랜잭션이 커밋 시점이 도래하지 않았고 따라서 afterCompletion이 실행되지 않은 것이다.
그렇다면 외부 트랜잭션과 별개의 새로운 트랜잭션을 열면 될까?
TransactionTemplate이 새로운 트랜잭션을 생성하도록 강제하여 외부 트랜잭션과 별개로 커밋하면 데드락 문제는 해결된다. 그렇게 하면 afterCompletion이 기대한 시점에 호출돼 세마포어가 릴리즈되기 때문이다.
transactionTemplate.setPropagationBehavior(PROPAGATION_REQUIRES_NEW);
result = transactionTemplate.execute(new ChunkTransactionCallback(chunkContext, semaphore));
하지만 이 역시 문제는 있다. 외부 트랜잭션과 내부 트랜잭션의 StepExecution 상태의 일관성이 깨지게 될 수 있다. 관련하여 해당 이슈에서도 새로운 트랜잭션이 가지는 문제점에 관해 언급한다. 그리하여 Job 실행 외부에서 임의로 트랜잭션을 시작하지 못하도록 한 것이다.
스프링 배치는 Job과 Step의 실행 상태(JobInstance, JobExecution, StepExecution)를 관리하는 JobRepository가 있다. JobRepository의 오퍼레이션에 스프링 배치 외부의 트랜잭션 개입을 막기 위해 Job 실행 시점에 외부에 기존 트랜잭션이 열려있는지를 검사한다. 만약 트랜잭션이 활성화돼있다면 익셉션을 던져 Job 실행을 중단한다.
프락시를 초기화할 때 트랜잭션 활성화 여부를 체크하지 않도록 하면 된다. initializeProxy에서는 validateTransactionState가 true인 경우에 트랜잭션 상태를 체크하는 메서드 인터셉터를 프락시에 주입한다.
if (validateTransactionState) {
DefaultPointcutAdvisor advisor = new DefaultPointcutAdvisor(new MethodInterceptor() {
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
if (TransactionSynchronizationManager.isActualTransactionActive()) {
throw new IllegalStateException(
"Existing transaction detected in JobRepository. "
+ "Please fix this and try again (e.g. remove @Transactional annotations from client).");
...
validateTransactionState은 AbstractJobRepositoryFactoryBean의 필드로 기본 값이 true이다. 해당 필드의 값을 false로 설정해주면 된다.
public abstract class AbstractJobRepositoryFactoryBean implements FactoryBean<JobRepository>, InitializingBean {
...
private String isolationLevelForCreate = DEFAULT_ISOLATION_LEVEL;
private boolean validateTransactionState = true;
...
JobRepositoryFactoryBean을 setValidateTransactionState(false)로 설정해주고 JobRepository를 생성해서 빈으로 등록해주면 된다.
@Bean
public JobRepository yourCustomJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setTransactionManager(getTransactionManager());
factory.setValidateTransactionState(false);
factory.afterPropertiesSet();
return factory.getObject();
}