[Springboot] AWS SQS를 이용한 알림 시스템 구현하기 - 메시지 발행 로직
서론
기존에는 Spring의 `ApplicationEventPublisher`와 Redis의 Pub/Sub 구조를 활용해 알림 기능을 구현하고 있었다.
이 구조는 내부 서버간 이벤트 전파와 브라우저 클라이언트로의 SSE 알림 전송에 효과적이었지만, 알림 수가 많아질수록 처리에 병목이 생길 수 있으며 실패시 재처리가 어렵다는 문제가 있었다.
이러한 한계를 해결하고자 AWS SQS를 도입하여 알림 시스템을 재구성하게 되었다.
이 글에서 구조 설계, 고민했던 부분들과 해결 방안을 중심으로 알림 시스템 전환 과정을 정리하고자 한다.
SQS를 도입하게 된 이유
기존에는 웨이팅 호출 시 해당 웨이팅의 사용자에게 알림을 보내는 기능만 있었지만, 이후 쿠폰 이벤트가 시작되면 해당 가게를 찜한 사용자 전체에게 알림을 발송해야 하는 기능이 추가되었다. 이때 알림 대상자는 수만 명에 이를 수 있었고, 이를 기존 방식처럼 동기적으로 하나씩 처리하는 것은 서버 부하나 지연의 원인이 될 것이 예상되었다.
기존 시스템은 알림 데이터를 먼저 DB에 저장한 후 Redis Pub/Sub을 통해 전송하는 구조였는데, 이 방식은 Redis 서버에 장애가 발생하여 Publish되지 못 하면 재처리가 불가능하다는 한계가 있었다.
이를 해결하고자 다음과 같은 이유로 메시지 큐, AWS SQS 도입을 결정하게 되었다.
- 수평 확장성
- SQS는 다수의 Consumer 인스턴스를 통해 병렬 처리를 자연스럽게 지원
- 트래픽이 증가하더라도 서비스 인스턴스를 추가해 처리량을 유연하게 조절할 수 있음
- 결합도 감소
- Redis는 각 인스턴스가 실시간으로 연결되어 있어야 하므로 서버 간 결합도가 높음
- SQS는 느슨한 결합을 통해 시스템을 더 모듈화하고 유연하게 분리할 수 있음
- 장애 대응 및 재처리 가능
- 메시지가 처리되지 않으면 일정 시간 동안 SQS가 메시지를 보존
- Consumer 장애나 일시적 네트워크 오류 발생 시에도 재처리 로직을 쉽게 구현 가능
AWS SQS에 대해 더 알아본 내용은 아래 포스팅에 있다.
[AWS] AWS SQS(Simple Queue Service) 이해하기
AWS SQS란?Simple Queue Service로, AWS에서 서비스하는 메시지 큐이다.메시지 큐를 왜 사용할까?하나의 API 요청에 동시에 수행되는 후처리 작업들이 많은 경우 ⇒ 응답 지연 최소화 및 시스템 간 결합도
subbni.tistory.com
알림 이벤트 발송 구조 설계
📍 목표
- 알림 메시지의 유실 없이 전송을 보장한다. (At-least-once)
- 대량 발송 시에도 성능 및 추적성을 확보한다.
- 네트워크 장애 발생 시에도 재처리가 가능한 구조로 설계한다.
1. Outbox 테이블 설계
Outbox 패턴 : 데이터베이스 트랜잭션 내에서 처리한 결과와 함께 이벤트 메시지를 저장하고, 이후 별도의 비동기 프로세스가 메시지를 외부 시스템으로 발행하는 패턴
🔽 Outbox 패턴은 왜 필요할까?
이벤트 발행이나 알림 전송은 보통 핵심 비즈니스 트랜잭션이 완벽히 커밋된 후에만 수행되기를 기대한다.
@Transactional
public void saveAndSend() {
repository.save(...); // DB에 저장
sqsPublisher.send(...); // 메시지 전송
}
이렇게만 구현하면 메시지가 발행된 시점은 비즈니스 트랜잭션이 커밋되기 전이다.
만일 트랜잭션이 롤백되고 커밋되지 않는다하더라도 메시지 발행을 롤백할 수는 없어 문제가 발생한다.
따라서 Spring에서는 다음과 같은 코드를 자주 사용한다
@Service
@RequiredArgsConstructor
public class CouponEventService {
private final CouponEventRepository couponEventRepository;
private final ApplicationEventPublisher eventPublisher;
@Transactional
public void createCouponEvent(...) {
CouponEvent event = new CouponEvent(...);
couponEventRepository.save(event);
// 커밋 후 실행되도록 이벤트 등록
eventPublisher.publishEvent(new CouponEventCreatedEvent(event.getId()));
}
}
@Component
@RequiredArgsConstructor
public class CouponEventLaunchedHandler {
private final SqsPublisher sqsPublisher;
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handle(CouponEventLaunchedEvent event) {
sqsPublisher.send(...); // 트랜잭션 커밋 후 실행
}
}
Spring에서 제공하는 `@TransactionalEventListener`와 `phase = AFTER_COMMIT`를 사용하여 이벤트 발행 당시 열려있는 트랜잭션이 커밋된 후에만 이벤트가 발행되도록 하는 방법이다.
AFTER_COMMIT에서 SQS에 메시지를 전송하는 시점은 이미 비즈니스 트랜잭션이 성공적으로 커밋된 후다.
하지만 메시지 전송은 외부 서비스 호출을 의미하므로 다음과 같은 상황에서 메시지 전송에 실패할 수 있다.
- SQS 전송 중 네트워크 장애 발생
- 또는 SQS 호출 예외 발생
이 경우 DB는 정상 커밋됐지만 메시지는 외부 시스템에 전달되지 않아 알림이 누락되거나 후처리가 수행되지 못 하는 문제가 생긴다.
메시지가 성공적으로 전송되지 못 한 경우 비즈니스 트랜잭션을 롤백시키고 싶다 하더라도 이미 해당 트랜잭션은 커밋된 이후이다.
🔽 Outbox 패턴 구조
이러한 문제를 해결하기 위해 Outbox 패턴은 다음과 같은 구조로 동작한다.
- DB에 실제 비즈니스 데이터를 저장하는 것과 함께 이벤트 메시지를 Outbox 테이블에 함께 저장한다.
- 두 작업이 같은 트랜잭션 안에서 커밋되며 둘 중 하나라도 실패하면 트랜잭션 전체가 롤백되어 정합성을 유지할 수 있다.
- 트랜잭션 커밋 후, 별도의 Outbox Processor가 메시지를 외부로 전송한다.
- 실패 시에도 Outbox 테이블 내 마킹하여 재발행을 시도할 수 있다.
- 전송 결과를 처리한다.
- 성공 시 메시지를 삭제하거나 상태를 변경한다.
- 실패 시 상태를 FAILED로 두고 나중에 재시도할 수 있다.
▶︎ Outbox 엔티티 설계
@Entity
@Table(name = "outbox_messages",
indexes = {
@Index(name = "idx_status_created_at", columnList = "status, createdAt")
}
)
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@EntityListeners(AuditingEntityListener.class)
public class OutboxMessage {
@Id
@Column(length = 36, unique = true)
private String id;
@Column(nullable = false)
private String aggregateType;
@Column
private Long aggregateId;
@Column(nullable = false)
private String payload;
@Column(nullable = false)
@Enumerated(EnumType.STRING)
private OutboxMessageStatus status = OutboxMessageStatus.PENDING;
@Column(nullable = false)
private Integer retryCount = 0;
@CreatedDate
@Column(updatable = false)
private LocalDateTime createdAt;
private LocalDateTime sentAt;
@Builder
public OutboxMessage(String id, String aggregateType, Long aggregateId, String payload) {
this.id = id;
this.aggregateType = aggregateType;
this.aggregateId = aggregateId;
this.payload = payload;
this.status = OutboxMessageStatus.PENDING;
this.retryCount = 0;
}
public void markAsSent() {
this.status = OutboxMessageStatus.SENT;
this.sentAt = LocalDateTime.now();
}
public void markAsFailed() {
this.status = OutboxMessageStatus.FAILED;
}
public void incrementRetryCount() {
this.retryCount++;
}
}
전송에 실패한 메시지를 처리할 때 `status`가 'FAILED'인 것을 `created_at`의 오름차순으로 조회하여 가져올 것이므로 두 컬럼에 대해 복합인덱스를 설정해주었다.
2. Notification + Outbox 저장 트랜잭션
사용자에게 전달될 알림 Notification은 DB에 저장되어 추후 사용자가 조회 가능하도록 한다.
따라서 알림 서비스를 필요로 하는 비즈니스 트랜잭션 내에서 사용자 조회용 Notificaiton과 메시지 발행용 Outbox가 모두 함께 저장되어야 하며, 하나라도 실패 시 전체 롤백 처리되도록 구성하였다.
아래는 가게 사장님이 쿠폰 이벤트를 발행할 것을 요청하면 Controller에서 호출되는 CouponEventService의 내용이다.
// CouponEventService
@Transactional
public CreateCouponEventResponse createCouponEvent(
AuthUser authUser,
Long storeId,
CreateCouponEventRequest request
) {
// ... 생략
CouponEvent couponEvent = CouponEvent.builder()
.store(store)
.name(request.getName())
.discountAmount(request.getDiscountAmount())
.totalQuantity(request.getTotalQuantity())
.issuedQuantity(0)
.expiresAt(request.getExpiresAt())
.createdAt(LocalDateTime.now())
.build();
CouponEvent savedCouponEvent = couponEventRepository.save(couponEvent);
eventPublisher.publishEvent(CouponEventLaunchedEvent.of(store, savedCouponEvent));
return CreateCouponEventResponse.from(savedCouponEvent);
}
비즈니스 로직을 처리하고 23번 줄에서 `CouponEventLaunchedEvent`를 발행한다.
@Slf4j
@Component
@RequiredArgsConstructor
public class CouponEventLaunchedEventHandler {
private final StoreWishlistRepository storeWishlistRepository;
private final MessageStagingService messageStagingService;
private final ApplicationEventPublisher eventPublisher;
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void handle(CouponEventLaunchedEvent event) {
List<User> users = storeWishlistRepository.findAllUsersByStoreId(event.getStoreId());
String message = "[" + event.getStoreName() + "] " + NotificationType.COUPON_EVENT_LAUNCHED.getMessage();
NotificationMessagePublishRequest notificationMessagePublishRequest = new NotificationMessagePublishRequest(
MessageType.COUPON_EVENT_LAUNCHED,
message,
users,
NotificationType.COUPON_EVENT_LAUNCHED
);
List<OutboxMessage> outboxes = messageStagingService.stage(notificationMessagePublishRequest);
eventPublisher.publishEvent(new OutboxSavedEvent(outboxes));
}
}
해당 이벤트를 처리하는 핸들러이다. handle 메서드에서 사용하고 있는 MessageStagingService의 내용도 확인해보자.
@Component
@RequiredArgsConstructor
public class MessageStagingService {
private final NotificationService notificationService;
private final OutboxService outboxService;
@Transactional
public List<OutboxMessage> stage(NotificationMessagePublishRequest request) {
List<Notification> notifications =
notificationService.createBulk(request.getTargetUsers(), request.getNotificationType(), request.getMessage());
List<NotificationPayload> payloads = notifications.stream()
.map(notification -> new NotificationPayload(
IdGenerator.generateMessageId(),
notification.getId(),
notification.getUser().getId(),
notification.getType(),
notification.getText()
))
.toList();
// Outbox 저장
List<OutboxMessage> outboxes = outboxService.createNotificationOutboxes(payloads);
return outboxes;
}
}
정리해보자면 이렇다.
- 이벤트가 발행된 가게를 찜하고 있는 사용자들을 전부 가져온다.
- NotificationMessagePublishRequest는 "알림을 위한 메시지 발행 요청"에 필요한 정보를 담고 있다. 발행할 메시지 내용과 알림 대상자 정보 등을 넣어 MessageStagingService에 전달하고, outboxes를 전달받는다.
- MessageStagingService는 Request를 통해 전달받은 정보들로 Notification 데이터와 Outbox 데이터를 생성하여 저장한다.
- 전달 받은 outboxes 정보를 포함한 이벤트를 발행한다. 이 이벤트를 바라보고 있는 핸들러에서 outboxes로 실제 메시지를 발행하게 된다.
여기서 주의 깊게 봐야할 것은 핸들러가 `@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)`라는 점이다.
이를 통해 CouponEventService의 `createCouponEvent` 메서드에서 열린 트랜잭션 내에서 Notification, Outbox 저장이 모두 함께 수행될 수 있다. 만일 Notification이나 Outbox 저장에 실패한다면 트랜잭션 전체가 롤백되어 CouponEvent도 저장되지 않으므로 정합성을 유지할 수 있다.
참고로 Notification와 OutboxMessage는 모두 Id 전략을 `GenerationType.IDENTITY`가 아니라 수동 삽입하도록 설계하였다.
▶︎ GenerationType.IDENTITY 대신 ID 수동 삽입을 선택한 이유
Notification과 OutboxMessage는 모두 한 번에 수천~수만 건의 데이터를 저장해야 하는 대용량 insert 대상이다.
이를 위해 Spring Data JPA 대신, 성능 최적화를 위해 JdbcTemplate.batchUpdate()를 활용하여 직접 insert하도록 구현하였다.
OutboxMessage를 저장하는 OutboxJdbcRepository를 대표로 보자면 다음과 같다.
@Repository
@RequiredArgsConstructor
public class OutboxJdbcRepository {
private final JdbcTemplate jdbcTemplate;
private final String INSERT_SQL = """
INSERT INTO outbox_messages
(id, aggregate_type, aggregate_id, payload, status, retry_count, created_at)
VALUES (?, ?, ?, ?, ?, ?, now())
""";
public void saveAll(List<OutboxMessage> outboxMessages) {
jdbcTemplate.batchUpdate(INSERT_SQL, outboxMessages, 1000,
((ps, msg) -> {
ps.setString(1, msg.getId());
ps.setString(2, msg.getAggregateType());
ps.setLong(3, msg.getAggregateId());
ps.setString(4, msg.getPayload());
ps.setString(5, msg.getStatus().name());
ps.setInt(6, msg.getRetryCount());
}
));
}
}
문제: @GeneratedValue(strategy = IDENTITY)와 batch insert는 함께 사용할 수 없다
- IDENTITY 전략은 DB가 insert 시점에 ID를 생성하는 방식이다.
- 하지만 batch insert의 경우, insert된 각 행에 대해 생성된 ID 값을 직접 알 수 있는 메커니즘이 없다.
- JPA에서는 SEQUENCE 전략을 사용하면 insert 전에 ID를 미리 할당받을 수 있어 해결이 가능하지만, 현재 프로젝트에서 사용하는 MySQL은 시퀀스를 지원하지 않기 때문에 SEQUENCE 전략을 사용할 수 없다.
- 결과적으로, batch insert 후에도 ID 값은 null 상태로 남게 된다.
- 따라서 생성된 ID를 기반으로 후속 처리를 해야 하는 상황에서는 부적절하다.
해결 : ID 직접 주입
현재 생성된 데이터를 기반으로 메시지를 발행해야 하므로, Notification과 Outbox의 ID가 필요하다.
따라서 두 테이블의 ID는 객체 생성 시점에 직접 ID를 생성해서 주입하도록 설계하였다.
다음은 각 테이블의 ID를 생성하는 IdGenerator 내용이다.
public class IdGenerator {
private IdGenerator() { }
public static String generateMessageId() {
return UUID.randomUUID().toString();
}
public static String generateNotificationId() {
return UUID.randomUUID().toString().substring(0,8);
}
}
3. SQS 발행 로직 구성
도메인 이벤트 핸들러에서 알림 대상 유저를 조회하고 Notification과 OutboxMessage를 저장한 뒤, 이 메시지들을 포함한 OutboxSavedEvent를 발행하고 있었다.
해당 이벤트를 바라보는 핸들러는 다음과 같다.
@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxSavedEventHandler {
private final OutboxProcessor outboxProcessor;
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handle(OutboxSavedEvent event) {
outboxProcessor.process(event.getMessages());
}
}
- Outbox 메시지들을 OutboxProcessor에 전달하여 실제 메시지 발송을 트리거한다.
이 핸들러는 `@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)` 이다.
이를 통해 `OutboxSavedEvent`가 발행된 트랜잭션인 메인 비즈니스 트랜잭션이 완전히 커밋되고 나서, 실제 메시지 발송이 수행됨을 보장할 수 있다.
다음은 저장된 Outbox 메시지들로 실제 메시지 발송 로직을 수행하는 OutboxProcessor의 내용이다.
@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxProcessor {
private final MessagePublisher publisher;
private final OutboxMessageRepository outboxMessageRepository;
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void process(List<OutboxMessage> messages) {
List<String> successIds = new ArrayList<>();
List<String> failedIds = new ArrayList<>();
for (OutboxMessage message : messages) {
try {
String payload = message.getPayload();
publisher.publish(payload);
successIds.add(message.getId());
} catch (Exception e) {
failedIds.add(message.getId());
log.warn("즉시 발송 실패: type={}, reason={}",
message.getAggregateType(), e.getMessage());
}
}
if (!successIds.isEmpty()) {
outboxMessageRepository.markAllAsSent(successIds, LocalDateTime.now());
}
if (!failedIds.isEmpty()) {
outboxMessageRepository.markAllAsFailed(failedIds);
}
}
}
- try-catch문 안에서 MessagePublisher.publish(payload)를 호출하여 메시지를 하나씩 큐에 발행한다.
- 발행에 성공한 메시지와 발행에 실패한 메시지를 따로 분리하고, 각각 'SENT' 혹은 'FAILED'로 마킹한다.
위 코드에서 주의해야할 점은 `@Transactional(propagation = Propagation.REQUIRES_NEW)`이라는 점이다.
- 이 메서드는 phase = TransactionPhase.AFTER_COMMIT 인 이벤트 핸들러에서 호출된다.
- 이 때는 이미 트랜잭션이 커밋되고 트랜잭션 동기화가 TransactionSynchronizationManager.clear()에 의해 이미 해제된 상태이다.
- 따라서 일반적인 @Transactional의 Propagation.REQUIRED로는 트랜잭션이 제대로 시작되지 않거나, 시작되더라도 Spring이 이를 관리하지 못할 수 있다.
실제로 초기에 단순 `@Transactional`만 달았더니 Outbox 메시지의 마킹이 DB에 반영되지 않아 관련 내용에 대해 찾아보았었다.
지금 얼추 개념적으로 이해는 가는데, 확실히 이해하기 위해서는 관련 내용에 대해 더 공부를 해봐야 할 것 같다.
어쨌거나 이렇게 메시지를 발행한다. 이제 발행에 실패하여 'FAILED'로 마킹된 메시지들을 재처리해주면 된다.
4. 발행 실패 메시지 재처리 스케줄러 구현
@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxRetryScheduler { // TODO : 여러 인스턴스에서 동시에 수행되지 않도록 Shed Lock 적용 필요
private final OutboxMessageRepository outboxMessageRepository;
private final MessagePublisher messagePublisher;
private static final int MAX_RETRY_COUNT = 3;
private static final int BATCH_SIZE = 100;
@Transactional
@Scheduled(fixedRate = 10000)
public void retry() {
List<OutboxMessage> messages = outboxMessageRepository
.findFailedOutboxByRetryCountLessThanOrderByCreatedAtDesc(
MAX_RETRY_COUNT,
BATCH_SIZE
);
if(messages.isEmpty()) return;
log.info("[Outbox 재시도 시작] count={}", messages.size());
for (OutboxMessage message : messages) {
try {
messagePublisher.publish(message.getPayload());
log.info("재발송 성공: id={}", message.getId());
message.markAsSent();
} catch (Exception e) {
message.incrementRetryCount();
log.warn("메세지 큐 재발송 실패: type={}, aggregateId={}, reason={}",
message.getAggregateType(), message.getAggregateId(), e.getMessage());
}
}
outboxMessageRepository.saveAll(messages);
}
}
- 10초마다 status = `FAILED`이며 retryCount가 `MAX_RETRY_COUNT = 3`개 이하인 Outbox 메시지들을 100개씩 createdAt 오름차순으로 (생성된 지 오래된 순으로) 조회해 온다.
- 하나씩 메시지를 재발행 시도하고 성공시 `SENT` 마킹을, 실패시 retryCount++ 처리를 한다.
3번, 즉 30초 내에 해결되지 못 한 이벤트는 단순 네트워크 장애로 인한 실패가 아닌 명시적 이유가 있는 것으로 판단하고 재처리 대상에서 제외한다. 이는 추후 슬랙 알림 서비스를 통해 관리자에게 알림이 가도록 추가 구현할 예정이다.
(+) 우선은 매우 단순하게 구현함

