πŸ“š μ‹œλ¦¬μ¦ˆ/- ν•™μŠ΅ ν…ŒμŠ€νŠΈλ‘œ λ°°μ›Œλ³΄λŠ” kafka

[ν•™μŠ΅ ν…ŒμŠ€νŠΈλ‘œ λ°°μ›Œλ³΄λŠ” kafka] 5. ν•™μŠ΅ ν…ŒμŠ€νŠΈλ‘œ μΉ΄ν”„μΉ΄ 컨슈머(kafka consumer) μ•Œμ•„λ³΄κΈ°

Wonit 2023. 9. 7. 21:40

ν•΄λ‹Ή μ‹œλ¦¬μ¦ˆμ—μ„œ μ œκ³΅ν•˜λŠ” λͺ¨λ“  μ†ŒμŠ€μ½”λ“œλŠ” github repository μ—μ„œ μ œκ³΅λ©λ‹ˆλ‹€. μžμ„Έν•œ μ½”λ“œμ™€ ν…ŒμŠ€νŠΈ μΌ€μ΄μŠ€λŠ” github repository μ—μ„œ ν™•μΈν•΄μ£Όμ„Έμš”.

 

GitHub - my-research/kafka: apache kafka docs & practical usages(with spring) that i composed

apache kafka docs & practical usages(with spring) that i composed - GitHub - my-research/kafka: apache kafka docs & practical usages(with spring) that i composed

github.com

 

이번 [ν•™μŠ΅ ν…ŒμŠ€νŠΈλ‘œ λ°°μ›Œλ³΄λŠ” kafka] μ‹œλ¦¬μ¦ˆλŠ” μ•„λž˜ μˆœμ„œλŒ€λ‘œ 챕터가 κ΅¬μ„±λ˜κ³ , μ‹œλ¦¬μ¦ˆ μ™Έλ‘œ kafka κ΄€λ ¨ν•˜μ—¬ λ”μš± λ§Žμ€ ν•™μŠ΅ μ •λ³΄λŠ” kafka 심화 μ„Έμ…˜ μ—μ„œ 확인할 수 μžˆμŠ΅λ‹ˆλ‹€.

μ‹œλ¦¬μ¦ˆ λͺ©μ°¨

  1. μ‹œλ¦¬μ¦ˆλ₯Ό μ‹œμž‘ν•˜λ©°
  2. kafka λΉ λ₯΄κ²Œ 훑어보고 μ•„λŠ”μ²΄ν•˜κΈ°
  3. kafka 컨셉과 μš©μ–΄ 정리
  4. ν•™μŠ΅ν…ŒμŠ€νŠΈ μ€€λΉ„ν•˜κΈ°
  5. ν•™μŠ΅ ν…ŒμŠ€νŠΈλ‘œ kafka producer μ•Œμ•„λ³΄κΈ°
  6. ν•™μŠ΅ ν…ŒμŠ€νŠΈλ‘œ kafka consumer μ•Œμ•„λ³΄κΈ° <-- ν˜„μž¬ κΈ€
  7. ν•™μŠ΅ ν…ŒμŠ€νŠΈλ‘œ partitioning μ•Œμ•„λ³΄κΈ°
  8. ν•™μŠ΅ ν…ŒμŠ€νŠΈλ‘œ consumer group κ³Ό rebalancing μ•Œμ•„λ³΄κΈ°

ν•™μŠ΅μ˜ 단계별 μˆœμ„œλ‘œ λͺ©μ°¨κ°€ κ΅¬μ„±λ˜μ–΄μžˆμœΌλ―€λ‘œ μ„ ν–‰λ˜μ–΄μ•Ό ν•˜λŠ” 챕터가 μ‘΄μž¬ν•©λ‹ˆλ‹€


 

μ•žμ„œ μš°λ¦¬λŠ” λ©”μ‹œμ§€λ₯Ό μΉ΄ν”„μΉ΄ 브둜컀둜 λ°œν–‰ν•˜λŠ” producer 의 java client 에 λŒ€ν•΄μ„œ μ•Œμ•„λ³΄μ•˜λ‹€.

 

이번 μ‹œκ°„μ—λŠ” μΉ΄ν”„μΉ΄ λΈŒλ‘œμ»€μ— μ €μž₯된 둜그 λ©”μ‹œμ§€λ“€μ„ consume ν•˜λŠ” consumer java client 인 KafkaConsumer 에 λŒ€ν•΄μ„œ ν•™μŠ΅ν…ŒμŠ€νŠΈλ₯Ό 톡해 μ•Œμ•„λ³Ό μ˜ˆμ •μ΄λ‹€

