๊ด๋ จ ๊ธ
- AWS SQS + Spring Boot 3 + kotlin ์ธํ๋ผ ๊ตฌ์ถํ๊ธฐ
- AWS SQS Consumer ์๋ฌ๋ฅผ DLQ ๋ก ์ฒ๋ฆฌํ๊ธฐ
- message converter ๋ฅผ ์ด์ฉํ sqs message serializer
- AWS SDK ๋ก DLQ ์ ์์ธ ๋ฉ์์ง ์ฒ๋ฆฌํ๊ธฐ
์์ ์๊ฐ์ ์ฐ๋ฆฌ๋ SQS ๋ฅผ ์์ฑํ๊ณ Spring Boot 3 ๋ก producer/consumer ๋ฅผ ๊ตฌํ ํ์ฌ DLQ ๋ฅผ ์ด์ฉํ์ฌ ์๋ฌ๋ฅผ ์ฒ๋ฆฌ ํ์๋ค.
DLQ ์ ์์ธ ๋ฉ์์ง๋ค์ ์ฒ๋ฆฌํ๋ ๋ฐฉ๋ฒ์ ๋ค์ํ๋ค. ์ง์ console ์ ์ด์ฉํ ์๋ ์๊ณ programmatically ํ๊ฒ ์ฒ๋ฆฌํ ์๋ ์๋ค.
์ด๋ฒ ์๊ฐ์๋ web console UI ๊ฐ ์๋ programmatically ํ ๋ฐฉ๋ฒ์ผ๋ก ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ๋ 2๊ฐ์ง ๋ฐฉ๋ฒ์ ๋ํด์ ์์๋ณผ ์์ ์ด๋ค.
๋ชฉ์ฐจ
- ๋ฐฐ๊ฒฝ
- DLQ ์ redrive
- DLQ ์ ์์ธ message redrive ํ๊ธฐ
- ํน์ message ๋ง redrive ํ๊ธฐ
๋ฐฐ๊ฒฝ
ํ์ฌ์์ ํน์ ์์คํ ์์ SQS ๋ฅผ ์ฌ์ฉํ๋๋ฐ, DLQ ๋ฅผ ๊ตฌ์ฑํ์ฌ ์ฒ๋ฆฌ์ ์ดํจํ ๋ฉ์์ง๋ค์ ๊ด๋ฆฌํ๋ค.
๋ณดํต consumer ์์ message ์ฒ๋ฆฌ์ ์คํจํ๋ ์ด์ ๋ 2๊ฐ์ง๋ก ๋๋๋ค.
- ์ผ์์ ์ธ ์๋ฌ
- ๋ณด์ ์ด ํ์ํ ์๋ฌ
๋ณดํต 1๋ฒ์ network timeout ์ด๋ service temporary unavailable ๊ณผ ๊ฐ์ ์ผ์ด์ค๋ก ๋๋ถ๋ถ ์ผ์ ์๊ฐ์ด ์ง๋ ํ์ retry ๋ฅผ ์ํํ๋ฉด ์ ์์ ์ผ๋ก ๋์ํ๋ค.
ํ์ง๋ง 2๋ฒ์ ๊ฒฝ์ฐ๋ ๊ฐ๋ฐ์๋ ์ด์์๊ฐ ์๋์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ํ์ธํ๊ณ ๋ณด์ ์ ์ฒ๋ฆฌํด์ผ ํ๋ค.
๋ณด์ ์ด ์๋ฃ๋๋ฉด message ๋ฅผ redrive ํ์ฌ ์ฌ์๋๋ฅผ ํ๋ค.
ํ์ง๋ง ์ฌ๋ด์ ๊ฐ๋ฐ ํ๊ฒฝ ์ ์ฑ ์ ์ํด aws console ์ ์ ๊ทผํ๊ธฐ๊ฐ ๊ท์ฐฎ๊ณ ๋ณต์กํ๊ธฐ์ ๋ฆฌ์์ค ๋ญ๋น๊ฐ ์ฌํ๊ณ ์ด๋ฅผ ํ๋ก๊ทธ๋๋ฐ ๋ฐฉ์์ผ๋ก ํด๊ฒฐํ ์ ์๋ ๋ฐฉ๋ฒ์ด ํ์ํ๊ณ ์ด๋ฅผ Spring Boot 3.x ์ kotlin ์ผ๋ก ๊ตฌํํ ๋ด์ฉ๋ค์ ๊ณต์ ํด๋ณด๋ ค ํ๋ค.
DLQ ์ redrive
SQS ๋ฅผ ํตํด messaging infrastructure ๋ฅผ ๊ตฌ์ฑํ application ์์๋ DLQ ๋ฅผ ๊ตฌ์ฑํ์ฌ ์คํจ์ ๋ํด ๋์ํ๋ค.
2014๋ AWS ๊ฐ ์ฒ์์ผ๋ก DLQ ๋ฅผ ์ถ์ํ๊ธฐ ์ ์ consumer application ์์ ๋ฉ์์ง ์ฒ๋ฆฌ์ ์คํจํ๋ฉด ํด๋น ๋ฉ์์ง๊ฐ retention ๊ธฐ๊ฐ๋์ ๊ณ์ํด์ queue ์ ๋จ์ ์๊ฒ ๋์๋ค.
๊ฒฐ๊ตญ consumer application ์ ์ฒ๋ฆฌํ์ง ๋ชปํ๋ message ๋ฅผ ๊ณ์ํด์ ์์๋์์ผ๋ก์จ ์ฒ๋ฆฌ๋์ด ๋ฎ์ ์ ๋ฐ์ ์์์ง๋ง.
AWS ๋ 2014๋ DLQ ๋ฅผ ๊ณต๊ฐํ๋ฉฐ SQS ์์ ์ฒ๋ฆฌ๋์ง ์์ ๋ฉ์์ง๊ฐ ๊ณ์ํด์ queue ์ ๋จ์์๋ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ์๋ค.
DLQ ์ ๋ํ ์ค์ ๊ณผ ๊ตฌ์ฑ์ ์ง๋ ์๊ฐ์ ์ค์ต์ ํตํด ์์๋ณธ DLQ ๋ก SQS Consumer ์๋ฌ ์ฒ๋ฆฌํ๊ธฐ ์์ ํ์ธํ ์ ์๋ค.
message redrive ๋?
redrive ๋, DLQ ์ ์ ์ฅ๋ ๋ฉ์์ง๋ฅผ consumer application ์ด ํด๋น ๋ฉ์์ง๋ฅผ ๋ค์ ์ฒ๋ฆฌํ ์ ์๋๋ก origin queue ๋ก ์ฎ๊ธฐ๋ ๊ฒ์ ์๋ฏธํ๋ค.
2021๋ AWS re:Invent ์์ ์ฒ์์ผ๋ก DLQ ๋ฅผ aws web console UI ์์ redrive ํ ์ ์๊ฒ ํ์๋ค.
ํ์ง๋ง ์ด๋ฅผ ํ๋ก๊ทธ๋๋ฐ ๋ฐฉ์์ผ๋ก ์ปจํธ๋กค ํ ์ ์๋ ๋ฐฉ๋ฒ์ ์์๊ณ ํด๋จผ ์๋ฌ๊ฐ ๊ฐ๋ํ ๊ฐ๋ฐ์๋ค์ ์๋์ ์์ญ์ผ๋ก ๋จ์์์๋ค.
ํ์ง๋ง 2023๋ 6์ 8์ผ Aws ์์๋ DLQ ๋ฅผ handling ํ๋ ์๋ก์ด API ๋ฅผ ๊ฐ๋ฐํ์ฌ AWS Sdk ์ ํฌํจ์์ผฐ๋ค.
DLQ ์ ์์ธ ๋ฉ์์ง redrive ํ๊ธฐ
AWS consule UI ๊ฐ ์๋ cli ๋ sdk ๋ก DLQ ์ ์์ธ ๋ฉ์์ง๋ฅผ redrive ํ๊ธฐ ์ํด์๋ start-message-move-task
๋ผ๋ ์ปค๋งจ๋๋ฅผ ์ด์ฉํ ์ ์๋ค.
์๋์ ์ ์ฝ์กฐ๊ฑด์๋ง ํด๋น๋๋ค๋ฉด DLQ ์์ origin queue ๋ก message ๋ฅผ redrive ํ ์ ์๋ค.
- DLQ ๊ตฌ์ฑ์ด ๋ queue ๋ก๋ง redrive ํ ์ ์์
- DLQ ์ ์ ์ฒด message ๊ฐ redrive ๋จ์ ๊ณ ๋ คํ์ ํจ
๊ตฌํํ๊ธฐ
์ฐ์ AWS SDK ๋ฅผ ์ฌ์ฉํด์ผ ํ์ง๋ง ๋๋ Spring Boot ๋ฅผ ์ด์ฉํ๊ณ ์๊ณ Spring Cloud AWS ์์ ์ ๊ณตํ๋ io.awspring.cloud:spring-cloud-aws-starter-sqs:3.1.1
๋ฅผ ์ด์ฉํ์ฌ ๊ตฌํํ ๊ฒ์ด๋ค.
์ค์ต์ ์ฌ์ฉ๋ ๋ชจ๋ ์์ค์ฝ๋๋ https://github.com/my-research/try-aws-sdk ์์ ํ์ธํ ์ ์๋ค.
์๋ Controller ๋ API ์์ฒญ์ ๋ฐ์ผ๋ฉด DLQ ๋ฅผ ์๋ณธ ํ๋ก ๋ค์ redrive ํ๋ ๊ธฐ๋ฅ์ ์ํํ๋ค.
@RestController
class SupportSqsController(
private val sqsClient: SqsAsyncClient,
) {
@GetMapping("/re-drive/messages")
fun reDriveAll() {
val request = StartMessageMoveTaskRequest.builder()
.sourceArn(MEMBER_DLQ_ARN)
.destinationArn(MEMBER_QUEUE_ARN)
.build()
sqsClient.startMessageMoveTask(request)
}
}
startMessageMoveTask
๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด์๋ spring-cloud-aws-starter-sqs ์์ sqs client ๋ก ์ฃผ๋ก ์ฌ์ฉํ๋ SqsTemplate
์ด ์๋ SqsAsyncClient
๋ฅผ ์ฌ์ฉํด์ผ ํ๋ค.
SqsAsyncClient
๋ฅผ ํตํด startMessageMoveTask
๋ฅผ ํธ์ถํ๊ณ , ์ธ์๋ก source ์ destination ๋ง ์ง์ ํด์ฃผ๋ฉด ๋๋ค.
ํ ์คํธํ๊ธฐ (๊ฒ์ฆ)
์ค์ต์์ ์ฌ์ฉ๋ SQS ์ consumer / config ์ ๋ณด๋ค์ ์์ ์ค์ต ๊ธ์ธ AWS SQS + Spring Boot 3 + kotlin ์ธํ๋ผ ๊ตฌ์ถํ๊ธฐ ์ AWS SQS Consumer ์๋ฌ๋ฅผ DLQ ๋ก ์ฒ๋ฆฌํ๊ธฐ ์์ ํ์ธํ ์ ์์ผ๋ฉฐ ์ ์ฒด ์์ค์ฝ๋๋ github ์์ ํ์ธํ ์ ์์ต๋๋ค.
์ฐ๋ฆฌ๊ฐ ๋ง๋ ๊ธฐ๋ฅ์ ํ ์คํธํ๊ธฐ ์ํด์ DLQ ์ ๋ค์ 2 ๊ฐ์ ๋ฉ์์ง๋ฅผ ์์๋ค.
redrive policy ์ ์ํด 3ํ retry ํ ์ต์ข ์ ์ผ๋ก ack ๊ฐ ๋์ง ์์ DLQ ์ 2๊ฐ์ ๋ฉ์์ง๊ฐ ์์ธ ์ํ์ด๋ค.
๊ทธ๋ฆฌ๊ณ ์์ ์ฐ๋ฆฌ๊ฐ ๋ง๋ API ๋ฅผ ํธ์ถํ๋ฉด ๋ค์๊ณผ ๊ฐ์ด ์ ์ฒด์ message ๊ฐ redrive ๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
๋ฌธ์ ์
ํ์ง๋ง ์ด๋ ๊ฒ ํ์ ๋ ์น๋ช ์ ์ธ ๋ฌธ์ ๊ฐ ์กด์ฌํ๋ค.
์ ์ฒด ๋ฉ์์ง๋ฅผ redrive ํ๊ธฐ๋ ์ข๊ฒ ์ง๋ง ์์น ์๋ message ๋ค ๊น์ง redrive ๋์ด ์์ํ์ง ๋ชปํ๋ ์๋ฌ๊ฐ ๋ฐ์ํ ์ ์๋ค.
(ํ์ฌ ๋ฒ์ ์์๋ ๊ฐ๋ณ message ๋ง redrive ํ๋ ๊ธฐ๋ฅ์ ์ ๊ณตํ์ง ์๋๋ฏ ํ๋ค..)
์ด๋ฅผ ํด๊ฒฐํ๊ธฐ ์ํด์๋ DLQ ์์ messageId ๋ก ๊ฐ๋ณ ๋ฉ์์ง๋ค์ ์๋ณํ๊ณ ์๋ณธ ํ๋ก ๋ณด๋ด ํด๋น ๋ฉ์์ง๋ฅผ DLQ ์์ ์๋์ผ๋ก ์ ๊ฑฐํด์ค์ผ ํ๋ค.
์๋์์ ๋ ์์ธํ ์์๋ณด๋๋กํ์
ํน์ message ๋ง redrive ํ๊ธฐ
์ด ๋ฐฉ๋ฒ์ 2023๋ ์ ์ ๊ท๋ก ์ถ์๋ ๊ธฐ๋ฅ์ด ์๋๋ผ ๊ธฐ์กด์ ์กด์ฌํ๋ Aws SDK ๋ฅผ ์ด์ฉํ๋ ๊ฒ์ด๋ค.
์ปจ์ ์ ๊ฐ๋จํ๋ค.
- DLQ ์์ message ๋ฅผ polling
- ํน์ messageId ๋ก message ๋ฅผ filter
- ํด๋น message ๋ฅผ ์๋ณธ ํ๋ก ๋ค์ ์ ์ก
- ํด๋น message ๋ฅผ DLQ ์์ ์ ๊ฑฐ
๊ตฌํํ๊ธฐ
์๋๋ ๊ฐ๋จํ Controller ์ด๋ค.
/messages/{id}
๋ฅผ ํตํด messageId ๋ฅผ ์ ๋ฌ ๋ฐ๊ณ ํด๋น id ๋ก ๋ฉ์์ง๋ฅผ ์ฐพ์ ์๋ณธ ํ๋ก ์ ์กํ์ฌ DLQ ์์ ์ ๊ฑฐํ๋ ๊ธฐ๋ฅ์ ์ํํ๋ค.
@RestController
class SupportSqsController(
private val sqsClient: SqsAsyncClient,
) {
@GetMapping("/re-drive/messages/{id}")
fun reDriveBy(@PathVariable id: String) {
val messages = receiveMessages()
// ๋๋ฒ๊น
๋ก๊ทธ ์ถ๊ฐ
if (messages.isEmpty()) {
return@repeat
}
val reDriveTarget = messages.firstOrNull { it.messageId() == id }
reDriveTarget?.let {
reDriveMessage(id, it)
deleteMessage(it, id)
return
}
}
/**
* message ๋ฅผ polling
*/
private fun receiveMessages(): MutableList<Message> {
val request = ReceiveMessageRequest.builder()
.queueUrl(MEMBER_DLQ_NAME)
.maxNumberOfMessages(2)
.waitTimeSeconds(10)
.maxNumberOfMessages(10)
.visibilityTimeout(3)
.build()
val messageFuture = sqsClient.receiveMessage(request)
val messages = messageFuture.get().messages()
println("Received messages: ${messages.size}")
return messages
}
/**
* message ๋ฅผ ๋ค์ origin ํ๋ก send
*/
private fun reDriveMessage(id: String, it: Message) {
println("$id ์ ํด๋นํ๋ message ์กด์ฌํ์ฌ origin queue ๋ก redrive")
sqsClient.sendMessage(SendMessageRequest.builder()
.queueUrl(MEMBER_QUEUE_NAME)
.messageBody(it.body())
.build())
}
/**
* DLQ ์์ message ๋ฅผ ์ ๊ฑฐ
*/
private fun deleteMessage(it: Message, id: String) {
val deleteMessageRequest = DeleteMessageRequest
.builder()
.queueUrl(MEMBER_DLQ_NAME)
.receiptHandle(it.receiptHandle())
.build()
sqsClient.deleteMessage(deleteMessageRequest)
println("DLQ ์์ message($id, ${it.receiptHandle()}) ์ ๊ฑฐ๋จ")
}
}
์ด์ ์ค์ต์ ํตํด ํ์ธํด๋ณด์.
ํ ์คํธํ๊ธฐ (๊ฒ์ฆ)
์์ ํ ์คํธ์ ๋ง์ฐฌ๊ฐ์ง๋ก ์ฐ๋ฆฌ๊ฐ ๋ง๋ ๊ธฐ๋ฅ์ ํ ์คํธํ๊ธฐ ์ํด์ DLQ ์ ๋ค์ 2 ๊ฐ์ ๋ฉ์์ง๋ฅผ ์์๋ค.
์ฐธ๊ณ ๋ก messageId ๋ฅผ ํ์ธํ๊ธฐ ์ํด์๋ console ์์ ์ง์ ํ์ธํ ์๋ ์์ง๋ง ๋ค์๊ณผ ๊ฐ์ด consumer ์์ messageHeader ๋ฅผ parsing ํ ์๋ ์๋ค.
@SqsListener("member-event")
fun listen(
@Header(value = "id") messageId: String,
message: String
) {
println("received message(id:$messageId,$message)")
}
๊ทธ๋ฆฌ๊ณ ์์ ๋ง๋ API ๋ฅผ ํตํด ๊ฐ๋ณ redrive ๋ฅผ ํธ์ถํ๋ฉด ์๋์ ๊ฐ์ด ์ ์์ ์ผ๋ก ์๋ณธ Q ๋ก ๋ฉ์์ง๊ฐ redrive ๋๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
redrive ๊ฐ ๋์ด ๋ค์ ์๋ณธ ํ์ redrive policy ํ์๋งํผ retry ๋ฅผ ์ํํ์๋ค.
์์ ๊ตฌํ์์๋ 2๊ฐ์ง ๋ฌธ์ ์ ์ด ์กด์ฌํ๋ค.
- polling ์ด ์๋
- message ๋ฅผ redrive ํ๋ฉด ์ ๊ท message ๊ฐ ์์ฑ๋จ
์์ ํจ์ receiveMessages()
๋ polling ์ด ์๋๋ค. maxNumberOfMessages
๋งํผ ๋จ์ง receive ํ๋ ์ญํ ์ ์ํํ๋ค.
๋ง์ฝ ๋ฉ์์ง๊ฐ 1000๊ฐ ์์ฌ์๋ค๋ฉด maxNumberOfMessage ์์ฑ ์์ ์ต๋์ธ 10๋งํผ๋ง ํ ๋ฒ ์์ ํ๊ณ ๋๋๋ค.
์ด๋ฅผ ํด๊ฒฐํ๊ธฐ ์ํด์๋ repeat(n)
๊ณผ ๊ฐ์ ์ฌ๋ฌ๊ฐ์ง ๋ฐฉ๋ฒ์ผ๋ก receiver ๋ฅผ polling ํ์์ผ๋ก ๊ตฌํํด์ค์ผ ํ๋ค.
๋ํ redrive ํ๋ค๋ฉด message ๋ฅผ ์๋ณธ ํ๋ก ์ฎ๊ธฐ๋ ๊ฒ์ด ์๋๋ผ ์ ๊ท๋ก ๋ฉ์์ง๋ฅผ ์์ฑํด์ ๋ฐํํ๋ ๊ฒ์ด๋ค.
๊ทธ๋์ messageId ๊ฐ ์๋กญ๊ฒ ์์ฑ๋๋ค๋ ๊ฒ์ ์ ์ํด์ผ ํ๋ค.
๋๊ธ