brunch

You can make anything
by writing

C.S.Lewis

by 에디의 기술블로그 Sep 20. 2019

CompletableFuture

자바 비동기 프로그래밍을 위한 CompletableFuture 검토

필자는 최근에 CompletableFuture 를 사용해서 개발을 진행하였는데, CompletableFuture는 작년에 한번 사용한 이후로는 실무에서 사용할 일이 거의 없었다. 메서드가 기억이 잘 나지 않았지만, 짧은 개발일정으로 인해서 일단 빠르게 구현을 완료하였다. 기능 구현은 빠르게 완료 하였지만, 마음이 편하지 않아서 나머지 공부로 CompletableFuture 를 차분하게 다시 공부하였다. 참고로, 이 글은 아주 기본적인 쓰레드, Future, 람다 등 Java 기초적인 내용은 알고 있어야 쉽게 읽을 수 있다. 



Sync, Async vs Blocking, Non-Blocking


CompletableFuture 에 대해서 공부하기 전에 우리는 Sync(동기),Async(비동기) 와 Blocking(블록킹),Non-Blocking(논블록킹) 에 대해서 먼저 생각해야한다. 비동기 와 논블록킹을 비슷한 개념이라고 생각하는 개발자가 많다. 사실, 개발자마다 조금 다르게 해석이 되고 있다. 공식적인 이론은 필자도 잘 모르지만, 필자가 생각하는 지극히 개인적인 생각으로 글을 작성하였다. 참고로, 매우 개인적인 생각이므로 잘못된 내용일 수도 있다. 잘못된 내용이 있다면 꼭 피드백을 해주길 바란다. 


필자의 허접한 생각을 꼭 바로 잡아주기를 부탁한다. 


Sync(동기), Async(비동기)


우리는 이 글에서 아주 간단한 샘플 코드를 사용할 것이다. 샘플 코드의 기능은 커피의 가격조회 기능을 제공한다. 예를 들어서 "latte" 라는 이름의 커피 가격을 조회하면 1100원 이라고 응답을 해주는 애플리케이션이다. 클래스에서는 가격을 조회하는 함수(메서드)를 제공해준다. 그리고, 클라이언트에서는 해당 메서드를 호출할 때 커피의 이름을 파라미터로 넘겨주면, 1100 원이라는 데이터를 리턴 받게 된다. 메서드를 제공하는 곳에서 기능을 모두 수행해서 결과값이 결정되면 그때 반환을 하게 된다. 이 글에서는 쉽게 설명하기 위해서 메서드를 호출(사용) 하는 곳을 클라이언트 라고 표현하겠다. 아래 그림과 같은 방식이다. 이 방식은 동기 방식이다.

정리하면, 클라이언트에서 "latte" 라는 커피의 가격을 조회하기 위해서, 클래스의 getPrice 라는 메서드를 실행하고, 동기 방식으로 1100원 이라는 결과를 리턴하였다. 메서드는 결과가 완성될때까지 반환을 하지 않는다. 


반면에, 비동기 방식은 어떻게 동작할까? 비동기 방식은 결과값이 결정되기 전에 일단 반환을 한다. "latte"라는 커피의 가격이 1100원이라는 정보를 찾기 전에 일단 빈값을 넘기게 된다. 필자의 샘플 그림에서는 void 라고 표현을 하였는데, 실제 코드에서는 void 가 아닐수도 있다. 참고로 비동기 프로그래밍은 개발하는 방식에 따라서 다양하다.

메서드를 호출(사용) 하는 곳, 즉 클라이언트에서는 최종 결과를 받기 전에 메서드로 부터 임시로 반환을 받는다. 그렇다면 클라이언트는 최종 결과인, "latte"라는 커피의 가격이 1100원 이라는 사실을 어떻게 알 수 있을까? 두 가지 방법이 있는데, Blocking, Non-Blockging 을 알아보면서 같이 살펴보자.



