시작하며

Kafka에 대한 지식이 부족하다고 판단하여, Local 환경에서 Spring boot와 Kafka의 연동실습이며, 이론적인 내용은 추후에 작성할 예정이다.

 

Kafka 서버 구축하기

1. Apache Kafka 공식 홈페이지에서 Kafka 다운로드

  • 다운로드 후 압축을 해제하면, bin과 config 파일이 존재한다.
  • bin 디렉토리내에는 Kafka 관련 각종 실행 sh가 , config 디렉토리내에는 설정관련 파일이 위치한다.

 

2. Kafka Zookeeper 구동하기

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

  • Zookeeper는 기본적으로 2181 포트에서 구동한다.

3. Kafka Broker 구동하기

./bin/kafka-server-start.sh ./config/server.properties

  • Broker는 기본적으로 9092 포트에서 구동한다.

4. Topic 생성 및 확인하기

  • Topic 생성
./bin/kafka-topics.sh --create --topic [생성할 topic 이름] --bootstrap-server localhost:9092 --partitions 1
  • Topic 삭제
./bin/kafka-topics.sh --delete --topic [삭제할 topic 이름] --bootstrap-server localhost:9092
  • Topic List 조회
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

  • Topic 정보 조회
./bin/kafka-topics.sh --describe --topic [확인할 topic 이름] --bootstrap-server localhost:9092

 

Spring Boot Kafka 설정

1. build.gradle 및 yml 설정

	// kafka
    implementation 'org.springframework.kafka:spring-kafka'
spring:
  kafka:
    bootstrap-servers: localhost:9092

 

2. Config 설정

  • Producer 
@EnableKafka
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootStrapSevers;

    @Bean
    public ProducerFactory<String, KafkaPubRequestDto> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapSevers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, KafkaPubRequestDto> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
  • Consumer
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootStrapSevers;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaSubRequestDto> containerListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, KafkaSubRequestDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.getContainerProperties().setAckTime(3000);
        factory.getContainerProperties().setPollTimeout(500);
        factory.getContainerProperties().setLogContainerConfig(true);
        factory.setConcurrency(1);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, KafkaSubRequestDto> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapSevers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        JsonDeserializer<KafkaSubRequestDto> deserializer = new JsonDeserializer<>(KafkaSubRequestDto.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        return new DefaultKafkaConsumerFactory<>(
                props,
                new StringDeserializer(),
                deserializer
        );
    }

}

 

3. Producer, Consume

  • Producer
@RequiredArgsConstructor
@Slf4j
@Component
public class KafkaProducer {

    private final KafkaTemplate<String, KafkaPubRequestDto> kafkaTemplate;

    public void send(KafkaPubRequestDto kafkaPubRequestDto){
        log.info("> Kafka Producer Send Start [message] : {}", kafkaPubRequestDto);
        kafkaTemplate.send("test-topic-1", kafkaPubRequestDto);
        log.info("> Kafka Producer Send End [message] : {}", kafkaPubRequestDto);
    }

}
  • Consumer
@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "test-topic-1", containerFactory = "containerListenerFactory", groupId = "test")
    private void testListener(@Payload KafkaSubRequestDto kafkaSubRequestDto, Acknowledgment ack, ConsumerRecordMetadata metadata) {
        log.info("> Kafka Consumer Read Start [KafkaSubRequestDto] : {}", kafkaSubRequestDto);
        ack.acknowledge();
        log.info("> Kafka Consumer Read End [KafkaSubRequestDto] : {}", kafkaSubRequestDto);
    }

}

 

4. Controller

@RestController
@RequestMapping("/kafka")
@Slf4j
@RequiredArgsConstructor
public class KafkaPubController {

    private final KafkaProducer kafkaProducer;
    private final AtomicLong messageId = new AtomicLong(0);

    @PostMapping("/pub")
    void pubMessage(@RequestBody KafkaPubRequestDto kafkaPubRequestDto){
        LocalDateTime now = LocalDateTime.now();
        log.info("> Kafka Message Pub [time] :{}", now);
        kafkaPubRequestDto.setMessageId(messageId.getAndIncrement());
        kafkaProducer.send(kafkaPubRequestDto);
    }
}
  • 결과

 

끝으로

카프카 책을 구매 예정이며, 더 깊게 공부해볼 예정이며, 프로젝트에서 기회가된다면 대용량 데이터를 경험해보고 싶다.

시작하며

