회사에서 JDK 17 버전에서 21로 변경하며 Virtual Thread를 도입하기 위해 학습한 내용을 기록하였습니다.

Virtual Thread 개념 정리

Java 19에서 프리뷰로 도입되고 Java 21에서 정식 출시된 Virtual Thread(가상 스레드)에 대해 학습하였습니다. 기존 Platform Thread와 비교하면 상당한 차이가 있었습니다.

 

기존 Platform Thread의 한계점

  • 스레드당 약 2MB의 메모리 사용
  • 생성 및 삭제 비용이 높음
  • 일반적으로 수십~수백 개 정도만 생성 가능

Virtual Thread의 개선점

  • 스레드당 약 10KB로 매우 가벼움
  • 수십만~수백만 개까지 생성 가능
  • I/O 대기 시간을 효율적으로 활용

동작 원리 학습

Virtual Thread는 Platform Thread 위에서 동작하며, 핵심은 Mount/Unmount 메커니즘입니다.

동작 흐름:
1. Virtual Thread A가 Platform Thread 1에서 실행 (Mount 상태)
2. DB 조회 등 I/O 작업 시작
3. Virtual Thread A가 Platform Thread에서 분리 (Unmount)
4. Platform Thread 1이 다른 Virtual Thread B를 실행
5. I/O 완료 시 Virtual Thread A가 다시 Platform Thread에 연결되어 실행 재개

핵심 용어

  • 마운트(Mount): Virtual Thread가 Platform Thread에서 실행되는 상태
  • 언마운트(Unmount): I/O 블로킹 시 Platform Thread에서 분리되어 대기하는 상태
  • Carrier Thread: Virtual Thread를 실제로 실행하는 Platform Thread

I/O 바운드 작업에서의 효과

Virtual Thread는 I/O 집약적인 작업에서 뛰어난 성능을 보입니다.

public void processUserData() {
    // 1. DB 사용자 조회 - I/O 대기 시 언마운트 발생
    User user = userRepository.findById(userId);
    
    // 2. 외부 API 호출 - I/O 대기 시 언마운트 발생
    ApiResponse response = externalApiClient.call(user.getId());
    
    // 3. 파일 저장 - I/O 대기 시 언마운트 발생
    fileService.saveUserData(user, response);
}

위와 같은 코드에서는 각 I/O 작업마다 Virtual Thread가 언마운트되어 Platform Thread가 다른 Virtual Thread를 실행할 수 있게 됩니다.

참고로 Thread.sleep()도 Virtual Thread에서는 언마운트됩니다.

주의해야 할 제약사항

synchronized 블록 사용 시 문제점

학습 중 가장 중요하다고 판단한 부분입니다.

// Virtual Thread 효과가 제한되는 경우
public void inefficientMethod() {
    synchronized (lock) {
        // Virtual Thread가 Platform Thread를 계속 점유
        // 언마운트되지 않아 비효율적
        expensiveOperation();
    }
}

// Virtual Thread에 적합한 방식
private final ReentrantLock lock = new ReentrantLock();

public void efficientMethod() {
    lock.lock();
    try {
        // Virtual Thread 언마운트 가능
        expensiveOperation();
    } finally {
        lock.unlock();
    }
}

synchronized는 JVM 모니터 락으로 OS 레벨에서 스레드를 블로킹하기 때문에 언마운트가 불가능합니다.

CPU 바운드 작업의 한계

public void cpuIntensiveTask() {
    for (int i = 0; i < 1000000; i++) {
        // CPU 연산 중심 작업에서는 언마운트 포인트가 없음
        Math.sin(i) * Math.cos(i);
    }
}

CPU 집약적인 작업은 Virtual Thread를 많이 생성해도 CPU 코어 수의 제약을 받습니다.

Spring Boot 환경에서의 적용

설정은 매우 간단합니다.

# application.yml
spring:
  threads:
    virtual:
      enabled: true

이 설정만으로 Spring의 요청 처리 스레드가 Virtual Thread로 변경됩니다.

실제 적용 예시

@RestController
public class UserController {
    
    @GetMapping("/users/{id}")
    public UserDetailResponse getUserDetail(@PathVariable Long id) {
        // 전체 메서드가 Virtual Thread에서 실행
        
        // 사용자 정보 조회 - 언마운트 발생
        User user = userService.findById(id);
        
        // 권한 정보 조회 - 언마운트 발생
        List<Permission> permissions = permissionService.findByUserId(id);
        
        // 외부 프로필 이미지 조회 - 언마운트 발생
        String profileImageUrl = profileService.getImageUrl(user.getEmail());
        
        return UserDetailResponse.of(user, permissions, profileImageUrl);
    }
}

