๐Ÿ’Š Java & Kotlin & Spring/- spring framework +

DLQ ๋กœ SQS Consumer ์—๋Ÿฌ ์ฒ˜๋ฆฌํ•˜๊ธฐ

Wonit 2024. 7. 29. 00:44

๊ด€๋ จ ๊ธ€

 

์•ž์„  ์‹œ๊ฐ„์— ์šฐ๋ฆฌ๋Š” SQS ๋ฅผ ์ƒ์„ฑํ•˜๊ณ  Spring Boot 3 ๋กœ producer/consumer ๋ฅผ ๊ตฌํ˜„ ํ•˜์˜€๋‹ค.

 

์ด๋ฒˆ ์‹œ๊ฐ„์—๋Š” Consumer Application ์—์„œ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ๋Š” ์‹คํŒจ์— ๋Œ€์ฒ˜ํ•˜๋Š” ์—ฌ๋Ÿฌ ๋ฐฉ๋ฒ• ์ค‘ ํ•˜๋‚˜์ธ DLQ ๋ฅผ ์•Œ์•„๋ณผ ๊ฒƒ์ด๋‹ค.

 

๋˜ํ•œ ์‹ค์Šต์„ ํ†ตํ•ด Spring Boot 3 ์™€ kotlin ์œผ๋กœ ๊ฐ„๋‹จํžˆ DLQ Consumer ๋ฅผ ๊ตฌํ˜„ํ•ด๋ณด๋„๋ก ํ•˜์ž.

 

๋ชฉ์ฐจ

  • DLQ
  • AWS ์—์„œ DLQ ๊ตฌ์„ฑํ•˜๊ธฐ
  • DLQ Consumer ๊ตฌํ˜„ํ•˜๊ธฐ
  • FIFO ์™€ DLQ

DLQ

 

์‹œ์Šคํ…œ์€ ๊ธฐ๋ณธ์ ์œผ๋กœ ์‹คํŒจํ•œ๋‹ค. ๊ทธ๋Ÿฌ๋ฏ€๋กœ ์šฐ๋ฆฌ๋Š” ์‹คํŒจ์— ์ž˜ ๋Œ€์ฒ˜ํ•  ์ˆ˜ ์žˆ๋Š” ์—ฌ๋Ÿฌ๊ฐ€์ง€ ๋ฐฉ๋ฒ•์„ ๊ณ ์•ˆํ•ด์•ผ ํ•˜๊ณ  ์ด๋ฅผ ๋‚ด๊ฒฐํ•จ์„ฑ (fault tolerant) ๋ผ๊ณ  ํ•œ๋‹ค.

 

AWS SQS ์—์„œ๋„ ์ด๋Ÿฌํ•œ ์—๋Ÿฌ๋ฅผ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•ด DLQ ๋ผ๋Š” ๊ฒƒ์„ ์ œ๊ณตํ•œ๋‹ค.

 

DLQ ๋Š” Enterprise Integration Pattern ์ด๋ผ๋Š” ์ฑ…์—์„œ ์†Œ๊ฐœ๋œ Dead-Letter-Channel ์ด๋ผ๋Š” ํŒจํ„ด์„ AWS ์—์„œ ๊ตฌํ˜„ํ•œ managed queue ์„œ๋น„์Šค์ด๋‹ค.

 

 

์•ž์„  ์ฑ…์—์„œ๋Š” Dead Letter Channel ์„ ์•„๋ž˜์™€ ๊ฐ™์ด ์„ค๋ช…ํ•œ๋‹ค.

 

๋ฉ”์‹œ์ง• ์‹œ์Šคํ…œ์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ ์ „๋‹ฌํ•  ์ˆ˜ ์—†๊ฑฐ๋‚˜ ์ „๋‹ฌํ•ด์„œ๋Š” ์•ˆ ๋œ๋‹ค๊ณ  ํŒ๋‹จ๋˜๋ฉด ๋ฉ”์‹œ์ง€๋ฅผ Dead Letter Channel ํ˜น์€ Dead Letter Queue ๋กœ ์ด๋™์‹œ์ผœ ๋ณด๊ด€ํ•ฉ๋‹ˆ๋‹ค.

 

์ฆ‰, Consumer Application ์—์„œ ํŠน์ • ํ์˜ ๋ฉ”์‹œ์ง€๋ฅผ ์†Œ๋น„ํ•˜๋‹ค ์ฒ˜๋ฆฌ์— ์‹คํŒจํ•œ๋‹ค๋ฉด ์ด๋ฅผ Dead Letter Channel ๋กœ ์˜ฎ๊ธด๋‹ค๋Š” ๊ฒƒ์ด๋‹ค.

DLQ ๋™์ž‘ ์›๋ฆฌ๋Š” ์ง์ ‘ queue ๋ฅผ ๋งŒ๋“ค๋ฉด์„œ ์•Œ์•„๋ณด์ž

 

AWS ์—์„œ DLQ ๊ตฌ์„ฑํ•˜๊ธฐ

 

