회사에서 JDK 17 버전에서 21로 변경하며 Virtual Thread를 도입하기 위해 학습한 내용을 기록하였습니다.

Virtual Thread 개념 정리

Java 19에서 프리뷰로 도입되고 Java 21에서 정식 출시된 Virtual Thread(가상 스레드)에 대해 학습하였습니다. 기존 Platform Thread와 비교하면 상당한 차이가 있었습니다.

 

기존 Platform Thread의 한계점

  • 스레드당 약 2MB의 메모리 사용
  • 생성 및 삭제 비용이 높음
  • 일반적으로 수십~수백 개 정도만 생성 가능

Virtual Thread의 개선점

  • 스레드당 약 10KB로 매우 가벼움
  • 수십만~수백만 개까지 생성 가능
  • I/O 대기 시간을 효율적으로 활용

동작 원리 학습

Virtual Thread는 Platform Thread 위에서 동작하며, 핵심은 Mount/Unmount 메커니즘입니다.

동작 흐름:
1. Virtual Thread A가 Platform Thread 1에서 실행 (Mount 상태)
2. DB 조회 등 I/O 작업 시작
3. Virtual Thread A가 Platform Thread에서 분리 (Unmount)
4. Platform Thread 1이 다른 Virtual Thread B를 실행
5. I/O 완료 시 Virtual Thread A가 다시 Platform Thread에 연결되어 실행 재개

핵심 용어

  • 마운트(Mount): Virtual Thread가 Platform Thread에서 실행되는 상태
  • 언마운트(Unmount): I/O 블로킹 시 Platform Thread에서 분리되어 대기하는 상태
  • Carrier Thread: Virtual Thread를 실제로 실행하는 Platform Thread

I/O 바운드 작업에서의 효과

Virtual Thread는 I/O 집약적인 작업에서 뛰어난 성능을 보입니다.

public void processUserData() {
    // 1. DB 사용자 조회 - I/O 대기 시 언마운트 발생
    User user = userRepository.findById(userId);
    
    // 2. 외부 API 호출 - I/O 대기 시 언마운트 발생
    ApiResponse response = externalApiClient.call(user.getId());
    
    // 3. 파일 저장 - I/O 대기 시 언마운트 발생
    fileService.saveUserData(user, response);
}

위와 같은 코드에서는 각 I/O 작업마다 Virtual Thread가 언마운트되어 Platform Thread가 다른 Virtual Thread를 실행할 수 있게 됩니다.

참고로 Thread.sleep()도 Virtual Thread에서는 언마운트됩니다.

주의해야 할 제약사항

synchronized 블록 사용 시 문제점

학습 중 가장 중요하다고 판단한 부분입니다.

// Virtual Thread 효과가 제한되는 경우
public void inefficientMethod() {
    synchronized (lock) {
        // Virtual Thread가 Platform Thread를 계속 점유
        // 언마운트되지 않아 비효율적
        expensiveOperation();
    }
}

// Virtual Thread에 적합한 방식
private final ReentrantLock lock = new ReentrantLock();

public void efficientMethod() {
    lock.lock();
    try {
        // Virtual Thread 언마운트 가능
        expensiveOperation();
    } finally {
        lock.unlock();
    }
}

synchronized는 JVM 모니터 락으로 OS 레벨에서 스레드를 블로킹하기 때문에 언마운트가 불가능합니다.

CPU 바운드 작업의 한계

public void cpuIntensiveTask() {
    for (int i = 0; i < 1000000; i++) {
        // CPU 연산 중심 작업에서는 언마운트 포인트가 없음
        Math.sin(i) * Math.cos(i);
    }
}

CPU 집약적인 작업은 Virtual Thread를 많이 생성해도 CPU 코어 수의 제약을 받습니다.

Spring Boot 환경에서의 적용

설정은 매우 간단합니다.

# application.yml
spring:
  threads:
    virtual:
      enabled: true

이 설정만으로 Spring의 요청 처리 스레드가 Virtual Thread로 변경됩니다.

실제 적용 예시

@RestController
public class UserController {
    
    @GetMapping("/users/{id}")
    public UserDetailResponse getUserDetail(@PathVariable Long id) {
        // 전체 메서드가 Virtual Thread에서 실행
        
        // 사용자 정보 조회 - 언마운트 발생
        User user = userService.findById(id);
        
        // 권한 정보 조회 - 언마운트 발생
        List<Permission> permissions = permissionService.findByUserId(id);
        
        // 외부 프로필 이미지 조회 - 언마운트 발생
        String profileImageUrl = profileService.getImageUrl(user.getEmail());
        
        return UserDetailResponse.of(user, permissions, profileImageUrl);
    }
}

CPU 작업 혼합 시 최적화 방안

보고서 생성과 같이 CPU 계산이 포함된 경우, CPU 작업만 별도 Thread Pool로 분리하는 것이 효과적입니다.

@Configuration
@EnableAsync
public class ThreadPoolConfig {
    
    @Bean("cpuExecutor")
    public ExecutorService cpuExecutor() {
        return Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors()
        );
    }
}

@Service
public class ReportService {
    
    @Autowired
    @Qualifier("cpuExecutor")
    private ExecutorService cpuExecutor;
    
    public CompletableFuture<ReportResponse> generateReport(ReportRequest request) {
        return CompletableFuture
            // I/O 작업: 데이터 수집 (Virtual Thread)
            .supplyAsync(() -> dataService.collectData(request))
            
            // CPU 작업: 복잡한 계산 (Platform Thread Pool)
            .thenApplyAsync(data -> processCalculation(data), cpuExecutor)
            
            // I/O 작업: 결과 저장 (Virtual Thread)
            .thenApply(result -> dataService.save(result));
    }
}

도입 시 고려사항

DB Connection Pool 용량 검토

