Home Celery 톺아보기
Post
Cancel

Celery 톺아보기

Celery 공식 문서에서는 Celery를 다음과 같이 정의하고 있습니다.

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.

즉, Celery는 대량의 메시지를 처리하기 위한 간단하고 유연하며 신뢰할 수 있는 분산 시스템입니다.

Spring 생태계에서 RabbitMQ + @Async 또는 Kafka를 사용하여 비동기 작업을 처리하는 것처럼, Python 생태계에서는 Celery가 표준처럼 사용됩니다.

🍳 분석 환경

  • celery: 5.4.0
  • python: 3.13
  • redis: 7.x (브로커/백엔드)
  • fastapi: 0.121.1

✅ Celery의 핵심 구성 요소

Celery는 크게 다섯 가지 핵심 컴포넌트로 구성됩니다.

1
2
3
4
5
6
7
8
9
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Client    │───▶│   Broker    │───▶│   Worker    │
│ (Producer)  │    │  (Redis)    │    │ (Consumer)  │
└─────────────┘    └─────────────┘    └──────┬──────┘
                                             │
                   ┌─────────────┐           │
                   │   Backend   │◀──────────┘
                   │  (Redis)    │
                   └─────────────┘
컴포넌트역할Spring 비교
Client태스크를 생성하고 브로커에 발행RabbitTemplate.send()
Broker메시지 큐 (Redis, RabbitMQ)RabbitMQ, Kafka
Worker태스크를 consume하고 실행@RabbitListener
Backend태스크 결과 저장소-
Beat스케줄링 (정기 실행)@Scheduled

📌 Celery 기본 설정

Celery 앱은 다음과 같이 설정합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# worker/celery_app.py
from celery import Celery

app = Celery(
    "worker",
    broker="redis://localhost:6379/0",      # 메시지 브로커
    backend="redis://localhost:6379/0",     # 결과 저장소
    include=["worker.tasks.basic"],         # 태스크 모듈
)

app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Asia/Seoul",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=30 * 60,      # 30분 타임아웃
    result_expires=60 * 60,       # 1시간 후 결과 삭제
)

주요 설정 옵션

옵션설명
broker메시지 브로커 URL
backend결과 저장소 URL
include자동 로드할 태스크 모듈
task_serializer태스크 직렬화 방식 (json, pickle 등)
task_time_limit태스크 최대 실행 시간
result_expires결과 보관 기간



✅ 태스크 정의

1️⃣ 기본 태스크

가장 간단한 태스크 정의 방법입니다.

ex) Spring의 @Async 메서드 역할

1
2
3
4
5
6
7
8
9
10
11
from worker.celery_app import app

@app.task
def add(x: int, y: int) -> int:
    """두 숫자를 더하는 태스크"""
    return x + y

@app.task
def multiply(x: int, y: int) -> int:
    """두 숫자를 곱하는 태스크"""
    return x * y

@app.task 데코레이터를 붙이면 해당 함수가 Celery 태스크로 등록됩니다.


2️⃣ 태스크 호출 방법

태스크를 호출하는 방법은 크게 두 가지입니다.

.delay() - 간단한 호출

1
2
3
4
5
# 비동기 호출 (브로커에 메시지 발행)
result = add.delay(10, 20)

# task_id 확인
print(result.id)  # "abc-123-..."

.apply_async() - 상세 옵션 지정

1
2
3
4
5
6
7
8
result = add.apply_async(
    args=[10, 20],
    countdown=5,              # 5초 후 실행
    eta=datetime(2024, 1, 1), # 특정 시간에 실행
    expires=60,               # 60초 후 만료
    queue="high_priority",    # 특정 큐로 라우팅
    retry=True,               # 전송 실패시 재시도
)
메서드특징사용 시점
.delay()간편, 옵션 없음대부분의 경우
.apply_async()상세 옵션 지정 가능지연 실행, 큐 라우팅 등

Spring에서 RabbitTemplate.convertAndSend()로 메시지를 발행하는 것과 동일한 개념입니다.


3️⃣ bind=True로 태스크 정보 접근

