FaultTolerantStepBuilder의 재처리와 넘김 처리
아래 코드는 Step을 빌드하는 간단한 예제다. 이때 생성되는 Step Builder는 SimpleStepBuilder 클래스의 인스턴스다.
stepBuilderFactory.get(stepName)
.chunk<Input, Output>(chunkSize)
.reader(yourItemReader)
.processor(yourItemProcessor)
.writer(yourItemWriter)
---
추가로 SimpleStepBuilder의 faultTolerant() 메서드를 호출하면 SimpleStepBuilder를 감싼 확장 클래스인 FaultTolerantStepBuilder가 생성된다.
stepBuilderFactory.get(stepName)
.chunk<Input, Output>(chunkSize)
.reader(yourItemReader)
.processor(yourItemProcessor)
.writer(yourItemWriter)
.faultTolerant()
---
SimpleStepBuilder.java
public FaultTolerantStepBuilder<I, O> faultTolerant() {
return new FaultTolerantStepBuilder<>(this);
}
---
SimpleStepBuilder와 마친가지로 FaultTolerantStepBuilder는 AbstractTaskletStepBuilder의 구현체 클래스로 TaskletStep을 빌드하는 클래스다.
FaultTolerantStepBuilder는 확장된 기능으로 처리 실패한 아이템에 대해서 retry(재처리)와 skip(넘김 처리)을 추가로 제공한다.
/**
* A step builder for fully fault tolerant chunk-oriented item processing steps. Extends
* {@link SimpleStepBuilder} with additional properties for retry and skip of failed
* items
FaultTolerantStepBuilder는 SkipPolicy, RetryPolicy, BatchRetryTemplate 세 클래스를 구성요소로 실패에 대응한다.
SkipPolicy는 실패한 아이템에 대해서 넘김 처리를 할지 정책적으로 결정하는 인터페이스이다.
/**
* Policy for determining whether or not some processing should be skipped.
*
FaultTolerantStepBuilder의 기본 SkipPolicy는 LimitCheckingItemSkipPolicy 인스턴스다.
createSkipPolicy에서 skipPolicy 필드가 null 인 경우 LimitCheckingItemSkipPolicy 인스턴스를 생성해 필드에 값을 설정해 준다.
FaultTolerantStepBuilder.java
protected SkipPolicy createSkipPolicy() {
(중략)
LimitCheckingItemSkipPolicy limitCheckingItemSkipPolicy = new LimitCheckingItemSkipPolicy(skipLimit, map);
// skipPolicy가 null이면 위에서 생성한 limitCheckingItemSkipPolicy로 설정
if (skipPolicy == null) {
Assert.state(!(skippableExceptionClasses.isEmpty() && skipLimit > 0),
"If a skip limit is provided then skippable exceptions must also be specified");
skipPolicy = limitCheckingItemSkipPolicy;
}
(중략)
return skipPolicy;
}
---
LimitCheckingItemSkipPolicy는 넘김 처리 허용 횟수와 넘김 처리를 허용할 익셉션 타입을 명시해준다.
허용 횟수/ 익셉션까지만 넘김 처리를 허용한다.
LimitCheckingItemSkipPolicy.java
/**
* @param skipLimit the number of skippable exceptions that are allowed to be skipped
* @param skippableExceptions exception classes that can be skipped (non-critical)
*/
public LimitCheckingItemSkipPolicy(int skipLimit, Map<Class<? extends Throwable>, Boolean> skippableExceptions) {
this(skipLimit, new BinaryExceptionClassifier(skippableExceptions));
}
---
기본 SkipPolicy가 아닌 커스텀된 넘김 정책 처리가 필요하다면 skipPolicy() 메서드로 별도로 생성된 SkipPolicy 구현체 인스턴스를 주입해 준다.
public FaultTolerantStepBuilder<I, O> skipPolicy(SkipPolicy skipPolicy) {
this.skipPolicy = skipPolicy;
return this;
}
---
SkipPolicy는 ChunkProvider와 ChunkProcessor에 각각 주입돼 Step의 ItemReader, ItemProcessor, ItemWriter의 넘김 처리 정책을 결정한다.
protected ChunkProvider<I> createChunkProvider() {
SkipPolicy readSkipPolicy = createSkipPolicy(); <----
readSkipPolicy = getFatalExceptionAwareProxy(readSkipPolicy);
FaultTolerantChunkProvider<I> chunkProvider =
new FaultTolerantChunkProvider<>(getReader(), createChunkOperations());
chunkProvider.setMaxSkipsOnRead(Math.max(getChunkSize(), FaultTolerantChunkProvider.DEFAULT_MAX_SKIPS_ON_READ));
chunkProvider.setSkipPolicy(readSkipPolicy); <----
---
protected ChunkProcessor<I> createChunkProcessor() {
BatchRetryTemplate batchRetryTemplate = createRetryOperations(); <----
FaultTolerantChunkProcessor<I, O> chunkProcessor = new FaultTolerantChunkProcessor<>(getProcessor(),
getWriter(), batchRetryTemplate);
chunkProcessor.setBuffering(!isReaderTransactionalQueue());
chunkProcessor.setProcessorTransactional(processorTransactional);
SkipPolicy writeSkipPolicy = createSkipPolicy();
writeSkipPolicy = getFatalExceptionAwareProxy(writeSkipPolicy);
chunkProcessor.setWriteSkipPolicy(writeSkipPolicy); <----
chunkProcessor.setProcessSkipPolicy(writeSkipPolicy); <----
---
FaultTolerantChunkProvider.java
FaultTolerantChunkProvider는 FaultTolerantStepBuilder의 ChunkProvider 구현체로 아이템을 읽어와 처리할 수 있도록 공급하는 역할을 한다.
FaultTolerantChunkProvider.read() 메서드 내부에서 doReade()를 호출해 ItemReader를 실행한다. 이때 익셉션이 터지면 catch 블록에서 shouldSkip()을 익셉션을 무시하고 넘김 처리를 할지 아니면 익셉션을 던질지 결정한다.
@Override
protected I read(StepContribution contribution, Chunk<I> chunk) throws Exception {
while (true) {
try {
return doRead(); <----
}
catch (Exception e) { // 아이템 읽기 중 터진 익셉션을 잡아 넘김 처리할지 익셉션을 밖으로 던질지 결정
if (shouldSkip(skipPolicy, e, contribution.getStepSkipCount())) { <----
// increment skip count and try again
contribution.incrementReadSkipCount();
chunk.skip(e);
if (chunk.getErrors().size() >= maxSkipsOnRead) {
throw new SkipOverflowException("Too many skips on read");
}
logger.debug("Skipping failed input", e);
}
else { <---- 넘김 처리하지 않을 경우(shouldSkip== false) 익셉션을 던진다
if (rollbackClassifier.classify(e)) {
throw new NonSkippableReadException("Non-skippable exception during read", e);
}
logger.debug("No-rollback for non-skippable exception (ignored)", e);
}
---
FaultTolerantChunkProcessor 역시 catch 블록에서 ItemProcessor 또는 ItemWriter에서 던진 익셉션을 잡아 넘김 처리할지 말지 여부를 판단한다.
catch (Exception e) {
status = BatchMetrics.STATUS_FAILURE;
if (rollbackClassifier.classify(e)) {
throw e;
}
else if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) { <----
contribution.incrementProcessSkipCount();
logger.debug("Skipping after failed process with no rollback", e);
callProcessSkipListener(item, e);
}
else {
throw new NonSkippableProcessException(
"Non-skippable exception in processor. Make sure any exceptions that do not cause a rollback are skippable.", e);
}
}
---
else if 블록에서 shouldSkip()이 true 인 경우 넘김 처리를 하고 아닌 경우 아래 else 블록에서 NonSkippableProcessException을 던진다.
FaultTolearantChunkPovider와 Processor는 FaultTolerantStepBuilder로부터 주입받은 SkipPolicy에 따라서 아이템을 넘김 처리할지 말지 여부를 결정한다. 넘김 처리된 아이템의 에러는 무시하고 다음 아이템이 처리된다.
SkipPolicy와 비슷하게 할지 말지를 결정하는 클래스다. 다만 넘김 처리가 아닌 재처리에 대한 정책적 결정을 담당한다.
public interface RetryPolicy extends Serializable {
boolean canRetry(RetryContext var1); <----
RetryContext open(RetryContext var1);
void close(RetryContext var1);
void registerThrowable(RetryContext var1, Throwable var2);
}
canRetry가 SkipPolicy.shouldSkip()처럼 재처리 여부를 판단하는 메서드다(나머지 메서드에 대한 설명은 생략한다).
RetryPolicy를 별도로 설정하지 않은 경우 FaultTolereantStepBuilder는 RetryPolicy를 SimpleRetryPolicy로 설정한다.
protected BatchRetryTemplate createRetryOperations() {
RetryPolicy retryPolicy = this.retryPolicy;
SimpleRetryPolicy simpleRetryPolicy = null;
Map<Class<? extends Throwable>, Boolean> map = new HashMap<>(retryableExceptionClasses);
map.put(ForceRollbackForWriteSkipException.class, true);
simpleRetryPolicy = new SimpleRetryPolicy(retryLimit, map); <----
if (retryPolicy == null) { // retryPolicy 필드가 null이면 SimpleRetryPolicy로 설정한다
Assert.state(!(retryableExceptionClasses.isEmpty() && retryLimit > 0),
"If a retry limit is provided then retryable exceptions must also be specified");
retryPolicy = simpleRetryPolicy; <----
}
(중략)
/**
* RetryPolicy는 BatchRetryTemplate과 함께 동작한다.
* RetryPolicy를 BatchRetryTemplate에 주입한다.
* BatchRetryTemplate는 FaultTolerantChunkProcessor로 넘겨져 ItemProcessor와
* ItemWrite를 실행하는 콜백 함수를 실행한다.
**/
RetryPolicy retryPolicyWrapper = getFatalExceptionAwareProxy(retryPolicy);
BatchRetryTemplate batchRetryTemplate = new BatchRetryTemplate(); <----
if (backOffPolicy != null) {
batchRetryTemplate.setBackOffPolicy(backOffPolicy);
}
batchRetryTemplate.setRetryPolicy(retryPolicyWrapper); <----
(중략)
return batchRetryTemplate;
}
---
SimpleRetryPolicy는 최대 재처리 횟수와 재시도 허용하는 익셉션을 주입받아 재처리 여부를 결정한다.
public SimpleRetryPolicy(int maxAttempts, Map<Class<? extends Throwable>, Boolean> retryableExceptions) {
this(maxAttempts, retryableExceptions, false);
}
---
FaultTolerantChunkProvider는 재처리를 지원하지 않고 FaultTolerantChunkProcessor에서만 지원한다. 따라서 ItemReader는 익셉션 발생 시 재처리되지 않는다.
protected ChunkProcessor<I> createChunkProcessor() {
BatchRetryTemplate batchRetryTemplate = createRetryOperations(); <---
FaultTolerantChunkProcessor<I, O> chunkProcessor =
new FaultTolerantChunkProcessor<>(getProcessor(), getWriter(), batchRetryTemplate);
(후략)
---
아래 코드는 ItemProcessor를 실행하는 FaultTolerantChunkProcessor의 transform 메서드이다. ItemProcessor를 직접 호출하지 않고 실패 시 재처리를 지원하는 BatchRetryTemplate 콜백 함수에서 호출하도록 정의하여 넘겼다.
@Override
protected Chunk<O> transform(final StepContribution contribution, Chunk<I> inputs) throws Exception {
(중략)
for (final Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
final I item = iterator.next();
RetryCallback<O, Exception> retryCallback = new RetryCallback<O, Exception>() {
@Override
public O doWithRetry(RetryContext context) throws Exception {
Timer.Sample sample = BatchMetrics.createTimerSample();
String status = BatchMetrics.STATUS_SUCCESS;
O output = null;
try {
O cached = (cacheIterator != null && cacheIterator.hasNext()) ? cacheIterator.next() : null;
if (cached != null && !processorTransactional) {
output = cached;
}
else {
output = doProcess(item); <--- ItemProcessor 실행
if (output == null) {
data.incrementFilterCount();
}
else if (!processorTransactional && !data.scanning()) {
cache.add(output);
}
}
}
(중략)
};
RecoveryCallback<O> recoveryCallback = new RecoveryCallback<O>() {
@Override
public O recover(RetryContext context) throws Exception {
// 재처리 횟수 초과 후 처리 로직
};
// ItemProcessor를 batchRetryTemplate의 콜백함수로 넘겨 실행한다.
O output = batchRetryTemplate.execute(retryCallback, recoveryCallback,
new DefaultRetryState(getInputKey(item), rollbackClassifier)); <---
(후략)
---
ItemWriter도 ItemProcessor와 동일하게 RetryCallback 함수에서 호출되도록 정의하여 BatchRetryTemplate으로 파라미터로 넘겨 실행한다.
@Override
protected void write(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs) throws Exception {
final UserData<O> data = (UserData<O>) inputs.getUserData();
final AtomicReference<RetryContext> contextHolder = new AtomicReference<>();
RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>
(중략)
try {
batchRetryTemplate.execute( <---
retryCallback,
recoveryCallback,
new DefaultRetryState(inputs, rollbackClassifier));
}
(후략)
}
---
BatchRetryTemplate은 SimpleRetryPolicy에 정의된 재처리 허용 익셉션을 허용 횟수까지 시도한다.
FaultTolerantStepBuilder의 retryLimit(재처리 횟수)의 기본 값은 0이므로 빌드할 때 retryLimit(yourRetryLimit) 으로 최대 재처리 횟수 값을 지정해 주자. 안 그러면 횟수 제한에 걸려 재처리가 실행되지 않는다.
public class FaultTolerantStepBuilder<I, O> extends SimpleStepBuilder<I, O> {
private int retryLimit = 0;
---
public FaultTolerantStepBuilder<I, O> retryLimit(int retryLimit) {
this.retryLimit = retryLimit;
return this;
}
---
1. FaultTolerantStepBuilder는 익셉션 발생 시 넘김 처리와 재처리하는 TaskletStep을 빌드한다.
2. 넘김 처리와 재처리 정책을 결정하는 인터페이스는 SkipPolicy와 RetryPolicy다.
3. ItemReader는 넘김 처리만 지원한다.
4. ItemProcessor, ItemWriter는 넘김 처리와 실패처리를 모두 지원한다.
5. (중요) retryLimit으로 재처리 최대 횟수를 지정하지 않으면 재처리가 실행되지 않는다(기본 값이 0)