From 3056290782e6e71a66e18323cb8d1fa2808a87cf Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Sat, 24 Jan 2026 15:37:36 -0500 Subject: [PATCH 1/3] perf(index-node): make proofOfIndexing resolver async Remove synchronous `block_on` call in `resolve_proof_of_indexing` which was blocking tokio worker threads while waiting for database queries. Before this change, each POI query blocked an entire tokio worker thread. After this change, POI queries properly yield to the async runtime while waiting for database I/O, allowing the connection pool to be fully utilized. Co-Authored-By: Claude Opus 4.5 --- server/index-node/src/resolver.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index b8385866d33..3a64552dfe7 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -352,7 +352,10 @@ where )) } - fn resolve_proof_of_indexing(&self, field: &a::Field) -> Result { + async fn resolve_proof_of_indexing( + &self, + field: &a::Field, + ) -> Result { let deployment_id = field .get_required::("subgraph") .expect("Valid subgraphId required"); @@ -381,7 +384,7 @@ where let poi_fut = self .store .get_proof_of_indexing(&deployment_id, &indexer, block.clone()); - let poi = match graph::futures03::executor::block_on(poi_fut) { + let poi = match poi_fut.await { Ok(Some(poi)) => r::Value::String(format!("0x{}", hex::encode(poi))), Ok(None) => r::Value::Null, Err(e) => { @@ -791,7 +794,7 @@ where field.name.as_str(), scalar_type.name.as_str(), ) { - ("Query", "proofOfIndexing", "Bytes") => self.resolve_proof_of_indexing(field), + ("Query", "proofOfIndexing", "Bytes") => self.resolve_proof_of_indexing(field).await, ("Query", "blockData", "JSONObject") => self.resolve_block_data(field).await, ("Query", "blockHashFromNumber", "Bytes") => { self.resolve_block_hash_from_number(field).await From 888a12e91f24f74d67cd3b1e069abaa0acb0763e Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Sat, 24 Jan 2026 15:38:11 -0500 Subject: [PATCH 2/3] perf(index-node): parallelize publicProofsOfIndexing requests Change the publicProofsOfIndexing resolver to process all POI requests in parallel using `future::join_all` instead of sequentially in a for loop. Co-Authored-By: Claude Opus 4.5 --- server/index-node/src/resolver.rs | 53 ++++++++++++++++--------------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index 3a64552dfe7..542bdf75b0e 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -15,7 +15,7 @@ use graph::components::versions::VERSIONS; use graph::data::graphql::{object, IntoValue, ObjectOrInterface, ValueMap}; use graph::data::subgraph::{status, DeploymentFeatures}; use graph::data::value::Object; -use graph::futures03::TryFutureExt; +use graph::futures03::{future, TryFutureExt}; use graph::prelude::*; use graph_graphql::prelude::{a, ExecutionContext, Resolver}; @@ -417,28 +417,29 @@ where return Err(QueryExecutionError::TooExpensive); } - let mut public_poi_results = vec![]; - for request in requests { - let (poi_result, request) = match self - .store - .get_public_proof_of_indexing(&request.deployment, request.block_number, self) - .await - { - Ok(Some(poi)) => (Some(poi), request), - Ok(None) => (None, request), - Err(e) => { - error!( - self.logger, - "Failed to query public proof of indexing"; - "subgraph" => &request.deployment, - "block" => format!("{}", request.block_number), - "error" => format!("{:?}", e) - ); - (None, request) - } - }; + // Process all POI requests in parallel for better throughput + let poi_futures: Vec<_> = requests + .into_iter() + .map(|request| async move { + let poi_result = match self + .store + .get_public_proof_of_indexing(&request.deployment, request.block_number, self) + .await + { + Ok(Some(poi)) => Some(poi), + Ok(None) => None, + Err(e) => { + error!( + self.logger, + "Failed to query public proof of indexing"; + "subgraph" => &request.deployment, + "block" => format!("{}", request.block_number), + "error" => format!("{:?}", e) + ); + None + } + }; - public_poi_results.push( PublicProofOfIndexingResult { deployment: request.deployment, block: match poi_result { @@ -447,9 +448,11 @@ where }, proof_of_indexing: poi_result.map(|(_, poi)| poi), } - .into_value(), - ) - } + .into_value() + }) + .collect(); + + let public_poi_results = future::join_all(poi_futures).await; Ok(r::Value::List(public_poi_results)) } From 4f89b3f54d8e1601de822aacae8f2a1137d48e86 Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Wed, 28 Jan 2026 22:00:15 -0400 Subject: [PATCH 3/3] perf(index-node): batch block hash lookups for POI queries Pre-fetch all block hashes in a single batch query before parallel POI processing, reducing database round-trips from 10+ to 1-2 per batch. - Add block_hashes_by_block_numbers batch method to ChainStore trait - Add get_public_proof_of_indexing_with_block_hash to StatusStore trait - Modify resolver to group requests by network and batch-fetch hashes - Pass pre-fetched hashes to avoid redundant lookups during parallel POI Co-Authored-By: Claude Opus 4.5 --- graph/src/blockchain/mock.rs | 6 ++ graph/src/components/store/traits.rs | 19 +++++ server/index-node/src/resolver.rs | 115 +++++++++++++++++++++------ store/postgres/src/chain_store.rs | 62 +++++++++++++++ store/postgres/src/store.rs | 22 +++++ store/postgres/src/subgraph_store.rs | 108 +++++++++++++++++++++++++ 6 files changed, 307 insertions(+), 25 deletions(-) diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index 31980a996f2..ccb65f72ac9 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -550,6 +550,12 @@ impl ChainStore for MockChainStore { ) -> Result, Error> { unimplemented!() } + async fn block_hashes_by_block_numbers( + &self, + _numbers: &[BlockNumber], + ) -> Result>, Error> { + unimplemented!() + } async fn confirm_block_hash( &self, _number: BlockNumber, diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 658baa8be3e..4ad2d4f2e54 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -602,6 +602,12 @@ pub trait ChainStore: ChainHeadStore { number: BlockNumber, ) -> Result, Error>; + /// Return the hashes of all blocks with the given numbers (batch version) + async fn block_hashes_by_block_numbers( + &self, + numbers: &[BlockNumber], + ) -> Result>, Error>; + /// Confirm that block number `number` has hash `hash` and that the store /// may purge any other blocks with that number async fn confirm_block_hash( @@ -790,6 +796,19 @@ pub trait StatusStore: Send + Sync + 'static { block_number: BlockNumber, fetch_block_ptr: &dyn BlockPtrForNumber, ) -> Result, StoreError>; + + /// Like `get_public_proof_of_indexing` but accepts optional pre-fetched block hashes + /// to avoid redundant database lookups when processing batches of POI requests. + async fn get_public_proof_of_indexing_with_block_hash( + &self, + subgraph_id: &DeploymentHash, + block_number: BlockNumber, + prefetched_hashes: Option<&Vec>, + fetch_block_ptr: &dyn BlockPtrForNumber, + ) -> Result, StoreError>; + + /// Get the network for a deployment + async fn network_for_deployment(&self, id: &DeploymentHash) -> Result; } #[async_trait] diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index 542bdf75b0e..7b3a16deaf9 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -1,4 +1,4 @@ -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use async_trait::async_trait; use graph::data::query::Trace; @@ -417,38 +417,103 @@ where return Err(QueryExecutionError::TooExpensive); } - // Process all POI requests in parallel for better throughput - let poi_futures: Vec<_> = requests - .into_iter() - .map(|request| async move { - let poi_result = match self - .store - .get_public_proof_of_indexing(&request.deployment, request.block_number, self) + // Step 1: Group requests by network and collect block numbers for batch lookup + let mut requests_by_network: HashMap> = HashMap::new(); + let mut request_networks: Vec> = Vec::with_capacity(requests.len()); + + for (idx, request) in requests.iter().enumerate() { + match self.store.network_for_deployment(&request.deployment).await { + Ok(network) => { + requests_by_network + .entry(network.clone()) + .or_default() + .push((idx, request.block_number)); + request_networks.push(Some(network)); + } + Err(_) => { + request_networks.push(None); + } + } + } + + // Step 2: Pre-fetch all block hashes per network in batch + let mut block_hash_cache: HashMap< + (String, BlockNumber), + Vec, + > = HashMap::new(); + + for (network, network_requests) in &requests_by_network { + let block_numbers: Vec = + network_requests.iter().map(|(_, num)| *num).collect(); + + if let Some(chain_store) = self.store.block_store().chain_store(network).await { + match chain_store + .block_hashes_by_block_numbers(&block_numbers) .await { - Ok(Some(poi)) => Some(poi), - Ok(None) => None, + Ok(hashes) => { + for (num, hash_vec) in hashes { + block_hash_cache.insert((network.clone(), num), hash_vec); + } + } Err(e) => { - error!( + debug!( self.logger, - "Failed to query public proof of indexing"; - "subgraph" => &request.deployment, - "block" => format!("{}", request.block_number), + "Failed to batch fetch block hashes for network"; + "network" => network, "error" => format!("{:?}", e) ); - None + // Continue without pre-fetched hashes - will fall back to individual lookups + } + } + } + } + + // Step 3: Process all POI requests in parallel, using cached block hashes + let poi_futures: Vec<_> = requests + .into_iter() + .zip(request_networks.into_iter()) + .map(|(request, network_opt)| { + let cache = &block_hash_cache; + async move { + let prefetched_hashes = network_opt + .as_ref() + .and_then(|network| cache.get(&(network.clone(), request.block_number))); + + let poi_result = match self + .store + .get_public_proof_of_indexing_with_block_hash( + &request.deployment, + request.block_number, + prefetched_hashes, + self, + ) + .await + { + Ok(Some(poi)) => Some(poi), + Ok(None) => None, + Err(e) => { + error!( + self.logger, + "Failed to query public proof of indexing"; + "subgraph" => &request.deployment, + "block" => format!("{}", request.block_number), + "error" => format!("{:?}", e) + ); + None + } + }; + + PublicProofOfIndexingResult { + deployment: request.deployment, + block: match poi_result { + Some((ref block, _)) => block.clone(), + None => PartialBlockPtr::from(request.block_number), + }, + proof_of_indexing: poi_result.map(|(_, poi)| poi), } - }; - - PublicProofOfIndexingResult { - deployment: request.deployment, - block: match poi_result { - Some((ref block, _)) => block.clone(), - None => PartialBlockPtr::from(request.block_number), - }, - proof_of_indexing: poi_result.map(|(_, poi)| poi), + .into_value() } - .into_value() }) .collect(); diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index bdf63f52c31..3d455b7d5bf 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -739,6 +739,58 @@ mod data { } } + /// Return the hashes of all blocks with the given block numbers (batch version) + pub(super) async fn block_hashes_by_block_numbers( + &self, + conn: &mut AsyncPgConnection, + chain: &str, + numbers: &[BlockNumber], + ) -> Result>, Error> { + if numbers.is_empty() { + return Ok(HashMap::new()); + } + + match self { + Storage::Shared => { + use public::ethereum_blocks as b; + + let results = b::table + .select((b::number, b::hash)) + .filter(b::network_name.eq(chain)) + .filter(b::number.eq_any(Vec::from_iter(numbers.iter().map(|&n| n as i64)))) + .load::<(i64, String)>(conn) + .await?; + + let mut map: HashMap> = HashMap::new(); + for (num, hash) in results { + let block_hash = hash.parse()?; + map.entry(num as BlockNumber).or_default().push(block_hash); + } + Ok(map) + } + Storage::Private(Schema { blocks, .. }) => { + let results = blocks + .table() + .select((blocks.number(), blocks.hash())) + .filter( + blocks + .number() + .eq_any(Vec::from_iter(numbers.iter().map(|&n| n as i64))), + ) + .load::<(i64, Vec)>(conn) + .await?; + + let mut map: HashMap> = HashMap::new(); + for (num, hash) in results { + map.entry(num as BlockNumber) + .or_default() + .push(BlockHash::from(hash)); + } + Ok(map) + } + } + } + pub(super) async fn confirm_block_hash( &self, conn: &mut AsyncPgConnection, @@ -2971,6 +3023,16 @@ impl ChainStoreTrait for ChainStore { .await } + async fn block_hashes_by_block_numbers( + &self, + numbers: &[BlockNumber], + ) -> Result>, Error> { + let mut conn = self.pool.get_permitted().await?; + self.storage + .block_hashes_by_block_numbers(&mut conn, &self.chain, numbers) + .await + } + async fn confirm_block_hash( &self, number: BlockNumber, diff --git a/store/postgres/src/store.rs b/store/postgres/src/store.rs index 4adec80ab5b..10996d0572f 100644 --- a/store/postgres/src/store.rs +++ b/store/postgres/src/store.rs @@ -171,6 +171,28 @@ impl StatusStore for Store { .await } + async fn get_public_proof_of_indexing_with_block_hash( + &self, + subgraph_id: &DeploymentHash, + block_number: BlockNumber, + prefetched_hashes: Option<&Vec>, + fetch_block_ptr: &dyn BlockPtrForNumber, + ) -> Result, StoreError> { + self.subgraph_store + .get_public_proof_of_indexing_with_block_hash( + subgraph_id, + block_number, + prefetched_hashes, + self.block_store().clone(), + fetch_block_ptr, + ) + .await + } + + async fn network_for_deployment(&self, id: &DeploymentHash) -> Result { + self.subgraph_store.network_for_deployment(id).await + } + async fn query_permit(&self) -> QueryPermit { // Status queries go to the primary shard. self.block_store.query_permit_primary().await diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 478d21eba02..a6eacd12145 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -259,6 +259,33 @@ impl SubgraphStore { .await } + pub(crate) async fn get_public_proof_of_indexing_with_block_hash( + &self, + id: &DeploymentHash, + block_number: BlockNumber, + prefetched_hashes: Option<&Vec>, + block_store: impl BlockStore, + fetch_block_ptr: &dyn BlockPtrForNumber, + ) -> Result, StoreError> { + self.inner + .get_public_proof_of_indexing_with_block_hash( + id, + block_number, + prefetched_hashes, + block_store, + fetch_block_ptr, + ) + .await + } + + /// Get the network for a deployment + pub(crate) async fn network_for_deployment( + &self, + id: &DeploymentHash, + ) -> Result { + self.inner.network_for_deployment(id).await + } + pub fn notification_sender(&self) -> Arc { self.sender.clone() } @@ -622,6 +649,15 @@ impl Inner { Ok(site) } + /// Get the network for a deployment + pub(crate) async fn network_for_deployment( + &self, + id: &DeploymentHash, + ) -> Result { + let site = self.site(id).await?; + Ok(site.network.clone()) + } + /// Return the store and site for the active deployment of this /// deployment hash async fn store( @@ -1171,6 +1207,78 @@ impl Inner { })) } + /// Like `get_public_proof_of_indexing` but accepts optional pre-fetched block hashes + /// to avoid redundant database lookups when processing batches of POI requests. + pub(crate) async fn get_public_proof_of_indexing_with_block_hash( + &self, + id: &DeploymentHash, + block_number: BlockNumber, + prefetched_hashes: Option<&Vec>, + block_store: impl BlockStore, + fetch_block_ptr: &dyn BlockPtrForNumber, + ) -> Result, StoreError> { + let (store, site) = self.store(id).await?; + + let block_hash = match prefetched_hashes { + Some(hashes) if hashes.len() == 1 => { + // Use the pre-fetched hash directly + hashes[0].clone() + } + Some(hashes) if hashes.is_empty() => { + // No blocks found for this number, try RPC fallback + match fetch_block_ptr + .block_ptr_for_number(site.network.clone(), block_number) + .await + .ok() + .flatten() + { + None => return Ok(None), + Some(block_ptr) => block_ptr.hash, + } + } + _ => { + // Multiple hashes or no pre-fetched data - fall back to standard lookup + let chain_store = match block_store.chain_store(&site.network).await { + Some(chain_store) => chain_store, + None => return Ok(None), + }; + let mut hashes = chain_store + .block_hashes_by_block_number(block_number) + .await?; + + if hashes.len() == 1 { + hashes.pop().unwrap() + } else { + match fetch_block_ptr + .block_ptr_for_number(site.network.clone(), block_number) + .await + .ok() + .flatten() + { + None => return Ok(None), + Some(block_ptr) => block_ptr.hash, + } + } + } + }; + + let block_for_poi_query = BlockPtr::new(block_hash.clone(), block_number); + let indexer = Some(Address::ZERO); + let poi = store + .get_proof_of_indexing(site, &indexer, block_for_poi_query) + .await?; + + Ok(poi.map(|poi| { + ( + PartialBlockPtr { + number: block_number, + hash: Some(block_hash), + }, + poi, + ) + })) + } + // Only used by tests #[cfg(debug_assertions)] pub async fn find(