CPU 작업 혼합 시 최적화 방안

보고서 생성과 같이 CPU 계산이 포함된 경우, CPU 작업만 별도 Thread Pool로 분리하는 것이 효과적입니다.

@Configuration
@EnableAsync
public class ThreadPoolConfig {
    
    @Bean("cpuExecutor")
    public ExecutorService cpuExecutor() {
        return Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors()
        );
    }
}

@Service
public class ReportService {
    
    @Autowired
    @Qualifier("cpuExecutor")
    private ExecutorService cpuExecutor;
    
    public CompletableFuture<ReportResponse> generateReport(ReportRequest request) {
        return CompletableFuture
            // I/O 작업: 데이터 수집 (Virtual Thread)
            .supplyAsync(() -> dataService.collectData(request))
            
            // CPU 작업: 복잡한 계산 (Platform Thread Pool)
            .thenApplyAsync(data -> processCalculation(data), cpuExecutor)
            
            // I/O 작업: 결과 저장 (Virtual Thread)
            .thenApply(result -> dataService.save(result));
    }
}

도입 시 고려사항

DB Connection Pool 용량 검토

Virtual Thread로 동시 처리량이 증가하면 DB 커넥션 부족 현상이 발생할 수 있습니다. 기존 스레드 풀 크기에 맞춰 설정된 커넥션 풀을 재검토해야 합니다.

외부 API Rate Limiting 대비

동시 호출량 급증으로 외부 서비스에서 Rate Limit에 걸릴 가능성이 있습니다. Circuit Breaker 패턴이나 Rate Limiter 도입을 검토해야 합니다.

로그 시스템 영향도 분석

처리량 증가에 따른 로그 급증으로 다음과 같은 문제가 발생할 수 있습니다:

  • 로그 파일 크기 급증
  • 로그 I/O 병목 현상
  • 디스크 용량 부족

메모리 사용 패턴 변화

  • Direct Memory 사용량 증가 가능성
  • Young Generation GC 빈도 변화
  • 기존 JVM 튜닝 파라미터 재검토 필요

도입 적합성 판단 기준

적극 도입 권장 환경

  • 웹 서버, REST API 서비스
  • 마이크로서비스 아키텍처
  • I/O 집약적 배치 처리
  • 실시간 통신 서버

신중한 검토 필요 환경

  • CPU 집약적 애플리케이션
  • synchronized 사용이 많은 레거시 코드
  • 엄격한 성능 요구사항이 있는 시스템

도입 전 체크포인트

  • 현재 병목 지점이 I/O 중심인지 확인
  • DB Connection Pool 여유분 검토
  • 외부 API 의존성 및 제약사항 파악
  • 기존 코드의 synchronized 사용량 분석
  • 모니터링 체계의 Virtual Thread 대응 가능성 확인

학습 결론

Virtual Thread는 I/O 바운드 작업이 많은 웹 애플리케이션에서 상당한 성능 향상을 기대할 수 있는 기술입니다.

특히 설정이 간단하고 기존 코드 변경이 최소화되어 도입 부담이 적습니다.

다만 synchronized 블록의 제약사항이나 의존 시스템에 미치는 영향을 충분히 검토해야 합니다.

시작하며

Node.js 서버에서 금전적인 행위를 처리하는 배치 작업이 종료(Shutdown)될 때 안전하게 종료되지 않아 특정 유저의 출금이 두 번 발생하는 문제가 종종 발생했다. 원인을 분석해보니, Shutdown 시점에 출금 처리는 되었으나 데이터 처리까지 완료되지 않아, 다음 배치 작업에서 중복 출금이 발생하는 문제였다. 이를 해결하기 위해 Graceful Shutdown을 도입하는 과정에서 배운 점들을 기록하였다.

Graceful Shutdown이란?

직역하면 우아한 종료로, 애플리케이션이 완전히 종료되기 전에 진행 중인 모든 작업을 마무리하고, 리소스를 해제하며 데이터의 무결성을 유지한 상태로 안전하게 종료되는 과정을 의미한다.

Node-schedule 사용

Node.js에서 스케줄업무를 하는 라이브러리는 크게 Node-schedule 과 Node-cron이 있다. 현재 프로젝트에서는 Node-schedule 버전 1.3.3 을 사용중이다. 

 