์šฐ๋ฆฌ๋Š” ์ง€๋‚œ ์‹œ๊ฐ„์— Spring Boot 3 ๋กœ AWS SQS ์ธํ”„๋ผ๋ฅผ ๊ตฌ์ถ•ํ•˜์˜€๋‹ค.

 

์•ž์„œ ๋งŒ๋“  queue ์ธ member-event ์— ๋Œ€ํ•œ DLQ ๋ฅผ ๊ตฌ์„ฑํ•  ๊ฒƒ์ด๋‹ค.

 

 

DLQ ์—ญ์‹œ ์ผ๋ฐ˜์ ์ธ queue ์™€ ๋™์ผํ•˜๋ฏ€๋กœ aws sqs console ์—์„œ ์‹ ๊ทœ ํ๋ฅผ ๋งŒ๋“ค์–ด์ฃผ๋ฉด ๋œ๋‹ค.

 

์ด๋ฆ„์€ ๊ตฌ๋ถ„ํ•˜๊ธฐ ์‰ฝ๊ฒŒ member-event-dlq ๋กœ ์„ค์ •ํ•˜๊ณ  ์•ž์„  ์‹ค์Šต๊ณผ ๋™์ผํ•œ ์„ค์ •์œผ๋กœ queue ๋ฅผ ๋งŒ๋“ค๋‹ค ๋ณด๋ฉด redrive policy ์— ๋Œ€ํ•œ ์„ค์ •์„ ํ•ด์•ผํ•œ๋‹ค.

 

 

์–ด๋–ค queue ์— ๋Œ€ํ•ด DLQ ์„ค์ •์„ ํ•  ๊ฒƒ์ธ๊ฐ€? ์— ๋Œ€ํ•œ ์„ค์ •์ด๋‹ค. ์šฐ๋ฆฌ๋Š” member-event ์— ๋Œ€ํ•œ DLQ ๋ฅผ ๊ตฌ์„ฑํ•  ๊ฒƒ์ด๋ฏ€๋กœ ์•ž์„œ ์ƒ์„ฑํ•œ ํ์˜ ARN ์„ ์ถ”๊ฐ€ํ•œ๋‹ค.

 

 

๊ทธ๋Ÿผ ์œ„์™€ ๊ฐ™์ด 2๊ฐœ์˜ ํ๊ฐ€ ์ƒ์„ฑ๋˜์—ˆ๋‹ค.

 

์ด์ œ member-event ์˜ DLQ ๋กœ member-event-dlq ๋ฅผ ์—ฐ๊ฒฐ์‹œ์ผœ์ฃผ๊ธฐ ์œ„ํ•ด member-event ํ์˜ ์„ค์ •์„ ๋ณ€๊ฒฝํ•ด๋ณด์ž.

 

 

์œ„์˜ ์„ค์ •์—์„œ 2๊ฐ€์ง€๋ฅผ ์•Œ ์ˆ˜ ์žˆ๋‹ค.

 

  1. ์–ด๋–ค ํ๋ฅผ DLQ ๋กœ ์„ค์ •ํ•  ๊ฒƒ์ธ๊ฐ€
  2. ์ตœ๋Œ€ retry ํšŸ์ˆ˜๋Š” ์–ผ๋งˆ์ธ๊ฐ€

 

๊ธฐ๋ณธ์ ์œผ๋กœ queue ์€ message ๋ฅผ ์ฒ˜๋ฆฌ ์™„๋ฃŒํ•˜๋ฉด ํ•ด๋‹น message ๋ฅผ ์ œ๊ฑฐํ•˜๋Š” ์ฑ…์ž„์ด ์กด์žฌํ•œ๋‹ค.

 

ํ•˜์ง€๋งŒ ์–ด๋–ค ์ด์œ ์—์„œ์ธ์ง€ ์ฒ˜๋ฆฌ๋ฅผ ๋ชปํ•˜์—ฌ ์‚ญ์ œ๊ฐ€ ์•ˆ๋œ ๋Œ€์ƒ๋“ค์„ ๋‹ค๋ฅธ ์ž„์‹œ ํ๋กœ ์˜ฎ๊ธฐ๋Š” ๊ฒƒ์ด DLQ ์˜ ํ•ต์‹ฌ์ด๋‹ค.

 

์œ„์— ๋ณด์ด๋Š” miximum receives ๋Š” ๋ช‡๋ฒˆ ์žฌ์‹œ๋„ ๋์— DLQ ๋กœ ์˜ฎ๊ธธ ๊ฒƒ์ธ๊ฐ€? ๋ฅผ ๊ฒฐ์ •ํ•˜๋Š” ํ•„๋“œ๋‹ค.

 

์•ž์˜ ์„ค์ •์€ 3๋ฒˆ retry ๋์— ์‹คํŒจํ•˜๋ฉด DLQ ๋กœ ์˜ฎ๊ธฐ๋Š” ๊ฒƒ์„ ์˜๋ฏธํ•œ๋‹ค.

 

DLQ Consumer ๊ตฌํ˜„ํ•˜๊ธฐ

 