자바 성능 튜닝 이야기 [Chap 4] <어디에 담아야하는지...>를 읽고 정리 및 추가 개념을 정리하였다.

 

  • Collection : 가장 상위 인터페이스이다.
  • Set : 중복을 허용하지 않는 집합을 처리하기 위한 인터페이스이다.
  • SortedSet : 오름차순을 갖는 Set 인터페이스이다.
  • List : 순서가 있는 집합을 처리하기 위한 인터페이스 이기 때문에 인덱스가 있어 위치를 지정하여 값을 찾을수 있다. 중복을 허용하며 List 인터페이스를 상속받는 클래스 중에 가장 많이 사용하는 것으로 ArrayList가 있다.
  • Queue : 여러개의 객체를 처리하기 전에 담아서 처리할때 사용하기 위한 인터페이스이다. 기본적으로 FIFO 구조이다.
  • Map : Key, Value 형태로 구성된 객체의 집합을 처리하기 위한 인터페이스이다. 중복되는 키를 허용하지 않는다.
  • SortedMap : 키를 오름차순으로 정렬하는 Map 인터페이스이다.

 

Set

  • HashSet : 데이터를 해쉬 테이블에 담는 클래스로 순서없이 저장된다.
  • TreeSet : red-black이라는 트리에 데이터를 담는다. 값에 따라서 순서가 정해진다. 데이터를 담으면서 동시에 정렬하기때문에 성능상 느리다.
  • LinkedHashSet : 해쉬 테이블에 데이터를 담는데, 저장된 순서에 따라서 순서가 결정된다.

List

List는 배열의 확장판이라고 보면된다. 배열은 최초 선언시 담을 수 있는 데이터의 개수가 한정적이지만, List는 담을 수 있는 크기가 자동으로 증가되므로, 데이터의 개수를 확실히 모를때 사용된다.

  • Vector : 객체 생성시에 크기를 지정할 필요가 없는 배열 클래스이다.
  • ArrayList : Vector와 비슷하지만, 동기화 처리가 되어 있지 않다.
  • LinkedList : ArrayList와 동일하지만, Queue 인터페이스를 구현했기 때문에 FIFO 큐 작업을 수행한다.

Map

  • Hashtable : 데이터를 해쉬 테이블에 담는 클래스이다. 내부에서 관리하는 해쉬 테이블 객체가 동기화되어 있으므로, 동기화가 필요한 부분에서는 이클래스를 사용한다.
  • HashMap : 데이터를 해쉬 테이블에 담는 클래스이다. Hashtable 클래스와 다른점은 null 값을 허용한다는 것과 동기화되어 있지 않다는 것이다.
  • TreeMap : red-black 트리에 데이터를 담는다. TreeSet과 다른점은 키에 의해서 순서가 정해진다는 것이다.
  • LinkedHashMap : HashMap과 거의 동일하며, 이중 연결 리스트 방식을 사용하여 데이터를 담는다는 점만 다르다.

Queue

List도 순서가 있고, Queue도 순서가 있는데, 왜 굳이 Queue를 사용할까?

List의 가장 큰 담점은 데이터가 많은 경우 처리시간이 늘어나는점이다. 가장 앞에 있는 데이터(0번)를 지우면 그다음 1번 데이터부터 마지막 데이터까지 한칸씩 옮기는 작업을 수행해야 하므로, 데이터가 적을때는 상관없지만, 데이터가 많으면 많을수록 시간이 증가한다.

 

시작하며

자바 성능 튜닝 이야기 [Chap 3] <왜 자꾸 String을 쓰지 말라는거야>를 읽고 정리 및 추가 개념을 정리하였다.

 

String vs StringBuffer vs StringBuilder

		// 책참고 코드
	final String aValue = "abcde";
    for(int outLoop=0;outLoop<10;outLoop++) {
      String a = new String();
      StringBuffer b = new StringBuffer();
      StringBuilder c = new StringBuilder();
      for(int loop=0;loop<10000;loop++) {
        a+=aValue;
      }
      for(int loop=0;loop<10000;loop++) {
        b.append(aValue);
      }
      String temp = b.toString();
      for(int loop=0;loop<10000;loop++) {
        c.append(aValue);
      }
      String temp2 = c.toString();
    }

 

- 응답시간

주요 소스 부분 응답 시간(ms) 비고
a+=aValue; 95,801.41ms  95초
b.append(aValue);
String temp=b.toString(); 
247.48ms
14.21ms 
0.24초
c.append(aValue);
String temp2=b.toString();
174.17ms
13.38ms 
0.17초

 

- 메모리사용량

주요 소스 부분  메모리 사용량(bytes) 생성된 임시 객체 수 비고
a+=aValue;  100,102,000,000  4,000,000  약 95Gb
b.append(aValue);
String temp=b.toString 
29,493,600
10,004,000 1,200