Virtual Thread로 동시 처리량이 증가하면 DB 커넥션 부족 현상이 발생할 수 있습니다. 기존 스레드 풀 크기에 맞춰 설정된 커넥션 풀을 재검토해야 합니다.

외부 API Rate Limiting 대비

동시 호출량 급증으로 외부 서비스에서 Rate Limit에 걸릴 가능성이 있습니다. Circuit Breaker 패턴이나 Rate Limiter 도입을 검토해야 합니다.

로그 시스템 영향도 분석

처리량 증가에 따른 로그 급증으로 다음과 같은 문제가 발생할 수 있습니다:

  • 로그 파일 크기 급증
  • 로그 I/O 병목 현상
  • 디스크 용량 부족

메모리 사용 패턴 변화

  • Direct Memory 사용량 증가 가능성
  • Young Generation GC 빈도 변화
  • 기존 JVM 튜닝 파라미터 재검토 필요

도입 적합성 판단 기준

적극 도입 권장 환경

  • 웹 서버, REST API 서비스
  • 마이크로서비스 아키텍처
  • I/O 집약적 배치 처리
  • 실시간 통신 서버

신중한 검토 필요 환경

  • CPU 집약적 애플리케이션
  • synchronized 사용이 많은 레거시 코드
  • 엄격한 성능 요구사항이 있는 시스템

도입 전 체크포인트

  • 현재 병목 지점이 I/O 중심인지 확인
  • DB Connection Pool 여유분 검토
  • 외부 API 의존성 및 제약사항 파악
  • 기존 코드의 synchronized 사용량 분석
  • 모니터링 체계의 Virtual Thread 대응 가능성 확인

학습 결론

Virtual Thread는 I/O 바운드 작업이 많은 웹 애플리케이션에서 상당한 성능 향상을 기대할 수 있는 기술입니다.

특히 설정이 간단하고 기존 코드 변경이 최소화되어 도입 부담이 적습니다.

다만 synchronized 블록의 제약사항이나 의존 시스템에 미치는 영향을 충분히 검토해야 합니다.

먼저 직렬화와 역직렬화란 무엇인가? 

직렬화 : 메모리를 바이트 스트림으로 변환하는 과정으로 이를통해 네트워크 통신이 가능해진다.

역직렬화 : 네트워크 통신으로 받은 데이터를 메모리에 쓸수 있는 형태로 변환하는것이다.

Spring의 직렬화 및 역직렬화

일반적으로 Spring에서는 직렬화 및 역직렬화시 Java 객체 ↔ JSON 형태로 변환한다.

Spring-web 라이브러리에는 HttpMessageConverter를 상속받는 다양한 Converter가 존재한다.

 

유형에 따라 Converter가 사용되며, RestApi 유형의 경우에는 MappingJackson2HttpMessageConverter 가 사용된다.

 

여러 Converter 중 적합한 Convert를 판단하는 방법은?

public interface HttpMessageConverter<T> {
    boolean canRead(Class<?> clazz, @Nullable MediaType mediaType);

    boolean canWrite(Class<?> clazz, @Nullable MediaType mediaType);

    List<MediaType> getSupportedMediaTypes();

    default List<MediaType> getSupportedMediaTypes(Class<?> clazz) {
        return !this.canRead(clazz, (MediaType)null) && !this.canWrite(clazz, (MediaType)null) ? Collections.emptyList() : this.getSupportedMediaTypes();
    }

    T read(Class<? extends T> clazz, HttpInputMessage inputMessage) throws IOException, HttpMessageNotReadableException;

    void write(T t, @Nullable MediaType contentType, HttpOutputMessage outputMessage) throws IOException, HttpMessageNotWritableException;
}

HttpMessageConverter 의 구현체중 여러 List를 순회하며,canRead(), canWrite() 메서드의 true값이 반환될경우 

해당 Convert를 적합하다 판단하여 사용한다.

 

 

Controller의 @ResponseBody, @RequestBody의 경우에는 MappingJackson2HttpMessageConverte가 실행되고, 내부적으로 ObjectMapper를 사용하여 타입에 맞게 직렬화 및 역직렬화가 실행된다.

 

 

시작하며

인프런의 모든강의가 30% 할인 하길래 동시성관련 강의를 구매 후 공부한 내용을 기록하였다. [강의]

동시성 문제란?

동시성 문제란 여러 쓰레드들이 공유 자원에 대한 경쟁을 벌여 의도하지 않은 결과를 말한다.

강의는 상품의 재고에대한 동시성문제를 다루는 내용이다.

 

재고 감소로직의 순서로는

1) 재고 감소 로직은 해당 ID값을 통해 엔티티를 조회

@Transactional
    public void decrease(Long id, Long quantity) {
        Stock stock = stockRepository.findById(id).orElseThrow();
        stock.decrease(quantity);
        stockRepository.saveAndFlush(stock);
    }

2) 요청 재고가 기존 재고보다 많지 않다면, 재고를 감소하는 로직이다.

public void decrease(Long quantity) {
    if (this.quantity - quantity < 0) {
        throw new RuntimeException("재고는 0개 미만이 될 수 없습니다.");
    }
    this.quantity -= quantity;
}

만약 1번이라는 상품에 재고가 100개 있을경우 1개를 감소시키면 재고가 99개가 될것이다.

@Test
@DisplayName("재고의 수량만큼 재고가 감소 테스트")
void decrease() {
    // given
    stockService.decrease(1L, 1L);
    // when
    Stock stock = stockRepository.findById(1L).orElseThrow();
    // then
    assertEquals(stock.getQuantity(), 99);
}

하지만 여러 쓰레드가 재고를 감소한다면 결과는 달라진다.

