해야할 일이 적혀있는 메시지를 분산하여 비동기로 작업 처리
- “할 일(Task)”을 정의하고, 이를 실행 요청으로 큐에 넣는 주체
- Django 애플리케이션(django 컨테이너)이 해당 역할
- 코드에서 .delay() 또는 .apply_async() 호출 시, Celery가 작업 메시지(JSON 형태)를 만들어 Broker(예: Redis) 로 전송
- 발행된 작업에는 고유한 task_id가 부여되어, 이후 결과를 추적 가능
- Django는 즉시 반환 → 사용자는 빠른 응답을 받음
- 비동기 요청을 큐에 위임함으로써 API 지연을 최소화
- 기본사용법
# tasks.py
from celery import shared_task
import requests
@shared_task
def fetch_data(url):
try:
response = requests.get(url)
# HTTP 응답 코드가 200인 경우에만 데이터를 반환합니다.
if response.status_code == 200:
return response.text
else:
print(f"Failed to fetch data. Status code: {response.status_code}")
return None
except requests.RequestException as e:
print(f"An error occurred: {e}")
return None
# views.py
from .tasks import fetch_data
url = "https://www.example.com" # 데이터를 가져올 웹 페이지의 URL
result = fetch_data.delay(url)
# result = fetch_data.apply_async(args=[url])
if result.successful(): # Celery 작업의 결과를 확인하고 처리
data = result.get()
print("Fetched data successfully:")
print(data)
else:
print("Failed to fetch data.")
- Publisher가 보낸 작업 메시지를 임시로 저장하고, Worker에게 전달하는 중개자
- 일반적으로 Redis 또는 RabbitMQ를 사용
- 큐(queue) 형태
- Redis 안에서 작업이 쌓이는 공간, 여러 워커가 나눠서 소비 가능
- Redis 내부에 list 형태로 작업이 쌓임
- celery 컨테이너
- Redis 큐에서 메시지 꺼내서 실행
- 여러 워커가 하나의 큐를 경쟁적으로 소비(consume)
- 큐에서 작업을 꺼내 실제로 함수를 실행하는 주체
- 예시 명령 : celery -A config.celery_prod worker -l info --concurrency=4
- 옵션
- '--concurrency': 한 워커가 동시에 처리할 수 있는 작업 개수
- ex) --concurrency=1 : 한 번에 하나의 작업만 처리 : 직렬 실행
- ex) --concurrency=4 : 동시에 4개의 작업 처리 : CPU 4개 사용
- ex) --concurrency=8 : 동시에 8개의 작업 처리 : CPU 8개 사용 (8-core면 적합)
- '--concurrency'는 “몇 개의 worker 프로세스(자식 프로세스)를 띄울지”를 의미
- OS 입장에서는 CPU 코어 수와 관계없이 그 수만큼 프로세스를 실행
- Celery 워커 프로세스 작업을 실행하는 프로세스 (논리적 단위)
- CPU 코어 : 동시에 실제 연산 가능한 물리적 단위 (2개면 동시 실행은 2개까지)
- ex) 2core : 4worker : 2개의 CPU가 4개의 프로세스를 번갈아 처리 (OS 스케줄러가 교대 실행)
- '--queues': 특정 큐만 감시하도록 지정 가능 (큐 분리 전략)
- '--autoscale': 워커 프로세스를 부하에 따라 자동 확장
- 실행결과를 flower(5555 포트)에서 확인 가능
- db에 결과(results) 저장
- 작업 실행 결과(result.get())를 저장하고 제공하는 저장소