KafkaConsumer

KafkaConsumer ν΄λž˜μŠ€λŠ” μ§€λ‚œ Producer 와 λ§ˆμ°¬κ°€μ§€λ‘œ μΉ΄ν”„μΉ΄ 브둜컀의 νŠΉμ • 토픽에 ν•΄λ‹Ήν•˜λŠ” 이벀트 둜그 Record λ₯Ό consume ν•˜λŠ” 역할을 μˆ˜ν–‰ν•œλ‹€

 

Kafka 의 consumer λŠ” Connection Pooling κ³Ό λ„€νŠΈμ›Œν¬ ν”„λ‘œν† μ½œμ„ κ΄€λ¦¬ν•œλ‹€.

 

μΉ΄ν”„μΉ΄λŠ” λ‹€λ₯Έ 일반적인 λ©”μ‹œμ§€ 큐와 λ‹€λ₯΄κ²Œ λ©”μ‹œμ§€λ₯Ό μ†ŒλΉ„ν•˜λ”λΌλ„ ν•΄λ‹Ή λ©”μ‹œμ§€λ₯Ό μ‚­μ œν•˜μ§€ μ•ŠλŠ”λ‹€.

 

μš”μ¦˜ 많이 μ‚¬μš©ν•˜λŠ” λ‹€λ₯Έ λ©”μ‹œμ§€ 큐인 sqs λ₯Ό 보면, λ©”μ‹œμ§€ ack λ₯Ό 거치고 λ‹€λ©΄ DeletionPolicy λ₯Ό μ„€μ •ν•  수 μžˆλŠ”λ°, μΉ΄ν”„μΉ΄λŠ” μ‚­μ œκ°€ μ—†μœΌλ‹ˆ κ΄€λ ¨ μ‚­μ œ 정책이 μ—†λ‹€.

 

κ·Έλž˜μ„œ μ–Έμ œλ“ μ§€, λˆ„κ΅¬λ“ μ§€ topic 에 λŒ€ν•œ event log λ₯Ό replay ν•  수 μžˆλ‹€.

 

이 컨셉이 λ°”λ‘œ λ‹€λ₯Έ λ©”μ‹œμ§€ 미듀웨어듀과 μΉ΄ν”„μΉ΄κ°€ κ΅¬λΆ„λ˜λŠ” μ€‘μš”ν•œ 이유라고 λ³Ό 수 μžˆλ‹€.

 

KafkaConsumer μΈμŠ€ν„΄μŠ€ μƒμ„±ν•˜κΈ°

 

μƒμ„±μžμ˜ parameter 둜 μ „λ‹¬λœ key-value ν˜•νƒœμ˜ properties λ₯Ό ν†΅ν•΄μ„œ client configuration 을 λ“±λ‘ν•œλ‹€

 

 

ν•΄λ‹Ή configuration 의 μžμ„Έν•œ 사항과 νŠΉμ • 섀정듀은 confluent.io/kafka config μ—μ„œ 확인할 수 μžˆλ‹€

 

μ•žμ„  ν”„λ‘œλ“€μ„œ ν…ŒμŠ€νŠΈμ™€ λ§ˆμ°¬κ°€μ§€λ‘œ μ—¬λŸ¬ ν…ŒμŠ€νŠΈμ—μ„œ μ‚¬μš©ν•  수 μžˆλ„λ‘ Test Hepler 클래슀λ₯Ό 톡해 μ‰½κ²Œ 컨슈머 μΈμŠ€ν„΄μŠ€λ₯Ό 생성할 수 μžˆλ„λ‘ λΆ„λ¦¬ν•˜μ˜€λ‹€

 

public class KafkaConsumerTestHelper {

  public static KafkaConsumer<String, String> simpleConsumer() {
    Map<String, Object> props = Map.of(
            "bootstrap.servers", "localhost:9092",
            "group.id", "my-consumer",
            "enable.auto.commit", "true",
            "auto.offset.reset", "earliest",
            // kafka 둜 μ†ŒλΉ„ν•  message 의 s/d 클래슀 μ„€μ •
            "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
            "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"
    );

    return new KafkaConsumer<>(props);
  }
}

 

μ—¬κΈ°μ„œ λˆˆμ—¬κ²¨λ³Ό 섀정은 enable.auto.commit κ³Ό auto.offset.reset 인데 μ΄λŠ” μΆ”ν›„ 컨슈머의 commit κ³Ό offset νŒŒνŠΈμ—μ„œ λ‹€μ‹œ 이야기λ₯Ό ν•˜λ„λ‘ ν•˜κ² λ‹€. (stay tuned!!)

 

