λ³Έλ¬Έ λ°”λ‘œκ°€κΈ°
  • μž₯원읡 κΈ°μˆ λΈ”λ‘œκ·Έ
πŸ“š μ‹œλ¦¬μ¦ˆ/- ν•™μŠ΅ ν…ŒμŠ€νŠΈλ‘œ λ°°μ›Œλ³΄λŠ” kafka

[ν•™μŠ΅ ν…ŒμŠ€νŠΈλ‘œ λ°°μ›Œλ³΄λŠ” kafka] 4. ν•™μŠ΅ ν…ŒμŠ€νŠΈλ‘œ μΉ΄ν”„μΉ΄ ν”„λ‘œλ“€μ„œ(kafka producer) μ•Œμ•„λ³΄κΈ°

by Wonit 2023. 9. 7.

ν•΄λ‹Ή μ‹œλ¦¬μ¦ˆμ—μ„œ μ œκ³΅ν•˜λŠ” λͺ¨λ“  μ†ŒμŠ€μ½”λ“œλŠ” github repository μ—μ„œ μ œκ³΅λ©λ‹ˆλ‹€. μžμ„Έν•œ μ½”λ“œμ™€ ν…ŒμŠ€νŠΈ μΌ€μ΄μŠ€λŠ” github repository μ—μ„œ ν™•μΈν•΄μ£Όμ„Έμš”.

 

GitHub - my-research/kafka: apache kafka docs & practical usages(with spring) that i composed

apache kafka docs & practical usages(with spring) that i composed - GitHub - my-research/kafka: apache kafka docs & practical usages(with spring) that i composed

github.com

이번 [ν•™μŠ΅ ν…ŒμŠ€νŠΈλ‘œ λ°°μ›Œλ³΄λŠ” kafka] μ‹œλ¦¬μ¦ˆλŠ” μ•„λž˜ μˆœμ„œλŒ€λ‘œ 챕터가 κ΅¬μ„±λ˜κ³ , μ‹œλ¦¬μ¦ˆ μ™Έλ‘œ kafka κ΄€λ ¨ν•˜μ—¬ λ”μš± λ§Žμ€ ν•™μŠ΅ μ •λ³΄λŠ” kafka 심화 μ„Έμ…˜ μ—μ„œ 확인할 수 μžˆμŠ΅λ‹ˆλ‹€.

μ‹œλ¦¬μ¦ˆ λͺ©μ°¨

  1. μ‹œλ¦¬μ¦ˆλ₯Ό μ‹œμž‘ν•˜λ©°
  2. kafka λΉ λ₯΄κ²Œ 훑어보고 μ•„λŠ”μ²΄ν•˜κΈ°
  3. kafka 컨셉과 μš©μ–΄ 정리
  4. ν•™μŠ΅ν…ŒμŠ€νŠΈ μ€€λΉ„ν•˜κΈ°
  5. ν•™μŠ΅ ν…ŒμŠ€νŠΈλ‘œ kafka producer μ•Œμ•„λ³΄κΈ° <-- ν˜„μž¬ κΈ€
  6. ν•™μŠ΅ ν…ŒμŠ€νŠΈλ‘œ kafka consumer μ•Œμ•„λ³΄κΈ°
  7. ν•™μŠ΅ ν…ŒμŠ€νŠΈλ‘œ partitioning μ•Œμ•„λ³΄κΈ°
  8. ν•™μŠ΅ ν…ŒμŠ€νŠΈλ‘œ consumer group κ³Ό rebalancing μ•Œμ•„λ³΄κΈ°

ν•™μŠ΅μ˜ 단계별 μˆœμ„œλ‘œ λͺ©μ°¨κ°€ κ΅¬μ„±λ˜μ–΄μžˆμœΌλ―€λ‘œ μ„ ν–‰λ˜μ–΄μ•Ό ν•˜λŠ” 챕터가 μ‘΄μž¬ν•©λ‹ˆλ‹€


 

μ΄λ²ˆμ‹œκ°„μ—λŠ” λ“œλ””μ–΄ μΉ΄ν”„μΉ΄ 브둜컀둜 λ©”μ‹œμ§€λ₯Ό λ°œν–‰ν•΄λ³΄λŠ” μ‹œκ°„μ΄λ‹€

 