※ 버전 확인 명령어

npm list node-schedule

 

만약 node-schedule의 버전을 2.x대 를 사용중이라면 기본 제공하는 gracefulShutdown() 메서드를 사용하여 안전한 종료가 가능하다. 하지만 1.x 대 버전을 사용한다면 gracefulShutdown() 메서드를 지원하지 않기때문에 메서드를 만들어 사용할수 있다.

gracefulShutdown 구현

Graceful Shutdown을 위해 작업 관리 리스트를 만들어 scheduledJobs와 runningJobs 배열에 작업을 추가, 삭제하는 구조로 관리한다.

1. 스케줄 관리 함수

const scheduledJobs = [];
let runningJobs = [];

const manageGracefulShutdownJob = (interval, jobFunction) => {
    let job = Schedule.scheduleJob(interval, async () => {
        const promise = jobFunction();
        runningJobs.push(promise);
        try {
            await promise;
        } finally {
            runningJobs = runningJobs.filter(job => job !== promise);
        }
    });
    scheduledJobs.push(job);
};

 

manageGracefulShutdownJob이 실행시 scheduleJobs 변수에 push 하여 관리대상으로 등록하고, 실행시 promise 객체를 반환하여 runningJobs에 push한다. 이후 await으로 해당 실행이 종료된 이후 runningJobs에서 해제를 시켜줌으로서, 현재실행중이 스케줄 잡을 관리한다.

manageGracefulShutdownJob('실행 주기', 메서드명);

2. Graceful Shutdown 적용

SIGINT 신호가 발생하면 모든 스케줄 작업을 취소하고, 실행 중인 작업이 완료될 때까지 대기한다.

process.on('SIGINT', async () => {
    console.log('Graceful shutdown in progress start');
    for (const job of scheduledJobs) {
        job.cancel();
    }
    console.log('All scheduled jobs cancelled');

    await Promise.all(runningJobs);

    console.log('All running jobs completed');
    process.exit(0);
});

 

위 코드를 통해 Graceful Shutdown을 구현할 수 있다. 그러나 개발 서버에서 테스트 시 SIGINT 신호가 발생하면 즉시 종료되는 현상을 확인했다. 이는 PM2의 kill_timeout 기본 설정이 1.6초로, 해당 시간동안 종료되지 않으면 SIGKILL로 변환되어 강제 종료되기 때문이다. kill_timeout을 1시간으로 늘려 테스트한 결과, 정상적으로 Graceful Shutdown이 작동했다.

참고 링크

https://github.com/node-schedule/node-schedule?tab=readme-ov-file#graceful-shutdown

https://pm2.keymetrics.io/docs/usage/signals-clean-restart/

시작하며

Readable Code: 읽기 좋은 코드를 작성하는 사고법 강의를 보고 정리한내용으로,

회사의 서비스 개선을 준비하는 단계에서 우리팀에 적용해보면 좋겠다. 라는 생각이 들어 정리한 내용입니다.

 

이름짓기

1. 단수와 복수를 구분하기

  • ~(e)s를 붙여 변수, 클래스 등 단수인지, 복수인지 구분

2. 이름줄이지 않기

  • 관용어처럼 많은 사람들이 자주사용하는 줄임말 정도만 사용하고, 무분별한 줄임말은 자제 또한, 줄임말이 이해될 수 있는 것은 문맥때문이기에 문맥을 잘 활용
  • ex) column -> col, latitude -> lat, longitude -> lon

 

3. 은어 / 방언 사용하지 않기

  • 일부 팀원 / 현재의 우리팀만 아는 용어금지
    • 새로운 사람이 팀에 합류했을때 용어를 이해하기 힘들다.
  • 도메인 용어 사용하기
    • 팀단위의 도메인 용어를 먼저 정의하는 과정이 필요하다.

4. 좋은 코드를 보고 습득하기

  • 비슷한 상황에서 자주사용하는 단어, 개념 습득하기

메서드와 추상화

하나의 메서드는 하나의 주제만을 가져야한다.

예시로 메서드의 이름은 추상적이며, 내용은 구체화하여 작성하여야한다.

왼쪽은 좋은예시로 메서드 이름은 추상적이며 내용은 구체적이지만, 반면 오른쪽은 하나의 메서드내에 여러개의 구체적인 내용이 포함되어

내용을 유추하기가 힘들다.

만약 이럴경우 여러개의 메서드로 분리하여 메서드당 하나의책임으로 추상화할 수 있다.

