본문 바로가기

Infra/AWS

[AWS] SQS 전송 지연 설정

엔티티를 저장한 후, 그 정보를 이메일로 보내는 기능을 구현하였습니다. 이메일을 전송하는 작업은 시간이 오래 걸리는 작업이므로, 이를 비동기적으로 처리하기 위해 메세지 큐를 사용하였습니다. 저희 팀은 AWS 의 SQS(Simple Queue Service) 를 사용하고 있습니다. 

 

간단하게 예시를 들어보겠습니다. 우선 메세지 전송 처리부터 구현해보겠습니다.

@Transactional
fun create(seller: Seller, inquiry: Inquiry): InquiryHistory {
    val inquiryHistory = inquiryRepository.save(InquiryHistory(
        inquiry = inquiry,
        actionAt = ZoneDateTime.now()
    ))
    
    notificationCreateQueueService.send(NotificationQueue(seller.id, NotificationTargetDomain.INQUIRY, inquiry.id))
    
    return inquiryHistory
}

해당 코드는 seller 가 inquiry 를 작성할 때, create함수를 통해 해당 inquiry 가 등록된 날짜가 포함된 inquiryHistory 를 저장합니다. 그리고 이메일 발송을 비동기적으로 처리하기 위해서 notificationCreateQueueService.send() 함수를 호출합니다. 이 모든 과정은 트랜잭션으로 묶여 있습니다.

 

fun send(notificationQueue: NotificationQueue) {
    sqsTemplate.sendAsync { sqsSendOptions -> 
        sqsSendOptions
            .payload(notificationQueue)
            .queue(queue)
    }
}

send 함수는 notificationQueue메세지를 payload 로 하여 큐에 비동기적으로 전송합니다.

 

class NotificationQueue(
    val sellerId: Long,
    val targetDomain: NotificationTargetDomain,
    val targetId: Long
)

notificationQueue 메세지는 다음과 같이 작성하였습니다.

 

@SqsListener("\${sqs.queue.notification-create}")
@Transactional
fun receive(message: Message) {
    val notificationQueue = objectMapper.readValue(message.body(), NotificationQueue::class.java)

    val seller = sellerService.findSeller(notificationQueue.sellerId)
    if (notificationQueue.targetDomain == NotificationTargetDomain.INQUIRY) {
    	val inquiry = inquiryService.find(notificationQueue.targetId)
        val inquiryHistory = inquiryHistoryService.find(inquiry)
        inquiryEmailCreateService.call(inquiryHistory)
    }
}

큐에서 메시지를 수신한 후, notificationQueue.targetId를 이용해 inquiry를 조회하고, 그 다음 inquiry를 기반으로 inquiryHistory를 조회합니다. 최종적으로 이메일 발송 서비스를 호출하는 로직입니다.

 

문제 발생!

 

그러나 여기서 문제가 발생했습니다. Thread1 (엔티티를 저장하고 SQS 메시지를 전송하는 쓰레드)이 아직 완료되지 않은 상태에서 SQS가 메시지를 수신하여 Thread2 (메시지 내 ID 값을 통해 엔티티를 조회하는 쓰레드)가 실행되면서, 조회하고자 하는 inquiryHistory가 아직 데이터베이스에 커밋되지 않은 상태였던 것입니다.

 

이 문제를 해결하기 위해 두 가지 선택지를 고려했습니다

 

1. 트랜잭션이 완료된 후 메시지 전송

2. 트랜잭션이 완료된 후 메시지 수신

 

처음에는 첫 번째 방법을 시도해보려 했지만, 아직 적절한 해결책을 찾지 못했습니다. (혹시 좋은 의견이 있으시다면 공유해 주시면 감사하겠습니다.) 그래서 저는 관점을 바꾸어 두 번째 방법을 선택했습니다.

 

이제 두 번째 방법을 통해 문제를 해결한 과정을 공유하고자 합니다.

 

SQS 지연 대기열 적용

 

트랜잭션이 완료된 후 메시지를 수신하려면, 수신 시간을 지연시키는 방법이 필요했습니다. 이에 대해 조사한 결과, SQS에서는 메시지 전송을 지연시키는 기능을 제공하고 있었습니다. 이를 지연 대기열(Delay Queue)이라고 합니다.

 

지연 대기열을 사용하면, 새 메시지가 지정된 지연 시간 동안 소비자에게 표시되지 않도록 할 수 있습니다. 즉, 메시지가 대기열에 추가된 후 일정 시간이 경과한 후에야 소비자가 이를 수신하게 됩니다. 지연 대기열의 기본 지연 시간은 0초이며, 최대 15분까지 설정할 수 있습니다.

 

지연 대기열은 '전송 지연'과 '제한 시간 초과' 기능과 비슷하지만 차이가 있습니다. 지연 대기열은 메시지가 처음 대기열에 추가될 때 지연시키는 반면, 제한 시간 초과는 메시지가 소비된 후 다시 숨겨질 때 사용됩니다.

출처 : https://docs.aws.amazon.com/ko_kr/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-delay-queues.html

 

AWS SQS 지연 대기열 적용에 관한 공식 문서는 해당 링크를 참고 부탁드립니다.

 

이 기능을 통해, 트랜잭션이 완료된 후에 메시지가 수신되도록 설정하여 문제를 해결할 수 있었습니다.