Blocking(블록킹), Non-Blocking(논-블록킹)


Sync(동기),Async(비동기)는 메서드를 제공하는 곳에서의 입장에 대한 것이라면, Blocking(블록킹), Non-Blocking(논블록킹)은 메서드를 호출(사용)하는 곳, 즉 클라이언트에서의 입장에 대한 것이다. 


클라이언트에서 데이터를 조회하는 두가지 방법에 대해서 생각해보자. 일단, 위에서 설명했듯이 Async 메서드는 결과를 완성하기 전에 일단 반환을 한다. 그래서 클라이언트에서는 결과가 완성되었을때쯤 다시 메서드를 조회해야 한다. 이 방법이 첫번째 방법이다. 메서드를 호출한 이후 어느정도 시간이 지난 후 다시 결과를 조회하는 방법이다. 그림을 통해서 함께 이해해보자.

getPriceAsync 메서드를 호출한 이후에 클라이언트는 다른 작업을 수행할 수 있다. 즉, 최종 결과를 조회할때까지 차단 되지 않고, 다른 작업을 할 수 있는 것이다. 하지만, 클라이언트가 다른 작업을 수행을 하다가, "latte"라는 가격이 필요한 시점에서 getPriceAsync 의 결과를 알고 싶을 때는 다시 데이터를 조회해야 한다. get 이라는 메서드를 사용했다고 가정해보자. 이 경우, get 메서드를 통해서 최종 결과를 전달 받기 전까지는 기다려여 한다. 서비스 제공 메서드는 비동기로 구현을 했지만, 클라이언트 입장에서는 항상 논블록킹은 아닌 상황이다. 데이터를 조회하는 순간에는 다시 블록킹으로 동작한다. 


이런 상황을 개선하기 위해서는 어떻게 하면 좋을까?


정답을 아는 개발자는 이제 이 글을 그만 읽어도 된다. 당신은 필자보다 더 뛰어난 개발자이므로... 

하지만, 혹시라도 시간이 남는다면 이 글을 읽어주고 꼭 피드백을 해주길 바란다. 


완벽한 논-블록킹 환경으로 개선하기 위해서는 콜백 함수를 구현해야 한다. 즉, 메서드를 제공하는 곳에서 결과가 완성되면 클라이언트로 결과가 나왔다고 알려주는 방법이다. 대충 아래와 비슷할 것이다. 

클라이언트에서는 완벽하게 논블록킹으로 동작을 한다. 즉, 결과가 넘어올때까지 차단이 되지 않고, 다른 작업을 계속 수행할 수 있다.



Async, Non-Blocking


Async(비동기), Non-Blocking(논블록킹) 에 대한 개발자들의 의견이 조금씩 다르다. 필자가 설명을 위해서 아주 단순하게 정리를 했는데, 필자의 개념이 틀렸을 수도 있다. 잘못된 내용에 대해서는 꼭 피드백을 해주길 바란다.



CompletableFuture (1) - 기초


이제 본격적으로 CompletableFuture 에 대해서 살펴보겠다. 


코드 준비 과정


기본적인 셋팅은, 스프링 부트 환경에서 작성하였다. 데이터를 제공해주는 Repository 클래스를 작성한다. getPriceByName 이라는 메서드는 커피의 이름을 파라미터로 받아서 커피의 가격을 알려주는 메서드이다. 단, 해당 메서드에 1초의 지연 시간을 설정하였다. 클라이언트는 커피의 가격을 조회하기 위해서 최소 1초가 걸릴 것이다.

서비스의 비즈니스 로직을 포함하고 있는 유스케이스 인터페이스를 아래와 같이 정의한다.

