brunch

You can make anything
by writing

C.S.Lewis

by anonymDev Dec 14. 2022

스프링 배치 TaskletStep 생성과 동작 코드

spring batch의 효자 클래스 TaskletStep

본 글의 설명과 코드는 spring-batch의 4.3.x 버전을 기준으로 한다. 출처: spring-batch/4.3.x


지난 글 Spring batch와 @Transactional에서 spring batch(이하 스프링 배치)는  @Transaction 사용을 왜 막는가에 대해서 알아봤다. 이슈의 원인이 됐던 TaskletStep을 많이 언급했었다. 그래서 이번 글에서는 TaskletStep의 생성과 동작을 코드를 보며 분석하려 한다. TaskletStep은 Step 인터페이스를 구현한 클래스로 스프링 배치에서 범용적으로 사용되는 Step의 구현체이다. 우리가 쓰고 있던 생성 해서 사용하고 있던 Step은 많은 경우에 TaskletStep으로 구현된 인터페이스였을 것이다.


아래 예제 코드와 같이 StepBuilder를 활용해서 Chunk, ItemReader, ItemProcessor, ItemWriter로 빌드하는 방식으로 Step 생성한다. 가장 보편적인 Step 생성 코드인데 여기서 생성되는 Step 역시 구현체는 TaskletStep이다.

@Bean
public Step step(JobRepository jobRepository) {
   return new StepBuilder("step", jobRepository)
         .<Trade, Trade>chunk(2)
         .reader(itemReader())
         .processor(itemProcessor())
         .writer(itemWriter())
         .build();
}


TaskletStep에 대해서 알아보면 스프링 배치의 기본 원리에 대해서 이해하는데 도움이 된다.

TaskletStep 관련 코드를 살펴보며 위 코드에서 주입한 ItemReader, ItemProcessor, ItemWriter가 어떻게 동작하고 Chunking은 어떻게 동작하는지 살펴보려 한다. 우선 TaskletStep이 어떻게 생성되는지부터 알아야 한다.


TaskletStep의 생성 과정


StepBuilder의 빌드 메서드인 .chunk(), reader(), processor(), writer()는 모두 SimpleStepBuiler 타입을 생성해서 반환한다.


public <I, O> SimpleStepBuilder<I, O> chunk(int chunkSize) {
   return new SimpleStepBuilder<I, O>(this).chunk(chunkSize);
}

(링크)


아래 세 개 Builder 메서드는 SimpleStepBuilder에 정의돼있다.

public SimpleStepBuilder<I, O> reader(ItemReader<? extends I> reader) {
   this.reader = reader;
   return this;
}

(링크)

public SimpleStepBuilder<I, O> processor(ItemProcessor<? super I, ? extends O> processor) {
   this.processor = processor;
   return this;
}


public SimpleStepBuilder<I, O> writer(ItemWriter<? super O> writer) {
   this.writer = writer;
   return this;
}


그리고 마침내 build 메서드는 TaskletStep을 생성해서 반환한다. 

@Override
public TaskletStep build() {
   registerStepListenerAsItemListener();
   registerAsStreamsAndListeners(reader, processor, writer);
   return super.build();
}

SimpleStepBuilder의 부모는 AbstractTaskletStepBuilder 클래스이다. build() 함수 내부에서 부모의 메서드를 호출하는데 부모의 build() 메서드에 TaskletStep을 세팅하는 핵심들이 로직들이 모여있다. 이곳에서 TaskletStep이 초기화되고 TaskletStep 동작하는데 필요한 RepeatOperations(stepOperations), TransactionManager, Tasklet이 생성된다. Tasklet은 ItemReader, ItemProcessor, ItemWriter의 조합이라고 생각하면 쉽다.


public TaskletStep build() {
   registerStepListenerAsChunkListener();
   TaskletStep step = new TaskletStep(getName());
   super.enhance(step);
   step.setChunkListeners(chunkListeners.toArray(new ChunkListener[0]));
   if (this.transactionManager != null) {
      step.setTransactionManager(this.transactionManager);
   }
   if (transactionAttribute != null) {
      step.setTransactionAttribute(transactionAttribute);
   }
   if (stepOperations == null) {
      stepOperations = new RepeatTemplate();
      ...(중략)
   }
   step.setStepOperations(stepOperations);
   step.setTasklet(createTasklet());
   ...(중략)
   return step;
} (링크)