KafkaProducer

 

java μ—μ„œ μΉ΄ν”„μΉ΄ ν΄λŸ¬μŠ€ν„°λ‘œ λ©”μ‹œμ§€λ₯Ό λ°œν–‰ν•  λ•Œ μ‚¬μš©ν•˜λŠ” java client λŠ” KafkaProducer 이닀

 

KafkaProducer λŠ” λ©”μ‹œμ§€ λ°œν–‰μ„ λΉ„λ™κΈ°λ‘œ μˆ˜ν–‰ν•˜κΈ° λ•Œλ¬Έμ— λ§Žμ€ μ–‘μ˜ Record 전솑을 μ§€μ›ν•œλ‹€.

 

KafkaProducer μΈμŠ€ν„΄μŠ€ μƒμ„±ν•˜κΈ°

 

μƒμ„±μžμ˜ parameter 둜 μ „λ‹¬λœ key-value ν˜•νƒœμ˜ properties λ₯Ό ν†΅ν•΄μ„œ client configuration 을 λ“±λ‘ν•œλ‹€

 

ν•΄λ‹Ή configuration 의 μžμ„Έν•œ 사항과 νŠΉμ • 섀정듀은 confluent.io/kafka config μ—μ„œ 확인할 수 μžˆλ‹€

 

λ‚˜λŠ” μ•žμœΌλ‘œ λ§Žμ€ ν…ŒμŠ€νŠΈμ—μ„œ Producer μΈμŠ€ν„΄μŠ€λ₯Ό 생성할 μ˜ˆμ •μ΄λΌμ„œ Helper 클래슀둜 producer 생성 λ‘œμ§μ„ λΆ„λ¦¬μ‹œμΌ°λ‹€

 

public class KafkaProducerTestHelper {
  public static KafkaProducer<String, String> getSimpleProducer() {
    Map<String, Object> props = Map.of(
      // bootstrap server μ„€μ •
      "bootstrap.servers", "localhost:9092",
      // kafka 둜 전솑할 message 의 s/d 클래슀 μ„€μ •
      "key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
      "value.serializer", "org.apache.kafka.common.serialization.StringSerializer"
    );
    return new KafkaProducer<>(props);
  }
}

 

그리고 각각의 ν…ŒμŠ€νŠΈ μΌ€μ΄μŠ€μ—μ„œλŠ” Producer 의 μΈμŠ€ν„΄μŠ€λ₯Ό λ‹€μŒκ³Ό 같이 λ§Œλ“€μ—ˆλ‹€

 

 

μ΄λŸ¬ν•œ 방법을 톡해 μ•žμœΌλ‘œ 진행할 ν…ŒμŠ€νŠΈμ— λŒ€ν•΄μ„œ ν”„λ‘œλ“€μ„œ μΈμŠ€ν„΄μŠ€λ₯Ό 생성할 μ˜ˆμ •μ΄λ‹€

 

test 1 - KafkaProducer λ₯Ό μ΄μš©ν•œ λ©”μ‹œμ§€ λ°œν–‰ ν…ŒμŠ€νŠΈ

이번 ν…ŒμŠ€νŠΈμ—μ„œλŠ” kafka producer λ₯Ό μ΄μš©ν•΄μ„œ λ©”μ‹œμ§€λ₯Ό λ°œν–‰ν•΄λ³΄λ €ν•œλ‹€.

 