bind=True 옵션을 사용하면 태스크 인스턴스에 접근할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
from celery import Task

@app.task(bind=True)
def task_with_info(self: Task, message: str) -> dict:
    """태스크 자체 정보에 접근하는 예제"""
    return {
        "task_id": self.request.id,
        "task_name": self.name,
        "message": message,
        "retries": self.request.retries,
    }

self.request를 통해 접근 가능한 정보:

속성설명
self.request.id태스크 ID
self.request.retries현재 재시도 횟수
self.request.args위치 인자
self.request.kwargs키워드 인자


4️⃣ 재시도 로직

실패 시 자동 재시도하는 태스크를 정의할 수 있습니다.

ex) Spring의 @Retryable 역할

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@app.task(
    bind=True,
    max_retries=3,           # 최대 3번 재시도
    default_retry_delay=10,  # 재시도 간격 10초
    autoretry_for=(Exception,),  # 자동 재시도할 예외
)
def send_email(self, user_id: str, email: str) -> dict:
    """이메일 발송 (실패 시 자동 재시도)"""
    try:
        # 외부 API 호출
        response = email_api.send(email)
        return {"status": "sent", "user_id": user_id}
    except EmailServerError as e:
        # 수동 재시도 (더 세밀한 제어)
        raise self.retry(exc=e, countdown=60)

재시도 옵션

옵션설명
max_retries최대 재시도 횟수
default_retry_delay기본 재시도 간격 (초)
autoretry_for자동 재시도할 예외 클래스들
retry_backoff지수 백오프 활성화
retry_jitter재시도 간격에 랜덤성 추가

📌 결과 조회

태스크 결과는 AsyncResult를 통해 조회합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from celery.result import AsyncResult

# 태스크 호출
result = add.delay(10, 20)
task_id = result.id

# 나중에 결과 조회
async_result = AsyncResult(task_id)

# 상태 확인
print(async_result.status)  # PENDING, STARTED, SUCCESS, FAILURE

# 결과 확인 (완료된 경우)
if async_result.ready():
    if async_result.successful():
        print(async_result.result)  # 30
    else:
        print(async_result.traceback)  # 에러 정보

태스크 상태

상태설명
PENDING대기 중 (또는 알 수 없음)
STARTED실행 중
SUCCESS성공
FAILURE실패
RETRY재시도 대기
REVOKED취소됨


Fire & Forget 패턴

실무에서는 대부분 결과를 조회하지 않습니다.

1
2
3
# 결과 안 기다림, 안 확인함
send_welcome_email.delay(user_id, email)
return response  # 바로 응답

이를 Fire & Forget 패턴이라고 합니다. 결과가 성공이든 실패든 확인하지 않고, 실패 시 Celery의 재시도 메커니즘과 모니터링(Flower, Sentry)에 맡깁니다.


📌 태스크 체이닝

Celery는 여러 태스크를 조합하는 다양한 방법을 제공합니다.

ex) Spring의 CompletableFuture.thenApply() 체이닝 역할

1️⃣ Chain - 순차 실행

이전 태스크의 결과가 다음 태스크의 입력으로 전달됩니다.

1
2
3
4
5
6
7
8
9
from celery import chain

# step1 → step2 → step3 순차 실행
task_chain = chain(
    step1.s("data"),  # s()는 signature의 약자
    step2.s(),
    step3.s(),
)
result = task_chain.apply_async()
1
2
3
4
5
step1("data") → "[step1:data]"
                      ↓
step2("[step1:data]") → "[step2:[step1:data]]"
                              ↓
step3("[step2:[step1:data]]") → 최종 결과


2️⃣ Group - 병렬 실행

여러 태스크를 동시에 실행합니다.

1
2
3
4
5
6
7
8
9
10
11
from celery import group

# 병렬 실행
task_group = group(
    add.s(1, 2),
    add.s(3, 4),
    multiply.s(5, 6),
)
result = task_group.apply_async()

# 결과: [3, 7, 30]


3️⃣ Chord - 병렬 실행 후 콜백

병렬 실행 결과를 모아서 콜백 태스크에 전달합니다.

