Backend/Spring

Spring boot, Kafka 연동

seung_soos 2024. 1. 14. 23:25

 

시작하며

Kafka에 대한 지식이 부족하다고 판단하여, Local 환경에서 Spring boot와 Kafka의 연동실습이며, 이론적인 내용은 추후에 작성할 예정이다.

 

Kafka 서버 구축하기

1. Apache Kafka 공식 홈페이지에서 Kafka 다운로드

  • 다운로드 후 압축을 해제하면, bin과 config 파일이 존재한다.
  • bin 디렉토리내에는 Kafka 관련 각종 실행 sh가 , config 디렉토리내에는 설정관련 파일이 위치한다.

 

2. Kafka Zookeeper 구동하기

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

  • Zookeeper는 기본적으로 2181 포트에서 구동한다.

3. Kafka Broker 구동하기

./bin/kafka-server-start.sh ./config/server.properties

  • Broker는 기본적으로 9092 포트에서 구동한다.

4. Topic 생성 및 확인하기

  • Topic 생성
./bin/kafka-topics.sh --create --topic [생성할 topic 이름] --bootstrap-server localhost:9092 --partitions 1
  • Topic 삭제
./bin/kafka-topics.sh --delete --topic [삭제할 topic 이름] --bootstrap-server localhost:9092
  • Topic List 조회
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

  • Topic 정보 조회
./bin/kafka-topics.sh --describe --topic [확인할 topic 이름] --bootstrap-server localhost:9092

 

Spring Boot Kafka 설정

1. build.gradle 및 yml 설정

	// kafka
    implementation 'org.springframework.kafka:spring-kafka'
spring:
  kafka:
    bootstrap-servers: localhost:9092

 

2. Config 설정

  • Producer 
@EnableKafka
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootStrapSevers;

    @Bean
    public ProducerFactory<String, KafkaPubRequestDto> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapSevers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, KafkaPubRequestDto> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
  • Consumer
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootStrapSevers;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaSubRequestDto> containerListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, KafkaSubRequestDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.getContainerProperties().setAckTime(3000);
        factory.getContainerProperties().setPollTimeout(500);
        factory.getContainerProperties().setLogContainerConfig(true);
        factory.setConcurrency(1);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, KafkaSubRequestDto> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapSevers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        JsonDeserializer<KafkaSubRequestDto> deserializer = new JsonDeserializer<>(KafkaSubRequestDto.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        return new DefaultKafkaConsumerFactory<>(
                props,
                new StringDeserializer(),
                deserializer
        );
    }

}

 

3. Producer, Consume

  • Producer
@RequiredArgsConstructor
@Slf4j
@Component
public class KafkaProducer {

    private final KafkaTemplate<String, KafkaPubRequestDto> kafkaTemplate;

    public void send(KafkaPubRequestDto kafkaPubRequestDto){
        log.info("> Kafka Producer Send Start [message] : {}", kafkaPubRequestDto);
        kafkaTemplate.send("test-topic-1", kafkaPubRequestDto);
        log.info("> Kafka Producer Send End [message] : {}", kafkaPubRequestDto);
    }

}
  • Consumer
@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "test-topic-1", containerFactory = "containerListenerFactory", groupId = "test")
    private void testListener(@Payload KafkaSubRequestDto kafkaSubRequestDto, Acknowledgment ack, ConsumerRecordMetadata metadata) {
        log.info("> Kafka Consumer Read Start [KafkaSubRequestDto] : {}", kafkaSubRequestDto);
        ack.acknowledge();
        log.info("> Kafka Consumer Read End [KafkaSubRequestDto] : {}", kafkaSubRequestDto);
    }

}

 

4. Controller

@RestController
@RequestMapping("/kafka")
@Slf4j
@RequiredArgsConstructor
public class KafkaPubController {

    private final KafkaProducer kafkaProducer;
    private final AtomicLong messageId = new AtomicLong(0);

    @PostMapping("/pub")
    void pubMessage(@RequestBody KafkaPubRequestDto kafkaPubRequestDto){
        LocalDateTime now = LocalDateTime.now();
        log.info("> Kafka Message Pub [time] :{}", now);
        kafkaPubRequestDto.setMessageId(messageId.getAndIncrement());
        kafkaProducer.send(kafkaPubRequestDto);
    }
}
  • 결과

 

끝으로

카프카 책을 구매 예정이며, 더 깊게 공부해볼 예정이며, 프로젝트에서 기회가된다면 대용량 데이터를 경험해보고 싶다.