@Test
@DisplayName("재고의 수량 감소 - 동시성 문제")
void concurrent_decrease() throws InterruptedException {
    // given
    int threadCount = 100;
    ExecutorService executorService = Executors.newFixedThreadPool(32);
    CountDownLatch countDownLatch = new CountDownLatch(threadCount);

    for (int i = 0; i < threadCount; i++) {
        executorService.submit(() -> {
            try {
                stockService.decrease(1L, 1L);
            } finally {
                countDownLatch.countDown();
            }
        });
    }

    countDownLatch.await();

    // when
    Stock stock = stockRepository.findById(1L).orElseThrow();

    // then
    assertEquals(0, stock.getQuantity());
}

100 요청했으니, 재고가 0이 되기를 기대하지만 테스트는 실패한다.

이유는 스레드 1번이 재고를 조회 후 재고 감소하기전 다른 스레드 2번이 재고를 조회하기에 결과값이 다르게 발생한다.

해결 방법으로는 공유자원에 하나의 스레드만 접근하도록 하면된다!

Synchronized

synchronized를 메소드에 명시해주면 하나의 스레만 접근이 가능하다.

공유되는 자원의 Thread-safe를 하기 위해, synchronized로 스레드간 동기화를 시켜 Thread-safe하게 만들어 준다.

 

여기서 알게된 사실로는 아래처럼 synchronized를 사용한다해도 해결되지않는다.

@Transactional
public synchronized void decrease(Long id, Long quantity) {
    Stock stock = stockRepository.findById(id).orElseThrow();
    stock.decrease(quantity);
    stockRepository.saveAndFlush(stock);
}

이유는 아래와 같이 lock을 반납한다하더라도, Transaction이 종료되지 않았기 때문에 DB에 반영되지 않았기 때문이다.

Transaction 시작
	lock 획득
	비즈니스 로직 수행
	lock 반납
Transaction 종료

@Transactional 어노테이션을 주석달고 실행한다면 테스트가 통과한다.

//    @Transactional
    public synchronized void decrease(Long id, Long quantity) {
        Stock stock = stockRepository.findById(id).orElseThrow();
        stock.decrease(quantity);
        stockRepository.saveAndFlush(stock);
    }

 

synchronized의 문제점

자바의 synchronized는 하나의 프로세스 내에서만 보장이 된다.

즉 서버가 1대일 경우는 문제없지만, 서버가 2대 이상일경우 Thread-safe하지않다.

DB Lock

Pessimistic Lock(비관적 락)

비관적 락은 충돌이 발생할 확률이 높다고 가정하여, 하나의 트랜잭션이 자원에 접근시 락을 걸고, 다른 트랜잭션이 접근하지 못하게 한다.

DB에서 Shared Lock(공유, 읽기 잠금) 이나 Exclusive Lock(베타, 쓰기 잠금)을 건다.

Shared Lock은 다른 트랜잭션에서 읽기만 가능하고 Exclusive Lock는 다른 트랜잭션에서 읽기, 쓰기가 둘다 불가능하다.

 

장점 : 충돌이 자주 발생하는 환경에 대해서는 롤백 횟수를 줄일 수 있으므로, 성능적 이점이 있다.

단점 : 동시 처리 성능  저하 및 교착 상태(DeadLock) 발생 가능성이 있다.

@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("select s from Stock s where s.id = :id")
Stock findByIdWithPessimisticLock(Long id);

Optimistic Lock(낙관적 락)

낙관적 락은 데이터에 락을 걸지않고, 동시성 문제가 발생하면 발생한 시점에 버전을 이용하여 관리한다.

 

장점 : 충돌이 안난다는 가정하에, 동시 요청에 대해서 처리 성능이 좋다.

단점 : 충돌이 자주발생할 경우 롤백처리에 비용이 많이들며, 구현이 복잡하다.

 

Entity에@Version 추가

@Entity
@NoArgsConstructor
@Getter
@ToString
public class Stock {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private Long productId;

    private Long quantity;
    @Version
    private Long version;

    public Stock(Long productId, Long quantity) {
        this.productId = productId;
        this.quantity = quantity;
    }

    public void decrease(Long quantity) {
        if (this.quantity - quantity < 0) {
            throw new RuntimeException("재고는 0개 미만이 될 수 없습니다.");
        }
        this.quantity -= quantity;
    }

}
@Component
@RequiredArgsConstructor
@Slf4j
public class OptimisticLockStockFacade {

    private final OptimisticLockStockService optimisticLockStockService;

    public void decrease(Long id, Long quantity) throws InterruptedException {
        while (true) {
            try {
                optimisticLockStockService.decrease(id, quantity);
                break;
            } catch (Exception e) {
                log.info("=== Thread Sleep ===");
                Thread.sleep(50);
            }
        }

    }
}
@Lock(LockModeType.OPTIMISTIC)
@Query("select s from Stock s where s.id = :id")
Stock findByIdWithOptimisticLock(Long id);

Redis

Lettuce

Lettuce는 Netty기반의 Redis Client이며, 요청을 논블로킹으로 처리하여 높은 성능을 가진다.

@Component
@RequiredArgsConstructor
public class RedisLockRepository {

    private final RedisTemplate<String, String> redisTemplate;

    public Boolean lock(Long key) {
        return redisTemplate
                .opsForValue()
                .setIfAbsent(generateKey(key),"lock");
    }

    public Boolean unlock(Long key) {
        return redisTemplate.delete(generateKey(key));
    }


    private String generateKey(Long key) {
        return key.toString();
    }
}

lock() 메서드 내부에서 사용하는 setIfAbsent()를 통해 setnx를 사용하여, 값이 없다면 값을 set한다.

 

이어서 재고감소 로직이다.

1. SprinLock 방식으로 락을 얻기 위해 시도하고,

2. 락을 얻는다면 재고 감소 로직을 실행하고,

3. 학을 해제해주는 방식

