다크 모드
비동기 태스크
truloop-ai-server의 Celery 기반 비동기 태스크 시스템을 설명합니다.
아키텍처
Celery 설정
celery_app.py
python
celery_app = Celery(
__name__,
broker=broker_url, # Redis/Valkey
backend=backend_url, # Redis/Valkey
include=auto_discover_tasks(), # 자동 태스크 발견
)주요 설정
| 설정 | 값 | 설명 |
|---|---|---|
task_serializer | json | 태스크 데이터 직렬화 |
result_serializer | json | 결과 직렬화 |
timezone | Asia/Seoul | 시간대 |
task_acks_late | True | 태스크 완료 후 ACK |
worker_prefetch_multiplier | 1 | Worker당 프리페치 수 |
visibility_timeout | 3600 | 태스크 가시성 타임아웃 (1시간, broker/backend 모두) |
priority_steps | [0..9] | 태스크 우선순위 단계 (0-9) |
accept_content | ["json"] | 허용 콘텐츠 타입 |
enable_utc | True | UTC 사용 |
자동 태스크 발견
tasks/ 디렉토리의 *_tasks.py 파일을 자동으로 발견합니다:
python
def auto_discover_tasks():
# tasks/*_tasks.py → tasks.{name}_tasks
for task_file in glob.glob("tasks/*_tasks.py"):
module_name = task_file.replace("/", ".").replace(".py", "")
task_modules.append(module_name)태스크 정의
콘텐츠 생성 태스크
현재 3개의 Celery 태스크가 정의되어 있습니다:
| 태스크 | Queue | 설명 |
|---|---|---|
generate_content_task | content_generation | 일반 콘텐츠 생성 |
generate_highlight_content_task | highlight_generation | 하이라이트 콘텐츠 생성 |
generate_poster_content_task | poster_generation | 포스터 콘텐츠 생성 |
태스크 흐름
태스크 파라미터
| 파라미터 | 타입 | 설명 |
|---|---|---|
generated_content_id | int | 이미 생성된 LoopGeneratedContent의 ID |
template_id | int | 사용할 템플릿 ID |
user_id | int | 요청 사용자 ID |
loop_id | int | 연결된 룹 ID |
input_data | dict | 생성에 필요한 입력 데이터 |
lang_code | str? | 언어 코드 |
is_public | bool | 공개 여부 |
include_loop_info | bool | 룹 정보 포함 여부 |
태스크 설정
python
@current_app.task(bind=True, max_retries=0)| 설정 | 값 | 설명 |
|---|---|---|
bind | True | self 접근 (태스크 인스턴스) |
max_retries | 0 | 재시도 없음 |
정보
모든 콘텐츠 생성 태스크는 재시도하지 않습니다 (max_retries=0). AI 생성 결과는 비결정적이므로, 실패 시 에러를 로깅하고 종료합니다.
Queue 라우팅
각 태스크 타입은 전용 Queue로 라우팅됩니다:
python
task_routes = {
"tasks.content_generation_tasks.generate_content_task": {
"queue": "content_generation"
},
"tasks.content_generation_tasks.generate_highlight_content_task": {
"queue": "highlight_generation"
},
"tasks.content_generation_tasks.generate_poster_content_task": {
"queue": "poster_generation"
},
}이를 통해 태스크 타입별로 독립적인 Worker 스케일링이 가능합니다.
TaskRegistry
TaskRegistry(app/core/task_registry.py)는 순환 import를 방지하고 타입 안전한 태스크 실행을 제공하는 중앙집중식 Celery Task 관리 클래스입니다.
python
from app.core.task_registry import TaskRegistry
# 포스터 콘텐츠 생성 태스크 실행
TaskRegistry.generate_poster_content(
generated_content_id=1,
template_id=2,
user_id=3,
loop_id=4,
input_data={"media_eids": ["abc123"]},
)| 메서드 | 태스크 이름 | 설명 |
|---|---|---|
generate_content() | tasks.content_generation_tasks.generate_content_task | 일반/스토리 콘텐츠 생성 |
generate_highlight_content() | tasks.content_generation_tasks.generate_highlight_content_task | 하이라이트 콘텐츠 생성 |
generate_poster_content() | tasks.content_generation_tasks.generate_poster_content_task | 포스터 콘텐츠 생성 |
정보
TaskRegistry는 celery_app.send_task()를 내부적으로 사용하여, 태스크 모듈을 직접 import하지 않고도 태스크를 발행할 수 있습니다. 이는 순환 import 문제를 해결합니다.
동기/비동기 브릿지
Celery Worker는 동기 프로세스이므로, FastAPI의 비동기 코드를 async_to_sync로 브릿지합니다:
python
from asgiref.sync import async_to_sync
# Celery 태스크 내에서 비동기 서비스 호출
async_to_sync(service._process_content_generation)(
generated_content_id=generated_content_id,
input_data=input_data,
origin_template_id=origin_template_id,
prompt=prompt,
lang_code=lang_code,
)실행 명령어
bash
# 기본 Worker 실행
celery -A celery_app worker --loglevel=info
# 특정 Queue만 처리
celery -A celery_app worker -Q content_generation --loglevel=info
# 여러 Queue 처리
celery -A celery_app worker -Q content_generation,highlight_generation --loglevel=info
# 동시 처리 수 지정
celery -A celery_app worker --concurrency=4 --loglevel=info모니터링
Redis 연결 확인
시작 시 Redis 연결을 확인합니다. 연결 실패 시 Celery Worker는 즉시 종료됩니다:
python
if not test_redis_connection():
logger.error("Failed to connect to Redis/Valkey!")
sys.exit(1)에러 추적
- Sentry 통합으로 태스크 에러 자동 보고
- 상세 로깅 (태스크 ID, 파라미터, 에러 정보)
- 태스크 결과는 Redis Backend에 저장 (visibility_timeout: 1시간)