1
2
3
4
5
6
7
8
9
10
11
12
from celery import chord

@app.task
def sum_results(results: list[int]) -> int:
    return sum(results)

# (1+2) + (3+4) + (5+6) = 21
task_chord = chord(
    [add.s(1, 2), add.s(3, 4), add.s(5, 6)],  # 병렬 실행
    sum_results.s()  # 콜백
)
result = task_chord.apply_async()

✅ Beat - 스케줄링

정기적으로 실행되는 태스크를 설정할 수 있습니다.

ex) Spring의 @Scheduled 역할

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from celery.schedules import crontab

app.conf.beat_schedule = {
    # 매 분마다 실행
    "heartbeat-every-minute": {
        "task": "worker.tasks.basic.heartbeat",
        "schedule": 60.0,  # 60초마다
    },
    # 매일 자정에 실행
    "daily-report": {
        "task": "worker.tasks.basic.daily_report",
        "schedule": crontab(hour=0, minute=0),
    },
    # 매주 월요일 오전 9시
    "weekly-summary": {
        "task": "worker.tasks.basic.weekly_summary",
        "schedule": crontab(hour=9, minute=0, day_of_week=1),
    },
}

crontab 표현식

표현식설명
crontab()매 분
crontab(minute=0, hour=0)매일 자정
crontab(minute=0, hour='*/3')3시간마다
crontab(day_of_week='mon')매주 월요일

Beat 실행:

1
celery -A worker.celery_app beat --loglevel=info

✅ 실무 적용 패턴

어떤 작업을 Celery에 위임할까?

위임 O위임 X
이메일/SMS 발송즉시 응답 필요한 조회
외부 API 호출가벼운 DB 작업
리포트 생성빠른 계산
이미지 처리즉시성 필요한 인증
실패 시 재시도 필요동기적 흐름 필요


실무 구조 예시

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# router.py
@router.post("/users/signup")
async def signup(request: SignupRequest):
    return await user_service.signup(request)

# service.py
class UserService:
    async def signup(self, request: SignupRequest):
        # 1. 유저 생성 (동기)
        user = await self.repository.save(...)

        # 2. 즉시성 필요한 작업 (동기)
        await self._send_verification_sms(user.phone, code)

        # 3. Celery 위임 (비동기 - Fire & Forget)
        send_welcome_email.delay(user.id, user.email)
        notify_admin_new_signup.delay(user.id)

        return response

판단 기준:

  • 즉시성 필요 → 동기 처리
  • 처리 보장/재시도 필요 → Celery 위임
  • 외부 API, 느린 작업 → Celery 위임
  • 유저 응답과 무관 → Celery 위임

📌 Redis vs RabbitMQ

항목RedisRabbitMQ
설치/운영간단복잡
성능빠름상대적으로 느림
메시지 영속성기본 메모리디스크 저장
DLQ (Dead Letter Queue)없음네이티브 지원
라우팅단순Exchange/Binding
사용처간단한 비동기메시지 보장 중요

결론:

  • 간단한 비동기 작업 → Redis
  • 메시지 유실 방지 중요 → RabbitMQ
  • Celery 코드는 동일, 브로커만 변경하면 됨

✅ 워커 실행

1
2
3
4
5
6
7
8
# 워커 시작
celery -A worker.celery_app worker --loglevel=info

# 동시성 설정
celery -A worker.celery_app worker --concurrency=4

# 특정 큐만 처리
celery -A worker.celery_app worker -Q high_priority,default

주요 옵션

옵션설명
--concurrency동시 처리 워커 수
-Q처리할 큐 지정
--loglevel로그 레벨
--autoscale=10,3자동 스케일링 (최대 10, 최소 3)


동시성(Concurrency)과 Pool

--concurrency=4는 동시에 처리할 수 있는 태스크 수를 의미합니다. 기본은 멀티 프로세스(prefork) 방식입니다.