@Component
@RequiredArgsConstructor
public class LettuceLockFacade {

    private final RedisLockRepository lockRepository;
    private final StockService stockService;

    public void decrease(Long id, Long quantity) throws InterruptedException {
        while (!lockRepository.lock(id)) {
            Thread.sleep(100);
        }
        try {
            stockService.decrease(id,quantity);
        } finally {
            lockRepository.unlock(id);
        }
    }
}

 

장점 : 별도의 설정없이 간단히 구현가능하다.

단점 : Sprin Lock 방식이, Lock을 얻을때까지 Lock을 얻기 위해 시도하기 때문에, Redis에 부하를 준다.

Redisson

Redis는 Pub/sub 기능을 제공해준다. Redisson에서는 Pub/Sub 기반의 분산락을 이미 구현하여 제공해주고있다.

 

의존성 추가

implementation 'org.redisson:redisson-spring-boot-starter:3.27.1'

 

@Component
@RequiredArgsConstructor
@Slf4j
public class RedissonLockStockFacade {

    private final RedissonClient redissonClient;
    private final StockService stockService;

    public void decrease(Long id, Long quantity) {
        RLock lock = redissonClient.getLock(id.toString());

        try {
            boolean result = lock.tryLock(10, 1, TimeUnit.SECONDS);

            if (!result) {
                log.info("Lock 획득 실패");
                return;
            }
            stockService.decrease(id, quantity);

        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }
}

 

장점 : Redisson에서 이미 구현되어 있기때문에, 별도의 구현 로직이 필요하지않다. 또한, Lettuce Sprin Lock에 비해 분산락으로 Redis에 부하를 덜 준다.

단점 : 별도의 의존성 주입이 필요하다.

 

시작하며

Kafka에 대한 지식이 부족하다고 판단하여, Local 환경에서 Spring boot와 Kafka의 연동실습이며, 이론적인 내용은 추후에 작성할 예정이다.

 

Kafka 서버 구축하기

1. Apache Kafka 공식 홈페이지에서 Kafka 다운로드

  • 다운로드 후 압축을 해제하면, bin과 config 파일이 존재한다.
  • bin 디렉토리내에는 Kafka 관련 각종 실행 sh가 , config 디렉토리내에는 설정관련 파일이 위치한다.

 

2. Kafka Zookeeper 구동하기

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

  • Zookeeper는 기본적으로 2181 포트에서 구동한다.

3. Kafka Broker 구동하기

./bin/kafka-server-start.sh ./config/server.properties

  • Broker는 기본적으로 9092 포트에서 구동한다.

4. Topic 생성 및 확인하기

  • Topic 생성
./bin/kafka-topics.sh --create --topic [생성할 topic 이름] --bootstrap-server localhost:9092 --partitions 1
  • Topic 삭제
./bin/kafka-topics.sh --delete --topic [삭제할 topic 이름] --bootstrap-server localhost:9092
  • Topic List 조회
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

  • Topic 정보 조회
./bin/kafka-topics.sh --describe --topic [확인할 topic 이름] --bootstrap-server localhost:9092

 

Spring Boot Kafka 설정

1. build.gradle 및 yml 설정

	// kafka
    implementation 'org.springframework.kafka:spring-kafka'
spring:
  kafka:
    bootstrap-servers: localhost:9092

 

2. Config 설정

  • Producer 
@EnableKafka
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootStrapSevers;

    @Bean
    public ProducerFactory<String, KafkaPubRequestDto> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapSevers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, KafkaPubRequestDto> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
  • Consumer
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootStrapSevers;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaSubRequestDto> containerListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, KafkaSubRequestDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.getContainerProperties().setAckTime(3000);
        factory.getContainerProperties().setPollTimeout(500);
        factory.getContainerProperties().setLogContainerConfig(true);
        factory.setConcurrency(1);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, KafkaSubRequestDto> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapSevers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        JsonDeserializer<KafkaSubRequestDto> deserializer = new JsonDeserializer<>(KafkaSubRequestDto.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        return new DefaultKafkaConsumerFactory<>(
                props,
                new StringDeserializer(),
                deserializer
        );
    }

}

 

3. Producer, Consume

  • Producer
@RequiredArgsConstructor
@Slf4j
@Component
public class KafkaProducer {

    private final KafkaTemplate<String, KafkaPubRequestDto> kafkaTemplate;

    public void send(KafkaPubRequestDto kafkaPubRequestDto){
        log.info("> Kafka Producer Send Start [message] : {}", kafkaPubRequestDto);
        kafkaTemplate.send("test-topic-1", kafkaPubRequestDto);
        log.info("> Kafka Producer Send End [message] : {}", kafkaPubRequestDto);
    }

}
  • Consumer
@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "test-topic-1", containerFactory = "containerListenerFactory", groupId = "test")
    private void testListener(@Payload KafkaSubRequestDto kafkaSubRequestDto, Acknowledgment ack, ConsumerRecordMetadata metadata) {
        log.info("> Kafka Consumer Read Start [KafkaSubRequestDto] : {}", kafkaSubRequestDto);
        ack.acknowledge();
        log.info("> Kafka Consumer Read End [KafkaSubRequestDto] : {}", kafkaSubRequestDto);
    }

}

 

4. Controller

@RestController
@RequestMapping("/kafka")
@Slf4j
@RequiredArgsConstructor
public class KafkaPubController {

    private final KafkaProducer kafkaProducer;
    private final AtomicLong messageId = new AtomicLong(0);

    @PostMapping("/pub")
    void pubMessage(@RequestBody KafkaPubRequestDto kafkaPubRequestDto){
        LocalDateTime now = LocalDateTime.now();
        log.info("> Kafka Message Pub [time] :{}", now);
        kafkaPubRequestDto.setMessageId(messageId.getAndIncrement());
        kafkaProducer.send(kafkaPubRequestDto);
    }
}
  • 결과

 

