만들어보기/TroubleShooting

[SpringBoot] SQS 메시지 발행 병목 @Async + Executor로 해결하기

다섯자두 2025. 5. 16. 22:14

☑️ 문제 정의 : 비동기처럼 보이는 동기 처리

현재 우리 시스템의 알림 전송은 SQS 기반 Outbox 패턴을 따르고 있다.

알림이 필요한 이벤트가 발생하면 다음과 같은 흐름으로 메시지가 처리된다.

  1. 알림 정보를 Notification 테이블(사용자 조회용)과 Outbox 테이블(메시지 발행용)에 저장
  2. 저장된 메시지를 SQS에 발행
  3. SQS 소비자가 메시지를 받아 실제 사용자에게 알림 전송

알림 전송은 구조적으로 비동기처럼 보이지만, SQS로 메시지를 발행하는 로직만은 동기적으로 동작하고 있었다.

이로 인해 대규모 알림 발송 시 병목이 발생할 수밖에 없는 구조였다.

▶︎ 단계별 병목 지점 정리

병목이 예상되는 지점은 다음과 같다.

  1. 알림 데이터를 DB에 저장할 때
  2. 메시지를 SQS로 발행할 때
  3. 알림을 사용자에게 전송할 때
단계 처리 내용 기존 처리 방식 병목 해결 방식 해결 여부
① 알림 저장 Notification + Outbox 저장 JPA saveAll JdbcTemplate의 Batch Update 적용 해결
② 메시지 발행 Outbox → SQS 동기 처리 비동기 + 병렬 처리로 리팩토링 필요 ❌ 미해결
③ 사용자 전송 SQS 메시지 소비 및 알림 전송 단일 인스턴스 처리 SQS 소비자 수 확대로 병렬 분산 가능 해결 가능

1번과 3번은 기술적으로 해결 방안이 마련되어 있었지만
2번은 트래픽이 몰릴수록 선형적으로 지연되는 구조라 결국 전체 API 응답 속도 저하를 야기했다 . .

▶︎ 실제 테스트 결과

실제로 20000명의 사용자가 찜한 가게에서 쿠폰 이벤트를 발행할 경우를 테스트해보았다.

응답 속도 7분 31초 ^^....ㅎㅎ

SQS 발행도 비동기 + 병렬 처리가 반드시 필요하다는 것을 절감했다.


☑️  리팩토링 목표

  1. @Async 기반 비동기 처리로 전환 → 응답 속도 단축
    • 발행 작업을 별도 쓰레드로 넘겨서 API 요청 자체를 빠르게 응답
    • 사용자는 메시지 처리 여부와 상관없이 빠르게 응답 받을 수 있게 함
  2. 메시지 Batch 단위로 병렬 처리 → 작업 속도 향상
    • 100개 단위로 쪼개서 여러 쓰레드가 병렬로 처리하도록
    • 전체 처리 시간 감소

☑️  리팩토링

OutboxProcessor

  • 메시지를 100개 단위로 나누고 비동기 발행을 요청하는 역할
@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxProcessor {

    private final OutboxPublisher outboxPublisher;
    static final int BATCH_SIZE = 100;

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void process(List<OutboxMessage> messages) {
        List<List<OutboxMessage>> batches = partition(messages);

        for (List<OutboxMessage> batch : batches) {
            outboxPublisher.publishBatch(batch);
        }
    }

    private List<List<OutboxMessage>> partition(List<OutboxMessage> messages) {
        List<List<OutboxMessage>> result = new ArrayList<>();
        for (int i = 0; i < messages.size(); i += BATCH_SIZE) {
            result.add(messages.subList(i, Math.min(i + BATCH_SIZE, messages.size())));
        }
        return result;
    }
}

OutboxPublisher

  • 실제 발행 로직을 비동기로 실행하는 역할
@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxPublisher {

    private final MessagePublisher publisher;
    private final OutboxMessageRepository outboxMessageRepository;
    private final AggregateQueueMapper aggregateQueueMapper;

    @Async("outboxExecutor")
    @Transactional
    public void publishBatch(List<OutboxMessage> messages) {
        String threadName = Thread.currentThread().getName();
        log.info("[OUTBOX-PUBLISHER] 시작 - thread={}", threadName);

        List<String> successIds = new ArrayList<>();
        List<String> failedIds = new ArrayList<>();

        for (OutboxMessage message : messages) {
            try {
                String queueName = aggregateQueueMapper.getQueueName(message.getAggregateType());
                publisher.publish(queueName, message.getPayload());
                successIds.add(message.getId());
            } catch (Exception e) {
                log.warn("Failed to publish outbox : id={}, reason={}", message.getId(), e.getMessage());
                failedIds.add(message.getId());
            }
        }

        if (!successIds.isEmpty()) {
            outboxMessageRepository.markAllAsSent(successIds, LocalDateTime.now());
            log.info("Published {} messages successfully on thread {}", successIds.size(), threadName);
        }
        if (!failedIds.isEmpty()) {
            outboxMessageRepository.markAllAsFailed(failedIds);
            log.warn("Failed to publish {} messages on thread {}", failedIds.size(), threadName);
        }
    }
}

▶︎ Executor 설정

  • 병렬 처리와 안정성 확보를 위해서 쓰레드 풀 구성
@Configuration
public class ExecutorConfig {