getPrice 는 동기 메서드이고, Async 가 붙은 나머지 메서드 두개는 비동기 메서드이다. 기능을 제공하는 곳에서 동기,비동기에 대한 개념을 포함하고 있다. 블록킹으로 할지, 논블록킹으로 할지 선택은 기능을 제공하는 클래스에서 결정되는게 아니라, 해당 메서드를 호출(사용) 하는 곳, 즉 클라이언트에서 선택할 것이다. 


Sync(동기) 방식


CoffeeUseCase 인터페이스의 구현체를 작성하겠다. 첫번째 메서드인 getPrice 는 동기 방식으로 데이터를 제공한다. 즉, 결과가 최종 완성이 되어야 반환하는 방식이다. 

테스트 코드를 작성해서 검증하였다. CoffeeComponentTest 라는 테스트 클래스를 생성하고, @ContextConfiguration 어노테이션을 사용해서 CoffeeComponent, CoffeeRepository 클래스를 지정하였다. 참고로, @SpringBootTest 어노테이션을 사용하면 테스트 빌드 시간이 길어지기 때문에 단위테스트에서는 사용하지 않도록 하자.  (참고로.. 필자가 나중에는 SpringBootTest 어노테이션을 사용하기는 했다. 이 글은 CompletableFUture 에 대한 내용이므로, 테스트 코드에 대한 내용은 상세하게 다루지는 않겠다.)

테스트는 잘 수행하였다. 1초라는 지연시간이 걸렸다. 만약, 두번 수행하면 동기호출이기 때문에 2초가 넘게 걸릴 것이다.



Async(비동기) 메서드, 논블록킹+블록킹 혼합


이번에는 Async(비동기) 메서드를 구현하자. 이제 드디어, CompletableFuture 를 사용한다. 새로운 쓰레드를 생성해서 Repository 를 통해서 데이터를 조회한다. 최종 데이터 연산이 끝나지 않아도 일단 return future 를 실행해서 먼저 반환을 한다. 

getPriceByName 에 1초의 지연시간을 임의로 주었지만, 해당 데이터를 무작정 기다리지 않고, 다른 작업을 병행할 수 있다. 아래와 같이 테스트 코드로 검증하였다. 

CompletableFuture<Integer> 로 리턴을 받았는데, 최종 데이터를 조회하기 전까지 다른 작업을 병행할 수 있다. 그래서 논블록킹, 즉, 차단되지 않고 다른 작업을 할 수 있다.  

하지만, 최종 데이터를 조회하기 위해서는 CompletableFuture 의 join 또는 get 메서드를 사용해야 한다. 일단, get 과 join 은 예외처리를 하는 방식이 조금 다른데, 자세한 내용은 생략하겠다. 어쨋든, join 이나 get 을 수행하는 시점에서는 데이터를 조회할 때까지 블록킹된다. 데이터가 계산이 안되었다면 될때까지 기다렸다가 결과를 전달받는다. 아래와 유사한 방식이다.

샘플 코드는 위 그림과는 좀 다르다. 반환이 void 가 아니라 CompletableFuture 로 리턴하였다. 즉, 아래의 그림과 같이 그려야 한다... 그림이 조금 이상할 수 있다. 이해 부탁한다. 

어쨋든, 메서드를 제공하는 곳에서는 CompletableFuture 를 반환하고, 메서드를 사용하는 곳, 즉 클라이언트에서는 논블록킹 과 블록킹이 혼합되어 있는 상황이다. 완벽한 논블록킹은 아니다. 


Async(비동기) 메서드를 좀 더 깔끔하게 수정 : supplyAsync, runAsync


getPriceAsync 메서드를 좀 더 깔끔하게 수정하겠다. 참고로, 테스트 코드는 수정이 되지 않는다. 즉, 메서드를 리팩토링한 이후에도 테스트 코드는 수정 없이 정상적으로 통과해야 한다. CompletableFuture 에서는 몇개의 유용한 팩토리 메서드를 제공하는데, 그 중에서 supplyAsync 와 runAsync 를 알아보자. supplyAsync 는 Supplier Functional Interface 를 파라미터로 받는다. 반면에 , runAsync 는 Runnable Functional Interface 를 파리미터로 받는다. 

