From e5401e5e969219ac89c8c98cdb3d957fe0d3fec5 Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Tue, 20 Jan 2026 15:03:47 +0200 Subject: [PATCH 1/4] chore(graph, store): fix clippy warnings --- graph/src/amp/sql/query_builder/parser.rs | 8 ++++---- graph/src/data_source/common.rs | 17 ++++++++++------- store/postgres/src/deployment.rs | 2 +- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/graph/src/amp/sql/query_builder/parser.rs b/graph/src/amp/sql/query_builder/parser.rs index 1f965b955b6..2e40b0e53a3 100644 --- a/graph/src/amp/sql/query_builder/parser.rs +++ b/graph/src/amp/sql/query_builder/parser.rs @@ -42,15 +42,15 @@ struct AllowOnlySelectQueries; impl AllowOnlySelectQueries { /// Returns an error if the `set_expr` is not a `SELECT` expression. - fn visit_set_expr(&self, set_expr: &ast::SetExpr) -> Result<()> { + fn visit_set_expr(set_expr: &ast::SetExpr) -> Result<()> { match set_expr { ast::SetExpr::Select(_) | ast::SetExpr::Query(_) | ast::SetExpr::Values(_) | ast::SetExpr::Table(_) => Ok(()), ast::SetExpr::SetOperation { left, right, .. } => { - self.visit_set_expr(left)?; - self.visit_set_expr(right)?; + Self::visit_set_expr(left)?; + Self::visit_set_expr(right)?; Ok(()) } ast::SetExpr::Insert(_) | ast::SetExpr::Update(_) | ast::SetExpr::Delete(_) => { @@ -64,7 +64,7 @@ impl Visitor for AllowOnlySelectQueries { type Break = anyhow::Error; fn pre_visit_query(&mut self, query: &ast::Query) -> ControlFlow { - match self.visit_set_expr(&query.body) { + match Self::visit_set_expr(&query.body) { Ok(()) => ControlFlow::Continue(()), Err(e) => ControlFlow::Break(e), } diff --git a/graph/src/data_source/common.rs b/graph/src/data_source/common.rs index 511d18f3de7..e781e2844be 100644 --- a/graph/src/data_source/common.rs +++ b/graph/src/data_source/common.rs @@ -190,9 +190,11 @@ impl AbiJson { return Ok(Some(vec![])); } // Recursively resolve the nested path - return self - .resolve_field_path(components, nested_path) - .map(Some); + return Self::resolve_field_path( + components, + nested_path, + ) + .map(Some); } } } @@ -217,7 +219,6 @@ impl AbiJson { /// Supports both numeric indices and field names /// Returns the index path to access the final field fn resolve_field_path( - &self, components: &serde_json::Value, field_path: &[&str], ) -> Result, Error> { @@ -254,7 +255,7 @@ impl AbiJson { // Recursively resolve the remaining path let mut result = vec![index]; let nested_result = - self.resolve_field_path(nested_components, remaining_path)?; + Self::resolve_field_path(nested_components, remaining_path)?; result.extend(nested_result); return Ok(result); } else { @@ -294,8 +295,10 @@ impl AbiJson { if let Some(nested_components) = component.get("components") { // Recursively resolve the remaining path let mut result = vec![index]; - let nested_result = - self.resolve_field_path(nested_components, remaining_path)?; + let nested_result = Self::resolve_field_path( + nested_components, + remaining_path, + )?; result.extend(nested_result); return Ok(result); } else { diff --git a/store/postgres/src/deployment.rs b/store/postgres/src/deployment.rs index 239ccdf61b3..100da5fd329 100644 --- a/store/postgres/src/deployment.rs +++ b/store/postgres/src/deployment.rs @@ -970,7 +970,7 @@ pub async fn update_deployment_status( d::failed.eq(health.is_failed()), d::health.eq(health), d::fatal_error.eq::>(fatal_error), - d::non_fatal_errors.eq::>(non_fatal_errors.unwrap_or(vec![])), + d::non_fatal_errors.eq::>(non_fatal_errors.unwrap_or_default()), 
)) .execute(conn) .await From a74aee6aef0c71624e52987dc7fd7ac5ca21d32f Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Tue, 20 Jan 2026 15:06:10 +0200 Subject: [PATCH 2/4] fix(graph, graphql): apply default order on the build_order call This makes sure that when the default order is applied the column used in ORDER BY is also included in the SELECT projection --- graph/src/schema/input/mod.rs | 1 + graphql/src/store/prefetch.rs | 6 ------ graphql/src/store/query.rs | 9 +++++++++ 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/graph/src/schema/input/mod.rs b/graph/src/schema/input/mod.rs index bd6aa2d3017..c3b5cfeb92c 100644 --- a/graph/src/schema/input/mod.rs +++ b/graph/src/schema/input/mod.rs @@ -51,6 +51,7 @@ pub mod kw { pub const INTERVALS: &str = "intervals"; pub const INTERVAL: &str = "interval"; pub const CUMULATIVE: &str = "cumulative"; + pub const CURRENT: &str = "current"; } /// The internal representation of a subgraph schema, i.e., the diff --git a/graphql/src/store/prefetch.rs b/graphql/src/store/prefetch.rs index ce5722bfb78..93d57c1c377 100644 --- a/graphql/src/store/prefetch.rs +++ b/graphql/src/store/prefetch.rs @@ -9,7 +9,6 @@ use graph::data::store::IdType; use graph::data::store::QueryObject; use graph::data::value::{Object, Word}; use graph::prelude::{r, CacheWeight, CheapClone}; -use graph::schema::kw; use graph::schema::AggregationInterval; use graph::schema::Field; use graph::slog::warn; @@ -715,11 +714,6 @@ impl<'a> Loader<'a> { // that causes unnecessary work in the database query.order = EntityOrder::Unordered; } - // Apply default timestamp ordering for aggregations if no custom order is specified - if child_type.is_aggregation() && matches!(query.order, EntityOrder::Default) { - let ts = child_type.field(kw::TIMESTAMP).unwrap(); - query.order = EntityOrder::Descending(ts.name.to_string(), ts.value_type); - } query.logger = Some(self.ctx.logger.cheap_clone()); if let Some(r::Value::String(id)) = field.argument_value(ARG_ID) { query.filter = Some( diff --git a/graphql/src/store/query.rs b/graphql/src/store/query.rs index ce43dee97a9..855b83b409a 100644 --- a/graphql/src/store/query.rs +++ b/graphql/src/store/query.rs @@ -12,6 +12,7 @@ use graph::data::value::Object; use graph::data::value::Value as DataValue; use graph::prelude::{r, TryFromValue, ENV_VARS}; use graph::schema::ast::{self as sast, FilterOp}; +use graph::schema::kw; use graph::schema::{EntityType, InputSchema, ObjectOrInterface}; use crate::execution::ast as a; @@ -552,6 +553,14 @@ fn build_order( } } } + // Apply a default ordering to the aggregations so that the most recent buckets are returned first + (None, _) if entity.is_aggregation() => { + let ts = entity + .field(kw::TIMESTAMP) + .expect("aggregation entities have timestamps"); + + EntityOrder::Descending(ts.name.to_string(), ts.value_type) + } (None, _) => EntityOrder::Default, }; Ok(order) From 1d6505f2eeb2b9d9f5ba9fc3ebbabd92e5e5b4bb Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Tue, 20 Jan 2026 15:22:50 +0200 Subject: [PATCH 3/4] feat(graph, graphql, store): support querying the current bucket --- graph/src/components/store/mod.rs | 16 ++ graph/src/schema/api.rs | 7 + graph/src/schema/meta.graphql | 9 + graphql/src/execution/ast.rs | 22 ++- graphql/src/store/prefetch.rs | 16 +- store/postgres/src/relational.rs | 5 +- store/postgres/src/relational/rollup.rs | 236 ++++++++++++++++++++--- store/postgres/src/relational_queries.rs | 110 
++++++++++- 8 files changed, 380 insertions(+), 41 deletions(-) diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index 77675967c25..f728eb13beb 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -465,6 +465,8 @@ pub struct EntityQuery { pub query_id: Option, pub trace: bool, + + pub aggregation_current: Option<AggregationCurrent>, } impl EntityQuery { @@ -483,6 +485,7 @@ impl EntityQuery { logger: None, query_id: None, trace: false, + aggregation_current: None, } } @@ -542,6 +545,19 @@ impl EntityQuery { } } +/// Indicates whether the current, partially filled bucket should be included in the response. +/// +/// This is only relevant for aggregation entity queries. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum AggregationCurrent { + /// Exclude the current, partially filled bucket from the response. + #[default] + Exclude, + + /// Include the current, partially filled bucket in the response. + Include, +} + /// Operation types that lead to changes in assignments #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] #[serde(rename_all = "lowercase")] diff --git a/graph/src/schema/api.rs b/graph/src/schema/api.rs index 86b13a9f3f2..7a787c526e8 100644 --- a/graph/src/schema/api.rs +++ b/graph/src/schema/api.rs @@ -704,6 +704,11 @@ impl FilterOps { "", s::Type::NamedType("OrderDirection".to_string()), ), + input_value( + "current", + "", + s::Type::NamedType("Aggregation_current".to_string()), + ), ], }; @@ -2212,6 +2217,8 @@ type Gravatar @entity { assert_eq!("Aggregation_interval", interval.value_type.get_base_type()); let filter = field.argument("where").unwrap(); assert_eq!(&filter_type, filter.value_type.get_base_type()); + let current = field.argument("current").unwrap(); + assert_eq!("Aggregation_current", current.value_type.get_base_type()); let s::TypeDefinition::InputObject(filter) = schema .get_type_definition_from_type(&filter.value_type) diff --git a/graph/src/schema/meta.graphql b/graph/src/schema/meta.graphql index 1b48bfa6501..729874d8b9d 100644 --- a/graph/src/schema/meta.graphql +++ b/graph/src/schema/meta.graphql @@ -106,3 +106,12 @@ enum Aggregation_interval { hour day } + +"Indicates whether the current, partially filled bucket should be included in the response.
Defaults to `exclude`" +enum Aggregation_current { + "Exclude the current, partially filled bucket from the response" + exclude + + "Include the current, partially filled bucket in the response" + include +} diff --git a/graphql/src/execution/ast.rs b/graphql/src/execution/ast.rs index 65bdb6298d1..4f6012dc4ea 100644 --- a/graphql/src/execution/ast.rs +++ b/graphql/src/execution/ast.rs @@ -1,7 +1,7 @@ use std::collections::{BTreeSet, HashSet}; use graph::{ - components::store::{AttributeNames, ChildMultiplicity, EntityOrder}, + components::store::{AggregationCurrent, AttributeNames, ChildMultiplicity, EntityOrder}, data::{graphql::ObjectOrInterface, store::ID}, env::ENV_VARS, prelude::{anyhow, q, r, s, QueryExecutionError, ValueMap}, @@ -364,6 +364,26 @@ impl Field { }) .transpose() } + + pub fn aggregation_current(&self) -> Result<Option<AggregationCurrent>, QueryExecutionError> { + let Some(value) = self.argument_value(kw::CURRENT) else { + return Ok(None); + }; + + if let r::Value::Enum(current) = value { + match current.as_str() { + "exclude" => return Ok(Some(AggregationCurrent::Exclude)), + "include" => return Ok(Some(AggregationCurrent::Include)), + _ => {} + } + } + + Err(QueryExecutionError::InvalidArgumentError( + self.position, + kw::CURRENT.to_string(), + q::Value::from(value.clone()), + )) + } } impl ValueMap for Field { diff --git a/graphql/src/store/prefetch.rs b/graphql/src/store/prefetch.rs index 93d57c1c377..b3ae6420af2 100644 --- a/graphql/src/store/prefetch.rs +++ b/graphql/src/store/prefetch.rs @@ -1,6 +1,7 @@ //! Run a GraphQL query and fetch all the entitied needed to build the //! final result +use graph::components::store::AggregationCurrent; use graph::data::graphql::ObjectTypeExt; use graph::data::query::Trace; use graph::data::store::Id; @@ -625,6 +626,7 @@ impl<'a> Loader<'a> { let child_type = input_schema .object_or_interface(field_type.field_type.get_base_type(), child_interval) .expect("we only collect fields that are objects or interfaces"); + let mut aggregation_current = field.aggregation_current()?; let join = if at_root { MaybeJoin::Root { child_type } @@ -643,6 +645,10 @@ impl<'a> Loader<'a> { let field_type = object_type .field(&field.name) .expect("field names are valid"); + + // Loading the current bucket is not supported for nested queries + aggregation_current = None; + MaybeJoin::Nested(Join::new( &input_schema, object_type.cheap_clone(), @@ -651,7 +657,10 @@ impl<'a> Loader<'a> { )) }; - match self.fetch(&parents, &join, field).await { + match self + .fetch(&parents, &join, field, aggregation_current) + .await + { Ok((children, trace)) => { let exec_fut = Box::pin(self.execute_selection_set( children, @@ -695,6 +704,7 @@ impl<'a> Loader<'a> { parents: &[&mut Node], join: &MaybeJoin<'_>, field: &a::Field, + aggregation_current: Option<AggregationCurrent>, ) -> Result<(Vec<Node>, Trace), QueryExecutionError> { let input_schema = self.resolver.store.input_schema().await?; let child_type = join.child_type(); @@ -722,6 +732,10 @@ impl<'a> Loader<'a> { ); } + if child_type.is_aggregation() { + query.aggregation_current = Some(aggregation_current.unwrap_or_default()); + } + if let MaybeJoin::Nested(join) = join { // For anything but the root node, restrict the children we select // by the parent list diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 404daa42b8d..ecd88298fac 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -17,7 +17,7 @@ mod query_tests; pub(crate) mod dsl; pub(crate) mod index; pub(crate) mod prune; -mod rollup;
+pub(crate) mod rollup; pub(crate) mod value; use diesel::deserialize::FromSql; @@ -237,7 +237,7 @@ pub struct Layout { pub input_schema: InputSchema, /// The rollups for aggregations in this layout - rollups: Vec<Rollup>, + pub(crate) rollups: Vec<Rollup>, } impl Layout { @@ -882,6 +882,7 @@ impl Layout { query.block, query.query_id, &self.site, + query.aggregation_current, )?; let query_clone = query.clone(); diff --git a/store/postgres/src/relational/rollup.rs b/store/postgres/src/relational/rollup.rs index c2929f6ca05..8da4d04faad 100644 --- a/store/postgres/src/relational/rollup.rs +++ b/store/postgres/src/relational/rollup.rs @@ -229,13 +229,18 @@ impl<'a> Agg<'a> { #[derive(Debug, Clone)] pub(crate) struct Rollup { pub(crate) interval: AggregationInterval, - #[allow(dead_code)] - agg_table: Arc<Table>, + pub(crate) agg_table: Arc<Table>
, insert_sql: String, /// A query that determines the last time a rollup was done. The query /// finds the latest timestamp in the aggregation table and adds the /// length of the aggregation interval to deduce the last rollup time last_rollup_sql: String, + + /// The SQL query that loads the current, partially filled bucket. + /// + /// Contains a `--FILTERS;` comment that can be replaced with additional filters like `and c.block$ <= $1`. + /// The filters are applied to the SQL query that loads the time series entities, not to the aggregated values. + pub(crate) select_current_sql: String, } impl Rollup { @@ -264,11 +269,15 @@ impl Rollup { let mut insert_sql = String::new(); sql.insert(&mut insert_sql)?; let last_rollup_sql = sql.last_rollup(); + let mut select_current_sql = String::new(); + sql.select_current(&mut select_current_sql)?; + Ok(Self { interval, agg_table, insert_sql, last_rollup_sql, + select_current_sql, }) } @@ -363,23 +372,22 @@ impl<'a> RollupSql<'a> { /// $2: end timestamp (exclusive) /// $3: block number fn select_bucket(&self, with_block: bool, w: &mut dyn fmt::Write) -> fmt::Result { - let max_id = match self.agg_table.primary_key().column_type.id_type() { - Ok(IdType::Bytes) => "max(id::text)::bytea", - Ok(IdType::String) | Ok(IdType::Int8) => "max(id)", - Err(_) => unreachable!("we make sure that the primary key has an id_type"), - }; - write!(w, "select {max_id} as id, timestamp")?; + write!( + w, + "select {max_id} as id, timestamp", + max_id = self.max_id(), + )?; if with_block { write!(w, ", $3")?; } - write_dims(self.dimensions, w)?; - comma_sep(self.aggregates, w, |w, agg| agg.aggregate("id", w))?; + write_dims(self.dimensions, w, true)?; + comma_sep(self.aggregates, w, true, |w, agg| agg.aggregate("id", w))?; let secs = self.interval.as_duration().as_secs(); write!( w, " from (select id, date_bin('{secs}s', timestamp, 'epoch'::timestamptz) as timestamp" )?; - write_dims(self.dimensions, w)?; + write_dims(self.dimensions, w, true)?; let agg_srcs: Vec<&str> = { let mut agg_srcs: Vec<_> = self .aggregates @@ -392,7 +400,7 @@ impl<'a> RollupSql<'a> { agg_srcs.dedup(); agg_srcs }; - comma_sep(agg_srcs, w, |w, col: &str| write!(w, "\"{}\"", col))?; + comma_sep(agg_srcs, w, true, |w, col: &str| write!(w, "\"{}\"", col))?; write!( w, " from {src_table} where {src_table}.timestamp >= $1 and {src_table}.timestamp < $2", @@ -403,7 +411,7 @@ impl<'a> RollupSql<'a> { " order by {src_table}.timestamp) data group by timestamp", src_table = self.src_table )?; - write_dims(self.dimensions, w) + write_dims(self.dimensions, w, true) } fn select(&self, w: &mut dyn fmt::Write) -> fmt::Result { @@ -420,8 +428,8 @@ impl<'a> RollupSql<'a> { "insert into {}(id, timestamp, block$", self.agg_table.qualified_name )?; - write_dims(self.dimensions, w)?; - comma_sep(self.aggregates, w, |w, agg| { + write_dims(self.dimensions, w, true)?; + comma_sep(self.aggregates, w, true, |w, agg| { write!(w, "\"{}\"", agg.agg_column.name) })?; write!(w, ") ") @@ -442,10 +450,10 @@ impl<'a> RollupSql<'a> { /// for any group keys that appear in `bucket` fn select_prev(&self, w: &mut dyn fmt::Write) -> fmt::Result { write!(w, "select bucket.id, bucket.timestamp")?; - comma_sep(self.dimensions, w, |w, col| { + comma_sep(self.dimensions, w, true, |w, col| { write!(w, "bucket.\"{}\"", col.name) })?; - comma_sep(self.aggregates, w, |w, agg| agg.prev_agg(w))?; + comma_sep(self.aggregates, w, true, |w, agg| agg.prev_agg(w))?; write!(w, " from bucket cross join lateral (")?; write!(w, "select * from {} prev", 
self.agg_table.qualified_name)?; write!(w, " where prev.timestamp < $1")?; @@ -461,14 +469,16 @@ impl<'a> RollupSql<'a> { fn select_combined(&self, w: &mut dyn fmt::Write) -> fmt::Result { write!(w, "select id, timestamp")?; - comma_sep(self.dimensions, w, |w, col| write!(w, "\"{}\"", col.name))?; - comma_sep(self.aggregates, w, |w, agg| agg.combine("seq", w))?; + comma_sep(self.dimensions, w, true, |w, col| { + write!(w, "\"{}\"", col.name) + })?; + comma_sep(self.aggregates, w, true, |w, agg| agg.combine("seq", w))?; write!( w, " from (select *, 1 as seq from prev union all select *, 2 as seq from bucket) u " )?; write!(w, " group by id, timestamp")?; - write_dims(self.dimensions, w)?; + write_dims(self.dimensions, w, true)?; Ok(()) } @@ -501,8 +511,8 @@ impl<'a> RollupSql<'a> { write!(w, " ")?; self.insert_into(w)?; write!(w, "select id, timestamp, $3 as block$")?; - write_dims(self.dimensions, w)?; - comma_sep(self.aggregates, w, |w, agg| { + write_dims(self.dimensions, w, true)?; + comma_sep(self.aggregates, w, true, |w, agg| { write!(w, "\"{}\"", agg.agg_column.name) })?; write!(w, " from combined") @@ -528,16 +538,186 @@ impl<'a> RollupSql<'a> { secs, self.agg_table.qualified_name ) } + + fn max_id(&self) -> &'static str { + match self.agg_table.primary_key().column_type.id_type() { + Ok(IdType::Bytes) => "max(id::text)::bytea", + Ok(IdType::String) | Ok(IdType::Int8) => "max(id)", + Err(_) => unreachable!("we make sure that the primary key has an id_type"), + } + } + + /// Generates the SQL query that loads the current, partially filled bucket. + /// + /// The generated SQL query contains a `--FILTERS;` comment that can be replaced + /// with additional filters like `and c.block$ <= $1`. The filters are applied to + /// the SQL query that loads the time series entities, not to the aggregated values. + fn select_current(&self, w: &mut dyn fmt::Write) -> fmt::Result { + if self.has_cumulative_aggregates() { + self.select_current_bucket_cumulative(w)?; + } else { + self.select_current_bucket(w)?; + } + Ok(()) + } + + /// Generates the SQL query that loads the current, partially filled bucket when + /// the aggregation has cumulative aggregates. 
+ /// + /// The generated query has the following structure: + /// + /// with bucket as ( + /// {select current bucket query} + /// ), prev as ( + /// select + /// bucket.id, + /// bucket.vid, + /// bucket.block$ + /// bucket.timestamp, + /// {dimensions}, + /// {prev aggregates} + /// from bucket cross join lateral ( + /// select * from {agg table} where timestamp < {last rollup timestamp} + /// order by timestamp desc limit 1 + /// ) + /// ), combined ( + /// {select * from bucket and prev} + /// group by + /// id, + /// vid, + /// block$, + /// timestamp, + /// {dimensions} + /// ) + /// select + /// id, + /// vid, + /// block$, + /// timestamp, + /// {dimensions}, + /// {aggregates} + /// from combined; + fn select_current_bucket_cumulative(&self, w: &mut dyn fmt::Write) -> fmt::Result { + write!(w, "with bucket as (")?; + self.select_current_bucket(w)?; + write!(w, "), prev as (")?; + write!( + w, + "select bucket.id, bucket.vid, bucket.block$, bucket.timestamp" + )?; + comma_sep(self.dimensions, w, true, |w, col| { + write!(w, "bucket.\"{}\"", col.name) + })?; + comma_sep(self.aggregates, w, true, |w, agg| agg.prev_agg(w))?; + write!(w, " from bucket cross join lateral (")?; + write!(w, "select * from {} prev", self.agg_table.qualified_name)?; + write!( + w, + " where prev.timestamp < coalesce(({last_rollup}), '-infinity'::timestamptz)", + last_rollup = self.last_rollup() + )?; + for dim in self.dimensions { + write!( + w, + " and prev.\"{name}\" = bucket.\"{name}\"", + name = dim.name + )?; + } + write!(w, " order by prev.timestamp desc limit 1) prev")?; + write!(w, "), combined as (")?; + write!(w, "select id, vid, block$, timestamp")?; + comma_sep(self.dimensions, w, true, |w, col| { + write!(w, "\"{}\"", col.name) + })?; + comma_sep(self.aggregates, w, true, |w, agg| agg.combine("seq", w))?; + write!( + w, + " from (select *, 1 as seq from prev union all select *, 2 as seq from bucket) u " + )?; + write!(w, " group by id, vid, block$, timestamp")?; + write_dims(self.dimensions, w, true)?; + write!(w, ")")?; + write!(w, "select id, vid, block$, timestamp")?; + write_dims(self.dimensions, w, true)?; + comma_sep(self.aggregates, w, true, |w, agg| { + write!(w, "\"{}\"", agg.agg_column.name) + })?; + write!(w, " from combined as c") + } + + /// Generates the SQL query that loads the current, partially filled bucket when + /// the aggregation does not have cumulative aggregates. 
+ /// + /// The generated query has the following structure: + /// + /// select + /// max(id) as id, + /// max(vid) as vid, + /// max(block$) as block$, + /// max(timestamp) as timestamp, + /// {dimensions}, + /// {aggregates} + /// from + /// ({select timeseries entities} where timestamp >= {last rollup timestamp} {--FILTERS;}) + /// group by + /// {dimensions}; + fn select_current_bucket(&self, w: &mut dyn fmt::Write) -> fmt::Result { + write!( + w, + "select {max_id} as id, max(vid) as vid, max(block$) as block$, max(timestamp) as timestamp", + max_id = self.max_id(), + )?; + write_dims(self.dimensions, w, true)?; + comma_sep(self.aggregates, w, true, |w, agg| agg.aggregate("id", w))?; + write!( + w, + " from (select id, vid, block$, date_bin('{secs}s', timestamp, 'epoch'::timestamptz) as timestamp", + secs = self.interval.as_duration().as_secs(), + )?; + write_dims(self.dimensions, w, true)?; + let agg_srcs: Vec<&str> = { + let mut agg_srcs: Vec<_> = self + .aggregates + .iter() + .flat_map(|agg| &agg.src_columns) + .copied() + .filter(|&col| col != "id" && col != "timestamp") + .collect(); + agg_srcs.sort(); + agg_srcs.dedup(); + agg_srcs + }; + comma_sep(agg_srcs, w, true, |w, col: &str| write!(w, "\"{}\"", col))?; + write!( + w, + " from {src_table} as c where c.timestamp >= coalesce(({last_rollup}), '-infinity'::timestamptz)", + src_table = self.src_table, + last_rollup = self.last_rollup(), + )?; + write!(w, " --FILTERS;) c")?; + if !self.dimensions.is_empty() { + write!(w, " group by ")?; + write_dims(self.dimensions, w, false)?; + } + Ok(()) + } } /// Write the elements in `list` separated by commas into `w`. The list /// elements are written by calling `out` with each of them. -fn comma_sep(list: impl IntoIterator, w: &mut dyn fmt::Write, out: F) -> fmt::Result +fn comma_sep( + list: impl IntoIterator, + w: &mut dyn fmt::Write, + comma_prefix: bool, + out: F, +) -> fmt::Result where F: Fn(&mut dyn fmt::Write, T) -> fmt::Result, { - for elem in list { - write!(w, ", ")?; + for (i, elem) in list.into_iter().enumerate() { + if comma_prefix || i != 0 { + write!(w, ", ")?; + } out(w, elem)?; } Ok(()) @@ -545,8 +725,10 @@ where /// Write the names of the columns in `dimensions` into `w` as a /// comma-separated list of quoted column names. 
-fn write_dims(dimensions: &[&Column], w: &mut dyn fmt::Write) -> fmt::Result { - comma_sep(dimensions, w, |w, col| write!(w, "\"{}\"", col.name)) +fn write_dims(dimensions: &[&Column], w: &mut dyn fmt::Write, comma_prefix: bool) -> fmt::Result { + comma_sep(dimensions, w, comma_prefix, |w, col| { + write!(w, "\"{}\"", col.name) + }) } #[cfg(test)] diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index ef066b208c8..0337b8605d5 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -26,7 +26,10 @@ use graph::prelude::{ ParentLink, QueryExecutionError, StoreError, Value, ENV_VARS, }; use graph::schema::{EntityType, FulltextAlgorithm, FulltextConfig, InputSchema}; -use graph::{components::store::AttributeNames, data::store::scalar}; +use graph::{ + components::store::{AggregationCurrent, AttributeNames}, + data::store::scalar, +}; use inflector::Inflector; use itertools::Itertools; use std::collections::{BTreeMap, BTreeSet, HashSet}; @@ -40,8 +43,8 @@ use std::string::ToString; use crate::block_range::{BoundSide, EntityBlockRange}; use crate::relational::dsl::AtBlock; use crate::relational::{ - dsl, Column, ColumnType, Layout, SqlName, Table, BYTE_ARRAY_PREFIX_SIZE, PRIMARY_KEY_COLUMN, - STRING_PREFIX_SIZE, VID_COLUMN, + dsl, rollup::Rollup, Column, ColumnType, Layout, SqlName, Table, BYTE_ARRAY_PREFIX_SIZE, + PRIMARY_KEY_COLUMN, STRING_PREFIX_SIZE, VID_COLUMN, }; use crate::{ block_range::{ @@ -2634,6 +2637,11 @@ impl<'a> ParentLimit<'a> { // limiting is taken care of in a wrapper around // the query we are currently building } + + /// Returns the maximum number of rows that could be requested by this range filter. + fn max_num_rows(&self) -> u32 { + self.range.0.first.unwrap_or(EntityRange::FIRST) + self.range.0.skip + } } /// This is the parallel to `EntityWindow`, with names translated to @@ -4257,6 +4265,7 @@ pub struct FilterQuery<'a> { block: BlockNumber, query_id: Option, site: &'a Site, + rollup: Option<&'a Rollup>, } /// String representation that is useful for debugging when `walk_ast` fails @@ -4284,10 +4293,12 @@ impl<'a> FilterQuery<'a> { block: BlockNumber, query_id: Option, site: &'a Site, + aggregation_current: Option, ) -> Result { let sort_key = SortKey::new(order, collection, filter, layout, block)?; let range = FilterRange(range); let limit = ParentLimit { sort_key, range }; + let rollup = Self::find_rollup(collection, layout, aggregation_current)?; Ok(FilterQuery { collection, @@ -4295,9 +4306,46 @@ impl<'a> FilterQuery<'a> { block, query_id, site, + rollup, }) } + /// Finds the relevant [Rollup] for an aggregation entity query when the query requires the current bucket. + /// + /// Returns `None` for non-aggregation entity queries. + /// + /// Returns an error if the query is not supported. + fn find_rollup( + collection: &'a FilterCollection, + layout: &'a Layout, + aggregation_current: Option, + ) -> Result, QueryExecutionError> { + if !matches!(aggregation_current, Some(AggregationCurrent::Include)) { + return Ok(None); + } + + // Supporting window and/or multiple entity queries would make the existing SQL queries even more complicated. + // It is also unclear whether supporting that would be a valid use case. 
+ let entity = match collection { + FilterCollection::All(entities) if entities.len() == 1 => &entities[0], + _ => return Err(QueryExecutionError::NotSupported( + "The current aggregation bucket can only be queried in a root query for one aggregation entity".to_string()) + ), + }; + + // This is not supported because the use of `SELECT *` does not always produce the expected results when combined with `UNION ALL`. + if matches!(entity.column_names, AttributeNames::All) { + return Err(QueryExecutionError::NotSupported( + "The current aggregation bucket can only be queried when fields are explicitly selected".to_string()) + ); + } + + Ok(layout + .rollups + .iter() + .find(|rollup| rollup.agg_table.object.as_str() == entity.table.meta.object.as_str())) + } + /// Generate /// from schema.table c /// where block_range @> $block @@ -4343,19 +4391,61 @@ impl<'a> FilterQuery<'a> { /// where block_range @> $block /// and filter /// order by .. limit .. skip ..) c + /// + /// For aggregation entity queries that require the current bucket, + /// the generated query has the following structure: + /// + /// select '..' as entity, to_jsonb(e.*) as data + /// from ( + /// (select {column names} from agg_table c where {filters} limit {limit + skip + 1}) + /// union all + /// (select {column names} from ({current bucket query from agg source table} + {filters}) c) + /// ) c order by .. limit .. skip .. fn query_no_window_one_entity<'b>( &'b self, wh: &'b WholeTable<'a>, out: &mut AstPass<'_, 'b, Pg>, ) -> QueryResult<()> { Self::select_entity_and_data(wh.table, out); - out.push_sql(" from (select "); - write_column_names(&wh.column_names, wh.table, Some("c."), out)?; - self.filtered_rows(wh, out)?; - out.push_sql("\n "); - self.limit.sort_key.order_by(out, false)?; - self.limit.range.walk_ast(out.reborrow())?; - out.push_sql(") c"); + out.push_sql(" from ("); + + match self.rollup { + Some(rollup) => { + out.push_sql("(select "); + write_column_names(&wh.column_names, wh.table, Some("c."), out)?; + self.filtered_rows(wh, out)?; + out.push_sql("\n "); + self.limit.sort_key.order_by(out, false)?; + out.push_sql(" limit "); + out.push_sql(&self.limit.max_num_rows().to_string()); + out.push_sql(") union all (select "); + write_column_names(&wh.column_names, wh.table, Some("c."), out)?; + out.push_sql(" from ("); + let mut current_sql_split = rollup.select_current_sql.split("--FILTERS;"); + out.push_sql(current_sql_split.next().unwrap()); + out.push_sql(" and "); + wh.at_block.walk_ast(out.reborrow())?; + if let Some(filter) = &wh.filter { + out.push_sql(" and "); + filter.walk_ast(out.reborrow())?; + } + out.push_sql(current_sql_split.next().unwrap()); + out.push_sql(") c ) ) c"); + out.push_sql("\n "); + self.limit.sort_key.order_by(out, false)?; + self.limit.range.walk_ast(out.reborrow())?; + } + None => { + out.push_sql("select "); + write_column_names(&wh.column_names, wh.table, Some("c."), out)?; + self.filtered_rows(wh, out)?; + out.push_sql("\n "); + self.limit.sort_key.order_by(out, false)?; + self.limit.range.walk_ast(out.reborrow())?; + out.push_sql(") c"); + } + } + Ok(()) } From d1a4c1a16603f00043fa0a93439a7846db6b074c Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Tue, 20 Jan 2026 15:39:21 +0200 Subject: [PATCH 4/4] fix(docs): update the aggregation docs --- docs/aggregations.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/aggregations.md b/docs/aggregations.md index fafbd4d3305..a6492d5e624 100644 --- a/docs/aggregations.md 
+++ b/docs/aggregations.md @@ -178,8 +178,8 @@ accepts the following arguments: dimension - A mandatory `interval` - An optional `current` to indicate whether to include the current, - partially filled bucket in the response. Can be either `ignore` (the - default) or `include` (still **TODO** and not implemented) + partially filled bucket in the response. Can be either `exclude` (the + default) or `include` - Optional `timestamp_{gte|gt|lt|lte|eq|in}` filters to restrict the range of timestamps to return. The timestamp to filter by must be a string containing microseconds since the epoch. The value `"1704164640000000"` @@ -189,7 +189,7 @@ accepts the following arguments: ```graphql token_stats(interval: "hour", - current: ignore, + current: exclude, where: { token: "0x1234", timestamp_gte: 1234567890,