    @Bean(name = "outboxExecutor")
    public ExecutorService outboxExecutor() {
        AtomicInteger threadNumber = new AtomicInteger(1);

        ThreadFactory threadFactory = runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName("outbox-executor-" + threadNumber.getAndIncrement());
            return thread;
        };

        return new ThreadPoolExecutor(
          10,
          20,
          60L, TimeUnit.SECONDS,
          new LinkedBlockingQueue<>(1000),
                threadFactory,
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
}
설정값 설명
corePoolSize = 10 기본 10개 스레드 병렬 처리
maxPoolSize = 20 10개 추가 생성 가능
queueCapacity = 1000 초과된 작업은 큐에 대기
CallerRunsPolicy 큐도 가득 차면 호출자 스레드가 직접 실행 → 유실 없이 정합성 보장하도록 함
  • rejectionPolicy로 ``CallerRunsPolicy``를 선택한 이유
    • 알림이 유실되지 않도록 정확성/신뢰성을 보장하는 정책을 선택함

스레드 풀은 가장 기본적으로 세팅을 했고, 최적화를 위해서는 추후 튜닝 계획을 세워봐야할 것 같다.


🚨 테스트 중 발생한 문제

데드락 발생

메시지 발행 중 outbox 테이블에서 데드락이 발생했다.

2025-05-16T17:56:43.185+09:00  WARN 44031 --- [wait4eat] [   scheduling-1] o.h.engine.jdbc.spi.SqlExceptionHelper   : SQL Error: 1213, SQLState: 40001
2025-05-16T17:56:43.186+09:00 ERROR 44031 --- [wait4eat] [   scheduling-1] o.h.engine.jdbc.spi.SqlExceptionHelper   : Deadlock found when trying to get lock; try restarting transaction
2025-05-16T17:56:43.187+09:00 ERROR 44031 --- [wait4eat] [   scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler    : Unexpected error occurred in scheduled task

org.springframework.dao.CannotAcquireLockException: 
could not execute statement [Deadlock found when trying to get lock; try restarting transaction] 
[/* update for com.example.wait4eat.global.message.outbox.entity.OutboxMessage */update outbox_messages set aggregate_id=?,aggregate_type=?,payload=?,retry_count=?,sent_at=?,status=? where id=?]; 
SQL [/* update for com.example.wait4eat.global.message.outbox.entity.OutboxMessage */update outbox_messages set aggregate_id=?,aggregate_type=?,payload=?,retry_count=?,sent_at=?,status=? where id=?]

원인

  • `OutboxPublisher`에서 PENDING 메시지를 처리하는 도중
  • `OutboxRetryScheduler`에서도 동일 메시지를 재발행하고, UPDATE 시도하여 데드락 발생

원인은 `OutboxRetryScheduler`에서 PENDING & FAILED 상태인 아웃박스 메시지를 5초마다 확인하고 재발행하고 있기 때문이었다.

기존 로직에서는 PENDING 상태로 남아있는 아웃박스 메시지는 '발행 실패'와 마찬가지였으므로 위와 같이 설계한 것이었는데, 이제는 이를 수정할 필요가 있다.

해결 방법

  • `OutboxRetryScheduler`는 FAILED 상태의 메시지만 재처리하도록 제한
  • 새 스케줄러 추가 - 1분 이상 PENDING 상태인 메시지를 FAILED 마킹 처리
@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxExpirationScheduler {

    private final OutboxMessageRepository outboxMessageRepository;

    /**
     * 즉시 발행에 실패한 메시지를 FAILED로 자동 보정
     */
    @Scheduled(fixedDelay = 1, timeUnit = TimeUnit.MINUTES)
    @Transactional
    public void markOldPendingMessagesAsFailed() {
        int updatedCount = outboxMessageRepository.markPendingAsFailedBefore(LocalDateTime.now().minusMinutes(1));
        log.info("[OutboxExpirationScheduler] 1분 이상 PENDING 상태인 메시지 {}건을 FAILED로 변경", updatedCount);

    }
}

기존 재발행 스케줄러는 FAILED 상태인 메시지만 재발행하도록 하여 `OutboxPublisher`와 겹치지 않도록 하고,
PENDING 상태로 남아있는 메시지를 실패처리하는 스케줄러를 추가하여 이를 해결하였다.


☑️  결과

다음과 같이 스레드 풀 내의 스레드가 병렬로 메시지 발행을 처리한다.

API 응답 속도는?

당연히 향상되었으며 5초 이내에 응답이 완료되는 것을 확인하였다.

  • 10,000건에 대해 batch update 저장 테스트 결과, 0.851초 소요됨
  • 저장 시간이 선형적으로 증가한다고 가정할 경우,  Notification + Outbox 데이터 총 40,000건 저장 시 약 3.4초 소요될 것 으로 예측
  • 기타 부수 작업들을 고려하면 찜한 유저 2만명 대해 5초 이내 응답 속도는 정상 범위로 판단된다

▶︎ 성과 요약

항목 개선 전 개선 후
메시지 발행 방식 동기 처리 비동기 Batch 병렬 처리
최대 응답 시간 약 451초 약 5초
발행 안정성 데드락 발생 상태 분리 및 조건 재처리로 안정화

💬 마무리하며

이번 리팩토링을 통해 알림 발송의 병목을 정확히 짚고,
비동기 & 병렬 처리로 응답성과 처리 속도를 동시에 개선할 수 있었다.

향후에는

  • Executor의 동적 튜닝
  • 알림 처리량 기반 자동 확장

등 운영 관점의 최적화도 적용해보면 좋을 것 같다.