매직 넘버, 매직 스트링

  • 의미를 갖고 있으나, 상수로 추출되지 않은 숫자, 문자열 등
  • 상수 추출로 이름을 짓고 의미를 부여함으로써 가독성, 유지보수성이 향상된다.

사고의 Depth 줄이기: 중첩 분기문, 중첩 반복문

  • 보이는 Depth를 줄이는데에 급급한것이 아니라, 추상화를 통한 사고 과정의 Depth를 줄이는것이 중요
  • 2중 중첩구조로 표현하는것이 사고하는데에 더 도움이 된다고 판단한다면, 메서드 분리보다 그대로 놔두는것이 더나은 선택일 수 있다.
  • 때로는 메서드를 분리하는것이 더 혼선을 줄 수도 있다.

 

먼저 직렬화와 역직렬화란 무엇인가? 

직렬화 : 메모리를 바이트 스트림으로 변환하는 과정으로 이를통해 네트워크 통신이 가능해진다.

역직렬화 : 네트워크 통신으로 받은 데이터를 메모리에 쓸수 있는 형태로 변환하는것이다.

Spring의 직렬화 및 역직렬화

일반적으로 Spring에서는 직렬화 및 역직렬화시 Java 객체 ↔ JSON 형태로 변환한다.

Spring-web 라이브러리에는 HttpMessageConverter를 상속받는 다양한 Converter가 존재한다.

 

유형에 따라 Converter가 사용되며, RestApi 유형의 경우에는 MappingJackson2HttpMessageConverter 가 사용된다.

 

여러 Converter 중 적합한 Convert를 판단하는 방법은?

public interface HttpMessageConverter<T> {
    boolean canRead(Class<?> clazz, @Nullable MediaType mediaType);

    boolean canWrite(Class<?> clazz, @Nullable MediaType mediaType);

    List<MediaType> getSupportedMediaTypes();

    default List<MediaType> getSupportedMediaTypes(Class<?> clazz) {
        return !this.canRead(clazz, (MediaType)null) && !this.canWrite(clazz, (MediaType)null) ? Collections.emptyList() : this.getSupportedMediaTypes();
    }

    T read(Class<? extends T> clazz, HttpInputMessage inputMessage) throws IOException, HttpMessageNotReadableException;

    void write(T t, @Nullable MediaType contentType, HttpOutputMessage outputMessage) throws IOException, HttpMessageNotWritableException;
}

HttpMessageConverter 의 구현체중 여러 List를 순회하며,canRead(), canWrite() 메서드의 true값이 반환될경우 

해당 Convert를 적합하다 판단하여 사용한다.

 

 

Controller의 @ResponseBody, @RequestBody의 경우에는 MappingJackson2HttpMessageConverte가 실행되고, 내부적으로 ObjectMapper를 사용하여 타입에 맞게 직렬화 및 역직렬화가 실행된다.

 

 

시작하며

인프런의 모든강의가 30% 할인 하길래 동시성관련 강의를 구매 후 공부한 내용을 기록하였다. [강의]

동시성 문제란?

동시성 문제란 여러 쓰레드들이 공유 자원에 대한 경쟁을 벌여 의도하지 않은 결과를 말한다.

강의는 상품의 재고에대한 동시성문제를 다루는 내용이다.

 

재고 감소로직의 순서로는

1) 재고 감소 로직은 해당 ID값을 통해 엔티티를 조회

@Transactional
    public void decrease(Long id, Long quantity) {
        Stock stock = stockRepository.findById(id).orElseThrow();
        stock.decrease(quantity);
        stockRepository.saveAndFlush(stock);
    }

2) 요청 재고가 기존 재고보다 많지 않다면, 재고를 감소하는 로직이다.

public void decrease(Long quantity) {
    if (this.quantity - quantity < 0) {
        throw new RuntimeException("재고는 0개 미만이 될 수 없습니다.");
    }
    this.quantity -= quantity;
}

만약 1번이라는 상품에 재고가 100개 있을경우 1개를 감소시키면 재고가 99개가 될것이다.

@Test
@DisplayName("재고의 수량만큼 재고가 감소 테스트")
void decrease() {
    // given
    stockService.decrease(1L, 1L);
    // when
    Stock stock = stockRepository.findById(1L).orElseThrow();
    // then
    assertEquals(stock.getQuantity(), 99);
}

하지만 여러 쓰레드가 재고를 감소한다면 결과는 달라진다.

