-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Query the current aggregation bucket #6293
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
e5401e5
a74aee6
1d6505f
d1a4c1a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -465,6 +465,8 @@ pub struct EntityQuery { | |
| pub query_id: Option<String>, | ||
|
|
||
| pub trace: bool, | ||
|
|
||
| pub aggregation_current: Option<AggregationCurrent>, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When is this |
||
| } | ||
|
|
||
| impl EntityQuery { | ||
|
|
@@ -483,6 +485,7 @@ impl EntityQuery { | |
| logger: None, | ||
| query_id: None, | ||
| trace: false, | ||
| aggregation_current: None, | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -542,6 +545,19 @@ impl EntityQuery { | |
| } | ||
| } | ||
|
|
||
| /// Indicates whether the current, partially filled bucket should be included in the response. | ||
| /// | ||
| /// This is only relevant for aggregation entity queries. | ||
| #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] | ||
| pub enum AggregationCurrent { | ||
| /// Exclude the current, partially filled bucket from the response. | ||
| #[default] | ||
| Exclude, | ||
|
|
||
| /// Include the current, partially filled bucket in the response. | ||
| Include, | ||
| } | ||
|
|
||
| /// Operation types that lead to changes in assignments | ||
| #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] | ||
| #[serde(rename_all = "lowercase")] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -190,9 +190,11 @@ impl AbiJson { | |
| return Ok(Some(vec![])); | ||
| } | ||
| // Recursively resolve the nested path | ||
| return self | ||
| .resolve_field_path(components, nested_path) | ||
| .map(Some); | ||
| return Self::resolve_field_path( | ||
| components, | ||
| nested_path, | ||
| ) | ||
| .map(Some); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These changes also seem unrelated |
||
| } | ||
| } | ||
| } | ||
|
|
@@ -217,7 +219,6 @@ impl AbiJson { | |
| /// Supports both numeric indices and field names | ||
| /// Returns the index path to access the final field | ||
| fn resolve_field_path( | ||
| &self, | ||
| components: &serde_json::Value, | ||
| field_path: &[&str], | ||
| ) -> Result<Vec<usize>, Error> { | ||
|
|
@@ -254,7 +255,7 @@ impl AbiJson { | |
| // Recursively resolve the remaining path | ||
| let mut result = vec![index]; | ||
| let nested_result = | ||
| self.resolve_field_path(nested_components, remaining_path)?; | ||
| Self::resolve_field_path(nested_components, remaining_path)?; | ||
| result.extend(nested_result); | ||
| return Ok(result); | ||
| } else { | ||
|
|
@@ -294,8 +295,10 @@ impl AbiJson { | |
| if let Some(nested_components) = component.get("components") { | ||
| // Recursively resolve the remaining path | ||
| let mut result = vec![index]; | ||
| let nested_result = | ||
| self.resolve_field_path(nested_components, remaining_path)?; | ||
| let nested_result = Self::resolve_field_path( | ||
| nested_components, | ||
| remaining_path, | ||
| )?; | ||
| result.extend(nested_result); | ||
| return Ok(result); | ||
| } else { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,7 @@ | ||
| use std::collections::{BTreeSet, HashSet}; | ||
|
|
||
| use graph::{ | ||
| components::store::{AttributeNames, ChildMultiplicity, EntityOrder}, | ||
| components::store::{AggregationCurrent, AttributeNames, ChildMultiplicity, EntityOrder}, | ||
| data::{graphql::ObjectOrInterface, store::ID}, | ||
| env::ENV_VARS, | ||
| prelude::{anyhow, q, r, s, QueryExecutionError, ValueMap}, | ||
|
|
@@ -364,6 +364,26 @@ impl Field { | |
| }) | ||
| .transpose() | ||
| } | ||
|
|
||
| pub fn aggregation_current(&self) -> Result<Option<AggregationCurrent>, QueryExecutionError> { | ||
| let Some(value) = self.argument_value(kw::CURRENT) else { | ||
| return Ok(None); | ||
| }; | ||
|
|
||
| if let r::Value::Enum(current) = value { | ||
| match current.as_str() { | ||
| "exclude" => return Ok(Some(AggregationCurrent::Exclude)), | ||
| "include" => return Ok(Some(AggregationCurrent::Include)), | ||
| _ => {} | ||
| } | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could be a |
||
| } | ||
|
|
||
| Err(QueryExecutionError::InvalidArgumentError( | ||
| self.position, | ||
| kw::CURRENT.to_string(), | ||
| q::Value::from(value.clone()), | ||
| )) | ||
| } | ||
| } | ||
|
|
||
| impl ValueMap for Field { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| //! Run a GraphQL query and fetch all the entitied needed to build the | ||
| //! final result | ||
|
|
||
| use graph::components::store::AggregationCurrent; | ||
| use graph::data::graphql::ObjectTypeExt; | ||
| use graph::data::query::Trace; | ||
| use graph::data::store::Id; | ||
|
|
@@ -9,7 +10,6 @@ use graph::data::store::IdType; | |
| use graph::data::store::QueryObject; | ||
| use graph::data::value::{Object, Word}; | ||
| use graph::prelude::{r, CacheWeight, CheapClone}; | ||
| use graph::schema::kw; | ||
| use graph::schema::AggregationInterval; | ||
| use graph::schema::Field; | ||
| use graph::slog::warn; | ||
|
|
@@ -626,6 +626,7 @@ impl<'a> Loader<'a> { | |
| let child_type = input_schema | ||
| .object_or_interface(field_type.field_type.get_base_type(), child_interval) | ||
| .expect("we only collect fields that are objects or interfaces"); | ||
| let mut aggregation_current = field.aggregation_current()?; | ||
|
|
||
| let join = if at_root { | ||
| MaybeJoin::Root { child_type } | ||
|
|
@@ -644,6 +645,10 @@ impl<'a> Loader<'a> { | |
| let field_type = object_type | ||
| .field(&field.name) | ||
| .expect("field names are valid"); | ||
|
|
||
| // Loading the current bucket is not supported for nested queries | ||
| aggregation_current = None; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One case where that might be wanted is when you query e.g., for a list of tokens and want stats for each of them, i.e., some structure like But it's fine to leave that as a TODO for now. I am not entirely sure whether silently suppressing |
||
|
|
||
| MaybeJoin::Nested(Join::new( | ||
| &input_schema, | ||
| object_type.cheap_clone(), | ||
|
|
@@ -652,7 +657,10 @@ impl<'a> Loader<'a> { | |
| )) | ||
| }; | ||
|
|
||
| match self.fetch(&parents, &join, field).await { | ||
| match self | ||
| .fetch(&parents, &join, field, aggregation_current) | ||
| .await | ||
| { | ||
| Ok((children, trace)) => { | ||
| let exec_fut = Box::pin(self.execute_selection_set( | ||
| children, | ||
|
|
@@ -696,6 +704,7 @@ impl<'a> Loader<'a> { | |
| parents: &[&mut Node], | ||
| join: &MaybeJoin<'_>, | ||
| field: &a::Field, | ||
| aggregation_current: Option<AggregationCurrent>, | ||
| ) -> Result<(Vec<Node>, Trace), QueryExecutionError> { | ||
| let input_schema = self.resolver.store.input_schema().await?; | ||
| let child_type = join.child_type(); | ||
|
|
@@ -715,11 +724,6 @@ impl<'a> Loader<'a> { | |
| // that causes unnecessary work in the database | ||
| query.order = EntityOrder::Unordered; | ||
| } | ||
| // Apply default timestamp ordering for aggregations if no custom order is specified | ||
| if child_type.is_aggregation() && matches!(query.order, EntityOrder::Default) { | ||
| let ts = child_type.field(kw::TIMESTAMP).unwrap(); | ||
| query.order = EntityOrder::Descending(ts.name.to_string(), ts.value_type); | ||
| } | ||
| query.logger = Some(self.ctx.logger.cheap_clone()); | ||
| if let Some(r::Value::String(id)) = field.argument_value(ARG_ID) { | ||
| query.filter = Some( | ||
|
|
@@ -728,6 +732,10 @@ impl<'a> Loader<'a> { | |
| ); | ||
| } | ||
|
|
||
| if child_type.is_aggregation() { | ||
| query.aggregation_current = Some(aggregation_current.unwrap_or_default()); | ||
| } | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think Also, I don't understand why this isn't simply |
||
|
|
||
| if let MaybeJoin::Nested(join) = join { | ||
| // For anything but the root node, restrict the children we select | ||
| // by the parent list | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,7 @@ use graph::data::value::Object; | |
| use graph::data::value::Value as DataValue; | ||
| use graph::prelude::{r, TryFromValue, ENV_VARS}; | ||
| use graph::schema::ast::{self as sast, FilterOp}; | ||
| use graph::schema::kw; | ||
| use graph::schema::{EntityType, InputSchema, ObjectOrInterface}; | ||
|
|
||
| use crate::execution::ast as a; | ||
|
|
@@ -552,6 +553,14 @@ fn build_order( | |
| } | ||
| } | ||
| } | ||
| // Apply a default ordering to the aggregations so that the most recent buckets are returned first | ||
| (None, _) if entity.is_aggregation() => { | ||
| let ts = entity | ||
| .field(kw::TIMESTAMP) | ||
| .expect("aggregation entities have timestamps"); | ||
|
|
||
| EntityOrder::Descending(ts.name.to_string(), ts.value_type) | ||
| } | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I assume we need to resolve that here because the code in |
||
| (None, _) => EntityOrder::Default, | ||
| }; | ||
| Ok(order) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The changes in this file seem unrelated, right?