[νμ΅ ν μ€νΈλ‘ λ°°μ보λ kafka] 3. μΉ΄νμΉ΄ νμ΅ ν μ€νΈ μ€λΉνκΈ°
ν΄λΉ μ리μ¦μμ μ 곡νλ λͺ¨λ μμ€μ½λλ github repository μμ μ 곡λ©λλ€. μμΈν μ½λμ ν μ€νΈ μΌμ΄μ€λ github repository μμ νμΈν΄μ£ΌμΈμ.
μ΄λ² [νμ΅ ν μ€νΈλ‘ λ°°μ보λ kafka] μ리μ¦λ μλ μμλλ‘ μ±ν°κ° ꡬμ±λκ³ , μλ¦¬μ¦ μΈλ‘ kafka κ΄λ ¨νμ¬ λμ± λ§μ νμ΅ μ 보λ kafka μ¬ν μΈμ μμ νμΈν μ μμ΅λλ€.
μλ¦¬μ¦ λͺ©μ°¨
- μ리μ¦λ₯Ό μμνλ©°
- kafka λΉ λ₯΄κ² νμ΄λ³΄κ³ μλ체νκΈ°
- kafka 컨μ κ³Ό μ©μ΄ μ 리
- νμ΅ν μ€νΈ μ€λΉνκΈ° <-- νμ¬ κΈ
- νμ΅ ν μ€νΈλ‘ kafka producer μμ보기
- νμ΅ ν μ€νΈλ‘ kafka consumer μμ보기
- νμ΅ ν μ€νΈλ‘ partitioning μμ보기
- νμ΅ ν μ€νΈλ‘ consumer group κ³Ό rebalancing μμ보기
νμ΅μ λ¨κ³λ³ μμλ‘ λͺ©μ°¨κ° ꡬμ±λμ΄μμΌλ―λ‘ μ νλμ΄μΌ νλ μ±ν°κ° μ‘΄μ¬ν©λλ€
μ΄λ² μ±ν°μμλ μμΌλ‘ νμ΅ν μΉ΄νμΉ΄μ κ°λ μ μ½λλ‘ μ΄ν΄νκΈ° μν΄μ μ¬μ©ν λ°©μμΈ νμ΅ ν μ€νΈμ λν΄ μμλ³Ό κ²μ΄λ€.
μ°μ νμ΅ ν μ€νΈμ λν΄μ μ λͺ¨λ₯Ό μ μμΌλ νμ΅ν μ€νΈκ° 무μμΈμ§, κ·Έλ¦¬κ³ μ΄λ€ μ₯μ μ΄ μμ΄μ μ΄λ² μ리μ¦μμ μ±ννμλμ§ μμ보λλ‘ νμ
νμ΅ ν μ€νΈλ?
νμ΅ ν μ€νΈλ μ΄λ€ λΌμ΄λΈλ¬λ¦¬λ νλ μμν¬λ₯Ό νμ΅νκ³ μ΅νκΈ° μν΄μ μμ±νλ ν μ€νΈμ΄λ€.
μ΄ κ°λ μ martin fowler κ° "Refactoring: Improving the Design of Existing Code" μ± μμ "νμ΅ ν μ€νΈ" λΌλ μ©μ΄λ₯Ό μ²μ μ¬μ©νλλ°, μ΄ν μ°λ¦¬λλΌμμλ ν λΉμ μ€νλ§μμ μΈκΈλμ΄ λ§μ κ΄μ¬μ λμλ κ°λ μ΄λ€
μ€μ ν μ€νΈ μ½λλ₯Ό λ§λ€κ³ μ€ννλ©΄μ κΈ°μ μ μ΅νκ³ λμμ μ€ννλ λͺ¨λ κ³Όμ μ ν¬ν¨νλλ°, μ΄λ¬ν λ°©λ²λ€μ΄ μλ‘μ΄ κΈ°μ μ μ΅λνκ³ μ΅νλλ°μ κ°μΈμ μΌλ‘ κ°μ₯ ν¨κ³Όκ° μ’μλ€.
νμ΅ ν μ€νΈλ μΌλ°μ μΌλ‘ λ€μκ³Ό κ°μ μμλ₯Ό κ±°μΉλ€
- ν μ€νΈ λμ μ μ
- ν μ€νΈ μ½λ μμ±
- ν μ€νΈ μΌμ΄μ€ μ€ν (μ€ν¨)
- μ€ν¨ μμΈλ§ λΉ λ₯΄κ² ν΄κ²° (μ±κ³΅μν€κΈ°)
- μ‘°κΈμ variation μ μΆκ°νμ¬ μ€ν¨ μΌμ΄μ€ λ§λ€κΈ°
- variation μΌμ΄μ€ μ±κ³΅μν€κΈ°
μμ κ³Όμ μ λ°λ³΅νλ©° μ μ§μ μΌλ‘ κ°λ κ³Ό μ§μμ νμ₯νλ κ²μ λͺ©νλ‘ νλ€.
μλ₯Όλ€μ΄ μ΄λ€ λ―Έμ§μ Calculator
λΌλ ν΄λμ€κ° μλ€κ³ κ°μ ν΄λ³΄μ.
κ·Έλ¦¬κ³ μ΄ ν΄λμ€μ λ΄λΆ ꡬνμ λ¬Έμλ μκ³ λ―ΈμΉλ―μ΄ λ³΅μ‘ν΄μ λμΌλ‘ λ‘μ§ νμ΅μ΄ μ΄λ ΅λ€κ³ κ°μ νμ.
νμ΅ ν μ€νΈμ μ κ·Όλ²μ μ°μ ν΄λ³΄λ κ²μ΄λ€.
λμΆ© μ΄λ ν κ²μ΄λΌλ μμΈ‘λ§μ κ°μ§κ³ (μΌλ°μ μΌλ‘λ λ¬Έμλ μ£Όμ, μκ·Έλμ²λ₯Ό 보면 μμΈ‘μ νλ μ νλκ° λ λμ κ²μ΄λ€) μ΄κ²μ κ² ν΄λ³΄λ κ²μ΄λ€
Calculator
λΌλ ν΄λμ€μ int calc(int a, int b)
λΌλ μκ·Έλμ²κ° μλλ°, λμΆ© μ°μ°μ νκΈ΄ νλλ°, λ§μ
μΈμ§ κ³±μ
μΈμ§ λͺ¨λ₯΄κ² λ€λ©΄?
κ·Έλ₯ μΌλ¨ ν μ€νΈλ₯Ό λ§λλ κ²μ΄λ€.
κ·Έλ¦¬κ³ λ§μ μ΄κ² κ±°λ νκ³ μ€νμμΌλ³Έλ€
class UsageTest {
Calculator sut = new Calculator();
@Test
void test() {
int actual = sut.calc(10, 20); // λ§μ
μΌ κ²μΌλ‘ μΆμΈ‘ & μ€ν
// AssertionFailedException λ°μ
assertThat(actual).isEqualTo(10 + 20)
}
}
μ€ν¨νλ€. κ·ΈλΌ μ μ€ν¨νλμ§ μμΈμ νμ ν΄λ³΄κ³ λ€λ₯Έ λ°©μμ μλν΄λ³Έλ€.
class UsageTest {
Calculator sut = new Calculator();
@Test
void test() {
int actual = sut.calc(10, 20); // μμ μ€ν¨νμΌλκΉ λΊμ
μ΄λΌκ³ μΆμΈ‘ & μ€ν
assertThat(actual).isEqualTo(10 - 20) // μ±κ³΅
}
}
μ΄ ν
μ€νΈλ₯Ό ν΅ν΄μ int calc(int a, int b)
λ λΊμ
μ°μ°μ μννλ λ©μλλΌλ κ²μ μκ² λμλ€.
λ΄κ° μκ°νλ νμ΅ ν μ€νΈμ μ₯μ
κ°μΈμ μΌλ‘ νμ΅ν μ€νΈλ μ΄λ° μ₯μ μ΄ μλκ² κ°λ€.
- μνλ κΈ°λ₯λ§ λΉ λ₯΄κ² μ΅ν μ μλ€.
- μ μ§μ μΈ μ€νμ΄ κ°λ₯νλ€
- κ±°μ§λ§ νμ§ μλ λ¬Έμλ₯Ό μ§μ λ§λ€μ΄λκ°λ€(곡μ κ° κ°λ₯νλ€)
μνλ κΈ°λ₯λ§ λΉ λ₯΄κ² μ΅ν μ μλ€.
μ΄λ€ λΌμ΄λΈλ¬λ¦¬μ νΉμ κΈ°λ₯λ§μ μν λ λ§€μ° μ μ©νλ€.
κ³μν΄μ λλ λ°©μλ§ μ°Ύκ²λλ λͺ¨λ κ²μ μμ§ μμλ λλ€.
μμ°μ± μΈ‘λ©΄μμ λ§€μ° μ 리ν νΉμ§μ΄ μλ€.
μ μ§μ μΈ μ€νμ΄ κ°λ₯νλ€
ν μ€νΈλ₯Ό νλ² μ€νμν€κ³ λλ©΄ μ¬κΈ°μ μ¬λ¬κ°μ§ λ³νμ μ€ ν μ€νΈκ° κ°λ₯νλ€
νλΌλ―Έν°λ₯Ό λ°κΏλ³΄κ³ κ²½κ³κ°λ λ£μ΄λ³΄κ³ λ±λ±.
짧μ μ£ΌκΈ°λ‘ ν μ€νΈ νΌλλ°±μ ν΅ν΄μ κΈ°λ₯μ μμκ°κ³ trouble shooting μ ν΅ν΄ λ μ λ¬Έμ±μ λμΌ μ μλ€
κ±°μ§λ§ νμ§ μλ λ¬Έμλ₯Ό μ§μ λ§λ€μ΄λκ°λ€(곡μ κ° κ°λ₯νλ€)
νμ΅ ν μ€νΈλ ν μ€νΈμ΄λ€.
ν μ€νΈ μ½λλ₯Ό μ μμ±νλ€λ©΄ μ΄λμλ νμ κ·Έ κ²°κ³Όκ° μ°ΈμΈ regression test κ° κ°λ₯νλ€.
κ²°κ΅ λ΄κ° κ²μ¦ν κΈ°λ₯μ λν΄μ μΈμ λ κ±°μ§μλ, μ§μ€μ μ 곡νλ λ¬Έμκ° λλ€.
κ·Έλ¦¬κ³ μ΄ λ¬Έμλ μ½λλ‘ μμ±λμ΄μκΈ° λλ¬Έμ μΈμ λ μ§ λͺ¨λμκ² κ³΅μ κ° κ°λ₯νλ€
μ΄ νμ΅ ν μ€νΈλ₯Ό μ₯μ μ΄ μμκΈ°μ λ΄κ° μ§κΈ λΈλ‘κ·Έμ μ΄λ κ² μλ¦¬μ¦ κΈμ μ°κ³ μλ€.
μ΄λ² μ리μ¦μμ μ¬μ©ν νμ΅ ν μ€νΈ μ€λΉνκΈ°
μ΄λ² μ리μ¦μ 첫λ²μ§Έ λͺ©νλ spring κ³Ό μκ΄ μμ΄ λ¨μ§ apache kafaka λ₯Ό μ¬μ©ν΄λ³΄κ³ λ껴보λκ² μ΄λ€.
κ·Έλ¦¬κ³ μ΄νμ spring κ³Ό kafka λ₯Ό μ μ¬μ©νλ λ²μ λν΄μ μμλ³Ό μμ μΈλ° μ°μ μνν ν μ€νΈλ₯Ό μν΄μ spring μ λμμ λ°μ μμ μ΄λ€
ν μ€νΈλ₯Ό μν gradle μμ‘΄μ±
μ¬μ€ apache kafka λ§μ μνλ€λ©΄ νμν μμ‘΄μ±μ 'org.apache.kafka:kafka_2.10'
μΈλ° μ°λ¦¬λ org.springframework.kafka:spring-kafka
μ μ¬μ©ν κ²μ΄λ€.
ν μ€νΈλ₯Ό μν μΉ΄νμΉ΄ ν΄λ¬μ€ν°
κ·Έλ¦¬κ³ μΉ΄νμΉ΄λ₯Ό μ€νμν€κΈ° μν΄μ, μ ννλ μΉ΄νμΉ΄ λΈλ‘컀λ₯Ό μ€νμν€κΈ° μν΄μλ λ§μ μ€μ λ€κ³Ό μ€λΉλ¬Όμ΄ νμνλ€.
μΉ΄νμΉ΄ λΈλ‘컀λ₯Ό μ€νμν€κΈ° μν΄μλ ν΄λ¬μ€ν°μ λν κ΄λ¦¬μ λΈλ‘컀 λ©νλ°μ΄ν° κ΄λ¦¬λ₯Ό μν΄μ ν¨κ» μ¬μ©λλ zookeeper κ° νμνλ€.
νμ§λ§ μ°λ¦¬μ λͺ©νλ kafka λ₯Ό μ΄μνκ³ κ΄λ¦¬νλ κ²μ΄ μλλ―λ‘ μ€μ μΉ΄νμΉ΄ ν΄λ¬μ€ν°λ₯Ό λμ°λλ°μ ν° λ Έλ ₯μ λ€μ΄μ§ μμ μμ μ΄λ€.
kafka λ₯Ό μ΄μ©νλ μλΉμ€μμ λνμ μΌλ‘ λ§μ΄ μ¬μ©νλ ν μ€νΈ λ°©λ²μ μλ§ 2κ°μ§ λ°©λ²μ΄ μΌλ°μ μΌ κ²μ΄λ€.
- kafka testcontainers
- inmemeory kafka broker
testcontainers μμ²΄κ° λ컀λ₯Ό μ΄μ©ν ν μ€νΈμ΄λ€ 보λ ν μ€νΈλ₯Ό μ€νμν€λλ°μ μμ΄ λ§μ μ»΄ν¨ν 리μμ€κ° νμν΄μ ν μ€νΈμ λΉ λ₯Έ νΌλλ°±μ΄ μ΄λ ΅λ€.
κ·Έλμ μ΄λ² ν μ€νΈμμλ inmemory broker λ₯Ό μ΄μ©ν κ²μ΄λ€
org.springframework.kafka:spring-kafka-test
μμ‘΄μ±μμ @EmbeddedKafka
λΌλ μ΄λ
Έν
μ΄μ
μ ν΅ν΄μ μ½κ³ λΉ λ₯Έ in memory broker λ₯Ό μ 곡νλ€.
μλλ μ΄λ² μ리μ¦μμ νμν μμ‘΄μ±μ ν¬ν¨ν build.gradle
plugins {
id 'java'
id 'org.springframework.boot' version '2.7.14'
id 'io.spring.dependency-management' version '1.0.15.RELEASE'
}
// ..
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
}
tasks.named('test') {
useJUnitPlatform()
}
EmbeddedKafka ν μ€νΈ μ€μ
μ€μ μΉ΄νμΉ΄ λμ κ³Όμ μ λν΄μλ λ€μ μκ°λΆν° μμΈν νμΈν΄λ³΄λλ‘ νκ³ μ°μ μ°λ¦¬κ° μ€λΉν ν μ€νΈ νκ²½μ΄ μ λμκ°λμ§ νμΈν΄λ³΄μ
/src/test/java/{pkgDir}
κ²½λ‘μ ν
μ€νΈ νλλ₯Ό λ§λ€κ³ λ€μκ³Ό κ°μ΄ μ
λ ₯ν΄λ³΄μ
@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { // μΉ΄νμΉ΄ λΈλ‘컀μ κΈ°λ³Έ μμ± μ€μ μ μΈ
"listeners=PLAINTEXT://localhost:9092", // bootstrap server
})
public class SampleTest {
private static final Map<String, Object> props = Map.of(
"bootstrap.servers", "localhost:9092",
"key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
"value.serializer", "org.apache.kafka.common.serialization.StringSerializer"
);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
@Test
void sample() {
// message produce to broker
ProducerRecord<String, String> record = new ProducerRecord<>("a-topic", "hi~");
producer.send(record);
}
}
κ·Έλ¦¬κ³ μ€νμμΌλ³΄λ©΄ μλ κ·Έλ¦Όκ³Ό κ°μ΄ ν μ€νΈκ° μ±κ³΅νκ² λλ€
μλ νμ΅ν μ€νΈλ μ€ν¨νλ ν μ€νΈλ₯Ό ν΅ν΄ μ μ§μ μΌλ‘ κΈ°μ μ μ΅νλ λ°©μμΈλ°,λΈλ‘κ·Έ κΈμ κ°λ μ±κ³Ό νΉμ±μΌλ‘ μΈν΄ μ±κ³΅νλ ν μ€νΈλ§ 보μ¬μ€ μμ μ΄λ€
ν μ€νΈκ° μ±κ³΅νλ€λ©΄ μ΄μ μ€λΉλ λλ¬λ€!
λ²μΈ
λ²μΈλ‘ νμ΅ ν μ€νΈλ λλ§μ κ°μ΄λ λ¬Έμκ° λ μ μλ€.
κ·Έλμ μ΅λν ν΅μ¬λ§ λ¨κ²¨λκ³ κΉλνκ² μ μ§νλλ‘ λ Έλ ₯νλ νΈμ΄λ€.
μλ¦¬μ¦ μ€κ°μ€κ° ν μ€νΈ μ½λ 리ν©ν λ§μ ν΅ν΄μ Helper λ Fixture class λ₯Ό λ§μ΄ λ§λ€ μμ μΈλ°, κ·Έλλ§λ€ μλ €μ£ΌκΈ΄ νκ² μ§λ§ νΉμ λμΉλ λΆλΆκ³Ό μ΄ν΄κ° κ°μ§ μλ λΆλΆμ΄ μμ μ μμΌλ―λ‘ μμΈν μ½λλ dhslrl321-kafka github repository μμ νμΈν μ μλ€
κ·Έλ° λ§₯λ½μΌλ‘ 보면 μΉ΄νμΉ΄ λΈλ‘컀λ₯Ό μ€μ νλ μ½λλ₯Ό μ‘°κΈ κΉλνκ² λ§λ€κΈ° μν΄μ ν μ€νΈ μ΄λ Έν μ΄μ μ λ§λ€μ΄λ΄€λ€
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@SpringBootTest
@EmbeddedKafka(partitions = 2, brokerProperties = {
"listeners=PLAINTEXT://localhost:9092",
"port=9092",
})
public @interface KafkaTest {
String testDescriptions() default "";
}
κ·Έλ¦¬κ³ μ€μ ν μ€νΈ μ½λμμλ ν΄λΉ μ΄λ Έν μ΄μ λ§ μ¬μ©νλ€.
@KafkaTest
public class SampleTest {
// ..
}