@Test
@DisplayName("재고의 수량 감소 - 동시성 문제")
void concurrent_decrease() throws InterruptedException {
    // given
    int threadCount = 100;
    ExecutorService executorService = Executors.newFixedThreadPool(32);
    CountDownLatch countDownLatch = new CountDownLatch(threadCount);

    for (int i = 0; i < threadCount; i++) {
        executorService.submit(() -> {
            try {
                stockService.decrease(1L, 1L);
            } finally {
                countDownLatch.countDown();
            }
        });
    }

    countDownLatch.await();

    // when
    Stock stock = stockRepository.findById(1L).orElseThrow();

    // then
    assertEquals(0, stock.getQuantity());
}

100 요청했으니, 재고가 0이 되기를 기대하지만 테스트는 실패한다.

이유는 스레드 1번이 재고를 조회 후 재고 감소하기전 다른 스레드 2번이 재고를 조회하기에 결과값이 다르게 발생한다.

해결 방법으로는 공유자원에 하나의 스레드만 접근하도록 하면된다!

Synchronized

synchronized를 메소드에 명시해주면 하나의 스레만 접근이 가능하다.

공유되는 자원의 Thread-safe를 하기 위해, synchronized로 스레드간 동기화를 시켜 Thread-safe하게 만들어 준다.

 

여기서 알게된 사실로는 아래처럼 synchronized를 사용한다해도 해결되지않는다.

@Transactional
public synchronized void decrease(Long id, Long quantity) {
    Stock stock = stockRepository.findById(id).orElseThrow();
    stock.decrease(quantity);
    stockRepository.saveAndFlush(stock);
}

이유는 아래와 같이 lock을 반납한다하더라도, Transaction이 종료되지 않았기 때문에 DB에 반영되지 않았기 때문이다.

Transaction 시작
	lock 획득
	비즈니스 로직 수행
	lock 반납
Transaction 종료

@Transactional 어노테이션을 주석달고 실행한다면 테스트가 통과한다.

//    @Transactional
    public synchronized void decrease(Long id, Long quantity) {
        Stock stock = stockRepository.findById(id).orElseThrow();
        stock.decrease(quantity);
        stockRepository.saveAndFlush(stock);
    }

 

synchronized의 문제점

자바의 synchronized는 하나의 프로세스 내에서만 보장이 된다.

즉 서버가 1대일 경우는 문제없지만, 서버가 2대 이상일경우 Thread-safe하지않다.

DB Lock

Pessimistic Lock(비관적 락)

비관적 락은 충돌이 발생할 확률이 높다고 가정하여, 하나의 트랜잭션이 자원에 접근시 락을 걸고, 다른 트랜잭션이 접근하지 못하게 한다.

DB에서 Shared Lock(공유, 읽기 잠금) 이나 Exclusive Lock(베타, 쓰기 잠금)을 건다.

Shared Lock은 다른 트랜잭션에서 읽기만 가능하고 Exclusive Lock는 다른 트랜잭션에서 읽기, 쓰기가 둘다 불가능하다.

 

장점 : 충돌이 자주 발생하는 환경에 대해서는 롤백 횟수를 줄일 수 있으므로, 성능적 이점이 있다.

단점 : 동시 처리 성능  저하 및 교착 상태(DeadLock) 발생 가능성이 있다.

@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("select s from Stock s where s.id = :id")
Stock findByIdWithPessimisticLock(Long id);

Optimistic Lock(낙관적 락)

낙관적 락은 데이터에 락을 걸지않고, 동시성 문제가 발생하면 발생한 시점에 버전을 이용하여 관리한다.

 

장점 : 충돌이 안난다는 가정하에, 동시 요청에 대해서 처리 성능이 좋다.

단점 : 충돌이 자주발생할 경우 롤백처리에 비용이 많이들며, 구현이 복잡하다.

 

Entity에@Version 추가

@Entity
@NoArgsConstructor
@Getter
@ToString
public class Stock {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private Long productId;

    private Long quantity;
    @Version
    private Long version;

    public Stock(Long productId, Long quantity) {
        this.productId = productId;
        this.quantity = quantity;
    }

    public void decrease(Long quantity) {
        if (this.quantity - quantity < 0) {
            throw new RuntimeException("재고는 0개 미만이 될 수 없습니다.");
        }
        this.quantity -= quantity;
    }

}
@Component
@RequiredArgsConstructor
@Slf4j
public class OptimisticLockStockFacade {

    private final OptimisticLockStockService optimisticLockStockService;

