Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/aggregations.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ accepts the following arguments:
dimension
- A mandatory `interval`
- An optional `current` to indicate whether to include the current,
partially filled bucket in the response. Can be either `ignore` (the
default) or `include` (still **TODO** and not implemented)
partially filled bucket in the response. Can be either `exclude` (the
default) or `include`
- Optional `timestamp_{gte|gt|lt|lte|eq|in}` filters to restrict the range
of timestamps to return. The timestamp to filter by must be a string
containing microseconds since the epoch. The value `"1704164640000000"`
Expand All @@ -189,7 +189,7 @@ accepts the following arguments:

```graphql
token_stats(interval: "hour",
current: ignore,
current: exclude,
where: {
token: "0x1234",
timestamp_gte: 1234567890,
Expand Down
8 changes: 4 additions & 4 deletions graph/src/amp/sql/query_builder/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ struct AllowOnlySelectQueries;

impl AllowOnlySelectQueries {
/// Returns an error if the `set_expr` is not a `SELECT` expression.
fn visit_set_expr(&self, set_expr: &ast::SetExpr) -> Result<()> {
fn visit_set_expr(set_expr: &ast::SetExpr) -> Result<()> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in this file seem unrelated, right?

match set_expr {
ast::SetExpr::Select(_)
| ast::SetExpr::Query(_)
| ast::SetExpr::Values(_)
| ast::SetExpr::Table(_) => Ok(()),
ast::SetExpr::SetOperation { left, right, .. } => {
self.visit_set_expr(left)?;
self.visit_set_expr(right)?;
Self::visit_set_expr(left)?;
Self::visit_set_expr(right)?;
Ok(())
}
ast::SetExpr::Insert(_) | ast::SetExpr::Update(_) | ast::SetExpr::Delete(_) => {
Expand All @@ -64,7 +64,7 @@ impl Visitor for AllowOnlySelectQueries {
type Break = anyhow::Error;

fn pre_visit_query(&mut self, query: &ast::Query) -> ControlFlow<Self::Break> {
match self.visit_set_expr(&query.body) {
match Self::visit_set_expr(&query.body) {
Ok(()) => ControlFlow::Continue(()),
Err(e) => ControlFlow::Break(e),
}
Expand Down
16 changes: 16 additions & 0 deletions graph/src/components/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,8 @@ pub struct EntityQuery {
pub query_id: Option<String>,

pub trace: bool,

pub aggregation_current: Option<AggregationCurrent>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is this None? I assume when the query does not access aggregations

}

impl EntityQuery {
Expand All @@ -483,6 +485,7 @@ impl EntityQuery {
logger: None,
query_id: None,
trace: false,
aggregation_current: None,
}
}

Expand Down Expand Up @@ -542,6 +545,19 @@ impl EntityQuery {
}
}

/// Indicates whether the current, partially filled bucket should be included in the response.
///
/// This is only relevant for aggregation entity queries.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AggregationCurrent {
/// Exclude the current, partially filled bucket from the response.
#[default]
Exclude,

/// Include the current, partially filled bucket in the response.
Include,
}

/// Operation types that lead to changes in assignments
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "lowercase")]
Expand Down
17 changes: 10 additions & 7 deletions graph/src/data_source/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,11 @@ impl AbiJson {
return Ok(Some(vec![]));
}
// Recursively resolve the nested path
return self
.resolve_field_path(components, nested_path)
.map(Some);
return Self::resolve_field_path(
components,
nested_path,
)
.map(Some);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes also seem unrelated

}
}
}
Expand All @@ -217,7 +219,6 @@ impl AbiJson {
/// Supports both numeric indices and field names
/// Returns the index path to access the final field
fn resolve_field_path(
&self,
components: &serde_json::Value,
field_path: &[&str],
) -> Result<Vec<usize>, Error> {
Expand Down Expand Up @@ -254,7 +255,7 @@ impl AbiJson {
// Recursively resolve the remaining path
let mut result = vec![index];
let nested_result =
self.resolve_field_path(nested_components, remaining_path)?;
Self::resolve_field_path(nested_components, remaining_path)?;
result.extend(nested_result);
return Ok(result);
} else {
Expand Down Expand Up @@ -294,8 +295,10 @@ impl AbiJson {
if let Some(nested_components) = component.get("components") {
// Recursively resolve the remaining path
let mut result = vec![index];
let nested_result =
self.resolve_field_path(nested_components, remaining_path)?;
let nested_result = Self::resolve_field_path(
nested_components,
remaining_path,
)?;
result.extend(nested_result);
return Ok(result);
} else {
Expand Down
7 changes: 7 additions & 0 deletions graph/src/schema/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,11 @@ impl FilterOps {
"",
s::Type::NamedType("OrderDirection".to_string()),
),
input_value(
"current",
"",
s::Type::NamedType("Aggregation_current".to_string()),
),
],
};

Expand Down Expand Up @@ -2212,6 +2217,8 @@ type Gravatar @entity {
assert_eq!("Aggregation_interval", interval.value_type.get_base_type());
let filter = field.argument("where").unwrap();
assert_eq!(&filter_type, filter.value_type.get_base_type());
let current = field.argument("current").unwrap();
assert_eq!("Aggregation_current", current.value_type.get_base_type());

let s::TypeDefinition::InputObject(filter) = schema
.get_type_definition_from_type(&filter.value_type)
Expand Down
1 change: 1 addition & 0 deletions graph/src/schema/input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub mod kw {
pub const INTERVALS: &str = "intervals";
pub const INTERVAL: &str = "interval";
pub const CUMULATIVE: &str = "cumulative";
pub const CURRENT: &str = "current";
}

/// The internal representation of a subgraph schema, i.e., the
Expand Down
9 changes: 9 additions & 0 deletions graph/src/schema/meta.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,12 @@ enum Aggregation_interval {
hour
day
}

"Indicates whether the current, partially filled bucket should be included in the response. Defaults to `exclude`"
enum Aggregation_current {
"Exclude the current, partially filled bucket from the response"
exclude

"Include the current, partially filled bucket in the response"
include
}
22 changes: 21 additions & 1 deletion graphql/src/execution/ast.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::{BTreeSet, HashSet};

use graph::{
components::store::{AttributeNames, ChildMultiplicity, EntityOrder},
components::store::{AggregationCurrent, AttributeNames, ChildMultiplicity, EntityOrder},
data::{graphql::ObjectOrInterface, store::ID},
env::ENV_VARS,
prelude::{anyhow, q, r, s, QueryExecutionError, ValueMap},
Expand Down Expand Up @@ -364,6 +364,26 @@ impl Field {
})
.transpose()
}

pub fn aggregation_current(&self) -> Result<Option<AggregationCurrent>, QueryExecutionError> {
let Some(value) = self.argument_value(kw::CURRENT) else {
return Ok(None);
};

if let r::Value::Enum(current) = value {
match current.as_str() {
"exclude" => return Ok(Some(AggregationCurrent::Exclude)),
"include" => return Ok(Some(AggregationCurrent::Include)),
_ => {}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a FromStr implementation on AggregationCurrent

}

Err(QueryExecutionError::InvalidArgumentError(
self.position,
kw::CURRENT.to_string(),
q::Value::from(value.clone()),
))
}
}

impl ValueMap for Field {
Expand Down
22 changes: 15 additions & 7 deletions graphql/src/store/prefetch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Run a GraphQL query and fetch all the entitied needed to build the
//! final result

use graph::components::store::AggregationCurrent;
use graph::data::graphql::ObjectTypeExt;
use graph::data::query::Trace;
use graph::data::store::Id;
Expand All @@ -9,7 +10,6 @@ use graph::data::store::IdType;
use graph::data::store::QueryObject;
use graph::data::value::{Object, Word};
use graph::prelude::{r, CacheWeight, CheapClone};
use graph::schema::kw;
use graph::schema::AggregationInterval;
use graph::schema::Field;
use graph::slog::warn;
Expand Down Expand Up @@ -626,6 +626,7 @@ impl<'a> Loader<'a> {
let child_type = input_schema
.object_or_interface(field_type.field_type.get_base_type(), child_interval)
.expect("we only collect fields that are objects or interfaces");
let mut aggregation_current = field.aggregation_current()?;

let join = if at_root {
MaybeJoin::Root { child_type }
Expand All @@ -644,6 +645,10 @@ impl<'a> Loader<'a> {
let field_type = object_type
.field(&field.name)
.expect("field names are valid");

// Loading the current bucket is not supported for nested queries
aggregation_current = None;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One case where that might be wanted is when you query e.g., for a list of tokens and want stats for each of them, i.e., some structure like { tokens { id tokenStats(..) { .. } } }.

But it's fine to leave that as a TODO for now.

I am not entirely sure whether silently suppressing current: include is the right approach. I think we should produce an error during query validation for this case.


MaybeJoin::Nested(Join::new(
&input_schema,
object_type.cheap_clone(),
Expand All @@ -652,7 +657,10 @@ impl<'a> Loader<'a> {
))
};

match self.fetch(&parents, &join, field).await {
match self
.fetch(&parents, &join, field, aggregation_current)
.await
{
Ok((children, trace)) => {
let exec_fut = Box::pin(self.execute_selection_set(
children,
Expand Down Expand Up @@ -696,6 +704,7 @@ impl<'a> Loader<'a> {
parents: &[&mut Node],
join: &MaybeJoin<'_>,
field: &a::Field,
aggregation_current: Option<AggregationCurrent>,
) -> Result<(Vec<Node>, Trace), QueryExecutionError> {
let input_schema = self.resolver.store.input_schema().await?;
let child_type = join.child_type();
Expand All @@ -715,11 +724,6 @@ impl<'a> Loader<'a> {
// that causes unnecessary work in the database
query.order = EntityOrder::Unordered;
}
// Apply default timestamp ordering for aggregations if no custom order is specified
if child_type.is_aggregation() && matches!(query.order, EntityOrder::Default) {
let ts = child_type.field(kw::TIMESTAMP).unwrap();
query.order = EntityOrder::Descending(ts.name.to_string(), ts.value_type);
}
query.logger = Some(self.ctx.logger.cheap_clone());
if let Some(r::Value::String(id)) = field.argument_value(ARG_ID) {
query.filter = Some(
Expand All @@ -728,6 +732,10 @@ impl<'a> Loader<'a> {
);
}

if child_type.is_aggregation() {
query.aggregation_current = Some(aggregation_current.unwrap_or_default());
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think child_type.is_aggregation() && aggregation_current.is_some() indicates an implementation error somewhere that we should detect (similar for the opposite case)

Also, I don't understand why this isn't simply query.aggregation_current = aggregation_current?


if let MaybeJoin::Nested(join) = join {
// For anything but the root node, restrict the children we select
// by the parent list
Expand Down
9 changes: 9 additions & 0 deletions graphql/src/store/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use graph::data::value::Object;
use graph::data::value::Value as DataValue;
use graph::prelude::{r, TryFromValue, ENV_VARS};
use graph::schema::ast::{self as sast, FilterOp};
use graph::schema::kw;
use graph::schema::{EntityType, InputSchema, ObjectOrInterface};

use crate::execution::ast as a;
Expand Down Expand Up @@ -552,6 +553,14 @@ fn build_order(
}
}
}
// Apply a default ordering to the aggregations so that the most recent buckets are returned first
(None, _) if entity.is_aggregation() => {
let ts = entity
.field(kw::TIMESTAMP)
.expect("aggregation entities have timestamps");

EntityOrder::Descending(ts.name.to_string(), ts.value_type)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume we need to resolve that here because the code in relational_queries doesn't have enough information to do it there. It's a little nasty that in some cases we turn the default into a real order, and in others we pass `EntityOrder::Default' and let query generation resolve it. But I am not sure how to make that more sensible (and definitely out of scope for this PR)