ν…ŒμŠ€νŠΈ μ½”λ“œ 뢄석

  • 주석 1. λ°œν–‰ν•  λ©”μ‹œμ§€λ₯Ό μ •μ˜
    • 일반적으둜 카프카에 μ €μž₯될 이벀트 λ‘œκ·ΈλŠ” Record λΌλŠ” κ°œλ…μ˜ 데이터가 μ €μž₯λœλ‹€.
    • 첫번째 νŒŒλΌλ―Έν„°λ‘œ topic 을 λ„˜κΈ°κ³  λ‘λ²ˆμ§Έ νŒŒλΌλ―Έν„°λ‘œ message payload λ₯Ό λ„˜κΈ΄λ‹€
  • 주석 2. μ‹€μ œλ‘œ producer λ₯Ό ν†΅ν•΄μ„œ record λ₯Ό μΉ΄ν”„μΉ΄ 브둜컀둜 μ „μ†‘ν•œλ‹€
    • produce 결과둜 Future λ₯Ό λ°˜ν™˜λ°›λŠ”λ‹€.
  • 주석 3. Future.get() 연산을 톡해 비동기 produce response λ₯Ό blocking ν•œλ‹€
  • 주석 4. produce 결과둜 λ°˜ν™˜λœ 정보에 λŒ€ν•œ 단언문

정리

  • βœ… Producer λŠ” Record λΌλŠ” λ‹¨μœ„λ‘œ μΉ΄ν”„μΉ΄ 브둜컀둜 λ©”μ‹œμ§€λ₯Ό λ°œν–‰
  • βœ… produce κ²°κ³Όλ₯Ό asynchronous ν•˜κ²Œ 처리 κ°€λŠ₯
  • βœ… κ²°κ³Όλ₯Ό blocking ν•˜μ—¬ ν™•μΈν•˜μ§€ μ•ŠμœΌλ©΄ 일뢀 λ©”μ‹œμ§€μ— λŒ€ν•΄ λˆ„λ½μ΄ λ°œμƒν•  수 있음

test 2. KafkaProducer μ‘΄μž¬ν•˜μ§€ μ•ŠλŠ” topic 에 λŒ€ν•΄μ„œ λ©”μ‹œμ§€ λ°œν–‰ ν…ŒμŠ€νŠΈ

 

producer κ°€ λ©”μ‹œμ§€λ₯Ό λ°œν–‰ν•˜μ˜€λŠ”λ° λ§Œμ•½ μ‘΄μž¬ν•˜μ§€ μ•ŠλŠ” topic 에 λ°œν–‰ν–ˆλ‹€λ©΄ μ–΄λ–»κ²Œ 될까?

 

μ™œ μš°λ¦¬λŠ” μ•žμ„  ν…ŒμŠ€νŠΈμ—μ„œ ν† ν”½ 생성을 ν•˜μ§€ μ•Šμ•„λ„ ν…ŒμŠ€νŠΈκ°€ μ„±κ³΅ν–ˆμ„κΉŒ?

 

κ·Έ 정닡은 λ°”λ‘œ Kafka 의 κΈ°λ³Έ 섀정에 μžˆλ‹€.

 

λ‹€λ₯Έ λ§Žμ€ λΌμ΄λΈŒλŸ¬λ¦¬λ“€κ³Ό λ§ˆμ°¬κ°€μ§€λ‘œ kafka μ—­μ‹œ μ‹€ν–‰ μ˜΅μ…˜ (configuration) 듀을 ν•„μš”λ‘œ ν•˜λŠ”λ°, 기본적으둜 topic 생성 κΈ°λŠ₯이 ν™œμ„±ν™” λ˜μ–΄μžˆλ‹€.

 

κ·Έλž˜μ„œ 토픽을 사전에 μƒμ„±ν•˜μ§€ μ•Šλ”λΌλ„ λ©”μ‹œμ§€λ₯Ό λ°œν–‰ν•  λ•Œ μ‘΄μž¬ν•˜μ§€ μ•ŠλŠ” 토픽에 λ°œν–‰μ„ ν•˜λ”λΌλ„ λ¬Έμ œκ°€ μ—†μ—ˆλ˜ 것이닀.

 

μ‘΄μž¬ν•˜μ§€ μ•ŠλŠ” topic 에 λ°œν–‰μ„ ν•˜λŠ” ν…ŒμŠ€νŠΈλ₯Ό μœ„ν•΄ @EmbeddedKafka 섀정을 λ°”κΏ”λ³΄μž

 

