[νμ΅ ν μ€νΈλ‘ λ°°μ보λ kafka] 5. νμ΅ ν μ€νΈλ‘ μΉ΄νμΉ΄ 컨μλ¨Έ(kafka consumer) μμ보기
ν΄λΉ μ리μ¦μμ μ 곡νλ λͺ¨λ μμ€μ½λλ github repository μμ μ 곡λ©λλ€. μμΈν μ½λμ ν μ€νΈ μΌμ΄μ€λ github repository μμ νμΈν΄μ£ΌμΈμ.
μ΄λ² [νμ΅ ν μ€νΈλ‘ λ°°μ보λ kafka] μ리μ¦λ μλ μμλλ‘ μ±ν°κ° ꡬμ±λκ³ , μλ¦¬μ¦ μΈλ‘ kafka κ΄λ ¨νμ¬ λμ± λ§μ νμ΅ μ 보λ kafka μ¬ν μΈμ μμ νμΈν μ μμ΅λλ€.
μλ¦¬μ¦ λͺ©μ°¨
- μ리μ¦λ₯Ό μμνλ©°
- kafka λΉ λ₯΄κ² νμ΄λ³΄κ³ μλ체νκΈ°
- kafka 컨μ κ³Ό μ©μ΄ μ 리
- νμ΅ν μ€νΈ μ€λΉνκΈ°
- νμ΅ ν μ€νΈλ‘ kafka producer μμ보기
- νμ΅ ν μ€νΈλ‘ kafka consumer μμ보기 <-- νμ¬ κΈ
- νμ΅ ν μ€νΈλ‘ partitioning μμ보기
- νμ΅ ν μ€νΈλ‘ consumer group κ³Ό rebalancing μμ보기
νμ΅μ λ¨κ³λ³ μμλ‘ λͺ©μ°¨κ° ꡬμ±λμ΄μμΌλ―λ‘ μ νλμ΄μΌ νλ μ±ν°κ° μ‘΄μ¬ν©λλ€
μμ μ°λ¦¬λ λ©μμ§λ₯Ό μΉ΄νμΉ΄ λΈλ‘μ»€λ‘ λ°ννλ producer μ java client μ λν΄μ μμ보μλ€.
μ΄λ² μκ°μλ μΉ΄νμΉ΄ λΈλ‘컀μ μ μ₯λ λ‘κ·Έ λ©μμ§λ€μ consume νλ consumer java client μΈ KafkaConsumer μ λν΄μ νμ΅ν μ€νΈλ₯Ό ν΅ν΄ μμλ³Ό μμ μ΄λ€
KafkaConsumer
KafkaConsumer
ν΄λμ€λ μ§λ Producer μ λ§μ°¬κ°μ§λ‘ μΉ΄νμΉ΄ λΈλ‘컀μ νΉμ ν ν½μ ν΄λΉνλ μ΄λ²€νΈ λ‘κ·Έ Record λ₯Ό consume νλ μν μ μννλ€
Kafka μ consumer λ Connection Pooling κ³Ό λ€νΈμν¬ νλ‘ν μ½μ κ΄λ¦¬νλ€.
μΉ΄νμΉ΄λ λ€λ₯Έ μΌλ°μ μΈ λ©μμ§ νμ λ€λ₯΄κ² λ©μμ§λ₯Ό μλΉνλλΌλ ν΄λΉ λ©μμ§λ₯Ό μμ νμ§ μλλ€.
μμ¦ λ§μ΄ μ¬μ©νλ λ€λ₯Έ λ©μμ§ νμΈ sqs λ₯Ό 보면, λ©μμ§ ack λ₯Ό κ±°μΉκ³ λ€λ©΄ DeletionPolicy λ₯Ό μ€μ ν μ μλλ°, μΉ΄νμΉ΄λ μμ κ° μμΌλ κ΄λ ¨ μμ μ μ± μ΄ μλ€.
κ·Έλμ μΈμ λ μ§, λꡬλ μ§ topic μ λν event log λ₯Ό replay ν μ μλ€.
μ΄ μ»¨μ μ΄ λ°λ‘ λ€λ₯Έ λ©μμ§ λ―Έλ€μ¨μ΄λ€κ³Ό μΉ΄νμΉ΄κ° κ΅¬λΆλλ μ€μν μ΄μ λΌκ³ λ³Ό μ μλ€.
KafkaConsumer μΈμ€ν΄μ€ μμ±νκΈ°
μμ±μμ parameter λ‘ μ λ¬λ key-value ννμ properties λ₯Ό ν΅ν΄μ client configuration μ λ±λ‘νλ€
ν΄λΉ configuration μ μμΈν μ¬νκ³Ό νΉμ μ€μ λ€μ confluent.io/kafka config μμ νμΈν μ μλ€
μμ νλ‘λμ ν μ€νΈμ λ§μ°¬κ°μ§λ‘ μ¬λ¬ ν μ€νΈμμ μ¬μ©ν μ μλλ‘ Test Hepler ν΄λμ€λ₯Ό ν΅ν΄ μ½κ² 컨μλ¨Έ μΈμ€ν΄μ€λ₯Ό μμ±ν μ μλλ‘ λΆλ¦¬νμλ€
public class KafkaConsumerTestHelper {
public static KafkaConsumer<String, String> simpleConsumer() {
Map<String, Object> props = Map.of(
"bootstrap.servers", "localhost:9092",
"group.id", "my-consumer",
"enable.auto.commit", "true",
"auto.offset.reset", "earliest",
// kafka λ‘ μλΉν message μ s/d ν΄λμ€ μ€μ
"key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"
);
return new KafkaConsumer<>(props);
}
}
μ¬κΈ°μ λμ¬κ²¨λ³Ό μ€μ μ enable.auto.commit
κ³Ό auto.offset.reset
μΈλ° μ΄λ μΆν 컨μλ¨Έμ commit κ³Ό offset ννΈμμ λ€μ μ΄μΌκΈ°λ₯Ό νλλ‘ νκ² λ€. (stay tuned!!)
ν μ€νΈ 1. KafkaConsumer λ₯Ό ν΅ν λ©μμ§ μλΉ ν μ€νΈ
μ΄λ² ν μ€νΈμμλ λ°νλ λ©μμ§λ₯Ό KafkaConsumer λ₯Ό ν΅ν΄ μλΉν΄λ³΄λλ‘ νκ² λ€
ν μ€νΈ λΆμ
- μ£Όμ 1.
- ν ν½μ λ©μμ§λ₯Ό λ°ννλ€
- consumer ν μ€νΈλ₯Ό μ½κ² νκΈ° μν΄ ν μ€νΈμ© ν ν½μ λ©μμ§λ₯Ό produce νλ Helper ν΄λμ€κ° μ¬μ©λμλ€. μμΈν μ½λλ github μμ νμΈν μ μλ€
- μ£Όμ 2.
- μΉ΄νμΉ΄ 컨μλ¨Έ μΈμ€ν΄μ€λ₯Ό νΉμ ν ν½μ subscribe μν¨λ€.
- μΉ΄νμΉ΄ 컨μλ¨Έ μΈμ€ν΄μ€λ νλ²μ μ¬λ¬κ°μ ν ν½μ subscribe ν μ μλ€
- μ£Όμ 3.
- subscribe κ° λμλ€κ³ ν΄μ λ μ½λλ₯Ό κ³μν΄μ consume νλκ²μ΄ μλλ€.
- μ€μ λ‘ μ΄λ²€νΈ λ‘κ·Έλ₯Ό fetch νλ μΌμ
poll()
λ©μλμμ μνλλ€ - κ²°κ³Όλ‘ ConsumerRecords λ₯Ό λ°ννλ€. μ¬λ¬κ°μ ν ν½μ λν΄ consume μ΄ κ°λ₯νκΈ° λλ¬Έμ record μ체λ μ¬λ¬κ°λ€.
- timeout μ λν Duration μ 2μ΄λ‘ μ§μ νλ€
- μ£Όμ 4.
- νΉμ ν ν½μ λ©μμ§κ° μ μλΉλμλμ§ κ²μ¦νλ λ¨μΈλ¬Έμ΄λ€. μμ ν¬νΌ ν΄λμ€μ΄λ©° μμΈν μ½λλ github μμ νμΈν μ μλ€
test 2. 컨μλ¨Έμ multi topic consume ν μ€νΈ
μμ ν
μ€νΈμμ subscribe()
λ©μλλ₯Ό ν΅ν΄μ μ¬λ¬κ°μ ν ν½μ λν΄ μλΉλ₯Ό ν μ μλ€κ³ νμλ€.
μ€μ λ‘ ν μ€νΈν΄λ³΄μ
helper ν΄λμ€λ₯Ό ν΅ν΄ 2κ°μ μλ‘ λ€λ₯Έ ν ν½μ λ©μμ§λ₯Ό μννλ€.
KafkaConsumer μΈμ€ν΄μ€μ μλ‘ λ€λ₯Έ μμ 2κ°μ§ ν ν½μ subscribe νμκ³ λ§μ°¬κ°μ§λ‘ poll()
μ μννλ€.
κ·Έλ¦¬κ³ Records λ¨μΈμ ν΅ν΄ 2κ°μ λ©μμ§λ₯Ό μλΉνλμ§ νμΈνμκ³ , μ±κ³΅νμλ€
kafka μ offset κ³Ό commit
μμ μ¬λ¬ μκ°μ κ±Έμ³μ κ³μν΄μ κ°μ‘°ν΄μ¨ μ€μν λ΄μ©μ΄ μλ€.
μΉ΄νμΉ΄λ μ ν리μΌμ΄μ μμ λ°μνλ μ΄λ²€νΈμ λν λ‘κ·Έ μμ€ν μ΄λ€.
λ‘κ·Έλ μμ€ν λ΄μμ μΌμ΄λ μΌλ ¨μ μ¬κ±΄λ€μ΄λ€.
μ°λ¦¬λ μΌλ°μ μΌλ‘ λ°μν λ‘κ·Έλ€ μ¬μ΄μ μ΄λ€ μλ‘μ΄ λ‘κ·Έλ₯Ό μΆκ°νλ €νμ§ μλλ€. λ¨μ§ λ‘κ·Έλ κ³μν΄μ μμΌ λΏμ΄λ€.
λν μ°λ¦¬λ νΉμ μμ μ λ°μν λ‘κ·Έλ€μ μΈμ λ λ€μ μ‘°νν μ μλ€.
μΉ΄νμΉ΄λ λ‘κ·Έμ append only μ seeking by offset νΉμ±μ λ°λ₯΄λ log based architecture
λ₯Ό μ§ν₯νλ€.
μμμλ μ€λͺ νλ―μ΄ μΉ΄νμΉ΄λ λ©μμ§λ₯Ό μλΉνλ©΄, λ€λ₯Έ MQ μ λ¬λ¦¬ λ©μμ§λ₯Ό μμ (pop) νμ§ μλλ€.
κ·ΈλΌ consumer λ λλ체 μ΄λ»κ² μμ μ΄ κ°μ Έκ°μΌν message μ λν΄μ μ μ μμκΉ?
μ λ΅μ λ°λ‘ offset κ³Ό commit μ μλ€.
offset κ³Ό commit
kafka μμ producer κ° ν ν½μ μ μ₯ν λ©μμ§λ€μ linear νκ² μμΈλ€.
κ·ΈλΌ λ©μμ§λ€μ κ°κ°μ μμΉ μ 보(offset)μ κ°κ² λλλ°, μΉ΄νμΉ΄ 컨μλ¨Έλ μ΄ μμΉ μ 보(offset)λ₯Ό ν΅ν΄μ μ΄λκΉμ§ λ©μμ§λ₯Ό μλΉνμλμ§μ μ μλ€.
μ΄λ¬ν μμΉ μ 보(offset)λ kafka λ΄λΆμ __consumer_offsets
λΌλ νΉλ³ν ν ν½μ μ μ₯νλ€.
κ·Έλ¦¬κ³ μ»¨μλ¨Έκ° λ©μμ§λ₯Ό μλΉνκ³ λλ©΄ λ³ΈμΈμ΄ μ²λ¦¬ν μμΉ μ 보(offset)λ₯Ό μ λ°μ΄νΈ (commit) νλ€.
commit μ΄λΌλ κ³Όμ μ κ±°μΉκ³ λλ©΄ λ€μ poll νμμ μλ‘ update λ offset λΆν° μΌμ λλ§νΌ λ©μμ§λ₯Ό μλΉνκ² λκ³ , μμ κ³Όμ μ λ°λ³΅νκ² λλ€.
μ μ΄λ κ² offset κ³Ό commit μ΄λΌλ κ°λ μ λ§λ€μμκΉ?
offset κ³Ό commit μ μ΄μ©νλ€λ©΄ λκ°μ§μ ν° μ₯μ μ΄ μλ€.
- λ©μμ§ replay
- μμ μ±
kafka κ° λ€λ₯Έ MQ μ λ€λ₯Έ μ΄μ λ message λ₯Ό pop μ νμ§ μκΈ° λλ¬Έμ΄λΌκ³ νλ€.
message λ₯Ό pop νμ§ μκ³ λ©μμ§ μ€λ³΅μ λ§κΈ° μν΄μλ offset κ³Ό κ°μ μμΉμ λ³΄κ° νμνλ€.
κ·ΈλμΌ νΉμ 컨μλ¨Έκ° μ΄λ―Έ μλΉν λ©μμ§μ λν΄μ μ€λ³΅ν΄μ μλΉνμ§ μλλ‘ νλ€.
μ΄λ κ² offset μ μ μ₯νκ³ κ΄λ¦¬νλ©΄μ μ₯μ μν©μ λν΄ λ μμ μ μΌλ‘ μ΄μν μ μκ² λλ€.
μ₯μ κ° λ°μνκ³ μ₯μ κ° λ³΅κ΅¬λμλ€λ©΄ λ§μ§λ§μΌλ‘ commit ν offset λΆν° λ€μ μ½μΌλ©΄ λκΈ° λλ¬Έμ΄λ€
commit μ μ’ λ₯ 2κ°μ§
μ΄λ¬ν commit μ μλ/μλ commit μ΄ μ‘΄μ¬νλ€.
auto commit
auto commit μ λ©μμ§λ₯Ό κ°μ Έμ¬ λλ§λ€ commit μ νλ λ°©λ²μ΄λ€.
μΌμ μ£ΌκΈ°λ§λ€ νΉμ λ©μμ§λ₯Ό μλΉν νμ μλμΌλ‘ offset μ commit νλ λ°©λ²μ΄λ€.
KafkaConsumer
μμλ poll()
μ νΈμΆν λ κ°μ₯ λ§μ§λ§ offset μ commit νλ€.
μ΄ λ°©λ²μ κ²½μ°, μ€νμ μ κ΄λ¦¬νλ κ°μ₯ νΈλ¦¬ν λ°©λ²μΌλ‘ μλ €μ Έμλ€.
νμ§λ§ λ©μμ§μ μ€λ³΅ μ²λ¦¬ λ¬Έμ κ° λ°μν μ μλλ°, λ§μ½ λ©μμ§λ₯Ό μλΉνμ§λ§ poll μ νΈμΆνκΈ° μ μ μ₯μ κ° λ°μνλ€λ©΄ offset update κ° λμ§ μμμΌλ―λ‘ λ©μμ§κ° μ€λ³΅ν΄μ μλΉλλ€.
manual commit
manual commit μ λ©μμ§ μ²λ¦¬κ° μλ£λ λκΉμ§ commit νμ§ μλλ€.
μΌλ°μ μΈ acknowledgement μ λμΌνκ² λμνλ€.
컨μλ¨Έκ° λ©μμ§λ₯Ό μ²λ¦¬ν λ€, λͺ μμ μΌλ‘ μ²λ¦¬λ offset μ commit νλ λ°©μμ΄λ€
μ΄ λ°©λ²μ κ²½μ°, λ©μμ§ μλΉ μ체λ λ릴 μ μμΌλ λ©μμ§ μ²λ¦¬μ μ νμ±κ³Ό 무μμ€μ 보μ₯νλ λ°©λ²μ΄λ€.
μ΄μ ν μ€νΈλ₯Ό ν΅ν΄ μμ보μ
test 3. auto commit test
μλ 컀λ°μ ν μ€νΈν΄λ³΄μ
ν μ€νΈ μ€λͺ
- μ£Όμ 1
- νλ² λ©μμ§λ₯Ό poll ν λ 3κ°μ λ©μμ§λ₯Ό μλΉνλλ‘ μ€μ νλ€
- μ£Όμ 2
- auto commit mode λ₯Ό νμ±ννλ€
- μ£Όμ 3
- λ©μμ§λ₯Ό λ°ννλ€
- 3κ° λ¨μλ‘ μνλ²³, μ΄λͺ¨μ§ μμλ‘ λ°ννλ€.
- μ£Όμ 4
- 첫λ²μ§Έ poll
- 3κ° λ¨μμ΄λ―λ‘ μνλ²³λ§ μλΉλλ€
- μ£Όμ 5
- λλ²μ§Έ poll
- 3κ° λ¨μμ΄λ―λ‘ μ΄λͺ¨μ§κ° μλΉλλ€
βοΈ expect: μλ μ»€λ° λͺ¨λμ΄λ―λ‘ λ λ²μ§Έ poll() νΈμΆμ μ΄λͺ¨μ§ λ©μμ§κ° μλΉλλ€
ν μ€νΈ κ²°κ³Ό
ν μ€νΈλ μ±κ³΅νλ€
νμ΅ ν μ€νΈ μ 리
- β Consumer λ Record λΌλ λ¨μλ‘ μΉ΄νμΉ΄ λΈλ‘μ»€λ‘ λ©μμ§λ₯Ό μλΉ
- β
μ€μ λ©μμ§ μλΉλ
poll()
μ ν΅ν΄ μνλ¨ - β
subscribe()
λ₯Ό ν΅ν΄ μ¬λ¬ ν ν½μ λν΄ μ»¨μλ¨Έ λ±λ‘μ ν μ μμ - β consumer λ commit mode λ₯Ό ν΅ν΄ μλ/μλμΌλ‘ commit ν μ μμ