    public void decrease(Long id, Long quantity) throws InterruptedException {
        while (true) {
            try {
                optimisticLockStockService.decrease(id, quantity);
                break;
            } catch (Exception e) {
                log.info("=== Thread Sleep ===");
                Thread.sleep(50);
            }
        }

    }
}
@Lock(LockModeType.OPTIMISTIC)
@Query("select s from Stock s where s.id = :id")
Stock findByIdWithOptimisticLock(Long id);

Redis

Lettuce

Lettuce는 Netty기반의 Redis Client이며, 요청을 논블로킹으로 처리하여 높은 성능을 가진다.

@Component
@RequiredArgsConstructor
public class RedisLockRepository {

    private final RedisTemplate<String, String> redisTemplate;

    public Boolean lock(Long key) {
        return redisTemplate
                .opsForValue()
                .setIfAbsent(generateKey(key),"lock");
    }

    public Boolean unlock(Long key) {
        return redisTemplate.delete(generateKey(key));
    }


    private String generateKey(Long key) {
        return key.toString();
    }
}

lock() 메서드 내부에서 사용하는 setIfAbsent()를 통해 setnx를 사용하여, 값이 없다면 값을 set한다.

 

이어서 재고감소 로직이다.

1. SprinLock 방식으로 락을 얻기 위해 시도하고,

2. 락을 얻는다면 재고 감소 로직을 실행하고,

3. 학을 해제해주는 방식

@Component
@RequiredArgsConstructor
public class LettuceLockFacade {

    private final RedisLockRepository lockRepository;
    private final StockService stockService;

    public void decrease(Long id, Long quantity) throws InterruptedException {
        while (!lockRepository.lock(id)) {
            Thread.sleep(100);
        }
        try {
            stockService.decrease(id,quantity);
        } finally {
            lockRepository.unlock(id);
        }
    }
}

 

장점 : 별도의 설정없이 간단히 구현가능하다.

단점 : Sprin Lock 방식이, Lock을 얻을때까지 Lock을 얻기 위해 시도하기 때문에, Redis에 부하를 준다.

Redisson

Redis는 Pub/sub 기능을 제공해준다. Redisson에서는 Pub/Sub 기반의 분산락을 이미 구현하여 제공해주고있다.

 

의존성 추가

implementation 'org.redisson:redisson-spring-boot-starter:3.27.1'

 

@Component
@RequiredArgsConstructor
@Slf4j
public class RedissonLockStockFacade {

    private final RedissonClient redissonClient;
    private final StockService stockService;

    public void decrease(Long id, Long quantity) {
        RLock lock = redissonClient.getLock(id.toString());

        try {
            boolean result = lock.tryLock(10, 1, TimeUnit.SECONDS);

            if (!result) {
                log.info("Lock 획득 실패");
                return;
            }
            stockService.decrease(id, quantity);

        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }
}

 

장점 : Redisson에서 이미 구현되어 있기때문에, 별도의 구현 로직이 필요하지않다. 또한, Lettuce Sprin Lock에 비해 분산락으로 Redis에 부하를 덜 준다.

단점 : 별도의 의존성 주입이 필요하다.

시작하며

MMS 발송 모듈에서 히스토리 테이블의 Select 시 성능 저하로 커버링 인덱스를 적용하였다.

 

현재 상황

  • 테이블은 4개의 컬럼으로 구성되어있고, 4개의 컬럼이 복합 PK로 구성 및 파티션이 적용 되어 있다. 
  • 발송 배치시 Select 쿼리 성능 저하로 TPS가 맞춰지지 않았다.

원인

  • 복합 PK를 사용시 단점으로 Where 조건절이 복합키 컬럼이 모두 만족 되지 않는다면 인덱스가 적용이 되지않는다. 현재의 Where 조건절에는 하나의 컬럼만이 사용되었다. 
  • History 테이블로서 데이터가 지속적재 되기때문에 시간 경과에 따른 누적 성능 저하가 발생한다.

해결방안

  • Where 조건절에 해당하는 컬럼을 추가 적용하여, 인덱스가 적용되도록 추가하여, 기존 성능 2초에서 0.01초로 감소시켰다.

커버링 인덱스 적용 전
커버링 인덱스 적용 후

 

링크 : 인덱스 자료 정리

 

 

시작하며

Kafka에 대한 지식이 부족하다고 판단하여, Local 환경에서 Spring boot와 Kafka의 연동실습이며, 이론적인 내용은 추후에 작성할 예정이다.

 

Kafka 서버 구축하기

1. Apache Kafka 공식 홈페이지에서 Kafka 다운로드