μ–΄λ…Έν…Œμ΄μ…˜μ˜ νŒŒλΌλ―Έν„°λ‘œ brokerProperties 에 "auto.create.topics.enable=false" λ₯Ό λ„˜κ²¨μ„œ ν† ν”½ μžλ™ 생성 κΈ°λŠ₯을 끄고 μ•„λž˜μ˜ ν…ŒμŠ€νŠΈλ₯Ό μ‹€ν–‰ν•΄λ³΄μž

 

 

그럼 μ•„λž˜μ™€ 같이 UNKNOWN_TOPIC_OR_PARTITION μ—λŸ¬κ°€ λ°œμƒν•œλ‹€.

 

λ‚˜λŠ” Junit Extension 을 톡해 2초 timeout 이 λ°œμƒν•˜λ©΄ ν…ŒμŠ€νŠΈλ₯Ό ν†΅κ³Όν•˜λ„λ‘ μ΅μŠ€ν…μ…˜ 클래슀λ₯Ό λ§Œλ“€μ–΄μ„œ μΆ”κ°€ν•΄μ€¬λŠ”λ°, κ΄€λ ¨ μ½”λ“œλ₯Ό 더 ν™•μΈν•΄λ³΄κ³ μ‹ΆμœΌλ©΄ kafka ν•™μŠ΅ν…ŒμŠ€νŠΈ github μ—μ„œ 확인할 수 μžˆλ‹€

정리

  • βœ… Producer λŠ” μ‘΄μž¬ν•˜μ§€ μ•ŠλŠ” topic 에 λ©”μ‹œμ§€λ₯Ό λ°œν–‰ν•  수 μ—†μŒ
  • βœ… Kafka 의 κΈ°λ³Έ 섀정이 μ‘΄μž¬ν•˜μ§€ μ•ŠλŠ” topic 을 μžλ™ 생성함
  • βœ… broker props λ₯Ό 톡해 ν•΄λ‹Ή 섀정을 끌 수 있음

test 3. λ©”μ‹œμ§€ λ°œν–‰ κ²°κ³Ό 처리 콜백 λ™μž‘ ν…ŒμŠ€νŠΈ

 

test 1 μ—μ„œ producer κ°€ produce λ₯Ό μˆ˜ν–‰ν•˜λ©΄ λ©”μ‹œμ§€λ₯Ό λ°œν–‰ν•˜κ³  Future 객체λ₯Ό λ°˜ν™˜λ°›λŠ”λ‹€κ³  μ„€λͺ…ν–ˆλ‹€.

 

λ§Œμ•½ κ²°κ³Όλ₯Ό ν™•μΈν•˜κ³ μ‹Άλ‹€λ©΄ Future.get() 연산을 톡해 μš”μ²­μ— λŒ€ν•΄ blocking ν•˜λ©΄ κ²°κ³Όλ₯Ό ν•΄λ‹Ή μŠ€λ ˆλ“œ λ‚΄μ—μ„œ 확인할 수 μžˆλ‹€.

 

ν•˜μ§€λ§Œ λŒ€λŸ‰μ˜ λ©”μ‹œμ§€μ— λŒ€ν•΄μ„œ 계속 block ν•˜μ—¬ κ²°κ³Όλ₯Ό ν™•μΈν•˜λŠ” 것이 μ–΄λ–€ 상황에선 μ˜¬λ°”λ₯΄μ§€ μ•Šμ€ 선택일 수 μžˆλŠ”λ°, 이λ₯Ό μœ„ν•΄ kafka λŠ” callback μ΄λΌλŠ” 것을 μ œκ³΅ν•œλ‹€

 

이번 ν…ŒμŠ€νŠΈμ—μ„œλŠ” kafka producer κ°€ μ œκ³΅ν•˜λŠ” callback μ΄λΌλŠ” κΈ°λŠ₯에 λŒ€ν•΄μ„œ μ•Œμ•„λ³΄μž.

 

KafkaProducer λŠ” λ©”μ‹œμ§€ λ°œν–‰ μš”μ²­μ΄ μ™„λ£Œλ˜μ—ˆμ„ λ•Œμ— λŒ€ν•΄ κ²°κ³Ό 처리λ₯Ό μœ„ν•œ callback 을 등둝할 수 μžˆλ‹€

 