끝으로

카프카 책을 구매 예정이며, 더 깊게 공부해볼 예정이며, 프로젝트에서 기회가된다면 대용량 데이터를 경험해보고 싶다.

시작하며

Error 또는 중요한 배치 시 알람을 받을 방법으로 회사에서 사용하는 Slack을 접목시켰다.

 

링크 : https://github.com/seungsoos/slack-demo

 

Slack 의존성 추가 및 yml 설정

	implementation("com.slack.api:bolt:1.18.0")
	implementation("com.slack.api:bolt-servlet:1.18.0")
	implementation("com.slack.api:bolt-jetty:1.18.0")

 

구글링을 통해 방법을 찾고, 해당 토큰을 입력한다.

 

코드내용은 아래와 같다.

@Slf4j
@Component
@RequiredArgsConstructor
public class SlackComponentImpl implements SlackComponent {

    /**
     * Slack의 경우 하나의 채널당 tps 1 사용
     */
    @Value("${slack.send.tps}")
    private Integer tps;

    @Value(value = "${slack.token}")
    private String slackToken;
    private MethodsClient methods = null;

    private int tpsSendCount = 0;

    /**
     * 순서 보장 및 데이터 유실 방지를 위한 Queue 사용
     */
    BlockingQueue<SlackInnerResponse> blockingQueue = new LinkedBlockingQueue();

    @PostConstruct
    private void init() {
        methods = Slack.getInstance().methods(slackToken);
    }

    @Async
    public void addQueue(String message, SlackChannel slackChannel){
        SlackInnerResponse slackInnerResponse = new SlackInnerResponse(message, slackChannel);
        blockingQueue.add(slackInnerResponse);

        startSendMessage();
    }

    private void startSendMessage() {
        SlackInnerResponse poll = blockingQueue.poll();
        if(Objects.isNull(poll)){
            return;
        }
        sendMessage(poll.message, poll.slackChannel);
    }

    @AllArgsConstructor
    static class SlackInnerResponse{
        private String message;
        private SlackChannel slackChannel;
    }


    /**
     * Slack 메시지 전송 메서드
     * 비동기 처리
     */

    private synchronized void sendMessage(String message, SlackChannel slackChannel) {
        log.info("sendSlackMessage message = {}, channel ={}", message, slackChannel.desc());

        for (int i = 0; i < 3; i++) {
            if (getTpsSendCount() >= tps) {
                log.info("thread Sleep");
                /**
                 * SlackApiException: status: 429 발생으로 TPS 조절 필요
                 * https://api.slack.com/docs/rate-limits
                 */
                sleep(1000);
                clearTpsSendCount();
            }
            tpsSendCountPlus();
            try {
                ChatPostMessageResponse chatPostMessageResponse = send(message, slackChannel.desc());
                boolean result = chatPostMessageResponse.isOk();
                if (result) {
                    return;
                }
            } catch (SlackApiException | IOException e) {
                log.error("SlackException = ", e);
                sleep(30000);
            } catch (Exception e) {
                log.error("Exception =", e);
                sleep(30000);
            }
        }
    }

    private ChatPostMessageResponse send(String message, String channel) throws IOException, SlackApiException {
        
        ChatPostMessageRequest request = ChatPostMessageRequest.builder()
                .channel(channel)
                .text(message)
                .build();
        return methods.chatPostMessage(request);
    }

    private void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            log.error("sleep error", e);
        }
    }

    private synchronized void tpsSendCountPlus() {
        log.info("> Plus");
        ++tpsSendCount;
    }
    private synchronized int getTpsSendCount() {
        log.info("> GET");
        return tpsSendCount;
    }

    private synchronized void clearTpsSendCount() {
        log.info("> Clear");
        tpsSendCount = 0;
    }

}

Slack의 채널을 여러 분기처리하기위해 Enum을 사용하였고, token 및 tps는 config로 관리하였다.

 

해당 개발테스트 중 알게된 사항으로 Slack은 채널당 tps를 1로 엄격히 규정하고있다. 여러 테스트 결과 과도한 트래픽이 초과될시 429 에러가 발생하며 해당 내용은 코드에 정리하였다. 또한, 과도한 트래픽이 발생시 데이터의 유실을 확인하여 Queue를 활용하여 데이터를 안전하게 제공하였다.

 

시작하며

서버로그중 가끔씩 ClientAbortException가 보인다.

 

Response body : {"code":"400","msg":"Insufficient parameters.","data":"I/O error while reading input message; nested exception is org.apache.catalina.connector.ClientAbortException: java.io.EOFException"}

 

발생원인 : 클라이언트가 연결을 끊었을때 발생하는것이 일반적이다.

서버가 클리이언트로 응답을 보내려하지만, 클라이언트의 연결이 이미끊어진경우 발생한다.

 

해당 에러는 클라이언트 단에서 발생하였다. 해당 에러는 Filter에서 Handling

설정을 하였다.

 

참고사이트 https://perfectacle.github.io/2022/03/20/client-abort-exception-deep-dive-part-01/

1. DB 접근 기술

JDBC(Java Database Connectivity)

  • JDBC API를 이용하여 JDBC Driver의 변경에따른 DB 접근
  • 동작원리

  • 예제)
package sec01.ex01.dao;

import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.sql.DataSource;

import sec01.ex01.dto.MemberDTO;

public class MemberDAO {
   
   String driver = "oracle.jdbc.OracleDriver";
   String url = "jdbc:oracle:thin:@localhost:1521:xe";
   String user = "user1";
   String pwd = "1234";
   
   private Connection conn = null;
   private Statement stmt = null;
   private PreparedStatement pstmt = null;
   private ResultSet rs = null;
   
   private DataSource ds;