  • 다운로드 후 압축을 해제하면, bin과 config 파일이 존재한다.
  • bin 디렉토리내에는 Kafka 관련 각종 실행 sh가 , config 디렉토리내에는 설정관련 파일이 위치한다.

 

2. Kafka Zookeeper 구동하기

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

  • Zookeeper는 기본적으로 2181 포트에서 구동한다.

3. Kafka Broker 구동하기

./bin/kafka-server-start.sh ./config/server.properties

  • Broker는 기본적으로 9092 포트에서 구동한다.

4. Topic 생성 및 확인하기

  • Topic 생성
./bin/kafka-topics.sh --create --topic [생성할 topic 이름] --bootstrap-server localhost:9092 --partitions 1
  • Topic 삭제
./bin/kafka-topics.sh --delete --topic [삭제할 topic 이름] --bootstrap-server localhost:9092
  • Topic List 조회
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

  • Topic 정보 조회
./bin/kafka-topics.sh --describe --topic [확인할 topic 이름] --bootstrap-server localhost:9092

 

Spring Boot Kafka 설정

1. build.gradle 및 yml 설정

	// kafka
    implementation 'org.springframework.kafka:spring-kafka'
spring:
  kafka:
    bootstrap-servers: localhost:9092

 

2. Config 설정

  • Producer 
@EnableKafka
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootStrapSevers;

    @Bean
    public ProducerFactory<String, KafkaPubRequestDto> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapSevers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, KafkaPubRequestDto> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
  • Consumer
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootStrapSevers;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaSubRequestDto> containerListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, KafkaSubRequestDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.getContainerProperties().setAckTime(3000);
        factory.getContainerProperties().setPollTimeout(500);
        factory.getContainerProperties().setLogContainerConfig(true);
        factory.setConcurrency(1);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, KafkaSubRequestDto> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapSevers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        JsonDeserializer<KafkaSubRequestDto> deserializer = new JsonDeserializer<>(KafkaSubRequestDto.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        return new DefaultKafkaConsumerFactory<>(
                props,
                new StringDeserializer(),
                deserializer
        );
    }

}

 

3. Producer, Consume

  • Producer
@RequiredArgsConstructor
@Slf4j
@Component
public class KafkaProducer {

    private final KafkaTemplate<String, KafkaPubRequestDto> kafkaTemplate;

    public void send(KafkaPubRequestDto kafkaPubRequestDto){
        log.info("> Kafka Producer Send Start [message] : {}", kafkaPubRequestDto);
        kafkaTemplate.send("test-topic-1", kafkaPubRequestDto);
        log.info("> Kafka Producer Send End [message] : {}", kafkaPubRequestDto);
    }

}
  • Consumer
@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "test-topic-1", containerFactory = "containerListenerFactory", groupId = "test")
    private void testListener(@Payload KafkaSubRequestDto kafkaSubRequestDto, Acknowledgment ack, ConsumerRecordMetadata metadata) {
        log.info("> Kafka Consumer Read Start [KafkaSubRequestDto] : {}", kafkaSubRequestDto);
        ack.acknowledge();
        log.info("> Kafka Consumer Read End [KafkaSubRequestDto] : {}", kafkaSubRequestDto);
    }

}

 

4. Controller

@RestController
@RequestMapping("/kafka")
@Slf4j
@RequiredArgsConstructor
public class KafkaPubController {

    private final KafkaProducer kafkaProducer;
    private final AtomicLong messageId = new AtomicLong(0);

    @PostMapping("/pub")
    void pubMessage(@RequestBody KafkaPubRequestDto kafkaPubRequestDto){
        LocalDateTime now = LocalDateTime.now();
        log.info("> Kafka Message Pub [time] :{}", now);
        kafkaPubRequestDto.setMessageId(messageId.getAndIncrement());
        kafkaProducer.send(kafkaPubRequestDto);
    }
}
  • 결과

 

끝으로

카프카 책을 구매 예정이며, 더 깊게 공부해볼 예정이며, 프로젝트에서 기회가된다면 대용량 데이터를 경험해보고 싶다.

시작하며

Error 또는 중요한 배치 시 알람을 받을 방법으로 회사에서 사용하는 Slack을 접목시켰다.

 

링크 : https://github.com/seungsoos/slack-demo

 

Slack 의존성 추가 및 yml 설정

	implementation("com.slack.api:bolt:1.18.0")
	implementation("com.slack.api:bolt-servlet:1.18.0")
	implementation("com.slack.api:bolt-jetty:1.18.0")

 

