ABOUT ME

-

Total
-
  • Spring boot: websocket 서버 확장 도전 및 Kafka
    컴퓨터/Kafka 2023. 12. 24. 13:23
    728x90
    반응형

    소개

    같은 Spring boot 웹소켓 서버 (실시간 채팅을 위한)를 scale-out 하는 (같은 서버를 여러 다른 포트에서)  과정을 담았다.

     

    문제

    여기서 마주친 문제는, 여러 다른 포트에서 열린 웹소켓 서버에서 같은 하나의 채팅 방에 있는 유저들끼리 어떻게 통신할 수 있을까?

    (예: 8081 포트 서버에서 "spring"이란 채팅 방, 8082 포트 서버에서 "spring"이란 채팅 방, 서로 같은 메시지를 봐야 한다.)

     

    Sticky Session

    처음 떠올린 건 스티키 세션이다.

    사용자를 특정 서버 인스턴스에 '고정'해서, 모든 요청이 같은 서버로 라우팅 되도록 하는 기술이다. (세션 데이터 일관성 유지)

    웹소켓의 경우, 연결이 한 번 맺어지면 계속 유지되므로, 스티키 세션의 효과가 제한적일 수 있다.

    또한, 서버 중 하나가 실패할 경우, 해당 서버에 연결된 사용자들은 서비스를 이용할 수 없게 된다.

     

    nginx ip_hash와 차이점?

    nginx에서 설정할 때 ip_hash를 쓰면 항상 같은 아이피는 같은 주소로 가는데 이거랑 뭔 차이일까 궁금했다.

    ip_hash는 IP 주소에만 의존하고 스티키 세션은 세션 정보가 메모리에 저장된 경우, 같은 서버로 지속적으로 요청이 라우팅 되어야 해당 정보에 접근할 수 있다.

    즉, ip_hash는 스티키 세션의 한 방법이지만, 스티키 세션 자체가 반드시 ip_hash를 사용하는 것은 아니다.

     

    메시지 브로커

    RabbitMQ, Apache Kafka가 대안으로 떠올랐다.

    대량 데이터를 신속하게 처리하고, 토픽 기반으로 메시지를 관리하고 수평 확장성도 좋다.

     

    Apache Kafka

    Apache Kafka에 관한 글을 거의 3년 전에 쓰고 오랜만에 사용하니 KRaft로 드디어 편하게 Zookeeper 없이 실행될 수 있었다.

    Linux 버전 기준 다운로드 및 KRaft 서버 실행은 공식 사이트에 잘 나와있다. @Apache Kafka

     

    Apache Kafka

    Apache Kafka: A Distributed Streaming Platform.

    kafka.apache.org

     

    topic은 spring boot에서 dynamic 하게 만들 것이다. (prefix name이 "websocket-room-", 여기에 각 방마다 토픽을 만들어 볼 것이다)

     

    Spring Boot WebFlux

    build.gradle

    dependencies {
    	implementation 'org.springframework.boot:spring-boot-starter-webflux'
    	implementation 'org.springframework.kafka:spring-kafka'
    	implementation 'org.apache.kafka:kafka-clients'
    	compileOnly 'org.projectlombok:lombok'
    	annotationProcessor 'org.projectlombok:lombok'
    	testImplementation 'org.springframework.boot:spring-boot-starter-test'
    	testImplementation 'io.projectreactor:reactor-test'
    }

     

    adminClient

    Apache Kafka에 직접 데이터 추가/삭제/업데이트를 하기 위한 클라이언트 Bean을 만들어준다.

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class KafkaConfig {
    
        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;
    
        @Bean
        public AdminClient adminClient() {
            Map<String, Object> configs = new HashMap<>();
            configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            return AdminClient.create(configs);
        }
    }

     

    KafkaProperties

    설정 값을 여러 클래스에서 편하게 사용하기 위해 만듦

    @Configuration
    @ConfigurationProperties(prefix = "spring.kafka")
    @Getter
    @Setter
    @Slf4j
    public class KafkaProperties implements InitializingBean {
    
        private String bootstrapServers;
        private String topicPrefix;
        private String consumerGroup;
        private String topicPattern;
    
        @Override
        public void afterPropertiesSet() throws Exception {
            log.info(this.toString());
        }
    }

     

    KafkaTopicCreator

    Kafka 토픽 생성  함수

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import reactor.core.publisher.Mono;
    
    import java.util.Collections;
    
    @Component
    public class KafkaTopicCreator {
    
        private final AdminClient adminClient;
    
        @Autowired
        public KafkaTopicCreator(AdminClient adminClient) {
            this.adminClient = adminClient;
        }
    
        public Mono<Void> createTopic(String topicName, int partitions, short replicationFactor) {
            return Mono.create(sink -> adminClient.listTopics().names().whenComplete((names, ex) -> {
                if (ex != null) {
                    sink.error(new RuntimeException("Failed to list topics: " + ex.getMessage()));
                    return;
                }
    
                if (names.contains(topicName)) {
                    sink.success(); // Topic already exists
                } else {
                    // Create the topic if it doesn't exist
                    NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
                    adminClient.createTopics(Collections.singleton(newTopic)).all().whenComplete((result, creationEx) -> {
                        if (creationEx != null) {
                            sink.error(new RuntimeException("Failed to create topic: " + creationEx.getMessage()));
                        } else {
                            sink.success();
                        }
                    });
                }
            }));
        }
    }

     

    ChattingHandler

    웹소켓 처리, 방들이 있고 sendKafka 함수에서 room topic으로 메시지를 보낸다.

    @Slf4j
    @RequiredArgsConstructor
    @Component
    public class MyWebSocketHandler implements WebSocketHandler {
    
        private final KafkaTopicCreator kafkaTopicCreator;
        private final KafkaTemplate<String, String> kafkaTemplate;
        private final RoomSessionsManager roomSessionsManager;
        private final KafkaProperties kafkaProperties;
    
    ...
    
        private Mono<Void> sendToKafka(String room, String message) {
            return Mono.fromFuture(() -> kafkaTemplate.send(kafkaProperties.getTopicPrefix() + room, message).thenApply(result -> null));
        }
    }

     

    KafkaConsumerConfig

    여러 방법이 있지만 일단 포트마다 실행할 것이기에 consumer group 아이디를 랜덤으로 생성한다.

    @EnableKafka
    @Configuration
    @RequiredArgsConstructor
    public class KafkaConsumerConfig {
        private final KafkaProperties kafkaProperties;
    
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    
            // Generate a random suffix for the consumer group
            String uniqueGroupId = kafkaProperties.getConsumerGroup() + "-" + UUID.randomUUID();
            props.put(ConsumerConfig.GROUP_ID_CONFIG, uniqueGroupId);
    
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // or earliest?
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            return new DefaultKafkaConsumerFactory<>(props);
        }
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(3);
            factory.getContainerProperties().setPollTimeout(3000);
    
            return factory;
        }
    }

     

    KafkaConsumerService

    topicPattern은 SPeL을 지원하고 Kafka 내부에선 regex 형식 string 이여야 한다.

    "websocket-room-hi", "websocket-room-blah" 등 이 형식의 모든 topic 리스너를 만들려고 처음에 패턴을

    "websocket-room-*"로 했는데 먹히질 않았다. 오래 삽질하다가 "websocket-room-.*"으로 하니 잘 되었다..

    아래 리스너는 위와 같은 모든 topic에 전송을 듣고 웹소켓 세션 방마다 메시지를 다시 보내준다. (다른 유저 메시지 보기)

    import csw.spring.talk.config.RoomSessionsManager;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.KafkaHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Service;
    import org.springframework.web.reactive.socket.WebSocketSession;
    import reactor.core.publisher.Mono;
    
    import java.util.Map;
    
    
    @Slf4j
    @RequiredArgsConstructor
    @Service
    public class KafkaConsumerService {
    
        private final KafkaProperties kafkaProperties;
        private final RoomSessionsManager roomSessionsManager;
    
    
        @KafkaListener(topicPattern = "#{kafkaProperties.getTopicPattern()}", concurrency = "${listen.concurrency:3}")
        public void listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
            String room = topic.substring(kafkaProperties.getTopicPrefix().length());
            Map<String, WebSocketSession> sessions = roomSessionsManager.getRoomSessions(room);
    
            sessions.values().forEach(session -> {
                try {
                    session.send(Mono.just(session.textMessage(message))).subscribe();
                } catch (Exception e) {
                    log.error("Error sending message to WebSocket session", e);
                }
            });
        }
    }

     

    이렇게 하면 다음과 같이 jar로 빌드하고 여러 포트에서 실행하면

    ./gradlew clean bootJar build -x test
    java -jar build/libs/spring-talk-0.0.1-SNAPSHOT.jar --server.port=8081
    java -jar build/libs/spring-talk-0.0.1-SNAPSHOT.jar --server.port=8082

     

    왼쪽 (8081 서버)과 오른쪽 (8082 서버)에서 같은 방 소통이 잘 가능하다.

    728x90

    '컴퓨터 > Kafka' 카테고리의 다른 글

    Kafka 2.8.0+: No more zookeeper  (0) 2021.05.15
    Pinterest의 Apache Kafka 팁, 사용법  (0) 2021.01.27
    Apache Kafka Connect 사용해보기  (0) 2021.01.07

    댓글