200  약 28Mb
약 9.5Mb
c.append(aValue);
String temp2=b.toString() 
29,493,600
10,004,000 
1,200
200 
약 28Mb
약 9.5Mb

 

a += avalue; => a=a+aValue와 같다.

새로운 String 객체가 만들어지고, 이전에 있던객체는 GC의 대상이된다.

StringBuffer 클래스와 StringBuilder 클래스

StringBuffer, StringBuilder 클래스는 제공하는 메서드가 동일하다.

StringBuffer 클래스는 스레드에 안전하게(Thead Safe)설계 되어 있으므로, 여러개의 스레드에서 하나의 StringBuffer 객체를 처리해도 전혀 문제가 되지 않는다. 하지만, StringBuilder는 단일 스레드에서의 안정성만을 보장한다. 그렇기 때문에 여러개의 스레드에서 하나의 StringBuilder 객체를 처리하면 문제가 발생한다.

 

왼쪽이 StringBuffer의 append 메서드이며, synchronized가 선언되어있고, 오른쪽의 StringBuilder는 없다.

 

 

EC2에 프로젝트 Clone 받기

 

먼저 EC2에 Git 설치

sudo yum install git

 

설치가 완료되면 설치상태를 확인

git --version

 

깃이 성공적으로 설치되면, clone할 프로젝트 디렉토리를 생성한다.

 

Git repository에서 HTTPS주소를 복사한다.

 

복사후 EC2 서버에 git clone을 진행

git clone 복사한 주소

 

책과 다른점으로 git 에서 Password를 토큰 형식으로 사용하도록 변경되었기 때문에 Password에 토큰 값을 넣으면 된다.

Username for 'https://github.com': seungsoo
Password for 'https://seungsoo@github.com':

 

git clone이 끝난 후 repository이름의 디렉토리가 생성되고, 하위에 파일이 복사되어있다.

 

그리고 코드들이 잘 수행되는지 테스트로 검증한다.

./gradlew test

 

테스트 코드가 무한로딩 발생으로 링크를 참고하였다.

 

배포 스크립트 만들기

sh 파일생성

vim ~/app/step1/deploy.sh

 

deploy.sh 작성

#!/bin/bash

REPOSITORY=/home/ec2-user/app/step1
PROJECT_NAME=SpringBoot_Aws_Study 

cd $REPOSITORY/$PROJECT_NAME/

echo "> Git Pull"

git pull

echo "> 프로젝트 Build 시작"

./gradlew build

echo ">step1 디렉토리로 이동"

cd $REPOSITORY 

echo "> Build 파일복사"