Supplier 는 파라미터는 없지만 반환값이 있는 함수형 인터페이스 이고, Runnable 는 파라미터,반환 모두 없는 함수형 인터페이스 이다. 필자는 Supplier 를 받아서 처리하는 supplyAsync 팩토리 메서드를 사용해서 아래와 같이 코드를 수정하였다. 아주 깔끔해졌다. 

로그를 찍어보기 위해서 아래와 같이 더 수정했다. 

테스트 코드를 돌리면 테스트가 성공할 것이다. 이때, 쓰레드 풀을 확인해보자. 

위와 같이 supplyAsync 로 수행하는 로직은 ForkJoinPool 의 commonPool 를 사용하는 것을 확인할 수 있다. 사실, 일반적으로 commonPool 을 사용하는 방법은 바람직하지 않다. 그래서, 좀 더 수정을 하자. supplyAsync 를 실행할 때 Executor 를 파라미터로 추가하면, Common Pool 에서 동작하지 않고 별도의 쓰레드 풀에서 동작할 것이다. 함수를 제공하는 코드를 아래와 같이 수정하였다. Executor 를 추가하였다. 

테스트 코드를 수행하면, commonPool 을 사용하지 않고, 별도로 정의한 쓰레드 풀을 사용한다. 


이제, Non-Blocking(논블록킹)을 위해서 코드를 더 수정하겠다. 비동기 메서드는 수정하지 않고, 클라이언트의 코드를 수정해야 한다. 


블록킹, 논블록킹은 메서드를 사용하는 곳, 즉 클라이언트에서의 입장이다. 


Non-Blocking 구현 : thenAccept, thenApply


위 코드는, CompletableFuture의 get, join 메서드를 사용하는데, 해당 메서드를 호출하는 순간에는 블록킹 현상이 발생한다. 논블록킹으로 개선하기 위해서는 콜백함수를 구현해야 하는데, CompletableFuture 는 thenAccept 와 thenApply 를 제공한다. thenAccept 는 CompletableFuture<Void> 를 반환한다. 즉, 결과를 반환하지 않는다. 하지만, thenApply 는 CompletableFuture<T> 즉, 데이터를 포함하는 Future 를 반환한다. 

thenAccept 메서드를 사용해서 테스트 코드를 작성해보겠다. 일단 getPriceAsync 는 CompletableFuture<Integer> 를 반환하는데, 이때 thenAccept 메서드를 정의하면 콜백함수를 선언할 수 있다. CompletableFuture 가 complete 가 되면, 즉 커피의 가격 조회가 완성되면 thenAccept 를 수행하게 될 것이다. 

위에서 작성했던, get 이나 join 을 사용해서 최종 연산이 된 데이터를 조회할 필요가 없다. 왜냐면, CompletableFuture 에서 알아서 최종 연산이 되면 콜백함수를 실행해주기 때문이다. 단, 해당 코드는 테스트 코드이기 때문에 제일 하단에 future.join 을 실행해서 블록킹 코드를 추가하였다. 사실, 실제 서비스 코드에서는 해당 코드는 필요 없지만, 테스트 코드이기 때문에 추가했는데, 해당 코드가 없다면 thenAccept 로직이 수행하기 전에 테스트는 통과해버릴 것이다. 왜냐면, 테스트 코드는 Main 쓰레드에서 동작하게 되고, thenAccept 콜백 메서드가 수행하기도 전에 Main 쓰레드는 종료되기 때문이다. Non-Blocking 코드이기 때문에 결과가 오는 것을 기다리지 않고 계속 코드가 동작이 되는데, 테스트 코드의 특성상 Main 쓰레드가 종료되기 때문에, Main 쓰레드를 종료시키지 않기 위해서 임의로 작성한 코드이다. 


