diff --git a/Cargo.lock b/Cargo.lock index a6f22eab9a9..ecd2b2a3537 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3959,6 +3959,8 @@ dependencies = [ "serde", "serde_yaml", "slog", + "slog-async", + "slog-term", "tokio", "tokio-stream", "tokio-util", diff --git a/Cargo.toml b/Cargo.toml index bd9808622a9..20c44b4989c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,6 +84,8 @@ serde_json = { version = "1.0", features = ["arbitrary_precision"] } serde_regex = "1.1.0" serde_yaml = "0.9.21" slog = { version = "2.8.2", features = ["release_max_level_trace", "max_level_trace"] } +slog-async = "2.5.0" +slog-term = "2.7.0" sqlparser = { version = "0.60.0", features = ["visitor"] } strum = { version = "0.27", features = ["derive"] } syn = { version = "2.0.114", features = ["full"] } diff --git a/core/src/subgraph/context/mod.rs b/core/src/subgraph/context/mod.rs index 2b7d560dfc1..fa11dff8cf6 100644 --- a/core/src/subgraph/context/mod.rs +++ b/core/src/subgraph/context/mod.rs @@ -6,7 +6,7 @@ use crate::polling_monitor::{ use anyhow::{self, Error}; use bytes::Bytes; use graph::{ - blockchain::{Blockchain, TriggerFilterWrapper}, + blockchain::Blockchain, components::{store::DeploymentId, subgraph::HostMetrics}, data::subgraph::SubgraphManifest, data_source::{ @@ -74,7 +74,6 @@ where pub(crate) instance: SubgraphInstance, pub instances: SubgraphKeepAlive, pub offchain_monitor: OffchainMonitor, - pub filter: Option>, pub(crate) trigger_processor: Box>, pub(crate) decoder: Box>, } @@ -101,7 +100,6 @@ impl> IndexingContext { instance, instances, offchain_monitor, - filter: None, trigger_processor, decoder, } diff --git a/core/src/subgraph/error.rs b/core/src/subgraph/error.rs index 502a28dbc66..6aad9f7cb3e 100644 --- a/core/src/subgraph/error.rs +++ b/core/src/subgraph/error.rs @@ -10,6 +10,37 @@ impl DeterministicError for StoreError {} impl DeterministicError for anyhow::Error {} +/// Classification of processing errors for unified error handling. +/// +/// This enum provides a consistent way to categorize errors and determine +/// the appropriate response. The error handling invariants are: +/// +/// - **Deterministic**: Stop processing the current block, persist PoI only. +/// The subgraph will be marked as failed. These errors are reproducible +/// and indicate a bug in the subgraph or a permanent data issue. +/// +/// - **NonDeterministic**: Retry with exponential backoff. These errors are +/// transient (network issues, temporary database problems) and may succeed +/// on retry. +/// +/// - **PossibleReorg**: Restart the block stream cleanly without persisting. +/// The block stream needs to be restarted to detect and handle a potential +/// blockchain reorganization. +/// +/// - **Canceled**: The subgraph was canceled (unassigned or shut down). +/// No error should be recorded; this is a clean shutdown. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ProcessingErrorKind { + /// Error is deterministic - stop processing, persist PoI only + Deterministic, + /// Error is non-deterministic - retry with backoff + NonDeterministic, + /// Possible blockchain reorg detected - restart block stream cleanly + PossibleReorg, + /// Processing was canceled - clean shutdown + Canceled, +} + /// An error happened during processing and we need to classify errors into /// deterministic and non-deterministic errors. This struct holds the result /// of that classification @@ -23,13 +54,72 @@ pub enum ProcessingError { #[error("{0}")] Deterministic(Box), + /// A possible blockchain reorganization was detected. 
+ /// The block stream should be restarted to detect and handle the reorg. + #[error("possible reorg detected: {0:#}")] + PossibleReorg(Error), + #[error("subgraph stopped while processing triggers")] Canceled, } impl ProcessingError { + /// Classify the error into one of the defined error kinds. + /// + /// This method provides a unified way to determine how to handle an error: + /// - `Deterministic`: Stop processing, persist PoI only + /// - `NonDeterministic`: Retry with backoff + /// - `PossibleReorg`: Restart block stream cleanly + /// - `Canceled`: Clean shutdown, no error recording + pub fn kind(&self) -> ProcessingErrorKind { + match self { + ProcessingError::Unknown(_) => ProcessingErrorKind::NonDeterministic, + ProcessingError::Deterministic(_) => ProcessingErrorKind::Deterministic, + ProcessingError::PossibleReorg(_) => ProcessingErrorKind::PossibleReorg, + ProcessingError::Canceled => ProcessingErrorKind::Canceled, + } + } + + #[allow(dead_code)] pub fn is_deterministic(&self) -> bool { - matches!(self, ProcessingError::Deterministic(_)) + matches!(self.kind(), ProcessingErrorKind::Deterministic) + } + + /// Returns true if this error should stop processing the current block. + /// + /// Deterministic errors stop processing because continuing would produce + /// incorrect results. The PoI is still persisted for debugging purposes. + #[allow(dead_code)] + pub fn should_stop_processing(&self) -> bool { + matches!(self.kind(), ProcessingErrorKind::Deterministic) + } + + /// Returns true if this error requires a clean restart of the block stream. + /// + /// Possible reorgs require restarting to allow the block stream to detect + /// and properly handle the reorganization. No state should be persisted + /// in this case. + #[allow(dead_code)] + pub fn should_restart(&self) -> bool { + matches!(self.kind(), ProcessingErrorKind::PossibleReorg) + } + + /// Returns true if this error is retryable with exponential backoff. + /// + /// Non-deterministic errors (network issues, temporary failures) may + /// succeed on retry and should not immediately fail the subgraph. + #[allow(dead_code)] + pub fn is_retryable(&self) -> bool { + matches!(self.kind(), ProcessingErrorKind::NonDeterministic) + } + + /// Returns true if processing was canceled (clean shutdown). + /// + /// Canceled errors indicate the subgraph was unassigned or shut down + /// intentionally and should not be treated as failures. 
+ #[allow(dead_code)] + pub fn is_canceled(&self) -> bool { + matches!(self.kind(), ProcessingErrorKind::Canceled) } pub fn detail(self, ctx: &str) -> ProcessingError { @@ -41,6 +131,9 @@ impl ProcessingError { ProcessingError::Deterministic(e) => { ProcessingError::Deterministic(Box::new(anyhow!("{e}").context(ctx.to_string()))) } + ProcessingError::PossibleReorg(e) => { + ProcessingError::PossibleReorg(e.context(ctx.to_string())) + } ProcessingError::Canceled => ProcessingError::Canceled, } } diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner/mod.rs similarity index 63% rename from core/src/subgraph/runner.rs rename to core/src/subgraph/runner/mod.rs index 81db925a092..7b8b179d9f3 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner/mod.rs @@ -1,13 +1,17 @@ +mod state; +mod trigger_runner; + use crate::subgraph::context::IndexingContext; use crate::subgraph::error::{ ClassifyErrorHelper as _, DetailHelper as _, NonDeterministicErrorHelper as _, ProcessingError, + ProcessingErrorKind, }; use crate::subgraph::inputs::IndexingInputs; use crate::subgraph::state::IndexingState; use crate::subgraph::stream::new_block_stream; use anyhow::Context as _; use graph::blockchain::block_stream::{ - BlockStream, BlockStreamError, BlockStreamEvent, BlockWithTriggers, FirehoseCursor, + BlockStream, BlockStreamEvent, BlockWithTriggers, FirehoseCursor, }; use graph::blockchain::{ Block, BlockTime, Blockchain, DataSource as _, SubgraphFilter, Trigger, TriggerFilter as _, @@ -21,7 +25,7 @@ use graph::components::{ subgraph::{MappingError, PoICausalityRegion, ProofOfIndexing, SharedProofOfIndexing}, }; use graph::data::store::scalar::Bytes; -use graph::data::subgraph::schema::{SubgraphError, SubgraphHealth}; +use graph::data::subgraph::schema::SubgraphError; use graph::data_source::{ offchain, CausalityRegion, DataSource, DataSourceCreationError, TriggerData, }; @@ -30,7 +34,7 @@ use graph::ext::futures::Cancelable; use graph::futures03::stream::StreamExt; use graph::prelude::{ anyhow, hex, retry, thiserror, BlockNumber, BlockPtr, BlockState, CancelGuard, CancelHandle, - CancelToken as _, CancelableError, CheapClone as _, EntityCache, EntityModification, Error, + CancelToken as _, CheapClone as _, EntityCache, EntityModification, Error, InstanceDSTemplateInfo, LogCode, RunnerMetrics, RuntimeHostBuilder, StopwatchMetrics, StoreError, StreamExtension, UnfailOutcome, Value, ENV_VARS, }; @@ -42,6 +46,9 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use std::vec; +use self::state::{RestartReason, RunnerState, StopReason}; +use self::trigger_runner::TriggerRunner; + const MINUTE: Duration = Duration::from_secs(60); const SKIP_PTR_UPDATES_THRESHOLD: Duration = Duration::from_secs(60 * 5); @@ -61,6 +68,9 @@ where logger: Logger, pub metrics: RunnerMetrics, cancel_handle: Option, + /// The current state in the runner's state machine. + /// This field drives the main loop of the runner. 
+ runner_state: RunnerState, } #[derive(Debug, thiserror::Error)] @@ -101,6 +111,7 @@ where logger, metrics, cancel_handle: None, + runner_state: RunnerState::Initializing, } } @@ -212,15 +223,11 @@ where let block_stream_canceler = CancelGuard::new(); let block_stream_cancel_handle = block_stream_canceler.handle(); // TriggerFilter needs to be rebuilt eveytime the blockstream is restarted - self.ctx.filter = Some(self.build_filter()); + let filter = self.build_filter(); - let block_stream = new_block_stream( - &self.inputs, - self.ctx.filter.clone().unwrap(), // Safe to unwrap as we just called `build_filter` in the previous line - &self.metrics.subgraph, - ) - .await? - .cancelable(&block_stream_canceler); + let block_stream = new_block_stream(&self.inputs, filter, &self.metrics.subgraph) + .await? + .cancelable(&block_stream_canceler); self.cancel_handle = Some(block_stream_cancel_handle); @@ -241,11 +248,17 @@ where } } - pub async fn run(self) -> Result<(), SubgraphRunnerError> { - self.run_inner(false).await.map(|_| ()) - } - - async fn run_inner(mut self, break_on_restart: bool) -> Result { + /// Initialize the runner by performing pre-loop setup. + /// + /// This method handles: + /// - Updating the deployment synced metric + /// - Attempting to unfail deterministic errors from the previous run + /// - Checking if the subgraph has already reached its max end block + /// + /// Returns the next state to transition to: + /// - `Restarting` to start the block stream (normal case) + /// - `Stopped` if the max end block was already reached + async fn initialize(&self) -> Result, SubgraphRunnerError> { self.update_deployment_synced_metric(); // If a subgraph failed for deterministic reasons, before start indexing, we first @@ -282,87 +295,335 @@ where "max_end_block" => max_end_block, "current_block" => current_ptr.block_number()); self.inputs.store.flush().await?; - return Ok(self); + return Ok(RunnerState::Stopped { + reason: StopReason::MaxEndBlockReached, + }); } } } - loop { - debug!(self.logger, "Starting or restarting subgraph"); + // Normal case: proceed to start the block stream + Ok(RunnerState::Restarting { + reason: RestartReason::StoreError, // Initial start uses the same path as restart + }) + } - let mut block_stream = self.start_block_stream().await?; + /// Await the next block stream event and transition to the appropriate state. 
+ /// + /// This method waits for the next event from the block stream and determines + /// which state the runner should transition to: + /// - `ProcessingBlock` for new blocks to process + /// - `Reverting` for revert events + /// - `Stopped` when the stream ends or is canceled + /// - Returns back to `AwaitingBlock` for non-fatal errors that allow continuation + async fn await_block( + &mut self, + mut block_stream: Cancelable>>, + ) -> Result, SubgraphRunnerError> { + let event = { + let _section = self.metrics.stream.stopwatch.start_section("scan_blocks"); + block_stream.next().await + }; - debug!(self.logger, "Started block stream"); + if self.is_canceled() { + return self.cancel(); + } - self.metrics.subgraph.deployment_status.running(); + match event { + Some(Ok(BlockStreamEvent::ProcessBlock(block, cursor))) => { + Ok(RunnerState::ProcessingBlock { + block_stream, + block, + cursor, + }) + } + Some(Ok(BlockStreamEvent::Revert(to_ptr, cursor))) => Ok(RunnerState::Reverting { + block_stream, + to_ptr, + cursor, + }), + // Log and drop the errors from the block_stream + // The block stream will continue attempting to produce blocks + Some(Err(e)) => { + // Log error and continue waiting for blocks + debug!( + &self.logger, + "Block stream produced a non-fatal error"; + "error" => format!("{}", e), + ); + Ok(RunnerState::AwaitingBlock { block_stream }) + } + // If the block stream ends, that means that there is no more indexing to do. + None => Ok(RunnerState::Stopped { + reason: StopReason::StreamEnded, + }), + } + } - // Process events from the stream as long as no restart is needed - loop { - let event = { - let _section = self.metrics.stream.stopwatch.start_section("scan_blocks"); + fn cancel(&mut self) -> Result, SubgraphRunnerError> { + if self.ctx.instances.contains(&self.inputs.deployment.id) { + warn!( + self.logger, + "Terminating the subgraph runner because a newer one is active. \ + Possible reassignment detected while the runner was in a non-cancellable pending state", + ); + return Err(SubgraphRunnerError::Duplicate); + } + warn!( + self.logger, + "Terminating the subgraph runner because subgraph was unassigned", + ); + Ok(RunnerState::Stopped { + reason: StopReason::Unassigned, + }) + } - block_stream.next().await - }; + /// Construct a SubgraphError and mark the subgraph as failed in the store. + async fn fail_subgraph( + &mut self, + message: String, + block_ptr: Option, + deterministic: bool, + ) -> Result<(), SubgraphRunnerError> { + let error = SubgraphError { + subgraph_id: self.inputs.deployment.hash.clone(), + message, + block_ptr, + handler: None, + deterministic, + }; + self.inputs + .store + .fail_subgraph(error) + .await + .context("Failed to set subgraph status to `failed`")?; + Ok(()) + } - // TODO: move cancel handle to the Context - // This will require some code refactor in how the BlockStream is created - let block_start = Instant::now(); - - let action = self.handle_stream_event(event).await.inspect(|res| { - self.metrics - .subgraph - .observe_block_processed(block_start.elapsed(), res.block_finished()); - })?; - - self.update_deployment_synced_metric(); - - // It is possible that the subgraph was unassigned, but the runner was in - // a retry delay state and did not observe the cancel signal. - if self.is_canceled() { - // It is also possible that the runner was in a retry delay state while - // the subgraph was reassigned and a new runner was started. 
- if self.ctx.instances.contains(&self.inputs.deployment.id) { - warn!( - self.logger, - "Terminating the subgraph runner because a newer one is active. \ - Possible reassignment detected while the runner was in a non-cancellable pending state", - ); - return Err(SubgraphRunnerError::Duplicate); - } + /// Handle a restart by potentially restarting the store and starting a new block stream. + /// + /// This method handles: + /// - Restarting the store if there were errors (to clear error state) + /// - Reverting state to the last good block if the store was restarted + /// - Starting a new block stream with updated filters + /// + /// Returns the next state to transition to: + /// - `AwaitingBlock` with the new block stream (normal case) + async fn restart( + &mut self, + reason: RestartReason, + ) -> Result, SubgraphRunnerError> { + debug!(self.logger, "Starting or restarting subgraph"; "reason" => ?reason); + + // If restarting due to store error, try to restart the store + if matches!(reason, RestartReason::StoreError) { + let store = self.inputs.store.cheap_clone(); + if let Some(store) = store.restart().await? { + let last_good_block = store.block_ptr().map(|ptr| ptr.number).unwrap_or(0); + self.revert_state_to(last_good_block)?; + self.inputs = Arc::new(self.inputs.with_store(store)); + } + } - warn!( - self.logger, - "Terminating the subgraph runner because subgraph was unassigned", - ); - return Ok(self); - } + let block_stream = self.start_block_stream().await?; - match action { - Action::Continue => continue, - Action::Stop => { - info!(self.logger, "Stopping subgraph"); - self.inputs.store.flush().await?; - return Ok(self); - } - Action::Restart if break_on_restart => { + debug!(self.logger, "Started block stream"); + self.metrics.subgraph.deployment_status.running(); + self.update_deployment_synced_metric(); + + Ok(RunnerState::AwaitingBlock { block_stream }) + } + + /// Finalize the runner when it reaches a terminal state. + /// + /// This method handles cleanup tasks when the runner stops: + /// - Flushing the store to ensure all changes are persisted + /// - Logging the stop reason + async fn finalize(self, reason: StopReason) -> Result { + match reason { + StopReason::MaxEndBlockReached => { + info!(self.logger, "Stopping subgraph - max end block reached"); + } + StopReason::Canceled => { + info!(self.logger, "Stopping subgraph - canceled"); + } + StopReason::Unassigned => { + info!(self.logger, "Stopping subgraph - unassigned"); + } + StopReason::StreamEnded => { + info!(self.logger, "Stopping subgraph - stream ended"); + } + } + + self.inputs.store.flush().await?; + Ok(self) + } + + pub async fn run(self) -> Result<(), SubgraphRunnerError> { + self.run_inner(false).await.map(|_| ()) + } + + /// Main state machine loop for the subgraph runner. + /// + /// This method drives the runner through its state machine, transitioning + /// between states based on events and actions. The state machine replaces + /// the previous nested loop structure with explicit state transitions. 
+ /// + /// ## State Machine + /// + /// The runner starts in `Initializing` and transitions through states: + /// - `Initializing` → `Restarting` (or `Stopped` if max end block reached) + /// - `Restarting` → `AwaitingBlock` + /// - `AwaitingBlock` → `ProcessingBlock`, `Reverting`, or `Stopped` + /// - `ProcessingBlock` → `AwaitingBlock` or `Restarting` + /// - `Reverting` → `AwaitingBlock` or `Restarting` + /// - `Stopped` → terminal (returns) + async fn run_inner(mut self, break_on_restart: bool) -> Result { + // Start in Initializing state + self.runner_state = RunnerState::Initializing; + + // Track whether we've started processing blocks (not just initialized). + // This is used for break_on_restart logic - we should only stop on restart + // after we've actually started processing, not on the initial "restart" + // which is really the first start of the block stream. + let mut has_processed_blocks = false; + + loop { + self.runner_state = match std::mem::take(&mut self.runner_state) { + RunnerState::Initializing => self.initialize().await?, + + RunnerState::Restarting { reason } => { + if break_on_restart && has_processed_blocks { + // In test mode, stop on restart after first block processing info!(self.logger, "Stopping subgraph on break"); - self.inputs.store.flush().await?; - return Ok(self); - } - Action::Restart => { - // Restart the store to clear any errors that it - // might have encountered and use that from now on - let store = self.inputs.store.cheap_clone(); - if let Some(store) = store.restart().await? { - let last_good_block = - store.block_ptr().map(|ptr| ptr.number).unwrap_or(0); - self.revert_state_to(last_good_block)?; - self.inputs = Arc::new(self.inputs.with_store(store)); + RunnerState::Stopped { + reason: StopReason::Canceled, } - break; + } else { + self.restart(reason).await? } - }; - } + } + + RunnerState::AwaitingBlock { block_stream } => { + self.await_block(block_stream).await? + } + + RunnerState::ProcessingBlock { + block_stream, + block, + cursor, + } => { + has_processed_blocks = true; + self.process_block_state(block_stream, block, cursor) + .await? + } + + RunnerState::Reverting { + block_stream, + to_ptr, + cursor, + } => { + self.handle_revert_state(block_stream, to_ptr, cursor) + .await? + } + + RunnerState::Stopped { reason } => { + return self.finalize(reason).await; + } + }; + } + } + + /// Process a block and determine the next state. + /// + /// This is the state machine wrapper around `process_block` that handles + /// the block processing action and determines state transitions. 
+ async fn process_block_state( + &mut self, + block_stream: Cancelable>>, + block: BlockWithTriggers, + cursor: FirehoseCursor, + ) -> Result, SubgraphRunnerError> { + let block_ptr = block.ptr(); + self.metrics + .stream + .deployment_head + .set(block_ptr.number as f64); + + if block.trigger_count() > 0 { + self.metrics + .subgraph + .block_trigger_count + .observe(block.trigger_count() as f64); + } + + // Check if we should skip this block (optimization for blocks without triggers) + if block.trigger_count() == 0 + && self.state.skip_ptr_updates_timer.elapsed() <= SKIP_PTR_UPDATES_THRESHOLD + && !self.inputs.store.is_deployment_synced() + && !close_to_chain_head(&block_ptr, &self.inputs.chain.chain_head_ptr().await?, 1000) + { + // Skip this block and continue with the same stream + return Ok(RunnerState::AwaitingBlock { block_stream }); + } else { + self.state.skip_ptr_updates_timer = Instant::now(); + } + + let block_start = Instant::now(); + + let action = { + let stopwatch = &self.metrics.stream.stopwatch; + let _section = stopwatch.start_section(PROCESS_BLOCK_SECTION_NAME); + self.process_block(block, cursor).await + }; + + let action = self.handle_action(block_start, block_ptr, action).await?; + + self.update_deployment_synced_metric(); + + if self.is_canceled() { + return self.cancel(); + } + + self.metrics + .subgraph + .observe_block_processed(block_start.elapsed(), action.block_finished()); + + // Convert Action to RunnerState + match action { + Action::Continue => Ok(RunnerState::AwaitingBlock { block_stream }), + Action::Restart => Ok(RunnerState::Restarting { + reason: RestartReason::DynamicDataSourceCreated, + }), + Action::Stop => Ok(RunnerState::Stopped { + reason: StopReason::MaxEndBlockReached, + }), + } + } + + /// Handle a revert event and determine the next state. + /// + /// This is the state machine wrapper around `handle_revert` that handles + /// the revert action and determines state transitions. + async fn handle_revert_state( + &mut self, + block_stream: Cancelable>>, + revert_to_ptr: BlockPtr, + cursor: FirehoseCursor, + ) -> Result, SubgraphRunnerError> { + let stopwatch = &self.metrics.stream.stopwatch; + let _section = stopwatch.start_section(HANDLE_REVERT_SECTION_NAME); + + let action = self.handle_revert(revert_to_ptr, cursor).await?; + + match action { + Action::Continue => Ok(RunnerState::AwaitingBlock { block_stream }), + Action::Restart => Ok(RunnerState::Restarting { + reason: RestartReason::DataSourceExpired, + }), + Action::Stop => Ok(RunnerState::Stopped { + reason: StopReason::Canceled, + }), } } @@ -569,13 +830,85 @@ where .await } - /// Processes a block and returns the updated context and a boolean flag indicating - /// whether new dynamic data sources have been added to the subgraph. - async fn process_block( + // ========================================================================= + // Pipeline Stage Methods + // ========================================================================= + // + // The following methods implement the block processing pipeline stages. + // Each stage handles a specific phase of block processing: + // + // 1. match_triggers: Match and decode triggers against hosts + // 2. execute_triggers: Execute the matched triggers + // 3. process_dynamic_data_sources: Handle dynamically created data sources + // 4. (process_offchain_triggers): Existing handle_offchain_triggers method + // 5. 
(persist_block_state): Existing transact_block_state method + // + // ========================================================================= + + /// Pipeline Stage 1: Match triggers to hosts and decode them. + /// + /// Takes raw triggers from a block and matches them against all registered + /// hosts, returning runnable triggers ready for execution. + async fn match_triggers<'a>( + &'a self, + logger: &Logger, + block: &Arc, + triggers: Vec>, + ) -> Result>, MappingError> { + let hosts_filter = |trigger: &TriggerData| self.ctx.instance.hosts_for_trigger(trigger); + self.match_and_decode_many(logger, block, triggers, hosts_filter) + .await + } + + /// Pipeline Stage 2: Execute matched triggers. + /// + /// Takes runnable triggers and executes them using the TriggerRunner, + /// accumulating state changes in the block state. + async fn execute_triggers( + &self, + block: &Arc, + runnables: Vec>, + block_state: BlockState, + proof_of_indexing: &SharedProofOfIndexing, + causality_region: &str, + ) -> Result { + let trigger_runner = TriggerRunner::new( + self.ctx.trigger_processor.as_ref(), + &self.logger, + &self.metrics.subgraph, + &self.inputs.debug_fork, + self.inputs.instrument, + ); + trigger_runner + .execute( + block, + runnables, + block_state, + proof_of_indexing, + causality_region, + ) + .await + } + + /// Pipeline Stage 3: Process dynamically created data sources. + /// + /// This loop processes data sources created during trigger execution: + /// 1. Instantiate the created data sources + /// 2. Reprocess triggers from this block that match the new data sources + /// 3. Repeat until no more data sources are created + /// + /// Note: This algorithm processes data sources spawned on the same block + /// _breadth first_ on the tree implied by the parent-child relationship + /// between data sources. + async fn process_dynamic_data_sources( &mut self, - block: BlockWithTriggers, - firehose_cursor: FirehoseCursor, - ) -> Result { + logger: &Logger, + block: &Arc, + firehose_cursor: &FirehoseCursor, + mut block_state: BlockState, + proof_of_indexing: &SharedProofOfIndexing, + causality_region: &str, + ) -> Result { fn log_triggers_found(logger: &Logger, triggers: &[Trigger]) { if triggers.len() == 1 { info!(logger, "1 trigger found in this block"); @@ -584,6 +917,140 @@ where } } + let _section = self + .metrics + .stream + .stopwatch + .start_section(HANDLE_CREATED_DS_SECTION_NAME); + + while block_state.has_created_data_sources() { + // Instantiate dynamic data sources, removing them from the block state. + let (data_sources, runtime_hosts) = + self.create_dynamic_data_sources(block_state.drain_created_data_sources())?; + + let filter = &Arc::new(TriggerFilterWrapper::new( + C::TriggerFilter::from_data_sources( + data_sources.iter().filter_map(DataSource::as_onchain), + ), + vec![], + )); + + // TODO: We have to pass a reference to `block` to + // `refetch_block`, otherwise the call to + // handle_offchain_triggers below gets an error that `block` + // has moved. That is extremely fishy since it means that + // `handle_offchain_triggers` uses the non-refetched block + // + // It's also not clear why refetching needs to happen inside + // the loop; will firehose really return something different + // each time even though the cursor doesn't change? 
+ let block = self.refetch_block(logger, block, firehose_cursor).await?; + + // Reprocess the triggers from this block that match the new data sources + let block_with_triggers = self + .inputs + .triggers_adapter + .triggers_in_block(logger, block.as_ref().clone(), filter) + .await + .non_deterministic()?; + + let triggers = block_with_triggers.trigger_data; + log_triggers_found::(logger, &triggers); + + // Add entity operations for the new data sources to the block state + // and add runtimes for the data sources to the subgraph instance. + self.persist_dynamic_data_sources(&mut block_state, data_sources); + + // Process the triggers in each host in the same order the + // corresponding data sources have been created. + let hosts_filter = |_: &'_ TriggerData| -> Box + Send> { + Box::new(runtime_hosts.iter().map(Arc::as_ref)) + }; + let runnables = self + .match_and_decode_many(logger, &block, triggers, hosts_filter) + .await; + + let trigger_runner = TriggerRunner::new( + self.ctx.trigger_processor.as_ref(), + &self.logger, + &self.metrics.subgraph, + &self.inputs.debug_fork, + self.inputs.instrument, + ); + let res = match runnables { + Ok(runnables) => { + trigger_runner + .execute( + &block, + runnables, + block_state, + proof_of_indexing, + causality_region, + ) + .await + } + Err(e) => Err(e), + }; + + block_state = res.map_err(|e| { + // This treats a `PossibleReorg` as an ordinary error which will fail the subgraph. + // This can cause an unnecessary subgraph failure, to fix it we need to figure out a + // way to revert the effect of `create_dynamic_data_sources` so we may return a + // clean context as in b21fa73b-6453-4340-99fb-1a78ec62efb1. + match e { + MappingError::PossibleReorg(e) | MappingError::Unknown(e) => { + ProcessingError::Unknown(e) + } + } + })?; + } + + Ok(block_state) + } + + /// Pipeline Stage 4: Process offchain triggers. + /// + /// Retrieves ready offchain events and processes them, returning entity + /// modifications and processed data sources to be included in the transaction. + async fn process_offchain_triggers( + &mut self, + block: &Arc, + block_state: &mut BlockState, + ) -> Result<(Vec, Vec), ProcessingError> { + let offchain_events = self + .ctx + .offchain_monitor + .ready_offchain_events() + .non_deterministic()?; + + let (offchain_mods, processed_offchain_data_sources, persisted_off_chain_data_sources) = + self.handle_offchain_triggers(offchain_events, block) + .await + .non_deterministic()?; + + block_state + .persisted_data_sources + .extend(persisted_off_chain_data_sources); + + Ok((offchain_mods, processed_offchain_data_sources)) + } + + /// Processes a block and returns the updated context and a boolean flag indicating + /// whether new dynamic data sources have been added to the subgraph. + /// + /// ## Pipeline Stages + /// + /// Block processing follows a pipeline of stages: + /// 1. **match_triggers**: Match and decode triggers against hosts + /// 2. **execute_triggers**: Execute the matched triggers via TriggerRunner + /// 3. **process_dynamic_data_sources**: Handle dynamically created data sources + /// 4. **process_offchain_triggers**: Process offchain events + /// 5. 
**persist_block**: Persist block state to the store + async fn process_block( + &mut self, + block: BlockWithTriggers, + firehose_cursor: FirehoseCursor, + ) -> Result { let triggers = block.trigger_data; let block = Arc::new(block.block); let block_ptr = block.ptr(); @@ -613,46 +1080,22 @@ where .stopwatch .start_section(PROCESS_TRIGGERS_SECTION_NAME); - // Match and decode all triggers in the block - let hosts_filter = |trigger: &TriggerData| self.ctx.instance.hosts_for_trigger(trigger); - let match_res = self - .match_and_decode_many(&logger, &block, triggers, hosts_filter) - .await; - - // Process events one after the other, passing in entity operations - // collected previously to every new event being processed - let mut res = Ok(block_state); - match match_res { + // Stage 1: Match triggers to hosts and decode + let runnables = self.match_triggers(&logger, &block, triggers).await; + + // Stage 2: Execute triggers + let res = match runnables { Ok(runnables) => { - for runnable in runnables { - let process_res = self - .ctx - .trigger_processor - .process_trigger( - &self.logger, - runnable.hosted_triggers, - &block, - res.unwrap(), - &proof_of_indexing, - &causality_region, - &self.inputs.debug_fork, - &self.metrics.subgraph, - self.inputs.instrument, - ) - .await - .map_err(|e| e.add_trigger_context(&runnable.trigger)); - match process_res { - Ok(state) => res = Ok(state), - Err(e) => { - res = Err(e); - break; - } - } - } - } - Err(e) => { - res = Err(e); + self.execute_triggers( + &block, + runnables, + block_state, + &proof_of_indexing, + &causality_region, + ) + .await } + Err(e) => Err(e), }; match res { @@ -661,21 +1104,10 @@ where // Some form of unknown or non-deterministic error ocurred. Err(MappingError::Unknown(e)) => return Err(ProcessingError::Unknown(e)), - Err(MappingError::PossibleReorg(e)) => { - info!(logger, - "Possible reorg detected, retrying"; - "error" => format!("{:#}", e), - ); - // In case of a possible reorg, we want this function to do nothing and restart the - // block stream so it has a chance to detect the reorg. - // - // The state is unchanged at this point, except for having cleared the entity cache. - // Losing the cache is a bit annoying but not an issue for correctness. - // - // See also b21fa73b-6453-4340-99fb-1a78ec62efb1. - return Ok(Action::Restart); - } + // Possible blockchain reorg detected - signal restart via ProcessingError::PossibleReorg. + // See also b21fa73b-6453-4340-99fb-1a78ec62efb1. + Err(MappingError::PossibleReorg(e)) => return Err(ProcessingError::PossibleReorg(e)), } // Check if there are any datasources that have expired in this block. ie: the end_block @@ -691,137 +1123,33 @@ where // or data sources that have reached their end block. let needs_restart = created_data_sources_needs_restart || has_expired_data_sources; - { - let _section = self - .metrics - .stream - .stopwatch - .start_section(HANDLE_CREATED_DS_SECTION_NAME); - - // This loop will: - // 1. Instantiate created data sources. - // 2. Process those data sources for the current block. - // Until no data sources are created or MAX_DATA_SOURCES is hit. - - // Note that this algorithm processes data sources spawned on the same block _breadth - // first_ on the tree implied by the parent-child relationship between data sources. Only a - // very contrived subgraph would be able to observe this. - while block_state.has_created_data_sources() { - // Instantiate dynamic data sources, removing them from the block state. 
- let (data_sources, runtime_hosts) = - self.create_dynamic_data_sources(block_state.drain_created_data_sources())?; - - let filter = &Arc::new(TriggerFilterWrapper::new( - C::TriggerFilter::from_data_sources( - data_sources.iter().filter_map(DataSource::as_onchain), - ), - vec![], - )); - - // TODO: We have to pass a reference to `block` to - // `refetch_block`, otherwise the call to - // handle_offchain_triggers below gets an error that `block` - // has moved. That is extremely fishy since it means that - // `handle_offchain_triggers` uses the non-refetched block - // - // It's also not clear why refetching needs to happen inside - // the loop; will firehose really return something diffrent - // each time even though the cursor doesn't change? - let block = self - .refetch_block(&logger, &block, &firehose_cursor) - .await?; - - // Reprocess the triggers from this block that match the new data sources - let block_with_triggers = self - .inputs - .triggers_adapter - .triggers_in_block(&logger, block.as_ref().clone(), filter) - .await - .non_deterministic()?; - - let triggers = block_with_triggers.trigger_data; - log_triggers_found(&logger, &triggers); - - // Add entity operations for the new data sources to the block state - // and add runtimes for the data sources to the subgraph instance. - self.persist_dynamic_data_sources(&mut block_state, data_sources); - - // Process the triggers in each host in the same order the - // corresponding data sources have been created. - let hosts_filter = |_: &'_ TriggerData| -> Box + Send> { - Box::new(runtime_hosts.iter().map(Arc::as_ref)) - }; - let match_res: Result, _> = self - .match_and_decode_many(&logger, &block, triggers, hosts_filter) - .await; - - let mut res = Ok(block_state); - match match_res { - Ok(runnables) => { - for runnable in runnables { - let process_res = self - .ctx - .trigger_processor - .process_trigger( - &self.logger, - runnable.hosted_triggers, - &block, - res.unwrap(), - &proof_of_indexing, - &causality_region, - &self.inputs.debug_fork, - &self.metrics.subgraph, - self.inputs.instrument, - ) - .await - .map_err(|e| e.add_trigger_context(&runnable.trigger)); - match process_res { - Ok(state) => res = Ok(state), - Err(e) => { - res = Err(e); - break; - } - } - } - } - Err(e) => { - res = Err(e); - } - } + // Checkpoint before dynamic DS processing for potential rollback scenarios. + // This captures the current state so it can be restored if dynamic data source + // processing fails in a way that requires partial rollback. + let _checkpoint = block_state.checkpoint(); + + // Stage 3: Process dynamic data sources + block_state = self + .process_dynamic_data_sources( + &logger, + &block, + &firehose_cursor, + block_state, + &proof_of_indexing, + &causality_region, + ) + .await?; - block_state = res.map_err(|e| { - // This treats a `PossibleReorg` as an ordinary error which will fail the subgraph. - // This can cause an unnecessary subgraph failure, to fix it we need to figure out a - // way to revert the effect of `create_dynamic_data_sources` so we may return a - // clean context as in b21fa73b-6453-4340-99fb-1a78ec62efb1. - match e { - MappingError::PossibleReorg(e) | MappingError::Unknown(e) => { - ProcessingError::Unknown(e) - } - } - })?; - } - } - - // Check for offchain events and process them, including their entity modifications in the - // set to be transacted. 
- let offchain_events = self - .ctx - .offchain_monitor - .ready_offchain_events() - .non_deterministic()?; - let (offchain_mods, processed_offchain_data_sources, persisted_off_chain_data_sources) = - self.handle_offchain_triggers(offchain_events, &block) - .await - .non_deterministic()?; - block_state - .persisted_data_sources - .extend(persisted_off_chain_data_sources); + // Stage 4: Process offchain triggers + let (offchain_mods, processed_offchain_data_sources) = self + .process_offchain_triggers(&block, &mut block_state) + .await?; + // Stage 5: Persist block state self.transact_block_state( &logger, - block_ptr.clone(), - firehose_cursor.clone(), + block_ptr, + firehose_cursor, block.timestamp(), block_state, proof_of_indexing, @@ -997,84 +1325,94 @@ where Ok(action) } - Err(ProcessingError::Canceled) => { - debug!(self.logger, "Subgraph block stream shut down cleanly"); - Ok(Action::Stop) - } + // Handle errors based on their kind using the unified error classification. + // + // Error handling invariants: + // - Deterministic: Stop processing, persist PoI only, fail subgraph + // - NonDeterministic: Retry with backoff, may succeed on retry + // - PossibleReorg: Restart cleanly without persisting (don't fail subgraph) + // - Canceled: Clean shutdown, no error recording + Err(e) => match e.kind() { + ProcessingErrorKind::Canceled => { + debug!(self.logger, "Subgraph block stream shut down cleanly"); + Ok(Action::Stop) + } - // Handle unexpected stream errors by marking the subgraph as failed. - Err(e) => { - self.metrics.subgraph.deployment_status.failed(); - let last_good_block = self - .inputs - .store - .block_ptr() - .map(|ptr| ptr.number) - .unwrap_or(0); - self.revert_state_to(last_good_block)?; + ProcessingErrorKind::PossibleReorg => { + // Possible reorg detected - restart the block stream cleanly. + // Don't persist anything and don't mark subgraph as failed. + // The block stream restart will allow detection of the actual reorg. + info!(self.logger, + "Possible reorg detected, restarting block stream"; + "error" => format!("{:#}", e), + ); - let message = format!("{:#}", e).replace('\n', "\t"); - let err = anyhow!("{}, code: {}", message, LogCode::SubgraphSyncingFailure); - let deterministic = e.is_deterministic(); + // Revert in-memory state to last good block but don't touch the store + let last_good_block = self + .inputs + .store + .block_ptr() + .map(|ptr| ptr.number) + .unwrap_or(0); + self.revert_state_to(last_good_block)?; - let error = SubgraphError { - subgraph_id: self.inputs.deployment.hash.clone(), - message, - block_ptr: Some(block_ptr), - handler: None, - deterministic, - }; + Ok(Action::Restart) + } - match deterministic { - true => { - // Fail subgraph: - // - Change status/health. - // - Save the error to the database. - self.inputs - .store - .fail_subgraph(error) - .await - .context("Failed to set subgraph status to `failed`")?; + ProcessingErrorKind::Deterministic => { + // Deterministic error - fail the subgraph permanently. + self.metrics.subgraph.deployment_status.failed(); + let last_good_block = self + .inputs + .store + .block_ptr() + .map(|ptr| ptr.number) + .unwrap_or(0); + self.revert_state_to(last_good_block)?; - Err(err) - } - false => { - // Shouldn't fail subgraph if it's already failed for non-deterministic - // reasons. - // - // If we don't do this check we would keep adding the same error to the - // database. - let should_fail_subgraph = - self.inputs.store.health().await? 
!= SubgraphHealth::Failed; - - if should_fail_subgraph { - // Fail subgraph: - // - Change status/health. - // - Save the error to the database. - self.inputs - .store - .fail_subgraph(error) - .await - .context("Failed to set subgraph status to `failed`")?; - } + let message = format!("{:#}", e).replace('\n', "\t"); + let err = anyhow!("{}, code: {}", message, LogCode::SubgraphSyncingFailure); - // Retry logic below: + self.fail_subgraph(message, Some(block_ptr), true).await?; - let message = format!("{:#}", e).replace('\n', "\t"); - error!(self.logger, "Subgraph failed with non-deterministic error: {}", message; - "attempt" => self.state.backoff.attempt, - "retry_delay_s" => self.state.backoff.delay().as_secs()); + Err(err) + } - // Sleep before restarting. - self.state.backoff.sleep_async().await; + ProcessingErrorKind::NonDeterministic => { + // Non-deterministic error - retry with backoff. + self.metrics.subgraph.deployment_status.failed(); + let last_good_block = self + .inputs + .store + .block_ptr() + .map(|ptr| ptr.number) + .unwrap_or(0); + self.revert_state_to(last_good_block)?; + + let message = format!("{:#}", e).replace('\n', "\t"); + + error!(self.logger, "Subgraph failed with non-deterministic error: {}", message; + "attempt" => self.state.backoff.attempt, + "retry_delay_s" => self.state.backoff.delay().as_secs()); + + // Shouldn't fail subgraph if it's already failed for non-deterministic + // reasons. + // + // If we don't do this check we would keep adding the same error to the + // database. + if !self.inputs.store.health().await?.is_failed() { + self.fail_subgraph(message, Some(block_ptr), false).await?; + } - self.state.should_try_unfail_non_deterministic = true; + // Sleep before restarting. + self.state.backoff.sleep_async().await; - // And restart the subgraph. - Ok(Action::Restart) - } + self.state.should_try_unfail_non_deterministic = true; + + // And restart the subgraph. + Ok(Action::Restart) } - } + }, } } @@ -1132,31 +1470,6 @@ where C: Blockchain, T: RuntimeHostBuilder, { - async fn handle_stream_event( - &mut self, - event: Option, CancelableError>>, - ) -> Result { - let stopwatch = &self.metrics.stream.stopwatch; - let action = match event { - Some(Ok(BlockStreamEvent::ProcessBlock(block, cursor))) => { - let _section = stopwatch.start_section(PROCESS_BLOCK_SECTION_NAME); - self.handle_process_block(block, cursor).await? - } - Some(Ok(BlockStreamEvent::Revert(revert_to_ptr, cursor))) => { - let _section = stopwatch.start_section(HANDLE_REVERT_SECTION_NAME); - self.handle_revert(revert_to_ptr, cursor).await? - } - // Log and drop the errors from the block_stream - // The block stream will continue attempting to produce blocks - Some(Err(e)) => self.handle_err(e).await?, - // If the block stream ends, that means that there is no more indexing to do. - // Typically block streams produce indefinitely, but tests are an example of finite block streams. 
- None => Action::Stop, - }; - - Ok(action) - } - async fn handle_offchain_triggers( &mut self, triggers: Vec, @@ -1288,47 +1601,6 @@ where C: Blockchain, T: RuntimeHostBuilder, { - async fn handle_process_block( - &mut self, - block: BlockWithTriggers, - cursor: FirehoseCursor, - ) -> Result { - let block_ptr = block.ptr(); - self.metrics - .stream - .deployment_head - .set(block_ptr.number as f64); - - if block.trigger_count() > 0 { - self.metrics - .subgraph - .block_trigger_count - .observe(block.trigger_count() as f64); - } - - if block.trigger_count() == 0 - && self.state.skip_ptr_updates_timer.elapsed() <= SKIP_PTR_UPDATES_THRESHOLD - && !self.inputs.store.is_deployment_synced() - && !close_to_chain_head( - &block_ptr, - &self.inputs.chain.chain_head_ptr().await?, - // The "skip ptr updates timer" is ignored when a subgraph is at most 1000 blocks - // behind the chain head. - 1000, - ) - { - return Ok(Action::Continue); - } else { - self.state.skip_ptr_updates_timer = Instant::now(); - } - - let start = Instant::now(); - - let res = self.process_block(block, cursor).await; - - self.handle_action(start, block_ptr, res).await - } - async fn handle_revert( &mut self, revert_to_ptr: BlockPtr, @@ -1380,51 +1652,6 @@ where Ok(action) } - async fn handle_err( - &mut self, - err: CancelableError, - ) -> Result { - if self.is_canceled() { - debug!(&self.logger, "Subgraph block stream shut down cleanly"); - return Ok(Action::Stop); - } - - let err = match err { - CancelableError::Error(BlockStreamError::Fatal(msg)) => { - error!( - &self.logger, - "The block stream encountered a substreams fatal error and will not retry: {}", - msg - ); - - // If substreams returns a deterministic error we may not necessarily have a specific block - // but we should not retry since it will keep failing. - self.inputs - .store - .fail_subgraph(SubgraphError { - subgraph_id: self.inputs.deployment.hash.clone(), - message: msg, - block_ptr: None, - handler: None, - deterministic: true, - }) - .await - .context("Failed to set subgraph status to `failed`")?; - - return Ok(Action::Stop); - } - e => e, - }; - - debug!( - &self.logger, - "Block stream produced a non-fatal error"; - "error" => format!("{}", err), - ); - - Ok(Action::Continue) - } - /// Determines if the subgraph needs to be restarted. /// Currently returns true when there are data sources that have reached their end block /// in the range between `revert_to_ptr` and `subgraph_ptr`. diff --git a/core/src/subgraph/runner/state.rs b/core/src/subgraph/runner/state.rs new file mode 100644 index 00000000000..ad1761c8e47 --- /dev/null +++ b/core/src/subgraph/runner/state.rs @@ -0,0 +1,98 @@ +//! State machine types for SubgraphRunner. +//! +//! This module defines the explicit state machine that controls the runner's lifecycle, +//! replacing the previous nested loop structure with clear state transitions. + +use graph::blockchain::block_stream::{BlockStream, BlockWithTriggers, FirehoseCursor}; +use graph::blockchain::Blockchain; +use graph::ext::futures::Cancelable; +use graph::prelude::BlockPtr; + +/// The current state of the SubgraphRunner's lifecycle. +/// +/// The runner transitions through these states as it processes blocks, +/// handles reverts, and responds to errors or cancellation signals. 
+/// +/// ## State Transitions +/// +/// ```text +/// Initializing ───────────────────────────────────┐ +/// │ │ +/// v │ +/// AwaitingBlock ◄────────────────────────────────┤ +/// │ │ +/// ├── ProcessBlock event ──► ProcessingBlock │ +/// │ │ │ +/// │ ├── success ┼──► AwaitingBlock +/// │ │ │ +/// │ └── restart ┼──► Restarting +/// │ │ +/// ├── Revert event ──────────► Reverting ────┤ +/// │ │ +/// ├── Error ─────────────────► Restarting ───┤ +/// │ │ +/// └── Cancel/MaxBlock ───────► Stopped │ +/// │ +/// Restarting ─────────────────────────────────────┘ +/// ``` +#[derive(Default)] +pub enum RunnerState { + /// Initial state, ready to start block stream. + #[default] + Initializing, + + /// Block stream active, waiting for next event. + AwaitingBlock { + block_stream: Cancelable>>, + }, + + /// Processing a block through the pipeline. + /// The block stream is kept alive to continue processing after this block. + ProcessingBlock { + block_stream: Cancelable>>, + block: BlockWithTriggers, + cursor: FirehoseCursor, + }, + + /// Handling a revert event. + /// The block stream is kept alive to continue processing after the revert. + Reverting { + block_stream: Cancelable>>, + to_ptr: BlockPtr, + cursor: FirehoseCursor, + }, + + /// Restarting block stream (new filters, store restart, etc.). + Restarting { reason: RestartReason }, + + /// Terminal state. + Stopped { reason: StopReason }, +} + +/// Reasons for restarting the block stream. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RestartReason { + /// New dynamic data source was created that requires filter updates. + DynamicDataSourceCreated, + /// A data source reached its end block. + DataSourceExpired, + /// Store error occurred and store needs to be restarted. + StoreError, + /// Possible reorg detected, need to restart to detect it. + /// NOTE: Currently unused but reserved for future error handling consolidation (Phase 5). + #[allow(dead_code)] + PossibleReorg, +} + +/// Reasons for stopping the runner. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum StopReason { + /// The maximum end block was reached. + MaxEndBlockReached, + /// The runner was canceled (unassigned or shutdown). + Canceled, + /// The subgraph was unassigned while this runner was active. + Unassigned, + /// The block stream ended (typically in tests). + StreamEnded, +} diff --git a/core/src/subgraph/runner/trigger_runner.rs b/core/src/subgraph/runner/trigger_runner.rs new file mode 100644 index 00000000000..6c89052e5dc --- /dev/null +++ b/core/src/subgraph/runner/trigger_runner.rs @@ -0,0 +1,79 @@ +use std::sync::Arc; + +use graph::blockchain::Blockchain; +use graph::components::store::SubgraphFork; +use graph::components::subgraph::{MappingError, SharedProofOfIndexing}; +use graph::components::trigger_processor::RunnableTriggers; +use graph::prelude::{BlockState, RuntimeHostBuilder, SubgraphInstanceMetrics, TriggerProcessor}; +use graph::slog::Logger; + +/// Handles the execution of triggers against runtime hosts, accumulating state. +/// +/// This component unifies the trigger processing loop that was previously duplicated +/// for initial triggers and dynamically created data source triggers. +pub struct TriggerRunner<'a, C: Blockchain, T: RuntimeHostBuilder> { + processor: &'a dyn TriggerProcessor, + logger: &'a Logger, + metrics: &'a Arc, + debug_fork: &'a Option>, + instrument: bool, +} + +impl<'a, C, T> TriggerRunner<'a, C, T> +where + C: Blockchain, + T: RuntimeHostBuilder, +{ + /// Create a new TriggerRunner with the given dependencies. 
+ pub fn new( + processor: &'a dyn TriggerProcessor, + logger: &'a Logger, + metrics: &'a Arc, + debug_fork: &'a Option>, + instrument: bool, + ) -> Self { + Self { + processor, + logger, + metrics, + debug_fork, + instrument, + } + } + + /// Execute a sequence of runnable triggers, accumulating state changes. + /// + /// Processes each trigger in order. If any trigger fails with a non-deterministic + /// error, processing stops and the error is returned. Deterministic errors are + /// accumulated in the block state. + pub async fn execute( + &self, + block: &Arc, + runnables: Vec>, + block_state: BlockState, + proof_of_indexing: &SharedProofOfIndexing, + causality_region: &str, + ) -> Result { + let mut state = block_state; + + for runnable in runnables { + state = self + .processor + .process_trigger( + self.logger, + runnable.hosted_triggers, + block, + state, + proof_of_indexing, + causality_region, + self.debug_fork, + self.metrics, + self.instrument, + ) + .await + .map_err(|e| e.add_trigger_context(&runnable.trigger))?; + } + + Ok(state) + } +} diff --git a/docs/environment-variables.md b/docs/environment-variables.md index 9c4fc5dc8b4..b78c4ada314 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -216,6 +216,9 @@ those. decisions. Set to `true` to turn simulation on, defaults to `false` - `GRAPH_STORE_CONNECTION_TIMEOUT`: How long to wait to connect to a database before assuming the database is down in ms. Defaults to 5000ms. +- `GRAPH_STORE_SETUP_TIMEOUT`: Timeout for database setup operations + (migrations, schema creation) in milliseconds. Defaults to 30000ms (30s). + Setup operations can legitimately take longer than normal runtime operations. - `GRAPH_STORE_CONNECTION_UNAVAILABLE_RETRY`: When a database shard is marked unavailable due to connection timeouts, this controls how often to allow a single probe request through to check if the database has recovered. Only one diff --git a/docs/plans/runner-refactor.md b/docs/plans/runner-refactor.md new file mode 100644 index 00000000000..c5ce612f8db --- /dev/null +++ b/docs/plans/runner-refactor.md @@ -0,0 +1,281 @@ +# Runner Refactor Implementation Plan + +This document outlines the implementation plan for the runner refactor described in [the spec](../specs/runner-refactor.md). + +## Overview + +The refactor transforms `core/src/subgraph/runner.rs` from a complex nested-loop structure into a cleaner state machine with explicit pipeline stages. + +## Git Workflow + +**Branch**: All work should be committed to the `runner-refactor` branch. + +**Commit discipline**: + +- Commit work in small, reviewable chunks +- Each commit should be self-contained and pass all checks +- Prefer many small commits over few large ones +- Each commit message should clearly describe what it does +- Each step in the implementation phases should correspond to one or more commits + +**Before each commit**: + +```bash +just format +just lint +just test-unit +just test-runner +``` + +- MANDATORY: Work must be committed, a task is only done when work is committed +- MANDATORY: Make sure to follow the commit discipline above +- IMPORTANT: The runner tests produce output in `tests/runner-tests.log`. Use that to investigate failures. + +## Implementation Phases + +### Phase 1: Extract TriggerRunner Component + +**Goal:** Eliminate duplicated trigger processing code (lines 616-656 vs 754-790). 
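+
+For reference, both duplicated loops collapse into a single call shape. The sketch below follows the `TriggerRunner` this PR adds in `core/src/subgraph/runner/trigger_runner.rs`; the per-call-site handling of the resulting `MappingError` is elided, so read it as illustrative rather than final code:
+
+```rust
+// Sketch: one TriggerRunner invocation replaces each hand-rolled loop.
+let trigger_runner = TriggerRunner::new(
+    self.ctx.trigger_processor.as_ref(),
+    &self.logger,
+    &self.metrics.subgraph,
+    &self.inputs.debug_fork,
+    self.inputs.instrument,
+);
+
+// `execute` folds the BlockState through every runnable trigger in order,
+// stopping at the first error -- exactly what both former loops did by hand.
+let res = trigger_runner
+    .execute(&block, runnables, block_state, &proof_of_indexing, &causality_region)
+    .await;
+```
+
+Each call site then maps the `Result<BlockState, MappingError>` into its own error type, which is where the two former loops legitimately differed.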
+ +**Files to modify:** + +- `core/src/subgraph/runner.rs` - Extract logic +- Create `core/src/subgraph/runner/trigger_runner.rs` + +**Steps:** + +1. Create `TriggerRunner` struct with execute method +2. Replace first trigger loop (lines 616-656) with `TriggerRunner::execute()` +3. Replace second trigger loop (lines 754-790) with same call +4. Verify tests pass + +**Verification:** + +- `just test-unit` passes +- `just test-runner` passes +- No behavioral changes + +### Phase 2: Define RunnerState Enum + +**Goal:** Introduce explicit state machine types without changing control flow yet. + +**Files to modify:** + +- Create `core/src/subgraph/runner/state.rs` +- `core/src/subgraph/runner.rs` - Add state field + +**Steps:** + +1. Define `RunnerState` enum with all variants +2. Define `RestartReason` and `StopReason` enums +3. Add `state: RunnerState` field to `SubgraphRunner` +4. Initialize state in constructor +5. Verify tests pass (no behavioral changes yet) + +**Verification:** + +- Code compiles +- Tests pass unchanged + +### Phase 3: Refactor run_inner to State Machine + +**Goal:** Replace nested loops with explicit state transitions. + +**Files to modify:** + +- `core/src/subgraph/runner.rs` - Rewrite `run_inner` + +**Steps:** + +1. Extract `initialize()` method for pre-loop setup +2. Extract `await_block()` method for stream event handling +3. Extract `restart()` method for restart logic +4. Extract `finalize()` method for cleanup +5. Rewrite `run_inner` as state machine loop +6. Remove nested loop structure +7. Verify tests pass + +**Verification:** + +- `just test-unit` passes +- `just test-runner` passes +- Same behavior, cleaner structure + +### Phase 4: Define Pipeline Stages + +**Goal:** Break `process_block` into explicit stages. + +**Files to modify:** + +- Create `core/src/subgraph/runner/pipeline.rs` +- `core/src/subgraph/runner.rs` - Refactor `process_block` + +**Steps:** + +1. Extract `match_triggers()` stage method +2. Extract `execute_triggers()` stage method (uses `TriggerRunner`) +3. Extract `process_dynamic_data_sources()` stage method +4. Extract `process_offchain_triggers()` stage method +5. Extract `persist_block_state()` stage method +6. Rewrite `process_block` to call stages in sequence +7. Verify tests pass + +**Verification:** + +- `just test-unit` passes +- `just test-runner` passes +- Same behavior, cleaner structure + +### Phase 5: Consolidate Error Handling + +**Goal:** Unify scattered error handling into explicit classification. + +**Files to modify:** + +- `graph/src/components/subgraph/error.rs` (or wherever `ProcessingError` lives) +- `core/src/subgraph/runner.rs` - Use new error methods + +**Steps:** + +1. Add `ProcessingErrorKind` enum with Deterministic/NonDeterministic/PossibleReorg variants +2. Add `kind()` method to `ProcessingError` +3. Add helper methods: `should_stop_processing()`, `should_restart()`, `is_retryable()` +4. Replace scattered error checks in `process_block` with unified logic +5. Replace scattered error checks in dynamic DS handling +6. Replace scattered error checks in `handle_offchain_triggers` +7. Document error handling invariants in code comments +8. Verify tests pass + +**Verification:** + +- `just test-unit` passes +- `just test-runner` passes +- Error behavior unchanged (same semantics, cleaner code) + +### Phase 6: Add BlockState Checkpoints + +**Goal:** Enable rollback capability with minimal overhead. 
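+
+A minimal sketch of the checkpoint API this phase describes, to make the intended shape concrete. `BlockStateCheckpoint`, `checkpoint()`, and `restore()` are the names introduced above; the fields captured here are assumptions about `BlockState` internals (only `persisted_data_sources` appears elsewhere in this PR), chosen to show why a checkpoint can be an O(1) marker rather than a deep copy:
+
+```rust
+/// Lightweight marker of how much a BlockState has accumulated.
+/// Field names are hypothetical; the real layout lives with BlockState.
+pub struct BlockStateCheckpoint {
+    created_data_sources_len: usize,
+    persisted_data_sources_len: usize,
+}
+
+impl BlockState {
+    /// Record the current collection lengths; O(1), no cloning.
+    pub fn checkpoint(&self) -> BlockStateCheckpoint {
+        BlockStateCheckpoint {
+            created_data_sources_len: self.created_data_sources.len(),
+            persisted_data_sources_len: self.persisted_data_sources.len(),
+        }
+    }
+
+    /// Discard anything accumulated after `checkpoint` was taken.
+    pub fn restore(&mut self, checkpoint: BlockStateCheckpoint) {
+        self.created_data_sources
+            .truncate(checkpoint.created_data_sources_len);
+        self.persisted_data_sources
+            .truncate(checkpoint.persisted_data_sources_len);
+    }
+}
+```
+
+The runner side of this pairing is the `let _checkpoint = block_state.checkpoint();` call added before dynamic data source processing in `process_block`.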
+
+**Files to modify:**
+
+- `graph/src/prelude.rs` or wherever `BlockState` is defined
+- `core/src/subgraph/runner.rs` - Use checkpoints
+
+**Steps:**
+
+1. Add `checkpoint()` method to `BlockState`
+2. Add `BlockStateCheckpoint` struct
+3. Add `restore()` method to `BlockState`
+4. Use checkpoint before dynamic DS processing
+5. Verify tests pass
+
+**Verification:**
+
+- `just test-unit` passes
+- No performance regression (checkpoints are lightweight)
+
+### Phase 7: Module Organization
+
+**Goal:** Organize code into proper module structure.
+
+**Files to create/modify:**
+
+- `core/src/subgraph/runner/mod.rs`
+- Move/organize existing extracted modules
+
+**Steps:**
+
+1. Create `runner/` directory
+2. Move `state.rs`, `pipeline.rs`, `trigger_runner.rs` into it
+3. Update `runner.rs` to re-export from module
+4. Update imports in dependent files
+5. Verify tests pass
+
+**Verification:**
+
+- `just test-unit` passes
+- `just test-runner` passes
+- `just lint` shows no warnings
+
+## Completion Criteria
+
+Each phase is complete when:
+
+1. `just format` - Code is formatted
+2. `just lint` - Zero warnings
+3. `just check --release` - Builds in release mode
+4. `just test-unit` - Unit tests pass
+5. `just test-runner` - Runner tests pass
+
+## Progress Checklist
+
+### Phase 1: Extract TriggerRunner Component
+
+- [x] Create `TriggerRunner` struct with execute method
+- [x] Replace first trigger loop (lines 616-656)
+- [x] Replace second trigger loop (lines 754-790)
+- [x] Verify tests pass
+
+### Phase 2: Define RunnerState Enum
+
+- [x] Define `RunnerState` enum with all variants
+- [x] Define `RestartReason` and `StopReason` enums
+- [x] Add `state: RunnerState` field to `SubgraphRunner`
+- [x] Initialize state in constructor
+- [x] Verify tests pass
+
+### Phase 3: Refactor run_inner to State Machine
+
+- [x] Extract `initialize()` method
+- [x] Extract `await_block()` method
+- [x] Extract `restart()` method
+- [x] Extract `finalize()` method
+- [x] Rewrite `run_inner` as state machine loop
+- [x] Remove nested loop structure
+- [x] Verify tests pass
+
+### Phase 4: Define Pipeline Stages
+
+- [x] Extract `match_triggers()` stage method
+- [x] Extract `execute_triggers()` stage method
+- [x] Extract `process_dynamic_data_sources()` stage method
+- [x] Extract `process_offchain_triggers()` stage method
+- [x] Extract `persist_block_state()` stage method
+- [x] Rewrite `process_block` to call stages in sequence
+- [x] Verify tests pass
+
+### Phase 5: Consolidate Error Handling
+
+- [x] Add `ProcessingErrorKind` enum
+- [x] Add `kind()` method to `ProcessingError`
+- [x] Add helper methods (`should_stop_processing()`, `should_restart()`, `is_retryable()`)
+- [x] Replace scattered error checks in `process_block`
+- [x] Replace scattered error checks in dynamic DS handling (preserved existing behavior per spec)
+- [x] Replace scattered error checks in `handle_offchain_triggers` (preserved existing behavior per spec)
+- [x] Document error handling invariants
+- [x] Verify tests pass
+
+### Phase 6: Add BlockState Checkpoints
+
+- [x] Add `BlockStateCheckpoint` struct
+- [x] Add `checkpoint()` method to `BlockState`
+- [x] Add `restore()` method to `BlockState`
+- [x] Use checkpoint before dynamic DS processing
+- [x] Verify tests pass
+
+### Phase 7: Module Organization
+
+- [x] Create `runner_components/` directory (named to avoid conflict with `runner.rs`)
+- [x] Move `state.rs`, `trigger_runner.rs` into it (`pipeline.rs` was not created, as stages are methods)
+- [x] Create `runner_components/mod.rs` with re-exports
+- [x] Update imports in `runner.rs` to use the new module path
+- [x] Verify tests pass
+- [x] `just lint` shows zero warnings
+
+## Notes
+
+- Each phase should be a separate, reviewable PR
+- Phases 1-4 can potentially be combined if changes are small
+- Phase 3 (FSM refactor of run_inner) is the most invasive and should be reviewed carefully
+- Phase 5 (error handling) can be done earlier if it helps simplify other phases
+- Preserve all existing behavior - this is a refactor, not a feature change
diff --git a/docs/specs/runner-refactor.md b/docs/specs/runner-refactor.md
new file mode 100644
index 00000000000..ee36ebe3342
--- /dev/null
+++ b/docs/specs/runner-refactor.md
@@ -0,0 +1,380 @@
+# Subgraph Runner Simplification Spec
+
+## Problem Statement
+
+`core/src/subgraph/runner.rs` is complex and hard to modify. Key issues:
+
+1. **Duplicated trigger processing** (lines 616-656 vs 754-790): Nearly identical loops
+2. **Control flow confusion**: Nested loops in `run_inner` with 6 exit paths
+3. **State management**: Mixed patterns (mutable fields, `std::mem::take`, drains)
+4. **`process_block` monolith**: ~260 lines handling triggers, dynamic DS, offchain, persistence
+
+## Design Decisions
+
+| Aspect | Decision |
+|--------|----------|
+| Control flow | Enum-based FSM for full runner lifecycle |
+| Trigger processing | New `TriggerRunner` component |
+| Block processing | Pipeline with explicitly defined stages |
+| State management | Mutable accumulator with checkpoints |
+| Breaking changes | Moderate (internal APIs can change) |
+
+## Target Architecture
+
+### 1. Runner State Machine
+
+Replace nested loops in `run_inner` with an explicit enum FSM covering the full lifecycle:
+
+```rust
+enum RunnerState<C: Blockchain> {
+    /// Initial state, ready to start block stream
+    Initializing,
+
+    /// Block stream active, waiting for next event
+    AwaitingBlock {
+        block_stream: Cancelable<Box<dyn BlockStream<C>>>,
+    },
+
+    /// Processing a block through the pipeline
+    ProcessingBlock {
+        block: BlockWithTriggers<C>,
+        cursor: FirehoseCursor,
+    },
+
+    /// Handling a revert event
+    Reverting {
+        to_ptr: BlockPtr,
+        cursor: FirehoseCursor,
+    },
+
+    /// Restarting block stream (new filters, store restart, etc.)
+    Restarting {
+        reason: RestartReason,
+    },
+
+    /// Terminal state
+    Stopped {
+        reason: StopReason,
+    },
+}
+
+enum RestartReason {
+    DynamicDataSourceCreated,
+    DataSourceExpired,
+    StoreError,
+    PossibleReorg,
+}
+
+enum StopReason {
+    MaxEndBlockReached,
+    Canceled,
+    Unassigned,
+}
+```
+
+The main loop becomes:
+
+```rust
+async fn run(mut self) -> Result<(), SubgraphRunnerError> {
+    loop {
+        self.state = match self.state {
+            RunnerState::Initializing => self.initialize().await?,
+            RunnerState::AwaitingBlock { block_stream } => {
+                self.await_block(block_stream).await?
+            }
+            RunnerState::ProcessingBlock { block, cursor } => {
+                self.process_block(block, cursor).await?
+            }
+            RunnerState::Reverting { to_ptr, cursor } => {
+                self.handle_revert(to_ptr, cursor).await?
+            }
+            RunnerState::Restarting { reason } => self.restart(reason).await?,
+            RunnerState::Stopped { reason } => return self.finalize(reason).await,
+        };
+    }
+}
+```
+
+### 2. Block Processing Pipeline
+
+Replace the `process_block` monolith with explicit stages:
+
+```rust
+/// Pipeline stages for block processing
+mod pipeline {
+    pub struct TriggerMatchStage;
+    pub struct TriggerExecuteStage;
+    pub struct DynamicDataSourceStage;
+    pub struct OffchainTriggerStage;
+    pub struct PersistStage;
+}
+
+/// Result of block processing pipeline
+pub struct BlockProcessingResult {
+    pub action: Action,
+    pub block_state: BlockState,
+}
+
+impl SubgraphRunner {
+    async fn process_block(
+        &mut self,
+        block: BlockWithTriggers<C>,
+        cursor: FirehoseCursor,
+    ) -> Result<RunnerState<C>, SubgraphRunnerError> {
+        let triggers = block.trigger_data;
+        let block = Arc::new(block.block);
+
+        // Stage 1: Match triggers to hosts and decode
+        let runnables = self.match_triggers(&block, triggers).await?;
+
+        // Stage 2: Execute triggers (unified for initial + dynamic DS)
+        let mut block_state = self.execute_triggers(&block, runnables).await?;
+
+        // Checkpoint before dynamic DS processing
+        let checkpoint = block_state.checkpoint();
+
+        // Stage 3: Process dynamic data sources (loop until none created)
+        block_state = self
+            .process_dynamic_data_sources(&block, &cursor, block_state)
+            .await?;
+
+        // Stage 4: Handle offchain triggers
+        let offchain_result = self
+            .process_offchain_triggers(&block, &mut block_state)
+            .await?;
+
+        // Stage 5: Persist to store
+        self.persist_block_state(block_state, offchain_result).await?;
+
+        // Determine next state
+        Ok(self.determine_next_state())
+    }
+}
+```
+
+### 3. Error Handling Strategy
+
+Consolidate scattered error handling into explicit classification:
+
+```rust
+/// Unified error classification for trigger processing
+pub enum ProcessingErrorKind {
+    /// Stop processing, persist PoI only
+    Deterministic(anyhow::Error),
+    /// Retry with backoff, attempt to unfail
+    NonDeterministic(anyhow::Error),
+    /// Restart block stream cleanly (don't persist)
+    PossibleReorg(anyhow::Error),
+}
+
+impl ProcessingError {
+    /// Classify error once, use classification throughout
+    pub fn kind(&self) -> ProcessingErrorKind { ... }
+
+    /// Whether this error should stop processing the current block
+    pub fn should_stop_processing(&self) -> bool {
+        matches!(self.kind(), ProcessingErrorKind::Deterministic(_))
+    }
+
+    /// Whether this error requires a clean restart
+    pub fn should_restart(&self) -> bool {
+        matches!(self.kind(), ProcessingErrorKind::PossibleReorg(_))
+    }
+
+    /// Whether this error is retryable with backoff
+    pub fn is_retryable(&self) -> bool {
+        matches!(self.kind(), ProcessingErrorKind::NonDeterministic(_))
+    }
+}
+```
+
+**Key Invariant (must be preserved):**
+
+```
+Deterministic    → Stop processing block, persist PoI only
+NonDeterministic → Retry with backoff
+PossibleReorg    → Restart cleanly (don't persist)
+```
+
+Currently this logic is scattered across:
+
+- `process_block` early return for PossibleReorg (lines 664-677)
+- Dynamic data sources error mapping (lines 792-802)
+- `transact_block_state` (lines 405-430)
+- `handle_offchain_triggers` (lines 1180-1190)
+
+Consolidating into helper methods eliminates these scattered special cases.
+
+### 4. TriggerRunner Component
+
+Extract trigger execution into a dedicated component:
+
+```rust
+/// Handles matching, decoding, and executing triggers
+pub struct TriggerRunner<'a, C: Blockchain, T: RuntimeHostBuilder<C>> {
+    decoder: &'a Decoder<C, T>,
+    processor: &'a dyn TriggerProcessor<C, T>,
+    logger: &'a Logger,
+    metrics: &'a SubgraphMetrics,
+    debug_fork: &'a Option<Arc<dyn SubgraphFork>>,
+    instrument: bool,
+}
+
+impl<'a, C, T> TriggerRunner<'a, C, T>
+where
+    C: Blockchain,
+    T: RuntimeHostBuilder<C>,
+{
+    /// Execute triggers against hosts, accumulating state
+    pub async fn execute(
+        &self,
+        block: &Arc<C::Block>,
+        runnables: Vec<RunnableTriggers<'a, C>>,
+        mut block_state: BlockState,
+        proof_of_indexing: &SharedProofOfIndexing,
+        causality_region: &PoICausalityRegion,
+    ) -> Result<BlockState, MappingError> {
+        for runnable in runnables {
+            block_state = self
+                .processor
+                .process_trigger(
+                    self.logger,
+                    runnable.hosted_triggers,
+                    block,
+                    block_state,
+                    proof_of_indexing,
+                    causality_region,
+                    self.debug_fork,
+                    self.metrics,
+                    self.instrument,
+                )
+                .await
+                .map_err(|e| e.add_trigger_context(&runnable.trigger))?;
+        }
+        Ok(block_state)
+    }
+}
+```
+
+This eliminates the duplicated loops (lines 616-656 and 754-790).
+
+### 5. State Management with Checkpoints
+
+**Explicit Input/Output Types for Pipeline Stages:**
+
+```rust
+/// Input to trigger processing - makes dependencies explicit
+struct TriggerProcessingContext<'a, C: Blockchain> {
+    block: &'a Arc<C::Block>,
+    proof_of_indexing: &'a SharedProofOfIndexing,
+    causality_region: &'a PoICausalityRegion,
+}
+
+/// Output from trigger processing - makes results explicit
+struct TriggerProcessingResult {
+    block_state: BlockState,
+    restart_needed: bool,
+}
+```
+
+**Add checkpoint capability to `BlockState` for rollback scenarios:**
+
+```rust
+impl BlockState {
+    /// Create a lightweight checkpoint for rollback
+    pub fn checkpoint(&self) -> BlockStateCheckpoint {
+        BlockStateCheckpoint {
+            created_data_sources_count: self.created_data_sources.len(),
+            persisted_data_sources_count: self.persisted_data_sources.len(),
+            // Note: entity_cache changes cannot be easily checkpointed.
+            // Rollback clears the cache (acceptable per current behavior).
+        }
+    }
+
+    /// Restore state to checkpoint (partial rollback)
+    pub fn restore(&mut self, checkpoint: BlockStateCheckpoint) {
+        self.created_data_sources
+            .truncate(checkpoint.created_data_sources_count);
+        self.persisted_data_sources
+            .truncate(checkpoint.persisted_data_sources_count);
+        // Entity cache is cleared on rollback (matches current behavior)
+    }
+}
+```
+
+### 6. File Structure
+
+```
+core/src/subgraph/
+├── runner.rs              # Main SubgraphRunner with FSM
+├── runner/
+│   ├── mod.rs
+│   ├── state.rs           # RunnerState enum and transitions
+│   ├── pipeline.rs        # Pipeline stage definitions
+│   └── trigger_runner.rs  # TriggerRunner component
+├── context.rs             # IndexingContext (unchanged)
+├── inputs.rs              # IndexingInputs (unchanged)
+└── state.rs               # IndexingState (unchanged)
+```
+
+## Deferred Concerns
+
+### Fishy Block Refetch (Preserve Behavior)
+
+The TODO at lines 721-729 notes unclear behavior around block refetching in the dynamic DS loop. The restructure preserves this behavior without attempting to fix it. Investigate separately.
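+
+To make the error-handling invariant from section 3 concrete, the runner can classify once and branch in a single place. A minimal sketch, assuming a `ProcessingError` value `err` surfaced by the pipeline (handler bodies are placeholders):
+
+```rust
+// Sketch only: one dispatch point replaces the scattered checks above.
+match err.kind() {
+    ProcessingErrorKind::Deterministic(_) => {
+        // Stop processing this block; persist the PoI only and fail the subgraph.
+    }
+    ProcessingErrorKind::NonDeterministic(_) => {
+        // Transient failure: retry the block with exponential backoff.
+    }
+    ProcessingErrorKind::PossibleReorg(_) => {
+        // Restart the block stream cleanly; persist nothing.
+    }
+}
+```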
+
+## Key Interfaces
+
+### RunnerState Transitions
+
+```
+Initializing ───────────────────────────────────────┐
+     │                                              │
+     v                                              │
+AwaitingBlock ◄─────────────────────────────────────┤
+     │                                              │
+     ├── ProcessBlock event ──► ProcessingBlock     │
+     │                               │              │
+     │                               ├── success ──► AwaitingBlock
+     │                               │              │
+     │                               └── restart ──► Restarting
+     │                                              │
+     ├── Revert event ──────────► Reverting ────────┤
+     │                                              │
+     ├── Error ─────────────────► Restarting ───────┤
+     │                                              │
+     └── Cancel/MaxBlock ───────► Stopped           │
+                                                    │
+Restarting ─────────────────────────────────────────┘
+```
+
+### Pipeline Data Flow
+
+```
+BlockWithTriggers
+       │
+       v
+┌──────────────────┐
+│ TriggerMatchStage│ ─► Vec<RunnableTriggers>
+└──────────────────┘
+       │
+       v
+┌────────────────────┐
+│ TriggerExecuteStage│ ─► BlockState (mutated)
+└────────────────────┘
+       │
+       v  (loop while has_created_data_sources)
+┌─────────────────────────┐
+│ DynamicDataSourceStage  │ ─► BlockState (mutated), new hosts added
+└─────────────────────────┘
+       │
+       v
+┌─────────────────────┐
+│ OffchainTriggerStage│ ─► offchain_mods, processed_ds
+└─────────────────────┘
+       │
+       v
+┌─────────────┐
+│ PersistStage│ ─► Store transaction
+└─────────────┘
+```
+
+## Verification
+
+After implementation, verify:
+
+1. **Unit tests pass**: `just test-unit`
+2. **Runner tests pass**: `just test-runner`
+3. **Lint clean**: `just lint` (zero warnings)
+4. **Build succeeds**: `just check --release`
+
+For behavioral verification, the existing runner tests should catch regressions. No new integration tests required for a refactor that preserves behavior.
diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs
index 7de4222efe2..c076619bbf9 100644
--- a/graph/src/blockchain/block_stream.rs
+++ b/graph/src/blockchain/block_stream.rs
@@ -705,14 +705,6 @@ pub enum BlockStreamError {
     ProtobufDecodingError(#[from] prost::DecodeError),
     #[error("block stream error {0}")]
     Unknown(#[from] anyhow::Error),
-    #[error("block stream fatal error {0}")]
-    Fatal(String),
-}
-
-impl BlockStreamError {
-    pub fn is_deterministic(&self) -> bool {
-        matches!(self, Self::Fatal(_))
-    }
 }
 
 #[derive(Debug)]
diff --git a/graph/src/components/subgraph/instance.rs b/graph/src/components/subgraph/instance.rs
index 6ee720a10c0..bbba7b5b32b 100644
--- a/graph/src/components/subgraph/instance.rs
+++ b/graph/src/components/subgraph/instance.rs
@@ -178,4 +178,48 @@ impl BlockState {
     pub fn persist_data_source(&mut self, ds: StoredDynamicDataSource) {
         self.persisted_data_sources.push(ds)
     }
+
+    /// Create a lightweight checkpoint for rollback.
+    ///
+    /// This captures the current counts of created and persisted data sources,
+    /// allowing a partial rollback via `restore()`. Note that entity cache changes
+    /// cannot be easily checkpointed; rollback clears the cache (acceptable per
+    /// current behavior).
+    pub fn checkpoint(&self) -> BlockStateCheckpoint {
+        assert!(!self.in_handler);
+        BlockStateCheckpoint {
+            created_data_sources_count: self.created_data_sources.len(),
+            persisted_data_sources_count: self.persisted_data_sources.len(),
+            processed_data_sources_count: self.processed_data_sources.len(),
+            deterministic_errors_count: self.deterministic_errors.len(),
+        }
+    }
+
+    /// Restore state to a previously captured checkpoint (partial rollback).
+    ///
+    /// This truncates the data source vectors to their checkpoint sizes.
+    /// Entity cache is NOT restored - caller should handle cache state if needed.
+    pub fn restore(&mut self, checkpoint: BlockStateCheckpoint) {
+        assert!(!self.in_handler);
+        self.created_data_sources
+            .truncate(checkpoint.created_data_sources_count);
+        self.persisted_data_sources
+            .truncate(checkpoint.persisted_data_sources_count);
+        self.processed_data_sources
+            .truncate(checkpoint.processed_data_sources_count);
+        self.deterministic_errors
+            .truncate(checkpoint.deterministic_errors_count);
+    }
+}
+
+/// A lightweight checkpoint for `BlockState` rollback.
+///
+/// Captures counts of mutable vectors in `BlockState` to enable partial rollback.
+/// Used before processing dynamic data sources to allow recovery if needed.
+#[derive(Debug, Clone, Copy)]
+pub struct BlockStateCheckpoint {
+    created_data_sources_count: usize,
+    persisted_data_sources_count: usize,
+    processed_data_sources_count: usize,
+    deterministic_errors_count: usize,
+}
diff --git a/graph/src/components/subgraph/mod.rs b/graph/src/components/subgraph/mod.rs
index 02b6486b953..45ab3b16ac4 100644
--- a/graph/src/components/subgraph/mod.rs
+++ b/graph/src/components/subgraph/mod.rs
@@ -8,7 +8,9 @@ mod settings;
 pub use crate::prelude::Entity;
 
 pub use self::host::{HostMetrics, MappingError, RuntimeHost, RuntimeHostBuilder};
-pub use self::instance::{BlockState, InstanceDSTemplate, InstanceDSTemplateInfo};
+pub use self::instance::{
+    BlockState, BlockStateCheckpoint, InstanceDSTemplate, InstanceDSTemplateInfo,
+};
 pub use self::instance_manager::SubgraphInstanceManager;
 pub use self::proof_of_indexing::{
     PoICausalityRegion, ProofOfIndexing, ProofOfIndexingEvent, ProofOfIndexingFinisher,
diff --git a/graph/src/env/store.rs b/graph/src/env/store.rs
index cc29cb107bf..863e5e4665c 100644
--- a/graph/src/env/store.rs
+++ b/graph/src/env/store.rs
@@ -63,6 +63,10 @@ pub struct EnvVarsStore {
     /// Set by the environment variable `GRAPH_STORE_CONNECTION_TIMEOUT` (expressed
     /// in milliseconds). The default value is 5000ms.
     pub connection_timeout: Duration,
+    /// Set by `GRAPH_STORE_SETUP_TIMEOUT` (in milliseconds). Default: 30000ms.
+    /// Used during database setup (migrations, schema creation) which can
+    /// legitimately take longer than normal operations.
+    pub setup_timeout: Duration,
     /// Set by the environment variable `GRAPH_STORE_CONNECTION_MIN_IDLE`. No
     /// default value is provided.
     pub connection_min_idle: Option<u32>,
@@ -214,6 +218,7 @@ impl TryFrom<InnerStore> for EnvVarsStore {
             ),
             recent_blocks_cache_capacity: x.recent_blocks_cache_capacity,
             connection_timeout: Duration::from_millis(x.connection_timeout_in_millis),
+            setup_timeout: Duration::from_millis(x.setup_timeout_in_millis),
             connection_min_idle: x.connection_min_idle,
             connection_idle_timeout: Duration::from_secs(x.connection_idle_timeout_in_secs),
             write_queue_size: x.write_queue_size,
@@ -299,6 +304,8 @@ pub struct InnerStore {
     // configured differently for each pool.
     #[envconfig(from = "GRAPH_STORE_CONNECTION_TIMEOUT", default = "5000")]
     connection_timeout_in_millis: u64,
+    #[envconfig(from = "GRAPH_STORE_SETUP_TIMEOUT", default = "30000")]
+    setup_timeout_in_millis: u64,
     #[envconfig(from = "GRAPH_STORE_CONNECTION_MIN_IDLE")]
     connection_min_idle: Option<u32>,
     #[envconfig(from = "GRAPH_STORE_CONNECTION_IDLE_TIMEOUT", default = "600")]
diff --git a/graph/src/lib.rs b/graph/src/lib.rs
index 8aaa1766f79..37dfa549468 100644
--- a/graph/src/lib.rs
+++ b/graph/src/lib.rs
@@ -132,8 +132,9 @@ pub mod prelude {
         UnfailOutcome, WindowAttribute, BLOCK_NUMBER_MAX,
     };
     pub use crate::components::subgraph::{
-        BlockState, HostMetrics, InstanceDSTemplateInfo, RuntimeHost, RuntimeHostBuilder,
-        SubgraphInstanceManager, SubgraphRegistrar, SubgraphVersionSwitchingMode,
+        BlockState, BlockStateCheckpoint, HostMetrics, InstanceDSTemplateInfo, RuntimeHost,
+        RuntimeHostBuilder, SubgraphInstanceManager, SubgraphRegistrar,
+        SubgraphVersionSwitchingMode,
     };
     pub use crate::components::trigger_processor::TriggerProcessor;
     pub use crate::components::versions::{ApiVersion, FeatureFlag};
diff --git a/store/postgres/src/pool/coordinator.rs b/store/postgres/src/pool/coordinator.rs
index fb0b05a1ac0..d0a7088ec47 100644
--- a/store/postgres/src/pool/coordinator.rs
+++ b/store/postgres/src/pool/coordinator.rs
@@ -265,7 +265,7 @@ impl PoolCoordinator {
 
         let primary = self.primary()?;
 
-        let mut pconn = primary.get().await?;
+        let mut pconn = primary.get_for_setup().await?;
 
         let states: Vec<_> = states
             .into_iter()
diff --git a/store/postgres/src/pool/mod.rs b/store/postgres/src/pool/mod.rs
index afb0aef4ebf..fc0b0cd7388 100644
--- a/store/postgres/src/pool/mod.rs
+++ b/store/postgres/src/pool/mod.rs
@@ -620,6 +620,18 @@ impl PoolInner {
         self.get_from_pool(&self.pool, None, Duration::ZERO).await
     }
 
+    /// Get a connection using the setup timeout. Use only during database
+    /// initialization where operations can legitimately take longer.
+    async fn get_for_setup(&self) -> Result {
+        let setup_timeouts = Timeouts {
+            wait: Some(ENV_VARS.store.setup_timeout),
+            create: Some(ENV_VARS.store.setup_timeout),
+            recycle: Some(ENV_VARS.store.setup_timeout),
+        };
+        self.get_from_pool(&self.pool, Some(setup_timeouts), Duration::ZERO)
+            .await
+    }
+
     /// Get the pool for fdw connections. It is an error if none is configured
     fn fdw_pool(&self, logger: &Logger) -> Result<&AsyncPool, StoreError> {
         let pool = match &self.fdw_pool {
@@ -701,7 +713,7 @@
     }
 
     async fn locale_check(&self, logger: &Logger) -> Result<(), StoreError> {
-        let mut conn = self.get().await?;
+        let mut conn = self.get_for_setup().await?;
         let _: () = if let Err(msg) = catalog::Locale::load(&mut conn).await?.suitable() {
             if self.shard == *PRIMARY_SHARD && primary::is_empty(&mut conn).await? {
                 const MSG: &str = "Database does not use C locale. \
@@ -751,7 +763,7 @@
 
     async fn configure_fdw(&self, servers: &[ForeignServer]) -> Result<(), StoreError> {
         info!(&self.logger, "Setting up fdw");
-        let mut conn = self.get().await?;
+        let mut conn = self.get_for_setup().await?;
         conn.batch_execute("create extension if not exists postgres_fdw")
             .await?;
         conn.transaction(|conn| {
@@ -790,7 +802,10 @@
         // careful that block_on only gets called on a blocking thread to
         // avoid errors from the tokio runtime
         let logger = self.logger.cheap_clone();
-        let mut conn = self.get().await.map(AsyncConnectionWrapper::from)?;
+        let mut conn = self
+            .get_for_setup()
+            .await
+            .map(AsyncConnectionWrapper::from)?;
 
         tokio::task::spawn_blocking(move || {
             diesel::Connection::transaction::<_, StoreError, _>(&mut conn, |conn| {
@@ -808,7 +823,7 @@
         }
 
         info!(&self.logger, "Dropping cross-shard views");
-        let mut conn = self.get().await?;
+        let mut conn = self.get_for_setup().await?;
         conn.transaction(|conn| {
             async {
                 let query = format!("drop schema if exists {} cascade", CROSS_SHARD_NSP);
@@ -845,7 +860,7 @@
             return Ok(());
         }
 
-        let mut conn = self.get().await?;
+        let mut conn = self.get_for_setup().await?;
         let sharded = Namespace::special(CROSS_SHARD_NSP);
         if catalog::has_namespace(&mut conn, &sharded).await? {
             // We dropped the namespace before, but another node must have
@@ -897,7 +912,7 @@
     pub async fn remap(&self, server: &ForeignServer) -> Result<(), StoreError> {
         if server.shard == *PRIMARY_SHARD {
             info!(&self.logger, "Mapping primary");
-            let mut conn = self.get().await?;
+            let mut conn = self.get_for_setup().await?;
             conn.transaction(|conn| ForeignServer::map_primary(conn, &self.shard).scope_boxed())
                 .await?;
         }
@@ -907,7 +922,7 @@
                 "Mapping metadata from {}",
                 server.shard.as_str()
             );
-            let mut conn = self.get().await?;
+            let mut conn = self.get_for_setup().await?;
             conn.transaction(|conn| server.map_metadata(conn).scope_boxed())
                 .await?;
         }
@@ -919,7 +934,7 @@
             return Ok(false);
         }
 
-        let mut conn = self.get().await?;
+        let mut conn = self.get_for_setup().await?;
         server.needs_remap(&mut conn).await
     }
 }
diff --git a/tests/.gitignore b/tests/.gitignore
index b3458a8f91a..b6f5b85637c 100644
--- a/tests/.gitignore
+++ b/tests/.gitignore
@@ -2,3 +2,4 @@ contracts/cache/
 contracts/out/build-info/
 integration-tests/graph-node.log
 integration-tests/*/subgraph.yaml.patched
+runner-tests.log
diff --git a/tests/Cargo.toml b/tests/Cargo.toml
index 6de710f6561..d6d54de5a55 100644
--- a/tests/Cargo.toml
+++ b/tests/Cargo.toml
@@ -19,6 +19,8 @@ graph-runtime-wasm = { path = "../runtime/wasm" }
 serde = { workspace = true }
 serde_yaml = { workspace = true }
 slog = { workspace = true }
+slog-async = { workspace = true }
+slog-term = { workspace = true }
 tokio = { version = "1.49.0", features = ["rt", "macros", "process"] }
 tokio-util.workspace = true
diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs
index c36c9043830..b8b9c87db3e 100644
--- a/tests/src/fixture/mod.rs
+++ b/tests/src/fixture/mod.rs
@@ -358,7 +358,7 @@ graph::prelude::lazy_static! {
 }
 
 fn test_logger(test_name: &str) -> Logger {
-    graph::log::logger(true).new(o!("test" => test_name.to_string()))
+    crate::output::test_logger(test_name)
 }
 
 #[allow(clippy::await_holding_lock)]
diff --git a/tests/src/helpers.rs b/tests/src/helpers.rs
index 4a59c1df7ef..fb06a583923 100644
--- a/tests/src/helpers.rs
+++ b/tests/src/helpers.rs
@@ -1,8 +1,9 @@
 use std::fs::File;
-use std::io::BufReader;
+use std::io::{BufReader, Write};
 use std::path::PathBuf;
 use std::process::Command;
 
+use crate::output::OutputConfig;
 use anyhow::{bail, Context};
 use graph::itertools::Itertools;
 use graph::prelude::serde_json::{json, Value};
@@ -80,14 +81,20 @@ pub fn run_cmd(command: &mut Command) -> String {
         .output()
         .context(format!("failed to run {}", program))
         .unwrap();
-    println!(
+
+    let mut out = OutputConfig::get();
+    writeln!(
+        out,
         "stdout:\n{}",
         pretty_output(&output.stdout, &format!("[{}:stdout] ", program))
-    );
-    println!(
+    )
+    .unwrap();
+    writeln!(
+        out,
         "stderr:\n{}",
         pretty_output(&output.stderr, &format!("[{}:stderr] ", program))
-    );
+    )
+    .unwrap();
 
     String::from_utf8(output.stdout).unwrap()
 }
diff --git a/tests/src/lib.rs b/tests/src/lib.rs
index 2b67fc4dc44..33c6246c8b4 100644
--- a/tests/src/lib.rs
+++ b/tests/src/lib.rs
@@ -4,7 +4,9 @@
 pub mod fixture;
 pub mod helpers;
 #[macro_use]
 pub mod macros;
+pub mod output;
 pub mod recipe;
 pub mod subgraph;
 
 pub use config::{Config, DbConfig, EthConfig, CONFIG};
+pub use output::OutputConfig;
diff --git a/tests/src/output.rs b/tests/src/output.rs
new file mode 100644
index 00000000000..a1cb960ded2
--- /dev/null
+++ b/tests/src/output.rs
@@ -0,0 +1,142 @@
+//! Output configuration for runner tests.
+//!
+//! When running locally, verbose output (slog logs, command stdout/stderr) is redirected
+//! to `tests/runner-tests.log` while progress messages appear on the console.
+//! In CI (detected via `GITHUB_ACTIONS` env var), all output goes to the console.
+
+use slog::{o, Drain, Logger};
+use std::fs::File;
+use std::io::{self, Write};
+use std::path::PathBuf;
+use std::sync::{Mutex, OnceLock};
+
+/// Log file name relative to the `tests` crate root.
+const LOG_FILE_NAME: &str = "runner-tests.log";
+
+/// Global output configuration, initialized once.
+static OUTPUT_CONFIG: OnceLock<OutputConfig> = OnceLock::new();
+
+/// Output configuration for runner tests.
+pub struct OutputConfig {
+    /// Log file handle (None in CI mode).
+    log_file: Option<Mutex<File>>,
+    /// Absolute path to log file (None in CI mode).
+    log_file_path: Option<PathBuf>,
+    /// Whether running in CI.
+    is_ci: bool,
+}
+
+impl OutputConfig {
+    /// Initialize the global output configuration.
+    ///
+    /// In CI (when `GITHUB_ACTIONS` is set), output goes to stdout.
+    /// Locally, verbose output is redirected to the log file.
+    ///
+    /// Prints the log file path at startup (local only).
+    pub fn init() -> &'static Self {
+        OUTPUT_CONFIG.get_or_init(|| {
+            let is_ci = std::env::var("GITHUB_ACTIONS").is_ok();
+
+            if is_ci {
+                OutputConfig {
+                    log_file: None,
+                    log_file_path: None,
+                    is_ci,
+                }
+            } else {
+                let cwd = std::env::current_dir()
+                    .expect("Failed to get current directory")
+                    .canonicalize()
+                    .expect("Failed to canonicalize current directory");
+                let log_file_path = cwd.join(LOG_FILE_NAME);
+
+                let file = File::create(&log_file_path).unwrap_or_else(|e| {
+                    panic!(
+                        "Failed to create log file {}: {}",
+                        log_file_path.display(),
+                        e
+                    )
+                });
+
+                let config = OutputConfig {
+                    log_file: Some(Mutex::new(file)),
+                    log_file_path: Some(log_file_path),
+                    is_ci,
+                };
+
+                // Print log file path at startup
+                config.print_log_file_info();
+
+                config
+            }
+        })
+    }
+
+    /// Get the global output configuration, initializing if needed.
+    pub fn get() -> &'static Self {
+        Self::init()
+    }
+
+    /// Print the log file path to console.
+    pub fn print_log_file_info(&self) {
+        if let Some(ref path) = self.log_file_path {
+            println!("Runner test logs: {}", path.display());
+        }
+    }
+
+    /// Returns true if running in CI.
+    pub fn is_ci(&self) -> bool {
+        self.is_ci
+    }
+}
+
+impl Write for &OutputConfig {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        if let Some(ref file_mutex) = self.log_file {
+            file_mutex.lock().unwrap().write(buf)
+        } else {
+            io::stdout().write(buf)
+        }
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        if let Some(ref file_mutex) = self.log_file {
+            file_mutex.lock().unwrap().flush()
+        } else {
+            io::stdout().flush()
+        }
+    }
+}
+
+/// Create a slog logger for a test, respecting the output configuration.
+///
+/// In CI, logs go to stdout. Locally, logs go to the log file.
+pub fn test_logger(test_name: &str) -> Logger {
+    let output = OutputConfig::get();
+
+    if output.is_ci {
+        // CI: use default logger that writes to stdout
+        graph::log::logger(true).new(o!("test" => test_name.to_string()))
+    } else {
+        // Local: write to log file
+        let decorator = slog_term::PlainDecorator::new(LogFileWriter);
+        let drain = slog_term::FullFormat::new(decorator).build().fuse();
+        let drain = slog_async::Async::new(drain).build().fuse();
+        Logger::root(drain, o!("test" => test_name.to_string()))
+    }
+}
+
+/// A writer that forwards to the OutputConfig log file.
+struct LogFileWriter;
+
+impl io::Write for LogFileWriter {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        let mut output = OutputConfig::get();
+        output.write(buf)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        let mut output = OutputConfig::get();
+        output.flush()
+    }
+}
diff --git a/tests/tests/runner_tests.rs b/tests/tests/runner_tests.rs
index 0b846b7c49a..d4d2330ca9f 100644
--- a/tests/tests/runner_tests.rs
+++ b/tests/tests/runner_tests.rs
@@ -447,7 +447,7 @@ async fn end_block() -> anyhow::Result<()> {
     ) {
         let runner = ctx.runner(block_ptr.clone()).await;
         let runner = runner.run_for_test(false).await.unwrap();
-        let filter = runner.context().filter.as_ref().unwrap();
+        let filter = runner.build_filter_for_test();
         let addresses = filter
             .chain_filter
             .log()