cp $REPOSITORY/$PROJECT_NAME/build/libs/*.jar $REPOSITORY/

echo "> 현재 구동중인 애플리케이션 pid 확인"

CURRENT_PID=$(pgrep -f ${PROJECT_NAME}.*.jar)

echo "현재 구동중인 애플리케이션 pid 확인"

if [ -z "$CURRENT_PID" ]; then
        echo "> 현재 구동 중인 애플리케이션이 없으므로 종료하지 않습니다."
else
        echo "> kill -15 $CURRENT_PID"
        kill -15 $CURRENT_PID
        sleep 5
fi

echo "> 새 애플리케이션 배포"

JAR_NAME=$(ls -tr $REPOSITORY/ | grep jar | tail -n 1)

echo "> JAR Name: $JAR_NAME"

nohup java -jar $REPOSITORY/$JAR_NAME 2>&1 &

 

생성한 스크립트에 실행 권한 추가

chmod +x ./deploy.sh

 

스크립트 실행

./deploy.sh

 

실행 후 로그확인

vim nohup.out

 

로그내에 

No qualifying bean of type 'org.springframework.security.oauth2.client.registration.ClientRegistrationRepository' available

다음과 같은 에러가 발생한다.

 

외부 Securiry 파일 등록하기

ClientRegistrationRepository를 생성하려면 ClientId와 ClientSecret이 필수이다.

Local에서 실행시 application-oauth.properties가 있어 문제되지 않았지만, 깃허브에는 gitignore로 제외시킨상태이다.

 

공개된 저장소에 ClientId와 ClientSecret를 올릴 수 없으니, 서버에서 직접 이 설정들을 가지고 있게한다.

 

properties 파일생성

vim ~/app/application-oauth.properties

 

application-oauth.properties를 복사 붙여넣기한다.

 

그리고 배포스크립트를 수정한다.

 

nohup java -jar \
       -Dspring.config.location=classpath:/application.properties,/home/ec2-user/app/application-oauth.properties \
       $REPOSITORY/$JAR_NAME 2>&1 &

 

배포후 스크립트 실행시

 

'no main manifest attribute in' 라는 에러가 발생하였다.

- 에러 설명 : jar파일에서 처음 호출할 Main 메소드를 찾지 못했다.

 

 

해결방법은 링크를 참고하였다.

 

 

 

 

 

 

 

 

 

 

 

RDS 운영환경에 맞는 파라미터 설정하기

RDS를 처음 생성하면 몇가지 설정을 필수로 해야한다.

 

파라미터 그룹 생성에서 방금 생성한 RDS와 버전을 맞춰야한다.

RDS version : 10.6.14

 

  • 타임존 수정 Asia/Seoul

  • Character Set
  • character 항목들은 utf8mb4로, collation 항목들은utf8mb4_general_ci 적용한다.
character_set_client
character_set_connection
character_set_database
character_set_filesystem
character_set_results
character_set_server

collation_connection
collation_server

 

  • Max Connection 150

 

파라미터 그룹을 데이터베이스에 연결시키기

EC2 및 탄력적 IP 설정으로 인스턴스를 구성 후 부터 기록하도록 하겠다.

 

아마존 리눅스 서버 생성시 꼭 해야 할 설정들

  • Java 8 설치 : 책의 내용은 8 이지만, 나는 Java 11을 설치하였다.
  • 타임존 변경 : 기본서버의 시간이 미국시간대이기에, 한국시간대로 변경
    • EC2 서버의 기본 타임존은 UTC이다. 한국시간과의 9시간의 차이가 발생으로 타임존 변경, 변경 후 date로 시간확인
$ sudo rm /etc/localtime
$ sudo ln -s /usr/share/zoneinfo/Asia/Seoul etc/localtime
  • 호스트 네임 변경 : 실제 서버가 여러대일경우 IP 만으로 판단불가하므로, 서버의 별명을 등록

호스트 네임을 변경후 한가지 작업을 더해야한다. /etc/hosts에 변경한 호스트 네임을 등록해야한다.

호스트네임을 등록하지 않아 발생한 문제점으로 우아한 형제들의 기술 블로그의 기록이다.

https://techblog.woowahan.com/2517/

 

/etc/hosts 파일을 수정한다.

 $ sudo vim /etc/hosts

방금등록한 HOSTNAME을 추가한다.

 

 127.0.0.1 방금 등록한 HOSTNAME

 

이후 잘 등록 되었다면,아직 80포트로 실행된 서비스가없음으로 다음과 같은 문구가 발생한다.

브런치 생성 / 이동 / 삭제

  • 생성
    • git branch 브런치이름
  • 브런치 확인
    • git branch
  • 이동
    • git switch 브런치이름
  • 생성과 동시에 이동
    • git switch -c 브런치이름 
  • 삭제
    • git branch -d 브런치이름

reset

git reset --hard 돌아갈 커밋 해시
Ex) git reset --hard a2a8158

revert

git revert 돌아갈 커밋 해시

Ex) git revert 04901be

※ 만약 revert시 A라는 파일을 추가 한 커밋(커밋A)이 있다. 그런데 이후 커밋에서 A파일을 수정한 커밋(커밋B)내역이 있을 경우

   커밋A로 revert시 A파일을 삭제해아하는데, 문제가 발생한다.

   문제의 파일을 add 혹은 rm으로 정리후 git revert --continue하면된다.

reset과 revert의 차이점

  1. reset
    • Commit 시점으로 되돌리지만 중간 단계의 Commit도 모두 삭제된다.
  2. revert
    • 해당 커밋내역에 대해서만 되돌린다.
    • revert한 커밋 이력이 추가된다.
    • revert시 커밋이력을 남기고 싶지않다면 git revert --no-commit 돌아갈 커밋 해시
      -> 정확히는 다른 작업들과 함께 커밋시 반영하기 위해 commit 전단계로 돌아간다.
     

pull 할것이 있을때 push

GitHub에 pull 할것 이 있는 상태에서 로컬에서 commit을 하였다.

현재 상태에서 로컬 -> 원격 으로 push 하게 되면 다음과 같은 에러가 발생한다.

pull을 받아온 이후 push가 가능하다.

git pull --no-rebase (Merge 방식)
git pull --rebase (rebase 방식)

이후 다시 push하면 된다.

 

파일을 staging area에서 working directory로 이동

git restore --staged 파일명

 

reset의 세가지 옵션

  • soft: repository에서 staging area로 이동
  • mixed (default): repository에서 working directory로 이동
  • hard: 수정사항 완전히 삭제

 

 

+ Recent posts