ABOUT ME

-

Total
-
  • Go: 백엔드 웹소켓 채팅 (fiber v2 + websocket + rabbitmq)
    컴퓨터/Go language 2024. 3. 31. 16:02
    728x90
    반응형

    여러 가지 방이 있고 각 유저들이 있는 채팅 서비스를 만들고 싶었다.

    우선 분산 시스템을 고려하지 않고 베이스를 만들어 준다.

     

    싱글 서버

    웹소켓이 양방향이라 (bi-directional) 선택했고, 객체를 저장하고 있으면 계속 주고받을 수 있다.

    채팅 서버에서 websocket 연결 객체를 저장하고 있어야만 메시지를 전송할 수 있다.

    var WsRoomManager *RoomConnectionManager = NewRoomConnectionManager()
    
    type RoomConnectionManager struct {
    	connections       *haxmap.Map[string, []*websocket.Conn] // roomid and users
    }

     

    Concurrent Map

    연결 객체들을 해쉬 맵에 저장하고 여러 goroutine (쓰레드 개념)에서 접근해야 해서 동시성 맵이 필요했다.

    golang에 sync.Map 이 concurrent safe 한 map 이긴 한데, 이것보다 빠른 라이브러리들이 많다.

    처음에는 haxmap 이라고 빠른 xxHash랑 Harris lock free list를 이용해서 lock free 동시성 맵을 선택 했다.

     

    GitHub - alphadose/haxmap: Fastest and most memory efficient golang concurrent hashmap

    Fastest and most memory efficient golang concurrent hashmap - alphadose/haxmap

    github.com

     

    그렇게 한참을 쓰다가 성능 테스트를 해보려고 JMeter로 부가를 줬더니

    haxmap에서 데이터 레이스가 발생했다.

    go run -race main.go

     

    원인은 무엇일까 대충 코드를 보니 haxmap 구현이, grow (사이즈 키우기)를 하는 도중 set이 들어오면 이게 처리가 잘 안 되어있는 것 같다.

    누군가 이미 issue를 남겨놓았고, haxmap 제작자가 harris lock list를 잘못 구현했다고 한다.

    그래서 저 issue를 남겨놓은 사람의 레포를 보니 BucketMap이라고 더 잘 구현한 버전이 있는데

    사용 방법도 힘들고 메소드가 왜 그런진 자세히 보진 않았는데 내가 원하는 key 타입이 잘 해싱이 안되었다. (xxhash 이용)

     

    Race condition · Issue #32 · alphadose/haxmap

    Hi, I saw that the sorted linked list is based on Harris's linked list, but according to my understanding, it's not correctly written. Harris's linked list is based on 2 assumptions: 1. Atomic oper...

    github.com

     

    그렇게 결국 처음에 눈여겨봤던 "Abseil's SwissTable" 를 Go언어 에서 구현한 concurrent-swiss-map 으로 옮겼다.

    (Swiss Table은 구글에서 개발한 고성능 해시 테이블, open addressing을 이용해 해시 충돌을 해결한다고 한다)

    https://github.com/mhmtszr/concurrent-swiss-map

     

    GitHub - mhmtszr/concurrent-swiss-map: A high-performance, thread-safe generic concurrent hash map implementation with Swiss Map

    A high-performance, thread-safe generic concurrent hash map implementation with Swiss Map. - mhmtszr/concurrent-swiss-map

    github.com

     

    xxh3는 xxhash 구현한 또 다른 라이브러리인데 FNV-1A, xxhash, xxh3 이런저런 해시 알고리즘을 벤치마킹 결과

    제일 괜찮은 라이브러리라 선택했다.

     

    GitHub - zeebo/xxh3: XXH3 algorithm in Go

    XXH3 algorithm in Go. Contribute to zeebo/xxh3 development by creating an account on GitHub.

    github.com

    type ChulbongConn struct {
    	Socket *websocket.Conn
    	UserID string
    	Mutex  *sync.Mutex
    	Send   chan []byte
    }
    
    type RoomConnectionManager struct {
    	connections       *csmap.CsMap[string, []*ChulbongConn] // roomid and users
    }
    
    hasher := func(key string) uint64 {
        return xxh3.HashString(key)
    }
    
    manager := &RoomConnectionManager{
        connections: csmap.Create(
            csmap.WithShardCount[string, []*ChulbongConn](64),
            csmap.WithCustomHasher[string, []*ChulbongConn](hasher),
        )
    }

     

    분산 시스템

    scale-out 을 했을 때 문제점들은 무엇일까 고민해 보았다.

    1. 웹소켓 객체들을 앱 메모리에 저장해서, A 서버에서 "TEST" 방에서 대화하고 있는 사람들끼리는 괜찮은데

    B 서버에 저장된 "TEST" 방에 있는 유저들에겐 위 방식으로는 서로 대화가 안 된다.

    2. 앱 메모리에 저장할 수 있는 크기보다 웹소켓 객체가 더 많이 온다면? 로드 밸런서로 적절한 분배가 필요함 or 수평 확장

     

    여기서 주로 쓰는 게 메시지 브로커인데 (Kafka, RabbitMQ 등) 우선 클라우드 서버를 구할 수 없었다.

    Kafka -> Sink 를 해서 뭐 여러 서비스 만들어서 처리할 수 있는 것 같은데 개인 환경에서 그렇게 구현하긴 힘들어서

    그래서 쓰고 있던 Redis pub/sub 모델을 잠깐 썼다.

     

    Redis pub/sub streamline 

    1. 각 서버 메모리마다 웹소켓 객체를 roomID에 따라 해시 맵에 저장한다.
    2. 각 서버마다 redis subscribe, A 서버가 메시지를 보내면 빠르게 해시 맵 돌아서 broadcast 하면서 동시에 publish 해준다.
    3. 다른 서버들도 똑같이 보내고, 받을 때는 subscribe 로 받아서 처리한다.
    4. 근데 이때 만약 redis가 못 보냈거나, 서버가 똑같은 메시지를 2번 처리할 수도 있는데 이건 간단하게 set를 만들고 message에 각각 uid 처럼 랜덤 고유 값을 부여해서 처리했는지 확인할 수도 있다.

     

    쓰다가 동기화 문제가 생겼다. 특히 go의 웹소켓 (fasthttp나 gorilla) 의 WriteMessage는 thread safe하진 않다.

    그래서 메시지를 보내는 코드에 mutex로 락을 걸고 unlock을 해야 하는데 하기가 싫었다.

    결국, 메시지 브로커를 찾다가 in memory 기반 (rabbitmq 등) 을 쓸지 log based (kafka, apache kinesis 등) 를 쓸지

    고민하다가 메시지를 저장할 생각은 없고 채팅 방이 사라지면 메시지도 다 사라지는 방식을 원했다.

    (A, B 메시지가 거의 동시에 와도 B가 먼저 처리되든 A가 먼저 처리되든 큰 상관도 없었다.)

     

    RabbitMQ 클라우드 서버를 찾다가 예전에 카프카를 썼을 때 cloudkarafka 가 cloudamqp 로 옮긴다 해서

    봤더니 LavinMQ 라는 처음 보는 것이 있었다.

     

    GitHub - cloudamqp/lavinmq: Lightweight and fast AMQP (0-9-1) server

    Lightweight and fast AMQP (0-9-1) server. Contribute to cloudamqp/lavinmq development by creating an account on GitHub.

    github.com

     

    implementation 부분을 읽어보면

     

    LavinMQ의 특징

    언어 및 컴파일: LavinMQ는 Crystal 언어로 작성되었다.

    처음 보는 언어라 찾아봤더니 C언어 급의 low level 성능을 내고 싶은데 high level 문법을 지원하는 언어였다.

    (LLVM 기반의 현대적 언어로 Ruby와 유사한 문법을 가지며, 단일 바이너리로 컴파일. Go 언어와도 비슷함)

     

    • 입출력 및 동시성 모델: 이벤트 루프 라이브러리를 사용하여 입출력을 처리하고, CSP(Communicating Sequential Processes)와 유사한 동시성 모델을 채택. 또한, 가비지 컬렉션이 있어 메모리 관리가 용이
    • 메시지 저장 방식: 메시지를 RAM에 캐싱하는 대신, 모든 메시지를 가능한 한 빠르게 디스크에 쓰고 운영 체제의 캐시를 사용하여 캐싱
    • 디스크 기반 메시지 저장소: 각 큐는 디스크 상의 메시지 저장소(파일 세그먼트 형태)에 의해 지원함. 이 파일들은 필요할 때만 메모리에 매핑되어 메모리 사용을 최적화
    • 메시지 처리: 메시지는 세그먼트에 시간순으로 추가되고, 소비될 때는 세그먼트에서 순차적으로 읽힌다. 메시지가 인정(acknowledged)되면 해당 위치는 "ack" 파일에 기록
    • 설정 및 정의: 큐, 교환기(exchanges), 바인딩 등의 정의는 AMQP 프레임 형태로 파일에 저장됨. 이는 서버 시작 시 복원
    • 비 AMQP 객체 저장: 사용자, vhosts, 정책 등은 JSON 파일로 저장되며, 운영자가 서버가 실행되지 않는 동안 수정하기 쉽다.

     

    LavinMQ vs Kafka

    • 언어 및 기술 스택: Kafka는 Scala와 Java로 개발되었으며, JVM(Java Virtual Machine) 위에서 실행된다. 반면, LavinMQ는 Crystal 언어로 개발되어 컴파일된 단일 바이너리로 실행됨. 이는 Kafka보다 성능과 메모리 사용 측면에서 이점을 제공할 수도 있다.
    • 메시지 저장: Kafka 역시 메시지를 디스크에 저장하지만, LavinMQ와 달리 높은 처리량과 확장성을 위한 특화된 저장 메커니즘을 가진다. (Kafka는 분산 시스템에서의 복제 및 파티셔닝을 통해 대용량 데이터 처리에 최적화)
    • 메시지 처리 및 동시성: LavinMQ의 CSP와 유사한 동시성 모델은 명시적인 메시지 전달을 통한 동시성 처리를 가능하게 한다. (Kafka는 브로커와 컨슈머 사이의 오프셋 관리를 통해 메시지 처리를 함)

     

    전통적인 인-메모리 메시지 브로커도 아니고 (RAM 에 저장하는), 디스크 기반의 저장 메커니즘을 갖고 있고

    OS의 파일 시스템 캐시를 써서 성능을 최적화하는데, AMQP 프로토콜 기반에 그냥 처음 보는 거라 써보고 싶었다.

    (연결은 RabbitMQ 커넥터로 똑같이 씀, 아래는 Go 언어에서 쓰는 공식 예제)

     

    LavinMQ with Golang

    The recommended library for Golang to access LavinMQ server is amqp091-go. Before we jump right into writing our code, let’s set up the development environment. Go development environment Download and install Go. Confirm that it’s available from your c

    lavinmq.com

     

    글로벌 클라이언트를 만들어 주고 (retry 이런 건 구현 필요)

    // Message Broker
    connection, err := amqp.Dial(os.Getenv("LAVINMQ_HOST"))
    if err != nil {
        log.Panicf("Failed to connect to LavinMQ")
    }
    services.LavinMQClient = connection

     

    websocket 객체를 저장할 때 Queue를 생성하는데 방마다 한 번만 만들게 체크하고, goroutine도 잘 꺼야 한다.

    (bool 대신 struct{}을 쓰면 메모리를 안쓰고 체크 가능하다)

    var (
    	LavinMQClient       *amqp.Connection
    	ActiveSubscriptions = csmap.Create(
    		csmap.WithShardCount[string, struct{}](64),
    		csmap.WithCustomHasher[string, struct{}](func(key string) uint64 {
    			return xxh3.HashString(key)
    		}),
    	)
    
    	// Map to store cancellation functions for each room subscription
    	cancellationFunctions = csmap.Create(
    		csmap.WithShardCount[string, context.CancelFunc](64),
    		csmap.WithCustomHasher[string, context.CancelFunc](func(key string) uint64 {
    			return xxh3.HashString(key)
    		}),
    	)
    )

     

    Queue를 생성할 때 TTL, lifetime 을 설정해서 아무도 없으면 알아서 삭제되게 했다.

    (방이 많아서 굳이 남겨두진 않았다, 그리고 마지막 유저가 퇴장 메시지가 다음 유저가 들어왔을 때 보여서 지웠다)

    func PublishMessageToAMQP(ctx context.Context, roomID, message, userNickname, userId string) {
    	ch, err := LavinMQClient.Channel()
    	failOnError(err, "Failed to open a channel")
    	defer ch.Close() // Ensure the channel is closed when function returns
    
    	queueName := fmt.Sprintf("chat_room_%s", roomID)
    
    	broadcastMsg := dto.BroadcastMessage{
    		UID:          uuid.New().String(),
    		Message:      message,
    		UserID:       userId,
    		UserNickname: userNickname,
    		RoomID:       roomID,
    		Timestamp:    time.Now().UnixMilli(),
    	}
    
    	// Serialize the message struct to JSON
    	msgJSON, err := json.Marshal(broadcastMsg)
    	if err != nil {
    		log.Printf("Error marshalling message to JSON: %v", err)
    		return
    	}
    
    	// Publish a message to the queue
    	ch.PublishWithContext(
    		ctx,
    		"",        // exchange - Using the default exchange which routes based on queue name
    		queueName, // routing key (queue name)
    		false,     // mandatory
    		false,     // immediate
    		amqp.Publishing{
    			ContentType: "text/plain",
    			Body:        msgJSON,
    		},
    	)
    	// failOnError(err, "Failed to declare a queue")
    }
    
    func ListenFromAMQP(ctx context.Context, queueName, roomID string, callback func(string, []byte)) {
    	ch, err := LavinMQClient.Channel()
    	failOnError(err, "Failed to open a channel")
    	defer ch.Close() // Ensure the channel is closed when function returns
    
    	// _, err = ch.QueueDelete(
    	// 	queueName, // queue name
    	// 	true,      // ifUnused - only delete if unused
    	// 	true,      // ifEmpty - only delete if empty
    	// 	false,     // noWait - don't wait for server to confirm the deletion
    	// )
    	// if err != nil {
    	// 	log.Printf("Failed to delete queue: %s", err)
    	// 	return
    	// }
    
    	q, err := ch.QueueDeclare(
    		queueName, // name
    		true,      // durable
    		false,     // delete when unused
    		false,     // exclusive
    		false,     // no-wait
    		amqp.Table{ // arguments - Use amqp.Table for passing queue arguments
    			"x-expires":          6000000, // Queue expires after 6000 seconds of not being used
    			"x-message-ttl":      30000,   // Messages expire after 30 seconds
    			"x-max-length":       1000,    // Maximum length of the queue (number of messages)
    			"x-max-length-bytes": 1000000, // Maximum size of the queue (in bytes)
    			// "x-dead-letter-exchange": "myDLX", // Example for setting a dead-letter exchange
    		}, // arguments,
    	)
    	failOnError(err, "Failed to declare a queue")
        
        ch.QueuePurge(queueName, false) // 그냥 purge
    
    	consumerId := fmt.Sprintf("chat-%s", roomID)
    	// log.Printf("[✅] Consumer! %s", consumerId)
    
    	msgs, err := ch.Consume(
    		q.Name,     // queue
    		consumerId, // consumer
    		true,       // auto-ack
    		false,      // exclusive
    		false,      // no-local
    		false,      // no-wait
    		nil,        // args
    	)
    
    	failOnError(err, "Failed to register a consumer")
    
    	var wg sync.WaitGroup
    	wg.Add(1)
    
    	go func() {
    		defer wg.Done()
    		for {
    			select {
    			case d := <-msgs:
    				if len(d.Body) == 0 {
    					continue
    				}
    				// log.Printf("[✅] Message Received: %s", d.Body)
    				callback(roomID, d.Body)
    			case <-ctx.Done():
    				// log.Printf("[✅] Done sub\n")
    				return
    			}
    		}
    	}()
    
    	wg.Wait()
    }

     

    적절히 웹소켓 핸들러에서 publish 해주면 된다.

    for {
        _, message, err := c.ReadMessage()
        if err != nil {
            // log.Printf("Error reading message: %v", err)
            break
        }
    
        messageString := string(message)
        messageString = strings.TrimSpace(messageString)
    
        if messageString == "" {
            continue
        }
        bad, _ := utils.CheckForBadWords(messageString)
        if bad {
            continue
        }
    
        // Publish the valid message to the LavinMQ queue for this chat room
        services.PublishMessageToAMQP(context.Background(), roomID, messageString, clientNickname, clientId)
    }

     

     

    풀소스는 여기 있다.

     

    GitHub - Alfex4936/chulbong-kr: 대한민국 철봉 지도

    대한민국 철봉 지도. Contribute to Alfex4936/chulbong-kr development by creating an account on GitHub.

    github.com

     

    728x90

    댓글