시작하며
Kafka에 대한 지식이 부족하다고 판단하여, Local 환경에서 Spring boot와 Kafka의 연동실습이며, 이론적인 내용은 추후에 작성할 예정이다.
Kafka 서버 구축하기
1. Apache Kafka 공식 홈페이지에서 Kafka 다운로드
- 다운로드 후 압축을 해제하면, bin과 config 파일이 존재한다.
- bin 디렉토리내에는 Kafka 관련 각종 실행 sh가 , config 디렉토리내에는 설정관련 파일이 위치한다.
2. Kafka Zookeeper 구동하기
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
- Zookeeper는 기본적으로 2181 포트에서 구동한다.
3. Kafka Broker 구동하기
./bin/kafka-server-start.sh ./config/server.properties
- Broker는 기본적으로 9092 포트에서 구동한다.
4. Topic 생성 및 확인하기
- Topic 생성
./bin/kafka-topics.sh --create --topic [생성할 topic 이름] --bootstrap-server localhost:9092 --partitions 1
- Topic 삭제
./bin/kafka-topics.sh --delete --topic [삭제할 topic 이름] --bootstrap-server localhost:9092
- Topic List 조회
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
- Topic 정보 조회
./bin/kafka-topics.sh --describe --topic [확인할 topic 이름] --bootstrap-server localhost:9092
Spring Boot Kafka 설정
1. build.gradle 및 yml 설정
// kafka
implementation 'org.springframework.kafka:spring-kafka'
spring:
kafka:
bootstrap-servers: localhost:9092
2. Config 설정
- Producer
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootStrapSevers;
@Bean
public ProducerFactory<String, KafkaPubRequestDto> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapSevers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, KafkaPubRequestDto> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
- Consumer
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootStrapSevers;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, KafkaSubRequestDto> containerListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String, KafkaSubRequestDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.getContainerProperties().setAckTime(3000);
factory.getContainerProperties().setPollTimeout(500);
factory.getContainerProperties().setLogContainerConfig(true);
factory.setConcurrency(1);
return factory;
}
@Bean
public ConsumerFactory<String, KafkaSubRequestDto> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapSevers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
JsonDeserializer<KafkaSubRequestDto> deserializer = new JsonDeserializer<>(KafkaSubRequestDto.class);
deserializer.setRemoveTypeHeaders(false);
deserializer.addTrustedPackages("*");
deserializer.setUseTypeMapperForKey(true);
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
deserializer
);
}
}
3. Producer, Consume
- Producer
@RequiredArgsConstructor
@Slf4j
@Component
public class KafkaProducer {
private final KafkaTemplate<String, KafkaPubRequestDto> kafkaTemplate;
public void send(KafkaPubRequestDto kafkaPubRequestDto){
log.info("> Kafka Producer Send Start [message] : {}", kafkaPubRequestDto);
kafkaTemplate.send("test-topic-1", kafkaPubRequestDto);
log.info("> Kafka Producer Send End [message] : {}", kafkaPubRequestDto);
}
}
- Consumer
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = "test-topic-1", containerFactory = "containerListenerFactory", groupId = "test")
private void testListener(@Payload KafkaSubRequestDto kafkaSubRequestDto, Acknowledgment ack, ConsumerRecordMetadata metadata) {
log.info("> Kafka Consumer Read Start [KafkaSubRequestDto] : {}", kafkaSubRequestDto);
ack.acknowledge();
log.info("> Kafka Consumer Read End [KafkaSubRequestDto] : {}", kafkaSubRequestDto);
}
}
4. Controller
@RestController
@RequestMapping("/kafka")
@Slf4j
@RequiredArgsConstructor
public class KafkaPubController {
private final KafkaProducer kafkaProducer;
private final AtomicLong messageId = new AtomicLong(0);
@PostMapping("/pub")
void pubMessage(@RequestBody KafkaPubRequestDto kafkaPubRequestDto){
LocalDateTime now = LocalDateTime.now();
log.info("> Kafka Message Pub [time] :{}", now);
kafkaPubRequestDto.setMessageId(messageId.getAndIncrement());
kafkaProducer.send(kafkaPubRequestDto);
}
}
- 결과
끝으로
카프카 책을 구매 예정이며, 더 깊게 공부해볼 예정이며, 프로젝트에서 기회가된다면 대용량 데이터를 경험해보고 싶다.