   public List<MemberDTO> listMember(){
      
      List<MemberDTO> list = new ArrayList<>();
      
      try {
         Class.forName(driver);
         System.out.println("Oracle 드라이버 로딩 성공");
         
         conn = DriverManager.getConnection(url, user, pwd);
         System.out.println("Connection 성공");
         
         stmt = conn.createStatement();
         System.out.println("Statment 생성 성공");
         
         String sql = "SELECT *FROM MEMBERTEST";
         
         rs =  stmt.executeQuery(sql);
         
         while(rs.next()) {
            String id = rs.getString("id");
            String pwd = rs.getString("pwd");
            String name = rs.getString("name");
            String email = rs.getString("email");
            Date joinDate = rs.getDate("joinDate");
            
            MemberDTO dto = new MemberDTO();
            dto.setId(id);
            dto.setPwd(pwd);
            dto.setName(name);
            dto.setEmail(email);
            dto.setJoinDate(joinDate);
            
            list.add(dto);
         }
      } catch (Exception e) {
         e.printStackTrace();
      } finally {
         try {if(rs != null)  rs.close();} catch (Exception e2) {}
         try {if(stmt != null)stmt.close();} catch (Exception e2) {}
         try {if(conn != null)conn.close();} catch (Exception e2) {}
      }
      return list;
   }
}
  •  단점
    1. 쿼리를 실행하기 전, 후 많은 코드를 작성해야한다.
      • DB와의 연결 설정 및 객체 자원을 사용후 종료(close)해야한다.
    2.  DB 관련 로직에서 예외처리코드를 작성해야한다.
      • DB연결 설정에서 예외처리가 필수가 된다.
    3. 중복코드가 많다.
    4. 가독성이 좋지않다.
    5. 유지,보수가 어렵다.

SQL Mapper

  • 객체와 테이블 간의 관계를 직접 매핑하는 것이 아닌, SQL문을 실행해 쿼리 수행 결과를 어떤 객체에 바인딩 하는 방법이다.
  • 따라서 DBMS에 종속적인 방법이라고 할 수 있으며, 대표적으로 MyBatis가 있다

ORM

  • ORM 기술은 객체(Object)와 DB테이블을 매핑하여 데이터를 객체화하는 기술이다.
  • 개발자가 직접 SQL을 작성하지 않아도 자동으로 SQL문을 만들어내기 때문에 DBMS에 종속적이지 않다. 대표적으로 JPA가 있다.

2. MyBatis

  • Concept :  SQL과 Java 코드를 분리

MyBatis는 개발자에게 유연하고 효율적인 데이터베이스 액세스를 제공한다. SQL 중심적인 접근 방식과 매핑 기능을 통해 개발자는 데이터베이스 조작을 세밀하게 제어할 수 있으며, 성능 최적화와 단순성을 통해 개발 생산성과 코드의 가독성을 높인다.

 

3. JPA

    • Concept : DB 테이블을 매핑해서 데이터를 하나의 객체로 간주하고자 하며, 이러한
                         사상을
      통해서  DB와 객체지향의 패러다임 불일치를 해결하고자 한다.

JPA는 객체 지향적인 프로그래밍 모델을 사용하여 생산성을 향상시키고 유지보수를 용이하게 한다.

또한, 데이터 베이스에 대한 종속성을 줄이고 이식성을 높여서 유연성을 제공한다.

 

 

4. MyBatis 동작과정 및 장단점

동작과정

MyBatis데이터베이스 액세스 흐름

MyBatis를 사용하기 위한 기본적인 자바 인터페이스는 SqlSession이다. 이 인터페이스를 통해 명령어를 실행하고, 메퍼를 얻으며, 트랜잭션을 관리 할 수 있다. SqlSession은 인터페이스로 구현체인 SqlSessionTemplate이 있다. SqlSessionTemplate은 필요한 시점에 세션을 닫고, 커밋하거나 롤백하는 것을 포함한 세션의 생명주기를 관리한다.

 MyBatis공식문서

장단점

예시) 간략한 Team, Member라는 테이블이 있다.

DB테이블 구성도

Java Entity

두 테이블을 Join하여 모든 컬럼을 가져올때 객체지향적이지 않은, DB에 종속적인 방법을 사용하게된다.

이때 조회 결과값을 담을 Entity가없기때문에, 하나의 Entity를 만들어 사용하게된다.

※ 물론 Join 결과값을 Result <resultMap> 이라는 방법으로 1:1, 1:N. N:N  사용이 가능하나,

   실무 프로젝트 기준으로 생각하였을때 매우 복잡하다. 그렇기에 대다수의 사람들이 MyBatis를 이러한 방식으로 사용한다.

또한 insert시 Team에 대한 insert 후 useGeneratedKey방식을 이용하여 PK값을 얻어온이후 다시 Member에 대한 insert를 사용해야한다.

  • 장점
    1. SQL 리를 직접 작성하므로, 최적화된 쿼리를 구현 할 수 있다.
    2. 복잡한 쿼리도 구현 가능하다.
  • 단점
    1. 컴파일 시 오류를 확인 할 수없다.
    2. 쿼리를 직접 작성하기 때문에 JPA에 비해 생산성이 떨어진다.
    3. DB, Entity 변경 시 소스 및 쿼리 수정이 필요하다.
    4. 중복 쿼리가 발생하기 쉽다.

★ MyBatis 사용시 객체 지향적인 Java와 데이터 지향적인 DB의 DB에 종속적인 패러다임의 불일치가 발생한다.

 

 

5. JPA 동작과정 및 장단점

EntityManger

