diff --git a/Cargo.lock b/Cargo.lock index faaa6e2..3a25374 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -705,6 +705,24 @@ dependencies = [ "serde", ] +[[package]] +name = "bindgen" +version = "0.72.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "993776b509cfb49c750f11b8f07a46fa23e0a1386ffc01fb1e7d343efc387895" +dependencies = [ + "bitflags 2.10.0", + "cexpr", + "clang-sys", + "itertools 0.13.0", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.111", +] + [[package]] name = "bit-set" version = "0.8.0" @@ -882,6 +900,16 @@ dependencies = [ "serde", ] +[[package]] +name = "bzip2-sys" +version = "0.1.13+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "225bff33b2141874fe80d71e07d6eec4f85c5c216453dd96388240f96e1acc14" +dependencies = [ + "cc", + "pkg-config", +] + [[package]] name = "c-kzg" version = "2.1.5" @@ -924,6 +952,15 @@ dependencies = [ "shlex", ] +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + [[package]] name = "cfg-if" version = "0.1.10" @@ -977,6 +1014,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "4.5.53" @@ -2019,6 +2067,8 @@ name = "ethlambda-storage" version = "0.1.0" dependencies = [ "ethlambda-types", + "rocksdb", + "tempfile", "tracing", ] @@ -3449,6 +3499,16 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "libloading" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55" +dependencies = [ + "cfg-if 1.0.4", + "windows-link", +] + [[package]] name = "libm" version = "0.2.15" @@ -4166,6 +4226,21 @@ dependencies = [ "yamux 0.13.8", ] +[[package]] +name = "librocksdb-sys" +version = "0.17.3+10.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cef2a00ee60fe526157c9023edab23943fae1ce2ab6f4abb2a807c1746835de9" +dependencies = [ + "bindgen", + "bzip2-sys", + "cc", + "libc", + "libz-sys", + "lz4-sys", + "zstd-sys", +] + [[package]] name = "libtest-mimic" version = "0.8.1" @@ -4247,6 +4322,16 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" +[[package]] +name = "lz4-sys" +version = "1.11.1+lz4-1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "malachite" version = "0.6.1" @@ -5679,7 +5764,7 @@ dependencies = [ "once_cell", "socket2 0.6.1", "tracing", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -6021,6 +6106,16 @@ dependencies = [ "rustc-hex", ] +[[package]] +name = "rocksdb" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddb7af00d2b17dbd07d82c0063e25411959748ff03e8d4f96134c2ff41fce34f" +dependencies = [ + "libc", + "librocksdb-sys", +] + [[package]] name = "rtnetlink" version = "0.13.1" @@ -8214,3 +8309,13 @@ dependencies = [ "quote", "syn 2.0.111", ] + +[[package]] +name = "zstd-sys" +version = "2.0.16+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/Cargo.toml b/Cargo.toml index f9b521a..e925252 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,3 +68,4 @@ tree_hash_derive = "0.9.1" vergen-git2 = { version = "9", features = ["rustc"] } rand = "0.9" +rocksdb = "0.24" diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index c9da076..c4fa991 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -4,6 +4,7 @@ use std::{ collections::{BTreeMap, HashMap}, net::{IpAddr, SocketAddr}, path::{Path, PathBuf}, + sync::Arc, }; use clap::Parser; @@ -19,7 +20,7 @@ use tracing::{error, info}; use tracing_subscriber::{EnvFilter, Layer, Registry, layer::SubscriberExt}; use ethlambda_blockchain::BlockChain; -use ethlambda_storage::Store; +use ethlambda_storage::{Store, backend::RocksDBBackend}; const ASCII_ART: &str = r#" _ _ _ _ _ @@ -91,7 +92,8 @@ async fn main() { read_validator_keys(&validators_path, &validator_keys_dir, &options.node_id); let genesis_state = State::from_genesis(&genesis, validators); - let store = Store::from_genesis(genesis_state); + let backend = Arc::new(RocksDBBackend::open("./data").expect("Failed to open RocksDB")); + let store = Store::from_genesis(backend, genesis_state); let (p2p_tx, p2p_rx) = tokio::sync::mpsc::unbounded_channel(); let blockchain = BlockChain::spawn(store.clone(), p2p_tx, validator_keys); diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index a0ee5d8..86274e0 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -1,10 +1,11 @@ use std::{ collections::{HashMap, HashSet}, path::Path, + sync::Arc, }; use ethlambda_blockchain::{SECONDS_PER_SLOT, store}; -use ethlambda_storage::Store; +use ethlambda_storage::{Store, backend::InMemoryBackend}; use ethlambda_types::{ attestation::Attestation, block::{Block, BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation}, @@ -35,7 +36,8 @@ fn run(path: &Path) -> datatest_stable::Result<()> { let anchor_state: State = test.anchor_state.into(); let anchor_block: Block = test.anchor_block.into(); let genesis_time = anchor_state.config.genesis_time; - let mut store = Store::get_forkchoice_store(anchor_state, anchor_block); + let backend = Arc::new(InMemoryBackend::new()); + let mut store = Store::get_forkchoice_store(backend, anchor_state, anchor_block); // Block registry: maps block labels to their roots let mut block_registry: HashMap = HashMap::new(); diff --git a/crates/blockchain/tests/signature_spectests.rs b/crates/blockchain/tests/signature_spectests.rs index a0fb24c..d76da29 100644 --- a/crates/blockchain/tests/signature_spectests.rs +++ b/crates/blockchain/tests/signature_spectests.rs @@ -1,7 +1,8 @@ use std::path::Path; +use std::sync::Arc; use ethlambda_blockchain::{SECONDS_PER_SLOT, store}; -use ethlambda_storage::Store; +use ethlambda_storage::{Store, backend::InMemoryBackend}; use ethlambda_types::{ block::{Block, SignedBlockWithAttestation}, primitives::TreeHash, @@ -41,7 +42,8 @@ fn run(path: &Path) -> datatest_stable::Result<()> { // Initialize the store with the anchor state and block let genesis_time = anchor_state.config.genesis_time; - let mut st = Store::get_forkchoice_store(anchor_state, anchor_block); + let backend = Arc::new(InMemoryBackend::new()); + let mut st = Store::get_forkchoice_store(backend, anchor_state, anchor_block); // Step 2: Run the state transition function with the block fixture let signed_block: SignedBlockWithAttestation = test.signed_block_with_attestation.into(); diff --git a/crates/net/rpc/src/lib.rs b/crates/net/rpc/src/lib.rs index d577a70..88562bf 100644 --- a/crates/net/rpc/src/lib.rs +++ b/crates/net/rpc/src/lib.rs @@ -74,7 +74,7 @@ mod tests { body::Body, http::{Request, StatusCode}, }; - use ethlambda_storage::Store; + use ethlambda_storage::{Store, backend::InMemoryBackend}; use ethlambda_types::{ block::{BlockBody, BlockHeader}, primitives::TreeHash, @@ -82,6 +82,7 @@ mod tests { }; use http_body_util::BodyExt; use serde_json::json; + use std::sync::Arc; use tower::ServiceExt; /// Create a minimal test state for testing. @@ -116,7 +117,8 @@ mod tests { #[tokio::test] async fn test_get_latest_justified_checkpoint() { let state = create_test_state(); - let store = Store::from_genesis(state); + let backend = Arc::new(InMemoryBackend::new()); + let store = Store::from_genesis(backend, state); let app = build_api_router(store.clone()); @@ -151,7 +153,8 @@ mod tests { use ethlambda_types::primitives::Encode; let state = create_test_state(); - let store = Store::from_genesis(state); + let backend = Arc::new(InMemoryBackend::new()); + let store = Store::from_genesis(backend, state); // Get the expected state from the store let finalized = store.latest_finalized(); diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index 5cc1f3b..a81be60 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -13,3 +13,7 @@ version.workspace = true ethlambda-types.workspace = true tracing.workspace = true +rocksdb.workspace = true + +[dev-dependencies] +tempfile = "3" diff --git a/crates/storage/src/backend/in_memory.rs b/crates/storage/src/backend/in_memory.rs index 914cc21..00b9da3 100644 --- a/crates/storage/src/backend/in_memory.rs +++ b/crates/storage/src/backend/in_memory.rs @@ -136,181 +136,11 @@ impl StorageWriteBatch for InMemoryWriteBatch { #[cfg(test)] mod tests { use super::*; + use crate::backend::tests::run_backend_tests; #[test] - fn test_put_and_get() { + fn test_in_memory_backend() { let backend = InMemoryBackend::new(); - - // Write data - { - let mut batch = backend.begin_write().unwrap(); - batch - .put_batch(Table::Blocks, vec![(b"key1".to_vec(), b"value1".to_vec())]) - .unwrap(); - batch.commit().unwrap(); - } - - // Read data - { - let view = backend.begin_read().unwrap(); - let value = view.get(Table::Blocks, b"key1").unwrap(); - assert_eq!(value, Some(b"value1".to_vec())); - } - } - - #[test] - fn test_delete() { - let backend = InMemoryBackend::new(); - - // Write data - { - let mut batch = backend.begin_write().unwrap(); - batch - .put_batch(Table::Blocks, vec![(b"key1".to_vec(), b"value1".to_vec())]) - .unwrap(); - batch.commit().unwrap(); - } - - // Delete data - { - let mut batch = backend.begin_write().unwrap(); - batch - .delete_batch(Table::Blocks, vec![b"key1".to_vec()]) - .unwrap(); - batch.commit().unwrap(); - } - - // Verify deleted - { - let view = backend.begin_read().unwrap(); - let value = view.get(Table::Blocks, b"key1").unwrap(); - assert_eq!(value, None); - } - } - - #[test] - fn test_prefix_iterator() { - let backend = InMemoryBackend::new(); - - // Write data with common prefix - { - let mut batch = backend.begin_write().unwrap(); - batch - .put_batch( - Table::Metadata, - vec![ - (b"config:a".to_vec(), b"1".to_vec()), - (b"config:b".to_vec(), b"2".to_vec()), - (b"other:x".to_vec(), b"3".to_vec()), - ], - ) - .unwrap(); - batch.commit().unwrap(); - } - - // Query by prefix - { - let view = backend.begin_read().unwrap(); - let mut results: Vec<_> = view - .prefix_iterator(Table::Metadata, b"config:") - .unwrap() - .collect::, _>>() - .unwrap(); - - results.sort_by(|a, b| a.0.cmp(&b.0)); - assert_eq!(results.len(), 2); - assert_eq!(&*results[0].0, b"config:a"); - assert_eq!(&*results[1].0, b"config:b"); - } - } - - #[test] - fn test_nonexistent_key() { - let backend = InMemoryBackend::new(); - let view = backend.begin_read().unwrap(); - let value = view.get(Table::Blocks, b"nonexistent").unwrap(); - assert_eq!(value, None); - } - - #[test] - fn test_delete_then_put() { - let backend = InMemoryBackend::new(); - - // Initial value - { - let mut batch = backend.begin_write().unwrap(); - batch - .put_batch(Table::Blocks, vec![(b"key".to_vec(), b"old".to_vec())]) - .unwrap(); - batch.commit().unwrap(); - } - - // Delete then put in same batch - put should win - { - let mut batch = backend.begin_write().unwrap(); - batch - .delete_batch(Table::Blocks, vec![b"key".to_vec()]) - .unwrap(); - batch - .put_batch(Table::Blocks, vec![(b"key".to_vec(), b"new".to_vec())]) - .unwrap(); - batch.commit().unwrap(); - } - - let view = backend.begin_read().unwrap(); - assert_eq!( - view.get(Table::Blocks, b"key").unwrap(), - Some(b"new".to_vec()) - ); - } - - #[test] - fn test_put_then_delete() { - let backend = InMemoryBackend::new(); - - // Put then delete in same batch - delete should win - { - let mut batch = backend.begin_write().unwrap(); - batch - .put_batch(Table::Blocks, vec![(b"key".to_vec(), b"value".to_vec())]) - .unwrap(); - batch - .delete_batch(Table::Blocks, vec![b"key".to_vec()]) - .unwrap(); - batch.commit().unwrap(); - } - - let view = backend.begin_read().unwrap(); - assert_eq!(view.get(Table::Blocks, b"key").unwrap(), None); - } - - #[test] - fn test_multiple_tables() { - let backend = InMemoryBackend::new(); - - // Write to different tables - { - let mut batch = backend.begin_write().unwrap(); - batch - .put_batch(Table::Blocks, vec![(b"key".to_vec(), b"block".to_vec())]) - .unwrap(); - batch - .put_batch(Table::States, vec![(b"key".to_vec(), b"state".to_vec())]) - .unwrap(); - batch.commit().unwrap(); - } - - // Verify isolation - { - let view = backend.begin_read().unwrap(); - assert_eq!( - view.get(Table::Blocks, b"key").unwrap(), - Some(b"block".to_vec()) - ); - assert_eq!( - view.get(Table::States, b"key").unwrap(), - Some(b"state".to_vec()) - ); - } + run_backend_tests(&backend); } } diff --git a/crates/storage/src/backend/mod.rs b/crates/storage/src/backend/mod.rs index d53103f..8574c57 100644 --- a/crates/storage/src/backend/mod.rs +++ b/crates/storage/src/backend/mod.rs @@ -6,7 +6,14 @@ //! //! - [`InMemoryBackend`]: Thread-safe in-memory storage using `RwLock`. //! Suitable for testing and ephemeral nodes. Data is lost on restart. +//! +//! - [`RocksDBBackend`] (requires `rocksdb` feature): Persistent storage using RocksDB. +//! Suitable for production nodes. mod in_memory; +mod rocksdb; +#[cfg(test)] +mod tests; pub use in_memory::InMemoryBackend; +pub use rocksdb::RocksDBBackend; diff --git a/crates/storage/src/backend/rocksdb.rs b/crates/storage/src/backend/rocksdb.rs new file mode 100644 index 0000000..72fb159 --- /dev/null +++ b/crates/storage/src/backend/rocksdb.rs @@ -0,0 +1,180 @@ +//! RocksDB storage backend. + +use crate::api::{ + ALL_TABLES, Error, PrefixResult, StorageBackend, StorageReadView, StorageWriteBatch, Table, +}; +use rocksdb::{ + ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, WriteBatch, WriteOptions, +}; +use std::path::Path; +use std::sync::Arc; + +/// Returns the column family name for a table. +fn cf_name(table: Table) -> &'static str { + match table { + Table::Blocks => "blocks", + Table::States => "states", + Table::LatestKnownAttestations => "latest_known_attestations", + Table::LatestNewAttestations => "latest_new_attestations", + Table::GossipSignatures => "gossip_signatures", + Table::AggregatedPayloads => "aggregated_payloads", + Table::Metadata => "metadata", + } +} + +/// RocksDB storage backend. +#[derive(Clone)] +pub struct RocksDBBackend { + db: Arc>, +} + +impl RocksDBBackend { + /// Open a RocksDB database at the given path. + pub fn open(path: impl AsRef) -> Result { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + + let cf_descriptors: Vec<_> = ALL_TABLES + .iter() + .map(|t| ColumnFamilyDescriptor::new(cf_name(*t), Options::default())) + .collect(); + + let db = + DBWithThreadMode::::open_cf_descriptors(&opts, path, cf_descriptors)?; + + Ok(Self { db: Arc::new(db) }) + } +} + +impl StorageBackend for RocksDBBackend { + fn begin_read(&self) -> Result, Error> { + Ok(Box::new(RocksDBReadView { + db: Arc::clone(&self.db), + })) + } + + fn begin_write(&self) -> Result, Error> { + Ok(Box::new(RocksDBWriteBatch { + db: Arc::clone(&self.db), + batch: WriteBatch::default(), + })) + } +} + +/// Read-only view into RocksDB. +struct RocksDBReadView { + db: Arc>, +} + +impl StorageReadView for RocksDBReadView { + fn get(&self, table: Table, key: &[u8]) -> Result>, Error> { + let cf = self + .db + .cf_handle(cf_name(table)) + .ok_or_else(|| format!("Column family {} not found", cf_name(table)))?; + + Ok(self.db.get_cf(&cf, key)?) + } + + fn prefix_iterator( + &self, + table: Table, + prefix: &[u8], + ) -> Result + '_>, Error> { + let cf = self + .db + .cf_handle(cf_name(table)) + .ok_or_else(|| format!("Column family {} not found", cf_name(table)))?; + + let prefix_owned = prefix.to_vec(); + let iter = self + .db + .prefix_iterator_cf(&cf, prefix) + .map(|result| result.map_err(|e| Box::new(e) as Error)) + .take_while(move |result| match result { + Ok((key, _)) => key.starts_with(&prefix_owned), + Err(_) => true, // propagate errors + }); + + Ok(Box::new(iter)) + } +} + +/// Write batch for RocksDB. +struct RocksDBWriteBatch { + db: Arc>, + batch: WriteBatch, +} + +impl StorageWriteBatch for RocksDBWriteBatch { + fn put_batch(&mut self, table: Table, batch: Vec<(Vec, Vec)>) -> Result<(), Error> { + let cf = self + .db + .cf_handle(cf_name(table)) + .ok_or_else(|| format!("Column family {} not found", cf_name(table)))?; + + for (key, value) in batch { + self.batch.put_cf(&cf, key, value); + } + Ok(()) + } + + fn delete_batch(&mut self, table: Table, keys: Vec>) -> Result<(), Error> { + let cf = self + .db + .cf_handle(cf_name(table)) + .ok_or_else(|| format!("Column family {} not found", cf_name(table)))?; + + for key in keys { + self.batch.delete_cf(&cf, key); + } + Ok(()) + } + + fn commit(self: Box) -> Result<(), Error> { + let mut write_opts = WriteOptions::default(); + write_opts.set_sync(false); + + self.db.write_opt(self.batch, &write_opts)?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::api::Table; + use crate::backend::tests::run_backend_tests; + use tempfile::tempdir; + + #[test] + fn test_rocksdb_backend() { + let dir = tempdir().unwrap(); + let backend = RocksDBBackend::open(dir.path()).unwrap(); + run_backend_tests(&backend); + } + + #[test] + fn test_persistence() { + let dir = tempdir().unwrap(); + + // Write data + { + let backend = RocksDBBackend::open(dir.path()).unwrap(); + let mut batch = backend.begin_write().unwrap(); + batch + .put_batch(Table::Blocks, vec![(b"key1".to_vec(), b"value1".to_vec())]) + .unwrap(); + batch.commit().unwrap(); + } + + // Reopen and read + { + let backend = RocksDBBackend::open(dir.path()).unwrap(); + let view = backend.begin_read().unwrap(); + let value = view.get(Table::Blocks, b"key1").unwrap(); + assert_eq!(value, Some(b"value1".to_vec())); + } + } +} diff --git a/crates/storage/src/backend/tests.rs b/crates/storage/src/backend/tests.rs new file mode 100644 index 0000000..7401577 --- /dev/null +++ b/crates/storage/src/backend/tests.rs @@ -0,0 +1,203 @@ +//! Shared tests for storage backends. +//! +//! This module provides a generic test suite that can be run against any +//! `StorageBackend` implementation to verify correct behavior. +//! +//! Note: These tests use simple text keys (e.g., `b"test_put_get_key"`) rather +//! than real production data (SSZ-encoded H256 hashes, validator indices, etc.). +//! This may cause issues if a backend implementation relies on specific key +//! formats or lengths. If adding a backend with such constraints, consider +//! adding backend-specific tests with realistic data. + +use crate::api::{StorageBackend, Table}; + +/// Run the full test suite against a backend. +pub fn run_backend_tests(backend: &dyn StorageBackend) { + test_put_and_get(backend); + test_delete(backend); + test_prefix_iterator(backend); + test_nonexistent_key(backend); + test_delete_then_put(backend); + test_put_then_delete(backend); + test_multiple_tables(backend); +} + +fn test_put_and_get(backend: &dyn StorageBackend) { + // Write data + { + let mut batch = backend.begin_write().unwrap(); + batch + .put_batch( + Table::Blocks, + vec![(b"test_put_get_key".to_vec(), b"value1".to_vec())], + ) + .unwrap(); + batch.commit().unwrap(); + } + + // Read data + { + let view = backend.begin_read().unwrap(); + let value = view.get(Table::Blocks, b"test_put_get_key").unwrap(); + assert_eq!(value, Some(b"value1".to_vec())); + } +} + +fn test_delete(backend: &dyn StorageBackend) { + // Write data + { + let mut batch = backend.begin_write().unwrap(); + batch + .put_batch( + Table::Blocks, + vec![(b"test_delete_key".to_vec(), b"value1".to_vec())], + ) + .unwrap(); + batch.commit().unwrap(); + } + + // Delete data + { + let mut batch = backend.begin_write().unwrap(); + batch + .delete_batch(Table::Blocks, vec![b"test_delete_key".to_vec()]) + .unwrap(); + batch.commit().unwrap(); + } + + // Verify deleted + { + let view = backend.begin_read().unwrap(); + let value = view.get(Table::Blocks, b"test_delete_key").unwrap(); + assert_eq!(value, None); + } +} + +fn test_prefix_iterator(backend: &dyn StorageBackend) { + // Write data with common prefix + { + let mut batch = backend.begin_write().unwrap(); + batch + .put_batch( + Table::Metadata, + vec![ + (b"test_prefix:a".to_vec(), b"1".to_vec()), + (b"test_prefix:b".to_vec(), b"2".to_vec()), + (b"test_other:x".to_vec(), b"3".to_vec()), + ], + ) + .unwrap(); + batch.commit().unwrap(); + } + + // Query by prefix + { + let view = backend.begin_read().unwrap(); + let mut results: Vec<_> = view + .prefix_iterator(Table::Metadata, b"test_prefix:") + .unwrap() + .collect::, _>>() + .unwrap(); + + results.sort_by(|a, b| a.0.cmp(&b.0)); + assert_eq!(results.len(), 2); + assert_eq!(&*results[0].0, b"test_prefix:a"); + assert_eq!(&*results[1].0, b"test_prefix:b"); + } +} + +fn test_nonexistent_key(backend: &dyn StorageBackend) { + let view = backend.begin_read().unwrap(); + let value = view + .get(Table::Blocks, b"test_nonexistent_key_12345") + .unwrap(); + assert_eq!(value, None); +} + +fn test_delete_then_put(backend: &dyn StorageBackend) { + // Initial value + { + let mut batch = backend.begin_write().unwrap(); + batch + .put_batch( + Table::Blocks, + vec![(b"test_del_put_key".to_vec(), b"old".to_vec())], + ) + .unwrap(); + batch.commit().unwrap(); + } + + // Delete then put in same batch - put should win + { + let mut batch = backend.begin_write().unwrap(); + batch + .delete_batch(Table::Blocks, vec![b"test_del_put_key".to_vec()]) + .unwrap(); + batch + .put_batch( + Table::Blocks, + vec![(b"test_del_put_key".to_vec(), b"new".to_vec())], + ) + .unwrap(); + batch.commit().unwrap(); + } + + let view = backend.begin_read().unwrap(); + assert_eq!( + view.get(Table::Blocks, b"test_del_put_key").unwrap(), + Some(b"new".to_vec()) + ); +} + +fn test_put_then_delete(backend: &dyn StorageBackend) { + // Put then delete in same batch - delete should win + { + let mut batch = backend.begin_write().unwrap(); + batch + .put_batch( + Table::Blocks, + vec![(b"test_put_del_key".to_vec(), b"value".to_vec())], + ) + .unwrap(); + batch + .delete_batch(Table::Blocks, vec![b"test_put_del_key".to_vec()]) + .unwrap(); + batch.commit().unwrap(); + } + + let view = backend.begin_read().unwrap(); + assert_eq!(view.get(Table::Blocks, b"test_put_del_key").unwrap(), None); +} + +fn test_multiple_tables(backend: &dyn StorageBackend) { + // Write to different tables + { + let mut batch = backend.begin_write().unwrap(); + batch + .put_batch( + Table::Blocks, + vec![(b"test_multi_key".to_vec(), b"block".to_vec())], + ) + .unwrap(); + batch + .put_batch( + Table::States, + vec![(b"test_multi_key".to_vec(), b"state".to_vec())], + ) + .unwrap(); + batch.commit().unwrap(); + } + + // Verify isolation + { + let view = backend.begin_read().unwrap(); + assert_eq!( + view.get(Table::Blocks, b"test_multi_key").unwrap(), + Some(b"block".to_vec()) + ); + assert_eq!( + view.get(Table::States, b"test_multi_key").unwrap(), + Some(b"state".to_vec()) + ); + } +} diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 5442564..1829fbd 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -1,5 +1,6 @@ mod api; -mod backend; +pub mod backend; mod store; +pub use api::StorageBackend; pub use store::{ForkCheckpoints, SignatureKey, Store}; diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 7830924..6777018 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use crate::api::{StorageBackend, Table}; -use crate::backend::InMemoryBackend; use ethlambda_types::{ attestation::AttestationData, @@ -95,7 +94,7 @@ pub struct Store { impl Store { /// Initialize a Store from a genesis state. - pub fn from_genesis(mut genesis_state: State) -> Self { + pub fn from_genesis(backend: Arc, mut genesis_state: State) -> Self { // Ensure the header state root is zero before computing the state root genesis_state.latest_block_header.state_root = H256::ZERO; @@ -107,16 +106,18 @@ impl Store { state_root: genesis_state_root, body: BlockBody::default(), }; - Self::get_forkchoice_store(genesis_state, genesis_block) + Self::get_forkchoice_store(backend, genesis_state, genesis_block) } /// Initialize a Store from an anchor state and block. - pub fn get_forkchoice_store(anchor_state: State, anchor_block: Block) -> Self { + pub fn get_forkchoice_store( + backend: Arc, + anchor_state: State, + anchor_block: Block, + ) -> Self { let anchor_state_root = anchor_state.tree_hash_root(); let anchor_block_root = anchor_block.tree_hash_root(); - let backend: Arc = Arc::new(InMemoryBackend::new()); - let anchor_checkpoint = Checkpoint { root: anchor_block_root, slot: anchor_block.slot,