해당 테스트 코드를 좀 더 깔끔하게 할 수 있는 방법을 알고 있는 개발자는 피드백을 남겨주길 바란다. 


thenAccept 는 CompletableFuture<Void> 를 반환한다. 즉, 연산된 데이터를 반환하지 않기 때문에 해당 로직이 끝나면 데이터를 조회할 수 없다. 만약, 데이터를 반환하기 위해서는 어떻게 구현하면 될까? 정답은 thenApply 를 사용해야 한다. 커피의 가격을 조회한 다음에, 100원을 추가하고 싶다면 어떻게 할까? 아래 샘플처럼 코드를 짜봤다. 

참고로, thenApply 와 thenAccept 메서드를 별도의 쓰레드로 동작하고 싶다면, thenApplyAsync 와 thenAcceptAsync 메서드를 사용하면 된다. 

CoffeeComponent 에서의 쓰레드와, CoffeeComponentTest 즉, 함수를 호출하는 쪽에서의 쓰레드는 별도의 쓰레드풀에서 동작하게 된다. 



ThreadPoolTaskExecutor


참고로, 최종 소스코드는 쓰레드풀 설정을 다시 변경하였다. 필자는, 스프링에서 제공하는 ThreadPoolTaskExecutor 를 사용하였다. 


자세한 설명은 생략한다.


CompletableFuture (2) - 중급


글이 조금 어렵다. 왠지 이 글은 아무도 안볼것 같다. 글을 더 잘 쓰고 싶지만, 시간이 없으니 대충 마무리해야겠다. 이제, CompletableFuture 의 조금 더 어려운 함수를 검토하겠다. CompletableFuture의 조합에 대한 내용이다.  


thenCombine


CompletableFuture 를 2개를 실행해서 결과를 조합할때 사용한다. thenCombine 는 병렬 실행을 해서 조합하는데, 순차적으로 실행하지 않는다. 커피의 가격을 조회하는 기능은 1초이 지연시간이 있다. 만약 순차적으로 조회하면 1 + 1 이되기 때문에 2초가 걸릴 것이다. 그래서 우리는 동시에 두가지 조회를 같이 수행한 다음에 결과를 조합할 것이고, 그러면 2개를 조회하는데 1초가 걸리도록 프로그램을 작성할 것이다. (병렬 프로그래밍이다..) 

테스트 코드를 돌려보자. 

커피 이름 중, 라떼 와 모카를 조회하는데, 총 1초가 걸렸다. 즉, 2초가 걸리지 않았다. 두 작업은 별도의 쓰레드풀에서 동작하고, thenCombine 메서드에 의해서 조합이 된다. 장난을 좀 해보자. 쓰레드풀 사이즈를 1로 바꾸면 어떻게 될까? 

쓰레드풀의 쓰레드 개수(사이즈)를 1로 했기 때문에, 병렬로 수행하지 못하고 하나의 쓰레드를 사용한다. 그래서 1초가 아니라 2초의 시간이 걸렸다. 


thenCompose


thenCompose 메서드는 바로 위에서 설명한 thenCombine 와는 다르게 CompletableFuture 를 순차적으로 실행한다. 가격을 조회하는 기능이 있고, 조회된 가격에서 할인을 하는 기능을 별도로 조회하는 기능을 만들어보자. 

1. 가격 조회

2. 조회된 가격에 할인율 적용

이라는 기능을 순차적으로 수행해야 한다. 


자세한 설명은 생략한다. 


allOf


