From bf4e33e61688dd3fcc1f1c255668f0ff9943a33d Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 19 Dec 2025 21:55:58 +0800 Subject: [PATCH] fix: remove unnecessary select! macro --- .config/nextest.toml | 8 ++++ src/daemon/mod.rs | 12 ++--- src/state_manager/utils.rs | 20 ++++++-- src/state_migration/tests/mod.rs | 2 +- src/tool/subcommands/api_cmd/test_snapshot.rs | 12 ++--- src/utils/mod.rs | 46 ++++++++----------- 6 files changed, 52 insertions(+), 48 deletions(-) diff --git a/.config/nextest.toml b/.config/nextest.toml index 11d1a92cc3e9..a18eadce8bfe 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -41,3 +41,11 @@ slow-timeout = { period = "120s", terminate-after = 3 } filter = 'test(rpc_snapshot_test_)' slow-timeout = { period = "120s", terminate-after = 3 } retries = { backoff = "exponential", count = 3, delay = "5s", jitter = true } + +# These tests download test snapshots from the network, which can take a while. +# There might be some network issues, so we allow some retries with backoff. +# Jitter is enabled to avoid [thundering herd issues](https://en.wikipedia.org/wiki/Thundering_herd_problem). +[[profile.default.overrides]] +filter = 'test(state_compute_)' +slow-timeout = { period = "120s", terminate-after = 3 } +retries = { backoff = "exponential", count = 3, delay = "5s", jitter = true } diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index ff24667e38f1..b591a09d7797 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -36,7 +36,7 @@ use crate::utils::misc::env::is_env_truthy; use crate::utils::{proofs_api::ensure_proof_params_downloaded, version::FOREST_VERSION_STRING}; use anyhow::{Context as _, bail}; use dialoguer::theme::ColorfulTheme; -use futures::{Future, FutureExt, select}; +use futures::{Future, FutureExt}; use std::path::Path; use std::sync::Arc; use std::sync::OnceLock; @@ -738,13 +738,9 @@ async fn maybe_set_snapshot_path( async fn propagate_error( services: &mut JoinSet>, ) -> anyhow::Result { - while !services.is_empty() { - select! { - option = services.join_next().fuse() => { - if let Some(Ok(Err(error_message))) = option { - return Err(error_message) - } - }, + while let Some(result) = services.join_next().await { + if let Ok(Err(error_message)) = result { + return Err(error_message); } } std::future::pending().await diff --git a/src/state_manager/utils.rs b/src/state_manager/utils.rs index 9d5f50e619ff..2049cfee6d6f 100644 --- a/src/state_manager/utils.rs +++ b/src/state_manager/utils.rs @@ -197,6 +197,7 @@ pub mod state_compute { use std::{ path::{Path, PathBuf}, sync::{Arc, LazyLock}, + time::Duration, }; use url::Url; @@ -215,11 +216,22 @@ pub mod state_compute { let url = Url::parse(&format!( "https://forest-snapshots.fra1.cdn.digitaloceanspaces.com/state_compute/{chain}_{epoch}.forest.car.zst" ))?; - Ok( - download_file_with_cache(&url, &SNAPSHOT_CACHE_DIR, DownloadFileOption::NonResumable) - .await? - .path, + Ok(crate::utils::retry( + crate::utils::RetryArgs { + timeout: Some(Duration::from_secs(30)), + max_retries: Some(5), + delay: Some(Duration::from_secs(1)), + }, + || { + download_file_with_cache( + &url, + &SNAPSHOT_CACHE_DIR, + DownloadFileOption::NonResumable, + ) + }, ) + .await? + .path) } pub async fn prepare_state_compute( diff --git a/src/state_migration/tests/mod.rs b/src/state_migration/tests/mod.rs index f65f67682344..e7216e55b1b2 100644 --- a/src/state_migration/tests/mod.rs +++ b/src/state_migration/tests/mod.rs @@ -85,7 +85,7 @@ async fn test_state_migration( retry( RetryArgs { timeout: Some(timeout), - max_retries: Some(5), + max_retries: Some(15), ..Default::default() }, || async { diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs index 55d50e79ac2a..1a21b5432d27 100644 --- a/src/tool/subcommands/api_cmd/test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -217,17 +217,11 @@ mod tests { let cache_dir = project_dir.cache_dir().join("test").join("rpc-snapshots"); let path = crate::utils::retry( crate::utils::RetryArgs { - timeout: Some(Duration::from_secs(if crate::utils::is_ci() { - 20 - } else { - 120 - })), + timeout: Some(Duration::from_secs(30)), max_retries: Some(5), - ..Default::default() - }, - || async { - download_file_with_cache(&url, &cache_dir, DownloadFileOption::NonResumable).await + delay: Some(Duration::from_secs(1)), }, + || download_file_with_cache(&url, &cache_dir, DownloadFileOption::NonResumable), ) .await .unwrap() diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 49de3de44285..09e1b228ea99 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -20,13 +20,9 @@ pub mod stream; pub mod version; use anyhow::{Context as _, bail}; -use futures::{ - Future, FutureExt, - future::{FusedFuture, pending}, - select, -}; +use futures::Future; use multiaddr::{Multiaddr, Protocol}; -use std::{pin::Pin, str::FromStr, time::Duration}; +use std::{str::FromStr, time::Duration}; use tokio::time::sleep; use tracing::error; use url::Url; @@ -125,29 +121,26 @@ where F: Future>, E: std::fmt::Debug, { - let mut timeout: Pin>> = match args.timeout { - Some(duration) => Box::pin(sleep(duration).fuse()), - None => Box::pin(pending()), - }; let max_retries = args.max_retries.unwrap_or(usize::MAX); - let mut task = Box::pin( - async { - for _ in 0..max_retries { - match make_fut().await { - Ok(ok) => return Ok(ok), - Err(err) => error!("retrying operation after {err:?}"), - } - if let Some(delay) = args.delay { - sleep(delay).await; - } + let task = async { + for _ in 0..max_retries { + match make_fut().await { + Ok(ok) => return Ok(ok), + Err(err) => error!("retrying operation after {err:?}"), + } + if let Some(delay) = args.delay { + sleep(delay).await; } - Err(RetryError::RetriesExceeded) } - .fuse(), - ); - select! { - _ = timeout => Err(RetryError::TimeoutExceeded), - res = task => res, + Err(RetryError::RetriesExceeded) + }; + + if let Some(timeout) = args.timeout { + tokio::time::timeout(timeout, task) + .await + .map_err(|_| RetryError::TimeoutExceeded)? + } else { + task.await } } @@ -187,6 +180,7 @@ mod tests { mod files; use RetryError::{RetriesExceeded, TimeoutExceeded}; + use futures::future::pending; use std::{future::ready, sync::atomic::AtomicUsize}; use super::*;