ABOUT ME

-

Total
-
  • K6/JMeter로 WebSocket 성능 테스트 해보기 (nginx 로드 밸런서)
    컴퓨터 2023. 12. 21. 01:08
    728x90
    반응형

    소개: Spring Boot 프로젝트에 마이크로서비스로 쓰일 채팅 웹소켓 서버를 Rust 및 Spring Boot WebFlux 버전으로 만들고 K6 및 JMeter를 이용해 로드 테스트를 해볼 것이다.

     

    실행 전 상태

     

    i5-10600KF (6코어 - 가상 12) CPU와 24GB 컴퓨터 로컬에서 테스트한 과정이다.

    n명의 유저 중 80%가 트위터 예전 기준 140 단어 기준 (약 1KB) 메시지를 전송한다. (Lorem ipsum)

    Lorem ipsum dolor sit amet, consectetur adipiscing elit. Suspendisse porttitor quam vitae aliquet faucibus. In condimentum mi id accumsan ullamcorper. Fusce convallis purus nisl, vitae hendrerit velit laoreet sed. Pellentesque ullamcorper feugiat eros sed volutpat. Donec ut blandit dolor. Integer viverra sed magna vel bibendum. Nam elit enim, vestibulum ut consequat non, rutrum id odio. Duis ornare risus fringilla dolor eleifend porttitor. Duis eleifend ullamcorper dignissim. Duis luctus dui vel semper iaculis. Aliquam nisl ligula, maximus in pellentesque vitae, eleifend a erat. Mauris feugiat suscipit eleifend. Vestibulum blandit lacus non tellus sagittis ultrices. Vestibulum eget blandit ligula. Nulla commodo sit amet neque at volutpat.
    
    Vestibulum a velit auctor, lobortis sem non, lobortis magna. Etiam eu est arcu. Duis sit amet vehicula leo. Donec consequat mauris ut libero fringilla, sed gravida ex consectetur. In porta, leo eget pellentesque fringilla, mauris massa.

     

    공통 옵션

    const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(15);
    const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
    
    pub struct WsConn {
        room: Uuid,
        lobby_addr: Addr<Lobby>,
        hb: Instant,
        pub id: Uuid,
    }
    
    impl WsConn {
        pub fn new(room: Uuid, lobby: Addr<Lobby>) -> WsConn {
            WsConn {
                id: Uuid::new_v4(),
                room,
                hb: Instant::now(),
                lobby_addr: lobby,
            }
        }
    
        fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
            // Capture a clone of the ID to use in the closure
            let id = self.id;
    
            ctx.run_interval(HEARTBEAT_INTERVAL, move |act, ctx| {
                if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
                    eprintln!("{:?} Disconnecting due to failed heartbeat", id);
                    ctx.stop();
                    return;
                }
    
                ctx.ping(b"");
            });
        }
    }
    
    impl Actor for WsConn {
        type Context = ws::WebsocketContext<Self>;
    
        fn started(&mut self, ctx: &mut Self::Context) {
            self.hb(ctx);
    
            let addr = ctx.address();
            self.lobby_addr
                .send(Connect {
                    addr: addr.recipient(),
                    lobby_id: self.room,
                    self_id: self.id,
                })
                .into_actor(self)
                .then(|res, _, ctx| {
                    if res.is_err() {
                        ctx.stop();
                    }
                    fut::ready(())
                })
                .wait(ctx);
        }
    
        fn stopping(&mut self, _: &mut Self::Context) -> actix::Running {
            self.lobby_addr.do_send(Disconnect {
                id: self.id,
                room_id: self.room,
            });
            actix::Running::Stop
        }
    }
    
    impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsConn {
        fn handle(
            &mut self,
            msg: Result<WebsocketMessage, ws::ProtocolError>,
            ctx: &mut Self::Context,
        ) {
            match msg {
                Ok(WebsocketMessage::Ping(msg)) => {
                    self.hb = Instant::now();
                    ctx.pong(&msg);
                }
                Ok(WebsocketMessage::Pong(_)) => {
                    self.hb = Instant::now();
                }
                Ok(WebsocketMessage::Binary(bin)) => ctx.binary(bin),
                Ok(WebsocketMessage::Close(reason)) => {
                    ctx.close(reason);
                    ctx.stop();
                }
                Ok(WebsocketMessage::Nop) => (),
                Ok(WebsocketMessage::Text(s)) => self.lobby_addr.do_send(ClientActorMessage {
                    id: self.id,
                    msg: s.to_string(),
                    room_id: self.room,
                }),
                Ok(WebsocketMessage::Continuation(_)) => {
                    ctx.stop();
                }
                Err(e) => {
                    eprintln!("Error in WebSocket message handling: {:?}", e);
                    ctx.stop();
                }
            }
        }
    }
    
    impl Handler<WsMessage> for WsConn {
        type Result = ();
    
        fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) {
            ctx.text(msg.0);
        }
    }

     Rust 릴리즈 모드 컴파일 옵션

    [profile.release]
    opt-level = 3
    debug = false
    rpath = false
    lto = true
    debug-assertions = false
    codegen-units = 1
    panic = 'unwind'
    incremental = false
    overflow-checks = false

    Linux 파일 descriptor 제한이 있다. 임시적으로 변경해 준다. 아니면 (/etc/security/limits.conf 수정)

    ulimit -n 10000
    # /etc/sysctl.conf
    
    net.ipv4.tcp_keepalive_time = 600
    net.ipv4.tcp_keepalive_probes = 5
    net.ipv4.tcp_keepalive_intvl = 15
    net.ipv4.tcp_syncookies = 1
    net.ipv4.tcp_fin_timeout = 30
    net.ipv4.tcp_max_syn_backlog = 4096
    net.ipv4.tcp_max_syn_backlog = 4096
    net.ipv4.tcp_tw_reuse = 1
    
    # or
    
    sysctl -w net.ipv4.ip_local_port_range="1024 65535"
    sysctl -w net.ipv4.tcp_tw_reuse=1
    sysctl -w net.ipv4.tcp_timestamps=1
    ulimit -n 250000

     

    테스트 with K6

    import ws from 'k6/ws';
    import { check, sleep } from 'k6';
    
    export const options = {
        scenarios: {
            websocket_test: {
                executor: 'constant-vus',
                vus: 100,
                duration: '1m',
            },
        },
    };
    
    const HI_MESSAGE = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Suspendisse porttitor quam vitae aliquet faucibus. In condimentum mi id accumsan ullamcorper. Fusce convallis purus nisl, vitae hendrerit velit laoreet sed. Pellentesque ullamcorper feugiat eros sed volutpat. Donec ut blandit dolor. Integer viverra sed magna vel bibendum. Nam elit enim, vestibulum ut consequat non, rutrum id odio. Duis ornare risus fringilla dolor eleifend porttitor. Duis eleifend ullamcorper dignissim. Duis luctus dui vel semper iaculis. Aliquam nisl ligula, maximus in pellentesque vitae, eleifend a erat. Mauris feugiat suscipit eleifend. Vestibulum blandit lacus non tellus sagittis ultrices. Vestibulum eget blandit ligula. Nulla commodo sit amet neque at volutpat. Vestibulum a velit auctor, lobortis sem non, lobortis magna. Etiam eu est arcu. Duis sit amet vehicula leo. Donec consequat mauris ut libero fringilla, sed gravida ex consectetur. In porta, leo eget pellentesque fringilla, mauris massa.";
    
    export default function () {
        const url = 'ws://localhost:8082/group/seokwon';
        const params = { tags: { my_tag: 'hello' } };
    
        const res = ws.connect(url, params, function (socket) {
            socket.on('open', function open() {
                console.log('connected');
    
                // Only a subset of users send messages
                if (Math.random() < 0.8) {  // 80% users send a message
                    socket.send(HI_MESSAGE);
                }
    
                socket.setInterval(function timeout() {
                    if (Math.random() < 0.1) {
                        socket.ping();
                        console.log('Pinging every 10sec (approximately)');
                    }
                }, 10000);
            });
    
            socket.on('ping', function () {
                console.log('PING!');
            });
    
            socket.on('pong', function () {
                console.log('PONG!');
            });
    
            socket.on('close', function () {
                console.log('disconnected');
            });
    
            socket.setTimeout(function () {
                console.log('Closing the socket after 10 seconds');
                socket.close();
            }, 10000);
        });
    
        // Check the response
        check(res, { 'status is 101': (r) => r && r.status === 101 });
    
        // Use sleep to simulate think time
        sleep(1);
    }

    서버 쓰레드 12 버전 (100 VUs = 유저)

    서버 쓰레드 16 vs 12 (100 VUs) 사실 100명 정도는 거의 차이 없다.

     

    5000을 K6로 하고 싶었지만 100이 프리티어여서 돈을 아끼기 위해 JMeter를 이용해서 나머지를 테스트할 것이다.

     

    Rust actix-rs with JMeter

    테스트 셋업, thread만 바꾼다.

    여기서부터 모든 유저가 1KB 메시지를 보낸다. (거의 계속)

    loop를 돌기에 500명이 계속 새로운 커넥션을 계속 만든다. 30초동안.

     

    1. 서버 쓰레드 16 - 500 가상 유저 (쓰레드 라이프타임 30초, ramp-up 10초)

    단일 서버

    99% Line = 90밀리세컨드, 약 8000 requests / sec, 0% error

    response graph
    aggregate report

    2. 서버 쓰레드 16 - 800 가상 유저 (쓰레드 라이프타임 30초, ramp-up 10초)

    단일 서버

    99% Line = 231밀리세컨드, 약 5670 requests / sec, 0% error, max cpu 40%

    response time graph

    결과 비교: 500명보다 확실히 힘들어졌다. 그래도 단일 서버로 99%의 요청이 약 200밀리세컨드라는 결과가 나왔다.

    3. 서버 쓰레드 16 - 1000 가상 유저 (쓰레드 라이프타임 30초, ramp-up 10초)

    단일 서버

    99% Line = 231밀리세컨드, 약 5670 requests / sec, 0% error, max cpu 40%

    Average Response Time: 198 ms
    Median Response Time: 60 ms
    90th Percentile: 517 ms
    95th Percentile: 538 ms
    99th Percentile: 934 ms
    Maximum Response Time: 6007 ms
    Error Rate: Approximately 0.004%
    Throughput: 3603.3 requests/second
    Data Received: 713.1 KB/sec
    Data Sent: 3496.3 KB/sec

    response time graph

    결과 비교: 800명에 비해 99% 가 거의 1s에 달했고 0.04%로 아예 요청 실패도 생겨버렸다. 현재 셋업으로는 700명 정도의 채팅은 가능한 듯 하다.

     

    nginx 로드 밸런서

    nginx로 간단하게 RR 로드 밸런서를 적용하고 테스트해 볼 것이다. 웹소켓 Rust 서버를 8082와 8083에 실행했다. (scaling-out)

    wscat -c localhost:8090을 하면 8082, 8083에 upstream 된다. (엔진x라고 외우면 된다)

    upstream springtalk {
        ip_hash;
        server localhost:8082;
        server localhost:8083;
    }
    
    server {
        listen 8090;
        server_name localhost:8090;
    
        location /group/seokwon {
            # redirect all HTTP traffic to localhost:8080
            proxy_pass http://springtalk;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header Host $host;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    
            # WebSocket support (nginx 1.4)
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
    
            # Path rewriting
            rewrite /group/seokwon/(.*) /$1 break;
            proxy_redirect off;
        }
    }

    설정 저장

    sudo apt install nginx
    
    sudo vim /etc/nginx/sites-available/springtalk
    sudo ln -s /etc/nginx/sites-available/springtalk /etc/nginx/sites-enabled/springtalk
    
    sudo nginx -t
    sudo systemctl restart nginx

     

    서버x2 - 1000 가상 유저 (쓰레드 라이프타임 30초, ramp-up 10초)

    almost 80%
    reponse time graph

    Average Response Time: 122 ms
    Median Response Time: 2 ms
    90th Percentile: 514 ms
    95th Percentile: 549 ms
    99th Percentile: 690 ms
    Maximum Response Time: 6007 ms
    Error Rate: Approximately 0.082%
    Throughput: 5824.3 requests/second
    Data Received: 1216.8 KB/sec
    Data Sent: 5190.2 KB/sec

     

    Rust axum-rs 단일서버

    tokio가 만든 axum 웹 프레임워크로 변경 후에 최적화를 더 해봤다. DashMap과 tokio 쓰레드 16. (Concurrent Hashmap)

    [profile.release]
    opt-level = 3
    lto = true
    codegen-units = 1
    panic = 'abort'
    debug-assertions = false
    target-cpu = "native"
    struct AppState {
        user_set: DashMap<String, ()>,
        tx: broadcast::Sender<String>,
    }
    
    #[tokio::main(worker_threads = 12)]
    async fn main() {
        ...
        // 브로드캐스팅 채널은 스프링보다 더 주어보았다.
        let (tx, _rx) = broadcast::channel(1024);
        ...
    }

    500명 테스트

    Rust WebSocket Sampler (axum) 458407 25 5 13 21 1004 0 1058 0.0 15200.683091819477 3019.300912464353 14755.350579363829

    일단 성능이 좋았다. 오류 0%에 평균이 25ms 반응이다. 근데 CPU를 90%나 썼다.

     

    그후 1000명부터는 평균은 50ms로 평균은 좋지만 99% 백분위수가 1second에 

    throughput이 4000/sec이기 때문에 큰 성능 효과를 보지 못했다.

     

    Spring Boot WebFlux 단일서버

    Rust와 비교해보기 위해 가장 간단한 셋업을 비슷하게 했다. (no security setup)

    근데 Rust 보다 크게 셋업을 신경안썼는데 현재 버전은 스프링이 압승이었다.

    ZGC, 힙 8GB, 브로드캐스트 채널 256

    application {
    	applicationDefaultJvmArgs = ['-Xms2G', '-Xmx8G', '-XX:+UseZGC', '-XX:+ZGenerational']
    }
    
    // main
    @SpringBootApplication(scanBasePackages = {"csw.spring.talk.config"})
    public class SpringTalkApplication {
        public static void main(String[] args) {
            int cores = Runtime.getRuntime().availableProcessors();
            int workerCount = cores * 2;
            HttpResources.set(LoopResources.create("httpServer", workerCount, true));
    
            SpringApplication.run(SpringTalkApplication.class, args);
        }
    }
    
    // my handler
    // Reactive streams backpressure
    
    @Component
    public class MyWebSocketHandler implements WebSocketHandler {
    
        private final Map<String, Map<String, WebSocketSession>> roomSessions = new ConcurrentHashMap<>();
        @Override
        public Mono<Void> handle(WebSocketSession session) {
            String room = extractRoomFromSession(session);
            roomSessions.computeIfAbsent(room, k -> new ConcurrentHashMap<>()).put(session.getId(), session);
    
            // Send a welcome message to the connected user
            Map<String, WebSocketSession> roomSession = roomSessions.get(room);
            Mono<Void> welcomeMessage = session.send(Mono.just(session.textMessage("Welcome to " + room + "! Currently, there are " + roomSession.size() + " users in the room.")));
    
            // Notify all users in the room when a new user joins
            Mono<Void> notifyJoin = broadcast(room, "A new user has joined " + room + ". Total users: " + roomSession.size()).then();
    
            // Handle incoming messages
            Mono<Void> input = session.receive()
                    .map(WebSocketMessage::getPayloadAsText)
                    .flatMap(message -> broadcast(room, "User " + session.getId() + " says: " + message).then())
                    .then();
    
            // Send periodic heartbeat messages
            Flux<Long> heartbeatInterval = Flux.interval(Duration.ofSeconds(15));
            Mono<Void> heartbeat = heartbeatInterval.flatMap(i ->
                    session.send(Mono.just(session.pingMessage(dataBufferFactory ->
                            dataBufferFactory.allocateBuffer(10).write("Heartbeat".getBytes()))))
            ).then();
    
            // When the session is closed, remove it from the room and notify others
            Mono<Void> sessionClose = session.closeStatus().flatMap(signalType -> {
                Map<String, WebSocketSession> sessions = roomSessions.get(room);
                if (sessions != null) {
                    sessions.remove(session.getId());
                    return broadcast(room, "User left " + room + ". Total users: " + sessions.size()).then();
                }
                return Mono.empty();
            });
    
            // Combine all parts into a single Mono<Void>
            return Mono.when(welcomeMessage, notifyJoin, input, heartbeat, sessionClose);
        }
    
        private String extractRoomFromSession(WebSocketSession session) {
            // Extract the room name from the session URL
            String path = session.getHandshakeInfo().getUri().getPath();
            return path.substring(path.lastIndexOf('/') + 1);
        }
    
        // Reactive streams backpressure
        private Flux<Void> broadcast(String room, String message) {
            return Flux.fromIterable(roomSessions.getOrDefault(room, Collections.emptyMap()).values())
                    .flatMap(session -> session.send(Mono.just(session.textMessage(message))),
                            256);
        }
    }

     

    500명 동시 테스트

    Spring WebSocket Sampler 516016 23 22 38 46 86 0 165 0.0 17185.63911276893 3302.608019664208 16682.153591887032

    Max가 Rust actix-rs보다 일단 낫다. (500 -> 165)

    1000명 동시 테스트

    CPU를 더 많이 먹긴 했다. 근데 error 없이 다 처리했다.

    Spring WebSocket Sampler 484516 49 52 79 90 130 0 271 0.0 16119.905512858903 3101.1927597980507 15647.642656036864

    spring webflux cpu usage
    Spring WebFlux aggregate graph

    2000명 테스트

    에러 없이 다 처리했다. 최대 반응도 150ms 로 굉장히 성능이 좋았다.

     

    5000명 테스트

    CPU 사용량 거의 100%, 메모리는 5GB를 사용했다. 오류 0%, 99% 1s가 나올정도로 슬슬 서비스 했을 때 대기시간이 생길 수 도 있다.

    여기서부터 Rust 설정을 잘 못했나? 생각이 들었다. 성능이 좋았다.

     

    마무리

    설정의 차이와 구현에서 살짝 다른 부분도 있겠지만 가장 간단하고 빠르게 셋업을 한 상태에서는 스프링이 괜찮았다.

    Rust 웹소켓 서버 최적화를 어느정도 했지만 여기서 Rust 코드를 더 최적화를 해봐야겠다고 생각했다.

    perf를 이용해서 프로파일링 해본 결과 actix/tokio 쪽에서 웹소켓 연결 부분만 많이 잡아먹고 앱에서는 별로 안잡아먹고

    actix-rs worker를 32로 해봐도 소용없으니 나중에 low-level을 건들던지 셋업을 최적화 해봐야겠다.

    나중에 actix-rs, axum-rs, spring boot 코드를 올리고 수정도 해봐야한다.

    cargo flamegraph from&nbsp;815.137 MB perf.data (51352 samples)

    728x90

    댓글