Celery 공식 문서에서는 Celery를 다음과 같이 정의하고 있습니다.
Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.
즉, Celery는 대량의 메시지를 처리하기 위한 간단하고 유연하며 신뢰할 수 있는 분산 시스템입니다.
Spring 생태계에서 RabbitMQ + @Async 또는 Kafka를 사용하여 비동기 작업을 처리하는 것처럼, Python 생태계에서는 Celery가 표준처럼 사용됩니다.
🍳 분석 환경
- celery: 5.4.0
- python: 3.13
- redis: 7.x (브로커/백엔드)
- fastapi: 0.121.1
✅ Celery의 핵심 구성 요소
Celery는 크게 다섯 가지 핵심 컴포넌트로 구성됩니다.
1
2
3
4
5
6
7
8
9
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Client │───▶│ Broker │───▶│ Worker │
│ (Producer) │ │ (Redis) │ │ (Consumer) │
└─────────────┘ └─────────────┘ └──────┬──────┘
│
┌─────────────┐ │
│ Backend │◀──────────┘
│ (Redis) │
└─────────────┘
| 컴포넌트 | 역할 | Spring 비교 |
|---|---|---|
| Client | 태스크를 생성하고 브로커에 발행 | RabbitTemplate.send() |
| Broker | 메시지 큐 (Redis, RabbitMQ) | RabbitMQ, Kafka |
| Worker | 태스크를 consume하고 실행 | @RabbitListener |
| Backend | 태스크 결과 저장소 | - |
| Beat | 스케줄링 (정기 실행) | @Scheduled |
📌 Celery 기본 설정
Celery 앱은 다음과 같이 설정합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# worker/celery_app.py
from celery import Celery
app = Celery(
"worker",
broker="redis://localhost:6379/0", # 메시지 브로커
backend="redis://localhost:6379/0", # 결과 저장소
include=["worker.tasks.basic"], # 태스크 모듈
)
app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="Asia/Seoul",
enable_utc=True,
task_track_started=True,
task_time_limit=30 * 60, # 30분 타임아웃
result_expires=60 * 60, # 1시간 후 결과 삭제
)
주요 설정 옵션
| 옵션 | 설명 |
|---|---|
broker | 메시지 브로커 URL |
backend | 결과 저장소 URL |
include | 자동 로드할 태스크 모듈 |
task_serializer | 태스크 직렬화 방식 (json, pickle 등) |
task_time_limit | 태스크 최대 실행 시간 |
result_expires | 결과 보관 기간 |
✅ 태스크 정의
1️⃣ 기본 태스크
가장 간단한 태스크 정의 방법입니다.
ex) Spring의 @Async 메서드 역할
1
2
3
4
5
6
7
8
9
10
11
from worker.celery_app import app
@app.task
def add(x: int, y: int) -> int:
"""두 숫자를 더하는 태스크"""
return x + y
@app.task
def multiply(x: int, y: int) -> int:
"""두 숫자를 곱하는 태스크"""
return x * y
@app.task 데코레이터를 붙이면 해당 함수가 Celery 태스크로 등록됩니다.
2️⃣ 태스크 호출 방법
태스크를 호출하는 방법은 크게 두 가지입니다.
.delay() - 간단한 호출
1
2
3
4
5
# 비동기 호출 (브로커에 메시지 발행)
result = add.delay(10, 20)
# task_id 확인
print(result.id) # "abc-123-..."
.apply_async() - 상세 옵션 지정
1
2
3
4
5
6
7
8
result = add.apply_async(
args=[10, 20],
countdown=5, # 5초 후 실행
eta=datetime(2024, 1, 1), # 특정 시간에 실행
expires=60, # 60초 후 만료
queue="high_priority", # 특정 큐로 라우팅
retry=True, # 전송 실패시 재시도
)
| 메서드 | 특징 | 사용 시점 |
|---|---|---|
.delay() | 간편, 옵션 없음 | 대부분의 경우 |
.apply_async() | 상세 옵션 지정 가능 | 지연 실행, 큐 라우팅 등 |
Spring에서 RabbitTemplate.convertAndSend()로 메시지를 발행하는 것과 동일한 개념입니다.
3️⃣ bind=True로 태스크 정보 접근
bind=True 옵션을 사용하면 태스크 인스턴스에 접근할 수 있습니다.
1
2
3
4
5
6
7
8
9
10
11
from celery import Task
@app.task(bind=True)
def task_with_info(self: Task, message: str) -> dict:
"""태스크 자체 정보에 접근하는 예제"""
return {
"task_id": self.request.id,
"task_name": self.name,
"message": message,
"retries": self.request.retries,
}
self.request를 통해 접근 가능한 정보:
| 속성 | 설명 |
|---|---|
self.request.id | 태스크 ID |
self.request.retries | 현재 재시도 횟수 |
self.request.args | 위치 인자 |
self.request.kwargs | 키워드 인자 |
4️⃣ 재시도 로직
실패 시 자동 재시도하는 태스크를 정의할 수 있습니다.
ex) Spring의 @Retryable 역할
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@app.task(
bind=True,
max_retries=3, # 최대 3번 재시도
default_retry_delay=10, # 재시도 간격 10초
autoretry_for=(Exception,), # 자동 재시도할 예외
)
def send_email(self, user_id: str, email: str) -> dict:
"""이메일 발송 (실패 시 자동 재시도)"""
try:
# 외부 API 호출
response = email_api.send(email)
return {"status": "sent", "user_id": user_id}
except EmailServerError as e:
# 수동 재시도 (더 세밀한 제어)
raise self.retry(exc=e, countdown=60)
재시도 옵션
| 옵션 | 설명 |
|---|---|
max_retries | 최대 재시도 횟수 |
default_retry_delay | 기본 재시도 간격 (초) |
autoretry_for | 자동 재시도할 예외 클래스들 |
retry_backoff | 지수 백오프 활성화 |
retry_jitter | 재시도 간격에 랜덤성 추가 |
📌 결과 조회
태스크 결과는 AsyncResult를 통해 조회합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from celery.result import AsyncResult
# 태스크 호출
result = add.delay(10, 20)
task_id = result.id
# 나중에 결과 조회
async_result = AsyncResult(task_id)
# 상태 확인
print(async_result.status) # PENDING, STARTED, SUCCESS, FAILURE
# 결과 확인 (완료된 경우)
if async_result.ready():
if async_result.successful():
print(async_result.result) # 30
else:
print(async_result.traceback) # 에러 정보
태스크 상태
| 상태 | 설명 |
|---|---|
PENDING | 대기 중 (또는 알 수 없음) |
STARTED | 실행 중 |
SUCCESS | 성공 |
FAILURE | 실패 |
RETRY | 재시도 대기 |
REVOKED | 취소됨 |
Fire & Forget 패턴
실무에서는 대부분 결과를 조회하지 않습니다.
1
2
3
# 결과 안 기다림, 안 확인함
send_welcome_email.delay(user_id, email)
return response # 바로 응답
이를 Fire & Forget 패턴이라고 합니다. 결과가 성공이든 실패든 확인하지 않고, 실패 시 Celery의 재시도 메커니즘과 모니터링(Flower, Sentry)에 맡깁니다.
📌 태스크 체이닝
Celery는 여러 태스크를 조합하는 다양한 방법을 제공합니다.
ex) Spring의 CompletableFuture.thenApply() 체이닝 역할
1️⃣ Chain - 순차 실행
이전 태스크의 결과가 다음 태스크의 입력으로 전달됩니다.
1
2
3
4
5
6
7
8
9
from celery import chain
# step1 → step2 → step3 순차 실행
task_chain = chain(
step1.s("data"), # s()는 signature의 약자
step2.s(),
step3.s(),
)
result = task_chain.apply_async()
1
2
3
4
5
step1("data") → "[step1:data]"
↓
step2("[step1:data]") → "[step2:[step1:data]]"
↓
step3("[step2:[step1:data]]") → 최종 결과
2️⃣ Group - 병렬 실행
여러 태스크를 동시에 실행합니다.
1
2
3
4
5
6
7
8
9
10
11
from celery import group
# 병렬 실행
task_group = group(
add.s(1, 2),
add.s(3, 4),
multiply.s(5, 6),
)
result = task_group.apply_async()
# 결과: [3, 7, 30]
3️⃣ Chord - 병렬 실행 후 콜백
병렬 실행 결과를 모아서 콜백 태스크에 전달합니다.
1
2
3
4
5
6
7
8
9
10
11
12
from celery import chord
@app.task
def sum_results(results: list[int]) -> int:
return sum(results)
# (1+2) + (3+4) + (5+6) = 21
task_chord = chord(
[add.s(1, 2), add.s(3, 4), add.s(5, 6)], # 병렬 실행
sum_results.s() # 콜백
)
result = task_chord.apply_async()
✅ Beat - 스케줄링
정기적으로 실행되는 태스크를 설정할 수 있습니다.
ex) Spring의 @Scheduled 역할
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from celery.schedules import crontab
app.conf.beat_schedule = {
# 매 분마다 실행
"heartbeat-every-minute": {
"task": "worker.tasks.basic.heartbeat",
"schedule": 60.0, # 60초마다
},
# 매일 자정에 실행
"daily-report": {
"task": "worker.tasks.basic.daily_report",
"schedule": crontab(hour=0, minute=0),
},
# 매주 월요일 오전 9시
"weekly-summary": {
"task": "worker.tasks.basic.weekly_summary",
"schedule": crontab(hour=9, minute=0, day_of_week=1),
},
}
crontab 표현식
| 표현식 | 설명 |
|---|---|
crontab() | 매 분 |
crontab(minute=0, hour=0) | 매일 자정 |
crontab(minute=0, hour='*/3') | 3시간마다 |
crontab(day_of_week='mon') | 매주 월요일 |
Beat 실행:
1
celery -A worker.celery_app beat --loglevel=info
✅ 실무 적용 패턴
어떤 작업을 Celery에 위임할까?
| 위임 O | 위임 X |
|---|---|
| 이메일/SMS 발송 | 즉시 응답 필요한 조회 |
| 외부 API 호출 | 가벼운 DB 작업 |
| 리포트 생성 | 빠른 계산 |
| 이미지 처리 | 즉시성 필요한 인증 |
| 실패 시 재시도 필요 | 동기적 흐름 필요 |
실무 구조 예시
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# router.py
@router.post("/users/signup")
async def signup(request: SignupRequest):
return await user_service.signup(request)
# service.py
class UserService:
async def signup(self, request: SignupRequest):
# 1. 유저 생성 (동기)
user = await self.repository.save(...)
# 2. 즉시성 필요한 작업 (동기)
await self._send_verification_sms(user.phone, code)
# 3. Celery 위임 (비동기 - Fire & Forget)
send_welcome_email.delay(user.id, user.email)
notify_admin_new_signup.delay(user.id)
return response
판단 기준:
- 즉시성 필요 → 동기 처리
- 처리 보장/재시도 필요 → Celery 위임
- 외부 API, 느린 작업 → Celery 위임
- 유저 응답과 무관 → Celery 위임
📌 Redis vs RabbitMQ
| 항목 | Redis | RabbitMQ |
|---|---|---|
| 설치/운영 | 간단 | 복잡 |
| 성능 | 빠름 | 상대적으로 느림 |
| 메시지 영속성 | 기본 메모리 | 디스크 저장 |
| DLQ (Dead Letter Queue) | 없음 | 네이티브 지원 |
| 라우팅 | 단순 | Exchange/Binding |
| 사용처 | 간단한 비동기 | 메시지 보장 중요 |
결론:
- 간단한 비동기 작업 → Redis
- 메시지 유실 방지 중요 → RabbitMQ
- Celery 코드는 동일, 브로커만 변경하면 됨
✅ 워커 실행
1
2
3
4
5
6
7
8
# 워커 시작
celery -A worker.celery_app worker --loglevel=info
# 동시성 설정
celery -A worker.celery_app worker --concurrency=4
# 특정 큐만 처리
celery -A worker.celery_app worker -Q high_priority,default
주요 옵션
| 옵션 | 설명 |
|---|---|
--concurrency | 동시 처리 워커 수 |
-Q | 처리할 큐 지정 |
--loglevel | 로그 레벨 |
--autoscale=10,3 | 자동 스케일링 (최대 10, 최소 3) |
동시성(Concurrency)과 Pool
--concurrency=4는 동시에 처리할 수 있는 태스크 수를 의미합니다. 기본은 멀티 프로세스(prefork) 방식입니다.
1
2
3
4
5
6
7
8
9
10
11
┌─────────────────────────────────────────────────┐
│ Main Process │
│ (Worker Controller) │
└─────────────────────────────────────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│ Child │ │ Child │ │ Child │ │ Child │
│ Proc 1 │ │ Proc 2 │ │ Proc 3 │ │ Proc 4 │
│ Task A │ │ Task B │ │ Task C │ │ (idle) │
└────────┘ └────────┘ └────────┘ └────────┘
Pool 종류
1
2
3
4
5
6
7
8
# 멀티 프로세스 (기본값)
celery -A app worker --pool=prefork --concurrency=4
# 멀티 스레드
celery -A app worker --pool=threads --concurrency=4
# 비동기 이벤트 루프 (gevent/eventlet)
celery -A app worker --pool=gevent --concurrency=100
| Pool | 동작 방식 | 적합한 작업 | Spring 비교 |
|---|---|---|---|
prefork | 멀티 프로세스 | CPU 바운드 (계산, 이미지 처리) | ProcessPoolExecutor |
threads | 멀티 스레드 | I/O 바운드 (가벼운 API 호출) | ThreadPoolExecutor (@Async) |
gevent | 코루틴 (greenlet) | I/O 바운드 (대량 네트워크 요청) | WebFlux (리액티브) |
eventlet | 코루틴 | gevent와 유사 | - |
solo | 단일 스레드 | 디버깅용 | - |
실무 선택 기준
| 상황 | 추천 Pool | concurrency |
|---|---|---|
| 이미지 리사이징, PDF 생성 | prefork | CPU 코어 수 |
| 이메일/SMS 발송 | gevent | 100~1000 |
| 외부 API 대량 호출 | gevent | 100~500 |
| 일반적인 혼합 작업 | prefork | 코어 수 * 2 |
태스크별 타임아웃 설정
전역 설정 외에 태스크별로 개별 타임아웃을 지정할 수 있습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
# 전역 설정 (기본값)
app.conf.update(
task_time_limit=30 * 60, # 30분
)
# 태스크별 개별 설정
@app.task(time_limit=3600) # 1시간
def long_batch_job():
...
@app.task(time_limit=7200) # 2시간
def super_long_report():
...
soft_time_limit vs time_limit
| 옵션 | 동작 |
|---|---|
soft_time_limit | 시간 초과 시 SoftTimeLimitExceeded 예외 발생 (정리 가능) |
time_limit | 시간 초과 시 강제 종료 (SIGKILL) |
1
2
3
4
5
6
7
8
9
10
11
12
from celery.exceptions import SoftTimeLimitExceeded
@app.task(soft_time_limit=3500, time_limit=3600)
def batch_job():
try:
# 오래 걸리는 작업
...
except SoftTimeLimitExceeded:
# soft limit 도달 - 정리 작업 수행
save_progress()
cleanup()
raise
실무에서는 soft_time_limit으로 먼저 경고받아 정리할 시간을 확보하고, time_limit으로 최종 강제 종료하는 패턴을 사용합니다.
📌 모니터링 - Flower
Celery 전용 웹 모니터링 도구입니다.
1
2
pip install flower
celery -A worker.celery_app flower --port=5555
http://localhost:5555에서:
- 실시간 태스크 상태 확인
- 워커 상태 모니터링
- 태스크 성공/실패 통계
- 태스크 취소(revoke)
- 임시로 사용은 괜찮으나 가독성이 안좋은 관계로 운영 환경에서는 매트릭을 별도로 수집하여 모니터링을 하는것을 권장
✅ 마치며
- 이처럼 Celery는 Python 생태계에서 비동기 작업 처리의 표준으로, Spring의
@Async+ RabbitMQ 조합과 유사한 역할을 합니다. 간단한 설정으로 강력한 분산 태스크 처리 시스템을 구축할 수 있다는 것이 가장 큰 장점입니다. - 그냥 단순히 MQ 추상화 + Retry + Scheduling의 기능을 제공하는 라이브러리라고 생각하면 단순할 것 같다.
- Spring Batch 처럼 복잡한 Step 구조는 지원해주진 않지만, 배치성 작업을 구현하는데에는 충분할 것 같다.
- 처리보장이 필요한 작업이나, 비동기 로직이 필요할 경우 celery를 이용하여 EDD를 구현하는것도 좋은 방법일 것 같다.