ν…ŒμŠ€νŠΈ 1. KafkaConsumer λ₯Ό ν†΅ν•œ λ©”μ‹œμ§€ μ†ŒλΉ„ ν…ŒμŠ€νŠΈ

 

이번 ν…ŒμŠ€νŠΈμ—μ„œλŠ” λ°œν–‰λœ λ©”μ‹œμ§€λ₯Ό KafkaConsumer λ₯Ό 톡해 μ†ŒλΉ„ν•΄λ³΄λ„λ‘ ν•˜κ² λ‹€

 

ν…ŒμŠ€νŠΈ 뢄석

  • 주석 1.
    • 토픽에 λ©”μ‹œμ§€λ₯Ό λ°œν–‰ν•œλ‹€
    • consumer ν…ŒμŠ€νŠΈλ₯Ό μ‰½κ²Œ ν•˜κΈ° μœ„ν•΄ ν…ŒμŠ€νŠΈμš© 토픽에 λ©”μ‹œμ§€λ₯Ό produce ν•˜λŠ” Helper ν΄λž˜μŠ€κ°€ μ‚¬μš©λ˜μ—ˆλ‹€. μžμ„Έν•œ μ½”λ“œλŠ” github μ—μ„œ 확인할 수 μžˆλ‹€
  • 주석 2.
    • μΉ΄ν”„μΉ΄ 컨슈머 μΈμŠ€ν„΄μŠ€λ₯Ό νŠΉμ • 토픽에 subscribe μ‹œν‚¨λ‹€.
    • μΉ΄ν”„μΉ΄ 컨슈머 μΈμŠ€ν„΄μŠ€λŠ” ν•œλ²ˆμ— μ—¬λŸ¬κ°œμ˜ 토픽에 subscribe ν•  수 μžˆλ‹€
  • 주석 3.
    • subscribe κ°€ λ˜μ—ˆλ‹€κ³  ν•΄μ„œ λ ˆμ½”λ“œλ₯Ό κ³„μ†ν•΄μ„œ consume ν•˜λŠ”κ²ƒμ΄ μ•„λ‹ˆλ‹€.
    • μ‹€μ œλ‘œ 이벀트 둜그λ₯Ό fetch ν•˜λŠ” 일은 poll() λ©”μ„œλ“œμ—μ„œ μˆ˜ν–‰λœλ‹€
    • 결과둜 ConsumerRecords λ₯Ό λ°˜ν™˜ν•œλ‹€. μ—¬λŸ¬κ°œμ˜ 토픽에 λŒ€ν•΄ consume 이 κ°€λŠ₯ν•˜κΈ° λ•Œλ¬Έμ— record μžμ²΄λ„ μ—¬λŸ¬κ°œλ‹€.
    • timeout 에 λŒ€ν•œ Duration 을 2초둜 μ§€μ •ν•œλ‹€
  • 주석 4.
    • νŠΉμ • 토픽에 λ©”μ‹œμ§€κ°€ 잘 μ†ŒλΉ„λ˜μ—ˆλŠ”μ§€ κ²€μ¦ν•˜λŠ” 단언문이닀. μ—­μ‹œ 헬퍼 클래슀이며 μžμ„Έν•œ μ½”λ“œλŠ” github μ—μ„œ 확인할 수 μžˆλ‹€

 

test 2. 컨슈머의 multi topic consume ν…ŒμŠ€νŠΈ

 

μ•žμ„  ν…ŒμŠ€νŠΈμ—μ„œ subscribe() λ©”μ„œλ“œλ₯Ό ν†΅ν•΄μ„œ μ—¬λŸ¬κ°œμ˜ 토픽에 λŒ€ν•΄ μ†ŒλΉ„λ₯Ό ν•  수 μžˆλ‹€κ³  ν•˜μ˜€λ‹€.

 

μ‹€μ œλ‘œ ν…ŒμŠ€νŠΈν•΄λ³΄μž

 

 

helper 클래슀λ₯Ό 톡해 2개의 μ„œλ‘œ λ‹€λ₯Έ 토픽에 λ©”μ‹œμ§€λ₯Ό μˆ˜ν–‰ν–ˆλ‹€.

 