1
2
3
4
5
6
7
8
9
10
11
┌─────────────────────────────────────────────────┐
│                  Main Process                    │
│               (Worker Controller)                │
└─────────────────────────────────────────────────┘
          │         │         │         │
          ▼         ▼         ▼         ▼
     ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
     │ Child  │ │ Child  │ │ Child  │ │ Child  │
     │ Proc 1 │ │ Proc 2 │ │ Proc 3 │ │ Proc 4 │
     │ Task A │ │ Task B │ │ Task C │ │ (idle) │
     └────────┘ └────────┘ └────────┘ └────────┘

Pool 종류

1
2
3
4
5
6
7
8
# 멀티 프로세스 (기본값)
celery -A app worker --pool=prefork --concurrency=4

# 멀티 스레드
celery -A app worker --pool=threads --concurrency=4

# 비동기 이벤트 루프 (gevent/eventlet)
celery -A app worker --pool=gevent --concurrency=100
Pool동작 방식적합한 작업Spring 비교
prefork멀티 프로세스CPU 바운드 (계산, 이미지 처리)ProcessPoolExecutor
threads멀티 스레드I/O 바운드 (가벼운 API 호출)ThreadPoolExecutor (@Async)
gevent코루틴 (greenlet)I/O 바운드 (대량 네트워크 요청)WebFlux (리액티브)
eventlet코루틴gevent와 유사-
solo단일 스레드디버깅용-

실무 선택 기준

상황추천 Poolconcurrency
이미지 리사이징, PDF 생성preforkCPU 코어 수
이메일/SMS 발송gevent100~1000
외부 API 대량 호출gevent100~500
일반적인 혼합 작업prefork코어 수 * 2


태스크별 타임아웃 설정

전역 설정 외에 태스크별로 개별 타임아웃을 지정할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
# 전역 설정 (기본값)
app.conf.update(
    task_time_limit=30 * 60,  # 30분
)

# 태스크별 개별 설정
@app.task(time_limit=3600)  # 1시간
def long_batch_job():
    ...

@app.task(time_limit=7200)  # 2시간
def super_long_report():
    ...

soft_time_limit vs time_limit

옵션동작
soft_time_limit시간 초과 시 SoftTimeLimitExceeded 예외 발생 (정리 가능)
time_limit시간 초과 시 강제 종료 (SIGKILL)
1
2
3
4
5
6
7
8
9
10
11
12
from celery.exceptions import SoftTimeLimitExceeded

@app.task(soft_time_limit=3500, time_limit=3600)
def batch_job():
    try:
        # 오래 걸리는 작업
        ...
    except SoftTimeLimitExceeded:
        # soft limit 도달 - 정리 작업 수행
        save_progress()
        cleanup()
        raise

실무에서는 soft_time_limit으로 먼저 경고받아 정리할 시간을 확보하고, time_limit으로 최종 강제 종료하는 패턴을 사용합니다.


📌 모니터링 - Flower

Celery 전용 웹 모니터링 도구입니다.

1
2
pip install flower
celery -A worker.celery_app flower --port=5555

http://localhost:5555에서:

  • 실시간 태스크 상태 확인
  • 워커 상태 모니터링
  • 태스크 성공/실패 통계
  • 태스크 취소(revoke)
  • 임시로 사용은 괜찮으나 가독성이 안좋은 관계로 운영 환경에서는 매트릭을 별도로 수집하여 모니터링을 하는것을 권장

✅ 마치며

  • 이처럼 Celery는 Python 생태계에서 비동기 작업 처리의 표준으로, Spring의 @Async + RabbitMQ 조합과 유사한 역할을 합니다. 간단한 설정으로 강력한 분산 태스크 처리 시스템을 구축할 수 있다는 것이 가장 큰 장점입니다.
  • 그냥 단순히 MQ 추상화 + Retry + Scheduling의 기능을 제공하는 라이브러리라고 생각하면 단순할 것 같다.
  • Spring Batch 처럼 복잡한 Step 구조는 지원해주진 않지만, 배치성 작업을 구현하는데에는 충분할 것 같다.
  • 처리보장이 필요한 작업이나, 비동기 로직이 필요할 경우 celery를 이용하여 EDD를 구현하는것도 좋은 방법일 것 같다.
This post is written by PRO.