diff --git a/src/builder.rs b/src/builder.rs index 729cefe1b..7f15cced6 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -668,9 +668,9 @@ impl NodeBuilder { let logger = setup_logger(&self.log_writer_config, &self.config)?; let runtime = if let Some(handle) = self.runtime_handle.as_ref() { - Arc::new(Runtime::with_handle(handle.clone())) + Arc::new(Runtime::with_handle(handle.clone(), Arc::clone(&logger))) } else { - Arc::new(Runtime::new().map_err(|e| { + Arc::new(Runtime::new(Arc::clone(&logger)).map_err(|e| { log_error!(logger, "Failed to setup tokio runtime: {}", e); BuildError::RuntimeSetupFailed })?) @@ -715,9 +715,9 @@ impl NodeBuilder { let logger = setup_logger(&self.log_writer_config, &self.config)?; let runtime = if let Some(handle) = self.runtime_handle.as_ref() { - Arc::new(Runtime::with_handle(handle.clone())) + Arc::new(Runtime::with_handle(handle.clone(), Arc::clone(&logger))) } else { - Arc::new(Runtime::new().map_err(|e| { + Arc::new(Runtime::new(Arc::clone(&logger)).map_err(|e| { log_error!(logger, "Failed to setup tokio runtime: {}", e); BuildError::RuntimeSetupFailed })?) @@ -1668,18 +1668,11 @@ fn build_with_store_internal( }; let (stop_sender, _) = tokio::sync::watch::channel(()); - let background_processor_task = Mutex::new(None); - let background_tasks = Mutex::new(None); - let cancellable_background_tasks = Mutex::new(None); - let is_running = Arc::new(RwLock::new(false)); Ok(Node { runtime, stop_sender, - background_processor_task, - background_tasks, - cancellable_background_tasks, config, wallet, chain_source, diff --git a/src/event.rs b/src/event.rs index 883177d67..ff94d51d1 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1059,7 +1059,7 @@ where forwarding_channel_manager.process_pending_htlc_forwards(); }; - self.runtime.spawn(future); + self.runtime.spawn_cancellable_background_task(future); }, LdkEvent::SpendableOutputs { outputs, channel_id } => { match self.output_sweeper.track_spendable_outputs(outputs, channel_id, true, None) { @@ -1441,7 +1441,7 @@ where } } }; - self.runtime.spawn(future); + self.runtime.spawn_cancellable_background_task(future); }, LdkEvent::BumpTransaction(bte) => { match bte { diff --git a/src/gossip.rs b/src/gossip.rs index 1185f0718..258f9f736 100644 --- a/src/gossip.rs +++ b/src/gossip.rs @@ -144,6 +144,6 @@ impl RuntimeSpawner { impl FutureSpawner for RuntimeSpawner { fn spawn + Send + 'static>(&self, future: T) { - self.runtime.spawn(future); + self.runtime.spawn_cancellable_background_task(future); } } diff --git a/src/lib.rs b/src/lib.rs index cc5e383a1..1604d1b46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -128,9 +128,8 @@ pub use builder::NodeBuilder as Builder; use chain::ChainSource; use config::{ - default_user_config, may_announce_channel, ChannelConfig, Config, - BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS, LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS, - NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, + default_user_config, may_announce_channel, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL, + PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, }; use connection::ConnectionManager; use event::{EventHandler, EventQueue}; @@ -181,9 +180,6 @@ uniffi::include_scaffolding!("ldk_node"); pub struct Node { runtime: Arc, stop_sender: tokio::sync::watch::Sender<()>, - background_processor_task: Mutex>>, - background_tasks: Mutex>>, - cancellable_background_tasks: Mutex>>, config: Arc, wallet: Arc, chain_source: Arc, @@ -226,10 +222,6 @@ impl Node { return Err(Error::AlreadyRunning); } - let mut background_tasks = tokio::task::JoinSet::new(); - let mut cancellable_background_tasks = tokio::task::JoinSet::new(); - let runtime_handle = self.runtime.handle(); - log_info!( self.logger, "Starting up LDK Node with node ID {} on network: {}", @@ -253,19 +245,11 @@ impl Node { let sync_cman = Arc::clone(&self.channel_manager); let sync_cmon = Arc::clone(&self.chain_monitor); let sync_sweeper = Arc::clone(&self.output_sweeper); - background_tasks.spawn_on( - async move { - chain_source - .continuously_sync_wallets( - stop_sync_receiver, - sync_cman, - sync_cmon, - sync_sweeper, - ) - .await; - }, - runtime_handle, - ); + self.runtime.spawn_background_task(async move { + chain_source + .continuously_sync_wallets(stop_sync_receiver, sync_cman, sync_cmon, sync_sweeper) + .await; + }); if self.gossip_source.is_rgs() { let gossip_source = Arc::clone(&self.gossip_source); @@ -273,7 +257,7 @@ impl Node { let gossip_sync_logger = Arc::clone(&self.logger); let gossip_node_metrics = Arc::clone(&self.node_metrics); let mut stop_gossip_sync = self.stop_sender.subscribe(); - cancellable_background_tasks.spawn_on(async move { + self.runtime.spawn_cancellable_background_task(async move { let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL); loop { tokio::select! { @@ -314,7 +298,7 @@ impl Node { } } } - }, runtime_handle); + }); } if let Some(listening_addresses) = &self.config.listening_addresses { @@ -340,7 +324,7 @@ impl Node { bind_addrs.extend(resolved_address); } - cancellable_background_tasks.spawn_on(async move { + self.runtime.spawn_cancellable_background_task(async move { { let listener = tokio::net::TcpListener::bind(&*bind_addrs).await @@ -378,7 +362,7 @@ impl Node { } listening_indicator.store(false, Ordering::Release); - }, runtime_handle); + }); } // Regularly reconnect to persisted peers. @@ -387,7 +371,7 @@ impl Node { let connect_logger = Arc::clone(&self.logger); let connect_peer_store = Arc::clone(&self.peer_store); let mut stop_connect = self.stop_sender.subscribe(); - cancellable_background_tasks.spawn_on(async move { + self.runtime.spawn_cancellable_background_task(async move { let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { @@ -415,7 +399,7 @@ impl Node { } } } - }, runtime_handle); + }); // Regularly broadcast node announcements. let bcast_cm = Arc::clone(&self.channel_manager); @@ -427,7 +411,7 @@ impl Node { let mut stop_bcast = self.stop_sender.subscribe(); let node_alias = self.config.node_alias.clone(); if may_announce_channel(&self.config).is_ok() { - cancellable_background_tasks.spawn_on(async move { + self.runtime.spawn_cancellable_background_task(async move { // We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away. #[cfg(not(test))] let mut interval = tokio::time::interval(Duration::from_secs(30)); @@ -498,15 +482,14 @@ impl Node { } } } - }, runtime_handle); + }); } let stop_tx_bcast = self.stop_sender.subscribe(); let chain_source = Arc::clone(&self.chain_source); - cancellable_background_tasks.spawn_on( - async move { chain_source.continuously_process_broadcast_queue(stop_tx_bcast).await }, - runtime_handle, - ); + self.runtime.spawn_cancellable_background_task(async move { + chain_source.continuously_process_broadcast_queue(stop_tx_bcast).await + }); let bump_tx_event_handler = Arc::new(BumpTransactionEventHandler::new( Arc::clone(&self.tx_broadcaster), @@ -563,7 +546,7 @@ impl Node { }) }; - let handle = self.runtime.spawn(async move { + self.runtime.spawn_background_processor_task(async move { process_events_async( background_persister, |e| background_event_handler.handle_event(e), @@ -584,38 +567,27 @@ impl Node { panic!("Failed to process events"); }); }); - debug_assert!(self.background_processor_task.lock().unwrap().is_none()); - *self.background_processor_task.lock().unwrap() = Some(handle); if let Some(liquidity_source) = self.liquidity_source.as_ref() { let mut stop_liquidity_handler = self.stop_sender.subscribe(); let liquidity_handler = Arc::clone(&liquidity_source); let liquidity_logger = Arc::clone(&self.logger); - background_tasks.spawn_on( - async move { - loop { - tokio::select! { - _ = stop_liquidity_handler.changed() => { - log_debug!( - liquidity_logger, - "Stopping processing liquidity events.", - ); - return; - } - _ = liquidity_handler.handle_next_event() => {} + self.runtime.spawn_background_task(async move { + loop { + tokio::select! { + _ = stop_liquidity_handler.changed() => { + log_debug!( + liquidity_logger, + "Stopping processing liquidity events.", + ); + return; } + _ = liquidity_handler.handle_next_event() => {} } - }, - runtime_handle, - ); + } + }); } - debug_assert!(self.background_tasks.lock().unwrap().is_none()); - *self.background_tasks.lock().unwrap() = Some(background_tasks); - - debug_assert!(self.cancellable_background_tasks.lock().unwrap().is_none()); - *self.cancellable_background_tasks.lock().unwrap() = Some(cancellable_background_tasks); - log_info!(self.logger, "Startup complete."); *is_running_lock = true; Ok(()) @@ -649,15 +621,7 @@ impl Node { } // Cancel cancellable background tasks - if let Some(mut tasks) = self.cancellable_background_tasks.lock().unwrap().take() { - let runtime_handle = self.runtime.handle(); - tasks.abort_all(); - tokio::task::block_in_place(move || { - runtime_handle.block_on(async { while let Some(_) = tasks.join_next().await {} }) - }); - } else { - debug_assert!(false, "Expected some cancellable background tasks"); - }; + self.runtime.abort_cancellable_background_tasks(); // Disconnect all peers. self.peer_manager.disconnect_all_peers(); @@ -668,91 +632,13 @@ impl Node { log_debug!(self.logger, "Stopped chain sources."); // Wait until non-cancellable background tasks (mod LDK's background processor) are done. - let runtime_handle = self.runtime.handle(); - if let Some(mut tasks) = self.background_tasks.lock().unwrap().take() { - tokio::task::block_in_place(move || { - runtime_handle.block_on(async { - loop { - let timeout_fut = tokio::time::timeout( - Duration::from_secs(BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS), - tasks.join_next_with_id(), - ); - match timeout_fut.await { - Ok(Some(Ok((id, _)))) => { - log_trace!(self.logger, "Stopped background task with id {}", id); - }, - Ok(Some(Err(e))) => { - tasks.abort_all(); - log_trace!(self.logger, "Stopping background task failed: {}", e); - break; - }, - Ok(None) => { - log_debug!(self.logger, "Stopped all background tasks"); - break; - }, - Err(e) => { - tasks.abort_all(); - log_error!( - self.logger, - "Stopping background task timed out: {}", - e - ); - break; - }, - } - } - }) - }); - } else { - debug_assert!(false, "Expected some background tasks"); - }; + self.runtime.wait_on_background_tasks(); - // Wait until background processing stopped, at least until a timeout is reached. - if let Some(background_processor_task) = - self.background_processor_task.lock().unwrap().take() - { - let abort_handle = background_processor_task.abort_handle(); - let timeout_res = tokio::task::block_in_place(move || { - self.runtime.block_on(async { - tokio::time::timeout( - Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS), - background_processor_task, - ) - .await - }) - }); - - match timeout_res { - Ok(stop_res) => match stop_res { - Ok(()) => log_debug!(self.logger, "Stopped background processing of events."), - Err(e) => { - abort_handle.abort(); - log_error!( - self.logger, - "Stopping event handling failed. This should never happen: {}", - e - ); - panic!("Stopping event handling failed. This should never happen."); - }, - }, - Err(e) => { - abort_handle.abort(); - log_error!(self.logger, "Stopping event handling timed out: {}", e); - }, - } - } else { - debug_assert!(false, "Expected a background processing task"); - }; + // Finally, wait until background processing stopped, at least until a timeout is reached. + self.runtime.wait_on_background_processor_task(); #[cfg(tokio_unstable)] - { - let runtime_handle = self.runtime.handle(); - log_trace!( - self.logger, - "Active runtime tasks left prior to shutdown: {}", - runtime_handle.metrics().active_tasks_count() - ); - } + self.runtime.log_metrics(); log_info!(self.logger, "Shutdown complete."); *is_running_lock = false; diff --git a/src/runtime.rs b/src/runtime.rs index 4c1241165..b30790a04 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -5,16 +5,27 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use tokio::task::JoinHandle; +use crate::config::{ + BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS, LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS, +}; +use crate::logger::{log_debug, log_error, log_trace, LdkLogger, Logger}; + +use tokio::task::{JoinHandle, JoinSet}; use std::future::Future; +use std::sync::{Arc, Mutex}; +use std::time::Duration; pub(crate) struct Runtime { mode: RuntimeMode, + background_tasks: Mutex>, + cancellable_background_tasks: Mutex>, + background_processor_task: Mutex>>, + logger: Arc, } impl Runtime { - pub fn new() -> Result { + pub fn new(logger: Arc) -> Result { let mode = match tokio::runtime::Handle::try_current() { Ok(handle) => RuntimeMode::Handle(handle), Err(_) => { @@ -22,21 +33,62 @@ impl Runtime { RuntimeMode::Owned(rt) }, }; - Ok(Self { mode }) + let background_tasks = Mutex::new(JoinSet::new()); + let cancellable_background_tasks = Mutex::new(JoinSet::new()); + let background_processor_task = Mutex::new(None); + + Ok(Self { + mode, + background_tasks, + cancellable_background_tasks, + background_processor_task, + logger, + }) } - pub fn with_handle(handle: tokio::runtime::Handle) -> Self { + pub fn with_handle(handle: tokio::runtime::Handle, logger: Arc) -> Self { let mode = RuntimeMode::Handle(handle); - Self { mode } + let background_tasks = Mutex::new(JoinSet::new()); + let cancellable_background_tasks = Mutex::new(JoinSet::new()); + let background_processor_task = Mutex::new(None); + + Self { + mode, + background_tasks, + cancellable_background_tasks, + background_processor_task, + logger, + } } - pub fn spawn(&self, future: F) -> JoinHandle + pub fn spawn_background_task(&self, future: F) where - F: Future + Send + 'static, - F::Output: Send + 'static, + F: Future + Send + 'static, { - let handle = self.handle(); - handle.spawn(future) + let mut background_tasks = self.background_tasks.lock().unwrap(); + let runtime_handle = self.handle(); + background_tasks.spawn_on(future, runtime_handle); + } + + pub fn spawn_cancellable_background_task(&self, future: F) + where + F: Future + Send + 'static, + { + let mut cancellable_background_tasks = self.cancellable_background_tasks.lock().unwrap(); + let runtime_handle = self.handle(); + cancellable_background_tasks.spawn_on(future, runtime_handle); + } + + pub fn spawn_background_processor_task(&self, future: F) + where + F: Future + Send + 'static, + { + let mut background_processor_task = self.background_processor_task.lock().unwrap(); + debug_assert!(background_processor_task.is_none(), "Expected no background processor_task"); + + let runtime_handle = self.handle(); + let handle = runtime_handle.spawn(future); + *background_processor_task = Some(handle); } pub fn spawn_blocking(&self, func: F) -> JoinHandle @@ -58,7 +110,92 @@ impl Runtime { tokio::task::block_in_place(move || handle.block_on(future)) } - pub fn handle(&self) -> &tokio::runtime::Handle { + pub fn abort_cancellable_background_tasks(&self) { + let mut tasks = core::mem::take(&mut *self.cancellable_background_tasks.lock().unwrap()); + debug_assert!(tasks.len() > 0, "Expected some cancellable background_tasks"); + tasks.abort_all(); + self.block_on(async { while let Some(_) = tasks.join_next().await {} }) + } + + pub fn wait_on_background_tasks(&self) { + let mut tasks = core::mem::take(&mut *self.background_tasks.lock().unwrap()); + debug_assert!(tasks.len() > 0, "Expected some background_tasks"); + self.block_on(async { + loop { + let timeout_fut = tokio::time::timeout( + Duration::from_secs(BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS), + tasks.join_next_with_id(), + ); + match timeout_fut.await { + Ok(Some(Ok((id, _)))) => { + log_trace!(self.logger, "Stopped background task with id {}", id); + }, + Ok(Some(Err(e))) => { + tasks.abort_all(); + log_trace!(self.logger, "Stopping background task failed: {}", e); + break; + }, + Ok(None) => { + log_debug!(self.logger, "Stopped all background tasks"); + break; + }, + Err(e) => { + tasks.abort_all(); + log_error!(self.logger, "Stopping background task timed out: {}", e); + break; + }, + } + } + }) + } + + pub fn wait_on_background_processor_task(&self) { + if let Some(background_processor_task) = + self.background_processor_task.lock().unwrap().take() + { + let abort_handle = background_processor_task.abort_handle(); + let timeout_res = self.block_on(async { + tokio::time::timeout( + Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS), + background_processor_task, + ) + .await + }); + + match timeout_res { + Ok(stop_res) => match stop_res { + Ok(()) => log_debug!(self.logger, "Stopped background processing of events."), + Err(e) => { + abort_handle.abort(); + log_error!( + self.logger, + "Stopping event handling failed. This should never happen: {}", + e + ); + panic!("Stopping event handling failed. This should never happen."); + }, + }, + Err(e) => { + abort_handle.abort(); + log_error!(self.logger, "Stopping event handling timed out: {}", e); + }, + } + } else { + debug_assert!(false, "Expected a background processing task"); + }; + } + + #[cfg(tokio_unstable)] + pub fn log_metrics(&self) { + let runtime_handle = self.handle(); + log_trace!( + self.logger, + "Active runtime tasks left prior to shutdown: {}", + runtime_handle.metrics().active_tasks_count() + ); + } + + fn handle(&self) -> &tokio::runtime::Handle { match &self.mode { RuntimeMode::Owned(rt) => rt.handle(), RuntimeMode::Handle(handle) => handle,