KafkaConsumer μΈμŠ€ν„΄μŠ€μ— μ„œλ‘œ λ‹€λ₯Έ μ•žμ„  2가지 토픽을 subscribe ν•˜μ˜€κ³  λ§ˆμ°¬κ°€μ§€λ‘œ poll() 을 μˆ˜ν–‰ν–ˆλ‹€.

 

그리고 Records 단언을 톡해 2개의 λ©”μ‹œμ§€λ₯Ό μ†ŒλΉ„ν•˜λŠ”μ§€ ν™•μΈν•˜μ˜€κ³ , μ„±κ³΅ν•˜μ˜€λ‹€

 

kafka 의 offset 과 commit

 

μ•žμ„  μ—¬λŸ¬ μ‹œκ°„μ— κ±Έμ³μ„œ κ³„μ†ν•΄μ„œ κ°•μ‘°ν•΄μ˜¨ μ€‘μš”ν•œ λ‚΄μš©μ΄ μžˆλ‹€.

 

μΉ΄ν”„μΉ΄λŠ” μ• ν”Œλ¦¬μΌ€μ΄μ…˜μ—μ„œ λ°œμƒν•˜λŠ” μ΄λ²€νŠΈμ— λŒ€ν•œ 둜그 μ‹œμŠ€ν…œμ΄λ‹€.

 

λ‘œκ·ΈλŠ” μ‹œμŠ€ν…œ λ‚΄μ—μ„œ μΌμ–΄λ‚œ 일련의 사건듀이닀.

 

μš°λ¦¬λŠ” 일반적으둜 λ°œμƒν•œ λ‘œκ·Έλ“€ 사이에 μ–΄λ–€ μƒˆλ‘œμš΄ 둜그λ₯Ό μΆ”κ°€ν•˜λ €ν•˜μ§€ μ•ŠλŠ”λ‹€. 단지 λ‘œκ·ΈλŠ” κ³„μ†ν•΄μ„œ μŒ“μΌ 뿐이닀.

 

λ˜ν•œ μš°λ¦¬λŠ” νŠΉμ • μ‹œμ μ— λ°œμƒν•œ λ‘œκ·Έλ“€μ„ μ–Έμ œλ“  λ‹€μ‹œ μ‘°νšŒν•  수 μžˆλ‹€.

 

μΉ΄ν”„μΉ΄λŠ” 둜그의 append only 와 seeking by offset νŠΉμ„±μ„ λ”°λ₯΄λŠ” log based architecture λ₯Ό 지ν–₯ν•œλ‹€.

 

μ•žμ—μ„œλ„ μ„€λͺ…ν–ˆλ“―이 μΉ΄ν”„μΉ΄λŠ” λ©”μ‹œμ§€λ₯Ό μ†ŒλΉ„ν•˜λ©΄, λ‹€λ₯Έ MQ 와 달리 λ©”μ‹œμ§€λ₯Ό μ‚­μ œ(pop) ν•˜μ§€ μ•ŠλŠ”λ‹€.

 

그럼 consumer λŠ” λ„λŒ€μ²΄ μ–΄λ–»κ²Œ μžμ‹ μ΄ κ°€μ Έκ°€μ•Όν•  message 에 λŒ€ν•΄μ„œ μ•Œ 수 μžˆμ„κΉŒ?

 

정닡은 λ°”λ‘œ offset κ³Ό commit 에 μžˆλ‹€.

 

offset κ³Ό commit

 

kafka μ—μ„œ producer κ°€ 토픽에 μ €μž₯ν•œ λ©”μ‹œμ§€λ“€μ€ linear ν•˜κ²Œ μŒ“μΈλ‹€.

 

그럼 λ©”μ‹œμ§€λ“€μ€ 각각의 μœ„μΉ˜ 정보(offset)을 κ°–κ²Œ λ˜λŠ”λ°, μΉ΄ν”„μΉ΄ μ»¨μŠˆλ¨ΈλŠ” 이 μœ„μΉ˜ 정보(offset)λ₯Ό ν†΅ν•΄μ„œ μ–΄λ””κΉŒμ§€ λ©”μ‹œμ§€λ₯Ό μ†ŒλΉ„ν•˜μ˜€λŠ”μ§€μ•Œ 수 μžˆλ‹€.

 

μ΄λŸ¬ν•œ μœ„μΉ˜ 정보(offset)λŠ” kafka λ‚΄λΆ€μ˜ __consumer_offsets λΌλŠ” νŠΉλ³„ν•œ 토픽에 μ €μž₯ν•œλ‹€.

 