(None, _) => EntityOrder::Default,
};
Ok(order)
Expand Down
2 changes: 1 addition & 1 deletion store/postgres/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,7 @@ pub async fn update_deployment_status(
d::failed.eq(health.is_failed()),
d::health.eq(health),
d::fatal_error.eq::<Option<String>>(fatal_error),
d::non_fatal_errors.eq::<Vec<String>>(non_fatal_errors.unwrap_or(vec![])),
d::non_fatal_errors.eq::<Vec<String>>(non_fatal_errors.unwrap_or_default()),
))
.execute(conn)
.await
Expand Down
5 changes: 3 additions & 2 deletions store/postgres/src/relational.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod query_tests;
pub(crate) mod dsl;
pub(crate) mod index;
pub(crate) mod prune;
mod rollup;
pub(crate) mod rollup;
pub(crate) mod value;

use diesel::deserialize::FromSql;
Expand Down Expand Up @@ -237,7 +237,7 @@ pub struct Layout {
pub input_schema: InputSchema,

/// The rollups for aggregations in this layout
rollups: Vec<Rollup>,
pub(crate) rollups: Vec<Rollup>,
}

impl Layout {
Expand Down Expand Up @@ -882,6 +882,7 @@ impl Layout {
query.block,
query.query_id,
&self.site,
query.aggregation_current,
)?;

let query_clone = query.clone();
Expand Down
Loading
Loading