diff --git a/CHANGELOG.md b/CHANGELOG.md
index e0c086087..c9ef6b77a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Changes
 
 - Improve `cache.NumPendingData` to not return empty data. Automatically bumps `LastSubmittedHeight` to reflect that. ([#3046](https://github.com/evstack/ev-node/pull/3046))
+- **BREAKING** Make the pending events cache and the tx cache fully ephemeral; they are re-fetched on restart. The DA inclusion cache persists across restarts via store metadata and is cleared once DA inclusion has been processed. ([#3047](https://github.com/evstack/ev-node/pull/3047))
 
 ## v1.0.0-rc.2
 
diff --git a/block/components.go b/block/components.go
index 903402cb1..d5af466ef 100644
--- a/block/components.go
+++ b/block/components.go
@@ -112,7 +112,7 @@ func (bc *Components) Stop() error {
 		}
 	}
 	if bc.Cache != nil {
-		if err := bc.Cache.SaveToDisk(); err != nil {
+		if err := bc.Cache.SaveToStore(); err != nil {
 			errs = errors.Join(errs, fmt.Errorf("failed to save caches: %w", err))
 		}
 	}
diff --git a/block/internal/cache/generic_cache.go b/block/internal/cache/generic_cache.go
index 97cd197c6..83d5c9653 100644
--- a/block/internal/cache/generic_cache.go
+++ b/block/internal/cache/generic_cache.go
@@ -1,18 +1,16 @@
 package cache
 
 import (
-	"bufio"
-	"encoding/gob"
-	"errors"
+	"context"
+	"encoding/binary"
 	"fmt"
-	"os"
-	"path/filepath"
 	"sync"
 	"sync/atomic"
 
 	lru "github.com/hashicorp/golang-lru/v2"
-	"golang.org/x/sync/errgroup"
+
+	"github.com/evstack/ev-node/pkg/store"
 )
 
 const (
@@ -47,62 +45,55 @@ type Cache[T any] struct {
 	// maxDAHeight tracks the maximum DA height seen
 	maxDAHeight *atomic.Uint64
-}
 
-// CacheConfig holds configuration for cache sizes.
-type CacheConfig struct {
-	ItemsCacheSize      int
-	HashesCacheSize     int
-	DAIncludedCacheSize int
+	// store is used for persisting DA inclusion data (optional, can be nil for ephemeral caches)
+	store store.Store
+	// storeKeyPrefix is the prefix used for store keys
+	storeKeyPrefix string
 }
 
-// DefaultCacheConfig returns the default cache configuration.
-func DefaultCacheConfig() CacheConfig {
-	return CacheConfig{
-		ItemsCacheSize:      DefaultItemsCacheSize,
-		HashesCacheSize:     DefaultHashesCacheSize,
-		DAIncludedCacheSize: DefaultDAIncludedCacheSize,
+// NewCache returns a new Cache struct with default sizes.
+// If store and keyPrefix are provided, DA inclusion data will be persisted to the store for populating the cache on restarts.
+func NewCache[T any](s store.Store, keyPrefix string) *Cache[T] {
+	// LRU cache creation only fails if size <= 0, which won't happen with our defaults
+	itemsCache, _ := lru.New[uint64, *T](DefaultItemsCacheSize)
+	hashesCache, _ := lru.New[string, bool](DefaultHashesCacheSize)
+	daIncludedCache, _ := lru.New[string, uint64](DefaultDAIncludedCacheSize)
+	// hashByHeight must be at least as large as hashes cache to ensure proper pruning.
+	hashByHeightCache, _ := lru.New[uint64, string](DefaultHashesCacheSize)
+
+	return &Cache[T]{
+		itemsByHeight:  itemsCache,
+		hashes:         hashesCache,
+		daIncluded:     daIncludedCache,
+		hashByHeight:   hashByHeightCache,
+		maxDAHeight:    &atomic.Uint64{},
+		store:          s,
+		storeKeyPrefix: keyPrefix,
 	}
 }
 
-// NewCache returns a new Cache struct with default sizes
-func NewCache[T any]() *Cache[T] {
-	cache, _ := NewCacheWithConfig[T](DefaultCacheConfig())
-	return cache
+// storeKey returns the store key for a given hash.
+func (c *Cache[T]) storeKey(hash string) string { + return c.storeKeyPrefix + hash } -// NewCacheWithConfig returns a new Cache struct with custom sizes -func NewCacheWithConfig[T any](config CacheConfig) (*Cache[T], error) { - itemsCache, err := lru.New[uint64, *T](config.ItemsCacheSize) - if err != nil { - return nil, fmt.Errorf("failed to create items cache: %w", err) - } - - hashesCache, err := lru.New[string, bool](config.HashesCacheSize) - if err != nil { - return nil, fmt.Errorf("failed to create hashes cache: %w", err) - } - - daIncludedCache, err := lru.New[string, uint64](config.DAIncludedCacheSize) - if err != nil { - return nil, fmt.Errorf("failed to create daIncluded cache: %w", err) - } +// encodeDAInclusion encodes daHeight and blockHeight into a 16-byte value. +func encodeDAInclusion(daHeight, blockHeight uint64) []byte { + value := make([]byte, 16) // 8 bytes for daHeight + 8 bytes for blockHeight + binary.LittleEndian.PutUint64(value[0:8], daHeight) + binary.LittleEndian.PutUint64(value[8:16], blockHeight) + return value +} - // hashByHeight must be at least as large as hashes cache to ensure proper pruning. - // If an entry is evicted from hashByHeight before hashes, the corresponding hash - // entry can no longer be pruned by height, causing a slow memory leak. - hashByHeightCache, err := lru.New[uint64, string](config.HashesCacheSize) - if err != nil { - return nil, fmt.Errorf("failed to create hashByHeight cache: %w", err) +// decodeDAInclusion decodes a 16-byte value into daHeight and blockHeight. +func decodeDAInclusion(value []byte) (daHeight, blockHeight uint64, ok bool) { + if len(value) != 16 { + return 0, 0, false } - - return &Cache[T]{ - itemsByHeight: itemsCache, - hashes: hashesCache, - daIncluded: daIncludedCache, - hashByHeight: hashByHeightCache, - maxDAHeight: &atomic.Uint64{}, - }, nil + daHeight = binary.LittleEndian.Uint64(value[0:8]) + blockHeight = binary.LittleEndian.Uint64(value[8:16]) + return daHeight, blockHeight, true } // getItem returns an item from the cache by height. @@ -158,26 +149,26 @@ func (c *Cache[T]) getDAIncluded(hash string) (uint64, bool) { return daHeight, true } -// setDAIncluded sets the hash as DA-included with the given DA height and tracks block height for pruning +// setDAIncluded sets the hash as DA-included with the given DA height and tracks block height for pruning. func (c *Cache[T]) setDAIncluded(hash string, daHeight uint64, blockHeight uint64) { c.daIncluded.Add(hash, daHeight) c.hashByHeight.Add(blockHeight, hash) - // Update max DA height if necessary - for range 1_000 { - current := c.maxDAHeight.Load() - if daHeight <= current { - return - } - if c.maxDAHeight.CompareAndSwap(current, daHeight) { - return - } + // Persist to store if configured (for SIGKILL protection) + if c.store != nil { + _ = c.store.SetMetadata(context.Background(), c.storeKey(hash), encodeDAInclusion(daHeight, blockHeight)) } + + // Update max DA height if necessary + c.setMaxDAHeight(daHeight) } -// removeDAIncluded removes the DA-included status of the hash +// removeDAIncluded removes the DA-included status of the hash from cache and store. func (c *Cache[T]) removeDAIncluded(hash string) { c.daIncluded.Remove(hash) + if c.store != nil { + _ = c.store.DeleteMetadata(context.Background(), c.storeKey(hash)) + } } // daHeight returns the maximum DA height from all DA-included items. 
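The two helpers above fix the persisted value layout: one 16-byte record per hash, with `daHeight` in bytes 0–8 and `blockHeight` in bytes 8–16, both little-endian. A minimal standalone round-trip sketch of that layout (hypothetical names, not part of this patch):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encode mirrors encodeDAInclusion: 8 little-endian bytes of daHeight
// followed by 8 little-endian bytes of blockHeight.
func encode(daHeight, blockHeight uint64) []byte {
	v := make([]byte, 16)
	binary.LittleEndian.PutUint64(v[0:8], daHeight)
	binary.LittleEndian.PutUint64(v[8:16], blockHeight)
	return v
}

// decode mirrors decodeDAInclusion: anything that is not exactly 16 bytes
// is reported as invalid (ok == false) rather than treated as an error.
func decode(v []byte) (daHeight, blockHeight uint64, ok bool) {
	if len(v) != 16 {
		return 0, 0, false
	}
	return binary.LittleEndian.Uint64(v[0:8]), binary.LittleEndian.Uint64(v[8:16]), true
}

func main() {
	da, blk, ok := decode(encode(100, 42))
	fmt.Println(da, blk, ok) // 100 42 true

	_, _, ok = decode([]byte("short"))
	fmt.Println(ok) // false — corrupt or truncated metadata is skipped on restore
}
```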
@@ -186,12 +177,26 @@ func (c *Cache[T]) daHeight() uint64 {
 	return c.maxDAHeight.Load()
 }
 
+// setMaxDAHeight sets the maximum DA height if the provided value is greater
+// than the current value.
+func (c *Cache[T]) setMaxDAHeight(daHeight uint64) {
+	for range 1_000 {
+		current := c.maxDAHeight.Load()
+		if daHeight <= current {
+			return
+		}
+		if c.maxDAHeight.CompareAndSwap(current, daHeight) {
+			return
+		}
+	}
+}
+
 // removeSeen removes a hash from the seen cache.
 func (c *Cache[T]) removeSeen(hash string) {
 	c.hashes.Remove(hash)
 }
 
-// deleteAllForHeight removes all items and their associated data from the cache at the given height.
+// deleteAllForHeight removes all items and their associated data from the cache and store at the given height.
 func (c *Cache[T]) deleteAllForHeight(height uint64) {
 	c.itemsByHeight.Remove(height)
 
@@ -204,160 +209,87 @@ func (c *Cache[T]) deleteAllForHeight(height uint64) {
 	if ok {
 		c.hashes.Remove(hash)
-		// c.daIncluded.Remove(hash) // we actually do not want to delete the DA-included status here
+		c.removeDAIncluded(hash)
 	}
 }
 
-const (
-	itemsByHeightFilename = "items_by_height.gob"
-	hashesFilename        = "hashes.gob"
-	daIncludedFilename    = "da_included.gob"
-)
-
-// saveMapGob saves a map to a file using gob encoding.
-func saveMapGob[K comparable, V any](filePath string, data map[K]V) (err error) {
-	file, err := os.Create(filePath)
-	if err != nil {
-		return fmt.Errorf("failed to create file %s: %w", filePath, err)
+// RestoreFromStore loads DA inclusion data from the store into the in-memory cache.
+// This should be called during initialization to restore persisted state.
+// It looks up each provided hash under the cache's key prefix and populates the LRU caches.
+func (c *Cache[T]) RestoreFromStore(ctx context.Context, hashes []string) error {
+	if c.store == nil {
+		return nil // No store configured, nothing to restore
 	}
-	writer := bufio.NewWriter(file)
-	defer func() {
-		err = errors.Join(err, writer.Flush(), file.Sync(), file.Close())
-	}()
 
-	if err := gob.NewEncoder(writer).Encode(data); err != nil {
-		return fmt.Errorf("failed to encode to file %s: %w", filePath, err)
-	}
-	return nil
-}
+	for _, hash := range hashes {
+		value, err := c.store.GetMetadata(ctx, c.storeKey(hash))
+		if err != nil {
+			// Key not found is not an error - the hash may not have been DA included yet
+			continue
+		}
 
-// loadMapGob loads a map from a file using gob encoding.
-// if the file does not exist, it returns an empty map and no error.
-func loadMapGob[K comparable, V any](filePath string) (map[K]V, error) {
-	m := make(map[K]V)
-	file, err := os.Open(filePath)
-	if err != nil {
-		if os.IsNotExist(err) {
-			return m, nil // return empty map if file not found
+		daHeight, blockHeight, ok := decodeDAInclusion(value)
+		if !ok {
+			continue // Invalid data, skip
 		}
-		return nil, fmt.Errorf("failed to open file %s: %w", filePath, err)
-	}
-	defer file.Close()
 
-	decoder := gob.NewDecoder(file)
-	if err := decoder.Decode(&m); err != nil {
-		return nil, fmt.Errorf("failed to decode file %s: %w", filePath, err)
+		c.daIncluded.Add(hash, daHeight)
+		c.hashByHeight.Add(blockHeight, hash)
+
+		// Update max DA height
+		current := c.maxDAHeight.Load()
+		if daHeight > current {
+			c.maxDAHeight.Store(daHeight)
+		}
 	}
-	return m, nil
+
+	return nil
 }
 
-// SaveToDisk saves the cache contents to disk in the specified folder.
-// It's the caller's responsibility to ensure that type T (and any types it contains)
-// are registered with the gob package if necessary (e.g., using gob.Register).
-func (c *Cache[T]) SaveToDisk(folderPath string) error { - if err := os.MkdirAll(folderPath, 0o755); err != nil { - return fmt.Errorf("failed to create directory %s: %w", folderPath, err) +// SaveToStore persists all current DA inclusion entries to the store. +// This can be called before shutdown to ensure all data is persisted. +func (c *Cache[T]) SaveToStore(ctx context.Context) error { + if c.store == nil { + return nil // No store configured } - var wg errgroup.Group - - // save items by height - wg.Go(func() error { - itemsByHeightMap := make(map[uint64]*T) - keys := c.itemsByHeight.Keys() - for _, k := range keys { - if v, ok := c.itemsByHeight.Peek(k); ok { - itemsByHeightMap[k] = v - } - } - if err := saveMapGob(filepath.Join(folderPath, itemsByHeightFilename), itemsByHeightMap); err != nil { - return fmt.Errorf("save %s: %w", itemsByHeightFilename, err) - } - return nil - }) - - // save hashes - wg.Go(func() error { - hashesToSave := make(map[string]bool) - keys := c.hashes.Keys() - for _, k := range keys { - if v, ok := c.hashes.Peek(k); ok { - hashesToSave[k] = v - } + keys := c.daIncluded.Keys() + for _, hash := range keys { + daHeight, ok := c.daIncluded.Peek(hash) + if !ok { + continue } - if err := saveMapGob(filepath.Join(folderPath, hashesFilename), hashesToSave); err != nil { - return fmt.Errorf("save %s: %w", hashesFilename, err) - } - return nil - }) - - // save daIncluded - wg.Go(func() error { - daIncludedToSave := make(map[string]uint64) - keys := c.daIncluded.Keys() - for _, k := range keys { - if v, ok := c.daIncluded.Peek(k); ok { - daIncludedToSave[k] = v + // We need to find the block height for this hash + // Since we track hash by height, we need to iterate + var blockHeight uint64 + heightKeys := c.hashByHeight.Keys() + for _, h := range heightKeys { + if storedHash, ok := c.hashByHeight.Peek(h); ok && storedHash == hash { + blockHeight = h + break } } - if err := saveMapGob(filepath.Join(folderPath, daIncludedFilename), daIncludedToSave); err != nil { - return fmt.Errorf("save %s: %w", daIncludedFilename, err) + if err := c.store.SetMetadata(ctx, c.storeKey(hash), encodeDAInclusion(daHeight, blockHeight)); err != nil { + return fmt.Errorf("failed to save DA inclusion for hash %s: %w", hash, err) } - return nil - }) + } - return wg.Wait() + return nil } -// LoadFromDisk loads the cache contents from disk from the specified folder. -// It populates the current cache instance. If files are missing, corresponding parts of the cache will be empty. -// It's the caller's responsibility to ensure that type T (and any types it contains) -// are registered with the gob package if necessary (e.g., using gob.Register). -func (c *Cache[T]) LoadFromDisk(folderPath string) error { - var wg errgroup.Group - - // load items by height - wg.Go(func() error { - itemsByHeightMap, err := loadMapGob[uint64, *T](filepath.Join(folderPath, itemsByHeightFilename)) - if err != nil { - return fmt.Errorf("failed to load %s : %w", itemsByHeightFilename, err) - } - for k, v := range itemsByHeightMap { - c.itemsByHeight.Add(k, v) - } +// ClearFromStore removes all DA inclusion entries from the store for this cache. 
+func (c *Cache[T]) ClearFromStore(ctx context.Context, hashes []string) error {
+	if c.store == nil {
 		return nil
-	})
-
-	// load hashes
-	wg.Go(func() error {
-		hashesMap, err := loadMapGob[string, bool](filepath.Join(folderPath, hashesFilename))
-		if err != nil {
-			return fmt.Errorf("failed to load %s : %w", hashesFilename, err)
-		}
-		for k, v := range hashesMap {
-			c.hashes.Add(k, v)
-		}
-		return nil
-	})
+	}
 
-	// load daIncluded
-	wg.Go(func() error {
-		daIncludedMap, err := loadMapGob[string, uint64](filepath.Join(folderPath, daIncludedFilename))
-		if err != nil {
-			return fmt.Errorf("failed to load %s : %w", daIncludedFilename, err)
+	for _, hash := range hashes {
+		if err := c.store.DeleteMetadata(ctx, c.storeKey(hash)); err != nil {
+			return fmt.Errorf("failed to delete DA inclusion for hash %s: %w", hash, err)
 		}
-		for k, v := range daIncludedMap {
-			c.daIncluded.Add(k, v)
-			// Update max DA height during load
-			current := c.maxDAHeight.Load()
-			if v > current {
-				c.maxDAHeight.Store(v)
-			}
-		}
-		return nil
-	})
+	}
 
-	return wg.Wait()
+	return nil
 }
diff --git a/block/internal/cache/generic_cache_test.go b/block/internal/cache/generic_cache_test.go
index 1c6fa8333..b4e8c2b79 100644
--- a/block/internal/cache/generic_cache_test.go
+++ b/block/internal/cache/generic_cache_test.go
@@ -1,22 +1,27 @@
 package cache
 
 import (
-	"encoding/gob"
-	"fmt"
-	"os"
-	"path/filepath"
+	"context"
 	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/evstack/ev-node/pkg/store"
 )
 
 type testItem struct{ V int }
 
-func init() {
-	gob.Register(&testItem{})
+// testMemStore creates an in-memory store for testing
+func testMemStore(t *testing.T) store.Store {
+	ds, err := store.NewTestInMemoryKVStore()
+	require.NoError(t, err)
+	return store.New(ds)
 }
 
 // TestCache_MaxDAHeight verifies that daHeight tracks the maximum DA height
 func TestCache_MaxDAHeight(t *testing.T) {
-	c := NewCache[testItem]()
+	c := NewCache[testItem](nil, "")
 
 	// Initially should be 0
 	if got := c.daHeight(); got != 0 {
@@ -38,77 +43,88 @@ func TestCache_MaxDAHeight(t *testing.T) {
 	if got := c.daHeight(); got != 200 {
 		t.Errorf("after setDAIncluded(200): daHeight = %d, want 200", got)
 	}
+}
 
-	// Test persistence
-	dir := t.TempDir()
-	if err := c.SaveToDisk(dir); err != nil {
-		t.Fatalf("SaveToDisk failed: %v", err)
-	}
+// TestCache_MaxDAHeight_WithStore verifies that daHeight is restored from store
+func TestCache_MaxDAHeight_WithStore(t *testing.T) {
+	st := testMemStore(t)
+	ctx := context.Background()
 
-	c2 := NewCache[testItem]()
-	if err := c2.LoadFromDisk(dir); err != nil {
-		t.Fatalf("LoadFromDisk failed: %v", err)
+	c1 := NewCache[testItem](st, "test/da-included/")
+
+	// Set DA included entries
+	c1.setDAIncluded("hash1", 100, 1)
+	c1.setDAIncluded("hash2", 200, 2)
+	c1.setDAIncluded("hash3", 150, 3)
+
+	if got := c1.daHeight(); got != 200 {
+		t.Errorf("after setDAIncluded: daHeight = %d, want 200", got)
 	}
 
+	err := c1.SaveToStore(ctx)
+	require.NoError(t, err)
+
+	// Create new cache and restore from store
+	c2 := NewCache[testItem](st, "test/da-included/")
+
+	// Restore with the known hashes
+	err = c2.RestoreFromStore(ctx, []string{"hash1", "hash2", "hash3"})
+	require.NoError(t, err)
+
 	if got := c2.daHeight(); got != 200 {
-		t.Errorf("after load: daHeight = %d, want 200", got)
+		t.Errorf("after restore: daHeight = %d, want 200", got)
 	}
+
+	// Verify individual entries were restored
+	daHeight, ok := c2.getDAIncluded("hash1")
+	assert.True(t, ok)
+	assert.Equal(t, uint64(100), 
daHeight) + + daHeight, ok = c2.getDAIncluded("hash2") + assert.True(t, ok) + assert.Equal(t, uint64(200), daHeight) + + daHeight, ok = c2.getDAIncluded("hash3") + assert.True(t, ok) + assert.Equal(t, uint64(150), daHeight) } -// TestCache_SaveLoad_ErrorPaths covers SaveToDisk and LoadFromDisk error scenarios. -func TestCache_SaveLoad_ErrorPaths(t *testing.T) { - c := NewCache[testItem]() - for i := 0; i < 5; i++ { - v := &testItem{V: i} - c.setItem(uint64(i), v) - c.setSeen(fmt.Sprintf("s%d", i), uint64(i)) - c.setDAIncluded(fmt.Sprintf("d%d", i), uint64(i), uint64(i)) - } +// TestCache_WithStorePersistence tests that DA inclusion is persisted to store +func TestCache_WithStorePersistence(t *testing.T) { + st := testMemStore(t) + ctx := context.Background() - // Normal save/load roundtrip - dir := t.TempDir() - if err := c.SaveToDisk(dir); err != nil { - t.Fatalf("SaveToDisk failed: %v", err) - } - c2 := NewCache[testItem]() - if err := c2.LoadFromDisk(dir); err != nil { - t.Fatalf("LoadFromDisk failed: %v", err) - } - // Spot-check a few values - if got := c2.getItem(3); got == nil || got.V != 3 { - t.Fatalf("roundtrip getItem mismatch: got %#v", got) - } + c1 := NewCache[testItem](st, "test/") - _, c2OK := c2.getDAIncluded("d2") + // Set DA inclusion + c1.setDAIncluded("hash1", 100, 1) + c1.setDAIncluded("hash2", 200, 2) - if !c2.isSeen("s1") || !c2OK { - t.Fatalf("roundtrip auxiliary maps mismatch") - } + err := c1.SaveToStore(ctx) + require.NoError(t, err) - // SaveToDisk error: path exists as a file - filePath := filepath.Join(t.TempDir(), "not_a_dir") - if err := os.WriteFile(filePath, []byte("x"), 0o600); err != nil { - t.Fatalf("failed to create file: %v", err) - } - if err := c.SaveToDisk(filePath); err == nil { - t.Fatalf("expected error when saving to a file path, got nil") - } + // Create new cache with same store and restore + c2 := NewCache[testItem](st, "test/") - // LoadFromDisk error: corrupt gob - badDir := t.TempDir() - badFile := filepath.Join(badDir, itemsByHeightFilename) - if err := os.WriteFile(badFile, []byte("not a gob"), 0o600); err != nil { - t.Fatalf("failed to write bad gob: %v", err) - } - c3 := NewCache[testItem]() - if err := c3.LoadFromDisk(badDir); err == nil { - t.Fatalf("expected error when loading corrupted gob, got nil") - } + err = c2.RestoreFromStore(ctx, []string{"hash1", "hash2", "hash3"}) + require.NoError(t, err) + + // hash1 and hash2 should be restored, hash3 should not exist + daHeight, ok := c2.getDAIncluded("hash1") + assert.True(t, ok) + assert.Equal(t, uint64(100), daHeight) + + daHeight, ok = c2.getDAIncluded("hash2") + assert.True(t, ok) + assert.Equal(t, uint64(200), daHeight) + + _, ok = c2.getDAIncluded("hash3") + assert.False(t, ok) } // TestCache_LargeDataset covers edge cases with height index management at scale. 
func TestCache_LargeDataset(t *testing.T) {
-	c := NewCache[testItem]()
+	c := NewCache[testItem](nil, "")
 	const N = 20000
 	// Insert in descending order to exercise insert positions
 	for i := N - 1; i >= 0; i-- {
@@ -120,3 +136,141 @@ func TestCache_LargeDataset(t *testing.T) {
 		c.getNextItem(uint64(i))
 	}
 }
+
+// TestCache_BasicOperations tests basic cache operations
+func TestCache_BasicOperations(t *testing.T) {
+	c := NewCache[testItem](nil, "")
+
+	// Test setItem/getItem
+	item := &testItem{V: 42}
+	c.setItem(1, item)
+	got := c.getItem(1)
+	assert.NotNil(t, got)
+	assert.Equal(t, 42, got.V)
+
+	// Test getItem for non-existent key
+	got = c.getItem(999)
+	assert.Nil(t, got)
+
+	// Test setSeen/isSeen
+	assert.False(t, c.isSeen("hash1"))
+	c.setSeen("hash1", 1)
+	assert.True(t, c.isSeen("hash1"))
+
+	// Test removeSeen
+	c.removeSeen("hash1")
+	assert.False(t, c.isSeen("hash1"))
+
+	// Test setDAIncluded/getDAIncluded
+	_, ok := c.getDAIncluded("hash2")
+	assert.False(t, ok)
+
+	c.setDAIncluded("hash2", 100, 2)
+	daHeight, ok := c.getDAIncluded("hash2")
+	assert.True(t, ok)
+	assert.Equal(t, uint64(100), daHeight)
+
+	// Test removeDAIncluded
+	c.removeDAIncluded("hash2")
+	_, ok = c.getDAIncluded("hash2")
+	assert.False(t, ok)
+}
+
+// TestCache_GetNextItem tests the atomic get-and-remove operation
+func TestCache_GetNextItem(t *testing.T) {
+	c := NewCache[testItem](nil, "")
+
+	// Set multiple items
+	c.setItem(1, &testItem{V: 1})
+	c.setItem(2, &testItem{V: 2})
+	c.setItem(3, &testItem{V: 3})
+
+	// Get and remove item at height 2
+	got := c.getNextItem(2)
+	assert.NotNil(t, got)
+	assert.Equal(t, 2, got.V)
+
+	// Item should be removed
+	got = c.getNextItem(2)
+	assert.Nil(t, got)
+
+	// Other items should still exist
+	got = c.getItem(1)
+	assert.NotNil(t, got)
+	assert.Equal(t, 1, got.V)
+
+	got = c.getItem(3)
+	assert.NotNil(t, got)
+	assert.Equal(t, 3, got.V)
+}
+
+// TestCache_DeleteAllForHeight tests deleting all data for a specific height
+func TestCache_DeleteAllForHeight(t *testing.T) {
+	c := NewCache[testItem](nil, "")
+
+	// Set items at different heights
+	c.setItem(1, &testItem{V: 1})
+	c.setItem(2, &testItem{V: 2})
+	c.setSeen("hash1", 1)
+	c.setSeen("hash2", 2)
+
+	// Delete height 1
+	c.deleteAllForHeight(1)
+
+	// Height 1 data should be gone
+	assert.Nil(t, c.getItem(1))
+	assert.False(t, c.isSeen("hash1"))
+
+	// Height 2 data should still exist
+	assert.NotNil(t, c.getItem(2))
+	assert.True(t, c.isSeen("hash2"))
+}
+
+// TestCache_WithNilStore tests that a cache created without a store still works
+func TestCache_WithNilStore(t *testing.T) {
+	// Cache without store should work fine
+	c := NewCache[testItem](nil, "")
+	require.NotNil(t, c)
+
+	// Basic operations should work
+	c.setItem(1, &testItem{V: 1})
+	got := c.getItem(1)
+	assert.NotNil(t, got)
+	assert.Equal(t, 1, got.V)
+
+	// DA inclusion should work (just not persisted)
+	c.setDAIncluded("hash1", 100, 1)
+	daHeight, ok := c.getDAIncluded("hash1")
+	assert.True(t, ok)
+	assert.Equal(t, uint64(100), daHeight)
+}
+
+// TestCache_SaveToStore tests the SaveToStore method
+func TestCache_SaveToStore(t *testing.T) {
+	st := testMemStore(t)
+	ctx := context.Background()
+
+	c := NewCache[testItem](st, "save-test/")
+
+	// Set some DA included entries
+	c.setDAIncluded("hash1", 100, 1)
+	c.setDAIncluded("hash2", 200, 2)
+
+	// Save to store (redundant here, since entries are already persisted on setDAIncluded)
+	err := c.SaveToStore(ctx)
+	require.NoError(t, err)
+
+	// Verify data is in store by creating new cache and restoring
+	c2 := 
NewCache[testItem](st, "save-test/") + + err = c2.RestoreFromStore(ctx, []string{"hash1", "hash2"}) + require.NoError(t, err) + + daHeight, ok := c2.getDAIncluded("hash1") + assert.True(t, ok) + assert.Equal(t, uint64(100), daHeight) + + daHeight, ok = c2.getDAIncluded("hash2") + assert.True(t, ok) + assert.Equal(t, uint64(200), daHeight) +} diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go index aec7726d1..96d727baa 100644 --- a/block/internal/cache/manager.go +++ b/block/internal/cache/manager.go @@ -2,10 +2,8 @@ package cache import ( "context" - "encoding/gob" + "encoding/binary" "fmt" - "os" - "path/filepath" "sync" "time" @@ -17,46 +15,33 @@ import ( "github.com/evstack/ev-node/types" ) -var ( - cacheDir = "cache" - headerCacheDir = filepath.Join(cacheDir, "header") - dataCacheDir = filepath.Join(cacheDir, "data") - pendingEventsCacheDir = filepath.Join(cacheDir, "pending_da_events") - txCacheDir = filepath.Join(cacheDir, "tx") +const ( + // Store key prefixes for different cache types + headerDAIncludedPrefix = "cache/header-da-included/" + dataDAIncludedPrefix = "cache/data-da-included/" // DefaultTxCacheRetention is the default time to keep transaction hashes in cache DefaultTxCacheRetention = 24 * time.Hour ) -// gobRegisterOnce ensures gob type registration happens exactly once process-wide. -var gobRegisterOnce sync.Once - -// registerGobTypes registers all concrete types that may be encoded/decoded by the cache. -// Gob registration is global and must not be performed repeatedly to avoid conflicts. -func registerGobTypes() { - gobRegisterOnce.Do(func() { - gob.Register(&types.SignedHeader{}) - gob.Register(&types.Data{}) - gob.Register(&common.DAHeightEvent{}) - }) -} - // CacheManager provides thread-safe cache operations for tracking seen blocks // and DA inclusion status during block execution and syncing. 
type CacheManager interface { + DaHeight() uint64 + // Header operations IsHeaderSeen(hash string) bool SetHeaderSeen(hash string, blockHeight uint64) GetHeaderDAIncluded(hash string) (uint64, bool) SetHeaderDAIncluded(hash string, daHeight uint64, blockHeight uint64) RemoveHeaderDAIncluded(hash string) - DaHeight() uint64 // Data operations IsDataSeen(hash string) bool SetDataSeen(hash string, blockHeight uint64) GetDataDAIncluded(hash string) (uint64, bool) SetDataDAIncluded(hash string, daHeight uint64, blockHeight uint64) + RemoveDataDAIncluded(hash string) // Transaction operations IsTxSeen(hash string) bool @@ -67,10 +52,9 @@ type CacheManager interface { GetNextPendingEvent(blockHeight uint64) *common.DAHeightEvent SetPendingEvent(blockHeight uint64, event *common.DAHeightEvent) - // Disk operations - SaveToDisk() error - LoadFromDisk() error - ClearFromDisk() error + // Store operations + SaveToStore() error + RestoreFromStore() error // Cleanup operations DeleteHeight(blockHeight uint64) @@ -105,83 +89,31 @@ type implementation struct { pendingEventsCache *Cache[common.DAHeightEvent] pendingHeaders *PendingHeaders pendingData *PendingData + store store.Store config config.Config logger zerolog.Logger } -// NewPendingManager creates a new pending manager instance -func NewPendingManager(store store.Store, logger zerolog.Logger) (PendingManager, error) { - pendingHeaders, err := NewPendingHeaders(store, logger) - if err != nil { - return nil, fmt.Errorf("failed to create pending headers: %w", err) - } - - pendingData, err := NewPendingData(store, logger) - if err != nil { - return nil, fmt.Errorf("failed to create pending data: %w", err) - } - - return &implementation{ - pendingHeaders: pendingHeaders, - pendingData: pendingData, - logger: logger, - }, nil -} - -// NewCacheManager creates a new cache manager instance -func NewCacheManager(cfg config.Config, logger zerolog.Logger) (CacheManager, error) { - // Initialize caches - headerCache := NewCache[types.SignedHeader]() - dataCache := NewCache[types.Data]() - txCache := NewCache[struct{}]() - pendingEventsCache := NewCache[common.DAHeightEvent]() - - registerGobTypes() - impl := &implementation{ - headerCache: headerCache, - dataCache: dataCache, - txCache: txCache, - txTimestamps: new(sync.Map), - pendingEventsCache: pendingEventsCache, - config: cfg, - logger: logger, - } - - if cfg.ClearCache { - // Clear the cache from disk - if err := impl.ClearFromDisk(); err != nil { - logger.Warn().Err(err).Msg("failed to clear cache from disk, starting with empty cache") - } - } else { - // Load existing cache from disk - if err := impl.LoadFromDisk(); err != nil { - logger.Warn().Err(err).Msg("failed to load cache from disk, starting with empty cache") - } - } - - return impl, nil -} - // NewManager creates a new cache manager instance -func NewManager(cfg config.Config, store store.Store, logger zerolog.Logger) (Manager, error) { - // Initialize caches - headerCache := NewCache[types.SignedHeader]() - dataCache := NewCache[types.Data]() - txCache := NewCache[struct{}]() - pendingEventsCache := NewCache[common.DAHeightEvent]() +func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manager, error) { + // Initialize caches with store-based persistence for DA inclusion data + headerCache := NewCache[types.SignedHeader](st, headerDAIncludedPrefix) + dataCache := NewCache[types.Data](st, dataDAIncludedPrefix) + // TX cache and pending events cache don't need store persistence + txCache := NewCache[struct{}](nil, "") + 
pendingEventsCache := NewCache[common.DAHeightEvent](nil, "") // Initialize pending managers - pendingHeaders, err := NewPendingHeaders(store, logger) + pendingHeaders, err := NewPendingHeaders(st, logger) if err != nil { return nil, fmt.Errorf("failed to create pending headers: %w", err) } - pendingData, err := NewPendingData(store, logger) + pendingData, err := NewPendingData(st, logger) if err != nil { return nil, fmt.Errorf("failed to create pending data: %w", err) } - registerGobTypes() impl := &implementation{ headerCache: headerCache, dataCache: dataCache, @@ -190,18 +122,19 @@ func NewManager(cfg config.Config, store store.Store, logger zerolog.Logger) (Ma pendingEventsCache: pendingEventsCache, pendingHeaders: pendingHeaders, pendingData: pendingData, + store: st, config: cfg, logger: logger, } if cfg.ClearCache { // Clear the cache from disk - if err := impl.ClearFromDisk(); err != nil { + if err := impl.ClearFromStore(); err != nil { logger.Warn().Err(err).Msg("failed to clear cache from disk, starting with empty cache") } } else { - // Load existing cache from disk - if err := impl.LoadFromDisk(); err != nil { + // Restore existing cache from store + if err := impl.RestoreFromStore(); err != nil { logger.Warn().Err(err).Msg("failed to load cache from disk, starting with empty cache") } } @@ -252,6 +185,10 @@ func (m *implementation) SetDataDAIncluded(hash string, daHeight uint64, blockHe m.dataCache.setDAIncluded(hash, daHeight, blockHeight) } +func (m *implementation) RemoveDataDAIncluded(hash string) { + m.dataCache.removeDAIncluded(hash) +} + // Transaction operations func (m *implementation) IsTxSeen(hash string) bool { return m.txCache.isSeen(hash) @@ -386,71 +323,130 @@ func (m *implementation) GetNextPendingEvent(height uint64) *common.DAHeightEven return m.pendingEventsCache.getNextItem(height) } -func (m *implementation) SaveToDisk() error { - cfgDir := filepath.Join(m.config.RootDir, "data") +// SaveToStore persists the DA inclusion cache to the store. +// DA inclusion data is persisted on every SetHeaderDAIncluded/SetDataDAIncluded call, +// so this method ensures any remaining data is flushed. +func (m *implementation) SaveToStore() error { + ctx := context.Background() - // Ensure gob types are registered before encoding - registerGobTypes() - - if err := m.headerCache.SaveToDisk(filepath.Join(cfgDir, headerCacheDir)); err != nil { - return fmt.Errorf("failed to save header cache to disk: %w", err) + if err := m.headerCache.SaveToStore(ctx); err != nil { + return fmt.Errorf("failed to save header cache to store: %w", err) } - if err := m.dataCache.SaveToDisk(filepath.Join(cfgDir, dataCacheDir)); err != nil { - return fmt.Errorf("failed to save data cache to disk: %w", err) + if err := m.dataCache.SaveToStore(ctx); err != nil { + return fmt.Errorf("failed to save data cache to store: %w", err) } - if err := m.txCache.SaveToDisk(filepath.Join(cfgDir, txCacheDir)); err != nil { - return fmt.Errorf("failed to save tx cache to disk: %w", err) - } + // TX cache and pending events are ephemeral - not persisted + return nil +} - if err := m.pendingEventsCache.SaveToDisk(filepath.Join(cfgDir, pendingEventsCacheDir)); err != nil { - return fmt.Errorf("failed to save pending events cache to disk: %w", err) - } +// RestoreFromStore restores the DA inclusion cache from the store. +// This iterates through blocks in the store and checks for persisted DA inclusion data. 
+func (m *implementation) RestoreFromStore() error { + ctx := context.Background() - // Note: txTimestamps are not persisted to disk intentionally. - // On restart, all cached transactions will be treated as "new" for cleanup purposes, - // which is acceptable as they will be cleaned up on the next cleanup cycle if old enough. + // Get current store height to know how many blocks to check + height, err := m.store.Height(ctx) + if err != nil { + return fmt.Errorf("failed to get store height: %w", err) + } - return nil -} + if height == 0 { + return nil // No blocks to restore + } -func (m *implementation) LoadFromDisk() error { - // Ensure types are registered exactly once prior to decoding - registerGobTypes() + // Collect hashes from stored blocks + var headerHashes []string + var dataHashes []string - cfgDir := filepath.Join(m.config.RootDir, "data") + for h := uint64(1); h <= height; h++ { + header, data, err := m.store.GetBlockData(ctx, h) + if err != nil { + m.logger.Warn().Uint64("height", h).Err(err).Msg("failed to get block data during cache restore") + continue + } - if err := m.headerCache.LoadFromDisk(filepath.Join(cfgDir, headerCacheDir)); err != nil { - return fmt.Errorf("failed to load header cache from disk: %w", err) + if header != nil { + headerHashes = append(headerHashes, header.Hash().String()) + } + if data != nil { + dataHashes = append(dataHashes, data.DACommitment().String()) + } } - if err := m.dataCache.LoadFromDisk(filepath.Join(cfgDir, dataCacheDir)); err != nil { - return fmt.Errorf("failed to load data cache from disk: %w", err) + // Restore DA inclusion data from store + if err := m.headerCache.RestoreFromStore(ctx, headerHashes); err != nil { + return fmt.Errorf("failed to restore header cache from store: %w", err) } - if err := m.txCache.LoadFromDisk(filepath.Join(cfgDir, txCacheDir)); err != nil { - return fmt.Errorf("failed to load tx cache from disk: %w", err) + if err := m.dataCache.RestoreFromStore(ctx, dataHashes); err != nil { + return fmt.Errorf("failed to restore data cache from store: %w", err) } - if err := m.pendingEventsCache.LoadFromDisk(filepath.Join(cfgDir, pendingEventsCacheDir)); err != nil { - return fmt.Errorf("failed to load pending events cache from disk: %w", err) + // Initialize DA height from store metadata to ensure DaHeight() is never 0. + m.initDAHeightFromStore(ctx) + + m.logger.Info(). + Int("header_hashes", len(headerHashes)). + Int("data_hashes", len(dataHashes)). + Uint64("da_height", m.DaHeight()). + Msg("restored DA inclusion cache from store") + + return nil +} + +// ClearFromStore clears in-memory caches and deletes DA inclusion entries from the store. 
+func (m *implementation) ClearFromStore() error { + ctx := context.Background() + + // Get hashes from current in-memory caches and delete from store + headerHashes := m.headerCache.daIncluded.Keys() + if err := m.headerCache.ClearFromStore(ctx, headerHashes); err != nil { + return fmt.Errorf("failed to clear header cache from store: %w", err) } - // After loading tx cache from disk, initialize timestamps for loaded transactions - // Set them to current time so they won't be immediately cleaned up - now := time.Now() - for _, hash := range m.txCache.hashes.Keys() { - m.txTimestamps.Store(hash, now) + dataHashes := m.dataCache.daIncluded.Keys() + if err := m.dataCache.ClearFromStore(ctx, dataHashes); err != nil { + return fmt.Errorf("failed to clear data cache from store: %w", err) } + // Clear in-memory caches by creating new ones + m.headerCache = NewCache[types.SignedHeader](m.store, headerDAIncludedPrefix) + m.dataCache = NewCache[types.Data](m.store, dataDAIncludedPrefix) + m.txCache = NewCache[struct{}](nil, "") + m.pendingEventsCache = NewCache[common.DAHeightEvent](nil, "") + + // Initialize DA height from store metadata to ensure DaHeight() is never 0. + m.initDAHeightFromStore(ctx) + return nil } -func (m *implementation) ClearFromDisk() error { - cachePath := filepath.Join(m.config.RootDir, "data", cacheDir) - if err := os.RemoveAll(cachePath); err != nil { - return fmt.Errorf("failed to clear cache from disk: %w", err) +// initDAHeightFromStore initializes the maxDAHeight in both header and data caches +// from the HeightToDAHeight store metadata (final da inclusion tracking). +func (m *implementation) initDAHeightFromStore(ctx context.Context) { + // Get the DA included height from store (last processed block height) + daIncludedHeightBytes, err := m.store.GetMetadata(ctx, store.DAIncludedHeightKey) + if err != nil || len(daIncludedHeightBytes) != 8 { + return + } + daIncludedHeight := binary.LittleEndian.Uint64(daIncludedHeightBytes) + if daIncludedHeight == 0 { + return + } + + // Get header DA height for the last included height + headerKey := store.GetHeightToDAHeightHeaderKey(daIncludedHeight) + if headerBytes, err := m.store.GetMetadata(ctx, headerKey); err == nil && len(headerBytes) == 8 { + headerDAHeight := binary.LittleEndian.Uint64(headerBytes) + m.headerCache.setMaxDAHeight(headerDAHeight) + } + + // Get data DA height for the last included height + dataKey := store.GetHeightToDAHeightDataKey(daIncludedHeight) + if dataBytes, err := m.store.GetMetadata(ctx, dataKey); err == nil && len(dataBytes) == 8 { + dataDAHeight := binary.LittleEndian.Uint64(dataBytes) + m.dataCache.setMaxDAHeight(dataDAHeight) } - return nil } diff --git a/block/internal/cache/manager_test.go b/block/internal/cache/manager_test.go index 328a1de7d..6963c9eb0 100644 --- a/block/internal/cache/manager_test.go +++ b/block/internal/cache/manager_test.go @@ -2,8 +2,7 @@ package cache import ( "context" - "encoding/gob" - "path/filepath" + "encoding/binary" "testing" "time" @@ -87,54 +86,61 @@ func TestManager_PendingEventsCRUD(t *testing.T) { assert.Nil(t, got1Again) } -func TestManager_SaveAndLoadFromDisk(t *testing.T) { +func TestManager_SaveAndRestoreFromStore(t *testing.T) { t.Parallel() cfg := tempConfig(t) st := memStore(t) + ctx := context.Background() + + // First, we need to save some block data to the store so RestoreFromStore can find the hashes + h1, d1 := types.GetRandomBlock(1, 1, "test-chain") + h2, d2 := types.GetRandomBlock(2, 1, "test-chain") + + batch1, err := st.NewBatch(ctx) + 
require.NoError(t, err) + require.NoError(t, batch1.SaveBlockData(h1, d1, &types.Signature{})) + require.NoError(t, batch1.SetHeight(1)) + require.NoError(t, batch1.Commit()) - // must register for gob before saving - gob.Register(&types.SignedHeader{}) - gob.Register(&types.Data{}) - gob.Register(&common.DAHeightEvent{}) + batch2, err := st.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch2.SaveBlockData(h2, d2, &types.Signature{})) + require.NoError(t, batch2.SetHeight(2)) + require.NoError(t, batch2.Commit()) m1, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) - // populate caches - hdr := &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{ChainID: "c", Height: 2}}} - dat := &types.Data{Metadata: &types.Metadata{ChainID: "c", Height: 2}} - m1.SetHeaderSeen("H2", 2) - m1.SetDataSeen("D2", 2) - m1.SetHeaderDAIncluded("H2", 100, 2) - m1.SetDataDAIncluded("D2", 101, 2) - m1.SetPendingEvent(2, &common.DAHeightEvent{Header: hdr, Data: dat, DaHeight: 99}) - - // persist - err = m1.SaveToDisk() + // Set DA inclusion for the blocks + m1.SetHeaderDAIncluded(h1.Hash().String(), 100, 1) + m1.SetDataDAIncluded(d1.DACommitment().String(), 100, 1) + m1.SetHeaderDAIncluded(h2.Hash().String(), 101, 2) + m1.SetDataDAIncluded(d2.DACommitment().String(), 101, 2) + + // Persist to store + err = m1.SaveToStore() require.NoError(t, err) - // create a fresh manager on same root and verify load + // Create a fresh manager on same store and verify restore m2, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) - // check loaded items - assert.True(t, m2.IsHeaderSeen("H2")) - assert.True(t, m2.IsDataSeen("D2")) - _, ok := m2.GetHeaderDAIncluded("H2") + // Check DA inclusion was restored + daHeight, ok := m2.GetHeaderDAIncluded(h1.Hash().String()) + assert.True(t, ok) + assert.Equal(t, uint64(100), daHeight) + + daHeight, ok = m2.GetDataDAIncluded(d1.DACommitment().String()) assert.True(t, ok) - _, ok2 := m2.GetDataDAIncluded("D2") - assert.True(t, ok2) - - // Verify pending event was loaded - loadedEvent := m2.GetNextPendingEvent(2) - require.NotNil(t, loadedEvent) - assert.Equal(t, uint64(2), loadedEvent.Header.Height()) - - // directories exist under cfg.RootDir/data/cache/... 
- base := filepath.Join(cfg.RootDir, "data", "cache") - assert.DirExists(t, filepath.Join(base, "header")) - assert.DirExists(t, filepath.Join(base, "data")) - assert.DirExists(t, filepath.Join(base, "pending_da_events")) + assert.Equal(t, uint64(100), daHeight) + + daHeight, ok = m2.GetHeaderDAIncluded(h2.Hash().String()) + assert.True(t, ok) + assert.Equal(t, uint64(101), daHeight) + + daHeight, ok = m2.GetDataDAIncluded(d2.DACommitment().String()) + assert.True(t, ok) + assert.Equal(t, uint64(101), daHeight) } func TestManager_GetNextPendingEvent_NonExistent(t *testing.T) { @@ -340,7 +346,7 @@ func TestManager_CleanupOldTxs_NoTransactions(t *testing.T) { assert.Equal(t, 0, removed) } -func TestManager_TxCache_PersistAndLoad(t *testing.T) { +func TestManager_TxCache_NotPersistedToStore(t *testing.T) { t.Parallel() cfg := tempConfig(t) st := memStore(t) @@ -355,20 +361,17 @@ func TestManager_TxCache_PersistAndLoad(t *testing.T) { assert.True(t, m1.IsTxSeen("persistent-tx1")) assert.True(t, m1.IsTxSeen("persistent-tx2")) - // Save to disk - err = m1.SaveToDisk() + // Save to store + err = m1.SaveToStore() require.NoError(t, err) - // Create new manager and verify transactions are loaded + // Create new manager - tx cache should be empty (not persisted) m2, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) - assert.True(t, m2.IsTxSeen("persistent-tx1")) - assert.True(t, m2.IsTxSeen("persistent-tx2")) - - // Verify tx cache directory exists - txCacheDir := filepath.Join(cfg.RootDir, "data", "cache", "tx") - assert.DirExists(t, txCacheDir) + // TX cache is ephemeral and not persisted + assert.False(t, m2.IsTxSeen("persistent-tx1")) + assert.False(t, m2.IsTxSeen("persistent-tx2")) } func TestManager_DeleteHeight_PreservesTxCache(t *testing.T) { @@ -399,3 +402,127 @@ func TestManager_DeleteHeight_PreservesTxCache(t *testing.T) { // Transaction should still be present (height-independent) assert.True(t, m.IsTxSeen("tx-persistent")) } + +func TestManager_DAInclusionPersistence(t *testing.T) { + t.Parallel() + cfg := tempConfig(t) + st := memStore(t) + ctx := context.Background() + + // Create blocks and save to store + h1, d1 := types.GetRandomBlock(1, 1, "test-chain") + + batch, err := st.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(h1, d1, &types.Signature{})) + require.NoError(t, batch.SetHeight(1)) + require.NoError(t, batch.Commit()) + + // Create manager and set DA inclusion + m1, err := NewManager(cfg, st, zerolog.Nop()) + require.NoError(t, err) + + headerHash := h1.Hash().String() + dataHash := d1.DACommitment().String() + + m1.SetHeaderDAIncluded(headerHash, 100, 1) + m1.SetDataDAIncluded(dataHash, 101, 1) + + // Verify DA height is tracked + assert.Equal(t, uint64(101), m1.DaHeight()) + + err = m1.SaveToStore() + require.NoError(t, err) + + // Create new manager - DA inclusion should be restored + m2, err := NewManager(cfg, st, zerolog.Nop()) + require.NoError(t, err) + + // DA inclusion should be restored from store + daHeight, ok := m2.GetHeaderDAIncluded(headerHash) + assert.True(t, ok) + assert.Equal(t, uint64(100), daHeight) + + daHeight, ok = m2.GetDataDAIncluded(dataHash) + assert.True(t, ok) + assert.Equal(t, uint64(101), daHeight) + + // Max DA height should also be restored + assert.Equal(t, uint64(101), m2.DaHeight()) +} + +func TestManager_DaHeightAfterCacheClear(t *testing.T) { + t.Parallel() + + cfg := tempConfig(t) + st := memStore(t) + ctx := context.Background() + + // Store a block first + h1, d1 := 
types.GetRandomBlock(1, 1, "test-chain") + batch, err := st.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(h1, d1, &types.Signature{})) + require.NoError(t, batch.SetHeight(1)) + require.NoError(t, batch.Commit()) + + // Set up the HeightToDAHeight metadata (simulating what submitter does) + headerDAHeightBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(headerDAHeightBytes, 150) + require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(1), headerDAHeightBytes)) + + dataDAHeightBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(dataDAHeightBytes, 155) + require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightDataKey(1), dataDAHeightBytes)) + + // Set DAIncludedHeightKey to indicate height 1 was DA included + daIncludedBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(daIncludedBytes, 1) + require.NoError(t, st.SetMetadata(ctx, store.DAIncludedHeightKey, daIncludedBytes)) + + // Create manager with ClearCache = true + cfg.ClearCache = true + m, err := NewManager(cfg, st, zerolog.Nop()) + require.NoError(t, err) + + // DaHeight should NOT be 0 - it should be initialized from store metadata + assert.Equal(t, uint64(155), m.DaHeight(), "DaHeight should be initialized from HeightToDAHeight metadata even after cache clear") +} + +func TestManager_DaHeightFromStoreOnRestore(t *testing.T) { + t.Parallel() + + cfg := tempConfig(t) + st := memStore(t) + ctx := context.Background() + + // Store a block first + h1, d1 := types.GetRandomBlock(1, 1, "test-chain") + batch, err := st.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(h1, d1, &types.Signature{})) + require.NoError(t, batch.SetHeight(1)) + require.NoError(t, batch.Commit()) + + // Set up HeightToDAHeight metadata but NOT the cache entries + // This simulates a scenario where DA inclusion was processed but cache entries were lost + headerDAHeightBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(headerDAHeightBytes, 200) + require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(1), headerDAHeightBytes)) + + dataDAHeightBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(dataDAHeightBytes, 205) + require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightDataKey(1), dataDAHeightBytes)) + + daIncludedBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(daIncludedBytes, 1) + require.NoError(t, st.SetMetadata(ctx, store.DAIncludedHeightKey, daIncludedBytes)) + + // Create manager without ClearCache - should restore and init from metadata + cfg.ClearCache = false + m, err := NewManager(cfg, st, zerolog.Nop()) + require.NoError(t, err) + + // DaHeight should be the max from HeightToDAHeight metadata + assert.Equal(t, uint64(205), m.DaHeight(), "DaHeight should be initialized from HeightToDAHeight metadata on restore") +} diff --git a/block/internal/reaping/reaper_test.go b/block/internal/reaping/reaper_test.go index bd710669e..5700882e8 100644 --- a/block/internal/reaping/reaper_test.go +++ b/block/internal/reaping/reaper_test.go @@ -70,10 +70,14 @@ func newTestCache(t *testing.T) cache.CacheManager { t.Helper() cfg := config.Config{ - RootDir: t.TempDir(), - ClearCache: true, + RootDir: t.TempDir(), } - cacheManager, err := cache.NewCacheManager(cfg, zerolog.Nop()) + + // Create an in-memory store for the cache + memDS := dssync.MutexWrap(ds.NewMapDatastore()) + st := store.New(memDS) + + cacheManager, err := cache.NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) return cacheManager @@ 
-209,46 +213,3 @@ func TestReaper_SubmitTxs_SequencerError_NoPersistence_NoNotify(t *testing.T) { t.Fatal("did not expect notification on sequencer error") } } - -func TestReaper_CachePersistence(t *testing.T) { - // Test that transaction seen status persists to disk and can be loaded - mockExec := testmocks.NewMockExecutor(t) - mockSeq := testmocks.NewMockSequencer(t) - - tx1 := []byte("persistent-tx") - - // Create cache with real store - tempDir := t.TempDir() - dataStore := dssync.MutexWrap(ds.NewMapDatastore()) - st := store.New(dataStore) - cfg := config.Config{ - RootDir: tempDir, - ClearCache: false, // Don't clear cache - } - cm, err := cache.NewManager(cfg, st, zerolog.Nop()) - require.NoError(t, err) - - e := newTestExecutor(t) - r := newTestReaper(t, "chain-persist", mockExec, mockSeq, e, cm) - - mockExec.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx1}, nil).Once() - mockSeq.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). - Return(&coresequencer.SubmitBatchTxsResponse{}, nil).Once() - - require.NoError(t, r.SubmitTxs()) - - // Verify tx is marked as seen - assert.True(t, cm.IsTxSeen(hashTx(tx1))) - - // Save to disk - require.NoError(t, cm.SaveToDisk()) - - // Create new cache manager and load from disk - dataStore2 := dssync.MutexWrap(ds.NewMapDatastore()) - st2 := store.New(dataStore2) - cm2, err := cache.NewManager(cfg, st2, zerolog.Nop()) - require.NoError(t, err) - - // Verify the seen status was persisted - assert.True(t, cm2.IsTxSeen(hashTx(tx1))) -} diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index a2e0adcf7..bd8088f7b 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -234,8 +234,7 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, headers []*types.Signed func(submitted []*types.SignedHeader, res *datypes.ResultSubmit) { heights := make([]uint64, len(submitted)) for i, header := range submitted { - headerHash := header.Hash() - cache.SetHeaderDAIncluded(headerHash.String(), res.Height, header.Height()) + cache.SetHeaderDAIncluded(header.Hash().String(), res.Height, header.Height()) heights[i] = header.Height() } if err := s.headerDAHintAppender.AppendDAHint(ctx, res.Height, heights...); err != nil { diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 17f6ce9bc..1c5b034c1 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -318,7 +318,7 @@ func (s *Submitter) processDAInclusionLoop() { return case <-ticker.C: currentDAIncluded := s.GetDAIncludedHeight() - s.metrics.DAInclusionHeight.Set(float64(s.GetDAIncludedHeight())) + s.metrics.DAInclusionHeight.Set(float64(currentDAIncluded)) for { nextHeight := currentDAIncluded + 1 @@ -486,20 +486,25 @@ func (s *Submitter) setNodeHeightToDAHeight(ctx context.Context, height uint64, // IsHeightDAIncluded checks if a height is included in DA func (s *Submitter) IsHeightDAIncluded(height uint64, header *types.SignedHeader, data *types.Data) (bool, error) { + // If height is at or below the DA included height, it was already processed + // and cache entries were cleared. We know it's DA included. 
+ if height <= s.GetDAIncludedHeight() { + return true, nil + } + currentHeight, err := s.store.Height(s.ctx) if err != nil { return false, err } + if currentHeight < height { return false, nil } - headerHash := header.Hash().String() dataCommitment := data.DACommitment() - dataHash := dataCommitment.String() - _, headerIncluded := s.cache.GetHeaderDAIncluded(headerHash) - _, dataIncluded := s.cache.GetDataDAIncluded(dataHash) + _, headerIncluded := s.cache.GetHeaderDAIncluded(header.Hash().String()) + _, dataIncluded := s.cache.GetDataDAIncluded(dataCommitment.String()) dataIncluded = bytes.Equal(dataCommitment, common.DataHashForEmptyTxs) || dataIncluded diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index 3e0e0b343..6420a16f7 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -114,16 +114,21 @@ func TestSubmitter_IsHeightDAIncluded(t *testing.T) { require.NoError(t, batch.SetHeight(5)) require.NoError(t, batch.Commit()) - s := &Submitter{store: st, cache: cm, logger: zerolog.Nop()} + daIncludedHeight := &atomic.Uint64{} + daIncludedHeight.Store(2) // heights 1 and 2 are already DA included and cache cleared + + s := &Submitter{store: st, cache: cm, logger: zerolog.Nop(), daIncludedHeight: daIncludedHeight} s.ctx = ctx h1, d1 := newHeaderAndData("chain", 3, true) h2, d2 := newHeaderAndData("chain", 4, true) + h3, d3 := newHeaderAndData("chain", 2, true) // already DA included, cache was cleared - cm.SetHeaderDAIncluded(h1.Hash().String(), 100, 2) - cm.SetDataDAIncluded(d1.DACommitment().String(), 100, 2) + cm.SetHeaderDAIncluded(h1.Hash().String(), 100, 3) + cm.SetDataDAIncluded(d1.DACommitment().String(), 100, 3) cm.SetHeaderDAIncluded(h2.Hash().String(), 101, 4) // no data for h2 + // no cache entries for h3/d3 - they were cleared after DA inclusion processing specs := map[string]struct { height uint64 @@ -132,9 +137,11 @@ func TestSubmitter_IsHeightDAIncluded(t *testing.T) { exp bool expErr bool }{ - "below store height and cached": {height: 3, header: h1, data: d1, exp: true}, - "above store height": {height: 6, header: h2, data: d2, exp: false}, - "data missing": {height: 4, header: h2, data: d2, exp: false}, + "below store height and cached": {height: 3, header: h1, data: d1, exp: true}, + "above store height": {height: 6, header: h2, data: d2, exp: false}, + "data missing": {height: 4, header: h2, data: d2, exp: false}, + "at daIncludedHeight - cache cleared": {height: 2, header: h3, data: d3, exp: true}, + "below daIncludedHeight - cache cleared": {height: 1, header: h3, data: d3, exp: true}, } for name, spec := range specs { @@ -528,15 +535,15 @@ func TestSubmitter_CacheClearedOnHeightInclusion(t *testing.T) { assert.False(t, cm.IsHeaderSeen(h2.Hash().String()), "height 2 header should be cleared from cache") assert.False(t, cm.IsDataSeen(d2.DACommitment().String()), "height 2 data should be cleared from cache") - // Verify DA inclusion status remains for processed heights + // Verify DA inclusion status is removed for processed heights (cleaned up after finalization) _, h1DAIncluded := cm.GetHeaderDAIncluded(h1.Hash().String()) _, d1DAIncluded := cm.GetDataDAIncluded(d1.DACommitment().String()) _, h2DAIncluded := cm.GetHeaderDAIncluded(h2.Hash().String()) _, d2DAIncluded := cm.GetDataDAIncluded(d2.DACommitment().String()) - assert.True(t, h1DAIncluded, "height 1 header DA inclusion status should remain") - assert.True(t, d1DAIncluded, "height 1 data DA inclusion 
status should remain") - assert.True(t, h2DAIncluded, "height 2 header DA inclusion status should remain") - assert.True(t, d2DAIncluded, "height 2 data DA inclusion status should remain") + assert.False(t, h1DAIncluded, "height 1 header DA inclusion status should be removed after finalization") + assert.False(t, d1DAIncluded, "height 1 data DA inclusion status should be removed after finalization") + assert.False(t, h2DAIncluded, "height 2 header DA inclusion status should be removed after finalization") + assert.False(t, d2DAIncluded, "height 2 data DA inclusion status should be removed after finalization") // Verify unprocessed height 3 cache remains intact assert.True(t, cm.IsHeaderSeen(h3.Hash().String()), "height 3 header should remain in cache") diff --git a/block/internal/syncing/da_retriever_test.go b/block/internal/syncing/da_retriever_test.go index 8b27513a8..2270463b9 100644 --- a/block/internal/syncing/da_retriever_test.go +++ b/block/internal/syncing/da_retriever_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" "github.com/libp2p/go-libp2p/core/crypto" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" @@ -19,6 +21,7 @@ import ( datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" signerpkg "github.com/evstack/ev-node/pkg/signer" + "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/test/mocks" "github.com/evstack/ev-node/types" ) @@ -33,7 +36,11 @@ func newTestDARetriever(t *testing.T, mockClient *mocks.MockClient, cfg config.C cfg.DA.DataNamespace = "test-data-ns" } - cm, err := cache.NewCacheManager(cfg, zerolog.Nop()) + // Create an in-memory store for the cache + memDS := dssync.MutexWrap(ds.NewMapDatastore()) + st := store.New(memDS) + + cm, err := cache.NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) if mockClient == nil { diff --git a/block/internal/syncing/p2p_handler_test.go b/block/internal/syncing/p2p_handler_test.go index 40c6876d8..a1cc6d993 100644 --- a/block/internal/syncing/p2p_handler_test.go +++ b/block/internal/syncing/p2p_handler_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" "github.com/libp2p/go-libp2p/core/crypto" "github.com/rs/zerolog" "github.com/stretchr/testify/mock" @@ -18,6 +20,7 @@ import ( "github.com/evstack/ev-node/pkg/genesis" signerpkg "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/signer/noop" + "github.com/evstack/ev-node/pkg/store" extmocks "github.com/evstack/ev-node/test/mocks/external" "github.com/evstack/ev-node/types" ) @@ -77,10 +80,14 @@ func setupP2P(t *testing.T) *P2PTestData { dataStoreMock := extmocks.NewMockStore[*types.P2PData](t) cfg := config.Config{ - RootDir: t.TempDir(), - ClearCache: true, + RootDir: t.TempDir(), } - cacheManager, err := cache.NewCacheManager(cfg, zerolog.Nop()) + + // Create an in-memory store for the cache + memDS := dssync.MutexWrap(ds.NewMapDatastore()) + st := store.New(memDS) + + cacheManager, err := cache.NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err, "failed to create cache manager") handler := NewP2PHandler(headerStoreMock, dataStoreMock, cacheManager, gen, zerolog.Nop()) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 88f46aafe..6df6b2c22 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "crypto/sha256" - 
"encoding/binary" "encoding/hex" "errors" "fmt" @@ -346,9 +345,9 @@ func (s *Syncer) initializeState() error { } s.SetLastState(state) - // Set DA height to the maximum of the genesis start height, the state's DA height, the cached DA height, and the highest stored included DA height. - // This ensures we resume from the highest known DA height, even if the cache is cleared on restart. If the DA height is too high because of a user error, reset it with --evnode.clear_cache. The DA height will be back to the last highest known executed DA height for a height. - s.daRetrieverHeight.Store(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight, s.getHighestStoredDAHeight())) + // Set DA height to the maximum of the genesis start height, the state's DA height, and the cached DA height. + // The cache's DaHeight() is initialized from store metadata, so it's always correct even after cache clear. + s.daRetrieverHeight.Store(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight)) s.logger.Info(). Uint64("height", state.LastBlockHeight). @@ -724,8 +723,10 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve // here only the previous block needs to be applied to proceed to the verification. // The header validation must be done before applying the block to avoid executing gibberish if err := s.ValidateBlock(ctx, currentState, data, header); err != nil { - // remove header as da included (not per se needed, but keep cache clean) + // remove header as da included from cache s.cache.RemoveHeaderDAIncluded(headerHash) + s.cache.RemoveDataDAIncluded(data.DACommitment().String()) + if !errors.Is(err, errInvalidState) && !errors.Is(err, errInvalidBlock) { return errors.Join(errInvalidBlock, err) } @@ -737,7 +738,10 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve if err := s.VerifyForcedInclusionTxs(ctx, currentState, data); err != nil { s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("forced inclusion verification failed") if errors.Is(err, errMaliciousProposer) { + // remove header as da included from cache s.cache.RemoveHeaderDAIncluded(headerHash) + s.cache.RemoveDataDAIncluded(data.DACommitment().String()) + return err } } @@ -1208,39 +1212,6 @@ func (s *Syncer) sleepOrDone(duration time.Duration) bool { } } -// getHighestStoredDAHeight retrieves the highest DA height from the store by checking -// the DA heights stored for the last DA included height -// this relies on the node syncing with DA and setting included heights. 
@@ -1208,39 +1212,6 @@ func (s *Syncer) sleepOrDone(duration time.Duration) bool {
 }
 }

-// getHighestStoredDAHeight retrieves the highest DA height from the store by checking
-// the DA heights stored for the last DA included height
-// this relies on the node syncing with DA and setting included heights.
-func (s *Syncer) getHighestStoredDAHeight() uint64 {
- // Get the DA included height from store
- daIncludedHeightBytes, err := s.store.GetMetadata(s.ctx, store.DAIncludedHeightKey)
- if err != nil || len(daIncludedHeightBytes) != 8 {
- return 0
- }
- daIncludedHeight := binary.LittleEndian.Uint64(daIncludedHeightBytes)
- if daIncludedHeight == 0 {
- return 0
- }
-
- var highestDAHeight uint64
-
- // Get header DA height for the last included height
- headerKey := store.GetHeightToDAHeightHeaderKey(daIncludedHeight)
- if headerBytes, err := s.store.GetMetadata(s.ctx, headerKey); err == nil && len(headerBytes) == 8 {
- headerDAHeight := binary.LittleEndian.Uint64(headerBytes)
- highestDAHeight = max(highestDAHeight, headerDAHeight)
- }
-
- // Get data DA height for the last included height
- dataKey := store.GetHeightToDAHeightDataKey(daIncludedHeight)
- if dataBytes, err := s.store.GetMetadata(s.ctx, dataKey); err == nil && len(dataBytes) == 8 {
- dataDAHeight := binary.LittleEndian.Uint64(dataBytes)
- highestDAHeight = max(highestDAHeight, dataDAHeight)
- }
-
- return highestDAHeight
-}
-
 type p2pWaitState struct {
 height uint64
 cancel context.CancelFunc
diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go
index 5edec1cce..85aa9a597 100644
--- a/block/internal/syncing/syncer_test.go
+++ b/block/internal/syncing/syncer_test.go
@@ -4,7 +4,6 @@ import (
 "context"
 crand "crypto/rand"
 "crypto/sha512"
- "encoding/binary"
 "errors"
 "sync/atomic"
 "testing"
@@ -285,7 +284,7 @@ func TestSyncer_processPendingEvents(t *testing.T) {
 ds := dssync.MutexWrap(datastore.NewMapDatastore())
 st := store.New(ds)

- cm, err := cache.NewCacheManager(config.DefaultConfig(), zerolog.Nop())
+ cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
 require.NoError(t, err)

 // current height 1
@@ -329,7 +328,6 @@ func TestSyncLoopPersistState(t *testing.T) {
 cfg := config.DefaultConfig()
 t.Setenv("HOME", t.TempDir())
 cfg.RootDir = t.TempDir()
- cfg.ClearCache = true

 cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
 require.NoError(t, err)
@@ -425,7 +423,7 @@ func TestSyncLoopPersistState(t *testing.T) {
 require.Equal(t, myFutureDAHeight, syncerInst1.daRetrieverHeight.Load())

 // wait for all events consumed
- require.NoError(t, cm.SaveToDisk())
+ require.NoError(t, cm.SaveToStore())
 t.Log("processLoop on instance1 completed")

 // then
@@ -442,7 +440,7 @@ func TestSyncLoopPersistState(t *testing.T) {
 // and when new instance is up on restart
 cm, err = cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
 require.NoError(t, err)
- require.NoError(t, cm.LoadFromDisk())
+ require.NoError(t, cm.RestoreFromStore())

 syncerInst2 := NewSyncer(
 st,
@@ -610,9 +608,6 @@ func TestSyncer_InitializeState_CallsReplayer(t *testing.T) {
 nil,
 )

- // Mock GetMetadata calls for DA included height retrieval
- mockStore.EXPECT().GetMetadata(mock.Anything, store.DAIncludedHeightKey).Return(nil, datastore.ErrNotFound)
-
 // Mock Batch operations
 mockBatch := testmocks.NewMockBatch(t)
 mockBatch.Test(t)
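The renames above, `SaveToDisk`/`LoadFromDisk` to `SaveToStore`/`RestoreFromStore`, are what `TestSyncLoopPersistState` exercises. Stripped to its essence, the restart round-trip looks roughly like this; `st`, `cm`, and the imports are as in the earlier sketch:

```go
// Instance 1 shuts down: flush cache state into the shared store.
require.NoError(t, cm.SaveToStore())

// Simulated restart: build a fresh manager over the same store...
cm2, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
require.NoError(t, err)

// ...and restore the persisted cache state from it.
require.NoError(t, cm2.RestoreFromStore())
```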
@@ -657,61 +652,6 @@ func requireEmptyChan(t *testing.T, errorCh chan error) {
 }
 }

-func TestSyncer_getHighestStoredDAHeight(t *testing.T) {
- ds := dssync.MutexWrap(datastore.NewMapDatastore())
- st := store.New(ds)
- ctx := t.Context()
-
- syncer := &Syncer{
- store:  st,
- ctx:    ctx,
- logger: zerolog.Nop(),
- }
-
- // Test case 1: No DA included height set
- highestDA := syncer.getHighestStoredDAHeight()
- assert.Equal(t, uint64(0), highestDA)
-
- // Test case 2: DA included height set, but no mappings
- bz := make([]byte, 8)
- binary.LittleEndian.PutUint64(bz, 1)
- require.NoError(t, st.SetMetadata(ctx, store.DAIncludedHeightKey, bz))
-
- highestDA = syncer.getHighestStoredDAHeight()
- assert.Equal(t, uint64(0), highestDA)
-
- // Test case 3: DA included height with header mapping
- headerBytes := make([]byte, 8)
- binary.LittleEndian.PutUint64(headerBytes, 100)
- require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(1), headerBytes))
-
- highestDA = syncer.getHighestStoredDAHeight()
- assert.Equal(t, uint64(100), highestDA)
-
- // Test case 4: DA included height with both header and data mappings (data is higher)
- dataBytes := make([]byte, 8)
- binary.LittleEndian.PutUint64(dataBytes, 105)
- require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightDataKey(1), dataBytes))
-
- highestDA = syncer.getHighestStoredDAHeight()
- assert.Equal(t, uint64(105), highestDA)
-
- // Test case 5: Advance to height 2 with higher DA heights
- binary.LittleEndian.PutUint64(bz, 2)
- require.NoError(t, st.SetMetadata(ctx, store.DAIncludedHeightKey, bz))
-
- headerBytes2 := make([]byte, 8)
- binary.LittleEndian.PutUint64(headerBytes2, 200)
- require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(2), headerBytes2))
-
- dataBytes2 := make([]byte, 8)
- binary.LittleEndian.PutUint64(dataBytes2, 195)
- require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightDataKey(2), dataBytes2))
-
- highestDA = syncer.getHighestStoredDAHeight()
- assert.Equal(t, uint64(200), highestDA, "should return highest DA height from most recent included height")
-}
-
 func TestProcessHeightEvent_TriggersAsyncDARetrieval(t *testing.T) {
 ds := dssync.MutexWrap(datastore.NewMapDatastore())
 st := store.New(ds)
diff --git a/node/helpers_test.go b/node/helpers_test.go
index d8f54c5cc..94989d2f7 100644
--- a/node/helpers_test.go
+++ b/node/helpers_test.go
@@ -89,8 +89,7 @@ func getTestConfig(t *testing.T, n int) evconfig.Config {
 // Use a higher base port to reduce chances of conflicts with system services
 startPort := 40000 // Spread port ranges further apart
 return evconfig.Config{
- RootDir:    t.TempDir(),
- ClearCache: true, // Clear cache between tests to avoid interference with other tests and slow shutdown on serialization
+ RootDir: t.TempDir(),
 Node: evconfig.NodeConfig{
 Aggregator: true,
 BlockTime: evconfig.DurationWrapper{Duration: 100 * time.Millisecond},
diff --git a/pkg/store/store.go b/pkg/store/store.go
index 40d00547b..a821e6a8d 100644
--- a/pkg/store/store.go
+++ b/pkg/store/store.go
@@ -190,6 +190,15 @@ func (s *DefaultStore) GetMetadata(ctx context.Context, key string) ([]byte, err
 return data, nil
 }

+// DeleteMetadata removes a metadata key from the store.
+func (s *DefaultStore) DeleteMetadata(ctx context.Context, key string) error {
+ err := s.db.Delete(ctx, ds.NewKey(getMetaKey(key)))
+ if err != nil {
+ return fmt.Errorf("failed to delete metadata for key '%s': %w", key, err)
+ }
+ return nil
+}
+
 // Sync flushes the store state to disk.
 // Returns nil if the database has been closed (common during shutdown).
 func (s *DefaultStore) Sync(ctx context.Context) (err error) {
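`DeleteMetadata` rounds out the metadata API next to `SetMetadata`/`GetMetadata`. A caller-side sketch against the `Store` interface; the key and value here are hypothetical:

```go
// Persist a marker, then remove it once it has been processed.
// st is a store.Store; ctx is a context.Context.
if err := st.SetMetadata(ctx, "example-key", []byte{1}); err != nil {
	return err
}
if err := st.DeleteMetadata(ctx, "example-key"); err != nil {
	return fmt.Errorf("metadata cleanup failed: %w", err)
}
```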
diff --git a/pkg/store/tracing.go b/pkg/store/tracing.go
index a63ffb782..04baf748f 100644
--- a/pkg/store/tracing.go
+++ b/pkg/store/tracing.go
@@ -193,6 +193,24 @@ func (t *tracedStore) SetMetadata(ctx context.Context, key string, value []byte)
 return nil
 }

+func (t *tracedStore) DeleteMetadata(ctx context.Context, key string) error {
+ ctx, span := t.tracer.Start(ctx, "Store.DeleteMetadata",
+ trace.WithAttributes(
+ attribute.String("key", key),
+ ),
+ )
+ defer span.End()
+
+ err := t.inner.DeleteMetadata(ctx, key)
+ if err != nil {
+ span.RecordError(err)
+ span.SetStatus(codes.Error, err.Error())
+ return err
+ }
+
+ return nil
+}
+
 func (t *tracedStore) Rollback(ctx context.Context, height uint64, aggregator bool) error {
 ctx, span := t.tracer.Start(ctx, "Store.Rollback",
 trace.WithAttributes(
diff --git a/pkg/store/tracing_test.go b/pkg/store/tracing_test.go
index a5dca2417..5c273f661 100644
--- a/pkg/store/tracing_test.go
+++ b/pkg/store/tracing_test.go
@@ -27,6 +27,7 @@ type tracingMockStore struct {
 getStateAtHeightFn func(ctx context.Context, height uint64) (types.State, error)
 getMetadataFn func(ctx context.Context, key string) ([]byte, error)
 setMetadataFn func(ctx context.Context, key string, value []byte) error
+ deleteMetadataFn func(ctx context.Context, key string) error
 rollbackFn func(ctx context.Context, height uint64, aggregator bool) error
 newBatchFn func(ctx context.Context) (Batch, error)
 }
@@ -101,6 +102,13 @@ func (m *tracingMockStore) SetMetadata(ctx context.Context, key string, value []
 return nil
 }

+func (m *tracingMockStore) DeleteMetadata(ctx context.Context, key string) error {
+ if m.deleteMetadataFn != nil {
+ return m.deleteMetadataFn(ctx, key)
+ }
+ return nil
+}
+
 func (m *tracingMockStore) Rollback(ctx context.Context, height uint64, aggregator bool) error {
 if m.rollbackFn != nil {
 return m.rollbackFn(ctx, height, aggregator)
diff --git a/pkg/store/types.go b/pkg/store/types.go
index bf1cb6ced..fa1ecdc92 100644
--- a/pkg/store/types.go
+++ b/pkg/store/types.go
@@ -39,6 +39,9 @@ type Store interface {
 // This method enables evolve to safely persist any information.
 SetMetadata(ctx context.Context, key string, value []byte) error

+ // DeleteMetadata removes a metadata key from the store.
+ DeleteMetadata(ctx context.Context, key string) error
+
 // Close safely closes underlying data storage, to ensure that data is actually saved.
Close() error diff --git a/test/mocks/store.go b/test/mocks/store.go index 7f2aa180d..79b3ac6f5 100644 --- a/test/mocks/store.go +++ b/test/mocks/store.go @@ -83,6 +83,63 @@ func (_c *MockStore_Close_Call) RunAndReturn(run func() error) *MockStore_Close_ return _c } +// DeleteMetadata provides a mock function for the type MockStore +func (_mock *MockStore) DeleteMetadata(ctx context.Context, key string) error { + ret := _mock.Called(ctx, key) + + if len(ret) == 0 { + panic("no return value specified for DeleteMetadata") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = returnFunc(ctx, key) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockStore_DeleteMetadata_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteMetadata' +type MockStore_DeleteMetadata_Call struct { + *mock.Call +} + +// DeleteMetadata is a helper method to define mock.On call +// - ctx context.Context +// - key string +func (_e *MockStore_Expecter) DeleteMetadata(ctx interface{}, key interface{}) *MockStore_DeleteMetadata_Call { + return &MockStore_DeleteMetadata_Call{Call: _e.mock.On("DeleteMetadata", ctx, key)} +} + +func (_c *MockStore_DeleteMetadata_Call) Run(run func(ctx context.Context, key string)) *MockStore_DeleteMetadata_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 string + if args[1] != nil { + arg1 = args[1].(string) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockStore_DeleteMetadata_Call) Return(err error) *MockStore_DeleteMetadata_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockStore_DeleteMetadata_Call) RunAndReturn(run func(ctx context.Context, key string) error) *MockStore_DeleteMetadata_Call { + _c.Call.Return(run) + return _c +} + // GetBlockByHash provides a mock function for the type MockStore func (_mock *MockStore) GetBlockByHash(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) { ret := _mock.Called(ctx, hash)
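The regenerated `MockStore` gains matching typed expectation helpers. In a test they would be used roughly like this, following the expecter pattern already used for `GetMetadata` above; the `NewMockStore` constructor name and the key are assumptions based on mockery's conventions:

```go
mockStore := mocks.NewMockStore(t)

// Expect exactly one DeleteMetadata call for the given key and succeed.
mockStore.EXPECT().
	DeleteMetadata(mock.Anything, "some-key").
	Return(nil).
	Once()
```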