diff --git a/block/internal/syncing/block_syncer.go b/block/internal/syncing/block_syncer.go
index e48dd4677..e65279a9d 100644
--- a/block/internal/syncing/block_syncer.go
+++ b/block/internal/syncing/block_syncer.go
@@ -21,5 +21,5 @@ type BlockSyncer interface {
 	ValidateBlock(ctx context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error
 
 	// VerifyForcedInclusionTxs verifies that forced inclusion transactions are properly handled.
-	VerifyForcedInclusionTxs(ctx context.Context, currentState types.State, data *types.Data) error
+	VerifyForcedInclusionTxs(ctx context.Context, daHeight uint64, data *types.Data) error
 }
diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go
index 88f46aafe..248cd9af4 100644
--- a/block/internal/syncing/syncer.go
+++ b/block/internal/syncing/syncer.go
@@ -114,6 +114,7 @@ type Syncer struct {
 	gracePeriodMultiplier *atomic.Pointer[float64]
 	blockFullnessEMA      *atomic.Pointer[float64]
 	gracePeriodConfig     forcedInclusionGracePeriodConfig
+	p2pHeightHints        map[uint64]uint64 // map[height]daHeight
 
 	// Lifecycle
 	ctx context.Context
@@ -183,6 +184,7 @@ func NewSyncer(
 		gracePeriodMultiplier: gracePeriodMultiplier,
 		blockFullnessEMA:      blockFullnessEMA,
 		gracePeriodConfig:     newForcedInclusionGracePeriodConfig(),
+		p2pHeightHints:        make(map[uint64]uint64),
 	}
 	s.blockSyncer = s
 	if raftNode != nil && !reflect.ValueOf(raftNode).IsNil() {
@@ -654,6 +656,7 @@ func (s *Syncer) processHeightEvent(ctx context.Context, event *common.DAHeightE
 			Msg("P2P event with DA height hint, queuing priority DA retrieval")
 
 		// Queue priority DA retrieval - will be processed in fetchDAUntilCaughtUp
+		s.p2pHeightHints[height] = daHeightHint
 		s.daRetriever.QueuePriorityHeight(daHeightHint)
 	}
 }
@@ -733,13 +736,17 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve
 	}
 
 	// Verify forced inclusion transactions if configured
-	if event.Source == common.SourceDA {
-		if err := s.VerifyForcedInclusionTxs(ctx, currentState, data); err != nil {
-			s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("forced inclusion verification failed")
-			if errors.Is(err, errMaliciousProposer) {
-				s.cache.RemoveHeaderDAIncluded(headerHash)
-				return err
-			}
+	currentDaHeight, ok := s.p2pHeightHints[nextHeight]
+	if !ok {
+		currentDaHeight = currentState.DAHeight
+	} else {
+		delete(s.p2pHeightHints, nextHeight)
+	}
+	if err := s.VerifyForcedInclusionTxs(ctx, currentDaHeight, data); err != nil {
+		s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("forced inclusion verification failed")
+		if errors.Is(err, errMaliciousProposer) {
+			s.cache.RemoveHeaderDAIncluded(headerHash)
+			return err
 		}
 	}
 
@@ -777,6 +784,22 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve
 		return fmt.Errorf("failed to commit batch: %w", err)
 	}
 
+	// Persist the DA height mapping for blocks synced from DA.
+	// This ensures consistency with the sequencer's submitter, which also persists this mapping.
+	// Note: P2P hints are already persisted via store_adapter.Append when items have DAHint set,
+	// but the DaHeight from events always takes precedence as it is authoritative (it comes from DA).
+	if event.DaHeight > 0 {
+		daHeightBytes := make([]byte, 8)
+		binary.LittleEndian.PutUint64(daHeightBytes, event.DaHeight)
+
+		if err := s.store.SetMetadata(ctx, store.GetHeightToDAHeightHeaderKey(nextHeight), daHeightBytes); err != nil {
+			s.logger.Warn().Err(err).Uint64("height", nextHeight).Msg("failed to persist header DA height mapping")
+		}
+		if err := s.store.SetMetadata(ctx, store.GetHeightToDAHeightDataKey(nextHeight), daHeightBytes); err != nil {
+			s.logger.Warn().Err(err).Uint64("height", nextHeight).Msg("failed to persist data DA height mapping")
+		}
+	}
+
 	// Update in-memory state after successful commit
 	s.SetLastState(newState)
 	s.metrics.Height.Set(float64(newState.LastBlockHeight))
@@ -952,7 +975,7 @@ func (s *Syncer) getEffectiveGracePeriod() uint64 {
 // Note: Due to block size constraints (MaxBytes), sequencers may defer forced inclusion transactions
 // to future blocks (smoothing). This is legitimate behavior within an epoch.
 // However, ALL forced inclusion txs from an epoch MUST be included before the next epoch begins or grace boundary (whichever comes later).
-func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState types.State, data *types.Data) error {
+func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, daHeight uint64, data *types.Data) error {
 	if s.fiRetriever == nil {
 		return nil
 	}
@@ -962,7 +985,7 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type
 	s.updateDynamicGracePeriod(blockFullness)
 
 	// Retrieve forced inclusion transactions from DA for current epoch
-	forcedIncludedTxsEvent, err := s.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentState.DAHeight)
+	forcedIncludedTxsEvent, err := s.fiRetriever.RetrieveForcedIncludedTxs(ctx, daHeight)
 	if err != nil {
 		if errors.Is(err, da.ErrForceInclusionNotConfigured) {
 			s.logger.Debug().Msg("forced inclusion namespace not configured, skipping verification")
@@ -1049,10 +1072,10 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type
 		effectiveGracePeriod := s.getEffectiveGracePeriod()
 		graceBoundary := pending.EpochEnd + (effectiveGracePeriod * s.genesis.DAEpochForcedInclusion)
 
-		if currentState.DAHeight > graceBoundary {
+		if daHeight > graceBoundary {
 			maliciousTxs = append(maliciousTxs, pending)
 			s.logger.Warn().
-				Uint64("current_da_height", currentState.DAHeight).
+				Uint64("current_da_height", daHeight).
 				Uint64("epoch_end", pending.EpochEnd).
 				Uint64("grace_boundary", graceBoundary).
 				Uint64("base_grace_periods", s.gracePeriodConfig.basePeriod).
@@ -1062,7 +1085,7 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type
 				Msg("forced inclusion transaction past grace boundary - marking as malicious")
 		} else {
 			remainingPending = append(remainingPending, pending)
-			if currentState.DAHeight > pending.EpochEnd {
+			if daHeight > pending.EpochEnd {
 				txsInGracePeriod++
 			}
 		}
@@ -1086,7 +1109,7 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type
 		effectiveGracePeriod := s.getEffectiveGracePeriod()
 		s.logger.Error().
 			Uint64("height", data.Height()).
-			Uint64("current_da_height", currentState.DAHeight).
+			Uint64("current_da_height", daHeight).
 			Int("malicious_count", len(maliciousTxs)).
 			Uint64("base_grace_periods", s.gracePeriodConfig.basePeriod).
 			Uint64("effective_grace_periods", effectiveGracePeriod).
@@ -1106,7 +1129,7 @@ func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, currentState type
 
 	s.logger.Info().
 		Uint64("height", data.Height()).
-		Uint64("da_height", currentState.DAHeight).
+		Uint64("da_height", daHeight).
 		Uint64("epoch_start", forcedIncludedTxsEvent.StartDaHeight).
 		Uint64("epoch_end", forcedIncludedTxsEvent.EndDaHeight).
 		Int("included_count", includedCount).
diff --git a/block/internal/syncing/syncer_forced_inclusion_test.go b/block/internal/syncing/syncer_forced_inclusion_test.go
index 075b6f169..c8ea117e0 100644
--- a/block/internal/syncing/syncer_forced_inclusion_test.go
+++ b/block/internal/syncing/syncer_forced_inclusion_test.go
@@ -430,7 +430,7 @@ func TestVerifyForcedInclusionTxs_AllTransactionsIncluded(t *testing.T) {
 	currentState.DAHeight = 0
 
 	// Verify - should pass since all forced txs are included
-	err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data)
+	err = s.VerifyForcedInclusionTxs(t.Context(), currentState.DAHeight, data)
 	require.NoError(t, err)
 }
 
@@ -508,7 +508,7 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) {
 	currentState.DAHeight = 0
 
 	// Verify - should pass since forced tx blob may be legitimately deferred within the epoch
-	err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data)
+	err = s.VerifyForcedInclusionTxs(t.Context(), currentState.DAHeight, data)
 	require.NoError(t, err)
 
 	// Mock DA for next epoch to return no forced inclusion transactions
@@ -521,7 +521,7 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) {
 	data2 := makeData(gen.ChainID, 2, 1)
 	data2.Txs[0] = []byte("regular_tx_3")
 
-	err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data2)
+	err = s.VerifyForcedInclusionTxs(t.Context(), currentState.DAHeight, data2)
 	require.NoError(t, err) // Should pass since DAHeight=1 equals grace boundary, not past it
 
 	// Mock DA for height 2 to return no forced inclusion transactions
@@ -534,7 +534,7 @@ func TestVerifyForcedInclusionTxs_MissingTransactions(t *testing.T) {
 	data3 := makeData(gen.ChainID, 3, 1)
 	data3.Txs[0] = types.Tx([]byte("regular_tx_4"))
 
-	err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data3)
+	err = s.VerifyForcedInclusionTxs(t.Context(), currentState.DAHeight, data3)
 	require.Error(t, err)
 	require.Contains(t, err.Error(), "sequencer is malicious")
 	require.Contains(t, err.Error(), "past grace boundary")
@@ -615,7 +615,7 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) {
 	currentState.DAHeight = 0
 
 	// Verify - should pass since dataBin2 may be legitimately deferred within the epoch
-	err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data)
+	err = s.VerifyForcedInclusionTxs(t.Context(), currentState.DAHeight, data)
 	require.NoError(t, err)
 
 	// Mock DA for next epoch to return no forced inclusion transactions
@@ -629,7 +629,7 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) {
 	data2.Txs[0] = types.Tx([]byte("regular_tx_3"))
 
 	// Verify - should pass since we're at the grace boundary, not past it
-	err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data2)
+	err = s.VerifyForcedInclusionTxs(t.Context(), currentState.DAHeight, data2)
 	require.NoError(t, err)
 
 	// Mock DA for height 2 (when we move to DAHeight 2)
@@ -644,7 +644,7 @@ func TestVerifyForcedInclusionTxs_PartiallyIncluded(t *testing.T) {
 	data3 := makeData(gen.ChainID, 3, 1)
 	data3.Txs[0] = types.Tx([]byte("regular_tx_4"))
 
-	err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data3)
+	err = s.VerifyForcedInclusionTxs(t.Context(), currentState.DAHeight, data3)
 	require.Error(t, err)
 	require.Contains(t, err.Error(), "sequencer is malicious")
 	require.Contains(t, err.Error(), "past grace boundary")
@@ -717,7 +717,7 @@ func TestVerifyForcedInclusionTxs_NoForcedTransactions(t *testing.T) {
 	currentState.DAHeight = 0
 
 	// Verify - should pass since no forced txs to verify
-	err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data)
+	err = s.VerifyForcedInclusionTxs(t.Context(), currentState.DAHeight, data)
 	require.NoError(t, err)
 }
 
@@ -782,7 +782,7 @@ func TestVerifyForcedInclusionTxs_NamespaceNotConfigured(t *testing.T) {
 	currentState.DAHeight = 0
 
 	// Verify - should pass since namespace not configured
-	err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data)
+	err = s.VerifyForcedInclusionTxs(t.Context(), currentState.DAHeight, data)
 	require.NoError(t, err)
 }
 
@@ -871,7 +871,7 @@ func TestVerifyForcedInclusionTxs_DeferralWithinEpoch(t *testing.T) {
 	currentState.DAHeight = 104
 
 	// Verify - should pass since dataBin2 can be deferred within epoch
-	err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data1)
+	err = s.VerifyForcedInclusionTxs(t.Context(), currentState.DAHeight, data1)
 	require.NoError(t, err)
 
 	// Verify that dataBin2 is now tracked as pending
@@ -900,7 +900,7 @@ func TestVerifyForcedInclusionTxs_DeferralWithinEpoch(t *testing.T) {
 	data2.Txs[1] = types.Tx(dataBin2) // The deferred one we're waiting for
 
 	// Verify - should pass since dataBin2 is now included and clears pending
-	err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data2)
+	err = s.VerifyForcedInclusionTxs(t.Context(), currentState.DAHeight, data2)
 	require.NoError(t, err)
 
 	// Verify that pending queue is now empty (dataBin2 was included)
@@ -997,7 +997,7 @@ func TestVerifyForcedInclusionTxs_MaliciousAfterEpochEnd(t *testing.T) {
 	currentState.DAHeight = 102
 
 	// Verify - should pass, tx can be deferred within epoch
-	err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data1)
+	err = s.VerifyForcedInclusionTxs(t.Context(), currentState.DAHeight, data1)
 	require.NoError(t, err)
 }
 
@@ -1093,6 +1093,235 @@ func TestVerifyForcedInclusionTxs_SmoothingExceedsEpoch(t *testing.T) {
 	currentState := s.getLastState()
 	currentState.DAHeight = 102 // At epoch end
 
-	err = s.VerifyForcedInclusionTxs(t.Context(), currentState, data1)
+	err = s.VerifyForcedInclusionTxs(t.Context(), currentState.DAHeight, data1)
 	require.NoError(t, err, "smoothing within epoch should be allowed")
 }
+
+func TestVerifyForcedInclusionTxs_P2PBlocks(t *testing.T) {
+	t.Run("P2P block with all forced txs included passes verification", func(t *testing.T) {
+		ds := dssync.MutexWrap(datastore.NewMapDatastore())
+		st := store.New(ds)
+
+		cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
+		require.NoError(t, err)
+
+		addr, pub, signer := buildSyncTestSigner(t)
+		gen := genesis.Genesis{
+			ChainID:                "tchain",
+			InitialHeight:          1,
+			StartTime:              time.Now().Add(-time.Second),
+			ProposerAddress:        addr,
+			DAStartHeight:          0,
+			DAEpochForcedInclusion: 1,
+		}
+
+		cfg := config.DefaultConfig()
+		cfg.DA.ForcedInclusionNamespace = "nsForcedInclusion"
+
+		mockExec := testmocks.NewMockExecutor(t)
+		mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").
+			Return([]byte("app0"), nil).Once()
+		setupFilterTxsMock(mockExec)
+
+		client := testmocks.NewMockClient(t)
+		client.On("GetHeaderNamespace").Return([]byte(cfg.DA.Namespace)).Maybe()
+		client.On("GetDataNamespace").Return([]byte(cfg.DA.DataNamespace)).Maybe()
+		client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe()
+		client.On("HasForcedInclusionNamespace").Return(true).Maybe()
+
+		errChan := make(chan error, 1)
+		s := NewSyncer(
+			st,
+			mockExec,
+			client,
+			cm,
+			common.NopMetrics(),
+			cfg,
+			gen,
+			extmocks.NewMockStore[*types.P2PSignedHeader](t),
+			extmocks.NewMockStore[*types.P2PData](t),
+			zerolog.Nop(),
+			common.DefaultBlockOptions(),
+			errChan,
+			nil,
+		)
+
+		require.NoError(t, s.initializeState())
+		s.ctx = context.Background()
+
+		// Initialize DA retriever and forced inclusion retriever
+		s.daRetriever = NewDARetriever(client, cm, gen, zerolog.Nop())
+		s.fiRetriever = da.NewForcedInclusionRetriever(client, zerolog.Nop(), cfg, gen.DAStartHeight, gen.DAEpochForcedInclusion)
+		t.Cleanup(func() { s.fiRetriever.Stop() })
+
+		// Mock DA to return forced inclusion transactions at epoch 0
+		forcedTxData, _ := makeSignedDataBytes(t, gen.ChainID, 10, addr, pub, signer, 2)
+		client.On("Retrieve", mock.Anything, uint64(0), []byte("nsForcedInclusion")).Return(datypes.ResultRetrieve{
+			BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: [][]byte{[]byte("fi1")}, Timestamp: time.Now()},
+			Data:       [][]byte{forcedTxData},
+		}).Once()
+
+		// Create block data that INCLUDES the forced transaction
+		lastState := s.getLastState()
+		data := makeData(gen.ChainID, 1, 1)
+		data.Txs[0] = types.Tx(forcedTxData)
+		_, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, lastState.AppHash, data, nil)
+
+		// Mock ExecuteTxs for successful block execution
+		mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, lastState.AppHash).
+			Return([]byte("app1"), nil).Once()
+
+		// Create P2P event with DA height hint matching the epoch (DA height 0)
+		evt := common.DAHeightEvent{
+			Header:        hdr,
+			Data:          data,
+			Source:        common.SourceP2P,
+			DaHeightHints: [2]uint64{0, 0},
+		}
+
+		// Process the P2P block - should succeed with forced inclusion verification
+		s.processHeightEvent(context.Background(), &evt)
+
+		// Verify no errors occurred
+		select {
+		case err := <-errChan:
+			t.Fatalf("unexpected error: %v", err)
+		default:
+		}
+
+		// Verify block was processed
+		h, err := st.Height(context.Background())
+		require.NoError(t, err)
+		require.Equal(t, uint64(1), h, "block should have been synced")
+	})
+
+	t.Run("P2P block missing forced txs triggers malicious detection after grace period", func(t *testing.T) {
+		ds := dssync.MutexWrap(datastore.NewMapDatastore())
+		st := store.New(ds)
+
+		cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
+		require.NoError(t, err)
+
+		addr, pub, signer := buildSyncTestSigner(t)
+		gen := genesis.Genesis{
+			ChainID:                "tchain",
+			InitialHeight:          1,
+			StartTime:              time.Now().Add(-time.Second),
+			ProposerAddress:        addr,
+			DAStartHeight:          0,
+			DAEpochForcedInclusion: 1,
+		}
+
+		cfg := config.DefaultConfig()
+		cfg.DA.ForcedInclusionNamespace = "nsForcedInclusion"
+
+		mockExec := testmocks.NewMockExecutor(t)
+		mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").
+			Return([]byte("app0"), nil).Once()
+		setupFilterTxsMock(mockExec)
+
+		client := testmocks.NewMockClient(t)
+		client.On("GetHeaderNamespace").Return([]byte(cfg.DA.Namespace)).Maybe()
+		client.On("GetDataNamespace").Return([]byte(cfg.DA.DataNamespace)).Maybe()
+		client.On("GetForcedInclusionNamespace").Return([]byte(cfg.DA.ForcedInclusionNamespace)).Maybe()
+		client.On("HasForcedInclusionNamespace").Return(true).Maybe()
+
+		errChan := make(chan error, 1)
+		s := NewSyncer(
+			st,
+			mockExec,
+			client,
+			cm,
+			common.NopMetrics(),
+			cfg,
+			gen,
+			extmocks.NewMockStore[*types.P2PSignedHeader](t),
+			extmocks.NewMockStore[*types.P2PData](t),
+			zerolog.Nop(),
+			common.DefaultBlockOptions(),
+			errChan,
+			nil,
+		)
+
+		require.NoError(t, s.initializeState())
+		s.ctx = context.Background()
+
+		// Initialize DA retriever and forced inclusion retriever
+		s.daRetriever = NewDARetriever(client, cm, gen, zerolog.Nop())
+		s.fiRetriever = da.NewForcedInclusionRetriever(client, zerolog.Nop(), cfg, gen.DAStartHeight, gen.DAEpochForcedInclusion)
+		t.Cleanup(func() { s.fiRetriever.Stop() })
+
+		// Process first block successfully (within grace period)
+		// Mock DA to return forced inclusion transactions at epoch 0
+		forcedTxData, _ := makeSignedDataBytes(t, gen.ChainID, 10, addr, pub, signer, 2)
+		client.On("Retrieve", mock.Anything, uint64(0), []byte("nsForcedInclusion")).Return(datypes.ResultRetrieve{
+			BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: [][]byte{[]byte("fi1")}, Timestamp: time.Now()},
+			Data:       [][]byte{forcedTxData},
+		}).Once()
+
+		lastState := s.getLastState()
+		data1 := makeData(gen.ChainID, 1, 1)
+		data1.Txs[0] = types.Tx([]byte("regular_tx"))
+		_, hdr1 := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, lastState.AppHash, data1, nil)
+
+		mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, lastState.AppHash).
+			Return([]byte("app1"), nil).Once()
+
+		evt1 := common.DAHeightEvent{
+			Header:        hdr1,
+			Data:          data1,
+			Source:        common.SourceP2P,
+			DaHeightHints: [2]uint64{0, 0},
+		}
+
+		// First block processes fine (forced tx can be deferred within grace period)
+		s.processHeightEvent(context.Background(), &evt1)
+
+		select {
+		case err := <-errChan:
+			t.Fatalf("unexpected error on first block: %v", err)
+		default:
+		}
+
+		// Verify block was processed
+		h, err := st.Height(context.Background())
+		require.NoError(t, err)
+		require.Equal(t, uint64(1), h)
+
+		// Now process second block past grace boundary
+		// Mock DA for epoch 2 (past grace boundary)
+		client.On("Retrieve", mock.Anything, uint64(2), []byte("nsForcedInclusion")).Return(datypes.ResultRetrieve{
+			BaseResult: datypes.BaseResult{Code: datypes.StatusNotFound, Timestamp: time.Now()},
+		}).Once()
+
+		// Set the DA height hint to be past grace boundary (epoch 0 end + grace period of 1 epoch = boundary at 1, so 2 is past)
+		lastState = s.getLastState()
+		data2 := makeData(gen.ChainID, 2, 1)
+		data2.Txs[0] = types.Tx([]byte("regular_tx_2"))
+		_, hdr2 := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, lastState.AppHash, data2, hdr1.Hash())
+
+		evt2 := common.DAHeightEvent{
+			Header:        hdr2,
+			Data:          data2,
+			Source:        common.SourceP2P,
+			DaHeightHints: [2]uint64{2, 2}, // DA height 2 is past grace boundary
+		}
+
+		// Second block should fail with malicious sequencer error
+		s.processHeightEvent(context.Background(), &evt2)
+
+		// Verify critical error was sent
+		select {
+		case err := <-errChan:
+			require.Error(t, err)
+			require.Contains(t, err.Error(), "sequencer malicious", "should detect malicious sequencer")
+		case <-time.After(100 * time.Millisecond):
+			t.Fatal("expected malicious sequencer error to be sent to error channel")
+		}
+
+		// Verify block 2 was NOT synced
+		h, err = st.Height(context.Background())
+		require.NoError(t, err)
+		require.Equal(t, uint64(1), h, "block 2 should not have been synced due to malicious sequencer")
+	})
+}
diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go
index 5edec1cce..41c89e7a8 100644
--- a/block/internal/syncing/syncer_test.go
+++ b/block/internal/syncing/syncer_test.go
@@ -207,6 +207,77 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) {
 	assert.Equal(t, uint64(1), st1.LastBlockHeight)
 }
 
+func TestSyncer_PersistsDAHeightMapping_WhenSyncingFromDA(t *testing.T) {
+	ds := dssync.MutexWrap(datastore.NewMapDatastore())
+	st := store.New(ds)
+
+	cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop())
+	require.NoError(t, err)
+
+	addr, pub, signer := buildSyncTestSigner(t)
+
+	cfg := config.DefaultConfig()
+	gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}
+
+	mockExec := testmocks.NewMockExecutor(t)
+	mockExec.EXPECT().InitChain(mock.Anything, mock.Anything, uint64(1), "tchain").Return([]byte("app0"), nil).Once()
+
+	errChan := make(chan error, 1)
+	s := NewSyncer(
+		st,
+		mockExec,
+		nil,
+		cm,
+		common.NopMetrics(),
+		cfg,
+		gen,
+		extmocks.NewMockStore[*types.P2PSignedHeader](t),
+		extmocks.NewMockStore[*types.P2PData](t),
+		zerolog.Nop(),
+		common.DefaultBlockOptions(),
+		errChan,
+		nil,
+	)
+
+	require.NoError(t, s.initializeState())
+	s.ctx = t.Context()
+
+	// Create signed header & data for height 1
+	lastState := s.getLastState()
+	data := makeData(gen.ChainID, 1, 0)
+	_, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, lastState.AppHash, data, nil)
+
+	// Expect ExecuteTxs call for height 1
+	mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, lastState.AppHash).
+		Return([]byte("app1"), nil).Once()
+
+	// Process event with DA source and a specific DA height
+	daHeight := uint64(42)
+	evt := common.DAHeightEvent{Header: hdr, Data: data, DaHeight: daHeight, Source: common.SourceDA}
+	s.processHeightEvent(t.Context(), &evt)
+
+	requireEmptyChan(t, errChan)
+
+	// Verify block was synced
+	h, err := st.Height(t.Context())
+	require.NoError(t, err)
+	assert.Equal(t, uint64(1), h)
+
+	// Verify DA height mapping was persisted for header
+	headerDABytes, err := st.GetMetadata(t.Context(), store.GetHeightToDAHeightHeaderKey(1))
+	require.NoError(t, err)
+	require.Len(t, headerDABytes, 8)
+	headerDAHeight := binary.LittleEndian.Uint64(headerDABytes)
+	assert.Equal(t, daHeight, headerDAHeight)
+
+	// Verify DA height mapping was persisted for data
+	dataDABytes, err := st.GetMetadata(t.Context(), store.GetHeightToDAHeightDataKey(1))
+	require.NoError(t, err)
+	require.Len(t, dataDABytes, 8)
+	dataDAHeight := binary.LittleEndian.Uint64(dataDABytes)
+	assert.Equal(t, daHeight, dataDAHeight)
+}
+
 func TestSequentialBlockSync(t *testing.T) {
 	ds := dssync.MutexWrap(datastore.NewMapDatastore())
 	st := store.New(ds)
diff --git a/block/internal/syncing/tracing.go b/block/internal/syncing/tracing.go
index bc4326366..1877886d3 100644
--- a/block/internal/syncing/tracing.go
+++ b/block/internal/syncing/tracing.go
@@ -85,16 +85,16 @@ func (t *tracedBlockSyncer) ValidateBlock(ctx context.Context, currState types.S
 	return err
 }
 
-func (t *tracedBlockSyncer) VerifyForcedInclusionTxs(ctx context.Context, currentState types.State, data *types.Data) error {
+func (t *tracedBlockSyncer) VerifyForcedInclusionTxs(ctx context.Context, daHeight uint64, data *types.Data) error {
 	ctx, span := t.tracer.Start(ctx, "BlockSyncer.VerifyForcedInclusion",
 		trace.WithAttributes(
 			attribute.Int64("block.height", int64(data.Height())),
-			attribute.Int64("da.height", int64(currentState.DAHeight)),
+			attribute.Int64("da.height", int64(daHeight)),
 		),
 	)
 	defer span.End()
 
-	err := t.inner.VerifyForcedInclusionTxs(ctx, currentState, data)
+	err := t.inner.VerifyForcedInclusionTxs(ctx, daHeight, data)
 	if err != nil {
 		span.RecordError(err)
 		span.SetStatus(codes.Error, err.Error())
diff --git a/block/internal/syncing/tracing_test.go b/block/internal/syncing/tracing_test.go
index 679f3f7a3..b49235871 100644
--- a/block/internal/syncing/tracing_test.go
+++ b/block/internal/syncing/tracing_test.go
@@ -21,7 +21,7 @@ type mockBlockSyncer struct {
 	trySyncNextBlockFn      func(ctx context.Context, event *common.DAHeightEvent) error
 	applyBlockFn            func(ctx context.Context, header types.Header, data *types.Data, currentState types.State) (types.State, error)
 	validateBlockFn         func(ctx context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error
-	verifyForcedInclusionFn func(ctx context.Context, currentState types.State, data *types.Data) error
+	verifyForcedInclusionFn func(ctx context.Context, daHeight uint64, data *types.Data) error
 }
 
 func (m *mockBlockSyncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEvent) error {
@@ -45,9 +45,9 @@ func (m *mockBlockSyncer) ValidateBlock(ctx context.Context, currState types.Sta
 	return nil
 }
 
-func (m *mockBlockSyncer) VerifyForcedInclusionTxs(ctx context.Context, currentState types.State, data *types.Data) error {
+func (m *mockBlockSyncer) VerifyForcedInclusionTxs(ctx context.Context, daHeight uint64, data *types.Data) error {
 	if m.verifyForcedInclusionFn != nil {
-		return m.verifyForcedInclusionFn(ctx, currentState, data)
+		return m.verifyForcedInclusionFn(ctx, daHeight, data)
 	}
 	return nil
 }
@@ -248,7 +248,7 @@ func TestTracedBlockSyncer_ValidateBlock_Error(t *testing.T) {
 
 func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Success(t *testing.T) {
 	mock := &mockBlockSyncer{
-		verifyForcedInclusionFn: func(ctx context.Context, currentState types.State, data *types.Data) error {
+		verifyForcedInclusionFn: func(ctx context.Context, daHeight uint64, data *types.Data) error {
 			return nil
 		},
 	}
@@ -260,11 +260,8 @@ func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Success(t *testing.T) {
 			Height: 100,
 		},
 	}
-	state := types.State{
-		DAHeight: 50,
-	}
 
-	err := syncer.VerifyForcedInclusionTxs(ctx, state, data)
+	err := syncer.VerifyForcedInclusionTxs(ctx, 50, data)
 	require.NoError(t, err)
 
 	spans := sr.Ended()
@@ -280,7 +277,7 @@ func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Success(t *testing.T) {
 
 func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Error(t *testing.T) {
 	mock := &mockBlockSyncer{
-		verifyForcedInclusionFn: func(ctx context.Context, currentState types.State, data *types.Data) error {
+		verifyForcedInclusionFn: func(ctx context.Context, daHeight uint64, data *types.Data) error {
 			return errors.New("forced inclusion verification failed")
 		},
 	}
@@ -292,11 +289,8 @@ func TestTracedBlockSyncer_VerifyForcedInclusionTxs_Error(t *testing.T) {
 			Height: 100,
 		},
 	}
-	state := types.State{
-		DAHeight: 50,
-	}
 
-	err := syncer.VerifyForcedInclusionTxs(ctx, state, data)
+	err := syncer.VerifyForcedInclusionTxs(ctx, 50, data)
 	require.Error(t, err)
 
 	spans := sr.Ended()
diff --git a/test/e2e/evm_force_inclusion_e2e_test.go b/test/e2e/evm_force_inclusion_e2e_test.go
index ceabc78c1..17f06c3b0 100644
--- a/test/e2e/evm_force_inclusion_e2e_test.go
+++ b/test/e2e/evm_force_inclusion_e2e_test.go
@@ -400,8 +400,6 @@ func setupFullNodeWithForceInclusionCheck(t *testing.T, sut *SystemUnderTest, fu
 // Note: This test simulates the scenario by having the sequencer configured to
 // listen to the wrong namespace, while we submit directly to the correct namespace.
 func TestEvmSyncerMaliciousSequencerForceInclusionE2E(t *testing.T) {
-	t.Skip() // Unskip once https://github.com/evstack/ev-node/pull/2963 is merged
-
 	sut := NewSystemUnderTest(t)
 	workDir := t.TempDir()
 	sequencerHome := filepath.Join(workDir, "sequencer")
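
Note: the syncer change above persists the block-height to DA-height mapping as an 8-byte little-endian value under the header and data metadata keys, and the new TestSyncer_PersistsDAHeightMapping_WhenSyncingFromDA reads it back the same way. Below is a minimal, standalone sketch of that encode/decode round-trip; the variable names are illustrative and not part of the patch.

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Encode the DA height the way the patched TrySyncNextBlock does before SetMetadata:
	// 8 bytes, little-endian.
	daHeight := uint64(42)
	buf := make([]byte, 8)
	binary.LittleEndian.PutUint64(buf, daHeight)

	// Decode it the way the new test assertion does after GetMetadata.
	decoded := binary.LittleEndian.Uint64(buf)
	fmt.Println(decoded == daHeight) // true
}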
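
Note: the malicious-proposer check compares the verified DA height against pending.EpochEnd + effectiveGracePeriod * genesis.DAEpochForcedInclusion, and only a strictly greater height trips it; the tests point out that sitting exactly on the boundary still passes. A small sketch of that arithmetic under the values used in the second P2P subtest; the helper name is hypothetical.

package main

import "fmt"

// pastGraceBoundary is a hypothetical helper mirroring the check in VerifyForcedInclusionTxs:
// a pending forced-inclusion tx is flagged only once daHeight is strictly past
// epochEnd + effectiveGracePeriod*daEpochForcedInclusion.
func pastGraceBoundary(daHeight, epochEnd, effectiveGracePeriod, daEpochForcedInclusion uint64) bool {
	graceBoundary := epochEnd + effectiveGracePeriod*daEpochForcedInclusion
	return daHeight > graceBoundary
}

func main() {
	// Epoch 0 ends at DA height 0 and one grace epoch of size 1 puts the boundary at 1,
	// so DA height 1 is still tolerated while DA height 2 is flagged.
	fmt.Println(pastGraceBoundary(1, 0, 1, 1)) // false: at the boundary, still allowed
	fmt.Println(pastGraceBoundary(2, 0, 1, 1)) // true: past the boundary, marked malicious
}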
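
Note: TrySyncNextBlock now prefers a P2P DA-height hint recorded for the block height and deletes it once used, falling back to the state's DA height otherwise. A rough illustration of that consume-once lookup follows, with hypothetical names.

package main

import "fmt"

// daHeightForVerification is a hypothetical stand-in for the consume-once hint lookup
// added to TrySyncNextBlock: prefer the P2P DA height hint recorded for this block
// height, otherwise fall back to the state's DA height.
func daHeightForVerification(hints map[uint64]uint64, blockHeight, stateDAHeight uint64) uint64 {
	daHeight, ok := hints[blockHeight]
	if !ok {
		return stateDAHeight
	}
	delete(hints, blockHeight) // the hint is single-use; drop it once consumed
	return daHeight
}

func main() {
	hints := map[uint64]uint64{5: 42}
	fmt.Println(daHeightForVerification(hints, 5, 7)) // 42: taken from the hint
	fmt.Println(daHeightForVerification(hints, 5, 7)) // 7: hint already consumed
	fmt.Println(daHeightForVerification(hints, 6, 7)) // 7: no hint recorded
}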