callback 은 org.apache.kafka.clients.producer.Callback μΈν„°νŽ˜μ΄μŠ€λ₯Ό implements ν•΄μ•Όν•œλ‹€

 

 

μ½œλ°±μΈν„°νŽ˜μ΄μŠ€λ₯Ό κ΅¬ν˜„ν•˜κ³  KafkaProducer.send() 의 λ‘λ²ˆμ§Έ 인자둜 ν•΄λ‹Ή μ½œλ°±μ„ λ„˜κΈ°λ©΄ λ‚΄λΆ€μ μœΌλ‘œ kafka broker 둜 record μ €μž₯ μš”μ²­μ„ λ§ˆμ³€μ„ λ•Œ ν•΄λ‹Ή μ½œλ°±μ„ μ‹€ν–‰μ‹œν‚¨λ‹€.

 

ν…ŒμŠ€νŠΈλ₯Ό μœ„ν•œ μ½œλ°±μ„ μ•„λž˜μ™€ 같이 κ΅¬ν˜„ν•΄μ€¬λ‹€.

 

@Slf4j
public class SimpleProduceCallback implements Callback {

  public static SimpleProduceCallback newOne() {
    return new SimpleProduceCallback();
  }

  @Override
  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (nonNull(metadata)) {
      log.info("βœ… λ©”μ‹œμ§€ λ°œν–‰ 성곡, topic: {}, partition: {}, offset: {}, timestamp: {}",
          metadata.topic(),
          metadata.partition(),
          metadata.offset(),
          metadata.timestamp());

    } else {
      log.error("❌ λ©”μ‹œμ§€ λ°œν–‰ μ‹€νŒ¨", exception);
    }
  }
}

onCompletion() λ©”μ„œλ“œμ˜ 첫번째 인자둜 λ“€μ–΄μ˜€λŠ” metatdata λŠ” μ„±κ³΅μ μœΌλ‘œ λ©”μ‹œμ§€λ₯Ό λ°œν–‰ν–ˆμ„ λ•Œ 정상적인 값이 μ„ΈνŒ…λ˜μ–΄ μ „λ‹¬λœλ‹€

 

λ§Œμ•½ 전솑에 μ‹€νŒ¨ν•˜μ˜€λ‹€λ©΄ -1 이 μ„€μ •λœ metatdata 객체λ₯Ό λ°˜ν™˜λ°›μ„ 것이닀.

 

μ•„λž˜ ν…ŒμŠ€νŠΈλ₯Ό μ‹€ν–‰ν•΄λ³΄μž

 

 

λ§ˆμ§€λ§‰ λΌμΈμ—μ„œ sut.close() λ₯Ό μΆ”κ°€ν•΄μ€€ μ΄μœ λŠ” ν…ŒμŠ€νŠΈ μΌ€μ΄μŠ€κ°€ μ’…λ£Œλ˜λŠ” μ‹œμ μ΄ callback 이 ν˜ΈμΆœλ˜λŠ” μ‹œμ λ³΄λ‹€ λŠλ¦¬κΈ°λ•Œλ¬Έμ— ν…ŒμŠ€νŠΈ 검증을 μœ„ν•΄ blocking ν•˜μ˜€λ‹€.

 

 

κ²°κ³Όλ₯Ό ν™•μΈν•˜λ©΄ μ„±κ³΅μ μœΌλ‘œ λ‘œκ·Έκ°€ λ‚˜μ˜€λŠ” 것을 확인할 수 μžˆλ‹€

 

정리

  • βœ… Producer λŠ” λ ˆμ½”λ“œ 전솑 μš”μ²­ κ²°κ³Όλ₯Ό 2가지 λ°©λ²•μœΌλ‘œ 확인할 수 μžˆλ‹€
  • βœ… Future.get 연산을 톡해 κ²°κ³Όλ₯Ό blocking ν•˜μ—¬ 확인할 수 μžˆλ‹€.
  • βœ… Callback 을 λ“±λ‘ν•˜μ—¬ non-blocking λ°©μ‹μœΌλ‘œ 확인할 수 μžˆλ‹€

λŒ“κΈ€