From 520121261896f1639448c4ae3dc96a2402331eff Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 16 Jan 2026 10:54:27 -0800 Subject: [PATCH 01/13] graph, store: Replace std::sync::RwLock with parking_lot::RwLock for pool metrics Use parking_lot::RwLock instead of std::sync::RwLock for connection pool metric recording. parking_lot::RwLock is faster for short-held locks as it uses efficient spinning before parking, reducing tokio worker thread blocking during metric recording. This change helps reduce tokio threadpool contention when the connection pool is under heavy load, as the metric recording locks are held for only microseconds. Co-Authored-By: Claude Opus 4.5 --- graph/src/components/store/mod.rs | 4 +++- graph/src/data/graphql/load_manager.rs | 2 +- store/postgres/src/pool/manager.rs | 4 ++-- store/postgres/src/pool/mod.rs | 6 +++--- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index 77675967c25..c9f78cece5d 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -25,9 +25,11 @@ use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::fmt; use std::fmt::Display; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::Duration; +use crate::parking_lot::RwLock; + use async_trait::async_trait; use crate::blockchain::{Block, BlockHash, BlockPtr}; diff --git a/graph/src/data/graphql/load_manager.rs b/graph/src/data/graphql/load_manager.rs index b8bdb4a63d0..96fa23d7fa7 100644 --- a/graph/src/data/graphql/load_manager.rs +++ b/graph/src/data/graphql/load_manager.rs @@ -457,7 +457,7 @@ impl LoadManager { } fn overloaded(&self, wait_stats: &PoolWaitStats) -> (bool, Duration) { - let store_avg = wait_stats.read().unwrap().average(); + let store_avg = wait_stats.read().average(); let overloaded = store_avg .map(|average| average > ENV_VARS.load_threshold) .unwrap_or(false); diff --git a/store/postgres/src/pool/manager.rs b/store/postgres/src/pool/manager.rs index 4677ea6276b..2f5fe0fa00a 100644 --- a/store/postgres/src/pool/manager.rs +++ b/store/postgres/src/pool/manager.rs @@ -22,9 +22,10 @@ use std::collections::HashMap; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; -use std::sync::RwLock; use std::time::{Duration, Instant}; +use graph::parking_lot::RwLock; + use crate::pool::AsyncPool; /// Our own connection manager. 
It is pretty much the same as @@ -308,7 +309,6 @@ impl WaitMeter { pub(crate) fn add_conn_wait_time(&self, duration: Duration) { self.wait_stats .write() - .unwrap() .add_and_register(duration, &self.wait_gauge); } } diff --git a/store/postgres/src/pool/mod.rs b/store/postgres/src/pool/mod.rs index 20d332616a2..cf176db64c2 100644 --- a/store/postgres/src/pool/mod.rs +++ b/store/postgres/src/pool/mod.rs @@ -19,11 +19,13 @@ use graph::slog::warn; use graph::util::timed_rw_lock::TimedMutex; use tokio::sync::OwnedSemaphorePermit; +use std::collections::HashMap; use std::fmt::{self}; use std::ops::{Deref, DerefMut}; use std::sync::Arc; use std::time::Duration; -use std::{collections::HashMap, sync::RwLock}; + +use graph::parking_lot::RwLock; use crate::catalog; use crate::pool::manager::{ConnectionManager, WaitMeter}; @@ -720,7 +722,6 @@ impl PoolInner { let permit = self.query_semaphore.cheap_clone().acquire_owned().await; self.semaphore_wait_stats .write() - .unwrap() .add_and_register(start.elapsed(), &self.semaphore_wait_gauge); permit.unwrap() } @@ -734,7 +735,6 @@ impl PoolInner { let elapsed = start.elapsed(); self.indexing_semaphore_wait_stats .write() - .unwrap() .add_and_register(elapsed, &self.indexing_semaphore_wait_gauge); (permit.unwrap(), elapsed) } From 121b41eed9df5d6c033e3171ee89ff50903bb704 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 16 Jan 2026 12:10:49 -0800 Subject: [PATCH 02/13] graph: Replace std::sync::RwLock with parking_lot::RwLock in LoadManager These locks are accessed on every GraphQL query, so using the faster parking_lot::RwLock reduces lock contention in the query path. Co-Authored-By: Claude Opus 4.5 --- graph/src/data/graphql/load_manager.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/graph/src/data/graphql/load_manager.rs b/graph/src/data/graphql/load_manager.rs index 96fa23d7fa7..0abba9521fc 100644 --- a/graph/src/data/graphql/load_manager.rs +++ b/graph/src/data/graphql/load_manager.rs @@ -4,9 +4,11 @@ use prometheus::core::GenericCounter; use rand::{prelude::Rng, rng}; use std::collections::{HashMap, HashSet}; use std::iter::FromIterator; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::{Duration, Instant}; +use crate::parking_lot::RwLock; + use crate::components::metrics::{Counter, GaugeVec, MetricsRegistry}; use crate::components::store::{DeploymentId, PoolWaitStats}; use crate::data::graphql::shape_hash::shape_hash; @@ -57,7 +59,7 @@ impl ShardEffort { } pub fn add(&self, shard: &str, qref: QueryRef, duration: Duration, gauge: &GaugeVec) { - let mut inner = self.inner.write().unwrap(); + let mut inner = self.inner.write(); inner.add(qref, duration); gauge .with_label_values(&[shard]) @@ -70,7 +72,7 @@ impl ShardEffort { /// data for the particular query, return `None` as the effort /// for the query pub fn current_effort(&self, qref: &QueryRef) -> (Option, Duration) { - let inner = self.inner.read().unwrap(); + let inner = self.inner.read(); let total_effort = inner.total.duration(); let query_effort = inner.effort.get(qref).map(|stats| stats.duration()); (query_effort, total_effort) @@ -381,7 +383,7 @@ impl LoadManager { let qref = QueryRef::new(deployment, shape_hash); - if self.jailed_queries.read().unwrap().contains(&qref) { + if self.jailed_queries.read().contains(&qref) { return if ENV_VARS.load_simulate { Proceed } else { @@ -426,7 +428,7 @@ impl LoadManager { "query_effort_ms" => query_effort, "total_effort_ms" => total_effort, "ratio" => format!("{:.4}", 
query_effort/total_effort)); - self.jailed_queries.write().unwrap().insert(qref); + self.jailed_queries.write().insert(qref); return if ENV_VARS.load_simulate { Proceed } else { @@ -465,7 +467,7 @@ impl LoadManager { } fn kill_state(&self, shard: &str) -> (f64, Instant) { - let state = self.kill_state.get(shard).unwrap().read().unwrap(); + let state = self.kill_state.get(shard).unwrap().read(); (state.kill_rate, state.last_update) } @@ -505,7 +507,7 @@ impl LoadManager { kill_rate = (kill_rate - KILL_RATE_STEP_DOWN).max(0.0); } let event = { - let mut state = self.kill_state.get(shard).unwrap().write().unwrap(); + let mut state = self.kill_state.get(shard).unwrap().write(); state.kill_rate = kill_rate; state.last_update = now; state.log_event(now, kill_rate, overloaded) From 4bd926c5e83c5f1cbd419cb5e2d009bd4097a108 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 16 Jan 2026 12:19:13 -0800 Subject: [PATCH 03/13] store: Replace std::sync::RwLock with parking_lot::RwLock in store_events Replace std::sync::RwLock with parking_lot::RwLock in the SubscriptionManager to reduce lock contention. parking_lot's RwLock is faster for short-held locks due to efficient spinning before parking, which helps reduce tokio threadpool contention. Co-Authored-By: Claude Opus 4.5 --- store/postgres/src/store_events.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/store/postgres/src/store_events.rs b/store/postgres/src/store_events.rs index 6189120f602..572c3a339a3 100644 --- a/store/postgres/src/store_events.rs +++ b/store/postgres/src/store_events.rs @@ -2,7 +2,9 @@ use graph::futures01::Stream; use graph::futures03::compat::Stream01CompatExt; use graph::futures03::stream::StreamExt; use graph::futures03::TryStreamExt; -use std::sync::{atomic::Ordering, Arc, RwLock}; +use std::sync::{atomic::Ordering, Arc}; + +use graph::parking_lot::RwLock; use std::{collections::HashMap, sync::atomic::AtomicUsize}; use tokio::sync::mpsc::{channel, Sender}; use tokio_stream::wrappers::ReceiverStream; @@ -127,7 +129,7 @@ impl SubscriptionManager { // Send to `subscriptions`. { - let senders = subscriptions.read().unwrap().clone(); + let senders = subscriptions.read().clone(); // Write change to all matching subscription streams; remove subscriptions // whose receiving end has been dropped @@ -138,7 +140,7 @@ impl SubscriptionManager { "Failed to send store event to subscriber {}: {}", id, e ); // Receiver was dropped - subscriptions.write().unwrap().remove(&id); + subscriptions.write().remove(&id); } } } @@ -187,7 +189,7 @@ impl SubscriptionManager { // Cleanup `subscriptions`. { - let mut subscriptions = subscriptions.write().unwrap(); + let mut subscriptions = subscriptions.write(); // Obtain IDs of subscriptions whose receiving end has gone let stale_ids = subscriptions @@ -218,7 +220,7 @@ impl SubscriptionManagerTrait for SubscriptionManager { let (sender, receiver) = channel(100); // Add the new subscription - self.subscriptions.write().unwrap().insert(id, sender); + self.subscriptions.write().insert(id, sender); // Return the subscription ID and entity change stream ReceiverStream::new(receiver) From ed3fd40b94d1af0fedd4318fafcd3d1c1817fde3 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 16 Jan 2026 12:22:25 -0800 Subject: [PATCH 04/13] store: Replace std::sync::RwLock with parking_lot::RwLock in writable Replace std::sync::RwLock with parking_lot::RwLock in the background writer's Request::Write batch handling. 
This reduces lock contention as parking_lot's RwLock is faster for short-held locks due to efficient spinning before parking. Co-Authored-By: Claude Opus 4.5 --- store/postgres/src/writable.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index ff5ffb2d45b..928cbdbe76f 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -1,7 +1,9 @@ use std::collections::BTreeSet; use std::ops::{Deref, Range}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Mutex, RwLock, TryLockError as RwLockError}; +use std::sync::Mutex; + +use graph::parking_lot::RwLock; use std::time::Instant; use std::{collections::BTreeMap, sync::Arc}; @@ -574,7 +576,7 @@ impl BlockTracker { // processed. let res = queue.find_map(|req| match req.as_ref() { Request::Write { batch, .. } => { - let batch = batch.read().unwrap(); + let batch = batch.read(); tracker.write(&batch.block_ptr); if batch.first_block <= tracker.revert { let res = f(batch.deref(), tracker.revert); @@ -613,7 +615,7 @@ impl BlockTracker { let accum = queue.fold(init, |accum, req| { match req.as_ref() { Request::Write { batch, .. } => { - let batch = batch.read().unwrap(); + let batch = batch.read(); let mut accum = accum; tracker.write(&batch.block_ptr); if batch.first_block <= tracker.revert { @@ -740,7 +742,7 @@ impl std::fmt::Debug for Request { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Write { batch, store, .. } => { - let batch = batch.read().unwrap(); + let batch = batch.read(); write!( f, "write[{}, {:p}, {} entities]", @@ -811,7 +813,7 @@ impl Request { } => { let start = Instant::now(); - let batch = batch.write().unwrap().close(); + let batch = batch.write().close(); if let Some(err) = &batch.error { // This can happen when appending to the batch failed @@ -850,7 +852,7 @@ impl Request { fn should_process(&self) -> bool { match self { Request::Write { queued, batch, .. } => { - batch.read().unwrap().weight() >= ENV_VARS.store.write_batch_size + batch.read().weight() >= ENV_VARS.store.write_batch_size || queued.elapsed() >= ENV_VARS.store.write_batch_duration } Request::RevertTo { .. } | Request::Stop => true, @@ -1169,7 +1171,7 @@ impl Queue { // duration of the write, and we do not want to // slow down queueing requests unnecessarily match existing.try_write() { - Ok(mut existing) => { + Some(mut existing) => { if existing.weight() < ENV_VARS.store.write_batch_size { let res = existing.append(batch).map(|()| None); if existing.weight() >= ENV_VARS.store.write_batch_size { @@ -1180,16 +1182,13 @@ impl Queue { Ok(Some(batch)) } } - Err(RwLockError::WouldBlock) => { + None => { // This branch can cause batches that // are not 'full' at the head of the // queue, something that start_writer // has to take into account Ok(Some(batch)) } - Err(RwLockError::Poisoned(e)) => { - panic!("rwlock on batch was poisoned {:?}", e); - } } } else { Ok(Some(batch)) From c935301bf83908e2d020fb58e3d1aee44b2f310f Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 16 Jan 2026 12:24:18 -0800 Subject: [PATCH 05/13] graph: Replace std::sync::RwLock with parking_lot::RwLock in TimedCache Replace std::sync::RwLock with parking_lot::RwLock in TimedCache for faster lock acquisition on cache gets and sets. parking_lot's RwLock uses efficient spinning before parking, reducing contention. 
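In miniature, the pattern these lock swaps rely on looks like the sketch below (a minimal sketch assuming a parking_lot 0.12 dependency; TtlCache and its fields are illustrative, not the actual TimedCache type): read() and write() return guards directly, so there is no Result to unwrap and no poisoning to handle, and the guard drops at the end of the statement, keeping the critical section to a few instructions.

    use std::collections::HashMap;
    use std::time::{Duration, Instant};

    use parking_lot::RwLock;

    struct TtlCache {
        ttl: Duration,
        entries: RwLock<HashMap<String, (String, Instant)>>,
    }

    impl TtlCache {
        fn get(&self, key: &str) -> Option<String> {
            // The temporary read guard lives only for this match; no unwrap().
            match self.entries.read().get(key) {
                Some((value, expires)) if *expires >= Instant::now() => Some(value.clone()),
                _ => None,
            }
        }

        fn set(&self, key: String, value: String) {
            let expires = Instant::now() + self.ttl;
            self.entries.write().insert(key, (value, expires));
        }
    }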
Co-Authored-By: Claude Opus 4.5 --- graph/src/util/timed_cache.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/graph/src/util/timed_cache.rs b/graph/src/util/timed_cache.rs index 8f64c844630..587a49a176a 100644 --- a/graph/src/util/timed_cache.rs +++ b/graph/src/util/timed_cache.rs @@ -3,10 +3,12 @@ use std::{ cmp::Eq, collections::HashMap, hash::Hash, - sync::{Arc, RwLock}, + sync::Arc, time::{Duration, Instant}, }; +use crate::parking_lot::RwLock; + /// Caching of values for a specified amount of time #[derive(Debug)] struct CacheEntry { @@ -49,7 +51,7 @@ impl TimedCache { K: Borrow + Eq + Hash, Q: Hash + Eq + ?Sized, { - match self.entries.read().unwrap().get(key) { + match self.entries.read().get(key) { Some(CacheEntry { value, expires }) if expires >= &now => Some(value.clone()), _ => None, } @@ -72,11 +74,11 @@ impl TimedCache { value, expires: now + self.ttl, }; - self.entries.write().unwrap().insert(key, entry); + self.entries.write().insert(key, entry); } pub fn clear(&self) { - self.entries.write().unwrap().clear(); + self.entries.write().clear(); } pub fn find(&self, pred: F) -> Option> @@ -85,7 +87,6 @@ impl TimedCache { { self.entries .read() - .unwrap() .values() .find(move |entry| pred(entry.value.as_ref())) .map(|entry| entry.value.clone()) @@ -101,7 +102,6 @@ impl TimedCache { { self.entries .write() - .unwrap() .remove(key) .map(|CacheEntry { value, expires }| (value, expires >= Instant::now())) } From 4ada602442d069c524eb59c6d1e1fe88fe12234e Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 16 Jan 2026 12:28:01 -0800 Subject: [PATCH 06/13] store: Replace std::sync::RwLock with parking_lot::RwLock in BlockStore Replace std::sync::RwLock with parking_lot::RwLock for the chain stores map in BlockStore. This reduces lock contention when looking up or modifying chain stores. 
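The chain-stores map follows a get-or-create shape worth spelling out: take a cheap read lock for the fast path, drop it, build the store outside any lock, then take the write lock only for the insert. A minimal sketch of that shape (names are illustrative; the real store() also falls back to a database lookup on a miss):

    use std::collections::HashMap;
    use std::sync::Arc;

    use parking_lot::RwLock;

    struct Stores<S> {
        stores: RwLock<HashMap<String, Arc<S>>>,
    }

    impl<S> Stores<S> {
        fn get_or_create(&self, chain: &str, create: impl FnOnce() -> S) -> Arc<S> {
            // Fast path: read lock only; cloning the Arc lets the guard drop here.
            if let Some(store) = self.stores.read().get(chain).cloned() {
                return store;
            }
            // Build outside any lock, then take the write lock just for the
            // insert; or_insert keeps the winner if another thread raced us.
            let store = Arc::new(create());
            self.stores
                .write()
                .entry(chain.to_string())
                .or_insert(store)
                .clone()
        }
    }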
Co-Authored-By: Claude Opus 4.5 --- store/postgres/src/block_store.rs | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/store/postgres/src/block_store.rs b/store/postgres/src/block_store.rs index 674c274ac5c..48ea768c033 100644 --- a/store/postgres/src/block_store.rs +++ b/store/postgres/src/block_store.rs @@ -1,8 +1,6 @@ -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, - time::Duration, -}; +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use graph::parking_lot::RwLock; use anyhow::anyhow; use async_trait::async_trait; @@ -321,7 +319,6 @@ impl BlockStore { let configured_chains = block_store .stores .read() - .unwrap() .keys() .cloned() .collect::>(); @@ -410,7 +407,6 @@ impl BlockStore { let store = Arc::new(store); self.stores .write() - .unwrap() .insert(chain.name.clone(), store.clone()); Ok(store) } @@ -475,12 +471,7 @@ impl BlockStore { } async fn store(&self, chain: &str) -> Option> { - let store = self - .stores - .read() - .unwrap() - .get(chain) - .map(CheapClone::cheap_clone); + let store = self.stores.read().get(chain).map(CheapClone::cheap_clone); if store.is_some() { return store; } @@ -506,7 +497,7 @@ impl BlockStore { chain_store.drop_chain().await?; - self.stores.write().unwrap().remove(chain); + self.stores.write().remove(chain); Ok(()) } @@ -516,7 +507,6 @@ impl BlockStore { fn stores(&self) -> Vec> { self.stores .read() - .unwrap() .values() .map(CheapClone::cheap_clone) .collect() From ed76c2c936f6009a17b20ee0b4931a1576b4233b Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 16 Jan 2026 12:30:30 -0800 Subject: [PATCH 07/13] graph: Replace std::sync::RwLock with parking_lot::RwLock in MetricsRegistry Replace std::sync::RwLock with parking_lot::RwLock for the global metrics caches in MetricsRegistry. This reduces lock contention when registering or looking up global metrics. 
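A side effect all of these diffs share: every .unwrap() disappears because parking_lot locks cannot be poisoned; a guard held across a panic is simply released during unwinding. A self-contained illustration (not from the patch):

    use std::sync::Arc;
    use std::thread;

    use parking_lot::RwLock;

    fn main() {
        let lock = Arc::new(RwLock::new(0u32));
        let held = Arc::clone(&lock);
        // The write guard is dropped while the thread unwinds, so the
        // lock is released rather than poisoned.
        let _ = thread::spawn(move || {
            let _guard = held.write();
            panic!("panicking while holding the lock");
        })
        .join();
        // With std::sync::RwLock this read() would return Err(Poisoned);
        // with parking_lot it just succeeds.
        assert_eq!(*lock.read(), 0);
    }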
Co-Authored-By: Claude Opus 4.5 --- graph/src/components/metrics/registry.rs | 31 +++++++++--------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/graph/src/components/metrics/registry.rs b/graph/src/components/metrics/registry.rs index cb210040952..4777ea6f62f 100644 --- a/graph/src/components/metrics/registry.rs +++ b/graph/src/components/metrics/registry.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; + +use crate::parking_lot::RwLock; use prometheus::{labels, Histogram, IntCounterVec}; use prometheus::{IntCounter, IntGauge}; @@ -109,14 +111,13 @@ impl MetricsRegistry { }; let counters = CounterVec::new(opts, variable_labels)?; let id = counters.desc().first().unwrap().id; - let maybe_counter = self.global_counter_vecs.read().unwrap().get(&id).cloned(); + let maybe_counter = self.global_counter_vecs.read().get(&id).cloned(); if let Some(counters) = maybe_counter { Ok(counters) } else { self.register(name, Box::new(counters.clone())); self.global_counter_vecs .write() - .unwrap() .insert(id, counters.clone()); Ok(counters) } @@ -161,15 +162,12 @@ impl MetricsRegistry { ) -> Result { let counter = counter_with_labels(name, help, const_labels)?; let id = counter.desc().first().unwrap().id; - let maybe_counter = self.global_counters.read().unwrap().get(&id).cloned(); + let maybe_counter = self.global_counters.read().get(&id).cloned(); if let Some(counter) = maybe_counter { Ok(counter) } else { self.register(name, Box::new(counter.clone())); - self.global_counters - .write() - .unwrap() - .insert(id, counter.clone()); + self.global_counters.write().insert(id, counter.clone()); Ok(counter) } } @@ -210,15 +208,12 @@ impl MetricsRegistry { ) -> Result { let gauge = gauge_with_labels(name, help, const_labels)?; let id = gauge.desc().first().unwrap().id; - let maybe_gauge = self.global_gauges.read().unwrap().get(&id).cloned(); + let maybe_gauge = self.global_gauges.read().get(&id).cloned(); if let Some(gauge) = maybe_gauge { Ok(gauge) } else { self.register(name, Box::new(gauge.clone())); - self.global_gauges - .write() - .unwrap() - .insert(id, gauge.clone()); + self.global_gauges.write().insert(id, gauge.clone()); Ok(gauge) } } @@ -232,15 +227,12 @@ impl MetricsRegistry { let opts = Opts::new(name, help); let gauges = GaugeVec::new(opts, variable_labels)?; let id = gauges.desc().first().unwrap().id; - let maybe_gauge = self.global_gauge_vecs.read().unwrap().get(&id).cloned(); + let maybe_gauge = self.global_gauge_vecs.read().get(&id).cloned(); if let Some(gauges) = maybe_gauge { Ok(gauges) } else { self.register(name, Box::new(gauges.clone())); - self.global_gauge_vecs - .write() - .unwrap() - .insert(id, gauges.clone()); + self.global_gauge_vecs.write().insert(id, gauges.clone()); Ok(gauges) } } @@ -254,14 +246,13 @@ impl MetricsRegistry { let opts = HistogramOpts::new(name, help); let histograms = HistogramVec::new(opts, variable_labels)?; let id = histograms.desc().first().unwrap().id; - let maybe_histogram = self.global_histogram_vecs.read().unwrap().get(&id).cloned(); + let maybe_histogram = self.global_histogram_vecs.read().get(&id).cloned(); if let Some(histograms) = maybe_histogram { Ok(histograms) } else { self.register(name, Box::new(histograms.clone())); self.global_histogram_vecs .write() - .unwrap() .insert(id, histograms.clone()); Ok(histograms) } From d92d94f67b6d9d2937af016747a8d8923728ec8a Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 16 Jan 2026 12:32:53 -0800 Subject: [PATCH 08/13] core: 
Replace std::sync::RwLock with parking_lot::RwLock in SubgraphKeepAlive Replace std::sync::RwLock with parking_lot::RwLock for the alive_map in SubgraphKeepAlive. This reduces lock contention when tracking running subgraph deployments. Co-Authored-By: Claude Opus 4.5 --- core/src/subgraph/context/mod.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/subgraph/context/mod.rs b/core/src/subgraph/context/mod.rs index e6f485e2552..2b7d560dfc1 100644 --- a/core/src/subgraph/context/mod.rs +++ b/core/src/subgraph/context/mod.rs @@ -23,7 +23,9 @@ use graph::{ slog::Logger, }; use std::collections::HashMap; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; + +use graph::parking_lot::RwLock; use tokio::sync::mpsc; use self::instance::SubgraphInstance; @@ -44,18 +46,18 @@ impl SubgraphKeepAlive { } pub fn remove(&self, deployment_id: &DeploymentId) { - self.alive_map.write().unwrap().remove(deployment_id); + self.alive_map.write().remove(deployment_id); self.sg_metrics.running_count.dec(); } pub fn insert(&self, deployment_id: DeploymentId, guard: CancelGuard) { - let old = self.alive_map.write().unwrap().insert(deployment_id, guard); + let old = self.alive_map.write().insert(deployment_id, guard); if old.is_none() { self.sg_metrics.running_count.inc(); } } pub fn contains(&self, deployment_id: &DeploymentId) -> bool { - self.alive_map.read().unwrap().contains_key(deployment_id) + self.alive_map.read().contains_key(deployment_id) } } From 4d8402106d36ae966c63b75703a8db659134f20c Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 16 Jan 2026 14:17:52 -0800 Subject: [PATCH 09/13] store: Add adaptive TTL cache for chain_head_ptr() With many subgraphs, chain_head_ptr() was querying the database on every call, leading to connection pool saturation. This adds an adaptive cache that learns optimal TTL from observed block frequency. The cache uses EWMA to estimate block time and sets TTL to 1/4 of that estimate (bounded by 20ms-2000ms). During warmup (first 5 blocks), it uses the minimum TTL to avoid missing blocks on unknown chains. New metrics: - chain_head_ptr_cache_hits: cache hit counter - chain_head_ptr_cache_misses: cache miss counter (DB queries) - chain_head_ptr_cache_block_time_ms: estimated block time per chain Safety escape hatch: set GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE=true to revert to the previous uncached behavior. Co-Authored-By: Claude Opus 4.5 --- graph/src/env/store.rs | 6 + store/postgres/src/chain_store.rs | 191 +++++++++++++++++++++++++++++- 2 files changed, 194 insertions(+), 3 deletions(-) diff --git a/graph/src/env/store.rs b/graph/src/env/store.rs index 7a3c16d5b81..23294ad0a77 100644 --- a/graph/src/env/store.rs +++ b/graph/src/env/store.rs @@ -163,6 +163,9 @@ pub struct EnvVarsStore { /// Disables storing or reading `eth_call` results from the store call cache. /// Set by `GRAPH_STORE_DISABLE_CALL_CACHE`. Defaults to false. pub disable_call_cache: bool, + /// Set by `GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE`. Default is false. + /// Set to true to disable chain_head_ptr caching (safety escape hatch). 
+ pub disable_chain_head_ptr_cache: bool, } // This does not print any values avoid accidentally leaking any sensitive env vars @@ -224,6 +227,7 @@ impl TryFrom for EnvVarsStore { account_like_min_versions_count: x.account_like_min_versions_count, account_like_max_unique_ratio: x.account_like_max_unique_ratio.map(|r| r.0), disable_call_cache: x.disable_call_cache, + disable_chain_head_ptr_cache: x.disable_chain_head_ptr_cache, }; if let Some(timeout) = vars.batch_timeout { if timeout < 2 * vars.batch_target_duration { @@ -331,6 +335,8 @@ pub struct InnerStore { account_like_max_unique_ratio: Option, #[envconfig(from = "GRAPH_STORE_DISABLE_CALL_CACHE", default = "false")] disable_call_cache: bool, + #[envconfig(from = "GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE", default = "false")] + disable_chain_head_ptr_cache: bool, } #[derive(Clone, Copy, Debug)] diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index cc3b6949fa8..556c6b80d2d 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -17,6 +17,8 @@ use graph::util::herd_cache::HerdCache; use std::collections::BTreeMap; use std::future::Future; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{Duration, Instant}; use std::{ collections::HashMap, convert::{TryFrom, TryInto}, @@ -1873,6 +1875,10 @@ pub struct ChainStoreMetrics { chain_head_cache_latest_block_num: Box, chain_head_cache_hits: Box, chain_head_cache_misses: Box, + // Metrics for chain_head_ptr() cache + chain_head_ptr_cache_hits: Box, + chain_head_ptr_cache_misses: Box, + chain_head_ptr_cache_block_time_ms: Box, } impl ChainStoreMetrics { @@ -1914,12 +1920,37 @@ impl ChainStoreMetrics { ) .expect("Can't register the counter"); + let chain_head_ptr_cache_hits = registry + .new_counter_vec( + "chain_head_ptr_cache_hits", + "Number of times the chain_head_ptr cache was hit", + vec!["network".to_string()], + ) + .expect("Can't register the counter"); + let chain_head_ptr_cache_misses = registry + .new_counter_vec( + "chain_head_ptr_cache_misses", + "Number of times the chain_head_ptr cache was missed", + vec!["network".to_string()], + ) + .expect("Can't register the counter"); + let chain_head_ptr_cache_block_time_ms = registry + .new_gauge_vec( + "chain_head_ptr_cache_block_time_ms", + "Estimated block time in milliseconds used for adaptive cache TTL", + vec!["network".to_string()], + ) + .expect("Can't register the gauge"); + Self { chain_head_cache_size, chain_head_cache_oldest_block_num, chain_head_cache_latest_block_num, chain_head_cache_hits, chain_head_cache_misses, + chain_head_ptr_cache_hits, + chain_head_ptr_cache_misses, + chain_head_ptr_cache_block_time_ms, } } @@ -1959,6 +1990,143 @@ impl ChainStoreMetrics { .unwrap() .inc_by(misses as f64); } + + pub fn record_chain_head_ptr_cache_hit(&self, network: &str) { + self.chain_head_ptr_cache_hits + .with_label_values(&[network]) + .inc(); + } + + pub fn record_chain_head_ptr_cache_miss(&self, network: &str) { + self.chain_head_ptr_cache_misses + .with_label_values(&[network]) + .inc(); + } + + pub fn set_chain_head_ptr_block_time(&self, network: &str, block_time_ms: u64) { + self.chain_head_ptr_cache_block_time_ms + .with_label_values(&[network]) + .set(block_time_ms as f64); + } +} + +const MIN_TTL_MS: u64 = 20; +const MAX_TTL_MS: u64 = 2000; +const MIN_OBSERVATIONS: u64 = 5; + +/// Adaptive cache for chain_head_ptr() that learns optimal TTL from block frequency. 
+struct ChainHeadPtrCache { + /// Cached value and when it expires + entry: RwLock>, + /// Estimated milliseconds between blocks (EWMA) + estimated_block_time_ms: AtomicU64, + /// When we last observed the chain head change + last_change: RwLock, + /// Number of block changes observed (for warmup) + observations: AtomicU64, + /// Metrics for recording cache hits/misses + metrics: Arc, + /// Chain name for metric labels + chain: String, +} + +impl ChainHeadPtrCache { + fn new(metrics: Arc, chain: String) -> Self { + Self { + entry: RwLock::new(None), + estimated_block_time_ms: AtomicU64::new(0), + last_change: RwLock::new(Instant::now()), + observations: AtomicU64::new(0), + metrics, + chain, + } + } + + /// Returns cached value if still valid, or None if cache is disabled/missed. + /// Records hit/miss metrics automatically. + fn get(&self) -> Option { + if ENV_VARS.store.disable_chain_head_ptr_cache { + return None; + } + let guard = self.entry.read(); + if let Some((value, expires)) = guard.as_ref() { + if Instant::now() < *expires { + self.metrics.record_chain_head_ptr_cache_hit(&self.chain); + return Some(value.clone()); + } + } + self.metrics.record_chain_head_ptr_cache_miss(&self.chain); + None + } + + /// Compute current TTL - MIN_TTL during warmup, then 1/4 of estimated block time + fn current_ttl(&self) -> Duration { + let obs = AtomicU64::load(&self.observations, Ordering::Relaxed); + if obs < MIN_OBSERVATIONS { + return Duration::from_millis(MIN_TTL_MS); + } + + let block_time = AtomicU64::load(&self.estimated_block_time_ms, Ordering::Relaxed); + let ttl_ms = (block_time / 4).clamp(MIN_TTL_MS, MAX_TTL_MS); + Duration::from_millis(ttl_ms) + } + + /// Cache a new value, updating block time estimate if value changed. + /// Does nothing if cache is disabled. 
+ fn set(&self, new_value: BlockPtr) { + if ENV_VARS.store.disable_chain_head_ptr_cache { + return; + } + let now = Instant::now(); + + // Check if block changed + let old_value = { + let guard = self.entry.read(); + guard.as_ref().map(|(v, _)| v.clone()) + }; + + // Only update estimate if we have a previous value and block number advanced + // (skip reorgs where new block number <= old) + if let Some(old_ptr) = old_value.as_ref() { + if new_value.number > old_ptr.number { + let mut last_change = self.last_change.write(); + let delta_ms = now.duration_since(*last_change).as_millis() as u64; + *last_change = now; + + let blocks_advanced = (new_value.number - old_ptr.number) as u64; + + // Increment observation count + let obs = AtomicU64::fetch_add(&self.observations, 1, Ordering::Relaxed); + + // Ignore unreasonable deltas (> 60s) + if delta_ms > 0 && delta_ms < 60_000 { + let per_block_ms = delta_ms / blocks_advanced; + let new_estimate = if obs == 0 { + // First observation - use as initial estimate + per_block_ms + } else { + // EWMA: new = 0.8 * old + 0.2 * observed + let old_estimate = + AtomicU64::load(&self.estimated_block_time_ms, Ordering::Relaxed); + (old_estimate * 4 + per_block_ms) / 5 + }; + AtomicU64::store( + &self.estimated_block_time_ms, + new_estimate, + Ordering::Relaxed, + ); + + // Update metric gauge + self.metrics + .set_chain_head_ptr_block_time(&self.chain, new_estimate); + } + } + } + + // Compute TTL and store with expiry + let ttl = self.current_ttl(); + *self.entry.write() = Some((new_value, now + ttl)); + } } pub struct ChainStore { @@ -1980,6 +2148,8 @@ pub struct ChainStore { blocks_by_number_cache: HerdCache>, StoreError>>>, ancestor_cache: HerdCache, StoreError>>>, + /// Adaptive cache for chain_head_ptr() + chain_head_ptr_cache: ChainHeadPtrCache, } impl ChainStore { @@ -1994,10 +2164,11 @@ impl ChainStore { metrics: Arc, ) -> Self { let recent_blocks_cache = - RecentBlocksCache::new(recent_blocks_cache_capacity, chain.clone(), metrics); + RecentBlocksCache::new(recent_blocks_cache_capacity, chain.clone(), metrics.clone()); let blocks_by_hash_cache = HerdCache::new(format!("chain_{}_blocks_by_hash", chain)); let blocks_by_number_cache = HerdCache::new(format!("chain_{}_blocks_by_number", chain)); let ancestor_cache = HerdCache::new(format!("chain_{}_ancestor", chain)); + let chain_head_ptr_cache = ChainHeadPtrCache::new(metrics, chain.clone()); ChainStore { logger, pool, @@ -2009,6 +2180,7 @@ impl ChainStore { blocks_by_hash_cache, blocks_by_number_cache, ancestor_cache, + chain_head_ptr_cache, } } @@ -2351,8 +2523,14 @@ impl ChainHeadStore for ChainStore { async fn chain_head_ptr(self: Arc) -> Result, Error> { use public::ethereum_networks::dsl::*; + // Check cache first (handles disabled check and metrics internally) + if let Some(cached) = self.chain_head_ptr_cache.get() { + return Ok(Some(cached)); + } + + // Query database let mut conn = self.pool.get_permitted().await?; - Ok(ethereum_networks + let result = ethereum_networks .select((head_block_hash, head_block_number)) .filter(name.eq(&self.chain)) .load::<(Option, Option)>(&mut conn) @@ -2375,7 +2553,14 @@ impl ChainHeadStore for ChainStore { _ => unreachable!(), }) .and_then(|opt: Option| opt) - })?) 
+ })?; + + // Cache the result (set() handles disabled check internally) + if let Some(ref ptr) = result { + self.chain_head_ptr_cache.set(ptr.clone()); + } + + Ok(result) } async fn chain_head_cursor(&self) -> Result, Error> { From 898b62d2bbed8c118798d30c2fb6d2c4ecd87d6f Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Sat, 17 Jan 2026 15:47:45 -0800 Subject: [PATCH 10/13] graph, store: Add lock-free AtomicMovingStats for pool wait tracking Replace RwLock with a lock-free AtomicMovingStats that uses an atomic ring buffer with packed bins. Each bin packs epoch (32 bits), count (32 bits), and duration_nanos (64 bits) into a single AtomicU128 for lock-free CAS updates. This eliminates lock contention when many threads write concurrently (every semaphore wait, connection checkout, query execution) while reducing memory usage by 2x (4.8KB vs 9.6KB per stats instance). Co-Authored-By: Claude Opus 4.5 --- Cargo.lock | 1 + graph/Cargo.toml | 1 + graph/src/components/store/mod.rs | 8 +- graph/src/data/graphql/load_manager.rs | 2 +- graph/src/lib.rs | 2 +- graph/src/util/stats.rs | 265 +++++++++++++++++++++++++ store/postgres/src/pool/manager.rs | 10 +- store/postgres/src/pool/mod.rs | 14 +- 8 files changed, 280 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 65a4bfdbeb0..f1f46a4d264 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2870,6 +2870,7 @@ dependencies = [ "object_store", "parking_lot", "petgraph 0.8.3", + "portable-atomic", "priority-queue", "prometheus", "prost", diff --git a/graph/Cargo.toml b/graph/Cargo.toml index 33cfbd40eb0..7fa31765100 100644 --- a/graph/Cargo.toml +++ b/graph/Cargo.toml @@ -79,6 +79,7 @@ futures03 = { version = "0.3.31", package = "futures", features = ["compat"] } wasmparser = "0.118.1" thiserror = { workspace = true } parking_lot = "0.12.5" +portable-atomic = { version = "1.11", features = ["fallback"] } itertools = "0.14.0" defer = "0.2" diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index c9f78cece5d..a9b65e01f9a 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -28,8 +28,6 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; -use crate::parking_lot::RwLock; - use async_trait::async_trait; use crate::blockchain::{Block, BlockHash, BlockPtr}; @@ -44,7 +42,7 @@ use crate::env::ENV_VARS; use crate::internal_error; use crate::prelude::{s, Attribute, DeploymentHash, ValueType}; use crate::schema::{ast as sast, EntityKey, EntityType, InputSchema}; -use crate::util::stats::MovingStats; +use crate::util::stats::AtomicMovingStats; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct EntityFilterDerivative(bool); @@ -744,8 +742,8 @@ impl Display for DeploymentLocator { } // The type that the connection pool uses to track wait times for -// connection checkouts -pub type PoolWaitStats = Arc>; +// connection checkouts. Uses lock-free atomic operations internally. +pub type PoolWaitStats = Arc; /// Determines which columns should be selected in a table. 
#[derive(Clone, Debug, PartialEq, Eq, Hash)] diff --git a/graph/src/data/graphql/load_manager.rs b/graph/src/data/graphql/load_manager.rs index 0abba9521fc..e1053360e18 100644 --- a/graph/src/data/graphql/load_manager.rs +++ b/graph/src/data/graphql/load_manager.rs @@ -459,7 +459,7 @@ impl LoadManager { } fn overloaded(&self, wait_stats: &PoolWaitStats) -> (bool, Duration) { - let store_avg = wait_stats.read().average(); + let store_avg = wait_stats.average(); let overloaded = store_avg .map(|average| average > ENV_VARS.load_threshold) .unwrap_or(false); diff --git a/graph/src/lib.rs b/graph/src/lib.rs index 0607cab5937..e076d64c736 100644 --- a/graph/src/lib.rs +++ b/graph/src/lib.rs @@ -170,7 +170,7 @@ pub mod prelude { pub use crate::log::split::split_logger; pub use crate::util::cache_weight::CacheWeight; pub use crate::util::futures::{retry, TimeoutError}; - pub use crate::util::stats::MovingStats; + pub use crate::util::stats::{AtomicMovingStats, MovingStats}; macro_rules! static_graphql { ($m:ident, $m2:ident, {$($n:ident,)*}) => { diff --git a/graph/src/util/stats.rs b/graph/src/util/stats.rs index ac608b56dcb..ccfae84574b 100644 --- a/graph/src/util/stats.rs +++ b/graph/src/util/stats.rs @@ -1,6 +1,8 @@ use std::collections::VecDeque; +use std::sync::atomic::Ordering; use std::time::{Duration, Instant}; +use portable_atomic::AtomicU128; use prometheus::Gauge; use crate::prelude::ENV_VARS; @@ -166,6 +168,196 @@ impl MovingStats { } } +/// Packed bin for atomic operations: epoch (32 bits) | count (32 bits) | duration_nanos (64 bits) +/// Fits in a single AtomicU128 for lock-free CAS updates. +#[repr(transparent)] +struct PackedBin(AtomicU128); + +impl PackedBin { + fn new() -> Self { + Self(AtomicU128::new(0)) + } + + /// Pack epoch, count, and duration into a single u128 value. + fn pack(epoch: u32, count: u32, duration_nanos: u64) -> u128 { + ((epoch as u128) << 96) | ((count as u128) << 64) | (duration_nanos as u128) + } + + /// Unpack a u128 value into (epoch, count, duration_nanos). + fn unpack(packed: u128) -> (u32, u32, u64) { + let epoch = (packed >> 96) as u32; + let count = (packed >> 64) as u32; + let duration_nanos = packed as u64; + (epoch, count, duration_nanos) + } +} + +/// Lock-free moving statistics using an epoch-based ring buffer. +/// +/// This is a thread-safe, lock-free alternative to `MovingStats` that uses +/// atomic operations instead of locks. It tracks durations over a sliding +/// time window, storing values in fixed-size bins. +/// +/// Writers use CAS loops to atomically update bins, while readers can +/// scan all bins without blocking writers. +pub struct AtomicMovingStats { + start_time: Instant, + bin_size: Duration, + bins: Box<[PackedBin]>, +} + +impl Default for AtomicMovingStats { + fn default() -> Self { + Self::new(ENV_VARS.load_window_size, ENV_VARS.load_bin_size) + } +} + +impl AtomicMovingStats { + /// Create a new AtomicMovingStats with the given window and bin sizes. 
+ /// + /// # Panics + /// + /// Panics if `window_size` or `bin_size` is `0`, or if `bin_size` >= `window_size` + pub fn new(window_size: Duration, bin_size: Duration) -> Self { + assert!(window_size.as_millis() > 0); + assert!(bin_size.as_millis() > 0); + assert!(window_size > bin_size); + + let num_bins = window_size.as_millis() as usize / bin_size.as_millis() as usize; + let bins: Vec = (0..num_bins).map(|_| PackedBin::new()).collect(); + + Self { + start_time: Instant::now(), + bin_size, + bins: bins.into_boxed_slice(), + } + } + + /// Calculate the epoch number for a given instant. + fn epoch_at(&self, now: Instant) -> u32 { + let elapsed = now.saturating_duration_since(self.start_time); + (elapsed.as_millis() / self.bin_size.as_millis()) as u32 + } + + /// Add a duration measurement at the current time. + pub fn add(&self, duration: Duration) { + self.add_at(Instant::now(), duration); + } + + /// Add a duration measurement at a specific time. + /// + /// Note: It is expected that subsequent calls to `add_at` happen with + /// monotonically increasing `now` values for optimal accuracy. + pub fn add_at(&self, now: Instant, duration: Duration) { + let current_epoch = self.epoch_at(now); + let bin_idx = current_epoch as usize % self.bins.len(); + let bin = &self.bins[bin_idx]; + let duration_nanos = duration.as_nanos() as u64; + + loop { + let current = bin.0.load(Ordering::Acquire); + let (bin_epoch, count, total_nanos) = PackedBin::unpack(current); + + let new_packed = if bin_epoch == current_epoch { + // Same epoch: increment existing values + PackedBin::pack( + current_epoch, + count.saturating_add(1), + total_nanos.saturating_add(duration_nanos), + ) + } else { + // Stale epoch: reset bin with new measurement + PackedBin::pack(current_epoch, 1, duration_nanos) + }; + + match bin.0.compare_exchange_weak( + current, + new_packed, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(_) => continue, // CAS failed, retry + } + } + } + + /// Calculate the average duration over the current window. + /// + /// Returns `None` if no measurements have been recorded in the window. + pub fn average(&self) -> Option { + self.average_at(Instant::now()) + } + + /// Calculate the average duration at a specific time. + fn average_at(&self, now: Instant) -> Option { + let current_epoch = self.epoch_at(now); + let num_bins = self.bins.len() as u32; + let mut total_count: u64 = 0; + let mut total_nanos: u128 = 0; + + for bin in self.bins.iter() { + let (bin_epoch, count, duration_nanos) = + PackedBin::unpack(bin.0.load(Ordering::Acquire)); + // Valid if within window (handles epoch wraparound) + if current_epoch.wrapping_sub(bin_epoch) < num_bins { + total_count += count as u64; + total_nanos += duration_nanos as u128; + } + } + + if total_count > 0 { + Some(Duration::from_nanos( + (total_nanos / total_count as u128) as u64, + )) + } else { + None + } + } + + /// Return `true` if the average of measurements within the window + /// is above `duration`. + pub fn average_gt(&self, duration: Duration) -> bool { + self.average().map(|avg| avg > duration).unwrap_or(false) + } + + /// Return `true` if the average at a specific time is above `duration`. + #[cfg(test)] + fn average_gt_at(&self, now: Instant, duration: Duration) -> bool { + self.average_at(now) + .map(|avg| avg > duration) + .unwrap_or(false) + } + + /// Return the total duration recorded in the current window. 
+ pub fn duration(&self) -> Duration { + self.duration_at(Instant::now()) + } + + /// Return the total duration at a specific time. + fn duration_at(&self, now: Instant) -> Duration { + let current_epoch = self.epoch_at(now); + let num_bins = self.bins.len() as u32; + let mut total_nanos: u128 = 0; + + for bin in self.bins.iter() { + let (bin_epoch, _, duration_nanos) = PackedBin::unpack(bin.0.load(Ordering::Acquire)); + if current_epoch.wrapping_sub(bin_epoch) < num_bins { + total_nanos += duration_nanos as u128; + } + } + + Duration::from_nanos(total_nanos as u64) + } + + /// Adds `duration` to the stats, and register the average ms to `avg_gauge`. + pub fn add_and_register(&self, duration: Duration, avg_gauge: &Gauge) { + self.add(duration); + let wait_avg = self.average().map(|avg| avg.as_millis()).unwrap_or(0); + avg_gauge.set(wait_avg as f64); + } +} + #[cfg(test)] mod tests { use super::*; @@ -219,4 +411,77 @@ mod tests { assert_eq!(20, stats.total.count); assert_eq!(Duration::from_secs(5 * 86 + 16 * 10), stats.total.duration); } + + #[test] + fn atomic_add_one_const() { + let stats = AtomicMovingStats::new(Duration::from_secs(5), Duration::from_secs(1)); + let start = stats.start_time; + for i in 0..10 { + stats.add_at(start + Duration::from_secs(i), Duration::from_secs(1)); + } + // After 10 seconds with 5-second window, only last 5 entries are valid + assert_eq!(5, stats.bins.len()); + // Query at time 10 seconds (end of data range) + let query_time = start + Duration::from_secs(10); + // Average should be 1 second + let avg = stats.average_at(query_time).unwrap(); + assert_eq!(Duration::from_secs(1), avg); + assert!(stats.average_gt_at(query_time, Duration::from_millis(900))); + assert!(!stats.average_gt_at(query_time, Duration::from_secs(1))); + } + + #[test] + fn atomic_add_four_linear() { + let stats = AtomicMovingStats::new(Duration::from_secs(5), Duration::from_secs(1)); + let start = stats.start_time; + for i in 0..40u64 { + stats.add_at( + start + Duration::from_millis(250 * i), + Duration::from_secs(i), + ); + } + assert_eq!(5, stats.bins.len()); + // Query at time 9.999 seconds (just before epoch 10 to include epoch 5) + // At epoch 9, valid bins are epochs 5-9 (9 - bin_epoch < 5) + let query_time = start + Duration::from_millis(9999); + // Total duration in window: 4 entries per bin, bins 5-9 contain entries 20-39 + // Bin 5: entries 20,21,22,23 -> sum = 86 + // Bin 6: entries 24,25,26,27 -> sum = 102 + // ... 
+ // Total count = 20, total duration = 5*86 + 16*10 = 590 + assert_eq!( + Duration::from_secs(5 * 86 + 16 * 10), + stats.duration_at(query_time) + ); + } + + #[test] + fn atomic_empty_average() { + let stats = AtomicMovingStats::new(Duration::from_secs(5), Duration::from_secs(1)); + // Query at start_time (no data yet) + assert!(stats.average_at(stats.start_time).is_none()); + assert!(!stats.average_gt_at(stats.start_time, Duration::from_secs(1))); + } + + #[test] + fn atomic_pack_unpack() { + // Test edge cases of packing/unpacking + let packed = PackedBin::pack(u32::MAX, u32::MAX, u64::MAX); + let (epoch, count, nanos) = PackedBin::unpack(packed); + assert_eq!(u32::MAX, epoch); + assert_eq!(u32::MAX, count); + assert_eq!(u64::MAX, nanos); + + let packed = PackedBin::pack(0, 0, 0); + let (epoch, count, nanos) = PackedBin::unpack(packed); + assert_eq!(0, epoch); + assert_eq!(0, count); + assert_eq!(0, nanos); + + let packed = PackedBin::pack(12345, 67890, 123456789012345); + let (epoch, count, nanos) = PackedBin::unpack(packed); + assert_eq!(12345, epoch); + assert_eq!(67890, count); + assert_eq!(123456789012345, nanos); + } } diff --git a/store/postgres/src/pool/manager.rs b/store/postgres/src/pool/manager.rs index 2f5fe0fa00a..fd1f0f8ac36 100644 --- a/store/postgres/src/pool/manager.rs +++ b/store/postgres/src/pool/manager.rs @@ -10,10 +10,10 @@ use diesel_async::pooled_connection::{PoolError as DieselPoolError, PoolableConn use diesel_async::{AsyncConnection, RunQueryDsl}; use graph::env::ENV_VARS; use graph::prelude::error; +use graph::prelude::AtomicMovingStats; use graph::prelude::Counter; use graph::prelude::Gauge; use graph::prelude::MetricsRegistry; -use graph::prelude::MovingStats; use graph::prelude::PoolWaitStats; use graph::slog::info; use graph::slog::Logger; @@ -24,8 +24,6 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::{Duration, Instant}; -use graph::parking_lot::RwLock; - use crate::pool::AsyncPool; /// Our own connection manager. It is pretty much the same as @@ -298,7 +296,7 @@ impl WaitMeter { const_labels, ) .expect("failed to create `store_connection_wait_time_ms` counter"); - let wait_stats = Arc::new(RwLock::new(MovingStats::default())); + let wait_stats = Arc::new(AtomicMovingStats::default()); Self { wait_gauge, @@ -307,8 +305,6 @@ impl WaitMeter { } pub(crate) fn add_conn_wait_time(&self, duration: Duration) { - self.wait_stats - .write() - .add_and_register(duration, &self.wait_gauge); + self.wait_stats.add_and_register(duration, &self.wait_gauge); } } diff --git a/store/postgres/src/pool/mod.rs b/store/postgres/src/pool/mod.rs index cf176db64c2..a66511168e0 100644 --- a/store/postgres/src/pool/mod.rs +++ b/store/postgres/src/pool/mod.rs @@ -11,7 +11,7 @@ use graph::derive::CheapClone; use graph::internal_error; use graph::prelude::tokio::time::Instant; use graph::prelude::{ - anyhow::anyhow, crit, debug, error, info, o, Gauge, Logger, MovingStats, PoolWaitStats, + anyhow::anyhow, crit, debug, error, info, o, AtomicMovingStats, Gauge, Logger, PoolWaitStats, StoreError, ENV_VARS, }; use graph::prelude::{tokio, MetricsRegistry}; @@ -25,8 +25,6 @@ use std::ops::{Deref, DerefMut}; use std::sync::Arc; use std::time::Duration; -use graph::parking_lot::RwLock; - use crate::catalog; use crate::pool::manager::{ConnectionManager, WaitMeter}; use crate::primary::{self, Mirror, Namespace}; @@ -435,7 +433,7 @@ pub struct PoolInner { // that waiting queries consume few resources. 
Still this is placed here because the semaphore // is sized acording to the DB connection pool size. query_semaphore: Arc, - semaphore_wait_stats: Arc>, + semaphore_wait_stats: Arc, semaphore_wait_gauge: Box, // Limits concurrent indexing operations to prevent pool exhaustion @@ -444,7 +442,7 @@ pub struct PoolInner { // during diesel-async migration. It also avoids timeouts because of // pool exhaustion when getting a connection. indexing_semaphore: Arc, - indexing_semaphore_wait_stats: Arc>, + indexing_semaphore_wait_stats: Arc, indexing_semaphore_wait_gauge: Box, } @@ -560,11 +558,11 @@ impl PoolInner { fdw_pool, wait_meter, state_tracker, - semaphore_wait_stats: Arc::new(RwLock::new(MovingStats::default())), + semaphore_wait_stats: Arc::new(AtomicMovingStats::default()), query_semaphore, semaphore_wait_gauge, indexing_semaphore, - indexing_semaphore_wait_stats: Arc::new(RwLock::new(MovingStats::default())), + indexing_semaphore_wait_stats: Arc::new(AtomicMovingStats::default()), indexing_semaphore_wait_gauge, } } @@ -721,7 +719,6 @@ impl PoolInner { let start = Instant::now(); let permit = self.query_semaphore.cheap_clone().acquire_owned().await; self.semaphore_wait_stats - .write() .add_and_register(start.elapsed(), &self.semaphore_wait_gauge); permit.unwrap() } @@ -734,7 +731,6 @@ impl PoolInner { let permit = self.indexing_semaphore.cheap_clone().acquire_owned().await; let elapsed = start.elapsed(); self.indexing_semaphore_wait_stats - .write() .add_and_register(elapsed, &self.indexing_semaphore_wait_gauge); (permit.unwrap(), elapsed) } From 06fa27a771a372ce43c09c0643e7ae9f9788b665 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Sat, 17 Jan 2026 16:21:23 -0800 Subject: [PATCH 11/13] store: Add HerdCache to chain_head_ptr() to prevent thundering herd The ChainHeadPtrCache introduced in 7ecdbda0f can cause connection pool exhaustion when the cache expires: multiple concurrent callers each acquire a database connection, then block waiting for a write lock to update the cache - while still holding their connections. This adds a HerdCache layer that ensures only one caller queries the database when the TTL cache expires. Other concurrent callers await the in-flight query result instead of each acquiring their own connection. 
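HerdCache's full API is not visible in this patch, but the single-flight idea it provides can be sketched with a shared future (a sketch assuming futures = "0.3" and parking_lot; SingleFlight is an illustrative name, not the real type): the first caller installs the lookup as a Shared future, later callers clone and await it, and no lock is held across the await. The real cached_lookup additionally keys entries and can cache results; this shows only the dedup step.

    use std::future::Future;

    use futures::future::{BoxFuture, FutureExt, Shared};
    use parking_lot::Mutex;

    struct SingleFlight<T: Clone> {
        inflight: Mutex<Option<Shared<BoxFuture<'static, T>>>>,
    }

    impl<T: Clone + Send + Sync + 'static> SingleFlight<T> {
        fn new() -> Self {
            Self { inflight: Mutex::new(None) }
        }

        async fn run<F>(&self, make: impl FnOnce() -> F) -> T
        where
            F: Future<Output = T> + Send + 'static,
        {
            let fut = {
                // Lock held only long enough to clone or install the future.
                let mut slot = self.inflight.lock();
                match slot.as_ref() {
                    Some(f) => f.clone(),
                    None => {
                        let f = make().boxed().shared();
                        *slot = Some(f.clone());
                        f
                    }
                }
            };
            let out = fut.await;
            // Clear so the next cache expiry starts a fresh lookup. A raced
            // clear can let one duplicate query start; harmless for a sketch.
            *self.inflight.lock() = None;
            out
        }
    }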
Co-Authored-By: Claude Opus 4.5 --- store/postgres/src/chain_store.rs | 75 ++++++++++++++++++------------- 1 file changed, 45 insertions(+), 30 deletions(-) diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index 556c6b80d2d..ba2819b7096 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -2150,6 +2150,8 @@ pub struct ChainStore { ancestor_cache: HerdCache, StoreError>>>, /// Adaptive cache for chain_head_ptr() chain_head_ptr_cache: ChainHeadPtrCache, + /// Herd cache to prevent thundering herd on chain_head_ptr() lookups + chain_head_ptr_herd: HerdCache, StoreError>>>, } impl ChainStore { @@ -2169,6 +2171,7 @@ impl ChainStore { let blocks_by_number_cache = HerdCache::new(format!("chain_{}_blocks_by_number", chain)); let ancestor_cache = HerdCache::new(format!("chain_{}_ancestor", chain)); let chain_head_ptr_cache = ChainHeadPtrCache::new(metrics, chain.clone()); + let chain_head_ptr_herd = HerdCache::new(format!("chain_{}_head_ptr", chain)); ChainStore { logger, pool, @@ -2181,6 +2184,7 @@ impl ChainStore { blocks_by_number_cache, ancestor_cache, chain_head_ptr_cache, + chain_head_ptr_herd, } } @@ -2523,44 +2527,55 @@ impl ChainHeadStore for ChainStore { async fn chain_head_ptr(self: Arc) -> Result, Error> { use public::ethereum_networks::dsl::*; - // Check cache first (handles disabled check and metrics internally) + // Check TTL cache first (handles disabled check and metrics internally) if let Some(cached) = self.chain_head_ptr_cache.get() { return Ok(Some(cached)); } - // Query database - let mut conn = self.pool.get_permitted().await?; - let result = ethereum_networks - .select((head_block_hash, head_block_number)) - .filter(name.eq(&self.chain)) - .load::<(Option, Option)>(&mut conn) - .await - .map(|rows| { - rows.as_slice() - .first() - .map(|(hash_opt, number_opt)| match (hash_opt, number_opt) { - (Some(hash), Some(number)) => Some( - ( - // FIXME: - // - // workaround for arweave - H256::from_slice(&hex::decode(hash).unwrap()[..32]), - *number, - ) - .into(), - ), - (None, None) => None, - _ => unreachable!(), - }) - .and_then(|opt: Option| opt) - })?; + // Use HerdCache to ensure only one caller does the DB lookup + // when cache is expired. Other callers await the in-flight query. 
+ let pool = self.pool.clone(); + let chain = self.chain.clone(); + let lookup = async move { + let mut conn = pool.get_permitted().await?; + ethereum_networks + .select((head_block_hash, head_block_number)) + .filter(name.eq(&chain)) + .load::<(Option, Option)>(&mut conn) + .await + .map(|rows| { + rows.as_slice() + .first() + .map(|(hash_opt, number_opt)| match (hash_opt, number_opt) { + (Some(hash), Some(number)) => Some( + ( + // FIXME: + // + // workaround for arweave + H256::from_slice(&hex::decode(hash).unwrap()[..32]), + *number, + ) + .into(), + ), + (None, None) => None, + _ => unreachable!(), + }) + .and_then(|opt: Option| opt) + }) + .map_err(StoreError::from) + }; + + let (result, _cached) = self + .cached_lookup(&self.chain_head_ptr_herd, &self.chain, lookup) + .await; - // Cache the result (set() handles disabled check internally) - if let Some(ref ptr) = result { + // Update TTL cache with the result + // (set() handles disabled check internally) + if let Ok(Some(ref ptr)) = result { self.chain_head_ptr_cache.set(ptr.clone()); } - Ok(result) + result.map_err(Error::from) } async fn chain_head_cursor(&self) -> Result, Error> { From 36faa01320b382cdf254abd2d51b205191272ac8 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Sat, 17 Jan 2026 18:12:27 -0800 Subject: [PATCH 12/13] graph, store: Add time-based connection validation to skip health checks for active connections Connections that were used within the last 30 seconds (configurable via GRAPH_STORE_CONNECTION_VALIDATION_IDLE_SECS) now skip the SELECT 67 health check during pool recycle. This reduces connection checkout latency from ~4ms to ~0ms for frequently-used connections while still validating idle connections to detect stale database connections. Co-Authored-By: Claude Opus 4.5 --- graph/src/env/store.rs | 8 ++++++++ store/postgres/src/pool/manager.rs | 15 ++++++++++++++- store/postgres/src/pool/mod.rs | 1 + 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/graph/src/env/store.rs b/graph/src/env/store.rs index 23294ad0a77..6eeb76a9656 100644 --- a/graph/src/env/store.rs +++ b/graph/src/env/store.rs @@ -166,6 +166,11 @@ pub struct EnvVarsStore { /// Set by `GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE`. Default is false. /// Set to true to disable chain_head_ptr caching (safety escape hatch). pub disable_chain_head_ptr_cache: bool, + /// Minimum idle time before running connection health check (SELECT 67). + /// Connections used more recently than this threshold skip validation. + /// Set to 0 to always validate (previous behavior). + /// Set by `GRAPH_STORE_CONNECTION_VALIDATION_IDLE_SECS`. Default is 30 seconds. 
+ pub connection_validation_idle_secs: Duration, } // This does not print any values avoid accidentally leaking any sensitive env vars @@ -228,6 +233,7 @@ impl TryFrom for EnvVarsStore { account_like_max_unique_ratio: x.account_like_max_unique_ratio.map(|r| r.0), disable_call_cache: x.disable_call_cache, disable_chain_head_ptr_cache: x.disable_chain_head_ptr_cache, + connection_validation_idle_secs: Duration::from_secs(x.connection_validation_idle_secs), }; if let Some(timeout) = vars.batch_timeout { if timeout < 2 * vars.batch_target_duration { @@ -337,6 +343,8 @@ pub struct InnerStore { disable_call_cache: bool, #[envconfig(from = "GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE", default = "false")] disable_chain_head_ptr_cache: bool, + #[envconfig(from = "GRAPH_STORE_CONNECTION_VALIDATION_IDLE_SECS", default = "30")] + connection_validation_idle_secs: u64, } #[derive(Clone, Copy, Debug)] diff --git a/store/postgres/src/pool/manager.rs b/store/postgres/src/pool/manager.rs index fd1f0f8ac36..69d59e372b0 100644 --- a/store/postgres/src/pool/manager.rs +++ b/store/postgres/src/pool/manager.rs @@ -35,6 +35,8 @@ pub struct ConnectionManager { connection_url: String, state_tracker: StateTracker, error_counter: Counter, + /// Connections idle for less than this threshold skip the SELECT 67 health check + validation_idle_threshold: Duration, } impl ConnectionManager { @@ -44,6 +46,7 @@ impl ConnectionManager { state_tracker: StateTracker, registry: &MetricsRegistry, const_labels: HashMap, + validation_idle_threshold: Duration, ) -> Self { let error_counter = registry .global_counter( @@ -58,6 +61,7 @@ impl ConnectionManager { connection_url, state_tracker, error_counter, + validation_idle_threshold, } } @@ -105,11 +109,20 @@ impl deadpool::managed::Manager for ConnectionManager { async fn recycle( &self, obj: &mut Self::Type, - _metrics: &deadpool::managed::Metrics, + metrics: &deadpool::managed::Metrics, ) -> RecycleResult { if std::thread::panicking() || obj.is_broken() { return Err(RecycleError::Message("Broken connection".into())); } + + // Skip health check if connection was used recently + if self.validation_idle_threshold > Duration::ZERO + && metrics.last_used() < self.validation_idle_threshold + { + return Ok(()); + } + + // Run SELECT 67 only for idle connections let res = diesel::select(67_i32.into_sql::()) .execute(obj) .await diff --git a/store/postgres/src/pool/mod.rs b/store/postgres/src/pool/mod.rs index a66511168e0..afb0aef4ebf 100644 --- a/store/postgres/src/pool/mod.rs +++ b/store/postgres/src/pool/mod.rs @@ -476,6 +476,7 @@ impl PoolInner { state_tracker.clone(), ®istry, const_labels.clone(), + ENV_VARS.store.connection_validation_idle_secs, ); let timeouts = Timeouts { From b8d7458a1ec8f0a14e552f434e456739076a9407 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 20 Jan 2026 13:02:20 -0800 Subject: [PATCH 13/13] graph, store: Add periodic probe for unavailable database recovery When a database shard is marked unavailable due to connection timeouts, StateTracker now allows one probe request through every few seconds (configurable via GRAPH_STORE_CONNECTION_UNAVAILABLE_RETRY, default 2s) to check if the database has recovered. This prevents the recovery deadlock where mark_available() only fires on successful connection, but no connections are attempted when unavailable. 
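Distilled from the StateTracker change below, the gate is one atomic timestamp plus a CAS (simplified sketch: here 0 doubles as the healthy marker, where the real code keeps a separate `available` flag). Because the compare_exchange succeeds for exactly one caller per interval, a recovered database is rediscovered within one retry interval while all other callers keep failing fast.

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    struct ProbeGate {
        /// Millis since UNIX_EPOCH before which no probe may run; 0 = healthy.
        next_probe_at: AtomicU64,
        retry_ms: u64,
    }

    impl ProbeGate {
        fn now_ms() -> u64 {
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_millis() as u64
        }

        fn mark_unavailable(&self) {
            self.next_probe_at
                .store(Self::now_ms() + self.retry_ms, Ordering::Relaxed);
        }

        fn mark_available(&self) {
            self.next_probe_at.store(0, Ordering::Relaxed);
        }

        /// True for healthy shards, and for exactly one caller per retry
        /// interval on unhealthy ones: the CAS claims the probe slot, and
        /// losers (or callers inside the interval) fail fast.
        fn is_available(&self) -> bool {
            let next = self.next_probe_at.load(Ordering::Relaxed);
            if next == 0 {
                return true;
            }
            let now = Self::now_ms();
            now >= next
                && self
                    .next_probe_at
                    .compare_exchange(
                        next,
                        now + self.retry_ms,
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    )
                    .is_ok()
        }
    }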
Co-Authored-By: Claude Opus 4.5 --- docs/environment-variables.md | 5 +++ graph/src/env/store.rs | 11 +++++++ store/postgres/src/pool/manager.rs | 49 ++++++++++++++++++++++++++++-- 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/docs/environment-variables.md b/docs/environment-variables.md index 560e5fe87a4..9c4fc5dc8b4 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -216,6 +216,11 @@ those. decisions. Set to `true` to turn simulation on, defaults to `false` - `GRAPH_STORE_CONNECTION_TIMEOUT`: How long to wait to connect to a database before assuming the database is down in ms. Defaults to 5000ms. +- `GRAPH_STORE_CONNECTION_UNAVAILABLE_RETRY`: When a database shard is marked + unavailable due to connection timeouts, this controls how often to allow a + single probe request through to check if the database has recovered. Only one + request per interval will attempt a connection; all others fail instantly. + Value is in seconds and defaults to 2s. - `EXPERIMENTAL_SUBGRAPH_VERSION_SWITCHING_MODE`: default is `instant`, set to `synced` to only switch a named subgraph to a new deployment once it has synced, making the new deployment the "Pending" version. diff --git a/graph/src/env/store.rs b/graph/src/env/store.rs index 6eeb76a9656..cc29cb107bf 100644 --- a/graph/src/env/store.rs +++ b/graph/src/env/store.rs @@ -171,6 +171,12 @@ pub struct EnvVarsStore { /// Set to 0 to always validate (previous behavior). /// Set by `GRAPH_STORE_CONNECTION_VALIDATION_IDLE_SECS`. Default is 30 seconds. pub connection_validation_idle_secs: Duration, + /// When a database shard is marked unavailable due to connection timeouts, + /// this controls how often to allow a single probe request through to check + /// if the database has recovered. Only one request per interval will attempt + /// a connection; all others fail instantly with DatabaseUnavailable. + /// Set by `GRAPH_STORE_CONNECTION_UNAVAILABLE_RETRY`. Default is 2 seconds. 
+ pub connection_unavailable_retry: Duration, } // This does not print any values avoid accidentally leaking any sensitive env vars @@ -234,6 +240,9 @@ impl TryFrom for EnvVarsStore { disable_call_cache: x.disable_call_cache, disable_chain_head_ptr_cache: x.disable_chain_head_ptr_cache, connection_validation_idle_secs: Duration::from_secs(x.connection_validation_idle_secs), + connection_unavailable_retry: Duration::from_secs( + x.connection_unavailable_retry_in_secs, + ), }; if let Some(timeout) = vars.batch_timeout { if timeout < 2 * vars.batch_target_duration { @@ -345,6 +354,8 @@ pub struct InnerStore { disable_chain_head_ptr_cache: bool, #[envconfig(from = "GRAPH_STORE_CONNECTION_VALIDATION_IDLE_SECS", default = "30")] connection_validation_idle_secs: u64, + #[envconfig(from = "GRAPH_STORE_CONNECTION_UNAVAILABLE_RETRY", default = "2")] + connection_unavailable_retry_in_secs: u64, } #[derive(Clone, Copy, Debug)] diff --git a/store/postgres/src/pool/manager.rs b/store/postgres/src/pool/manager.rs index 69d59e372b0..dbbcb288b83 100644 --- a/store/postgres/src/pool/manager.rs +++ b/store/postgres/src/pool/manager.rs @@ -20,9 +20,10 @@ use graph::slog::Logger; use std::collections::HashMap; use std::sync::atomic::AtomicBool; +use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use crate::pool::AsyncPool; @@ -141,6 +142,9 @@ pub(super) struct StateTracker { logger: Logger, available: Arc, ignore_timeout: Arc, + /// Timestamp (as millis since UNIX_EPOCH) when we can next probe. + /// 0 means available/no limit. + next_probe_at: Arc, } impl StateTracker { @@ -149,14 +153,16 @@ impl StateTracker { logger, available: Arc::new(AtomicBool::new(true)), ignore_timeout: Arc::new(AtomicBool::new(false)), + next_probe_at: Arc::new(AtomicU64::new(0)), } } pub(super) fn mark_available(&self) { if !self.is_available() { - info!(self.logger, "Conection checkout"; "event" => "available"); + info!(self.logger, "Connection checkout"; "event" => "available"); } self.available.store(true, Ordering::Relaxed); + self.next_probe_at.store(0, Ordering::Relaxed); } pub(super) fn mark_unavailable(&self, waited: Duration) { @@ -171,10 +177,47 @@ impl StateTracker { } } self.available.store(false, Ordering::Relaxed); + + // Set next probe time + let retry_interval = ENV_VARS.store.connection_unavailable_retry.as_millis() as u64; + let next_probe = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64 + + retry_interval; + self.next_probe_at.store(next_probe, Ordering::Relaxed); } pub(super) fn is_available(&self) -> bool { - AtomicBool::load(&self.available, Ordering::Relaxed) + if AtomicBool::load(&self.available, Ordering::Relaxed) { + return true; + } + + // Allow one probe through every `connection_unavailable_retry` interval + let next_probe = AtomicU64::load(&self.next_probe_at, Ordering::Relaxed); + let now_millis = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + + if now_millis >= next_probe { + // Try to claim this probe slot with CAS + let retry_interval = ENV_VARS.store.connection_unavailable_retry.as_millis() as u64; + if self + .next_probe_at + .compare_exchange( + next_probe, + now_millis + retry_interval, + Ordering::Relaxed, + Ordering::Relaxed, + ) + .is_ok() + { + // We claimed the probe - allow this request through + return true; + } + } + false } pub(super) fn timeout_is_ignored(&self) -> bool {