-
K6/JMeter로 WebSocket 성능 테스트 해보기 (nginx 로드 밸런서)컴퓨터 2023. 12. 21. 01:08728x90반응형
소개: Spring Boot 프로젝트에 마이크로서비스로 쓰일 채팅 웹소켓 서버를 Rust 및 Spring Boot WebFlux 버전으로 만들고 K6 및 JMeter를 이용해 로드 테스트를 해볼 것이다.
i5-10600KF (6코어 - 가상 12) CPU와 24GB 컴퓨터 로컬에서 테스트한 과정이다.
n명의 유저 중 80%가 트위터 예전 기준 140 단어 기준 (약 1KB) 메시지를 전송한다. (Lorem ipsum)
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Suspendisse porttitor quam vitae aliquet faucibus. In condimentum mi id accumsan ullamcorper. Fusce convallis purus nisl, vitae hendrerit velit laoreet sed. Pellentesque ullamcorper feugiat eros sed volutpat. Donec ut blandit dolor. Integer viverra sed magna vel bibendum. Nam elit enim, vestibulum ut consequat non, rutrum id odio. Duis ornare risus fringilla dolor eleifend porttitor. Duis eleifend ullamcorper dignissim. Duis luctus dui vel semper iaculis. Aliquam nisl ligula, maximus in pellentesque vitae, eleifend a erat. Mauris feugiat suscipit eleifend. Vestibulum blandit lacus non tellus sagittis ultrices. Vestibulum eget blandit ligula. Nulla commodo sit amet neque at volutpat. Vestibulum a velit auctor, lobortis sem non, lobortis magna. Etiam eu est arcu. Duis sit amet vehicula leo. Donec consequat mauris ut libero fringilla, sed gravida ex consectetur. In porta, leo eget pellentesque fringilla, mauris massa.
공통 옵션
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(15); const CLIENT_TIMEOUT: Duration = Duration::from_secs(60); pub struct WsConn { room: Uuid, lobby_addr: Addr<Lobby>, hb: Instant, pub id: Uuid, } impl WsConn { pub fn new(room: Uuid, lobby: Addr<Lobby>) -> WsConn { WsConn { id: Uuid::new_v4(), room, hb: Instant::now(), lobby_addr: lobby, } } fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) { // Capture a clone of the ID to use in the closure let id = self.id; ctx.run_interval(HEARTBEAT_INTERVAL, move |act, ctx| { if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { eprintln!("{:?} Disconnecting due to failed heartbeat", id); ctx.stop(); return; } ctx.ping(b""); }); } } impl Actor for WsConn { type Context = ws::WebsocketContext<Self>; fn started(&mut self, ctx: &mut Self::Context) { self.hb(ctx); let addr = ctx.address(); self.lobby_addr .send(Connect { addr: addr.recipient(), lobby_id: self.room, self_id: self.id, }) .into_actor(self) .then(|res, _, ctx| { if res.is_err() { ctx.stop(); } fut::ready(()) }) .wait(ctx); } fn stopping(&mut self, _: &mut Self::Context) -> actix::Running { self.lobby_addr.do_send(Disconnect { id: self.id, room_id: self.room, }); actix::Running::Stop } } impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsConn { fn handle( &mut self, msg: Result<WebsocketMessage, ws::ProtocolError>, ctx: &mut Self::Context, ) { match msg { Ok(WebsocketMessage::Ping(msg)) => { self.hb = Instant::now(); ctx.pong(&msg); } Ok(WebsocketMessage::Pong(_)) => { self.hb = Instant::now(); } Ok(WebsocketMessage::Binary(bin)) => ctx.binary(bin), Ok(WebsocketMessage::Close(reason)) => { ctx.close(reason); ctx.stop(); } Ok(WebsocketMessage::Nop) => (), Ok(WebsocketMessage::Text(s)) => self.lobby_addr.do_send(ClientActorMessage { id: self.id, msg: s.to_string(), room_id: self.room, }), Ok(WebsocketMessage::Continuation(_)) => { ctx.stop(); } Err(e) => { eprintln!("Error in WebSocket message handling: {:?}", e); ctx.stop(); } } } } impl Handler<WsMessage> for WsConn { type Result = (); fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) { ctx.text(msg.0); } }
Rust 릴리즈 모드 컴파일 옵션
[profile.release] opt-level = 3 debug = false rpath = false lto = true debug-assertions = false codegen-units = 1 panic = 'unwind' incremental = false overflow-checks = false
Linux 파일 descriptor 제한이 있다. 임시적으로 변경해 준다. 아니면 (/etc/security/limits.conf 수정)
ulimit -n 10000
# /etc/sysctl.conf net.ipv4.tcp_keepalive_time = 600 net.ipv4.tcp_keepalive_probes = 5 net.ipv4.tcp_keepalive_intvl = 15 net.ipv4.tcp_syncookies = 1 net.ipv4.tcp_fin_timeout = 30 net.ipv4.tcp_max_syn_backlog = 4096 net.ipv4.tcp_max_syn_backlog = 4096 net.ipv4.tcp_tw_reuse = 1 # or sysctl -w net.ipv4.ip_local_port_range="1024 65535" sysctl -w net.ipv4.tcp_tw_reuse=1 sysctl -w net.ipv4.tcp_timestamps=1 ulimit -n 250000
테스트 with K6
import ws from 'k6/ws'; import { check, sleep } from 'k6'; export const options = { scenarios: { websocket_test: { executor: 'constant-vus', vus: 100, duration: '1m', }, }, }; const HI_MESSAGE = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Suspendisse porttitor quam vitae aliquet faucibus. In condimentum mi id accumsan ullamcorper. Fusce convallis purus nisl, vitae hendrerit velit laoreet sed. Pellentesque ullamcorper feugiat eros sed volutpat. Donec ut blandit dolor. Integer viverra sed magna vel bibendum. Nam elit enim, vestibulum ut consequat non, rutrum id odio. Duis ornare risus fringilla dolor eleifend porttitor. Duis eleifend ullamcorper dignissim. Duis luctus dui vel semper iaculis. Aliquam nisl ligula, maximus in pellentesque vitae, eleifend a erat. Mauris feugiat suscipit eleifend. Vestibulum blandit lacus non tellus sagittis ultrices. Vestibulum eget blandit ligula. Nulla commodo sit amet neque at volutpat. Vestibulum a velit auctor, lobortis sem non, lobortis magna. Etiam eu est arcu. Duis sit amet vehicula leo. Donec consequat mauris ut libero fringilla, sed gravida ex consectetur. In porta, leo eget pellentesque fringilla, mauris massa."; export default function () { const url = 'ws://localhost:8082/group/seokwon'; const params = { tags: { my_tag: 'hello' } }; const res = ws.connect(url, params, function (socket) { socket.on('open', function open() { console.log('connected'); // Only a subset of users send messages if (Math.random() < 0.8) { // 80% users send a message socket.send(HI_MESSAGE); } socket.setInterval(function timeout() { if (Math.random() < 0.1) { socket.ping(); console.log('Pinging every 10sec (approximately)'); } }, 10000); }); socket.on('ping', function () { console.log('PING!'); }); socket.on('pong', function () { console.log('PONG!'); }); socket.on('close', function () { console.log('disconnected'); }); socket.setTimeout(function () { console.log('Closing the socket after 10 seconds'); socket.close(); }, 10000); }); // Check the response check(res, { 'status is 101': (r) => r && r.status === 101 }); // Use sleep to simulate think time sleep(1); }
서버 쓰레드 12 버전 (100 VUs = 유저)
서버 쓰레드 16 vs 12 (100 VUs) 사실 100명 정도는 거의 차이 없다.
5000을 K6로 하고 싶었지만 100이 프리티어여서 돈을 아끼기 위해 JMeter를 이용해서 나머지를 테스트할 것이다.
Rust actix-rs with JMeter
여기서부터 모든 유저가 1KB 메시지를 보낸다. (거의 계속)
loop를 돌기에 500명이 계속 새로운 커넥션을 계속 만든다. 30초동안.
1. 서버 쓰레드 16 - 500 가상 유저 (쓰레드 라이프타임 30초, ramp-up 10초)
단일 서버
99% Line = 90밀리세컨드, 약 8000 requests / sec, 0% error
2. 서버 쓰레드 16 - 800 가상 유저 (쓰레드 라이프타임 30초, ramp-up 10초)
단일 서버
99% Line = 231밀리세컨드, 약 5670 requests / sec, 0% error, max cpu 40%
결과 비교: 500명보다 확실히 힘들어졌다. 그래도 단일 서버로 99%의 요청이 약 200밀리세컨드라는 결과가 나왔다.
3. 서버 쓰레드 16 - 1000 가상 유저 (쓰레드 라이프타임 30초, ramp-up 10초)
단일 서버
99% Line = 231밀리세컨드, 약 5670 requests / sec, 0% error, max cpu 40%
Average Response Time: 198 ms
Median Response Time: 60 ms
90th Percentile: 517 ms
95th Percentile: 538 ms
99th Percentile: 934 ms
Maximum Response Time: 6007 ms
Error Rate: Approximately 0.004%
Throughput: 3603.3 requests/second
Data Received: 713.1 KB/sec
Data Sent: 3496.3 KB/sec결과 비교: 800명에 비해 99% 가 거의 1s에 달했고 0.04%로 아예 요청 실패도 생겨버렸다. 현재 셋업으로는 700명 정도의 채팅은 가능한 듯 하다.
nginx 로드 밸런서
nginx로 간단하게 RR 로드 밸런서를 적용하고 테스트해 볼 것이다. 웹소켓 Rust 서버를 8082와 8083에 실행했다. (scaling-out)
wscat -c localhost:8090을 하면 8082, 8083에 upstream 된다. (엔진x라고 외우면 된다)
upstream springtalk { ip_hash; server localhost:8082; server localhost:8083; } server { listen 8090; server_name localhost:8090; location /group/seokwon { # redirect all HTTP traffic to localhost:8080 proxy_pass http://springtalk; proxy_set_header X-Real-IP $remote_addr; proxy_set_header Host $host; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; # WebSocket support (nginx 1.4) proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; # Path rewriting rewrite /group/seokwon/(.*) /$1 break; proxy_redirect off; } }
설정 저장
sudo apt install nginx sudo vim /etc/nginx/sites-available/springtalk sudo ln -s /etc/nginx/sites-available/springtalk /etc/nginx/sites-enabled/springtalk sudo nginx -t sudo systemctl restart nginx
서버x2 - 1000 가상 유저 (쓰레드 라이프타임 30초, ramp-up 10초)
Average Response Time: 122 ms
Median Response Time: 2 ms
90th Percentile: 514 ms
95th Percentile: 549 ms
99th Percentile: 690 ms
Maximum Response Time: 6007 ms
Error Rate: Approximately 0.082%
Throughput: 5824.3 requests/second
Data Received: 1216.8 KB/sec
Data Sent: 5190.2 KB/secRust axum-rs 단일서버
tokio가 만든 axum 웹 프레임워크로 변경 후에 최적화를 더 해봤다. DashMap과 tokio 쓰레드 16. (Concurrent Hashmap)
[profile.release] opt-level = 3 lto = true codegen-units = 1 panic = 'abort' debug-assertions = false target-cpu = "native"
struct AppState { user_set: DashMap<String, ()>, tx: broadcast::Sender<String>, } #[tokio::main(worker_threads = 12)] async fn main() { ... // 브로드캐스팅 채널은 스프링보다 더 주어보았다. let (tx, _rx) = broadcast::channel(1024); ... }
500명 테스트
Rust WebSocket Sampler (axum) 458407 25 5 13 21 1004 0 1058 0.0 15200.683091819477 3019.300912464353 14755.350579363829 일단 성능이 좋았다. 오류 0%에 평균이 25ms 반응이다. 근데 CPU를 90%나 썼다.
그후 1000명부터는 평균은 50ms로 평균은 좋지만 99% 백분위수가 1second에
throughput이 4000/sec이기 때문에 큰 성능 효과를 보지 못했다.
Spring Boot WebFlux 단일서버
Rust와 비교해보기 위해 가장 간단한 셋업을 비슷하게 했다. (no security setup)
근데 Rust 보다 크게 셋업을 신경안썼는데 현재 버전은 스프링이 압승이었다.
ZGC, 힙 8GB, 브로드캐스트 채널 256
application { applicationDefaultJvmArgs = ['-Xms2G', '-Xmx8G', '-XX:+UseZGC', '-XX:+ZGenerational'] } // main @SpringBootApplication(scanBasePackages = {"csw.spring.talk.config"}) public class SpringTalkApplication { public static void main(String[] args) { int cores = Runtime.getRuntime().availableProcessors(); int workerCount = cores * 2; HttpResources.set(LoopResources.create("httpServer", workerCount, true)); SpringApplication.run(SpringTalkApplication.class, args); } } // my handler // Reactive streams backpressure @Component public class MyWebSocketHandler implements WebSocketHandler { private final Map<String, Map<String, WebSocketSession>> roomSessions = new ConcurrentHashMap<>(); @Override public Mono<Void> handle(WebSocketSession session) { String room = extractRoomFromSession(session); roomSessions.computeIfAbsent(room, k -> new ConcurrentHashMap<>()).put(session.getId(), session); // Send a welcome message to the connected user Map<String, WebSocketSession> roomSession = roomSessions.get(room); Mono<Void> welcomeMessage = session.send(Mono.just(session.textMessage("Welcome to " + room + "! Currently, there are " + roomSession.size() + " users in the room."))); // Notify all users in the room when a new user joins Mono<Void> notifyJoin = broadcast(room, "A new user has joined " + room + ". Total users: " + roomSession.size()).then(); // Handle incoming messages Mono<Void> input = session.receive() .map(WebSocketMessage::getPayloadAsText) .flatMap(message -> broadcast(room, "User " + session.getId() + " says: " + message).then()) .then(); // Send periodic heartbeat messages Flux<Long> heartbeatInterval = Flux.interval(Duration.ofSeconds(15)); Mono<Void> heartbeat = heartbeatInterval.flatMap(i -> session.send(Mono.just(session.pingMessage(dataBufferFactory -> dataBufferFactory.allocateBuffer(10).write("Heartbeat".getBytes())))) ).then(); // When the session is closed, remove it from the room and notify others Mono<Void> sessionClose = session.closeStatus().flatMap(signalType -> { Map<String, WebSocketSession> sessions = roomSessions.get(room); if (sessions != null) { sessions.remove(session.getId()); return broadcast(room, "User left " + room + ". Total users: " + sessions.size()).then(); } return Mono.empty(); }); // Combine all parts into a single Mono<Void> return Mono.when(welcomeMessage, notifyJoin, input, heartbeat, sessionClose); } private String extractRoomFromSession(WebSocketSession session) { // Extract the room name from the session URL String path = session.getHandshakeInfo().getUri().getPath(); return path.substring(path.lastIndexOf('/') + 1); } // Reactive streams backpressure private Flux<Void> broadcast(String room, String message) { return Flux.fromIterable(roomSessions.getOrDefault(room, Collections.emptyMap()).values()) .flatMap(session -> session.send(Mono.just(session.textMessage(message))), 256); } }
500명 동시 테스트
Spring WebSocket Sampler 516016 23 22 38 46 86 0 165 0.0 17185.63911276893 3302.608019664208 16682.153591887032 Max가 Rust actix-rs보다 일단 낫다. (500 -> 165)
1000명 동시 테스트
CPU를 더 많이 먹긴 했다. 근데 error 없이 다 처리했다.
Spring WebSocket Sampler 484516 49 52 79 90 130 0 271 0.0 16119.905512858903 3101.1927597980507 15647.642656036864 2000명 테스트
에러 없이 다 처리했다. 최대 반응도 150ms 로 굉장히 성능이 좋았다.
5000명 테스트
CPU 사용량 거의 100%, 메모리는 5GB를 사용했다. 오류 0%, 99% 1s가 나올정도로 슬슬 서비스 했을 때 대기시간이 생길 수 도 있다.
여기서부터 Rust 설정을 잘 못했나? 생각이 들었다. 성능이 좋았다.
마무리
설정의 차이와 구현에서 살짝 다른 부분도 있겠지만 가장 간단하고 빠르게 셋업을 한 상태에서는 스프링이 괜찮았다.
Rust 웹소켓 서버 최적화를 어느정도 했지만 여기서 Rust 코드를 더 최적화를 해봐야겠다고 생각했다.
perf를 이용해서 프로파일링 해본 결과 actix/tokio 쪽에서 웹소켓 연결 부분만 많이 잡아먹고 앱에서는 별로 안잡아먹고
actix-rs worker를 32로 해봐도 소용없으니 나중에 low-level을 건들던지 셋업을 최적화 해봐야겠다.
나중에 actix-rs, axum-rs, spring boot 코드를 올리고 수정도 해봐야한다.
728x90'컴퓨터' 카테고리의 다른 글
Neo4j: 그래프 데이터베이스 시작 (Graph) (0) 2024.04.01 AWS EC2 가비아 HTTPS 도메인 503 (1) 2023.12.05 Github Action 로컬에서 돌리기 (running GA locally) (0) 2023.11.05