diff --git a/Cargo.lock b/Cargo.lock index 921ffa9eb28..a295b5058fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -50,6 +50,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "allocator-api2" version = "0.2.21" @@ -1382,6 +1397,20 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-compression" +version = "0.4.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddb939d66e4ae03cee6091612804ba446b12878410cfa17f785f4dd67d4014e8" +dependencies = [ + "brotli", + "flate2", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-graphql" version = "7.2.1" @@ -1865,6 +1894,27 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "brotli" +version = "8.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bd8b9603c7aa97359dbd97ecf258968c95f3adddd6db2f7e7a5bef101c84560" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "874bb8112abecc98cbd6d81ea4fa7e94fb9449648c93cc89aa40c81c24d7de03" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bs58" version = "0.4.0" @@ -6593,6 +6643,7 @@ version = "0.12.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb" dependencies = [ + "async-compression", "base64 0.22.1", "bytes", "encoding_rs", diff --git a/chain/ethereum/src/lib.rs b/chain/ethereum/src/lib.rs index 8850764d63b..509d239bf01 100644 --- a/chain/ethereum/src/lib.rs +++ b/chain/ethereum/src/lib.rs @@ -14,7 +14,7 @@ mod transport; pub use self::capabilities::NodeCapabilities; pub use self::ethereum_adapter::EthereumAdapter; pub use self::runtime::RuntimeAdapter; -pub use self::transport::Transport; +pub use self::transport::{Compression, Transport}; pub use env::ENV_VARS; pub use buffered_call_cache::BufferedCallCache; diff --git a/chain/ethereum/src/network.rs b/chain/ethereum/src/network.rs index 536f7a8a54d..301068650f8 100644 --- a/chain/ethereum/src/network.rs +++ b/chain/ethereum/src/network.rs @@ -314,6 +314,7 @@ mod tests { use graph::components::network_provider::ProviderManager; use graph::components::network_provider::ProviderName; use graph::data::value::Word; + use graph::http::HeaderMap; use graph::{ endpoint::EndpointMetrics, @@ -324,7 +325,9 @@ mod tests { }; use std::sync::Arc; - use crate::{EthereumAdapter, EthereumAdapterTrait, ProviderEthRpcMetrics, Transport}; + use crate::{ + Compression, EthereumAdapter, EthereumAdapterTrait, ProviderEthRpcMetrics, Transport, + }; use super::{EthereumNetworkAdapter, EthereumNetworkAdapters, NodeCapabilities}; @@ -394,6 +397,7 @@ mod tests { HeaderMap::new(), metrics.clone(), "", + Compression::None, ); let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone())); @@ -497,6 +501,7 @@ mod tests { HeaderMap::new(), metrics.clone(), "", + Compression::None, ); let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone())); @@ -568,6 +573,7 @@ mod tests { HeaderMap::new(), metrics.clone(), "", + Compression::None, ); let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone())); @@ -632,6 +638,7 @@ mod tests { HeaderMap::new(), metrics.clone(), "", + Compression::None, ); let provider_metrics = Arc::new(ProviderEthRpcMetrics::new(mock_registry.clone())); @@ -919,6 +926,7 @@ mod tests { HeaderMap::new(), endpoint_metrics.clone(), "", + Compression::None, ); Arc::new( diff --git a/chain/ethereum/src/transport.rs b/chain/ethereum/src/transport.rs index d5fac8523bb..2c78b618f95 100644 --- a/chain/ethereum/src/transport.rs +++ b/chain/ethereum/src/transport.rs @@ -11,6 +11,27 @@ use alloy::transports::{TransportError, TransportFut}; use graph::prelude::alloy::transports::{http::Http, ipc::IpcConnect, ws::WsConnect}; +/// Compression method for RPC requests. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub enum Compression { + #[default] + None, + Gzip, + Brotli, + Deflate, +} + +impl std::fmt::Display for Compression { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Compression::None => write!(f, "none"), + Compression::Gzip => write!(f, "gzip"), + Compression::Brotli => write!(f, "brotli"), + Compression::Deflate => write!(f, "deflate"), + } + } +} + /// Abstraction over different transport types for Alloy providers. #[derive(Clone, Debug)] pub enum Transport { @@ -46,11 +67,24 @@ impl Transport { headers: graph::http::HeaderMap, metrics: Arc, provider: impl AsRef, + compression: Compression, ) -> Self { - let client = reqwest::Client::builder() - .default_headers(headers) - .build() - .expect("Failed to build HTTP client"); + let mut client_builder = reqwest::Client::builder().default_headers(headers); + + match compression { + Compression::None => {} + Compression::Gzip => { + client_builder = client_builder.gzip(true); + } + Compression::Brotli => { + client_builder = client_builder.brotli(true); + } + Compression::Deflate => { + client_builder = client_builder.deflate(true); + } + } + + let client = client_builder.build().expect("Failed to build HTTP client"); let http_transport = Http::with_client(client, rpc); let metrics_transport = MetricsHttp::new(http_transport, metrics, provider.as_ref().into()); diff --git a/graph/Cargo.toml b/graph/Cargo.toml index 4b53cbee452..65b24736cce 100644 --- a/graph/Cargo.toml +++ b/graph/Cargo.toml @@ -27,7 +27,7 @@ chrono = "0.4.43" envconfig = { workspace = true } Inflector = "0.11.3" atty = "0.2" -reqwest = { version = "0.12.23", features = ["json", "stream", "multipart"] } +reqwest = { version = "0.12.23", features = ["json", "stream", "multipart", "gzip", "brotli", "deflate"] } ethabi = "17.2" hex = "0.4.3" http0 = { version = "0", package = "http" } diff --git a/graph/src/components/link_resolver/arweave.rs b/graph/src/components/link_resolver/arweave.rs index abd3d80a503..a9b66516dee 100644 --- a/graph/src/components/link_resolver/arweave.rs +++ b/graph/src/components/link_resolver/arweave.rs @@ -42,7 +42,7 @@ impl Default for ArweaveClient { Self { base_url: "https://arweave.net".parse().unwrap(), - client: Client::default(), + client: Client::builder().gzip(false).build().unwrap(), logger: Logger::root(slog::Discard, o!()), } } @@ -53,7 +53,7 @@ impl ArweaveClient { Self { base_url, logger, - client: Client::default(), + client: Client::builder().gzip(false).build().unwrap(), } } } diff --git a/node/resources/tests/full_config.toml b/node/resources/tests/full_config.toml index 057e774d93e..69cc0bbe02f 100644 --- a/node/resources/tests/full_config.toml +++ b/node/resources/tests/full_config.toml @@ -48,6 +48,7 @@ shard = "primary" provider = [ { label = "mainnet-0", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }, { label = "mainnet-1", details = { type = "web3call", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }}, + { label = "mainnet-2", details = { type = "web3", url = "http://rpc.mainnet.io", features = ["archive", "compression/gzip"] }}, { label = "firehose", details = { type = "firehose", url = "http://localhost:9000", features = [] }}, ] diff --git a/node/src/chain.rs b/node/src/chain.rs index b1f2b0709cb..39dc1686853 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -206,11 +206,13 @@ pub async fn create_ethereum_networks_for_chain( } let logger = logger.new(o!("provider" => provider.label.clone())); + let compression = web3.compression(); info!( logger, "Creating transport"; "url" => &web3.url, - "capabilities" => capabilities + "capabilities" => capabilities, + "compression" => compression.to_string() ); use crate::config::Transport::*; @@ -221,6 +223,7 @@ pub async fn create_ethereum_networks_for_chain( web3.headers.clone(), endpoint_metrics.cheap_clone(), &provider.label, + compression, ), Ipc => Transport::new_ipc(&web3.url).await, Ws => Transport::new_ws(&web3.url).await, diff --git a/node/src/config.rs b/node/src/config.rs index b118f34da57..ad5236cd182 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -17,7 +17,7 @@ use graph::{ }, }; use graph_chain_ethereum as ethereum; -use graph_chain_ethereum::NodeCapabilities; +use graph_chain_ethereum::{Compression, NodeCapabilities}; use graph_store_postgres::{DeploymentPlacer, Shard as ShardName, PRIMARY_SHARD}; use graph::http::{HeaderMap, Uri}; @@ -704,12 +704,31 @@ impl Web3Provider { } } + pub fn compression(&self) -> Compression { + if self.features.contains("compression/gzip") { + Compression::Gzip + } else if self.features.contains("compression/brotli") { + Compression::Brotli + } else if self.features.contains("compression/deflate") { + Compression::Deflate + } else { + Compression::None + } + } + pub fn limit_for(&self, node: &NodeId) -> SubgraphLimit { self.rules.limit_for(node) } } -const PROVIDER_FEATURES: [&str; 3] = ["traces", "archive", "no_eip1898"]; +const PROVIDER_FEATURES: [&str; 6] = [ + "traces", + "archive", + "no_eip1898", + "compression/gzip", + "compression/brotli", + "compression/deflate", +]; const DEFAULT_PROVIDER_FEATURES: [&str; 2] = ["traces", "archive"]; impl Provider { @@ -770,6 +789,18 @@ impl Provider { } } + let compression_count = web3 + .features + .iter() + .filter(|f| f.starts_with("compression/")) + .count(); + if compression_count > 1 { + return Err(anyhow!( + "at most one compression method allowed for provider {}", + self.label + )); + } + web3.url = shellexpand::env(&web3.url)?.into_owned(); let label = &self.label; @@ -1201,6 +1232,7 @@ mod tests { use graph::http::{HeaderMap, HeaderValue}; use graph::prelude::regex::Regex; use graph::prelude::{toml, NodeId}; + use graph_chain_ethereum::Compression; use std::collections::BTreeSet; use std::fs::read_to_string; use std::path::{Path, PathBuf}; @@ -1619,6 +1651,117 @@ mod tests { assert!(SubgraphLimit::Limit(10) > SubgraphLimit::Disabled); } + #[test] + fn it_parses_web3_provider_with_gzip_compression_feature() { + let actual: Provider = toml::from_str( + r#" + label = "compressed" + details = { type = "web3", url = "http://localhost:8545", features = ["archive", "compression/gzip"] } + "#, + ) + .unwrap(); + + let mut features = BTreeSet::new(); + features.insert("archive".to_string()); + features.insert("compression/gzip".to_string()); + + assert_eq!( + Provider { + label: "compressed".to_owned(), + details: ProviderDetails::Web3(Web3Provider { + transport: Transport::Rpc, + url: "http://localhost:8545".to_owned(), + features, + headers: HeaderMap::new(), + rules: Vec::new(), + }), + }, + actual + ); + + match actual.details { + ProviderDetails::Web3(ref web3) => { + assert_eq!(web3.compression(), Compression::Gzip); + } + _ => panic!("expected Web3 provider"), + } + } + + #[test] + fn it_parses_web3_provider_with_brotli_compression_feature() { + let actual: Provider = toml::from_str( + r#" + label = "compressed" + details = { type = "web3", url = "http://localhost:8545", features = ["archive", "compression/brotli"] } + "#, + ) + .unwrap(); + + match actual.details { + ProviderDetails::Web3(ref web3) => { + assert_eq!(web3.compression(), Compression::Brotli); + } + _ => panic!("expected Web3 provider"), + } + } + + #[test] + fn it_parses_web3_provider_with_deflate_compression_feature() { + let actual: Provider = toml::from_str( + r#" + label = "compressed" + details = { type = "web3", url = "http://localhost:8545", features = ["archive", "compression/deflate"] } + "#, + ) + .unwrap(); + + match actual.details { + ProviderDetails::Web3(ref web3) => { + assert_eq!(web3.compression(), Compression::Deflate); + } + _ => panic!("expected Web3 provider"), + } + } + + #[test] + fn it_parses_web3_provider_without_compression_feature() { + let actual: Provider = toml::from_str( + r#" + label = "uncompressed" + details = { type = "web3", url = "http://localhost:8545", features = ["archive"] } + "#, + ) + .unwrap(); + + match actual.details { + ProviderDetails::Web3(ref web3) => { + assert_eq!(web3.compression(), Compression::None); + } + _ => panic!("expected Web3 provider"), + } + } + + #[test] + fn it_rejects_multiple_compression_features() { + let mut actual: Provider = toml::from_str( + r#" + label = "multi-comp" + details = { type = "web3", url = "http://localhost:8545", features = ["compression/gzip", "compression/brotli"] } + "#, + ) + .unwrap(); + + let err = actual.validate(); + assert!(err.is_err()); + let err = err.unwrap_err(); + assert!( + err.to_string() + .contains("at most one compression method allowed"), + "result: {:?}", + err + ); + } + #[test] fn duplicated_labels_are_not_allowed_within_chain() { let mut actual = toml::from_str::(