Tasklet은 createTasklet 메서드에서 생성된다. createTasklet은 추상 메서드로 구현체는 자식 클래스인 SimpleStepBuilder가 갖고 있다. Tasklet 생성에 대한 제어는 AbstractTaskletStepBuilder가 아닌 상속한 클래스가 갖도록 코드가 구조화돼있다. 코드의 주석을 보면서 따라가보자.


@Override
protected Tasklet createTasklet() {

    // reader와 writer는 nullable 하지 않다.
   Assert.state(reader != null, "ItemReader must be provided");
   Assert.state(writer != null, "ItemWriter must be provided");

    

  // RepeatOperations 타입의 chunkOperations를 생성해 chunkProvider가 반복 동작하도록 한다.

  // chunk 크기만큼 ItemReader가 Item을 읽기 위해서다.
   RepeatOperations repeatOperations = createChunkOperations();


  // ChunkProvider는 말 그대로 Item의 묶음인 Chunk<I>를 공급하는 역할을 한다.

  // Item을 읽어오는 ItemReader가 주입된다.
   SimpleChunkProvider<I> chunkProvider = new SimpleChunkProvider<>(getReader(), repeatOperations);

    

    // chunkProcessor에는 아이템을 처리하고 쓰기 위한 Processor와 Writer가 주입된다.
   SimpleChunkProcessor<I, O> chunkProcessor = new SimpleChunkProcessor<>(getProcessor(), getWriter());
   ...(중략)

    //*

    * 최종적으로 초기화된 chunkProvider와 chunkProcessor를 주입해의     

    *  ChunkOrientedTasklet 타입의 tasklet을 반환한다.

    *//

ChunkOrientedTasklet<I> tasklet = new ChunkOrientedTasklet<>(chunkProvider, chunkProcessor);
   tasklet.setBuffering(!readerTransactionalQueue);
   return tasklet;
} (링크)


Tasklet의 동작 살펴보기


위 코드에서 주입된 ItemReader, ItemWriter, ItemProcessor는 Builder에서 정의돼 주입된 객체들이다. 즉 Tasklet은 위 세 개 reader, writer, processor의 조합인 것이다. 그리고 tasklet은 ChunkOrientedTasklet 타입의 클래스로 생성된다. ChunkOrientedTasklet은 Tasklet의 구현체로 Tasklet이 동작하는 로직을 구현하고 있다.


TaskletStep은 지난 시간에 공부했던 대로 Chunk 단위로 트랜잭션에서 처리한다.

그리고 해당 로직은 ChunkTransactionCallbackdoInTransaction에 위치하고 있다. taskletexecute 메서드를 호출해 tasklet을 실행한다.


@Override
public RepeatStatus doInTransaction(TransactionStatus status) {
   ... (중략)
         try {
            result = tasklet.execute(contribution, chunkContext);

// tasklet이 result가 null이면 모든 chunk가 처리된 것이므로 TaskletStep을 종료한다.
            if (result == null) {
               result = RepeatStatus.FINISHED;
            }
         }

 ... (후략)

(링크)



ChunkOrientedTasklet의 execute 메서드를 살펴보자. inputs가 null인 경우 chunkProvider로 Chunk<I> 타입의 inputs를 공급받아 chunkProcessor로 넘긴다. 로직은 꽤나 단순한다. 그리고 마지막으로 inputs.isEnd를 통해 inputs가 끝인지 아닌지를 체크해서 RepeatStatus를 반환한다.


@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)

...(중략)

   if (inputs == null) {
      inputs = chunkProvider.provide(contribution);
      (중략)
   }
   chunkProcessor.process(contribution, inputs);
(중략)
   return RepeatStatus.continueIf(!inputs.isEnd());
}

(링크)


RepeatStatus.continueIf는 파라미터로 넘어온 continuable이 true이면 CONTINUEABLE을 반환해서  Tasklet이 다시 호출돼 다음 Chunk를 처리하도록 하고 false이면 FINISHED를 호출해 Tasklet을 재호출 하지 않고 종료하도록 한다.


public static RepeatStatus continueIf(boolean continuable) {
   return continuable ? CONTINUABLE : FINISHED;
}



ItemReader, ItemWriter, ItemProcessor 심화 학습


SimpleStepBuilder의 createTasklet 메서드를 보면 chunkProvider와 chunkProcessor를 각각 SimpleChunkProvider와 SimpleChunkProcessor 타입의 클래스로 생성한다. 각 클래스의 provide 메서드와 process 메서드를 살펴보며 아이템 공급과 처리가 어떻게 동작하는지 살펴보자.


SimpleChunkProvider<I> chunkProvider = new SimpleChunkProvider<>(getReader(), repeatOperations);