그리고 μ»¨μŠˆλ¨Έκ°€ λ©”μ‹œμ§€λ₯Ό μ†ŒλΉ„ν•˜κ³  λ‚˜λ©΄ 본인이 μ²˜λ¦¬ν•œ μœ„μΉ˜ 정보(offset)λ₯Ό μ—…λ°μ΄νŠΈ (commit) ν•œλ‹€.

 

commit μ΄λΌλŠ” 과정을 거치고 λ‚˜λ©΄ λ‹€μŒ poll νƒ€μž„μ— μƒˆλ‘œ update 된 offset λΆ€ν„° μΌμ •λŸ‰λ§ŒνΌ λ©”μ‹œμ§€λ₯Ό μ†ŒλΉ„ν•˜κ²Œ 되고, μ•žμ„  과정을 λ°˜λ³΅ν•˜κ²Œ λœλ‹€.

μ™œ μ΄λ ‡κ²Œ offset κ³Ό commit μ΄λΌλŠ” κ°œλ…μ„ λ§Œλ“€μ—ˆμ„κΉŒ?

 

offset κ³Ό commit 을 μ΄μš©ν•œλ‹€λ©΄ λ‘κ°€μ§€μ˜ 큰 μž₯점이 μžˆλ‹€.

  1. λ©”μ‹œμ§€ replay
  2. μ•ˆμ •μ„±

 

kafka κ°€ λ‹€λ₯Έ MQ 와 λ‹€λ₯Έ μ΄μœ λŠ” message λ₯Ό pop 을 ν•˜μ§€ μ•ŠκΈ° λ•Œλ¬Έμ΄λΌκ³  ν–ˆλ‹€.

 

message λ₯Ό pop ν•˜μ§€ μ•Šκ³  λ©”μ‹œμ§€ 쀑볡을 막기 μœ„ν•΄μ„œλŠ” offset κ³Ό 같은 μœ„μΉ˜μ •λ³΄κ°€ ν•„μš”ν•˜λ‹€.

 

κ·Έλž˜μ•Ό νŠΉμ • μ»¨μŠˆλ¨Έκ°€ 이미 μ†ŒλΉ„ν•œ λ©”μ‹œμ§€μ— λŒ€ν•΄μ„œ μ€‘λ³΅ν•΄μ„œ μ†ŒλΉ„ν•˜μ§€ μ•Šλ„λ‘ ν•œλ‹€.

 

μ΄λ ‡κ²Œ offset 을 μ €μž₯ν•˜κ³  κ΄€λ¦¬ν•˜λ©΄μ„œ μž₯μ•  상황에 λŒ€ν•΄ 더 μ•ˆμ •μ μœΌλ‘œ μš΄μ˜ν•  수 있게 λœλ‹€.

 

μž₯μ• κ°€ λ°œμƒν•˜κ³  μž₯μ• κ°€ λ³΅κ΅¬λ˜μ—ˆλ‹€λ©΄ λ§ˆμ§€λ§‰μœΌλ‘œ commit ν•œ offset λΆ€ν„° λ‹€μ‹œ 읽으면 되기 λ•Œλ¬Έμ΄λ‹€

commit 의 μ’…λ₯˜ 2가지

 

μ΄λŸ¬ν•œ commit 은 μžλ™/μˆ˜λ™ commit 이 μ‘΄μž¬ν•œλ‹€.

 

auto commit

 

auto commit 은 λ©”μ‹œμ§€λ₯Ό κ°€μ Έμ˜¬ λ•Œλ§ˆλ‹€ commit 을 ν•˜λŠ” 방법이닀.

 

일정 μ£ΌκΈ°λ§ˆλ‹€ ν˜Ήμ€ λ©”μ‹œμ§€λ₯Ό μ†ŒλΉ„ν•œ 후에 μžλ™μœΌλ‘œ offset 을 commit ν•˜λŠ” 방법이닀.

 

KafkaConsumer μ—μ„œλŠ” poll() 을 ν˜ΈμΆœν•  λ•Œ κ°€μž₯ λ§ˆμ§€λ§‰ offset 을 commit ν•œλ‹€.

 

이 λ°©λ²•μ˜ 경우, μ˜€ν”„μ…‹μ„ κ΄€λ¦¬ν•˜λŠ” κ°€μž₯ νŽΈλ¦¬ν•œ λ°©λ²•μœΌλ‘œ μ•Œλ €μ Έμžˆλ‹€.

 

ν•˜μ§€λ§Œ λ©”μ‹œμ§€μ˜ 쀑볡 처리 λ¬Έμ œκ°€ λ°œμƒν•  수 μžˆλŠ”λ°, λ§Œμ•½ λ©”μ‹œμ§€λ₯Ό μ†ŒλΉ„ν–ˆμ§€λ§Œ poll 을 ν˜ΈμΆœν•˜κΈ° 전에 μž₯μ• κ°€ λ°œμƒν•œλ‹€λ©΄ offset update κ°€ λ˜μ§€ μ•Šμ•˜μœΌλ―€λ‘œ λ©”μ‹œμ§€κ°€ μ€‘λ³΅ν•΄μ„œ μ†ŒλΉ„λœλ‹€.

 

manual commit

 

manual commit 은 λ©”μ‹œμ§€ μ²˜λ¦¬κ°€ μ™„λ£Œλ  λ•ŒκΉŒμ§€ commit ν•˜μ§€ μ•ŠλŠ”λ‹€.

 

일반적인 acknowledgement 와 λ™μΌν•˜κ²Œ λ™μž‘ν•œλ‹€.

 

μ»¨μŠˆλ¨Έκ°€ λ©”μ‹œμ§€λ₯Ό μ²˜λ¦¬ν•œ λ’€, λͺ…μ‹œμ μœΌλ‘œ 처리된 offset 을 commit ν•˜λŠ” 방식이닀

 

이 λ°©λ²•μ˜ 경우, λ©”μ‹œμ§€ μ†ŒλΉ„ μžμ²΄λŠ” 느릴 수 μžˆμœΌλ‚˜ λ©”μ‹œμ§€ 처리의 μ •ν™•μ„±κ³Ό 무손싀을 보μž₯ν•˜λŠ” 방법이닀.

 

이제 ν…ŒμŠ€νŠΈλ₯Ό 톡해 μ•Œμ•„λ³΄μž

 

test 3. auto commit test

 

μžλ™ 컀밋을 ν…ŒμŠ€νŠΈν•΄λ³΄μž

 

ν…ŒμŠ€νŠΈ μ„€λͺ…

  • 주석 1
    • ν•œλ²ˆ λ©”μ‹œμ§€λ₯Ό poll ν•  λ•Œ 3개의 λ©”μ‹œμ§€λ₯Ό μ†ŒλΉ„ν•˜λ„λ‘ μ„€μ •ν•œλ‹€
  • 주석 2
    • auto commit mode λ₯Ό ν™œμ„±ν™”ν•œλ‹€
  • 주석 3
    • λ©”μ‹œμ§€λ₯Ό λ°œν–‰ν•œλ‹€
    • 3개 λ‹¨μœ„λ‘œ μ•ŒνŒŒλ²³, 이λͺ¨μ§€ μˆœμ„œλ‘œ λ°œν–‰ν•œλ‹€.
  • 주석 4
    • 첫번째 poll
    • 3개 λ‹¨μœ„μ΄λ―€λ‘œ μ•ŒνŒŒλ²³λ§Œ μ†ŒλΉ„λœλ‹€
  • 주석 5
    • λ‘λ²ˆμ§Έ poll
    • 3개 λ‹¨μœ„μ΄λ―€λ‘œ 이λͺ¨μ§€κ°€ μ†ŒλΉ„λœλ‹€
β˜‘οΈ expect: μžλ™ 컀밋 λͺ¨λ“œμ΄λ―€λ‘œ 두 번째 poll() ν˜ΈμΆœμ‹œ 이λͺ¨μ§€ λ©”μ‹œμ§€κ°€ μ†ŒλΉ„λœλ‹€

ν…ŒμŠ€νŠΈ κ²°κ³Ό

ν…ŒμŠ€νŠΈλŠ” μ„±κ³΅ν•œλ‹€

ν•™μŠ΅ ν…ŒμŠ€νŠΈ 정리

  • βœ… Consumer λŠ” Record λΌλŠ” λ‹¨μœ„λ‘œ μΉ΄ν”„μΉ΄ 브둜컀둜 λ©”μ‹œμ§€λ₯Ό μ†ŒλΉ„
  • βœ… μ‹€μ œ λ©”μ‹œμ§€ μ†ŒλΉ„λŠ” poll() 을 톡해 μˆ˜ν–‰λ¨
  • βœ… subscribe() λ₯Ό 톡해 μ—¬λŸ¬ 토픽에 λŒ€ν•΄ 컨슈머 등둝을 ν•  수 있음
  • βœ… consumer λŠ” commit mode λ₯Ό 톡해 μžλ™/μˆ˜λ™μœΌλ‘œ commit ν•  수 있음