위에서 설명한, thenCombine 는 동시에 병렬로 실행해서 결과를 조합하는 역할을 한다. 라떼 1잔과 모카 1잔의 가격을 더해서 총 가격을 계산하는 역할을 수행한다고 가정하자. 라떼 와 모카 의 가격을 동시에 조회해서 결과를 내야 하는데, 라뗴를 조회하는 메서드와, 모카를 조회하는 메서드는 둘 다 완료가 되어야 최종 결과가 완성될 수 있다. 만약, 하나라도 완료가 되지 않으면 라떼와 모카의 가격의 총합을 구할 수가 없다. thenCombine 의 구현은 두개의 CompletableFuture 를 조합할 때 사용했는데, 만약 CompletableFuture 가 3개 이상이면 어떻게 될까? 이때 활용가능한 메서드가 바로 allOf 이다. 아래 샘플 코드와 같이 라뗴, 모카, 아메리카노 를 조회해서 가격을 조회하는 기능을 구현해보자. CompletableFuture 를 3개를 만들고, allOf 메서드에 CompletableFuture 리스트를 파라미터로 넣어주면 모든 Future 가 완료 되었을 때 thenApply가 실행이 된다. 코드가 좀 복잡한데 하나씩 알아보자. 

CompletableFuture.allOf(future1...) : future 가 모두 완료되면 CompletableFuture<Void> 를 반환

thenApply : allOf 으로 부터 받은 CompletableFuture<Void> 를 사용하지는 않지만, 무언가 데이터를 반환해야 한다. 해당 로직에서 반환해야 하는 값은, 각 Future 로 부터 받은 각각의 커피의 가격 리스트이다. 그래서 해당 로직에서는 상단에 선언한 completableFutureList 의 스트림 구문을 선언해서 각각의 Future 에서 데이터를 조회해서 리스트로 만들어준다. CompletableFuture<List<Integer>> 가 반환될 것이다. 

join() : CompletableFuture<List<Integger>> 에서 List<Integer> 를 조회한다. 각각의 커피의 가격이기 때문에 [1200, 1300, 900] 이 된다. 

stream, reduce : 리스트의 데이터를 합산한다. 


참고로, allOf 는 모든 Future 가 완료 되었을 때 수행한다면, anyOf 라는 메서드는 아무 Future 나 하나라도 완료되면 수행하는 메서드이다. 참고로 해당 내용은 Java 8 in Action 에 비슷한 내용이 있지만 필자는 해당 책을 많이 참고하지는 않았다. 책에서 나온 메서드 이름이 같다는 사실을 뒤늦게 알았다. 어쨋든 좋은 책이니 꼭 읽어보길 바란다. 


CompletableFuture (3) - 고급


고급 내용은 다음에 시간에 작성하겠다. 글을 쓰는게 너무 힘들다. 고급 내용은 익셉션 및 에러 핸들링에 대한 내용이다. 필자도 아직 잘 모르곘다. 공부하고 글을 다시 쓰겠다. 



샘플 코드는 github 에서 확인 가능합니다.

https://github.com/sieunkr/completablefuture-study/tree/master/completablefuture



글마무리


CompletableFuture 에 대해서 간략하게 알아봤다. 필자는 최근에 책을 구매하였는데, 책 제목은 "자바 병렬 프로그래밍" 이다. 사실 매우 오래전에 나온 책이라서 사기전에 고민을 조금 했는데, 해당 책을 조금씩 읽어보니 재미있는 내용이 많았다. 다음에 시간이 되면 자바 병렬 프로그래밍에 대해서 글을 쓸 예정이다. 이상 허접한 글을 마치겠다. 조금이라도 도움이 되었다면 감사하게 생각하며, 글에 잘못된 내용이 있다면 댓글을 남겨주길 바란다.



레퍼런스


https://link.medium.com/7dHHqroUTZ

https://dzone.com/articles/java-8-completablefutures-part-i

https://www.javacodegeeks.com/2013/05/java-8-definitive-guide-to-completablefuture.html

https://www.callicoder.com/java-8-completablefuture-tutorial/

https://medium.com/@senanayake.kalpa/fantastic-completablefuture-allof-and-how-to-handle-errors-27e8a97144a0

https://dzone.com/articles/which-thread-executes-completablefutures-tasks-and

브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari