SQS 장시간 작업 처리

2025. 4. 18. 17:45·Python

 

sqs란?

 

  • SQS(Simple Queue Service)
  • AWS의 관리형 메시지 큐 서비스
  • 일반적으로 짧은 시간 안에 처리되는 작업에 사용
    - 결제 요청 -> 결제 처리 서버
    - 회원가입 -> 이메일 발송
    - 주문 접수 -> 재고 차감
  • SQS의 기본 Visibility Timeout도 30초인데, 이게 "30초면 충분히 처리하고 삭제하겠지"라는 전제라고 한다.

 

 

장시간 작업?!

그런데 일반적으로 저런 사용용도 외 장시간 작업에 사용한다면??

수 분~수십 분 걸리는 작업 (ex. 대용량 파일 분석, AI 추론 등)에 sqs를 그냥 사용하려고 하면 문제가 생긴다.

 

 

 Visibility Timeout 문제


SQS의 핵심 메커니즘:

1. Consumer가 메시지를 받는다 (receive)
2. 그 메시지는 다른 Consumer에게 안 보이게 된다 (visibility timeout 시작)
3. 처리 완료 후 메시지를 삭제한다 (delete)
4. 만약 timeout 안에 삭제 안 하면? -> 메시지가 다시 큐에 노출된다

 



장시간 작업에서 벌어지는 일:

[00:00] 메시지 수신, 작업 시작
[00:30] Visibility Timeout 만료 (기본 30초)
[00:30] 같은 메시지가 다른 Consumer에게 또 전달됨
[00:30] 같은 작업이 2개 동시 실행됨 (중복!)
[01:00] 또 만료, 3번째 실행...

 


최대 timeout을 12시간까지 늘릴 수 있지만, 고정값으로 설정한다면?

  • 너무 길게 잡으면: 실패한 작업이 오래 묶여있어서 재시도가 늦어짐
  • 너무 짧게 잡으면: 정상 처리 중인데 중복 실행됨

 

 

해결 방법 : Visibility Timeout 동적 연장

 

AWS는 `ChangeMessageVisibility` API를 제공하는데,

이걸 활용해서 처리 중에 주기적으로 timeout을 연장하는 방식으로 구현해 보았다.

 

 

1: 자동 연장 (Heartbeat 패턴)

 

별도 스레드가 주기적으로 timeout을 갱신

처리 시간을 예측할 수 없을 때 적합

 

def _start_visibility_timeout_extension(self, receipt_handle, interval=10, extension=300):
    """interval초마다 timeout을 extension초로 재설정하는 백그라운드 워커"""
    stop_event = threading.Event()

    def extend_loop():
        # 최초 1회 즉시 설정
        self._extend_visibility_timeout(receipt_handle, extension)

        while not stop_event.is_set():
            if stop_event.wait(interval):  # interval초 대기, 중간에 stop 오면 즉시 종료
                break
            self._extend_visibility_timeout(receipt_handle, extension)

    thread = threading.Thread(target=extend_loop, daemon=True)
    thread.start()
    return stop_event

 

 

사용 예시

# 작업 시작 시 - 10초마다 5분으로 갱신
stop = self._start_visibility_timeout_extension(receipt_handle, interval=10, extension=300)

try:
    do_long_running_work()
finally:
    stop.set()  # 반드시 중지

 

 

📌 

  • interval(10초) << extension(300초) 이므로, 한두 번 갱신 실패해도 괜찮
  • threading.Event로 깔끔하게 정리 가능 (finally에서 set())
  • daemon=True -> 메인 프로세스 종료 시 자동 정리

 

2: 콜백 연장 (단계별 Heartbeat)

 

작업 단계가 명확할 때, 각 단계 완료 시점에 직접 연장을 호출

 

def create_visibility_timeout_callback(self, receipt_handle, timeout_seconds=300):
    """호출할 때마다 timeout을 연장하는 콜백 반환"""
    def callback(stage_name=""):
        self._extend_visibility_timeout(receipt_handle, timeout_seconds)
    return callback

 

 

사용예시

callback = sqs.create_visibility_timeout_callback(receipt_handle, 300)

step1_result = extract_pages()
callback("페이지_추출")          # 아직 살아있다고 알림

step2_result = analyze(step1_result)
callback("분석_완료")            # 또 알림

save(step2_result)
callback("저장_완료")

 

 

 

비교

상황 추천
처리 시간 예측 불가, 단계 구분 없음 자동 연장 (heartbeat)
단계가 명확, 단계별 로깅이 필요 콜백 연장
둘 다 해당 자동 연장 기본 + 단계별 로그만 추가

 

 

 


 

 

 

메시지 삭제 관리

 

원칙

처리 성공 -> 삭제
처리 실패 (재시도 가능) -> 삭제 안 함 (timeout 후 자동 재노출)
처리 실패 (재시도 불가) -> 삭제 (DLQ로 이동시키거나 로깅 후 삭제)

 

 

인데,, 나 같은 경우 +DB 상태로 판별해서 삭제하고 있다

왜냐하면 큐 메시지만으로는 지금 처리중인지, 이미 끝났는지 판단이 어렵기 때문이다.

따라서 추가적으로 DB에 작업 상태를 관리하고, 메세지 수신 시 DB 상태를 확인한 뒤 처리를 결정한다.

 

메시지 수신
  |
  +-- DB 상태 조회
  |
  +-- processing -> 처리 시작 (또는 이미 처리 중이면 skip)
  +-- completed / failed -> 메시지 삭제 (이미 끝난 작업)
  +-- 레코드 없음 -> 메시지 삭제 (고아 메시지 정리)
  +-- 처리 중 예외 -> 메시지 유지 (timeout 후 재노출)

 

이렇게 하면

  • 서버가 중간에 죽어도 큐에 메세지가 남아있어 재처리됨
  • 이미 완료된 작업은 중복 실행되지 않음
  • 삭제된 작업의 고아 메세지가 큐에 무한히 쌓이는 현상을 막을 수 있음

 

 

전체 흐름

 

 

 

정리

 

SQS가 장시간 작업에 적합하지 않다고 하는 이유

  • 기본 Visibility Timeout이 30초로 짧은 작업 전제
  • 작업 진행률 추적, 단계별 실행 같은 기능이 없음
  • 보통 Step Functions, Celery + Redis/RabbitMQ, Bull(Node.js) 등이 권장됨

 

그럼에도 SQS를 쓸 수 있는 경우..?

  • 이미 AWS 환경이고 추가 인프라 도입이 부담될 때
  • 작업 빈도가 낮아서 전용 워커 인프라가 과잉 설계일 때
  • Visibility Timeout 연장 패턴만으로 충분히 커버 가능할 때

 

핵심 포인트

  • Visibility Timeout 동적 연장이 핵심 - 고정값이 아니라 처리 중에 주기적으로 갱신
  • DB 상태와 큐 메시지 동기화 - 둘 중 하나만으로는 안정적인 작업 관리 불가
  • 작업량이 늘어나거나 복잡한 워크플로우가 필요해지면 전용 솔루션으로 전환 고려

 

 

728x90
저작자표시 (새창열림)

'Python' 카테고리의 다른 글

ERROR: Failed to build 'flash_attn' when getting requirements to build wheel  (0) 2026.01.15
fara-7b 모델 테스트 과정 끄적끄적..  (0) 2025.12.01
LLM Multi-Provider 아키텍처 설계 (Registry + Strategy 패턴)  (0) 2025.04.10
Flask + LangGraph 환경에서 LLM Streaming 처리 구조  (0) 2025.03.27
LLM 시스템 프롬프트 캐싱 구조  (0) 2024.03.09
'Python' 카테고리의 다른 글
  • ERROR: Failed to build 'flash_attn' when getting requirements to build wheel
  • fara-7b 모델 테스트 과정 끄적끄적..
  • LLM Multi-Provider 아키텍처 설계 (Registry + Strategy 패턴)
  • Flask + LangGraph 환경에서 LLM Streaming 처리 구조
에이전트공방
에이전트공방
Agent로 먹고살기..🛠️
  • 에이전트공방
    에이전트공방
    에이전트공방
  • 전체
    오늘
    어제
    • 분류 전체보기 (107)
      • Python (17)
      • AWS (5)
      • Server (7)
      • Docker (7)
      • GIT (7)
      • Node (9)
      • React (11)
      • OS (8)
      • Java (3)
      • JavaScript (11)
      • Vue (12)
      • CSS (5)
  • 링크

  • 인기 글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.6
에이전트공방
SQS 장시간 작업 처리
상단으로

티스토리툴바