Skip to content

비동기 태스크

truloop-ai-server의 Celery 기반 비동기 태스크 시스템을 설명합니다.


아키텍처


Celery 설정

celery_app.py

python
celery_app = Celery(
    __name__,
    broker=broker_url,      # Redis/Valkey
    backend=backend_url,    # Redis/Valkey
    include=auto_discover_tasks(),  # 자동 태스크 발견
)

주요 설정

설정설명
task_serializerjson태스크 데이터 직렬화
result_serializerjson결과 직렬화
timezoneAsia/Seoul시간대
task_acks_lateTrue태스크 완료 후 ACK
worker_prefetch_multiplier1Worker당 프리페치 수
visibility_timeout3600태스크 가시성 타임아웃 (1시간, broker/backend 모두)
priority_steps[0..9]태스크 우선순위 단계 (0-9)
accept_content["json"]허용 콘텐츠 타입
enable_utcTrueUTC 사용

자동 태스크 발견

tasks/ 디렉토리의 *_tasks.py 파일을 자동으로 발견합니다:

python
def auto_discover_tasks():
    # tasks/*_tasks.py → tasks.{name}_tasks
    for task_file in glob.glob("tasks/*_tasks.py"):
        module_name = task_file.replace("/", ".").replace(".py", "")
        task_modules.append(module_name)

태스크 정의

콘텐츠 생성 태스크

현재 3개의 Celery 태스크가 정의되어 있습니다:

태스크Queue설명
generate_content_taskcontent_generation일반 콘텐츠 생성
generate_highlight_content_taskhighlight_generation하이라이트 콘텐츠 생성
generate_poster_content_taskposter_generation포스터 콘텐츠 생성

태스크 흐름

태스크 파라미터

파라미터타입설명
generated_content_idint이미 생성된 LoopGeneratedContent의 ID
template_idint사용할 템플릿 ID
user_idint요청 사용자 ID
loop_idint연결된 룹 ID
input_datadict생성에 필요한 입력 데이터
lang_codestr?언어 코드
is_publicbool공개 여부
include_loop_infobool룹 정보 포함 여부

태스크 설정

python
@current_app.task(bind=True, max_retries=0)
설정설명
bindTrueself 접근 (태스크 인스턴스)
max_retries0재시도 없음

정보

모든 콘텐츠 생성 태스크는 재시도하지 않습니다 (max_retries=0). AI 생성 결과는 비결정적이므로, 실패 시 에러를 로깅하고 종료합니다.


Queue 라우팅

각 태스크 타입은 전용 Queue로 라우팅됩니다:

python
task_routes = {
    "tasks.content_generation_tasks.generate_content_task": {
        "queue": "content_generation"
    },
    "tasks.content_generation_tasks.generate_highlight_content_task": {
        "queue": "highlight_generation"
    },
    "tasks.content_generation_tasks.generate_poster_content_task": {
        "queue": "poster_generation"
    },
}

이를 통해 태스크 타입별로 독립적인 Worker 스케일링이 가능합니다.


TaskRegistry

TaskRegistry(app/core/task_registry.py)는 순환 import를 방지하고 타입 안전한 태스크 실행을 제공하는 중앙집중식 Celery Task 관리 클래스입니다.

python
from app.core.task_registry import TaskRegistry

# 포스터 콘텐츠 생성 태스크 실행
TaskRegistry.generate_poster_content(
    generated_content_id=1,
    template_id=2,
    user_id=3,
    loop_id=4,
    input_data={"media_eids": ["abc123"]},
)
메서드태스크 이름설명
generate_content()tasks.content_generation_tasks.generate_content_task일반/스토리 콘텐츠 생성
generate_highlight_content()tasks.content_generation_tasks.generate_highlight_content_task하이라이트 콘텐츠 생성
generate_poster_content()tasks.content_generation_tasks.generate_poster_content_task포스터 콘텐츠 생성

정보

TaskRegistrycelery_app.send_task()를 내부적으로 사용하여, 태스크 모듈을 직접 import하지 않고도 태스크를 발행할 수 있습니다. 이는 순환 import 문제를 해결합니다.


동기/비동기 브릿지

Celery Worker는 동기 프로세스이므로, FastAPI의 비동기 코드를 async_to_sync로 브릿지합니다:

python
from asgiref.sync import async_to_sync

# Celery 태스크 내에서 비동기 서비스 호출
async_to_sync(service._process_content_generation)(
    generated_content_id=generated_content_id,
    input_data=input_data,
    origin_template_id=origin_template_id,
    prompt=prompt,
    lang_code=lang_code,
)

실행 명령어

bash
# 기본 Worker 실행
celery -A celery_app worker --loglevel=info

# 특정 Queue만 처리
celery -A celery_app worker -Q content_generation --loglevel=info

# 여러 Queue 처리
celery -A celery_app worker -Q content_generation,highlight_generation --loglevel=info

# 동시 처리 수 지정
celery -A celery_app worker --concurrency=4 --loglevel=info

모니터링

Redis 연결 확인

시작 시 Redis 연결을 확인합니다. 연결 실패 시 Celery Worker는 즉시 종료됩니다:

python
if not test_redis_connection():
    logger.error("Failed to connect to Redis/Valkey!")
    sys.exit(1)

에러 추적

  • Sentry 통합으로 태스크 에러 자동 보고
  • 상세 로깅 (태스크 ID, 파라미터, 에러 정보)
  • 태스크 결과는 Redis Backend에 저장 (visibility_timeout: 1시간)