sqs란?
- SQS(Simple Queue Service)
- AWS의 관리형 메시지 큐 서비스
- 일반적으로 짧은 시간 안에 처리되는 작업에 사용
- 결제 요청 -> 결제 처리 서버
- 회원가입 -> 이메일 발송
- 주문 접수 -> 재고 차감 - SQS의 기본 Visibility Timeout도 30초인데, 이게 "30초면 충분히 처리하고 삭제하겠지"라는 전제라고 한다.
장시간 작업?!
그런데 일반적으로 저런 사용용도 외 장시간 작업에 사용한다면??
수 분~수십 분 걸리는 작업 (ex. 대용량 파일 분석, AI 추론 등)에 sqs를 그냥 사용하려고 하면 문제가 생긴다.
Visibility Timeout 문제
SQS의 핵심 메커니즘:
1. Consumer가 메시지를 받는다 (receive)
2. 그 메시지는 다른 Consumer에게 안 보이게 된다 (visibility timeout 시작)
3. 처리 완료 후 메시지를 삭제한다 (delete)
4. 만약 timeout 안에 삭제 안 하면? -> 메시지가 다시 큐에 노출된다
장시간 작업에서 벌어지는 일:
[00:00] 메시지 수신, 작업 시작
[00:30] Visibility Timeout 만료 (기본 30초)
[00:30] 같은 메시지가 다른 Consumer에게 또 전달됨
[00:30] 같은 작업이 2개 동시 실행됨 (중복!)
[01:00] 또 만료, 3번째 실행...
최대 timeout을 12시간까지 늘릴 수 있지만, 고정값으로 설정한다면?
- 너무 길게 잡으면: 실패한 작업이 오래 묶여있어서 재시도가 늦어짐
- 너무 짧게 잡으면: 정상 처리 중인데 중복 실행됨
해결 방법 : Visibility Timeout 동적 연장
AWS는 `ChangeMessageVisibility` API를 제공하는데,
이걸 활용해서 처리 중에 주기적으로 timeout을 연장하는 방식으로 구현해 보았다.
1: 자동 연장 (Heartbeat 패턴)
별도 스레드가 주기적으로 timeout을 갱신
처리 시간을 예측할 수 없을 때 적합
def _start_visibility_timeout_extension(self, receipt_handle, interval=10, extension=300):
"""interval초마다 timeout을 extension초로 재설정하는 백그라운드 워커"""
stop_event = threading.Event()
def extend_loop():
# 최초 1회 즉시 설정
self._extend_visibility_timeout(receipt_handle, extension)
while not stop_event.is_set():
if stop_event.wait(interval): # interval초 대기, 중간에 stop 오면 즉시 종료
break
self._extend_visibility_timeout(receipt_handle, extension)
thread = threading.Thread(target=extend_loop, daemon=True)
thread.start()
return stop_event
사용 예시
# 작업 시작 시 - 10초마다 5분으로 갱신
stop = self._start_visibility_timeout_extension(receipt_handle, interval=10, extension=300)
try:
do_long_running_work()
finally:
stop.set() # 반드시 중지
📌
- interval(10초) << extension(300초) 이므로, 한두 번 갱신 실패해도 괜찮
- threading.Event로 깔끔하게 정리 가능 (finally에서 set())
- daemon=True -> 메인 프로세스 종료 시 자동 정리
2: 콜백 연장 (단계별 Heartbeat)
작업 단계가 명확할 때, 각 단계 완료 시점에 직접 연장을 호출
def create_visibility_timeout_callback(self, receipt_handle, timeout_seconds=300):
"""호출할 때마다 timeout을 연장하는 콜백 반환"""
def callback(stage_name=""):
self._extend_visibility_timeout(receipt_handle, timeout_seconds)
return callback
사용예시
callback = sqs.create_visibility_timeout_callback(receipt_handle, 300)
step1_result = extract_pages()
callback("페이지_추출") # 아직 살아있다고 알림
step2_result = analyze(step1_result)
callback("분석_완료") # 또 알림
save(step2_result)
callback("저장_완료")
비교
| 상황 | 추천 |
| 처리 시간 예측 불가, 단계 구분 없음 | 자동 연장 (heartbeat) |
| 단계가 명확, 단계별 로깅이 필요 | 콜백 연장 |
| 둘 다 해당 | 자동 연장 기본 + 단계별 로그만 추가 |
메시지 삭제 관리
원칙
처리 성공 -> 삭제
처리 실패 (재시도 가능) -> 삭제 안 함 (timeout 후 자동 재노출)
처리 실패 (재시도 불가) -> 삭제 (DLQ로 이동시키거나 로깅 후 삭제)
인데,, 나 같은 경우 +DB 상태로 판별해서 삭제하고 있다
왜냐하면 큐 메시지만으로는 지금 처리중인지, 이미 끝났는지 판단이 어렵기 때문이다.
따라서 추가적으로 DB에 작업 상태를 관리하고, 메세지 수신 시 DB 상태를 확인한 뒤 처리를 결정한다.
메시지 수신
|
+-- DB 상태 조회
|
+-- processing -> 처리 시작 (또는 이미 처리 중이면 skip)
+-- completed / failed -> 메시지 삭제 (이미 끝난 작업)
+-- 레코드 없음 -> 메시지 삭제 (고아 메시지 정리)
+-- 처리 중 예외 -> 메시지 유지 (timeout 후 재노출)
이렇게 하면
- 서버가 중간에 죽어도 큐에 메세지가 남아있어 재처리됨
- 이미 완료된 작업은 중복 실행되지 않음
- 삭제된 작업의 고아 메세지가 큐에 무한히 쌓이는 현상을 막을 수 있음
전체 흐름

정리
SQS가 장시간 작업에 적합하지 않다고 하는 이유
- 기본 Visibility Timeout이 30초로 짧은 작업 전제
- 작업 진행률 추적, 단계별 실행 같은 기능이 없음
- 보통 Step Functions, Celery + Redis/RabbitMQ, Bull(Node.js) 등이 권장됨
그럼에도 SQS를 쓸 수 있는 경우..?
- 이미 AWS 환경이고 추가 인프라 도입이 부담될 때
- 작업 빈도가 낮아서 전용 워커 인프라가 과잉 설계일 때
- Visibility Timeout 연장 패턴만으로 충분히 커버 가능할 때
핵심 포인트
- Visibility Timeout 동적 연장이 핵심 - 고정값이 아니라 처리 중에 주기적으로 갱신
- DB 상태와 큐 메시지 동기화 - 둘 중 하나만으로는 안정적인 작업 관리 불가
- 작업량이 늘어나거나 복잡한 워크플로우가 필요해지면 전용 솔루션으로 전환 고려
728x90
'Python' 카테고리의 다른 글
| ERROR: Failed to build 'flash_attn' when getting requirements to build wheel (0) | 2026.01.15 |
|---|---|
| fara-7b 모델 테스트 과정 끄적끄적.. (0) | 2025.12.01 |
| LLM Multi-Provider 아키텍처 설계 (Registry + Strategy 패턴) (0) | 2025.04.10 |
| Flask + LangGraph 환경에서 LLM Streaming 처리 구조 (0) | 2025.03.27 |
| LLM 시스템 프롬프트 캐싱 구조 (0) | 2024.03.09 |