대량의 요청에도 흔들림 없는 견고한 아키텍처
이번 글에서는 실전에서 운영 가능한 멀티에이전트 기반 추천 시스템을 어떻게 설계하고 구현했는지, 그리고 수많은 요청이 동시에 몰려와도 안정적이고 효율적으로 처리할 수 있는 비동기 아키텍처를 어떻게 구성했는지를 소개합니다.
예시로 다루는 태스크는 다음과 같습니다:
ERP 시스템에서 ‘구매 품명’을 입력받으면,
그 품명에 대해 대분류 → 중분류 → 소분류의 분류 체계를 예측하여 추천합니다.
먼저, Vector DB 기반 검색으로 유사 품명을 찾고, 해당 유사 품명의 분류 체계를 최대 3개까지 추출합니다.
유사 품명이 없다면, Object Storage에 주기적으로 업데이트되는 기준 정보를 활용해 LLM 기반으로 단계별 분류를 예측합니다.
마지막으로, LLM의 예측 결과와 기존 검색 결과가 중복되는지를 확인하고, 중복이 발생할 경우 해당 분류 체계에 가중치를 부여하여 최종 순위를 재조정합니다.
즉, 검색 + 생성 + 리뷰 단계가 함께 작동하는 멀티에이전트 구조이며, 동기/비동기를 혼합하여 병렬 처리 효율을 최대화한 시스템입니다.
---
전체 구조는 다음과 같은 구성으로 되어 있습니다:
1. 클라이언트는 REST API를 통해 입력 품명을 전송합니다.
2. FastAPI 서버는 입력 요청을 비동기 큐에 넣고 즉시 응답을 종료합니다. (비동기 처리 구조)
3. 컨슈머 워커(Worker)는 큐에서 입력을 꺼내 처리하며, 내부에서 다음과 같은 작업을 수행합니다:
1) Vector DB에서 유사 품명 검색
2) LLM을 통한 분류 예측 (단계별: 대 → 중 → 소)
3) 중복 여부 확인 및 가중치 재조정
4. 최종 결과는 별도의 콜백(Callback) URL로 전송되어 저장 또는 후속 처리됩니다.
이 아키텍처는 고부하 상황에서도 안정적인 처리를 보장하기 위해 설계되었습니다.
---
2.1 왜 비동기 구조인가?
FastAPI는 기본적으로 비동기 처리를 지원하지만, LLM 호출, ES 검색, S3 기준정보 조회 등은 대부분 외부 I/O 작업입니다. 이러한 호출을 await 없이 그대로 처리하면 이벤트 루프가 막혀버리는 이벤트 루프 블로킹 현상이 발생합니다.
이벤트 루프(Event Loop)는 Python의 asyncio 구조에서 코루틴 간 실행을 스케줄링하는 엔진인데, 블로킹 함수가 포함되면 전체 처리가 중단됩니다.
이를 방지하기 위해 주요 외부 호출 (ES, LLM, S3 등)은 모두 쓰레드 풀(Thread Pool) 내에서 동기 방식으로 처리하고, 결과를 비동기적으로 반환합니다. 이로써 이벤트 루프 블로킹 없이 안정적으로 처리됩니다.
2.2 컨슈머와 쓰레드풀 구조
컨슈머(Consumer)는 비동기 큐에서 항목을 꺼내 비즈니스 로직을 처리하는 비동기 함수입니다.
현재는 컨슈머 하나가 큐에서 아이템을 하나씩 처리하도록 설정되어 있지만, 이는 향후 구성에서 한 번에 여러 개 처리할 수 있도록 확장 가능합니다.
각 컨슈머 내부에서는 여러 개의 동기 작업을 병렬로 처리하기 위해 ThreadPoolExecutor를 사용합니다. 즉, 컨슈머는 작업 분배자, 쓰레드풀은 작업 처리자 역할입니다.
이를 통해 큐 → 컨슈머 → 쓰레드풀 구조가 구성되며, CPU 병렬성과 I/O 병렬성을 모두 확보합니다.
2.3 MAX_WORKERS vs CONSUMER_WORKERS
MAX_WORKERS: ThreadPoolExecutor에서 동시 실행 가능한 쓰레드 수입니다. 너무 크면 오버헤드가 발생합니다.
CONSUMER_WORKERS: 동시에 실행되는 컨슈머 개수입니다. 기본적으로 큐에서 꺼내는 속도에 영향을 줍니다.
실제로는 CONSUMER_WORKERS가 1 이상이면 충분하고, 작업 부하가 높아지면 MAX_WORKERS 수를 늘려 쓰레드 처리를 늘려줍니다.
---
시스템이 동시 요청을 안정적으로 처리하는 핵심은 다음과 같은 점에 있습니다:
비동기 큐를 이용해 요청을 버퍼링합니다.
큐 용량은 현재 1,000개로 설정되어 있습니다. 이보다 많은 요청이 들어오면 429 (Too Many Requests)를 반환합니다.
큐가 꽉 차는 것을 방지하기 위한 백프레셔(Backpressure) 설계를 적용하였습니다. 서버가 처리 가능한 속도보다 입력이 많을 경우, 일정 이상 처리를 제한하여 전체 시스템이 다운되지 않도록 합니다.
실패 요청은 지수 백오프(Exponential Backoff) 전략을 적용해 재시도합니다. 실패한 후 곧바로 다시 요청하지 않고, 1초, 2초, 4초처럼 점점 길게 대기 후 재시도하여 시스템 과부하를 방지합니다.
---
핫패스란 요청 처리 경로에서 성능과 지연에 가장 민감한 경로를 의미합니다. 이 시스템에서는 다음이 핫패스입니다:
- Vector DB 검색
- LLM 분류 예측
- 중복 필터링 및 가중치 조정
핫패스를 빠르게 유지하기 위해 다음을 고려했습니다:
1) 불필요한 로깅 최소화
2) 공유 자원 접근 최소화
3) 결과를 빠르게 반환하고 후처리는 콜백 구조로 전환
---
┌────────────────────┐
│ N개 클라이언트 요청 │
└────────┬───────────┘
│
▼
┌───────────────────┐
│ FastAPI 엔드포인트 │
│ - await queue.put │ ← 대기 상태 (큐가 꽉 차면)
└────────┬──────────┘
│
▼
┌──────────────────────┐
│ 비동기 큐 (AsyncIO Queue) │
│ - maxsize=1000 │
└────────┬─────────────┘
│
▼
┌──────────────────────┐
│ Consumer Worker (n개) │ ◀── consumer_workers
│ - await queue.get() │
│ - ThreadPoolExecutor.submit│
└────────┬─────────────┘
│
▼
┌──────────────────────┐
│ ThreadPool 내부 Task │
│ - Vector DB 검색 │
│ - LLM 분류 예측 │
│ - 중복 필터링 및 조정 │
└──────────────────────┘
│
▼
콜백 또는 결과 저장/응답
---
비동기 큐 -> 컨슈머 -> 쓰레드풀 처리
import asyncio
from concurrent.futures import ThreadPoolExecutor
# 큐 초기화 (최대 1000개)
queue = asyncio.Queue(maxsize=1000)
# 스레드풀 초기화 (최대 동시 10개 작업)
thread_pool = ThreadPoolExecutor(max_workers=10)
# 예시: Vector DB 검색, LLM 예측 등 실제 작업
def handle_request(request_id):
print(f"[Worker] 처리 시작 - {request_id}")
import time
time.sleep(1) # I/O 작업 대체
print(f"[Worker] 처리 완료 - {request_id}")
# 비동기 컨슈머: 큐에서 아이템을 꺼내 쓰레드풀에 분배
async def consumer_loop():
while True:
request_id = await queue.get()
asyncio.get_running_loop().run_in_executor(thread_pool, handle_request, request_id)
queue.task_done()
# 요청을 받는 API 함수처럼 동작
async def handle_client_request(request_id):
await queue.put(request_id)
print(f"[API] 요청 큐에 등록 완료 - {request_id}")
return {"status": "queued", "request_id": request_id}
# 전체 실행 예시
async def main():
# 컨슈머 N개 실행
consumer_tasks = [asyncio.create_task(consumer_loop()) for _ in range(3)]
# 20개의 요청을 simulate
await asyncio.gather(*(handle_client_request(f"req-{i}") for i in range(20)))
# 큐가 빌 때까지 대기
await queue.join()
# 컨슈머 종료 (이 데모에서는 강제 종료 안함)
for task in consumer_tasks:
task.cancel()
asyncio.run(main())
---
실시간 로깅은 logrotate와 연동되어 주기적으로 분할되고, 에러 발생 시 알림이 발송됩니다.
mlflow를 lifespan 내부에서 연동하여 모델 호출 및 처리 로그를 기록합니다.
API 서버는 docker로 실행되며, 환경에 따라 dev/qa/prod Dockerfile을 분리하여 운영 유연성을 확보합니다.
---
이번 프로젝트를 통해 다음과 같은 교훈을 얻었습니다:
멀티에이전트 구조에서도 요청 흐름을 분리하여 유연하고 안정적인 시스템을 구성할 수 있다.
FastAPI 기반 비동기 아키텍처에 큐 + 쓰레드 풀을 혼합하여 병렬성을 확보하고, 이벤트 루프를 보호할 수 있다.
외부 시스템(LLM, S3, ES 등)과의 통신은 동기 호출이라도 쓰레드로 감싸는 전략이 매우 효과적이다.
시스템 아키텍처 설계에서 가장 중요한 점은 ‘단순함을 유지하면서도 병목을 최소화하는 것’임을 다시 느낀 경험이었습니다.
---
이벤트 루프(Event Loop): 비동기 코루틴을 스케줄링하는 asyncio의 핵심 구조
이벤트 루프 블로킹: 동기 작업이 비동기 흐름을 가로막아 전체 지연을 유발하는 현상
컨슈머(Consumer): 큐에서 항목을 꺼내 작업을 수행하는 처리 단위
쓰레드 풀(Thread Pool): 병렬 처리를 위해 미리 생성된 쓰레드 집합
백프레셔(Backpressure): 처리 가능한 양보다 많은 요청이 들어올 때, 처리 흐름을 제한하여 안정성 유지
백오프(Backoff): 실패 시 재시도 시점을 뒤로 미루는 전략
지수 백오프: 재시도 간격을 점차 길게 조정하는 방식
핫패스: 전체 요청 흐름 중 가장 중요하고 병목이 발생할 가능성이 높은 경로