DLQ Consumer ์—ญ์‹œ ์ผ๋ฐ˜ Queue ์™€ ๋™์ผํ•˜๋ฏ€๋กœ ์•ž์„  ์‹ค์Šต๋•Œ ๋งŒ๋“ค์—ˆ๋˜ SQS Listener ์™€ ๋™์ผํ•˜๊ฒŒ ์—ฐ๋™ํ•ด์ฃผ๋ฉด ๋œ๋‹ค.

 

@Component
class MemberEventListener {

  @SqsListener("member-event")
  fun listen(message: String) {
    println("received message ($message)")
  }

  @SqsListener("member-event-dlq")
  fun listenDLQ(message: String, ack: Acknowledgement) {
    println("DLQ received message ($message)")
  }

 

DLQ ์— ์ •์ƒ์ ์œผ๋กœ ๋ฉ”์‹œ์ง€๊ฐ€ ์ž˜ ์˜ฎ๊ฒจ์ง€๋Š”์ง€ ํ™•์ธํ•˜๊ธฐ ์œ„ํ•ด์„œ๋Š” ์‹ค์Šต ํ™˜๊ฒฝ์„ ์•ฝ๊ฐ„ ๋ณ€๊ฒฝํ•ด์•ผ ํ•œ๋‹ค.

 

์˜๋„์ ์œผ๋กœ member-event consumer ์—์„œ ์‹คํŒจํ•˜๊ธฐ

 

DLQ ๋Š” origin queue ์˜ ์ฒ˜๋ฆฌ๊ฐ€ ์‹คํŒจํ•˜์˜€์„ ๋•Œ ๋ฉ”์‹œ์ง€๊ฐ€ ์ „์†ก๋˜๋ฏ€๋กœ, member-event ํ๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ์†Œ๋น„ํ•˜๋”๋ผ๋„ ack ๋ฅผ ์ˆ˜ํ–‰ํ•˜์ง€ ์•Š๋„๋ก ์„ค์ •์„ ๋ฐ”๊ฟ” ๋ณผ ๊ฒƒ์ด๋‹ค.

 

@Configuration
class SqsConfiguration {
  @Bean
  fun defaultSqsListenerContainerFactory(sqsAsyncClient: SqsAsyncClient) =
    SqsMessageListenerContainerFactory
      .builder<Any>()
      .configure { options: SqsContainerOptionsBuilder ->
        options
          .acknowledgementMode(MANUAL) // ACK ์ˆ˜๋™
      }
      .sqsAsyncClient(sqsAsyncClient)
      .build()
}

 

์•„๋ž˜ Configuration Bean ์„ ๋“ฑ๋กํ•˜๋ฉด Consumer instance ๋ฅผ ์ƒ์„ฑํ•  ๋•Œ ์ˆ˜๋™ commit mode ์ธ consumer ๋ฅผ ์ƒ์„ฑํ•  ๊ฒƒ์ด๋‹ค.

 

๊ทธ๋ฆฌ๊ณ  ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰ํ•œ๋‹ค๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™์ด DLQ ๋กœ ๋ฉ”์‹œ์ง€๊ฐ€ ๋“ค์–ด์˜จ ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

 

Queue Type ๊ณผ DLQ

 

์•ž์„œ ์šฐ๋ฆฌ๋Š” SQS ์—๋Š” 2๊ฐ€์ง€ ํƒ€์ž…์ด ์กด์žฌํ•œ๋‹ค๊ณ  ํ™•์ธํ–ˆ๋‹ค.

 

 

DLQ ๋ฅผ ๊ตฌ์„ฑํ•  ๋•Œ๋Š” ํ•œ ๊ฐ€์ง€ ์ฃผ์˜ํ•ด์•ผ ํ•  ์ ์ด ์กด์žฌํ•œ๋‹ค.

 

๋ฐ”๋กœ FIFO Type ์˜ Queue ์—์„œ๋Š” DLQ ๋ฅผ ๊ตฌ์„ฑํ•  ๊ฒฝ์šฐ ๋™์ผ message group ์—์„œ ๋‹ค๋ฅธ message ๋“ค์ด Blocking ๋œ๋‹ค๋Š” ๊ฒƒ์ด๋‹ค.

 

First In First Out ์„ ํ†ตํ•ด Exactly Once ๋ฅผ ๋ณด์žฅํ•˜๋Š” FIFO ๋Š” ๋ฉ”์‹œ์ง€ ์ˆœ์„œ๋ฅผ ๋ณด์žฅํ•˜๊ธฐ ์œ„ํ•ด ๋‚ด๋ถ€์ ์œผ๋กœ ์ •๋ ฌ๊ณผ์ •์„ ์ˆ˜ํ–‰ํ•œ๋‹ค.

 

๊ทธ๋ž˜์„œ ํŠน์ • ๋ฉ”์‹œ์ง€๊ฐ€ ์ฒ˜๋ฆฌ๋˜์ง€ ์•Š์œผ๋ฉด ํ•ด๋‹น ๋ฉ”์‹œ์ง€๊ฐ€ ์กด์žฌํ•˜๋Š” origin queue ์˜ message group ์€ DLQ ์— ์˜ํ•ด Blocking ๋œ๋‹ค๋Š” ์ ์„ ๋ช…์‹ฌํ•ด์•ผ ํ•œ๋‹ค.