구글링을 통해 방법을 찾고, 해당 토큰을 입력한다.

 

코드내용은 아래와 같다.

@Slf4j
@Component
@RequiredArgsConstructor
public class SlackComponentImpl implements SlackComponent {

    /**
     * Slack의 경우 하나의 채널당 tps 1 사용
     */
    @Value("${slack.send.tps}")
    private Integer tps;

    @Value(value = "${slack.token}")
    private String slackToken;
    private MethodsClient methods = null;

    private int tpsSendCount = 0;

    /**
     * 순서 보장 및 데이터 유실 방지를 위한 Queue 사용
     */
    BlockingQueue<SlackInnerResponse> blockingQueue = new LinkedBlockingQueue();

    @PostConstruct
    private void init() {
        methods = Slack.getInstance().methods(slackToken);
    }

    @Async
    public void addQueue(String message, SlackChannel slackChannel){
        SlackInnerResponse slackInnerResponse = new SlackInnerResponse(message, slackChannel);
        blockingQueue.add(slackInnerResponse);

        startSendMessage();
    }

    private void startSendMessage() {
        SlackInnerResponse poll = blockingQueue.poll();
        if(Objects.isNull(poll)){
            return;
        }
        sendMessage(poll.message, poll.slackChannel);
    }

    @AllArgsConstructor
    static class SlackInnerResponse{
        private String message;
        private SlackChannel slackChannel;
    }


    /**
     * Slack 메시지 전송 메서드
     * 비동기 처리
     */

    private synchronized void sendMessage(String message, SlackChannel slackChannel) {
        log.info("sendSlackMessage message = {}, channel ={}", message, slackChannel.desc());

        for (int i = 0; i < 3; i++) {
            if (getTpsSendCount() >= tps) {
                log.info("thread Sleep");
                /**
                 * SlackApiException: status: 429 발생으로 TPS 조절 필요
                 * https://api.slack.com/docs/rate-limits
                 */
                sleep(1000);
                clearTpsSendCount();
            }
            tpsSendCountPlus();
            try {
                ChatPostMessageResponse chatPostMessageResponse = send(message, slackChannel.desc());
                boolean result = chatPostMessageResponse.isOk();
                if (result) {
                    return;
                }
            } catch (SlackApiException | IOException e) {
                log.error("SlackException = ", e);
                sleep(30000);
            } catch (Exception e) {
                log.error("Exception =", e);
                sleep(30000);
            }
        }
    }

    private ChatPostMessageResponse send(String message, String channel) throws IOException, SlackApiException {
        
        ChatPostMessageRequest request = ChatPostMessageRequest.builder()
                .channel(channel)
                .text(message)
                .build();
        return methods.chatPostMessage(request);
    }

    private void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            log.error("sleep error", e);
        }
    }

    private synchronized void tpsSendCountPlus() {
        log.info("> Plus");
        ++tpsSendCount;
    }
    private synchronized int getTpsSendCount() {
        log.info("> GET");
        return tpsSendCount;
    }

    private synchronized void clearTpsSendCount() {
        log.info("> Clear");
        tpsSendCount = 0;
    }

}

Slack의 채널을 여러 분기처리하기위해 Enum을 사용하였고, token 및 tps는 config로 관리하였다.

 

해당 개발테스트 중 알게된 사항으로 Slack은 채널당 tps를 1로 엄격히 규정하고있다. 여러 테스트 결과 과도한 트래픽이 초과될시 429 에러가 발생하며 해당 내용은 코드에 정리하였다. 또한, 과도한 트래픽이 발생시 데이터의 유실을 확인하여 Queue를 활용하여 데이터를 안전하게 제공하였다.

 

시작하며

서버로그중 가끔씩 ClientAbortException가 보인다.

 

Response body : {"code":"400","msg":"Insufficient parameters.","data":"I/O error while reading input message; nested exception is org.apache.catalina.connector.ClientAbortException: java.io.EOFException"}

 

발생원인 : 클라이언트가 연결을 끊었을때 발생하는것이 일반적이다.

서버가 클리이언트로 응답을 보내려하지만, 클라이언트의 연결이 이미끊어진경우 발생한다.

 

해당 에러는 클라이언트 단에서 발생하였다. 해당 에러는 Filter에서 Handling

설정을 하였다.

 

참고사이트 https://perfectacle.github.io/2022/03/20/client-abort-exception-deep-dive-part-01/

+ Recent posts