또, 가용 중인 서버가 여러 개가 될 경우 각각의 서버에서 같은 Outbox 데이터를 읽어 중복 재발행할 위험이 있다. 이를 막기 위해선 Shed Lock을 적용하여야 한다. 하지만 현재 우리 프로젝트의 서버는 단일 서버이므로 우선 해당 내용의 구현은 뒤로 미루었다.
하여튼! 이 재처리 스케줄러를 구현함으로써 발행에 실패한 메시지들을 재처리하여 일시적인 인프라 문제로 인한 손실 없이 메시지가 큐에 정상적으로 발행될 수 있도록 보장한다.
결론
이처럼 메시지 발행 로직은 Outbox 테이블을 중심으로 다음과 같은 단계로 구성되었다.
- 핵심 비즈니스 트랜잭션과 함께 Notification 및 Outbox 데이터를 하나의 트랜잭션 내에서 안전하게 저장
- 트랜잭션 커밋 이후에만 메시지 발송이 수행되도록 이벤트 리스너를 분리
- 발송 실패 시 메시지 상태를 마킹하여 재처리 가능하도록 설계
- 스케줄러를 통해 정기적으로 발송 실패 메시지를 조회 및 재시도하도록 처리
- 재시도 한계를 초과한 메시지는 알림 시스템 등 외부 대응 수단을 통해 후속 조치 가능하도록 준비
아래에 전체 흐름을 다이어그램으로 정리한 이미지를 남긴다.

다음은 메시지 소비 로직을 어떻게 구성했는지 작성해보겠다!