From 4889de395a5f1b6234c5af5d30bb7e82512b4d7e Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 3 Feb 2026 16:22:59 +0100 Subject: [PATCH 01/10] chore: log non included height --- block/internal/submitting/da_submitter.go | 3 +-- block/internal/submitting/submitter.go | 9 ++++----- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index a2e0adcf7..bd8088f7b 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -234,8 +234,7 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, headers []*types.Signed func(submitted []*types.SignedHeader, res *datypes.ResultSubmit) { heights := make([]uint64, len(submitted)) for i, header := range submitted { - headerHash := header.Hash() - cache.SetHeaderDAIncluded(headerHash.String(), res.Height, header.Height()) + cache.SetHeaderDAIncluded(header.Hash().String(), res.Height, header.Height()) heights[i] = header.Height() } if err := s.headerDAHintAppender.AppendDAHint(ctx, res.Height, heights...); err != nil { diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 17f6ce9bc..4c85539b2 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -318,7 +318,7 @@ func (s *Submitter) processDAInclusionLoop() { return case <-ticker.C: currentDAIncluded := s.GetDAIncludedHeight() - s.metrics.DAInclusionHeight.Set(float64(s.GetDAIncludedHeight())) + s.metrics.DAInclusionHeight.Set(float64(currentDAIncluded)) for { nextHeight := currentDAIncluded + 1 @@ -331,6 +331,7 @@ func (s *Submitter) processDAInclusionLoop() { // Check if this height is DA included if included, err := s.IsHeightDAIncluded(nextHeight, header, data); err != nil || !included { + s.logger.Debug().Uint64("height", nextHeight).Msg("height not yet DA included") break } @@ -494,12 +495,10 @@ func (s *Submitter) IsHeightDAIncluded(height uint64, header *types.SignedHeader return false, nil } - headerHash := header.Hash().String() dataCommitment := data.DACommitment() - dataHash := dataCommitment.String() - _, headerIncluded := s.cache.GetHeaderDAIncluded(headerHash) - _, dataIncluded := s.cache.GetDataDAIncluded(dataHash) + _, headerIncluded := s.cache.GetHeaderDAIncluded(header.Hash().String()) + _, dataIncluded := s.cache.GetDataDAIncluded(dataCommitment.String()) dataIncluded = bytes.Equal(dataCommitment, common.DataHashForEmptyTxs) || dataIncluded From c6c14d2b003b85c48ea05622670167bd467ac84a Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 3 Feb 2026 17:13:34 +0100 Subject: [PATCH 02/10] remove gob cache --- apps/evm/cmd/rollback.go | 3 - apps/testapp/cmd/rollback.go | 3 - block/components.go | 2 +- block/internal/cache/generic_cache.go | 283 +++++++------------- block/internal/cache/generic_cache_test.go | 270 ++++++++++++++----- block/internal/cache/manager.go | 206 +++++++------- block/internal/cache/manager_test.go | 158 +++++++---- block/internal/reaping/reaper_test.go | 53 +--- block/internal/syncing/da_retriever_test.go | 9 +- block/internal/syncing/p2p_handler_test.go | 13 +- block/internal/syncing/syncer_test.go | 7 +- node/helpers_test.go | 3 +- pkg/config/config.go | 6 +- pkg/config/config_test.go | 3 +- pkg/store/store.go | 9 + pkg/store/tracing.go | 18 ++ pkg/store/tracing_test.go | 8 + pkg/store/types.go | 3 + test/mocks/store.go | 57 ++++ 19 files changed, 646 insertions(+), 468 deletions(-) diff --git a/apps/evm/cmd/rollback.go 
b/apps/evm/cmd/rollback.go index 3f11ef8d4..3b56c152c 100644 --- a/apps/evm/cmd/rollback.go +++ b/apps/evm/cmd/rollback.go @@ -80,9 +80,6 @@ func NewRollbackCmd() *cobra.Command { } cmd.Printf("Rolled back ev-node state to height %d\n", height) - if syncNode { - fmt.Println("Restart the node with the `--evnode.clear_cache` flag") - } return nil }, diff --git a/apps/testapp/cmd/rollback.go b/apps/testapp/cmd/rollback.go index 6326f0fd2..187cef713 100644 --- a/apps/testapp/cmd/rollback.go +++ b/apps/testapp/cmd/rollback.go @@ -75,9 +75,6 @@ func NewRollbackCmd() *cobra.Command { } fmt.Printf("Rolled back ev-node state to height %d\n", height) - if syncNode { - fmt.Println("Restart the node with the `--evnode.clear_cache` flag") - } return nil }, diff --git a/block/components.go b/block/components.go index 903402cb1..d5af466ef 100644 --- a/block/components.go +++ b/block/components.go @@ -112,7 +112,7 @@ func (bc *Components) Stop() error { } } if bc.Cache != nil { - if err := bc.Cache.SaveToDisk(); err != nil { + if err := bc.Cache.SaveToStore(); err != nil { errs = errors.Join(errs, fmt.Errorf("failed to save caches: %w", err)) } } diff --git a/block/internal/cache/generic_cache.go b/block/internal/cache/generic_cache.go index 97cd197c6..ee7ef1cf9 100644 --- a/block/internal/cache/generic_cache.go +++ b/block/internal/cache/generic_cache.go @@ -1,18 +1,16 @@ package cache import ( - "bufio" - "encoding/gob" - "errors" + "context" + "encoding/binary" "fmt" - "os" - "path/filepath" "sync" "sync/atomic" lru "github.com/hashicorp/golang-lru/v2" - "golang.org/x/sync/errgroup" + + "github.com/evstack/ev-node/pkg/store" ) const ( @@ -47,62 +45,33 @@ type Cache[T any] struct { // maxDAHeight tracks the maximum DA height seen maxDAHeight *atomic.Uint64 -} - -// CacheConfig holds configuration for cache sizes. -type CacheConfig struct { - ItemsCacheSize int - HashesCacheSize int - DAIncludedCacheSize int -} -// DefaultCacheConfig returns the default cache configuration. -func DefaultCacheConfig() CacheConfig { - return CacheConfig{ - ItemsCacheSize: DefaultItemsCacheSize, - HashesCacheSize: DefaultHashesCacheSize, - DAIncludedCacheSize: DefaultDAIncludedCacheSize, - } -} + // store is used for persisting DA inclusion data (optional, can be nil for ephemeral caches) + store store.Store -// NewCache returns a new Cache struct with default sizes -func NewCache[T any]() *Cache[T] { - cache, _ := NewCacheWithConfig[T](DefaultCacheConfig()) - return cache + // storeKeyPrefix is the prefix used for store keys (allows different caches to use different namespaces) + storeKeyPrefix string } -// NewCacheWithConfig returns a new Cache struct with custom sizes -func NewCacheWithConfig[T any](config CacheConfig) (*Cache[T], error) { - itemsCache, err := lru.New[uint64, *T](config.ItemsCacheSize) - if err != nil { - return nil, fmt.Errorf("failed to create items cache: %w", err) - } - - hashesCache, err := lru.New[string, bool](config.HashesCacheSize) - if err != nil { - return nil, fmt.Errorf("failed to create hashes cache: %w", err) - } - - daIncludedCache, err := lru.New[string, uint64](config.DAIncludedCacheSize) - if err != nil { - return nil, fmt.Errorf("failed to create daIncluded cache: %w", err) - } - +// NewCache returns a new Cache struct with default sizes. +// If store and keyPrefix are provided, DA inclusion data will be persisted to the store for populating the cache on restarts. 
+func NewCache[T any](s store.Store, keyPrefix string) *Cache[T] {
+	// LRU cache creation only fails if size <= 0, which won't happen with our defaults
+	itemsCache, _ := lru.New[uint64, *T](DefaultItemsCacheSize)
+	hashesCache, _ := lru.New[string, bool](DefaultHashesCacheSize)
+	daIncludedCache, _ := lru.New[string, uint64](DefaultDAIncludedCacheSize)
 	// hashByHeight must be at least as large as hashes cache to ensure proper pruning.
-	// If an entry is evicted from hashByHeight before hashes, the corresponding hash
-	// entry can no longer be pruned by height, causing a slow memory leak.
-	hashByHeightCache, err := lru.New[uint64, string](config.HashesCacheSize)
-	if err != nil {
-		return nil, fmt.Errorf("failed to create hashByHeight cache: %w", err)
-	}
+	hashByHeightCache, _ := lru.New[uint64, string](DefaultHashesCacheSize)
 
 	return &Cache[T]{
-		itemsByHeight: itemsCache,
-		hashes:        hashesCache,
-		daIncluded:    daIncludedCache,
-		hashByHeight:  hashByHeightCache,
-		maxDAHeight:   &atomic.Uint64{},
-	}, nil
+		itemsByHeight:  itemsCache,
+		hashes:         hashesCache,
+		daIncluded:     daIncludedCache,
+		hashByHeight:   hashByHeightCache,
+		maxDAHeight:    &atomic.Uint64{},
+		store:          s,
+		storeKeyPrefix: keyPrefix,
+	}
 }
 
 // getItem returns an item from the cache by height.
@@ -158,11 +127,20 @@ func (c *Cache[T]) getDAIncluded(hash string) (uint64, bool) {
 	return daHeight, true
 }
 
-// setDAIncluded sets the hash as DA-included with the given DA height and tracks block height for pruning
+// setDAIncluded sets the hash as DA-included with the given DA height and tracks block height for pruning.
 func (c *Cache[T]) setDAIncluded(hash string, daHeight uint64, blockHeight uint64) {
 	c.daIncluded.Add(hash, daHeight)
 	c.hashByHeight.Add(blockHeight, hash)
 
+	// Persist to store if configured
+	if c.store != nil {
+		key := c.storeKeyPrefix + hash
+		value := make([]byte, 16) // 8 bytes for daHeight + 8 bytes for blockHeight
+		binary.LittleEndian.PutUint64(value[0:8], daHeight)
+		binary.LittleEndian.PutUint64(value[8:16], blockHeight)
+		_ = c.store.SetMetadata(context.Background(), key, value)
+	}
+
 	// Update max DA height if necessary
 	for range 1_000 {
 		current := c.maxDAHeight.Load()
@@ -208,156 +186,91 @@ func (c *Cache[T]) deleteAllForHeight(height uint64) {
 	}
 }
 
-const (
-	itemsByHeightFilename = "items_by_height.gob"
-	hashesFilename        = "hashes.gob"
-	daIncludedFilename    = "da_included.gob"
-)
-
-// saveMapGob saves a map to a file using gob encoding.
-func saveMapGob[K comparable, V any](filePath string, data map[K]V) (err error) {
-	file, err := os.Create(filePath)
-	if err != nil {
-		return fmt.Errorf("failed to create file %s: %w", filePath, err)
+// RestoreFromStore loads DA inclusion data from the store into the in-memory cache.
+// This should be called during initialization to restore persisted state.
+// For each provided hash, it looks up the value stored under the cache's key prefix and populates the LRU cache.
+func (c *Cache[T]) RestoreFromStore(ctx context.Context, hashes []string) error { + if c.store == nil { + return nil // No store configured, nothing to restore } - writer := bufio.NewWriter(file) - defer func() { - err = errors.Join(err, writer.Flush(), file.Sync(), file.Close()) - }() - if err := gob.NewEncoder(writer).Encode(data); err != nil { - return fmt.Errorf("failed to encode to file %s: %w", filePath, err) - } - return nil -} + for _, hash := range hashes { + key := c.storeKeyPrefix + hash + value, err := c.store.GetMetadata(ctx, key) + if err != nil { + // Key not found is not an error - the hash may not have been DA included yet + continue + } + if len(value) != 16 { + continue // Invalid data, skip + } + + daHeight := binary.LittleEndian.Uint64(value[0:8]) + blockHeight := binary.LittleEndian.Uint64(value[8:16]) -// loadMapGob loads a map from a file using gob encoding. -// if the file does not exist, it returns an empty map and no error. -func loadMapGob[K comparable, V any](filePath string) (map[K]V, error) { - m := make(map[K]V) - file, err := os.Open(filePath) - if err != nil { - if os.IsNotExist(err) { - return m, nil // return empty map if file not found + c.daIncluded.Add(hash, daHeight) + c.hashByHeight.Add(blockHeight, hash) + + // Update max DA height + current := c.maxDAHeight.Load() + if daHeight > current { + c.maxDAHeight.Store(daHeight) } - return nil, fmt.Errorf("failed to open file %s: %w", filePath, err) } - defer file.Close() - decoder := gob.NewDecoder(file) - if err := decoder.Decode(&m); err != nil { - return nil, fmt.Errorf("failed to decode file %s: %w", filePath, err) - } - return m, nil + return nil } -// SaveToDisk saves the cache contents to disk in the specified folder. -// It's the caller's responsibility to ensure that type T (and any types it contains) -// are registered with the gob package if necessary (e.g., using gob.Register). -func (c *Cache[T]) SaveToDisk(folderPath string) error { - if err := os.MkdirAll(folderPath, 0o755); err != nil { - return fmt.Errorf("failed to create directory %s: %w", folderPath, err) +// SaveToStore persists all current DA inclusion entries to the store. +// This can be called before shutdown to ensure all data is persisted. 
+func (c *Cache[T]) SaveToStore(ctx context.Context) error { + if c.store == nil { + return nil // No store configured } - var wg errgroup.Group - - // save items by height - wg.Go(func() error { - itemsByHeightMap := make(map[uint64]*T) - keys := c.itemsByHeight.Keys() - for _, k := range keys { - if v, ok := c.itemsByHeight.Peek(k); ok { - itemsByHeightMap[k] = v - } - } - if err := saveMapGob(filepath.Join(folderPath, itemsByHeightFilename), itemsByHeightMap); err != nil { - return fmt.Errorf("save %s: %w", itemsByHeightFilename, err) - } - return nil - }) - - // save hashes - wg.Go(func() error { - hashesToSave := make(map[string]bool) - keys := c.hashes.Keys() - for _, k := range keys { - if v, ok := c.hashes.Peek(k); ok { - hashesToSave[k] = v - } + keys := c.daIncluded.Keys() + for _, hash := range keys { + daHeight, ok := c.daIncluded.Peek(hash) + if !ok { + continue } - if err := saveMapGob(filepath.Join(folderPath, hashesFilename), hashesToSave); err != nil { - return fmt.Errorf("save %s: %w", hashesFilename, err) - } - return nil - }) - - // save daIncluded - wg.Go(func() error { - daIncludedToSave := make(map[string]uint64) - keys := c.daIncluded.Keys() - for _, k := range keys { - if v, ok := c.daIncluded.Peek(k); ok { - daIncludedToSave[k] = v + // We need to find the block height for this hash + // Since we track hash by height, we need to iterate + var blockHeight uint64 + heightKeys := c.hashByHeight.Keys() + for _, h := range heightKeys { + if storedHash, ok := c.hashByHeight.Peek(h); ok && storedHash == hash { + blockHeight = h + break } } - if err := saveMapGob(filepath.Join(folderPath, daIncludedFilename), daIncludedToSave); err != nil { - return fmt.Errorf("save %s: %w", daIncludedFilename, err) + key := c.storeKeyPrefix + hash + value := make([]byte, 16) + binary.LittleEndian.PutUint64(value[0:8], daHeight) + binary.LittleEndian.PutUint64(value[8:16], blockHeight) + + if err := c.store.SetMetadata(ctx, key, value); err != nil { + return fmt.Errorf("failed to save DA inclusion for hash %s: %w", hash, err) } - return nil - }) + } - return wg.Wait() + return nil } -// LoadFromDisk loads the cache contents from disk from the specified folder. -// It populates the current cache instance. If files are missing, corresponding parts of the cache will be empty. -// It's the caller's responsibility to ensure that type T (and any types it contains) -// are registered with the gob package if necessary (e.g., using gob.Register). -func (c *Cache[T]) LoadFromDisk(folderPath string) error { - var wg errgroup.Group - - // load items by height - wg.Go(func() error { - itemsByHeightMap, err := loadMapGob[uint64, *T](filepath.Join(folderPath, itemsByHeightFilename)) - if err != nil { - return fmt.Errorf("failed to load %s : %w", itemsByHeightFilename, err) - } - for k, v := range itemsByHeightMap { - c.itemsByHeight.Add(k, v) - } - return nil - }) - - // load hashes - wg.Go(func() error { - hashesMap, err := loadMapGob[string, bool](filepath.Join(folderPath, hashesFilename)) - if err != nil { - return fmt.Errorf("failed to load %s : %w", hashesFilename, err) - } - for k, v := range hashesMap { - c.hashes.Add(k, v) - } +// ClearFromStore removes all DA inclusion entries from the store for this cache. 
+func (c *Cache[T]) ClearFromStore(ctx context.Context, hashes []string) error { + if c.store == nil { return nil - }) + } - // load daIncluded - wg.Go(func() error { - daIncludedMap, err := loadMapGob[string, uint64](filepath.Join(folderPath, daIncludedFilename)) - if err != nil { - return fmt.Errorf("failed to load %s : %w", daIncludedFilename, err) - } - for k, v := range daIncludedMap { - c.daIncluded.Add(k, v) - // Update max DA height during load - current := c.maxDAHeight.Load() - if v > current { - c.maxDAHeight.Store(v) - } + for _, hash := range hashes { + key := c.storeKeyPrefix + hash + if err := c.store.DeleteMetadata(ctx, key); err != nil { + return fmt.Errorf("failed to delete DA inclusion for hash %s: %w", hash, err) } - return nil - }) + } - return wg.Wait() + return nil } diff --git a/block/internal/cache/generic_cache_test.go b/block/internal/cache/generic_cache_test.go index 1c6fa8333..a6e7c6054 100644 --- a/block/internal/cache/generic_cache_test.go +++ b/block/internal/cache/generic_cache_test.go @@ -1,22 +1,27 @@ package cache import ( - "encoding/gob" - "fmt" - "os" - "path/filepath" + "context" "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/evstack/ev-node/pkg/store" ) type testItem struct{ V int } -func init() { - gob.Register(&testItem{}) +// memStore creates an in-memory store for testing +func testMemStore(t *testing.T) store.Store { + ds, err := store.NewTestInMemoryKVStore() + require.NoError(t, err) + return store.New(ds) } // TestCache_MaxDAHeight verifies that daHeight tracks the maximum DA height func TestCache_MaxDAHeight(t *testing.T) { - c := NewCache[testItem]() + c := NewCache[testItem](nil, "") // Initially should be 0 if got := c.daHeight(); got != 0 { @@ -38,77 +43,82 @@ func TestCache_MaxDAHeight(t *testing.T) { if got := c.daHeight(); got != 200 { t.Errorf("after setDAIncluded(200): daHeight = %d, want 200", got) } +} - // Test persistence - dir := t.TempDir() - if err := c.SaveToDisk(dir); err != nil { - t.Fatalf("SaveToDisk failed: %v", err) - } +// TestCache_MaxDAHeight_WithStore verifies that daHeight is restored from store +func TestCache_MaxDAHeight_WithStore(t *testing.T) { + st := testMemStore(t) + ctx := context.Background() - c2 := NewCache[testItem]() - if err := c2.LoadFromDisk(dir); err != nil { - t.Fatalf("LoadFromDisk failed: %v", err) + c1 := NewCache[testItem](st, "test/da-included/") + + // Set DA included entries + c1.setDAIncluded("hash1", 100, 1) + c1.setDAIncluded("hash2", 200, 2) + c1.setDAIncluded("hash3", 150, 3) + + if got := c1.daHeight(); got != 200 { + t.Errorf("after setDAIncluded: daHeight = %d, want 200", got) } + // Create new cache and restore from store + c2 := NewCache[testItem](st, "test/da-included/") + + // Restore with the known hashes + err := c2.RestoreFromStore(ctx, []string{"hash1", "hash2", "hash3"}) + require.NoError(t, err) + if got := c2.daHeight(); got != 200 { - t.Errorf("after load: daHeight = %d, want 200", got) + t.Errorf("after restore: daHeight = %d, want 200", got) } + + // Verify individual entries were restored + daHeight, ok := c2.getDAIncluded("hash1") + assert.True(t, ok) + assert.Equal(t, uint64(100), daHeight) + + daHeight, ok = c2.getDAIncluded("hash2") + assert.True(t, ok) + assert.Equal(t, uint64(200), daHeight) + + daHeight, ok = c2.getDAIncluded("hash3") + assert.True(t, ok) + assert.Equal(t, uint64(150), daHeight) } -// TestCache_SaveLoad_ErrorPaths covers SaveToDisk and LoadFromDisk error scenarios. 
-func TestCache_SaveLoad_ErrorPaths(t *testing.T) { - c := NewCache[testItem]() - for i := 0; i < 5; i++ { - v := &testItem{V: i} - c.setItem(uint64(i), v) - c.setSeen(fmt.Sprintf("s%d", i), uint64(i)) - c.setDAIncluded(fmt.Sprintf("d%d", i), uint64(i), uint64(i)) - } +// TestCache_WithStorePersistence tests that DA inclusion is persisted to store +func TestCache_WithStorePersistence(t *testing.T) { + st := testMemStore(t) + ctx := context.Background() - // Normal save/load roundtrip - dir := t.TempDir() - if err := c.SaveToDisk(dir); err != nil { - t.Fatalf("SaveToDisk failed: %v", err) - } - c2 := NewCache[testItem]() - if err := c2.LoadFromDisk(dir); err != nil { - t.Fatalf("LoadFromDisk failed: %v", err) - } - // Spot-check a few values - if got := c2.getItem(3); got == nil || got.V != 3 { - t.Fatalf("roundtrip getItem mismatch: got %#v", got) - } + c1 := NewCache[testItem](st, "test/") - _, c2OK := c2.getDAIncluded("d2") + // Set DA inclusion - this should persist to store immediately + c1.setDAIncluded("hash1", 100, 1) + c1.setDAIncluded("hash2", 200, 2) - if !c2.isSeen("s1") || !c2OK { - t.Fatalf("roundtrip auxiliary maps mismatch") - } + // Create new cache with same store and restore + c2 := NewCache[testItem](st, "test/") - // SaveToDisk error: path exists as a file - filePath := filepath.Join(t.TempDir(), "not_a_dir") - if err := os.WriteFile(filePath, []byte("x"), 0o600); err != nil { - t.Fatalf("failed to create file: %v", err) - } - if err := c.SaveToDisk(filePath); err == nil { - t.Fatalf("expected error when saving to a file path, got nil") - } + err := c2.RestoreFromStore(ctx, []string{"hash1", "hash2", "hash3"}) + require.NoError(t, err) - // LoadFromDisk error: corrupt gob - badDir := t.TempDir() - badFile := filepath.Join(badDir, itemsByHeightFilename) - if err := os.WriteFile(badFile, []byte("not a gob"), 0o600); err != nil { - t.Fatalf("failed to write bad gob: %v", err) - } - c3 := NewCache[testItem]() - if err := c3.LoadFromDisk(badDir); err == nil { - t.Fatalf("expected error when loading corrupted gob, got nil") - } + // hash1 and hash2 should be restored, hash3 should not exist + daHeight, ok := c2.getDAIncluded("hash1") + assert.True(t, ok) + assert.Equal(t, uint64(100), daHeight) + + daHeight, ok = c2.getDAIncluded("hash2") + assert.True(t, ok) + assert.Equal(t, uint64(200), daHeight) + + _, ok = c2.getDAIncluded("hash3") + assert.False(t, ok) } // TestCache_LargeDataset covers edge cases with height index management at scale. 
 func TestCache_LargeDataset(t *testing.T) {
-	c := NewCache[testItem]()
+	c := NewCache[testItem](nil, "")
 	const N = 20000
 	// Insert in descending order to exercise insert positions
 	for i := N - 1; i >= 0; i-- {
@@ -120,3 +130,141 @@ func TestCache_LargeDataset(t *testing.T) {
 		c.getNextItem(uint64(i))
 	}
 }
+
+// TestCache_BasicOperations tests basic cache operations
+func TestCache_BasicOperations(t *testing.T) {
+	c := NewCache[testItem](nil, "")
+
+	// Test setItem/getItem
+	item := &testItem{V: 42}
+	c.setItem(1, item)
+	got := c.getItem(1)
+	assert.NotNil(t, got)
+	assert.Equal(t, 42, got.V)
+
+	// Test getItem for non-existent key
+	got = c.getItem(999)
+	assert.Nil(t, got)
+
+	// Test setSeen/isSeen
+	assert.False(t, c.isSeen("hash1"))
+	c.setSeen("hash1", 1)
+	assert.True(t, c.isSeen("hash1"))
+
+	// Test removeSeen
+	c.removeSeen("hash1")
+	assert.False(t, c.isSeen("hash1"))
+
+	// Test setDAIncluded/getDAIncluded
+	_, ok := c.getDAIncluded("hash2")
+	assert.False(t, ok)
+
+	c.setDAIncluded("hash2", 100, 2)
+	daHeight, ok := c.getDAIncluded("hash2")
+	assert.True(t, ok)
+	assert.Equal(t, uint64(100), daHeight)
+
+	// Test removeDAIncluded
+	c.removeDAIncluded("hash2")
+	_, ok = c.getDAIncluded("hash2")
+	assert.False(t, ok)
+}
+
+// TestCache_GetNextItem tests the atomic get-and-remove operation
+func TestCache_GetNextItem(t *testing.T) {
+	c := NewCache[testItem](nil, "")
+
+	// Set multiple items
+	c.setItem(1, &testItem{V: 1})
+	c.setItem(2, &testItem{V: 2})
+	c.setItem(3, &testItem{V: 3})
+
+	// Get and remove item at height 2
+	got := c.getNextItem(2)
+	assert.NotNil(t, got)
+	assert.Equal(t, 2, got.V)
+
+	// Item should be removed
+	got = c.getNextItem(2)
+	assert.Nil(t, got)
+
+	// Other items should still exist
+	got = c.getItem(1)
+	assert.NotNil(t, got)
+	assert.Equal(t, 1, got.V)
+
+	got = c.getItem(3)
+	assert.NotNil(t, got)
+	assert.Equal(t, 3, got.V)
+}
+
+// TestCache_DeleteAllForHeight tests deleting all data for a specific height
+func TestCache_DeleteAllForHeight(t *testing.T) {
+	c := NewCache[testItem](nil, "")
+
+	// Set items at different heights
+	c.setItem(1, &testItem{V: 1})
+	c.setItem(2, &testItem{V: 2})
+	c.setSeen("hash1", 1)
+	c.setSeen("hash2", 2)
+
+	// Delete height 1
+	c.deleteAllForHeight(1)
+
+	// Height 1 data should be gone
+	assert.Nil(t, c.getItem(1))
+	assert.False(t, c.isSeen("hash1"))
+
+	// Height 2 data should still exist
+	assert.NotNil(t, c.getItem(2))
+	assert.True(t, c.isSeen("hash2"))
+}
+
+// TestCache_WithNilStore tests creating and using a cache without a backing store
+func TestCache_WithNilStore(t *testing.T) {
+	// Cache without store should work fine
+	c := NewCache[testItem](nil, "")
+	require.NotNil(t, c)
+
+	// Basic operations should work
+	c.setItem(1, &testItem{V: 1})
+	got := c.getItem(1)
+	assert.NotNil(t, got)
+	assert.Equal(t, 1, got.V)
+
+	// DA inclusion should work (just not persisted)
+	c.setDAIncluded("hash1", 100, 1)
+	daHeight, ok := c.getDAIncluded("hash1")
+	assert.True(t, ok)
+	assert.Equal(t, uint64(100), daHeight)
+}
+
+// TestCache_SaveToStore tests the SaveToStore method
+func TestCache_SaveToStore(t *testing.T) {
+	st := testMemStore(t)
+	ctx := context.Background()
+
+	c := NewCache[testItem](st, "save-test/")
+
+	// Set some DA included entries
+	c.setDAIncluded("hash1", 100, 1)
+	c.setDAIncluded("hash2", 200, 2)
+
+	// Save to store (redundant here, since entries are already persisted on setDAIncluded)
+	err := c.SaveToStore(ctx)
+	require.NoError(t, err)
+
+	// Verify data is in store by creating new cache and restoring
+	c2 :=
NewCache[testItem](st, "save-test/") + + err = c2.RestoreFromStore(ctx, []string{"hash1", "hash2"}) + require.NoError(t, err) + + daHeight, ok := c2.getDAIncluded("hash1") + assert.True(t, ok) + assert.Equal(t, uint64(100), daHeight) + + daHeight, ok = c2.getDAIncluded("hash2") + assert.True(t, ok) + assert.Equal(t, uint64(200), daHeight) +} diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go index aec7726d1..97df14890 100644 --- a/block/internal/cache/manager.go +++ b/block/internal/cache/manager.go @@ -2,10 +2,7 @@ package cache import ( "context" - "encoding/gob" "fmt" - "os" - "path/filepath" "sync" "time" @@ -17,30 +14,15 @@ import ( "github.com/evstack/ev-node/types" ) -var ( - cacheDir = "cache" - headerCacheDir = filepath.Join(cacheDir, "header") - dataCacheDir = filepath.Join(cacheDir, "data") - pendingEventsCacheDir = filepath.Join(cacheDir, "pending_da_events") - txCacheDir = filepath.Join(cacheDir, "tx") +const ( + // Store key prefixes for different cache types + headerDAIncludedPrefix = "cache/header-da-included/" + dataDAIncludedPrefix = "cache/data-da-included/" // DefaultTxCacheRetention is the default time to keep transaction hashes in cache DefaultTxCacheRetention = 24 * time.Hour ) -// gobRegisterOnce ensures gob type registration happens exactly once process-wide. -var gobRegisterOnce sync.Once - -// registerGobTypes registers all concrete types that may be encoded/decoded by the cache. -// Gob registration is global and must not be performed repeatedly to avoid conflicts. -func registerGobTypes() { - gobRegisterOnce.Do(func() { - gob.Register(&types.SignedHeader{}) - gob.Register(&types.Data{}) - gob.Register(&common.DAHeightEvent{}) - }) -} - // CacheManager provides thread-safe cache operations for tracking seen blocks // and DA inclusion status during block execution and syncing. 
type CacheManager interface { @@ -67,10 +49,9 @@ type CacheManager interface { GetNextPendingEvent(blockHeight uint64) *common.DAHeightEvent SetPendingEvent(blockHeight uint64, event *common.DAHeightEvent) - // Disk operations - SaveToDisk() error - LoadFromDisk() error - ClearFromDisk() error + // Store operations + SaveToStore() error + RestoreFromStore() error // Cleanup operations DeleteHeight(blockHeight uint64) @@ -105,6 +86,7 @@ type implementation struct { pendingEventsCache *Cache[common.DAHeightEvent] pendingHeaders *PendingHeaders pendingData *PendingData + store store.Store config config.Config logger zerolog.Logger } @@ -129,59 +111,53 @@ func NewPendingManager(store store.Store, logger zerolog.Logger) (PendingManager } // NewCacheManager creates a new cache manager instance -func NewCacheManager(cfg config.Config, logger zerolog.Logger) (CacheManager, error) { - // Initialize caches - headerCache := NewCache[types.SignedHeader]() - dataCache := NewCache[types.Data]() - txCache := NewCache[struct{}]() - pendingEventsCache := NewCache[common.DAHeightEvent]() - - registerGobTypes() +func NewCacheManager(cfg config.Config, st store.Store, logger zerolog.Logger) (CacheManager, error) { + // Initialize caches with store-based persistence for DA inclusion data + headerCache := NewCache[types.SignedHeader](st, headerDAIncludedPrefix) + dataCache := NewCache[types.Data](st, dataDAIncludedPrefix) + // TX cache and pending events cache don't need store persistence + txCache := NewCache[struct{}](nil, "") + pendingEventsCache := NewCache[common.DAHeightEvent](nil, "") + impl := &implementation{ headerCache: headerCache, dataCache: dataCache, txCache: txCache, txTimestamps: new(sync.Map), pendingEventsCache: pendingEventsCache, + store: st, config: cfg, logger: logger, } - if cfg.ClearCache { - // Clear the cache from disk - if err := impl.ClearFromDisk(); err != nil { - logger.Warn().Err(err).Msg("failed to clear cache from disk, starting with empty cache") - } - } else { - // Load existing cache from disk - if err := impl.LoadFromDisk(); err != nil { - logger.Warn().Err(err).Msg("failed to load cache from disk, starting with empty cache") - } + // Restore existing cache from store + if err := impl.RestoreFromStore(); err != nil { + logger.Warn().Err(err).Msg("failed to restore cache from store, starting with empty cache") } return impl, nil } // NewManager creates a new cache manager instance -func NewManager(cfg config.Config, store store.Store, logger zerolog.Logger) (Manager, error) { - // Initialize caches - headerCache := NewCache[types.SignedHeader]() - dataCache := NewCache[types.Data]() - txCache := NewCache[struct{}]() - pendingEventsCache := NewCache[common.DAHeightEvent]() +func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manager, error) { + // Initialize caches with store-based persistence for DA inclusion data + headerCache := NewCache[types.SignedHeader](st, headerDAIncludedPrefix) + dataCache := NewCache[types.Data](st, dataDAIncludedPrefix) + // TX cache and pending events cache don't need store persistence + txCache := NewCache[struct{}](nil, "") + pendingEventsCache := NewCache[common.DAHeightEvent](nil, "") // Initialize pending managers - pendingHeaders, err := NewPendingHeaders(store, logger) + pendingHeaders, err := NewPendingHeaders(st, logger) if err != nil { return nil, fmt.Errorf("failed to create pending headers: %w", err) } - pendingData, err := NewPendingData(store, logger) + pendingData, err := NewPendingData(st, logger) if err != 
nil { return nil, fmt.Errorf("failed to create pending data: %w", err) } - registerGobTypes() impl := &implementation{ headerCache: headerCache, dataCache: dataCache, @@ -190,20 +166,14 @@ func NewManager(cfg config.Config, store store.Store, logger zerolog.Logger) (Ma pendingEventsCache: pendingEventsCache, pendingHeaders: pendingHeaders, pendingData: pendingData, + store: st, config: cfg, logger: logger, } - if cfg.ClearCache { - // Clear the cache from disk - if err := impl.ClearFromDisk(); err != nil { - logger.Warn().Err(err).Msg("failed to clear cache from disk, starting with empty cache") - } - } else { - // Load existing cache from disk - if err := impl.LoadFromDisk(); err != nil { - logger.Warn().Err(err).Msg("failed to load cache from disk, starting with empty cache") - } + // Restore existing cache from store + if err := impl.RestoreFromStore(); err != nil { + logger.Warn().Err(err).Msg("failed to restore cache from store, starting with empty cache") } return impl, nil @@ -386,71 +356,95 @@ func (m *implementation) GetNextPendingEvent(height uint64) *common.DAHeightEven return m.pendingEventsCache.getNextItem(height) } -func (m *implementation) SaveToDisk() error { - cfgDir := filepath.Join(m.config.RootDir, "data") - - // Ensure gob types are registered before encoding - registerGobTypes() - - if err := m.headerCache.SaveToDisk(filepath.Join(cfgDir, headerCacheDir)); err != nil { - return fmt.Errorf("failed to save header cache to disk: %w", err) - } - - if err := m.dataCache.SaveToDisk(filepath.Join(cfgDir, dataCacheDir)); err != nil { - return fmt.Errorf("failed to save data cache to disk: %w", err) - } +// SaveToStore persists the DA inclusion cache to the store. +// DA inclusion data is persisted on every SetHeaderDAIncluded/SetDataDAIncluded call, +// so this method ensures any remaining data is flushed. +func (m *implementation) SaveToStore() error { + ctx := context.Background() - if err := m.txCache.SaveToDisk(filepath.Join(cfgDir, txCacheDir)); err != nil { - return fmt.Errorf("failed to save tx cache to disk: %w", err) + if err := m.headerCache.SaveToStore(ctx); err != nil { + return fmt.Errorf("failed to save header cache to store: %w", err) } - if err := m.pendingEventsCache.SaveToDisk(filepath.Join(cfgDir, pendingEventsCacheDir)); err != nil { - return fmt.Errorf("failed to save pending events cache to disk: %w", err) + if err := m.dataCache.SaveToStore(ctx); err != nil { + return fmt.Errorf("failed to save data cache to store: %w", err) } - // Note: txTimestamps are not persisted to disk intentionally. - // On restart, all cached transactions will be treated as "new" for cleanup purposes, - // which is acceptable as they will be cleaned up on the next cleanup cycle if old enough. - + // TX cache and pending events are ephemeral - not persisted return nil } -func (m *implementation) LoadFromDisk() error { - // Ensure types are registered exactly once prior to decoding - registerGobTypes() +// RestoreFromStore restores the DA inclusion cache from the store. +// This iterates through blocks in the store and checks for persisted DA inclusion data. 
+func (m *implementation) RestoreFromStore() error { + ctx := context.Background() - cfgDir := filepath.Join(m.config.RootDir, "data") - - if err := m.headerCache.LoadFromDisk(filepath.Join(cfgDir, headerCacheDir)); err != nil { - return fmt.Errorf("failed to load header cache from disk: %w", err) + // Get current store height to know how many blocks to check + height, err := m.store.Height(ctx) + if err != nil { + return fmt.Errorf("failed to get store height: %w", err) } - if err := m.dataCache.LoadFromDisk(filepath.Join(cfgDir, dataCacheDir)); err != nil { - return fmt.Errorf("failed to load data cache from disk: %w", err) + if height == 0 { + return nil // No blocks to restore } - if err := m.txCache.LoadFromDisk(filepath.Join(cfgDir, txCacheDir)); err != nil { - return fmt.Errorf("failed to load tx cache from disk: %w", err) + // Collect hashes from stored blocks + var headerHashes []string + var dataHashes []string + + for h := uint64(1); h <= height; h++ { + header, data, err := m.store.GetBlockData(ctx, h) + if err != nil { + m.logger.Warn().Uint64("height", h).Err(err).Msg("failed to get block data during cache restore") + continue + } + + if header != nil { + headerHashes = append(headerHashes, header.Hash().String()) + } + if data != nil { + dataHashes = append(dataHashes, data.DACommitment().String()) + } } - if err := m.pendingEventsCache.LoadFromDisk(filepath.Join(cfgDir, pendingEventsCacheDir)); err != nil { - return fmt.Errorf("failed to load pending events cache from disk: %w", err) + // Restore DA inclusion data from store + if err := m.headerCache.RestoreFromStore(ctx, headerHashes); err != nil { + return fmt.Errorf("failed to restore header cache from store: %w", err) } - // After loading tx cache from disk, initialize timestamps for loaded transactions - // Set them to current time so they won't be immediately cleaned up - now := time.Now() - for _, hash := range m.txCache.hashes.Keys() { - m.txTimestamps.Store(hash, now) + if err := m.dataCache.RestoreFromStore(ctx, dataHashes); err != nil { + return fmt.Errorf("failed to restore data cache from store: %w", err) } + m.logger.Info(). + Int("header_hashes", len(headerHashes)). + Int("data_hashes", len(dataHashes)). + Msg("restored DA inclusion cache from store") + return nil } -func (m *implementation) ClearFromDisk() error { - cachePath := filepath.Join(m.config.RootDir, "data", cacheDir) - if err := os.RemoveAll(cachePath); err != nil { - return fmt.Errorf("failed to clear cache from disk: %w", err) +// ClearFromStore clears in-memory caches and deletes DA inclusion entries from the store. 
+func (m *implementation) ClearFromStore() error { + ctx := context.Background() + + // Get hashes from current in-memory caches and delete from store + headerHashes := m.headerCache.daIncluded.Keys() + if err := m.headerCache.ClearFromStore(ctx, headerHashes); err != nil { + return fmt.Errorf("failed to clear header cache from store: %w", err) } + + dataHashes := m.dataCache.daIncluded.Keys() + if err := m.dataCache.ClearFromStore(ctx, dataHashes); err != nil { + return fmt.Errorf("failed to clear data cache from store: %w", err) + } + + // Clear in-memory caches by creating new ones + m.headerCache = NewCache[types.SignedHeader](m.store, headerDAIncludedPrefix) + m.dataCache = NewCache[types.Data](m.store, dataDAIncludedPrefix) + m.txCache = NewCache[struct{}](nil, "") + m.pendingEventsCache = NewCache[common.DAHeightEvent](nil, "") + return nil } diff --git a/block/internal/cache/manager_test.go b/block/internal/cache/manager_test.go index 328a1de7d..056230e31 100644 --- a/block/internal/cache/manager_test.go +++ b/block/internal/cache/manager_test.go @@ -2,8 +2,6 @@ package cache import ( "context" - "encoding/gob" - "path/filepath" "testing" "time" @@ -87,54 +85,61 @@ func TestManager_PendingEventsCRUD(t *testing.T) { assert.Nil(t, got1Again) } -func TestManager_SaveAndLoadFromDisk(t *testing.T) { +func TestManager_SaveAndRestoreFromStore(t *testing.T) { t.Parallel() cfg := tempConfig(t) st := memStore(t) + ctx := context.Background() + + // First, we need to save some block data to the store so RestoreFromStore can find the hashes + h1, d1 := types.GetRandomBlock(1, 1, "test-chain") + h2, d2 := types.GetRandomBlock(2, 1, "test-chain") + + batch1, err := st.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch1.SaveBlockData(h1, d1, &types.Signature{})) + require.NoError(t, batch1.SetHeight(1)) + require.NoError(t, batch1.Commit()) - // must register for gob before saving - gob.Register(&types.SignedHeader{}) - gob.Register(&types.Data{}) - gob.Register(&common.DAHeightEvent{}) + batch2, err := st.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch2.SaveBlockData(h2, d2, &types.Signature{})) + require.NoError(t, batch2.SetHeight(2)) + require.NoError(t, batch2.Commit()) m1, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) - // populate caches - hdr := &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{ChainID: "c", Height: 2}}} - dat := &types.Data{Metadata: &types.Metadata{ChainID: "c", Height: 2}} - m1.SetHeaderSeen("H2", 2) - m1.SetDataSeen("D2", 2) - m1.SetHeaderDAIncluded("H2", 100, 2) - m1.SetDataDAIncluded("D2", 101, 2) - m1.SetPendingEvent(2, &common.DAHeightEvent{Header: hdr, Data: dat, DaHeight: 99}) - - // persist - err = m1.SaveToDisk() + // Set DA inclusion for the blocks + m1.SetHeaderDAIncluded(h1.Hash().String(), 100, 1) + m1.SetDataDAIncluded(d1.DACommitment().String(), 100, 1) + m1.SetHeaderDAIncluded(h2.Hash().String(), 101, 2) + m1.SetDataDAIncluded(d2.DACommitment().String(), 101, 2) + + // Persist to store + err = m1.SaveToStore() require.NoError(t, err) - // create a fresh manager on same root and verify load + // Create a fresh manager on same store and verify restore m2, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) - // check loaded items - assert.True(t, m2.IsHeaderSeen("H2")) - assert.True(t, m2.IsDataSeen("D2")) - _, ok := m2.GetHeaderDAIncluded("H2") + // Check DA inclusion was restored + daHeight, ok := m2.GetHeaderDAIncluded(h1.Hash().String()) + assert.True(t, ok) + 
assert.Equal(t, uint64(100), daHeight) + + daHeight, ok = m2.GetDataDAIncluded(d1.DACommitment().String()) + assert.True(t, ok) + assert.Equal(t, uint64(100), daHeight) + + daHeight, ok = m2.GetHeaderDAIncluded(h2.Hash().String()) + assert.True(t, ok) + assert.Equal(t, uint64(101), daHeight) + + daHeight, ok = m2.GetDataDAIncluded(d2.DACommitment().String()) assert.True(t, ok) - _, ok2 := m2.GetDataDAIncluded("D2") - assert.True(t, ok2) - - // Verify pending event was loaded - loadedEvent := m2.GetNextPendingEvent(2) - require.NotNil(t, loadedEvent) - assert.Equal(t, uint64(2), loadedEvent.Header.Height()) - - // directories exist under cfg.RootDir/data/cache/... - base := filepath.Join(cfg.RootDir, "data", "cache") - assert.DirExists(t, filepath.Join(base, "header")) - assert.DirExists(t, filepath.Join(base, "data")) - assert.DirExists(t, filepath.Join(base, "pending_da_events")) + assert.Equal(t, uint64(101), daHeight) } func TestManager_GetNextPendingEvent_NonExistent(t *testing.T) { @@ -340,7 +345,7 @@ func TestManager_CleanupOldTxs_NoTransactions(t *testing.T) { assert.Equal(t, 0, removed) } -func TestManager_TxCache_PersistAndLoad(t *testing.T) { +func TestManager_TxCache_NotPersistedToStore(t *testing.T) { t.Parallel() cfg := tempConfig(t) st := memStore(t) @@ -355,20 +360,17 @@ func TestManager_TxCache_PersistAndLoad(t *testing.T) { assert.True(t, m1.IsTxSeen("persistent-tx1")) assert.True(t, m1.IsTxSeen("persistent-tx2")) - // Save to disk - err = m1.SaveToDisk() + // Save to store + err = m1.SaveToStore() require.NoError(t, err) - // Create new manager and verify transactions are loaded + // Create new manager - tx cache should be empty (not persisted) m2, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) - assert.True(t, m2.IsTxSeen("persistent-tx1")) - assert.True(t, m2.IsTxSeen("persistent-tx2")) - - // Verify tx cache directory exists - txCacheDir := filepath.Join(cfg.RootDir, "data", "cache", "tx") - assert.DirExists(t, txCacheDir) + // TX cache is ephemeral and not persisted + assert.False(t, m2.IsTxSeen("persistent-tx1")) + assert.False(t, m2.IsTxSeen("persistent-tx2")) } func TestManager_DeleteHeight_PreservesTxCache(t *testing.T) { @@ -399,3 +401,67 @@ func TestManager_DeleteHeight_PreservesTxCache(t *testing.T) { // Transaction should still be present (height-independent) assert.True(t, m.IsTxSeen("tx-persistent")) } + +func TestManager_DAInclusionPersistence(t *testing.T) { + t.Parallel() + cfg := tempConfig(t) + st := memStore(t) + ctx := context.Background() + + // Create blocks and save to store + h1, d1 := types.GetRandomBlock(1, 1, "test-chain") + + batch, err := st.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(h1, d1, &types.Signature{})) + require.NoError(t, batch.SetHeight(1)) + require.NoError(t, batch.Commit()) + + // Create manager and set DA inclusion + m1, err := NewManager(cfg, st, zerolog.Nop()) + require.NoError(t, err) + + headerHash := h1.Hash().String() + dataHash := d1.DACommitment().String() + + m1.SetHeaderDAIncluded(headerHash, 100, 1) + m1.SetDataDAIncluded(dataHash, 101, 1) + + // Verify DA height is tracked + assert.Equal(t, uint64(101), m1.DaHeight()) + + // Create new manager - DA inclusion should be restored + m2, err := NewManager(cfg, st, zerolog.Nop()) + require.NoError(t, err) + + // DA inclusion should be restored from store + daHeight, ok := m2.GetHeaderDAIncluded(headerHash) + assert.True(t, ok) + assert.Equal(t, uint64(100), daHeight) + + daHeight, ok = 
m2.GetDataDAIncluded(dataHash) + assert.True(t, ok) + assert.Equal(t, uint64(101), daHeight) + + // Max DA height should also be restored + assert.Equal(t, uint64(101), m2.DaHeight()) +} + +func TestCacheManager_Creation(t *testing.T) { + t.Parallel() + cfg := tempConfig(t) + st := memStore(t) + + cm, err := NewCacheManager(cfg, st, zerolog.Nop()) + require.NoError(t, err) + require.NotNil(t, cm) + + // Test basic operations + cm.SetHeaderSeen("h1", 1) + assert.True(t, cm.IsHeaderSeen("h1")) + + cm.SetHeaderDAIncluded("h1", 100, 1) + daHeight, ok := cm.GetHeaderDAIncluded("h1") + assert.True(t, ok) + assert.Equal(t, uint64(100), daHeight) +} diff --git a/block/internal/reaping/reaper_test.go b/block/internal/reaping/reaper_test.go index bd710669e..9a7c77800 100644 --- a/block/internal/reaping/reaper_test.go +++ b/block/internal/reaping/reaper_test.go @@ -70,10 +70,14 @@ func newTestCache(t *testing.T) cache.CacheManager { t.Helper() cfg := config.Config{ - RootDir: t.TempDir(), - ClearCache: true, + RootDir: t.TempDir(), } - cacheManager, err := cache.NewCacheManager(cfg, zerolog.Nop()) + + // Create an in-memory store for the cache + memDS := dssync.MutexWrap(ds.NewMapDatastore()) + st := store.New(memDS) + + cacheManager, err := cache.NewCacheManager(cfg, st, zerolog.Nop()) require.NoError(t, err) return cacheManager @@ -209,46 +213,3 @@ func TestReaper_SubmitTxs_SequencerError_NoPersistence_NoNotify(t *testing.T) { t.Fatal("did not expect notification on sequencer error") } } - -func TestReaper_CachePersistence(t *testing.T) { - // Test that transaction seen status persists to disk and can be loaded - mockExec := testmocks.NewMockExecutor(t) - mockSeq := testmocks.NewMockSequencer(t) - - tx1 := []byte("persistent-tx") - - // Create cache with real store - tempDir := t.TempDir() - dataStore := dssync.MutexWrap(ds.NewMapDatastore()) - st := store.New(dataStore) - cfg := config.Config{ - RootDir: tempDir, - ClearCache: false, // Don't clear cache - } - cm, err := cache.NewManager(cfg, st, zerolog.Nop()) - require.NoError(t, err) - - e := newTestExecutor(t) - r := newTestReaper(t, "chain-persist", mockExec, mockSeq, e, cm) - - mockExec.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx1}, nil).Once() - mockSeq.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")). 
- Return(&coresequencer.SubmitBatchTxsResponse{}, nil).Once() - - require.NoError(t, r.SubmitTxs()) - - // Verify tx is marked as seen - assert.True(t, cm.IsTxSeen(hashTx(tx1))) - - // Save to disk - require.NoError(t, cm.SaveToDisk()) - - // Create new cache manager and load from disk - dataStore2 := dssync.MutexWrap(ds.NewMapDatastore()) - st2 := store.New(dataStore2) - cm2, err := cache.NewManager(cfg, st2, zerolog.Nop()) - require.NoError(t, err) - - // Verify the seen status was persisted - assert.True(t, cm2.IsTxSeen(hashTx(tx1))) -} diff --git a/block/internal/syncing/da_retriever_test.go b/block/internal/syncing/da_retriever_test.go index 8b27513a8..9bcf46565 100644 --- a/block/internal/syncing/da_retriever_test.go +++ b/block/internal/syncing/da_retriever_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" "github.com/libp2p/go-libp2p/core/crypto" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" @@ -19,6 +21,7 @@ import ( datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" signerpkg "github.com/evstack/ev-node/pkg/signer" + "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/test/mocks" "github.com/evstack/ev-node/types" ) @@ -33,7 +36,11 @@ func newTestDARetriever(t *testing.T, mockClient *mocks.MockClient, cfg config.C cfg.DA.DataNamespace = "test-data-ns" } - cm, err := cache.NewCacheManager(cfg, zerolog.Nop()) + // Create an in-memory store for the cache + memDS := dssync.MutexWrap(ds.NewMapDatastore()) + st := store.New(memDS) + + cm, err := cache.NewCacheManager(cfg, st, zerolog.Nop()) require.NoError(t, err) if mockClient == nil { diff --git a/block/internal/syncing/p2p_handler_test.go b/block/internal/syncing/p2p_handler_test.go index 40c6876d8..db696f490 100644 --- a/block/internal/syncing/p2p_handler_test.go +++ b/block/internal/syncing/p2p_handler_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" "github.com/libp2p/go-libp2p/core/crypto" "github.com/rs/zerolog" "github.com/stretchr/testify/mock" @@ -18,6 +20,7 @@ import ( "github.com/evstack/ev-node/pkg/genesis" signerpkg "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/signer/noop" + "github.com/evstack/ev-node/pkg/store" extmocks "github.com/evstack/ev-node/test/mocks/external" "github.com/evstack/ev-node/types" ) @@ -77,10 +80,14 @@ func setupP2P(t *testing.T) *P2PTestData { dataStoreMock := extmocks.NewMockStore[*types.P2PData](t) cfg := config.Config{ - RootDir: t.TempDir(), - ClearCache: true, + RootDir: t.TempDir(), } - cacheManager, err := cache.NewCacheManager(cfg, zerolog.Nop()) + + // Create an in-memory store for the cache + memDS := dssync.MutexWrap(ds.NewMapDatastore()) + st := store.New(memDS) + + cacheManager, err := cache.NewCacheManager(cfg, st, zerolog.Nop()) require.NoError(t, err, "failed to create cache manager") handler := NewP2PHandler(headerStoreMock, dataStoreMock, cacheManager, gen, zerolog.Nop()) diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 5edec1cce..c88e079b2 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -285,7 +285,7 @@ func TestSyncer_processPendingEvents(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) - cm, err := cache.NewCacheManager(config.DefaultConfig(), zerolog.Nop()) + cm, err := 
cache.NewCacheManager(config.DefaultConfig(), st, zerolog.Nop()) require.NoError(t, err) // current height 1 @@ -329,7 +329,6 @@ func TestSyncLoopPersistState(t *testing.T) { cfg := config.DefaultConfig() t.Setenv("HOME", t.TempDir()) cfg.RootDir = t.TempDir() - cfg.ClearCache = true cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) require.NoError(t, err) @@ -425,7 +424,7 @@ func TestSyncLoopPersistState(t *testing.T) { require.Equal(t, myFutureDAHeight, syncerInst1.daRetrieverHeight.Load()) // wait for all events consumed - require.NoError(t, cm.SaveToDisk()) + require.NoError(t, cm.SaveToStore()) t.Log("processLoop on instance1 completed") // then @@ -442,7 +441,7 @@ func TestSyncLoopPersistState(t *testing.T) { // and when new instance is up on restart cm, err = cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) require.NoError(t, err) - require.NoError(t, cm.LoadFromDisk()) + require.NoError(t, cm.RestoreFromStore()) syncerInst2 := NewSyncer( st, diff --git a/node/helpers_test.go b/node/helpers_test.go index d8f54c5cc..94989d2f7 100644 --- a/node/helpers_test.go +++ b/node/helpers_test.go @@ -89,8 +89,7 @@ func getTestConfig(t *testing.T, n int) evconfig.Config { // Use a higher base port to reduce chances of conflicts with system services startPort := 40000 // Spread port ranges further apart return evconfig.Config{ - RootDir: t.TempDir(), - ClearCache: true, // Clear cache between tests to avoid interference with other tests and slow shutdown on serialization + RootDir: t.TempDir(), Node: evconfig.NodeConfig{ Aggregator: true, BlockTime: evconfig.DurationWrapper{Duration: 100 * time.Millisecond}, diff --git a/pkg/config/config.go b/pkg/config/config.go index e03a277ce..7f4698c69 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -51,8 +51,6 @@ const ( FlagReadinessMaxBlocksBehind = FlagPrefixEvnode + "node.readiness_max_blocks_behind" // FlagScrapeInterval is a flag for specifying the reaper scrape interval FlagScrapeInterval = FlagPrefixEvnode + "node.scrape_interval" - // FlagClearCache is a flag for clearing the cache - FlagClearCache = FlagPrefixEvnode + "clear_cache" // Data Availability configuration flags @@ -174,8 +172,7 @@ const ( // Config stores Rollkit configuration. 
type Config struct { - RootDir string `mapstructure:"-" yaml:"-" comment:"Root directory where rollkit files are located"` - ClearCache bool `mapstructure:"-" yaml:"-" comment:"Clear the cache"` + RootDir string `mapstructure:"-" yaml:"-" comment:"Root directory where rollkit files are located"` // Base configuration DBPath string `mapstructure:"db_path" yaml:"db_path" comment:"Path inside the root directory where the database is located"` @@ -423,7 +420,6 @@ func AddFlags(cmd *cobra.Command) { // Add base flags cmd.Flags().String(FlagDBPath, def.DBPath, "path for the node database") - cmd.Flags().Bool(FlagClearCache, def.ClearCache, "clear the cache") // Node configuration flags cmd.Flags().Bool(FlagAggregator, def.Node.Aggregator, "run node as an aggregator") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 1834e1b40..24c32160d 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -51,7 +51,6 @@ func TestAddFlags(t *testing.T) { // Test specific flags assertFlagValue(t, flags, FlagDBPath, DefaultConfig().DBPath) - assertFlagValue(t, flags, FlagClearCache, DefaultConfig().ClearCache) // Node flags assertFlagValue(t, flags, FlagAggregator, DefaultConfig().Node.Aggregator) @@ -112,7 +111,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagRPCEnableDAVisualization, DefaultConfig().RPC.EnableDAVisualization) // Count the number of flags we're explicitly checking - expectedFlagCount := 63 // Update this number if you add more flag checks above + expectedFlagCount := 62 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 diff --git a/pkg/store/store.go b/pkg/store/store.go index 40d00547b..a821e6a8d 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -190,6 +190,15 @@ func (s *DefaultStore) GetMetadata(ctx context.Context, key string) ([]byte, err return data, nil } +// DeleteMetadata removes a metadata key from the store. +func (s *DefaultStore) DeleteMetadata(ctx context.Context, key string) error { + err := s.db.Delete(ctx, ds.NewKey(getMetaKey(key))) + if err != nil { + return fmt.Errorf("failed to delete metadata for key '%s': %w", key, err) + } + return nil +} + // Sync flushes the store state to disk. // Returns nil if the database has been closed (common during shutdown). 
func (s *DefaultStore) Sync(ctx context.Context) (err error) { diff --git a/pkg/store/tracing.go b/pkg/store/tracing.go index a63ffb782..04baf748f 100644 --- a/pkg/store/tracing.go +++ b/pkg/store/tracing.go @@ -193,6 +193,24 @@ func (t *tracedStore) SetMetadata(ctx context.Context, key string, value []byte) return nil } +func (t *tracedStore) DeleteMetadata(ctx context.Context, key string) error { + ctx, span := t.tracer.Start(ctx, "Store.DeleteMetadata", + trace.WithAttributes( + attribute.String("key", key), + ), + ) + defer span.End() + + err := t.inner.DeleteMetadata(ctx, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } + + return nil +} + func (t *tracedStore) Rollback(ctx context.Context, height uint64, aggregator bool) error { ctx, span := t.tracer.Start(ctx, "Store.Rollback", trace.WithAttributes( diff --git a/pkg/store/tracing_test.go b/pkg/store/tracing_test.go index a5dca2417..5c273f661 100644 --- a/pkg/store/tracing_test.go +++ b/pkg/store/tracing_test.go @@ -27,6 +27,7 @@ type tracingMockStore struct { getStateAtHeightFn func(ctx context.Context, height uint64) (types.State, error) getMetadataFn func(ctx context.Context, key string) ([]byte, error) setMetadataFn func(ctx context.Context, key string, value []byte) error + deleteMetadataFn func(ctx context.Context, key string) error rollbackFn func(ctx context.Context, height uint64, aggregator bool) error newBatchFn func(ctx context.Context) (Batch, error) } @@ -101,6 +102,13 @@ func (m *tracingMockStore) SetMetadata(ctx context.Context, key string, value [] return nil } +func (m *tracingMockStore) DeleteMetadata(ctx context.Context, key string) error { + if m.deleteMetadataFn != nil { + return m.deleteMetadataFn(ctx, key) + } + return nil +} + func (m *tracingMockStore) Rollback(ctx context.Context, height uint64, aggregator bool) error { if m.rollbackFn != nil { return m.rollbackFn(ctx, height, aggregator) diff --git a/pkg/store/types.go b/pkg/store/types.go index bf1cb6ced..fa1ecdc92 100644 --- a/pkg/store/types.go +++ b/pkg/store/types.go @@ -39,6 +39,9 @@ type Store interface { // This method enables evolve to safely persist any information. SetMetadata(ctx context.Context, key string, value []byte) error + // DeleteMetadata removes a metadata key from the store. + DeleteMetadata(ctx context.Context, key string) error + // Close safely closes underlying data storage, to ensure that data is actually saved. 
 	Close() error
 
diff --git a/test/mocks/store.go b/test/mocks/store.go
index 7f2aa180d..79b3ac6f5 100644
--- a/test/mocks/store.go
+++ b/test/mocks/store.go
@@ -83,6 +83,63 @@ func (_c *MockStore_Close_Call) RunAndReturn(run func() error) *MockStore_Close_
 	return _c
 }
 
+// DeleteMetadata provides a mock function for the type MockStore
+func (_mock *MockStore) DeleteMetadata(ctx context.Context, key string) error {
+	ret := _mock.Called(ctx, key)
+
+	if len(ret) == 0 {
+		panic("no return value specified for DeleteMetadata")
+	}
+
+	var r0 error
+	if returnFunc, ok := ret.Get(0).(func(context.Context, string) error); ok {
+		r0 = returnFunc(ctx, key)
+	} else {
+		r0 = ret.Error(0)
+	}
+	return r0
+}
+
+// MockStore_DeleteMetadata_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteMetadata'
+type MockStore_DeleteMetadata_Call struct {
+	*mock.Call
+}
+
+// DeleteMetadata is a helper method to define mock.On call
+//   - ctx context.Context
+//   - key string
+func (_e *MockStore_Expecter) DeleteMetadata(ctx interface{}, key interface{}) *MockStore_DeleteMetadata_Call {
+	return &MockStore_DeleteMetadata_Call{Call: _e.mock.On("DeleteMetadata", ctx, key)}
+}
+
+func (_c *MockStore_DeleteMetadata_Call) Run(run func(ctx context.Context, key string)) *MockStore_DeleteMetadata_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		var arg0 context.Context
+		if args[0] != nil {
+			arg0 = args[0].(context.Context)
+		}
+		var arg1 string
+		if args[1] != nil {
+			arg1 = args[1].(string)
+		}
+		run(
+			arg0,
+			arg1,
+		)
+	})
+	return _c
+}
+
+func (_c *MockStore_DeleteMetadata_Call) Return(err error) *MockStore_DeleteMetadata_Call {
+	_c.Call.Return(err)
+	return _c
+}
+
+func (_c *MockStore_DeleteMetadata_Call) RunAndReturn(run func(ctx context.Context, key string) error) *MockStore_DeleteMetadata_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // GetBlockByHash provides a mock function for the type MockStore
 func (_mock *MockStore) GetBlockByHash(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) {
 	ret := _mock.Called(ctx, hash)

From 6b7347d25b7a7491e808ff5176ec56222e4b5d3d Mon Sep 17 00:00:00 2001
From: Julien Robert
Date: Tue, 3 Feb 2026 17:25:35 +0100
Subject: [PATCH 03/10] updates

---
 CHANGELOG.md                                |  1 +
 block/internal/cache/generic_cache.go       |  3 +--
 block/internal/cache/manager.go             |  8 +++++++-
 block/internal/submitting/submitter.go      |  4 ++++
 block/internal/submitting/submitter_test.go | 10 +++++-----
 block/internal/syncing/syncer.go            |  7 ++++++-
 6 files changed, 24 insertions(+), 9 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e0c086087..9150fb9f9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Changes
 
 - Improve `cache.NumPendingData` to not return empty data. Automatically bumps `LastSubmittedHeight` to reflect that. ([#3046](https://github.com/evstack/ev-node/pull/3046))
+- **BREAKING** Remove `--evnode.clear_cache` flag, make pending events cache and tx cache fully ephemeral. The DA inclusion cache persists across restarts using store metadata and is cleared once DA inclusion has been processed.
([#3047](https://github.com/evstack/ev-node/pull/3047)) ## v1.0.0-rc.2 diff --git a/block/internal/cache/generic_cache.go b/block/internal/cache/generic_cache.go index ee7ef1cf9..4e58a1769 100644 --- a/block/internal/cache/generic_cache.go +++ b/block/internal/cache/generic_cache.go @@ -48,8 +48,7 @@ type Cache[T any] struct { // store is used for persisting DA inclusion data (optional, can be nil for ephemeral caches) store store.Store - - // storeKeyPrefix is the prefix used for store keys (allows different caches to use different namespaces) + // storeKeyPrefix is the prefix used for store keys storeKeyPrefix string } diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go index 97df14890..34dfb0313 100644 --- a/block/internal/cache/manager.go +++ b/block/internal/cache/manager.go @@ -26,19 +26,21 @@ const ( // CacheManager provides thread-safe cache operations for tracking seen blocks // and DA inclusion status during block execution and syncing. type CacheManager interface { + DaHeight() uint64 + // Header operations IsHeaderSeen(hash string) bool SetHeaderSeen(hash string, blockHeight uint64) GetHeaderDAIncluded(hash string) (uint64, bool) SetHeaderDAIncluded(hash string, daHeight uint64, blockHeight uint64) RemoveHeaderDAIncluded(hash string) - DaHeight() uint64 // Data operations IsDataSeen(hash string) bool SetDataSeen(hash string, blockHeight uint64) GetDataDAIncluded(hash string) (uint64, bool) SetDataDAIncluded(hash string, daHeight uint64, blockHeight uint64) + RemoveDataDAIncluded(hash string) // Transaction operations IsTxSeen(hash string) bool @@ -222,6 +224,10 @@ func (m *implementation) SetDataDAIncluded(hash string, daHeight uint64, blockHe m.dataCache.setDAIncluded(hash, daHeight, blockHeight) } +func (m *implementation) RemoveDataDAIncluded(hash string) { + m.dataCache.removeDAIncluded(hash) +} + // Transaction operations func (m *implementation) IsTxSeen(hash string) bool { return m.txCache.isSeen(hash) diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 4c85539b2..a1e703c23 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -364,6 +364,10 @@ func (s *Submitter) processDAInclusionLoop() { // Delete height cache for that height // This can only be performed after the height has been persisted to store s.cache.DeleteHeight(nextHeight) + + // Remove DA included status from cache since this height is now finalized + s.cache.RemoveHeaderDAIncluded(header.Hash().String()) + s.cache.RemoveDataDAIncluded(data.DACommitment().String()) } } } diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index 3e0e0b343..989e39360 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -528,15 +528,15 @@ func TestSubmitter_CacheClearedOnHeightInclusion(t *testing.T) { assert.False(t, cm.IsHeaderSeen(h2.Hash().String()), "height 2 header should be cleared from cache") assert.False(t, cm.IsDataSeen(d2.DACommitment().String()), "height 2 data should be cleared from cache") - // Verify DA inclusion status remains for processed heights + // Verify DA inclusion status is removed for processed heights (cleaned up after finalization) _, h1DAIncluded := cm.GetHeaderDAIncluded(h1.Hash().String()) _, d1DAIncluded := cm.GetDataDAIncluded(d1.DACommitment().String()) _, h2DAIncluded := cm.GetHeaderDAIncluded(h2.Hash().String()) _, d2DAIncluded := 
cm.GetDataDAIncluded(d2.DACommitment().String()) - assert.True(t, h1DAIncluded, "height 1 header DA inclusion status should remain") - assert.True(t, d1DAIncluded, "height 1 data DA inclusion status should remain") - assert.True(t, h2DAIncluded, "height 2 header DA inclusion status should remain") - assert.True(t, d2DAIncluded, "height 2 data DA inclusion status should remain") + assert.False(t, h1DAIncluded, "height 1 header DA inclusion status should be removed after finalization") + assert.False(t, d1DAIncluded, "height 1 data DA inclusion status should be removed after finalization") + assert.False(t, h2DAIncluded, "height 2 header DA inclusion status should be removed after finalization") + assert.False(t, d2DAIncluded, "height 2 data DA inclusion status should be removed after finalization") // Verify unprocessed height 3 cache remains intact assert.True(t, cm.IsHeaderSeen(h3.Hash().String()), "height 3 header should remain in cache") diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 88f46aafe..4fe7171d2 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -724,8 +724,10 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve // here only the previous block needs to be applied to proceed to the verification. // The header validation must be done before applying the block to avoid executing gibberish if err := s.ValidateBlock(ctx, currentState, data, header); err != nil { - // remove header as da included (not per se needed, but keep cache clean) + // remove header as da included from cache s.cache.RemoveHeaderDAIncluded(headerHash) + s.cache.RemoveDataDAIncluded(data.DACommitment().String()) + if !errors.Is(err, errInvalidState) && !errors.Is(err, errInvalidBlock) { return errors.Join(errInvalidBlock, err) } @@ -737,7 +739,10 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve if err := s.VerifyForcedInclusionTxs(ctx, currentState, data); err != nil { s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("forced inclusion verification failed") if errors.Is(err, errMaliciousProposer) { + // remove header as da included from cache s.cache.RemoveHeaderDAIncluded(headerHash) + s.cache.RemoveDataDAIncluded(data.DACommitment().String()) + return err } } From 027e6f2b4397c93463a3dbda5684b165e0335d45 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 3 Feb 2026 17:33:04 +0100 Subject: [PATCH 04/10] bring back clear cache flag --- CHANGELOG.md | 2 +- apps/evm/cmd/rollback.go | 3 ++ apps/testapp/cmd/rollback.go | 3 ++ block/internal/cache/manager.go | 60 ++++----------------- block/internal/cache/manager_test.go | 2 +- block/internal/reaping/reaper_test.go | 2 +- block/internal/syncing/da_retriever_test.go | 2 +- block/internal/syncing/p2p_handler_test.go | 2 +- block/internal/syncing/syncer_test.go | 2 +- pkg/config/config.go | 6 ++- pkg/config/config_test.go | 3 +- 11 files changed, 29 insertions(+), 58 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9150fb9f9..c9ef6b77a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changes - Improve `cache.NumPendingData` to not return empty data. Automatically bumps `LastSubmittedHeight` to reflect that. ([#3046](https://github.com/evstack/ev-node/pull/3046)) -- **BREAKING** Remove `--evnode.clear_cache` flag, make pending events cache and tx cache fully ephemeral. 
The DA inclusion cache persists until cleared after DA inclusion has been processed, and is persisted across restarts using store metadata. ([#3047](https://github.com/evstack/ev-node/pull/3047)) +- **BREAKING** Make pending events cache and tx cache fully ephemeral. These are re-fetched on restart. The DA inclusion cache persists until cleared after DA inclusion has been processed, and is persisted across restarts using store metadata. ([#3047](https://github.com/evstack/ev-node/pull/3047)) ## v1.0.0-rc.2 diff --git a/apps/evm/cmd/rollback.go b/apps/evm/cmd/rollback.go index 3b56c152c..3f11ef8d4 100644 --- a/apps/evm/cmd/rollback.go +++ b/apps/evm/cmd/rollback.go @@ -80,6 +80,9 @@ func NewRollbackCmd() *cobra.Command { } cmd.Printf("Rolled back ev-node state to height %d\n", height) + if syncNode { + fmt.Println("Restart the node with the `--evnode.clear_cache` flag") + } return nil }, diff --git a/apps/testapp/cmd/rollback.go b/apps/testapp/cmd/rollback.go index 187cef713..6326f0fd2 100644 --- a/apps/testapp/cmd/rollback.go +++ b/apps/testapp/cmd/rollback.go @@ -75,6 +75,9 @@ func NewRollbackCmd() *cobra.Command { } fmt.Printf("Rolled back ev-node state to height %d\n", height) + if syncNode { + fmt.Println("Restart the node with the `--evnode.clear_cache` flag") + } return nil }, diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go index 34dfb0313..d75e0eff8 100644 --- a/block/internal/cache/manager.go +++ b/block/internal/cache/manager.go @@ -93,53 +93,6 @@ type implementation struct { logger zerolog.Logger } -// NewPendingManager creates a new pending manager instance -func NewPendingManager(store store.Store, logger zerolog.Logger) (PendingManager, error) { - pendingHeaders, err := NewPendingHeaders(store, logger) - if err != nil { - return nil, fmt.Errorf("failed to create pending headers: %w", err) - } - - pendingData, err := NewPendingData(store, logger) - if err != nil { - return nil, fmt.Errorf("failed to create pending data: %w", err) - } - - return &implementation{ - pendingHeaders: pendingHeaders, - pendingData: pendingData, - logger: logger, - }, nil -} - -// NewCacheManager creates a new cache manager instance -func NewCacheManager(cfg config.Config, st store.Store, logger zerolog.Logger) (CacheManager, error) { - // Initialize caches with store-based persistence for DA inclusion data - headerCache := NewCache[types.SignedHeader](st, headerDAIncludedPrefix) - dataCache := NewCache[types.Data](st, dataDAIncludedPrefix) - // TX cache and pending events cache don't need store persistence - txCache := NewCache[struct{}](nil, "") - pendingEventsCache := NewCache[common.DAHeightEvent](nil, "") - - impl := &implementation{ - headerCache: headerCache, - dataCache: dataCache, - txCache: txCache, - txTimestamps: new(sync.Map), - pendingEventsCache: pendingEventsCache, - store: st, - config: cfg, - logger: logger, - } - - // Restore existing cache from store - if err := impl.RestoreFromStore(); err != nil { - logger.Warn().Err(err).Msg("failed to restore cache from store, starting with empty cache") - } - - return impl, nil -} - // NewManager creates a new cache manager instance func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manager, error) { // Initialize caches with store-based persistence for DA inclusion data @@ -173,9 +126,16 @@ func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manag logger: logger, } - // Restore existing cache from store - if err := impl.RestoreFromStore(); err != nil { -
logger.Warn().Err(err).Msg("failed to restore cache from store, starting with empty cache") + if cfg.ClearCache { + // Clear the cache from disk + if err := impl.ClearFromStore(); err != nil { + logger.Warn().Err(err).Msg("failed to clear cache from disk, starting with empty cache") + } + } else { + // Restore existing cache from store + if err := impl.RestoreFromStore(); err != nil { + logger.Warn().Err(err).Msg("failed to load cache from disk, starting with empty cache") + } } return impl, nil diff --git a/block/internal/cache/manager_test.go b/block/internal/cache/manager_test.go index 056230e31..9045d4174 100644 --- a/block/internal/cache/manager_test.go +++ b/block/internal/cache/manager_test.go @@ -452,7 +452,7 @@ func TestCacheManager_Creation(t *testing.T) { cfg := tempConfig(t) st := memStore(t) - cm, err := NewCacheManager(cfg, st, zerolog.Nop()) + cm, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) require.NotNil(t, cm) diff --git a/block/internal/reaping/reaper_test.go b/block/internal/reaping/reaper_test.go index 9a7c77800..5700882e8 100644 --- a/block/internal/reaping/reaper_test.go +++ b/block/internal/reaping/reaper_test.go @@ -77,7 +77,7 @@ func newTestCache(t *testing.T) cache.CacheManager { memDS := dssync.MutexWrap(ds.NewMapDatastore()) st := store.New(memDS) - cacheManager, err := cache.NewCacheManager(cfg, st, zerolog.Nop()) + cacheManager, err := cache.NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) return cacheManager diff --git a/block/internal/syncing/da_retriever_test.go b/block/internal/syncing/da_retriever_test.go index 9bcf46565..2270463b9 100644 --- a/block/internal/syncing/da_retriever_test.go +++ b/block/internal/syncing/da_retriever_test.go @@ -40,7 +40,7 @@ func newTestDARetriever(t *testing.T, mockClient *mocks.MockClient, cfg config.C memDS := dssync.MutexWrap(ds.NewMapDatastore()) st := store.New(memDS) - cm, err := cache.NewCacheManager(cfg, st, zerolog.Nop()) + cm, err := cache.NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) if mockClient == nil { diff --git a/block/internal/syncing/p2p_handler_test.go b/block/internal/syncing/p2p_handler_test.go index db696f490..a1cc6d993 100644 --- a/block/internal/syncing/p2p_handler_test.go +++ b/block/internal/syncing/p2p_handler_test.go @@ -87,7 +87,7 @@ func setupP2P(t *testing.T) *P2PTestData { memDS := dssync.MutexWrap(ds.NewMapDatastore()) st := store.New(memDS) - cacheManager, err := cache.NewCacheManager(cfg, st, zerolog.Nop()) + cacheManager, err := cache.NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err, "failed to create cache manager") handler := NewP2PHandler(headerStoreMock, dataStoreMock, cacheManager, gen, zerolog.Nop()) diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index c88e079b2..5a36ed00c 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -285,7 +285,7 @@ func TestSyncer_processPendingEvents(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) - cm, err := cache.NewCacheManager(config.DefaultConfig(), st, zerolog.Nop()) + cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) require.NoError(t, err) // current height 1 diff --git a/pkg/config/config.go b/pkg/config/config.go index 7f4698c69..e03a277ce 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -51,6 +51,8 @@ const ( FlagReadinessMaxBlocksBehind = FlagPrefixEvnode + "node.readiness_max_blocks_behind" // FlagScrapeInterval is a flag for 
specifying the reaper scrape interval FlagScrapeInterval = FlagPrefixEvnode + "node.scrape_interval" + // FlagClearCache is a flag for clearing the cache + FlagClearCache = FlagPrefixEvnode + "clear_cache" // Data Availability configuration flags @@ -172,7 +174,8 @@ const ( // Config stores Rollkit configuration. type Config struct { - RootDir string `mapstructure:"-" yaml:"-" comment:"Root directory where rollkit files are located"` + RootDir string `mapstructure:"-" yaml:"-" comment:"Root directory where rollkit files are located"` + ClearCache bool `mapstructure:"-" yaml:"-" comment:"Clear the cache"` // Base configuration DBPath string `mapstructure:"db_path" yaml:"db_path" comment:"Path inside the root directory where the database is located"` @@ -420,6 +423,7 @@ func AddFlags(cmd *cobra.Command) { // Add base flags cmd.Flags().String(FlagDBPath, def.DBPath, "path for the node database") + cmd.Flags().Bool(FlagClearCache, def.ClearCache, "clear the cache") // Node configuration flags cmd.Flags().Bool(FlagAggregator, def.Node.Aggregator, "run node as an aggregator") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 24c32160d..1834e1b40 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -51,6 +51,7 @@ func TestAddFlags(t *testing.T) { // Test specific flags assertFlagValue(t, flags, FlagDBPath, DefaultConfig().DBPath) + assertFlagValue(t, flags, FlagClearCache, DefaultConfig().ClearCache) // Node flags assertFlagValue(t, flags, FlagAggregator, DefaultConfig().Node.Aggregator) @@ -111,7 +112,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagRPCEnableDAVisualization, DefaultConfig().RPC.EnableDAVisualization) // Count the number of flags we're explicitly checking - expectedFlagCount := 62 // Update this number if you add more flag checks above + expectedFlagCount := 63 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 From bbdaed5df1f8c94c4b2adcb56c1adc31a963de65 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 3 Feb 2026 18:07:21 +0100 Subject: [PATCH 05/10] cleanup syncing flow --- block/internal/cache/generic_cache.go | 24 +++++---- block/internal/cache/manager.go | 36 +++++++++++++ block/internal/cache/manager_test.go | 78 +++++++++++++++++++++++---- block/internal/syncing/syncer.go | 40 ++------------ block/internal/syncing/syncer_test.go | 59 -------------------- 5 files changed, 122 insertions(+), 115 deletions(-) diff --git a/block/internal/cache/generic_cache.go b/block/internal/cache/generic_cache.go index 4e58a1769..9b7c934a3 100644 --- a/block/internal/cache/generic_cache.go +++ b/block/internal/cache/generic_cache.go @@ -141,15 +141,7 @@ func (c *Cache[T]) setDAIncluded(hash string, daHeight uint64, blockHeight uint6 } // Update max DA height if necessary - for range 1_000 { - current := c.maxDAHeight.Load() - if daHeight <= current { - return - } - if c.maxDAHeight.CompareAndSwap(current, daHeight) { - return - } - } + c.setMaxDAHeight(daHeight) } // removeDAIncluded removes the DA-included status of the hash @@ -163,6 +155,20 @@ func (c *Cache[T]) daHeight() uint64 { return c.maxDAHeight.Load() } +// setMaxDAHeight sets the maximum DA height if the provided value is greater +// than the current value. 
+func (c *Cache[T]) setMaxDAHeight(daHeight uint64) { + for range 1_000 { + current := c.maxDAHeight.Load() + if daHeight <= current { + return + } + if c.maxDAHeight.CompareAndSwap(current, daHeight) { + return + } + } +} + // removeSeen removes a hash from the seen cache. func (c *Cache[T]) removeSeen(hash string) { c.hashes.Remove(hash) diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go index d75e0eff8..96d727baa 100644 --- a/block/internal/cache/manager.go +++ b/block/internal/cache/manager.go @@ -2,6 +2,7 @@ package cache import ( "context" + "encoding/binary" "fmt" "sync" "time" @@ -383,9 +384,13 @@ func (m *implementation) RestoreFromStore() error { return fmt.Errorf("failed to restore data cache from store: %w", err) } + // Initialize DA height from store metadata to ensure DaHeight() is never 0. + m.initDAHeightFromStore(ctx) + m.logger.Info(). Int("header_hashes", len(headerHashes)). Int("data_hashes", len(dataHashes)). + Uint64("da_height", m.DaHeight()). Msg("restored DA inclusion cache from store") return nil @@ -412,5 +417,36 @@ func (m *implementation) ClearFromStore() error { m.txCache = NewCache[struct{}](nil, "") m.pendingEventsCache = NewCache[common.DAHeightEvent](nil, "") + // Initialize DA height from store metadata to ensure DaHeight() is never 0. + m.initDAHeightFromStore(ctx) + return nil } + +// initDAHeightFromStore initializes the maxDAHeight in both header and data caches +// from the HeightToDAHeight store metadata (final da inclusion tracking). +func (m *implementation) initDAHeightFromStore(ctx context.Context) { + // Get the DA included height from store (last processed block height) + daIncludedHeightBytes, err := m.store.GetMetadata(ctx, store.DAIncludedHeightKey) + if err != nil || len(daIncludedHeightBytes) != 8 { + return + } + daIncludedHeight := binary.LittleEndian.Uint64(daIncludedHeightBytes) + if daIncludedHeight == 0 { + return + } + + // Get header DA height for the last included height + headerKey := store.GetHeightToDAHeightHeaderKey(daIncludedHeight) + if headerBytes, err := m.store.GetMetadata(ctx, headerKey); err == nil && len(headerBytes) == 8 { + headerDAHeight := binary.LittleEndian.Uint64(headerBytes) + m.headerCache.setMaxDAHeight(headerDAHeight) + } + + // Get data DA height for the last included height + dataKey := store.GetHeightToDAHeightDataKey(daIncludedHeight) + if dataBytes, err := m.store.GetMetadata(ctx, dataKey); err == nil && len(dataBytes) == 8 { + dataDAHeight := binary.LittleEndian.Uint64(dataBytes) + m.dataCache.setMaxDAHeight(dataDAHeight) + } +} diff --git a/block/internal/cache/manager_test.go b/block/internal/cache/manager_test.go index 9045d4174..61f744bf6 100644 --- a/block/internal/cache/manager_test.go +++ b/block/internal/cache/manager_test.go @@ -2,6 +2,7 @@ package cache import ( "context" + "encoding/binary" "testing" "time" @@ -447,21 +448,78 @@ func TestManager_DAInclusionPersistence(t *testing.T) { assert.Equal(t, uint64(101), m2.DaHeight()) } -func TestCacheManager_Creation(t *testing.T) { +func TestManager_DaHeightAfterCacheClear(t *testing.T) { t.Parallel() + cfg := tempConfig(t) st := memStore(t) + ctx := context.Background() - cm, err := NewManager(cfg, st, zerolog.Nop()) + // Store a block first + h1, d1 := types.GetRandomBlock(1, 1, "test-chain") + batch, err := st.NewBatch(ctx) require.NoError(t, err) - require.NotNil(t, cm) + require.NoError(t, batch.SaveBlockData(h1, d1, &types.Signature{})) + require.NoError(t, batch.SetHeight(1)) + require.NoError(t, batch.Commit()) 
- // Test basic operations - cm.SetHeaderSeen("h1", 1) - assert.True(t, cm.IsHeaderSeen("h1")) + // Set up the HeightToDAHeight metadata (simulating what submitter does) + headerDAHeightBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(headerDAHeightBytes, 150) + require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(1), headerDAHeightBytes)) - cm.SetHeaderDAIncluded("h1", 100, 1) - daHeight, ok := cm.GetHeaderDAIncluded("h1") - assert.True(t, ok) - assert.Equal(t, uint64(100), daHeight) + dataDAHeightBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(dataDAHeightBytes, 155) + require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightDataKey(1), dataDAHeightBytes)) + + // Set DAIncludedHeightKey to indicate height 1 was DA included + daIncludedBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(daIncludedBytes, 1) + require.NoError(t, st.SetMetadata(ctx, store.DAIncludedHeightKey, daIncludedBytes)) + + // Create manager with ClearCache = true + cfg.ClearCache = true + m, err := NewManager(cfg, st, zerolog.Nop()) + require.NoError(t, err) + + // DaHeight should NOT be 0 - it should be initialized from store metadata + assert.Equal(t, uint64(155), m.DaHeight(), "DaHeight should be initialized from HeightToDAHeight metadata even after cache clear") +} + +func TestManager_DaHeightFromStoreOnRestore(t *testing.T) { + t.Parallel() + + cfg := tempConfig(t) + st := memStore(t) + ctx := context.Background() + + // Store a block first + h1, d1 := types.GetRandomBlock(1, 1, "test-chain") + batch, err := st.NewBatch(ctx) + require.NoError(t, err) + require.NoError(t, batch.SaveBlockData(h1, d1, &types.Signature{})) + require.NoError(t, batch.SetHeight(1)) + require.NoError(t, batch.Commit()) + + // Set up HeightToDAHeight metadata but NOT the cache entries + // This simulates a scenario where DA inclusion was processed but cache entries were lost + headerDAHeightBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(headerDAHeightBytes, 200) + require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(1), headerDAHeightBytes)) + + dataDAHeightBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(dataDAHeightBytes, 205) + require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightDataKey(1), dataDAHeightBytes)) + + daIncludedBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(daIncludedBytes, 1) + require.NoError(t, st.SetMetadata(ctx, store.DAIncludedHeightKey, daIncludedBytes)) + + // Create manager without ClearCache - should restore and init from metadata + cfg.ClearCache = false + m, err := NewManager(cfg, st, zerolog.Nop()) + require.NoError(t, err) + + // DaHeight should be the max from HeightToDAHeight metadata + assert.Equal(t, uint64(205), m.DaHeight(), "DaHeight should be initialized from HeightToDAHeight metadata on restore") } diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 4fe7171d2..6df6b2c22 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "crypto/sha256" - "encoding/binary" "encoding/hex" "errors" "fmt" @@ -346,9 +345,9 @@ func (s *Syncer) initializeState() error { } s.SetLastState(state) - // Set DA height to the maximum of the genesis start height, the state's DA height, the cached DA height, and the highest stored included DA height. - // This ensures we resume from the highest known DA height, even if the cache is cleared on restart. 
If the DA height is too high because of a user error, reset it with --evnode.clear_cache. The DA height will be back to the last highest known executed DA height for a height. - s.daRetrieverHeight.Store(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight, s.getHighestStoredDAHeight())) + // Set DA height to the maximum of the genesis start height, the state's DA height, and the cached DA height. + // The cache's DaHeight() is initialized from store metadata, so it's always correct even after cache clear. + s.daRetrieverHeight.Store(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight)) s.logger.Info(). Uint64("height", state.LastBlockHeight). @@ -1213,39 +1212,6 @@ func (s *Syncer) sleepOrDone(duration time.Duration) bool { } } -// getHighestStoredDAHeight retrieves the highest DA height from the store by checking -// the DA heights stored for the last DA included height -// this relies on the node syncing with DA and setting included heights. -func (s *Syncer) getHighestStoredDAHeight() uint64 { - // Get the DA included height from store - daIncludedHeightBytes, err := s.store.GetMetadata(s.ctx, store.DAIncludedHeightKey) - if err != nil || len(daIncludedHeightBytes) != 8 { - return 0 - } - daIncludedHeight := binary.LittleEndian.Uint64(daIncludedHeightBytes) - if daIncludedHeight == 0 { - return 0 - } - - var highestDAHeight uint64 - - // Get header DA height for the last included height - headerKey := store.GetHeightToDAHeightHeaderKey(daIncludedHeight) - if headerBytes, err := s.store.GetMetadata(s.ctx, headerKey); err == nil && len(headerBytes) == 8 { - headerDAHeight := binary.LittleEndian.Uint64(headerBytes) - highestDAHeight = max(highestDAHeight, headerDAHeight) - } - - // Get data DA height for the last included height - dataKey := store.GetHeightToDAHeightDataKey(daIncludedHeight) - if dataBytes, err := s.store.GetMetadata(s.ctx, dataKey); err == nil && len(dataBytes) == 8 { - dataDAHeight := binary.LittleEndian.Uint64(dataBytes) - highestDAHeight = max(highestDAHeight, dataDAHeight) - } - - return highestDAHeight -} - type p2pWaitState struct { height uint64 cancel context.CancelFunc diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 5a36ed00c..85aa9a597 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -4,7 +4,6 @@ import ( "context" crand "crypto/rand" "crypto/sha512" - "encoding/binary" "errors" "sync/atomic" "testing" @@ -609,9 +608,6 @@ func TestSyncer_InitializeState_CallsReplayer(t *testing.T) { nil, ) - // Mock GetMetadata calls for DA included height retrieval - mockStore.EXPECT().GetMetadata(mock.Anything, store.DAIncludedHeightKey).Return(nil, datastore.ErrNotFound) - // Mock Batch operations mockBatch := testmocks.NewMockBatch(t) mockBatch.Test(t) @@ -656,61 +652,6 @@ func requireEmptyChan(t *testing.T, errorCh chan error) { } } -func TestSyncer_getHighestStoredDAHeight(t *testing.T) { - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - ctx := t.Context() - - syncer := &Syncer{ - store: st, - ctx: ctx, - logger: zerolog.Nop(), - } - - // Test case 1: No DA included height set - highestDA := syncer.getHighestStoredDAHeight() - assert.Equal(t, uint64(0), highestDA) - - // Test case 2: DA included height set, but no mappings - bz := make([]byte, 8) - binary.LittleEndian.PutUint64(bz, 1) - require.NoError(t, st.SetMetadata(ctx, store.DAIncludedHeightKey, bz)) - - highestDA = syncer.getHighestStoredDAHeight() - assert.Equal(t, 
uint64(0), highestDA) - - // Test case 3: DA included height with header mapping - headerBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(headerBytes, 100) - require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(1), headerBytes)) - - highestDA = syncer.getHighestStoredDAHeight() - assert.Equal(t, uint64(100), highestDA) - - // Test case 4: DA included height with both header and data mappings (data is higher) - dataBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(dataBytes, 105) - require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightDataKey(1), dataBytes)) - - highestDA = syncer.getHighestStoredDAHeight() - assert.Equal(t, uint64(105), highestDA) - - // Test case 5: Advance to height 2 with higher DA heights - binary.LittleEndian.PutUint64(bz, 2) - require.NoError(t, st.SetMetadata(ctx, store.DAIncludedHeightKey, bz)) - - headerBytes2 := make([]byte, 8) - binary.LittleEndian.PutUint64(headerBytes2, 200) - require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(2), headerBytes2)) - - dataBytes2 := make([]byte, 8) - binary.LittleEndian.PutUint64(dataBytes2, 195) - require.NoError(t, st.SetMetadata(ctx, store.GetHeightToDAHeightDataKey(2), dataBytes2)) - - highestDA = syncer.getHighestStoredDAHeight() - assert.Equal(t, uint64(200), highestDA, "should return highest DA height from most recent included height") -} - func TestProcessHeightEvent_TriggersAsyncDARetrieval(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) From af367eb7943e855196a2462f947144dbd1377512 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 3 Feb 2026 18:12:36 +0100 Subject: [PATCH 06/10] simplify --- block/internal/cache/generic_cache.go | 2 +- block/internal/submitting/submitter.go | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/block/internal/cache/generic_cache.go b/block/internal/cache/generic_cache.go index 9b7c934a3..a611230ce 100644 --- a/block/internal/cache/generic_cache.go +++ b/block/internal/cache/generic_cache.go @@ -187,7 +187,7 @@ func (c *Cache[T]) deleteAllForHeight(height uint64) { if ok { c.hashes.Remove(hash) - // c.daIncluded.Remove(hash) // we actually do not want to delete the DA-included status here + c.daIncluded.Remove(hash) } } diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index a1e703c23..4c85539b2 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -364,10 +364,6 @@ func (s *Submitter) processDAInclusionLoop() { // Delete height cache for that height // This can only be performed after the height has been persisted to store s.cache.DeleteHeight(nextHeight) - - // Remove DA included status from cache since this height is now finalized - s.cache.RemoveHeaderDAIncluded(header.Hash().String()) - s.cache.RemoveDataDAIncluded(data.DACommitment().String()) } } } From 8a1e535bcdf2f0ea7c729dae92b18f1bcf45226f Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 3 Feb 2026 18:21:01 +0100 Subject: [PATCH 07/10] fix IsHeightDAIncluded to not always rely on cache --- block/internal/submitting/submitter.go | 7 +++++++ block/internal/submitting/submitter_test.go | 19 +++++++++++++------ 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index 4c85539b2..a4efbf1a8 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -487,10 +487,17 @@ func (s 
*Submitter) setNodeHeightToDAHeight(ctx context.Context, height uint64, // IsHeightDAIncluded checks if a height is included in DA func (s *Submitter) IsHeightDAIncluded(height uint64, header *types.SignedHeader, data *types.Data) (bool, error) { + // If height is at or below the DA included height, it was already processed + // and cache entries were cleared. We know it's DA included. + if height <= s.GetDAIncludedHeight() { + return true, nil + } + currentHeight, err := s.store.Height(s.ctx) if err != nil { return false, err } + if currentHeight < height { return false, nil } diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index 989e39360..6420a16f7 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -114,16 +114,21 @@ func TestSubmitter_IsHeightDAIncluded(t *testing.T) { require.NoError(t, batch.SetHeight(5)) require.NoError(t, batch.Commit()) - s := &Submitter{store: st, cache: cm, logger: zerolog.Nop()} + daIncludedHeight := &atomic.Uint64{} + daIncludedHeight.Store(2) // heights 1 and 2 are already DA included and cache cleared + + s := &Submitter{store: st, cache: cm, logger: zerolog.Nop(), daIncludedHeight: daIncludedHeight} s.ctx = ctx h1, d1 := newHeaderAndData("chain", 3, true) h2, d2 := newHeaderAndData("chain", 4, true) + h3, d3 := newHeaderAndData("chain", 2, true) // already DA included, cache was cleared - cm.SetHeaderDAIncluded(h1.Hash().String(), 100, 2) - cm.SetDataDAIncluded(d1.DACommitment().String(), 100, 2) + cm.SetHeaderDAIncluded(h1.Hash().String(), 100, 3) + cm.SetDataDAIncluded(d1.DACommitment().String(), 100, 3) cm.SetHeaderDAIncluded(h2.Hash().String(), 101, 4) // no data for h2 + // no cache entries for h3/d3 - they were cleared after DA inclusion processing specs := map[string]struct { height uint64 @@ -132,9 +137,11 @@ func TestSubmitter_IsHeightDAIncluded(t *testing.T) { exp bool expErr bool }{ - "below store height and cached": {height: 3, header: h1, data: d1, exp: true}, - "above store height": {height: 6, header: h2, data: d2, exp: false}, - "data missing": {height: 4, header: h2, data: d2, exp: false}, + "below store height and cached": {height: 3, header: h1, data: d1, exp: true}, + "above store height": {height: 6, header: h2, data: d2, exp: false}, + "data missing": {height: 4, header: h2, data: d2, exp: false}, + "at daIncludedHeight - cache cleared": {height: 2, header: h3, data: d3, exp: true}, + "below daIncludedHeight - cache cleared": {height: 1, header: h3, data: d3, exp: true}, } for name, spec := range specs { From adddf8be280fdad4079c13107b3a740bdf06c14f Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 3 Feb 2026 20:44:10 +0100 Subject: [PATCH 08/10] save only on exit --- block/internal/cache/generic_cache.go | 9 --------- block/internal/cache/generic_cache_test.go | 12 +++++++++--- block/internal/cache/manager_test.go | 3 +++ 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/block/internal/cache/generic_cache.go b/block/internal/cache/generic_cache.go index a611230ce..a3b388f41 100644 --- a/block/internal/cache/generic_cache.go +++ b/block/internal/cache/generic_cache.go @@ -131,15 +131,6 @@ func (c *Cache[T]) setDAIncluded(hash string, daHeight uint64, blockHeight uint6 c.daIncluded.Add(hash, daHeight) c.hashByHeight.Add(blockHeight, hash) - // Persist to store if configured - if c.store != nil { - key := c.storeKeyPrefix + hash - value := make([]byte, 16) // 8 bytes for daHeight + 8 bytes for blockHeight 
- binary.LittleEndian.PutUint64(value[0:8], daHeight) - binary.LittleEndian.PutUint64(value[8:16], blockHeight) - _ = c.store.SetMetadata(context.Background(), key, value) - } - // Update max DA height if necessary c.setMaxDAHeight(daHeight) } diff --git a/block/internal/cache/generic_cache_test.go b/block/internal/cache/generic_cache_test.go index a6e7c6054..b4e8c2b79 100644 --- a/block/internal/cache/generic_cache_test.go +++ b/block/internal/cache/generic_cache_test.go @@ -61,11 +61,14 @@ func TestCache_MaxDAHeight_WithStore(t *testing.T) { t.Errorf("after setDAIncluded: daHeight = %d, want 200", got) } + err := c1.SaveToStore(ctx) + require.NoError(t, err) + // Create new cache and restore from store c2 := NewCache[testItem](st, "test/da-included/") // Restore with the known hashes - err := c2.RestoreFromStore(ctx, []string{"hash1", "hash2", "hash3"}) + err = c2.RestoreFromStore(ctx, []string{"hash1", "hash2", "hash3"}) require.NoError(t, err) if got := c2.daHeight(); got != 200 { @@ -93,14 +96,17 @@ func TestCache_WithStorePersistence(t *testing.T) { c1 := NewCache[testItem](st, "test/") - // Set DA inclusion - this should persist to store immediately + // Set DA inclusion c1.setDAIncluded("hash1", 100, 1) c1.setDAIncluded("hash2", 200, 2) + err := c1.SaveToStore(ctx) + require.NoError(t, err) + // Create new cache with same store and restore c2 := NewCache[testItem](st, "test/") - err := c2.RestoreFromStore(ctx, []string{"hash1", "hash2", "hash3"}) + err = c2.RestoreFromStore(ctx, []string{"hash1", "hash2", "hash3"}) require.NoError(t, err) // hash1 and hash2 should be restored, hash3 should not exist diff --git a/block/internal/cache/manager_test.go b/block/internal/cache/manager_test.go index 61f744bf6..6963c9eb0 100644 --- a/block/internal/cache/manager_test.go +++ b/block/internal/cache/manager_test.go @@ -431,6 +431,9 @@ func TestManager_DAInclusionPersistence(t *testing.T) { // Verify DA height is tracked assert.Equal(t, uint64(101), m1.DaHeight()) + err = m1.SaveToStore() + require.NoError(t, err) + // Create new manager - DA inclusion should be restored m2, err := NewManager(cfg, st, zerolog.Nop()) require.NoError(t, err) From 4ee435421595a200d0959418a817409f30dbed31 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Tue, 3 Feb 2026 20:45:50 +0100 Subject: [PATCH 09/10] remove debug log --- block/internal/submitting/submitter.go | 1 - 1 file changed, 1 deletion(-) diff --git a/block/internal/submitting/submitter.go b/block/internal/submitting/submitter.go index a4efbf1a8..1c5b034c1 100644 --- a/block/internal/submitting/submitter.go +++ b/block/internal/submitting/submitter.go @@ -331,7 +331,6 @@ func (s *Submitter) processDAInclusionLoop() { // Check if this height is DA included if included, err := s.IsHeightDAIncluded(nextHeight, header, data); err != nil || !included { - s.logger.Debug().Uint64("height", nextHeight).Msg("height not yet DA included") break } From ddf70e4920b82b8069be30c2296609cd437c3689 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 4 Feb 2026 11:47:29 +0100 Subject: [PATCH 10/10] persist as we go --- block/internal/cache/generic_cache.go | 57 +++++++++++++++++++-------- 1 file changed, 40 insertions(+), 17 deletions(-) diff --git a/block/internal/cache/generic_cache.go b/block/internal/cache/generic_cache.go index a3b388f41..83d5c9653 100644 --- a/block/internal/cache/generic_cache.go +++ b/block/internal/cache/generic_cache.go @@ -73,6 +73,29 @@ func NewCache[T any](s store.Store, keyPrefix string) *Cache[T] { } } +// storeKey returns the store 
key for a given hash. +func (c *Cache[T]) storeKey(hash string) string { + return c.storeKeyPrefix + hash +} + +// encodeDAInclusion encodes daHeight and blockHeight into a 16-byte value. +func encodeDAInclusion(daHeight, blockHeight uint64) []byte { + value := make([]byte, 16) // 8 bytes for daHeight + 8 bytes for blockHeight + binary.LittleEndian.PutUint64(value[0:8], daHeight) + binary.LittleEndian.PutUint64(value[8:16], blockHeight) + return value +} + +// decodeDAInclusion decodes a 16-byte value into daHeight and blockHeight. +func decodeDAInclusion(value []byte) (daHeight, blockHeight uint64, ok bool) { + if len(value) != 16 { + return 0, 0, false + } + daHeight = binary.LittleEndian.Uint64(value[0:8]) + blockHeight = binary.LittleEndian.Uint64(value[8:16]) + return daHeight, blockHeight, true +} + // getItem returns an item from the cache by height. // Returns nil if not found or type mismatch. func (c *Cache[T]) getItem(height uint64) *T { @@ -131,13 +154,21 @@ func (c *Cache[T]) setDAIncluded(hash string, daHeight uint64, blockHeight uint6 c.daIncluded.Add(hash, daHeight) c.hashByHeight.Add(blockHeight, hash) + // Persist to store if configured (for SIGKILL protection) + if c.store != nil { + _ = c.store.SetMetadata(context.Background(), c.storeKey(hash), encodeDAInclusion(daHeight, blockHeight)) + } + // Update max DA height if necessary c.setMaxDAHeight(daHeight) } -// removeDAIncluded removes the DA-included status of the hash +// removeDAIncluded removes the DA-included status of the hash from cache and store. func (c *Cache[T]) removeDAIncluded(hash string) { c.daIncluded.Remove(hash) + if c.store != nil { + _ = c.store.DeleteMetadata(context.Background(), c.storeKey(hash)) + } } // daHeight returns the maximum DA height from all DA-included items. @@ -165,7 +196,7 @@ func (c *Cache[T]) removeSeen(hash string) { c.hashes.Remove(hash) } -// deleteAllForHeight removes all items and their associated data from the cache at the given height. +// deleteAllForHeight removes all items and their associated data from the cache and store at the given height. 
func (c *Cache[T]) deleteAllForHeight(height uint64) { c.itemsByHeight.Remove(height) @@ -178,7 +209,7 @@ func (c *Cache[T]) deleteAllForHeight(height uint64) { if ok { c.hashes.Remove(hash) - c.daIncluded.Remove(hash) + c.removeDAIncluded(hash) } } @@ -191,19 +222,17 @@ func (c *Cache[T]) RestoreFromStore(ctx context.Context, hashes []string) error } for _, hash := range hashes { - key := c.storeKeyPrefix + hash - value, err := c.store.GetMetadata(ctx, key) + value, err := c.store.GetMetadata(ctx, c.storeKey(hash)) if err != nil { // Key not found is not an error - the hash may not have been DA included yet continue } - if len(value) != 16 { + + daHeight, blockHeight, ok := decodeDAInclusion(value) + if !ok { continue // Invalid data, skip } - daHeight := binary.LittleEndian.Uint64(value[0:8]) - blockHeight := binary.LittleEndian.Uint64(value[8:16]) - c.daIncluded.Add(hash, daHeight) c.hashByHeight.Add(blockHeight, hash) @@ -242,12 +271,7 @@ func (c *Cache[T]) SaveToStore(ctx context.Context) error { } } - key := c.storeKeyPrefix + hash - value := make([]byte, 16) - binary.LittleEndian.PutUint64(value[0:8], daHeight) - binary.LittleEndian.PutUint64(value[8:16], blockHeight) - - if err := c.store.SetMetadata(ctx, key, value); err != nil { + if err := c.store.SetMetadata(ctx, c.storeKey(hash), encodeDAInclusion(daHeight, blockHeight)); err != nil { return fmt.Errorf("failed to save DA inclusion for hash %s: %w", hash, err) } } @@ -262,8 +286,7 @@ func (c *Cache[T]) ClearFromStore(ctx context.Context, hashes []string) error { } for _, hash := range hashes { - key := c.storeKeyPrefix + hash - if err := c.store.DeleteMetadata(ctx, key); err != nil { + if err := c.store.DeleteMetadata(ctx, c.storeKey(hash)); err != nil { return fmt.Errorf("failed to delete DA inclusion for hash %s: %w", hash, err) } }
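The last patch settles on a single 16-byte little-endian layout for each persisted DA-inclusion entry: daHeight in bytes 0-7, blockHeight in bytes 8-15. For reference, a minimal standalone sketch of that round trip (standard library only; the helper bodies mirror encodeDAInclusion/decodeDAInclusion from the diff above, while main and its sample values are illustrative, not part of the series):

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeDAInclusion packs daHeight and blockHeight into a 16-byte
// little-endian value, matching the layout of the cache's store metadata.
func encodeDAInclusion(daHeight, blockHeight uint64) []byte {
	value := make([]byte, 16) // 8 bytes for daHeight + 8 bytes for blockHeight
	binary.LittleEndian.PutUint64(value[0:8], daHeight)
	binary.LittleEndian.PutUint64(value[8:16], blockHeight)
	return value
}

// decodeDAInclusion is the inverse; ok is false for malformed values.
func decodeDAInclusion(value []byte) (daHeight, blockHeight uint64, ok bool) {
	if len(value) != 16 {
		return 0, 0, false
	}
	return binary.LittleEndian.Uint64(value[0:8]), binary.LittleEndian.Uint64(value[8:16]), true
}

func main() {
	// Round trip a sample entry (values here are arbitrary).
	v := encodeDAInclusion(205, 1)
	daHeight, blockHeight, ok := decodeDAInclusion(v)
	fmt.Println(daHeight, blockHeight, ok) // 205 1 true

	// A truncated value is rejected rather than misread.
	_, _, ok = decodeDAInclusion(v[:8])
	fmt.Println(ok) // false
}

Returning (0, 0, false) for a malformed value instead of an error is what lets RestoreFromStore silently skip stale or partially written entries, which matters now that entries are persisted as they arrive rather than only on shutdown.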