SimpleChunkProcessor<I, O> chunkProcessor = new SimpleChunkProcessor<>


우선 SimpleChunkProvider의 provide이다. repeatOperations로 itemReader.read() 반복 호출하여 읽은 item을 Chunk(inputs)에 추가한다. itemReader가 null을 반환하면 repeatOperation를 종료(RepeatStatus.FINISHED)한다. 마지막으로 읽어 온 Item Chunk를 반환한다.


@Override
public Chunk<I> provide(final StepContribution contribution) throws Exception {
   final Chunk<I> inputs = new Chunk<>();
   repeatOperations.iterate(new RepeatCallback() {
      @Override
      public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
         I item = null;
        ... (중략)
         try {
            item = read(contribution, inputs);
         }
         ... (중략
         if (item == null) {
            inputs.setEnd();
            return RepeatStatus.FINISHED;
         }
         inputs.add(item);
         contribution.incrementReadCount();
         return RepeatStatus.CONTINUABLE;
      }
   });
   return inputs;
}

(링크)


(doRead는 read 메서드에서 내부에서 호출된다)

protected final I doRead() throws Exception {
   try {
      listener.beforeRead();
      I item = itemReader.read();
      if (item != null) {
         listener.afterRead(item);
      }
      return item;
   }
   ...(중략)
}


ChunkProvider가 제공한 Item Chunk를 처리하는 ChunkProcessor의 process가 실행된다. isComplete, transform, write 메서드가 차례대로 호출된다.


@Override
public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {
    ...(중략)
   if (isComplete(inputs)) {
      return;
   }
   Chunk<O> outputs = transform(contribution, inputs);
   contribution.incrementFilterCount(getFilterCount(inputs, outputs));
   write(contribution, inputs, getAdjustedOutputs(inputs, outputs));
} (링크)

 

Item Chunk가 비어있으면 즉시 process를 종료한다. 더 이상 처리할 아이템이 없기 때문이다.

protected boolean isComplete(Chunk<I> inputs) {
   return inputs.isEmpty();
}


Chunk에 item이 있는 경우 transform 함수를 실행해 Chunk안에 담긴 item을 각각 순회하며 처리한다. 그리고 처리한 아이템을 Chunk<O> 타입의 outputs에 담아 반환한다.

protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
   Chunk<O> outputs = new Chunk<>();
   for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
      final I item = iterator.next();
      O output;
      ...(중략)
      try {
         output = doProcess(item);
      }
      ...(중략)
      if (output != null) {
         outputs.add(output);
      } else {
         iterator.remove();
      }
   }
   return outputs;
} (링크)


(TaskletStep을 생성할 때 빌더에서 주입해준 ItemProcessor가 실행된다)

protected final O doProcess(I item) throws Exception {
...(중략)
   try {
      listener.beforeProcess(item);
      O result = itemProcessor.process(item);
      listener.afterProcess(item, result);
      return result;
   }
...(중략)
}


마지막으로 write 메서드이다. ItemProcessor가 처리한 Chunk<O>를 통째로 ItemWriter에 넘겨 쓰기 작업을 실행한다.

protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws Exception {
   ...(중략)
   try {
      doWrite(outputs);
   }
   ...(중략)
}(링크)


protected final void doWrite(Chunk<O> items) throws Exception {
...(중략)
   try {
      listener.beforeWrite(items);
      writeItems(items);
      doAfterWrite(items);
   }
   ...(중략)
}


(역시 ItemWriter가 실행된다)

protected void writeItems(Chunk<O> items) throws Exception {
   if (itemWriter != null) {
      itemWriter.write(items);
   }
}


마무리

정리하면 스프링 배치에서 흔히 사용하는 Chunk 사이즈 별로 아이템을 읽고 처리하고 쓰는 Step은 TaskletStep이었다(아닌 경우도 있다).

TaskletStep 내부에는 Tasklet이 있는데 구현체는 ChunkOrientedTasklet이라는 클래스이다.

이 클래스는 ItemReader, ItemProcessor, ItemWriter를 chunk 크기만큼 읽고 처리하고 쓴다. TaskletStep은 ChunkTransactionCallback으로 Tasklet을 호출함으로써 chunk 단위로 트랜잭셔널하게 처리되도록 했다.


TaskletStep의 코드 흐름을 단순화된 시퀀스 다이어그램으로 표현하면 다음과 같다.

이미지를 클릭하면 크게 보입니다


매거진의 이전글 Spring batch와 @Transactional

작품 선택

키워드 선택 0 / 3 0

댓글여부

afliean
브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari