ν΄λΉ μ리μ¦μμ μ 곡νλ λͺ¨λ μμ€μ½λλ github repository μμ μ 곡λ©λλ€. μμΈν μ½λμ ν μ€νΈ μΌμ΄μ€λ github repository μμ νμΈν΄μ£ΌμΈμ.
μ΄λ² [νμ΅ ν μ€νΈλ‘ λ°°μ보λ kafka] μ리μ¦λ μλ μμλλ‘ μ±ν°κ° ꡬμ±λκ³ , μλ¦¬μ¦ μΈλ‘ kafka κ΄λ ¨νμ¬ λμ± λ§μ νμ΅ μ 보λ kafka μ¬ν μΈμ μμ νμΈν μ μμ΅λλ€.
μλ¦¬μ¦ λͺ©μ°¨
- μ리μ¦λ₯Ό μμνλ©°
- kafka λΉ λ₯΄κ² νμ΄λ³΄κ³ μλ체νκΈ°
- kafka 컨μ κ³Ό μ©μ΄ μ 리
- νμ΅ν μ€νΈ μ€λΉνκΈ°
- νμ΅ ν μ€νΈλ‘ kafka producer μμ보기 <-- νμ¬ κΈ
- νμ΅ ν μ€νΈλ‘ kafka consumer μμ보기
- νμ΅ ν μ€νΈλ‘ partitioning μμ보기
- νμ΅ ν μ€νΈλ‘ consumer group κ³Ό rebalancing μμ보기
νμ΅μ λ¨κ³λ³ μμλ‘ λͺ©μ°¨κ° ꡬμ±λμ΄μμΌλ―λ‘ μ νλμ΄μΌ νλ μ±ν°κ° μ‘΄μ¬ν©λλ€
μ΄λ²μκ°μλ λλμ΄ μΉ΄νμΉ΄ λΈλ‘μ»€λ‘ λ©μμ§λ₯Ό λ°νν΄λ³΄λ μκ°μ΄λ€
KafkaProducer
java μμ μΉ΄νμΉ΄ ν΄λ¬μ€ν°λ‘ λ©μμ§λ₯Ό λ°νν λ μ¬μ©νλ java client λ KafkaProducer
μ΄λ€
KafkaProducer λ λ©μμ§ λ°νμ λΉλκΈ°λ‘ μννκΈ° λλ¬Έμ λ§μ μμ Record μ μ‘μ μ§μνλ€.
KafkaProducer μΈμ€ν΄μ€ μμ±νκΈ°
μμ±μμ parameter λ‘ μ λ¬λ key-value ννμ properties λ₯Ό ν΅ν΄μ client configuration μ λ±λ‘νλ€
ν΄λΉ configuration μ μμΈν μ¬νκ³Ό νΉμ μ€μ λ€μ confluent.io/kafka config μμ νμΈν μ μλ€
λλ μμΌλ‘ λ§μ ν μ€νΈμμ Producer μΈμ€ν΄μ€λ₯Ό μμ±ν μμ μ΄λΌμ Helper ν΄λμ€λ‘ producer μμ± λ‘μ§μ λΆλ¦¬μμΌ°λ€
public class KafkaProducerTestHelper {
public static KafkaProducer<String, String> getSimpleProducer() {
Map<String, Object> props = Map.of(
// bootstrap server μ€μ
"bootstrap.servers", "localhost:9092",
// kafka λ‘ μ μ‘ν message μ s/d ν΄λμ€ μ€μ
"key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
"value.serializer", "org.apache.kafka.common.serialization.StringSerializer"
);
return new KafkaProducer<>(props);
}
}
κ·Έλ¦¬κ³ κ°κ°μ ν μ€νΈ μΌμ΄μ€μμλ Producer μ μΈμ€ν΄μ€λ₯Ό λ€μκ³Ό κ°μ΄ λ§λ€μλ€
μ΄λ¬ν λ°©λ²μ ν΅ν΄ μμΌλ‘ μ§νν ν μ€νΈμ λν΄μ νλ‘λμ μΈμ€ν΄μ€λ₯Ό μμ±ν μμ μ΄λ€
test 1 - KafkaProducer λ₯Ό μ΄μ©ν λ©μμ§ λ°ν ν μ€νΈ
μ΄λ² ν μ€νΈμμλ kafka producer λ₯Ό μ΄μ©ν΄μ λ©μμ§λ₯Ό λ°νν΄λ³΄λ €νλ€.
ν μ€νΈ μ½λ λΆμ
- μ£Όμ 1. λ°νν λ©μμ§λ₯Ό μ μ
- μΌλ°μ μΌλ‘ μΉ΄νμΉ΄μ μ μ₯λ μ΄λ²€νΈ λ‘κ·Έλ Record λΌλ κ°λ μ λ°μ΄ν°κ° μ μ₯λλ€.
- 첫λ²μ§Έ νλΌλ―Έν°λ‘ topic μ λκΈ°κ³ λλ²μ§Έ νλΌλ―Έν°λ‘ message payload λ₯Ό λκΈ΄λ€
- μ£Όμ 2. μ€μ λ‘ producer λ₯Ό ν΅ν΄μ record λ₯Ό μΉ΄νμΉ΄ λΈλ‘μ»€λ‘ μ μ‘νλ€
- produce κ²°κ³Όλ‘ Future λ₯Ό λ°νλ°λλ€.
- μ£Όμ 3.
Future.get()
μ°μ°μ ν΅ν΄ λΉλκΈ° produce response λ₯Ό blocking νλ€ - μ£Όμ 4. produce κ²°κ³Όλ‘ λ°νλ μ 보μ λν λ¨μΈλ¬Έ
μ 리
- β Producer λ Record λΌλ λ¨μλ‘ μΉ΄νμΉ΄ λΈλ‘μ»€λ‘ λ©μμ§λ₯Ό λ°ν
- β produce κ²°κ³Όλ₯Ό asynchronous νκ² μ²λ¦¬ κ°λ₯
- β κ²°κ³Όλ₯Ό blocking νμ¬ νμΈνμ§ μμΌλ©΄ μΌλΆ λ©μμ§μ λν΄ λλ½μ΄ λ°μν μ μμ
test 2. KafkaProducer μ‘΄μ¬νμ§ μλ topic μ λν΄μ λ©μμ§ λ°ν ν μ€νΈ
producer κ° λ©μμ§λ₯Ό λ°ννμλλ° λ§μ½ μ‘΄μ¬νμ§ μλ topic μ λ°ννλ€λ©΄ μ΄λ»κ² λ κΉ?
μ μ°λ¦¬λ μμ ν μ€νΈμμ ν ν½ μμ±μ νμ§ μμλ ν μ€νΈκ° μ±κ³΅νμκΉ?
κ·Έ μ λ΅μ λ°λ‘ Kafka μ κΈ°λ³Έ μ€μ μ μλ€.
λ€λ₯Έ λ§μ λΌμ΄λΈλ¬λ¦¬λ€κ³Ό λ§μ°¬κ°μ§λ‘ kafka μμ μ€ν μ΅μ (configuration) λ€μ νμλ‘ νλλ°, κΈ°λ³Έμ μΌλ‘ topic μμ± κΈ°λ₯μ΄ νμ±ν λμ΄μλ€.
κ·Έλμ ν ν½μ μ¬μ μ μμ±νμ§ μλλΌλ λ©μμ§λ₯Ό λ°νν λ μ‘΄μ¬νμ§ μλ ν ν½μ λ°νμ νλλΌλ λ¬Έμ κ° μμλ κ²μ΄λ€.
μ‘΄μ¬νμ§ μλ topic μ λ°νμ νλ ν
μ€νΈλ₯Ό μν΄ @EmbeddedKafka
μ€μ μ λ°κΏλ³΄μ
μ΄λ
Έν
μ΄μ
μ νλΌλ―Έν°λ‘ brokerProperties
μ "auto.create.topics.enable=false"
λ₯Ό λ겨μ ν ν½ μλ μμ± κΈ°λ₯μ λκ³ μλμ ν
μ€νΈλ₯Ό μ€νν΄λ³΄μ
κ·ΈλΌ μλμ κ°μ΄ UNKNOWN_TOPIC_OR_PARTITION
μλ¬κ° λ°μνλ€.
λλ Junit Extension μ ν΅ν΄ 2μ΄ timeout μ΄ λ°μνλ©΄ ν μ€νΈλ₯Ό ν΅κ³Όνλλ‘ μ΅μ€ν μ ν΄λμ€λ₯Ό λ§λ€μ΄μ μΆκ°ν΄μ€¬λλ°, κ΄λ ¨ μ½λλ₯Ό λ νμΈν΄λ³΄κ³ μΆμΌλ©΄ kafka νμ΅ν μ€νΈ github μμ νμΈν μ μλ€
μ 리
- β Producer λ μ‘΄μ¬νμ§ μλ topic μ λ©μμ§λ₯Ό λ°νν μ μμ
- β Kafka μ κΈ°λ³Έ μ€μ μ΄ μ‘΄μ¬νμ§ μλ topic μ μλ μμ±ν¨
- β broker props λ₯Ό ν΅ν΄ ν΄λΉ μ€μ μ λ μ μμ
test 3. λ©μμ§ λ°ν κ²°κ³Ό μ²λ¦¬ μ½λ°± λμ ν μ€νΈ
test 1 μμ producer κ° produce λ₯Ό μννλ©΄ λ©μμ§λ₯Ό λ°ννκ³ Future κ°μ²΄λ₯Ό λ°νλ°λλ€κ³ μ€λͺ νλ€.
λ§μ½ κ²°κ³Όλ₯Ό νμΈνκ³ μΆλ€λ©΄ Future.get() μ°μ°μ ν΅ν΄ μμ²μ λν΄ blocking νλ©΄ κ²°κ³Όλ₯Ό ν΄λΉ μ€λ λ λ΄μμ νμΈν μ μλ€.
νμ§λ§ λλμ λ©μμ§μ λν΄μ κ³μ block νμ¬ κ²°κ³Όλ₯Ό νμΈνλ κ²μ΄ μ΄λ€ μν©μμ μ¬λ°λ₯΄μ§ μμ μ νμΌ μ μλλ°, μ΄λ₯Ό μν΄ kafka λ callback μ΄λΌλ κ²μ μ 곡νλ€
μ΄λ² ν μ€νΈμμλ kafka producer κ° μ 곡νλ callback μ΄λΌλ κΈ°λ₯μ λν΄μ μμ보μ.
KafkaProducer λ λ©μμ§ λ°ν μμ²μ΄ μλ£λμμ λμ λν΄ κ²°κ³Ό μ²λ¦¬λ₯Ό μν callback μ λ±λ‘ν μ μλ€
callback μ org.apache.kafka.clients.producer.Callback
μΈν°νμ΄μ€λ₯Ό implements ν΄μΌνλ€
μ½λ°±μΈν°νμ΄μ€λ₯Ό ꡬννκ³ KafkaProducer.send() μ λλ²μ§Έ μΈμλ‘ ν΄λΉ μ½λ°±μ λκΈ°λ©΄ λ΄λΆμ μΌλ‘ kafka broker λ‘ record μ μ₯ μμ²μ λ§μ³€μ λ ν΄λΉ μ½λ°±μ μ€νμν¨λ€.
ν μ€νΈλ₯Ό μν μ½λ°±μ μλμ κ°μ΄ ꡬνν΄μ€¬λ€.
@Slf4j
public class SimpleProduceCallback implements Callback {
public static SimpleProduceCallback newOne() {
return new SimpleProduceCallback();
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (nonNull(metadata)) {
log.info("β
λ©μμ§ λ°ν μ±κ³΅, topic: {}, partition: {}, offset: {}, timestamp: {}",
metadata.topic(),
metadata.partition(),
metadata.offset(),
metadata.timestamp());
} else {
log.error("β λ©μμ§ λ°ν μ€ν¨", exception);
}
}
}
onCompletion()
λ©μλμ 첫λ²μ§Έ μΈμλ‘ λ€μ΄μ€λ metatdata λ μ±κ³΅μ μΌλ‘ λ©μμ§λ₯Ό λ°ννμ λ μ μμ μΈ κ°μ΄ μΈν
λμ΄ μ λ¬λλ€
λ§μ½ μ μ‘μ μ€ν¨νμλ€λ©΄ -1 μ΄ μ€μ λ metatdata κ°μ²΄λ₯Ό λ°νλ°μ κ²μ΄λ€.
μλ ν μ€νΈλ₯Ό μ€νν΄λ³΄μ
λ§μ§λ§ λΌμΈμμ sut.close()
λ₯Ό μΆκ°ν΄μ€ μ΄μ λ ν
μ€νΈ μΌμ΄μ€κ° μ’
λ£λλ μμ μ΄ callback μ΄ νΈμΆλλ μμ λ³΄λ€ λ리기λλ¬Έμ ν
μ€νΈ κ²μ¦μ μν΄ blocking νμλ€.
κ²°κ³Όλ₯Ό νμΈνλ©΄ μ±κ³΅μ μΌλ‘ λ‘κ·Έκ° λμ€λ κ²μ νμΈν μ μλ€
μ 리
- β Producer λ λ μ½λ μ μ‘ μμ² κ²°κ³Όλ₯Ό 2κ°μ§ λ°©λ²μΌλ‘ νμΈν μ μλ€
- β Future.get μ°μ°μ ν΅ν΄ κ²°κ³Όλ₯Ό blocking νμ¬ νμΈν μ μλ€.
- β Callback μ λ±λ‘νμ¬ non-blocking λ°©μμΌλ‘ νμΈν μ μλ€
λκΈ