EntityManager는 데이터베이스와 상호 작용하는 주요한 인터페이스로, 엔티티의 영속성과 일관성을 유지하며, 데이터베이스 작업을 단순화하고 효율적으로 처리하는 데 도움을 준다.

  1. Entity 관리
    • EntityManager를 사용하여 엔티티를 영속성 컨텍스트에 등록하고, 상태 변경을 추적한다.
    •  엔티티를 조회, 저장, 수정, 삭제할 수 있는 기능을 제공한다.
  2. Transaction 관리
    • 트랜잭션의 시작과 종료를 관리하고, 트랜잭션 내에서 엔티티 작업을 수행한다.
    •  트랜잭션을 커밋 또는 롤백하여 데이터베이스의 일관성을 보장한다.
  3. 영속성 컨텍스트 관리

영속성 컨텍스트

영속성 컨텐스트란 엔티티를 영구 저장하는 환경이라는 뜻이다. 애플리케이션과 데이터베이스 사이에서 객체를 보관하는 가상의 데이터베이스 같은 역할을 한다. 엔티티 매니저를 통해 엔티티를 저장하거나 조회하면 엔티티 매니저는 영속성 컨텍스트에 엔티티를 보관하고 관리한다.

영속성 컨텍스트는 내부적으로 1차 캐시영역과 쓰기지연 SQL 저장소 영역이 있다.

영속성 컨텍스트 조회

  • 1차 캐시를 조회하여, 1차 캐시에 저장된 데이터가 있을경우 그대로 조회
  • 1차 캐시를 조회하여, 1차 캐시에 저장된 데이터가 없을경우 DB 조회 후, 1차 캐시에 저장한다.

엔티티 등록 - 쓰기 지연

엔티티 매니저는 데이터 변경 시 반드시 트랜잭션을 시작해야 한다.

EntityManager em = EntityManagerFactory.createEntityManager();
EntityTransaction tx = em.getTransaction(); // 트랜잭션

// 트랜잭션 시작
tx.begin();

// 비영속
Member member = new Member();
member.setId("member1");
member.setUsername("홍길동");

// 영속
em.persist(member);

// 엔티티 등록
tx.commit();
  • em.persist(member); : member 엔티티를 영속 컨텍스트에 저장하지만, 데이터베이스에는 반영되지 않는다.
  • tx.commit(); : 트랜잭션을 커밋하는 순간 데이터베이스에 INSERT SQL을 보내 저장하게 된다.
  • em.persist()를 실행할 때, 영속 컨텍스트의 1차 캐시에는 member 엔티티가 저장되고, 쓰기 지연 SQL 저장소에는 member 엔티티의 INSERT SQL 쿼리문이 저장된다.
  • tx.commit()을 실행하는 순간 쓰기 지연 SQL 저장소에 저장된 INSERT SQL 쿼리를 보내 데이터베이스에 저장하는 것이다.

엔티티 수정 - 변경 감지

EntityManager em = EntityManagerFactory.createEntityManager();
EntityTransaction tx = em.getTransaction(); // 트랜잭션

// 트랜잭션 시작
tx.begin();

// member 조회
Member member = em.find(Member.class, "member");
member.setUsername("hello");
member.setAge("20");

// 엔티티 등록
tx.commit();
  • 영속 컨텍스트의 1차 캐시에는 member의 초기 데이터가 저장되어 있을 것이다.
  • 이후 set 메서드를 통해 데이터를 변경한다.
  • 트랜잭션 커밋 시 flush()가 발생하면서 1차 캐시에서 엔티티와 스냅샷을 비교하여 변경에 대한 감지를 한다.
  • 이후 SQL UPDATE 쿼리를 생성하여 쓰기 지연 SQL 저장소에서 쿼리를 보낸다.

엔티티 삭제

Member member = em.find(Member.class, "member");

// 엔티티 삭제
em.remove(member);
  • 엔티티 삭제는 remove() 메서드를 통해 데이터를 삭제할 수 있다.
  • 영속성 컨텍스트와 데이터베이스에서 모두 제거된다.

엔티티 생명주기

  • 비영속(new/transient): 영속성 컨텍스트와 전혀 관계가 없는 상태
  • 영속(managed): 영속성 컨텍스트에 저장된 상태
  • 준영속(detached): 영속성 컨텍스트에 저장되었다가 분리된 상태
  • 삭제(removed): 삭제된 상태

장단점

  • 장점
    1. 객체 지향적 프로그래밍 설계가 가능하다.
    2. 기본적인 SQL문법이 지원되기 때문에 생산성이 향상된다.
    3. 데이터베이스 방언이 지원되기 때문에 특정 데이터베이스에 종속되지 않는다.
  • 단점
    • QueryDsl을 사용하여도 MyBatis에 비해 복잡한 쿼리문이 구현하기 힘들다.
    • 러닝커브가 높다.

 

JPA의 경우 개발 생산성이 향상되는 장점은 있으나, 복잡한 쿼리문(통계)의 경우 myBatis 가 직관적이고 효율적이라고 생각하여

프로젝트의 특성에 맞게 사용하는것을 추천한다.

 

 

 

 

JPA

JPA는 Java Persistence API의 약자로, '자바 애플리케이션에서 관계형 데이터베이스를 사용하는 방식을 정의한 인터페이스'이다.

자바 ORM(Object Relational Mapping) 기술에 대한 API 표준 명세이다. 나는 지금껏 '라이브러리' 정도로 생각하며, 사용하였다.

JPA는 단순한 인터페이스 이기 때문에 구현체가 없다. 이런 JPA의 구현체는 아래그림과 같다.

 

JPA를 사용하기 위해서는 JPA를 구현한 Hibernate, DataNucleus, EclipseLink 같은 ORM 프레임워크를 사용해야한다.

그중 Hibernate가 가장 범용적으로 다양한 기능을 제공한다고 한다.

3가지 ORM 프레임워크는 다음에 찾아보자.

 

Hibernate

Hibernate는 JPA 구현체 중 하나이며, 다음은 JPA와 Hibernate의 상속 및 구현 관계이다. 

