Celery

해야할 일이 적혀있는 메시지를 분산하여 비동기로 작업 처리

by 내가 사는 세상
celery-flow.png
제목 없음fdygtrggrgrffr.png


제목 없음fdygtrggrgrffrfreqfr.png
refer.png




STEP1. Publisher(Producer, 작업 생산자)

- “할 일(Task)”을 정의하고, 이를 실행 요청으로 큐에 넣는 주체

- Django 애플리케이션(django 컨테이너)이 해당 역할

- 코드에서 .delay() 또는 .apply_async() 호출 시, Celery가 작업 메시지(JSON 형태)를 만들어 Broker(예: Redis) 로 전송

- 발행된 작업에는 고유한 task_id가 부여되어, 이후 결과를 추적 가능

- Django는 즉시 반환 → 사용자는 빠른 응답을 받음

- 비동기 요청을 큐에 위임함으로써 API 지연을 최소화


- 기본사용법

# tasks.py

from celery import shared_task

import requests


@shared_task

def fetch_data(url):

try:

response = requests.get(url)

# HTTP 응답 코드가 200인 경우에만 데이터를 반환합니다.

if response.status_code == 200:

return response.text

else:

print(f"Failed to fetch data. Status code: {response.status_code}")

return None

except requests.RequestException as e:

print(f"An error occurred: {e}")

return None



# views.py

from .tasks import fetch_data


url = "https://www.example.com" # 데이터를 가져올 웹 페이지의 URL

result = fetch_data.delay(url)

# result = fetch_data.apply_async(args=[url])


if result.successful(): # Celery 작업의 결과를 확인하고 처리

data = result.get()

print("Fetched data successfully:")

print(data)

else:

print("Failed to fetch data.")




STEP2. Broker

- Publisher가 보낸 작업 메시지를 임시로 저장하고, Worker에게 전달하는 중개자

- 일반적으로 Redis 또는 RabbitMQ를 사용

- 큐(queue) 형태

- Redis 안에서 작업이 쌓이는 공간, 여러 워커가 나눠서 소비 가능

- Redis 내부에 list 형태로 작업이 쌓임





STEP3. Worker(Consumer, 작업 소비자)

- celery 컨테이너

- Redis 큐에서 메시지 꺼내서 실행

- 여러 워커가 하나의 큐를 경쟁적으로 소비(consume)

- 큐에서 작업을 꺼내 실제로 함수를 실행하는 주체

- 예시 명령 : celery -A config.celery_prod worker -l info --concurrency=4

- 옵션

- '--concurrency': 한 워커가 동시에 처리할 수 있는 작업 개수

- ex) --concurrency=1 : 한 번에 하나의 작업만 처리 : 직렬 실행

- ex) --concurrency=4 : 동시에 4개의 작업 처리 : CPU 4개 사용

- ex) --concurrency=8 : 동시에 8개의 작업 처리 : CPU 8개 사용 (8-core면 적합)


- '--concurrency'는 “몇 개의 worker 프로세스(자식 프로세스)를 띄울지”를 의미

- OS 입장에서는 CPU 코어 수와 관계없이 그 수만큼 프로세스를 실행

- Celery 워커 프로세스 작업을 실행하는 프로세스 (논리적 단위)

- CPU 코어 : 동시에 실제 연산 가능한 물리적 단위 (2개면 동시 실행은 2개까지)

- ex) 2core : 4worker : 2개의 CPU가 4개의 프로세스를 번갈아 처리 (OS 스케줄러가 교대 실행)


- '--queues': 특정 큐만 감시하도록 지정 가능 (큐 분리 전략)

- '--autoscale': 워커 프로세스를 부하에 따라 자동 확장


- 실행결과를 flower(5555 포트)에서 확인 가능


Task Routing(Queue)



Task Prioritization



Task Grouping #signature




Task Chaining



Task Rate Limits



Chord



STEP4. DB 저장(optional)

- db에 결과(results) 저장

- 작업 실행 결과(result.get())를 저장하고 제공하는 저장소







매거진의 이전글Redis