ABOUT ME

-

Total
-
  • Rust: 비동기 리팩토링 tokio::task::JoinSet
    컴퓨터/Rust 2024. 4. 6. 00:37
    728x90
    반응형

     

     

    GitHub - Alfex4936/cargo-depsize: Rust cargo utility that calculates and displays the total size of each dependency in your Rust

    Rust cargo utility that calculates and displays the total size of each dependency in your Rust project. - Alfex4936/cargo-depsize

    github.com

    cargo depsize라는 툴은 Rust 프로젝트에서 쓰고 있는 라이브러리들의 파일 크기를 구하는 간단한 유틸리티이다.

    리팩토링 하다가, 비동기로 만들어 놓고 각각 계산을 비동기로 돌리면 되는데 하나씩 기다리고 있었다.

     

    우선 원래 futures crate를 또 추가해서 비동기 함수들을 한 번에 실행하고 join_all  할 수 있었는데

    use futures::future;
    
    let outputs = future::try_join_all(v.into_iter().map(tokio::spawn)).await?;

     

    tokio 1.21.0 부터 JoinSet이 생겨서 futures를 추가 안 해도 된다.

    /// tokio::task::JoinSet 의 함수 코드
    /// A collection of tasks spawned on a Tokio runtime.
    ///
    /// A `JoinSet` can be used to await the completion of some or all of the tasks
    /// in the set. The set is not ordered, and the tasks will be returned in the
    /// order they complete.
    ///
    /// All of the tasks must have the same return type `T`.
    ///
    /// When the `JoinSet` is dropped, all tasks in the `JoinSet` are immediately aborted.
    ///
    /// # Examples
    ///
    /// Spawn multiple tasks and wait for them.
    ///
    /// ```
    /// use tokio::task::JoinSet;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let mut set = JoinSet::new();
    ///
    ///     for i in 0..10 {
    ///         set.spawn(async move { i });
    ///     }
    ///
    ///     let mut seen = [false; 10];
    ///     while let Some(res) = set.join_next().await {
    ///         let idx = res.unwrap();
    ///         seen[idx] = true;
    ///     }
    ///
    ///     for i in 0..10 {
    ///         assert!(seen[i]);
    ///     }
    /// }
    /// ```
    #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
    pub struct JoinSet<T> {
        inner: IdleNotifiedSet<JoinHandle<T>>,
    }

     

    코드를 대충 보면, JoinSet은 T 타입의 작업들을 비동기적으로 실행하고, 결과를 나중에 수집할 수 있도록 돕는 구조체.

    2가지 주요 구성 요소가 있는데, 

    /*
    실제로 비동기 작업들을 관리하는 부분인 듯하다.
    
    내부적으로 두 개의 LinkedList (notified와 idle)를 사용하여 작업의 상태를 추적.
    
    notified 리스트에는 알림이 발생한 작업들이,
    
    idle 리스트에는 아직 알림이 발생하지 않은 작업들이 저장.
    
    이 구조는 작업이 완료되면 waker를 통해 알림을 받게 됨.
    */
    
    pub(crate) struct IdleNotifiedSet<T> {
        lists: Arc<Lists<T>>,
        length: usize,
    }
    
    /*
    각 비동기 작업은 JoinHandle을 통해 추적
    
    이 핸들은 작업의 결과를 나타내는 타입 T를 갖고 있으며
    
    작업이 완료되면 해당 결과에 접근할 수 있게 되고
    
    JoinHandle은 내부적으로 RawTask라는 구조체를 사용하여 실제 작업의 메모리 위치를 관리함
    */
    pub struct JoinHandle<T> {
        raw: RawTask,
        _p: PhantomData<T>,
    }

     

    작동 방식은 그럼

    1. 비동기 작업이 JoinSet에 추가되면, JoinHandle을 통해 내부적으로 추적
    2. 작업 실행되면, 그 상태가 IdleNotifiedSet 의 notified 또는 idle 리스트 중 하나에 들어감
    3. 작업 완료되고 알림 받으면, JoinSet이 해당 작업 결과 수집할 수 있음
    4. join_next().await 과 같은 메소드로 가장 먼저 완료된 작업들 결과를 "순차적"으로 받음. (내부 작업 상태 리스트 업데이트)

     

    기존의 동기적 코드는 아래처럼 for loop 돌면서 하나씩 await 하고 insert 했다.

    let packages = workspace_resolve.pkg_set.packages();
    
    for package in packages.into_iter() {
        let size = calculate_package_size(package).await?;
        package_sizes.insert(package.package_id().clone(), size);

     

    이거를 JoinSet를 써서 바꾸면 아래처럼 바뀐다.

    let mut join_set = JoinSet::new();
    // Spawn each calculate_package_size task into the JoinSet
    for package in packages {
        // let semaphore_clone = semaphore.clone(); // 세마포어 제한 TODO
        let package_id = package.package_id().clone();
        let package_path = package.root().to_path_buf(); // PathBuf is Send
    
        join_set.spawn(async move {
            // let _permit = semaphore_clone
            //     .acquire()
            //     .await
            //     .expect("Failed to acquire semaphore");
    
            // PathBuf는 Send라 thread-safe 
            match calculate_package_size(&package_path).await {
                Ok(size) => Ok((package_id, size)),
                Err(e) => {
                    eprintln!("Failed to calculate size for {}: {}", package_id.name(), e);
                    Err(e)
                }
            }
        });
    }
    
    // 다 기다리고 결과 모으기
    while let Some(res) = join_set.join_next().await {
        let (package_id, size) = res?.expect("Failed to join");
        package_sizes.insert(package_id, size);
    }
    728x90

    댓글