JPA의 핵심인 EntityManagerFactory, EntityManager, EntityTransaction 인터페이스를 Hibernate는 각각 SessionFactory, Session, Transaction으로 상속받고 Impl로 구현하였다.

Hibernate는 JPA의 구현체이기때문에 다른 ORM프레임워크를 사용해도 되고, JPA는 인터페이스이기 때문에 본인이 구현하여 사용 할 수도 있다.

Hibernate 장점

  1. ORM 방식 사용
    • Hibernate은 자바 객체와 관계형 데이터베이스 간의 매핑을 간단하게 처리한다.
    • 개발자는 SQL 쿼리를 직접 작성하지않고도 객체를 데이터베이스에 저장 및 검색 등이 가능하다.
  2. 성능 최적화
    • Hibernate은 지연 로딩(Lazy loading) 및 캐시 기능을 지원하여 애플리케이션의 성능을 향상시킬 수 있다.
    • 지연 로딩은 필요한 시점에만 데이터를 로드하여 데이터베이스 액세스를 줄여준다.
  3. 트랜잭션 관리
    • Hibernate은 ACID (원자성, 일관성, 고립성, 지속성) 트랜잭션을 지원한다.
    • 트랜잭션 관리를 쉽게 할 수 있으며, 롤백과 커밋 기능을 제공한다.

Hibernate 단점

  1. 성능
    • 물론 SQL을 직접 작성하는 것보다 메서드 호출만으로 쿼리를 수행한다는 것은 성능이 떨어질 수 있다.
    • 실제로 초기의 ORM은 쿼리가 제대로 수행되지 않았고, 성능도 좋지 못했다한다.
    • 그러나 지금은 많이 발전하여, 좋은 성능을 보여주고 있고 계속 발전하고 있다.
  2. ​세밀함
    • 메서드 호출로 DB 데이터를 조작 하기 때문에 세밀함이 떨어진다.
    • 복잡한 통계 분석 쿼리를 메서드만으로 해결하는 것은 힘든 일이다. 이것을 보완하기 위해 SQL과 유사한 기술인 JPQL을 지원한다.
  3. ​러닝커브
    • JPA를 잘 사용하기 위해서는 알아야 할 것이 많습니다. 즉 러닝커브가 높습니다.

Spring Data JPA

Spring Data JPA는 Spring에서 제공하는 모듈 중 하나로 JPA를 쉽고 편하게 사용하도록 도와준다.

기존 JPA의 경우 EntityManager를 주입 받아 사용해야하지만, Spring Data JPA는 JPA를 한단계 더 추상화하킨 Repository 인터페이스를 제공한다.

 

Repository 구현 규칙

Method

 method  기능
 save()  레코드 저장 (insert, update)
 findOne()  primary key로 레코드 한건 찾기
 findAll()  전체 레코드 불러오기. 정렬(sort), 페이징(pageable) 가능
 count()  레코드 갯수
 delete()  레코드 삭제

Keyword

메서드 이름 키워드  샘플  설명
 And  findByEmailAndUserId(String email, String userId)  여러필드를 and 로 검색
 Or  findByEmailOrUserId(String email, String userId)  여러필드를 or 로 검색
 Between  findByCreatedAtBetween(Date fromDate, Date toDate)  필드의 두 값 사이에 있는 항목 검색
 LessThan  findByAgeGraterThanEqual(int age)  작은 항목 검색
 GreaterThanEqual  findByAgeGraterThanEqual(int age)  크거나 같은 항목 검색
 Like  findByNameLike(String name)  like 검색
 IsNull  findByJobIsNull()  null 인 항목 검색
 In  findByJob(String … jobs)  여러 값중에 하나인 항목 검색
 OrderBy  findByEmailOrderByNameAsc(String email)  검색 결과를 정렬하여 전달

 

 

Hibernate와 Spring Data JPA 차이점

Spring Data JPA의 경우 DB에 접근하는 상황에서는  Repository를 정의하여 사용한다. 개발자가 Repository 인터페이스에 정해진 규칙대로 메소드를 입력하면 Spring이 메소드 이름에 적합한 쿼리를 만든다. 또한 Hibernate의 경우 EntityManager를 통해 관리한다. 하지만 Spring Data JPA의 경우 EntityManger를 직접 다루지않고, SimpleJpaRepository의 내부적으로 EntityManager를 사용하고 있다. 

 

Spring Data JPA 공식문서

 

시작하며

현재 사용하고 있는방법을 기록해두고 더 좋은 코드를 찾을시 비교하기위한 기록용이다.

 

ex)

 @RequestPart(required = false, value = "img_file") MultipartFile imgFile
            , @Valid @RequestPart(value = "data") InsertRequest insertRequest
            , BindingResult bindingResult

1차. 입력값의 유무

if (insertRequest == null){
            throw  new RootException(ApiStatusCode.BAD_REQUEST, "잘못된 정보입니다.");
        }

2차. 입력값의 이상유무

       BidingResult를 사용하여 최초의 Request값에 대한 검증을 한다.

 

Request객체의 검증 어노테이션을 활용한다.

@NotNull, @NotBlank, @NotEmpty, @Size, @Min, @Max 등 

 if (bindingResult.hasErrors()) {
            log.info("BindingResult hasErrors");
            throw new RootException(ApiStatusCode.BAD_REQUEST,  "잘못된 정보입니다.");
        }

 

3차. Service 계층 세부 내용검증

 if (insertRequest.getStartDt().compareTo(insertRequest.getEndDt()) >= 1) {
            //0이면같음, 음수면 정상
            throw new RootException(ApiStatusCode.BAD_REQUEST, "노출 시작 일자가 종료 일자 보다 늦을 수 없습니다.");
        }

 

+ Recent posts