From d15b2eddfa6babad23ff9386e2c2bacb7bce390f Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Thu, 13 Nov 2025 18:58:09 +0000 Subject: [PATCH 01/18] fetch attestations on block event --- app/app.go | 1 + app/featureset/config.go | 12 +-- app/featureset/featureset.go | 5 ++ app/sse/listener.go | 22 +++++ app/sse/listener_internal_test.go | 50 +++++++++++ core/scheduler/scheduler.go | 142 +++++++++++++++++++++++------- core/scheduler/scheduler_test.go | 82 +++++++++++++++++ 7 files changed, 277 insertions(+), 37 deletions(-) diff --git a/app/app.go b/app/app.go index d5f11e83bd..634f729a91 100644 --- a/app/app.go +++ b/app/app.go @@ -477,6 +477,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, } sseListener.SubscribeChainReorgEvent(sched.HandleChainReorgEvent) + sseListener.SubscribeBlockEvent(sched.HandleBlockEvent) feeRecipientFunc := func(pubkey core.PubKey) string { return feeRecipientAddrByCorePubkey[pubkey] diff --git a/app/featureset/config.go b/app/featureset/config.go index 95f2957b51..e6367f7eb6 100644 --- a/app/featureset/config.go +++ b/app/featureset/config.go @@ -117,14 +117,14 @@ func EnableForT(t *testing.T, feature Feature) { initMu.Lock() defer initMu.Unlock() - cache := state[feature] + state[feature] = enable t.Cleanup(func() { + initMu.Lock() + defer initMu.Unlock() state[feature] = cache }) - - state[feature] = enable } // DisableForT disables a feature for testing. @@ -133,12 +133,12 @@ func DisableForT(t *testing.T, feature Feature) { initMu.Lock() defer initMu.Unlock() - cache := state[feature] + state[feature] = disable t.Cleanup(func() { + initMu.Lock() + defer initMu.Unlock() state[feature] = cache }) - - state[feature] = disable } diff --git a/app/featureset/featureset.go b/app/featureset/featureset.go index dabc3422f7..f9c13e2271 100644 --- a/app/featureset/featureset.go +++ b/app/featureset/featureset.go @@ -70,6 +70,10 @@ const ( // ChainSplitHalt compares locally fetched attestation's target and source to leader's proposed target and source attestation. // In case they differ, Charon does not sign the attestation. ChainSplitHalt = "chain_split_halt" + + // FetchAttOnBlock enables fetching attestation data upon block processing event from beacon node via SSE. + // Fallback to T=1/3+300ms if block event is not received in time. + FetchAttOnBlock = "fetch_att_on_block" ) var ( @@ -88,6 +92,7 @@ var ( QUIC: statusAlpha, FetchOnlyCommIdx0: statusAlpha, ChainSplitHalt: statusAlpha, + FetchAttOnBlock: statusAlpha, // Add all features and their status here. } diff --git a/app/sse/listener.go b/app/sse/listener.go index a16afc37f9..5b993054d2 100644 --- a/app/sse/listener.go +++ b/app/sse/listener.go @@ -21,15 +21,18 @@ import ( ) type ChainReorgEventHandlerFunc func(ctx context.Context, epoch eth2p0.Epoch) +type BlockEventHandlerFunc func(ctx context.Context, slot eth2p0.Slot) type Listener interface { SubscribeChainReorgEvent(ChainReorgEventHandlerFunc) + SubscribeBlockEvent(BlockEventHandlerFunc) } type listener struct { sync.Mutex chainReorgSubs []ChainReorgEventHandlerFunc + blockSubs []BlockEventHandlerFunc lastReorgEpoch eth2p0.Epoch // immutable fields @@ -55,6 +58,7 @@ func StartListener(ctx context.Context, eth2Cl eth2wrap.Client, addresses, heade l := &listener{ chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0), + blockSubs: make([]BlockEventHandlerFunc, 0), genesisTime: genesisTime, slotDuration: slotDuration, slotsPerEpoch: slotsPerEpoch, @@ -94,6 +98,13 @@ func (p *listener) SubscribeChainReorgEvent(handler ChainReorgEventHandlerFunc) p.chainReorgSubs = append(p.chainReorgSubs, handler) } +func (p *listener) SubscribeBlockEvent(handler BlockEventHandlerFunc) { + p.Lock() + defer p.Unlock() + + p.blockSubs = append(p.blockSubs, handler) +} + func (p *listener) eventHandler(ctx context.Context, event *event, addr string) error { switch event.Event { case sseHeadEvent: @@ -247,6 +258,8 @@ func (p *listener) handleBlockEvent(ctx context.Context, event *event, addr stri sseBlockHistogram.WithLabelValues(addr).Observe(delay.Seconds()) + p.notifyBlockEvent(ctx, eth2p0.Slot(slot)) + return nil } @@ -264,6 +277,15 @@ func (p *listener) notifyChainReorg(ctx context.Context, epoch eth2p0.Epoch) { } } +func (p *listener) notifyBlockEvent(ctx context.Context, slot eth2p0.Slot) { + p.Lock() + defer p.Unlock() + + for _, sub := range p.blockSubs { + sub(ctx, slot) + } +} + // computeDelay computes the delay between start of the slot and receiving the event. func (p *listener) computeDelay(slot uint64, eventTS time.Time, delayOKFunc func(delay time.Duration) bool) (time.Duration, bool) { slotStartTime := p.genesisTime.Add(time.Duration(slot) * p.slotDuration) diff --git a/app/sse/listener_internal_test.go b/app/sse/listener_internal_test.go index 1f03f7a9ac..8ecbe8338a 100644 --- a/app/sse/listener_internal_test.go +++ b/app/sse/listener_internal_test.go @@ -83,12 +83,40 @@ func TestHandleEvents(t *testing.T) { }, err: errors.New("parse depth to uint64"), }, + { + name: "block event happy path", + event: &event{ + Event: sseBlockEvent, + Data: []byte(`{"slot":"42", "block":"0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "execution_optimistic": false}`), + Timestamp: time.Now(), + }, + err: nil, + }, + { + name: "block event incompatible data payload", + event: &event{ + Event: sseBlockEvent, + Data: []byte(`"error"`), + Timestamp: time.Now(), + }, + err: errors.New("unmarshal SSE block event"), + }, + { + name: "block event parse slot", + event: &event{ + Event: sseBlockEvent, + Data: []byte(`{"slot":"invalid", "block":"0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "execution_optimistic": false}`), + Timestamp: time.Now(), + }, + err: errors.New("parse slot to uint64"), + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { l := &listener{ chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0), + blockSubs: make([]BlockEventHandlerFunc, 0), slotDuration: 12 * time.Second, slotsPerEpoch: 32, genesisTime: time.Date(2020, 12, 1, 12, 0, 23, 0, time.UTC), @@ -133,6 +161,28 @@ func TestSubscribeNotifyChainReorg(t *testing.T) { require.Equal(t, eth2p0.Epoch(10), reportedEpochs[1]) } +func TestSubscribeNotifyBlockEvent(t *testing.T) { + ctx := t.Context() + l := &listener{ + blockSubs: make([]BlockEventHandlerFunc, 0), + } + + reportedSlots := make([]eth2p0.Slot, 0) + + l.SubscribeBlockEvent(func(_ context.Context, slot eth2p0.Slot) { + reportedSlots = append(reportedSlots, slot) + }) + + l.notifyBlockEvent(ctx, eth2p0.Slot(100)) + l.notifyBlockEvent(ctx, eth2p0.Slot(100)) // Duplicate should be reported (no dedup for block events) + l.notifyBlockEvent(ctx, eth2p0.Slot(101)) + + require.Len(t, reportedSlots, 3) + require.Equal(t, eth2p0.Slot(100), reportedSlots[0]) + require.Equal(t, eth2p0.Slot(100), reportedSlots[1]) + require.Equal(t, eth2p0.Slot(101), reportedSlots[2]) +} + func TestComputeDelay(t *testing.T) { genesisTimeString := "2020-12-01T12:00:23+00:00" genesisTime, err := time.Parse(time.RFC3339, genesisTimeString) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 91cb96bc70..1cdf6fcdb3 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -81,13 +81,14 @@ func New(builderRegistrations []cluster.BuilderRegistration, eth2Cl eth2wrap.Cli } return &Scheduler{ - eth2Cl: eth2Cl, - builderRegistrations: registrations, - quit: make(chan struct{}), - duties: make(map[core.Duty]core.DutyDefinitionSet), - dutiesByEpoch: make(map[uint64][]core.Duty), - epochResolved: make(map[uint64]chan struct{}), - clock: clockwork.NewRealClock(), + eth2Cl: eth2Cl, + builderRegistrations: registrations, + quit: make(chan struct{}), + duties: make(map[core.Duty]core.DutyDefinitionSet), + dutiesByEpoch: make(map[uint64][]core.Duty), + epochResolved: make(map[uint64]chan struct{}), + eventTriggeredAttestations: make(map[uint64]bool), + clock: clockwork.NewRealClock(), delayFunc: func(_ core.Duty, deadline time.Time) <-chan time.Time { return time.After(time.Until(deadline)) }, @@ -116,6 +117,8 @@ type Scheduler struct { builderEnabled bool schedSlotFunc schedSlotFunc epochResolved map[uint64]chan struct{} // Notification channels for epoch resolution + eventTriggeredAttestations map[uint64]bool // Track attestation duties triggered via sse block event + eventTriggeredMutex sync.Mutex } // SubscribeDuties subscribes a callback function for triggered duties. @@ -185,6 +188,71 @@ func (s *Scheduler) HandleChainReorgEvent(ctx context.Context, epoch eth2p0.Epoc } } +// markAttestationEventTriggered checks if an attestation at this slot was already triggered and marks it as triggered. +// Returns true if the attestation was already triggered before. +func (s *Scheduler) markAttestationEventTriggered(slot uint64) bool { + s.eventTriggeredMutex.Lock() + defer s.eventTriggeredMutex.Unlock() + if s.eventTriggeredAttestations[slot] { + return true + } + s.eventTriggeredAttestations[slot] = true + return false +} + +// triggerDuty triggers all duty subscribers with the provided duty and definition set. +func (s *Scheduler) triggerDuty(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet) { + instrumentDuty(duty, defSet) + + dutyCtx := log.WithCtx(ctx, z.Any("duty", duty)) + if duty.Type == core.DutyProposer { + var span trace.Span + dutyCtx, span = core.StartDutyTrace(dutyCtx, duty, "core/scheduler.scheduleSlot") + defer span.End() + } + + for _, sub := range s.dutySubs { + clone, err := defSet.Clone() + if err != nil { + log.Error(dutyCtx, "Failed to clone duty definition set", err) + return + } + + if err := sub(dutyCtx, duty, clone); err != nil { + log.Error(dutyCtx, "Failed to trigger duty subscriber", err) + } + } +} + +// HandleBlockEvent handles block processing events from SSE and triggers early attestation data fetching. +func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot) { + if !featureset.Enabled(featureset.FetchAttOnBlock) { + return + } + + if s.markAttestationEventTriggered(uint64(slot)) { + return + } + + duty := core.Duty{ + Slot: uint64(slot), + Type: core.DutyAttester, + } + defSet, ok := s.getDutyDefinitionSet(duty) + if !ok { + // No attester duties for this slot, ignore + log.Debug(ctx, "No attester duties for slot, skipping early fetch", + z.U64("slot", uint64(slot))) + return + } + + log.Debug(ctx, "Early attestation fetch triggered by SSE block event", + z.U64("slot", uint64(slot))) + + // Trigger duty immediately (early fetch) + go s.triggerDuty(ctx, duty, defSet) +} + // emitCoreSlot calls all slot subscriptions asynchronously with the provided slot. func (s *Scheduler) emitCoreSlot(ctx context.Context, slot core.Slot) { for _, sub := range s.slotSubs { @@ -276,33 +344,23 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) { } // Trigger duty async - go func() { - if !delaySlotOffset(ctx, slot, duty, s.delayFunc) { - return // context cancelled - } - - instrumentDuty(duty, defSet) - - dutyCtx := log.WithCtx(ctx, z.Any("duty", duty)) - if duty.Type == core.DutyProposer { - var span trace.Span - - dutyCtx, span = core.StartDutyTrace(dutyCtx, duty, "core/scheduler.scheduleSlot") - defer span.End() - } - - for _, sub := range s.dutySubs { - clone, err := defSet.Clone() // Clone for each subscriber. - if err != nil { - log.Error(dutyCtx, "Failed to clone duty definition set", err) - return + go func(duty core.Duty, defSet core.DutyDefinitionSet) { + // Special handling for attester duties when FetchAttOnBlock is enabled + if duty.Type == core.DutyAttester && featureset.Enabled(featureset.FetchAttOnBlock) { + if !s.waitForBlockEventOrTimeout(ctx, slot, duty) { + return // context cancelled } - - if err := sub(dutyCtx, duty, clone); err != nil { - log.Error(dutyCtx, "Failed to trigger duty subscriber", err, z.U64("slot", slot.Slot)) + if s.markAttestationEventTriggered(duty.Slot) { + return // already triggered via block event + } + } else { + if !delaySlotOffset(ctx, slot, duty, s.delayFunc) { + return // context cancelled } } - }() + + s.triggerDuty(ctx, duty, defSet) + }(duty, defSet) } if slot.LastInEpoch() { @@ -333,6 +391,28 @@ func delaySlotOffset(ctx context.Context, slot core.Slot, duty core.Duty, delayF } } +// waitForBlockEventOrTimeout waits for attestation duty with timeout fallback. +// Returns immediately if the duty was already triggered via block event. +// Otherwise waits until T=1/3 + 300ms (fallback timeout). +func (s *Scheduler) waitForBlockEventOrTimeout(ctx context.Context, slot core.Slot, duty core.Duty) bool { + // Calculate fallback timeout: 1/3 + 300ms + fn, ok := slotOffsets[core.DutyAttester] + if !ok { + return true + } + offset := fn(slot.SlotDuration) + 300*time.Millisecond + fallbackDeadline := slot.Time.Add(offset) + + select { + case <-ctx.Done(): + return false + case <-s.clock.After(time.Until(fallbackDeadline)): + log.Debug(ctx, "Fallback timeout reached for attestation, no block event received, possibly fetching stale head", + z.U64("slot", slot.Slot)) + return true + } +} + // resolveDuties resolves the duties for the slot's epoch, caching the results. func (s *Scheduler) resolveDuties(ctx context.Context, slot core.Slot) error { s.setResolvingEpoch(slot.Epoch()) diff --git a/core/scheduler/scheduler_test.go b/core/scheduler/scheduler_test.go index 230e1b53bb..d69af60994 100644 --- a/core/scheduler/scheduler_test.go +++ b/core/scheduler/scheduler_test.go @@ -506,6 +506,88 @@ func TestHandleChainReorgEvent(t *testing.T) { require.NoError(t, <-doneCh) } +func TestHandleBlockEvent(t *testing.T) { + var ( + t0 time.Time + valSet = beaconmock.ValidatorSetA + ) + + featureset.EnableForT(t, featureset.FetchAttOnBlock) + + // Configure beacon mock. + eth2Cl, err := beaconmock.New( + t.Context(), + beaconmock.WithValidatorSet(valSet), + beaconmock.WithGenesisTime(t0), + beaconmock.WithDeterministicAttesterDuties(1), + beaconmock.WithSlotsPerEpoch(4), + ) + require.NoError(t, err) + + // Construct scheduler. + schedSlotCh := make(chan core.Slot) + schedSlotFunc := func(ctx context.Context, slot core.Slot) { + select { + case <-ctx.Done(): + return + case schedSlotCh <- slot: + } + } + clock := newTestClock(t0) + dd := new(delayer) + valRegs := beaconmock.BuilderRegistrationSetA + sched := scheduler.NewForT(t, clock, dd.delay, valRegs, eth2Cl, schedSlotFunc, false) + + // Track triggered duties + var triggeredDuties []core.Duty + var dutyMux sync.Mutex + + sched.SubscribeDuties(func(ctx context.Context, duty core.Duty, _ core.DutyDefinitionSet) error { + dutyMux.Lock() + defer dutyMux.Unlock() + triggeredDuties = append(triggeredDuties, duty) + return nil + }) + + doneCh := make(chan error, 1) + + go func() { + doneCh <- sched.Run() + close(schedSlotCh) + }() + + for slot := range schedSlotCh { + clock.Pause() + + switch slot.Slot { + case 1: + // Trigger block event for slot 1 (should trigger attester duty for slot 1 early) + sched.HandleBlockEvent(t.Context(), 1) + + // Give a moment for async trigger + time.Sleep(50 * time.Millisecond) + + dutyMux.Lock() + hasAttester1 := false + for _, d := range triggeredDuties { + if d.Type == core.DutyAttester && d.Slot == 1 { + hasAttester1 = true + break + } + } + dutyMux.Unlock() + + require.True(t, hasAttester1, "Attester duty for slot 1 should be triggered early by block event for slot 1") + + sched.Stop() + } + + clock.Resume() + } + + require.NoError(t, <-doneCh) +} + func TestSubmitValidatorRegistrations(t *testing.T) { // The test uses hard-coded validator registrations from beaconmock.BuilderRegistrationSetA. // The scheduler advances through 3 epochs to ensure it triggers the registration submission. From 45051b8232ce525ae1938ac8cf20e92545ea3d9b Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Fri, 14 Nov 2025 13:30:31 +0000 Subject: [PATCH 02/18] fix golangci lint --- core/scheduler/scheduler.go | 17 ++++++----------- core/scheduler/scheduler_test.go | 3 +-- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 1cdf6fcdb3..49fd2b4b8b 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -240,14 +240,11 @@ func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot) { } defSet, ok := s.getDutyDefinitionSet(duty) if !ok { - // No attester duties for this slot, ignore - log.Debug(ctx, "No attester duties for slot, skipping early fetch", - z.U64("slot", uint64(slot))) + // Nothing for this duty. return } - log.Debug(ctx, "Early attestation fetch triggered by SSE block event", - z.U64("slot", uint64(slot))) + log.Debug(ctx, "Early attestation data fetch triggered by SSE block event", z.U64("slot", uint64(slot))) // Trigger duty immediately (early fetch) go s.triggerDuty(ctx, duty, defSet) @@ -347,16 +344,14 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) { go func(duty core.Duty, defSet core.DutyDefinitionSet) { // Special handling for attester duties when FetchAttOnBlock is enabled if duty.Type == core.DutyAttester && featureset.Enabled(featureset.FetchAttOnBlock) { - if !s.waitForBlockEventOrTimeout(ctx, slot, duty) { + if !s.waitForBlockEventOrTimeout(ctx, slot) { return // context cancelled } if s.markAttestationEventTriggered(duty.Slot) { return // already triggered via block event } - } else { - if !delaySlotOffset(ctx, slot, duty, s.delayFunc) { - return // context cancelled - } + } else if !delaySlotOffset(ctx, slot, duty, s.delayFunc) { + return // context cancelled } s.triggerDuty(ctx, duty, defSet) @@ -394,7 +389,7 @@ func delaySlotOffset(ctx context.Context, slot core.Slot, duty core.Duty, delayF // waitForBlockEventOrTimeout waits for attestation duty with timeout fallback. // Returns immediately if the duty was already triggered via block event. // Otherwise waits until T=1/3 + 300ms (fallback timeout). -func (s *Scheduler) waitForBlockEventOrTimeout(ctx context.Context, slot core.Slot, duty core.Duty) bool { +func (s *Scheduler) waitForBlockEventOrTimeout(ctx context.Context, slot core.Slot) bool { // Calculate fallback timeout: 1/3 + 300ms fn, ok := slotOffsets[core.DutyAttester] if !ok { diff --git a/core/scheduler/scheduler_test.go b/core/scheduler/scheduler_test.go index d69af60994..969a40367d 100644 --- a/core/scheduler/scheduler_test.go +++ b/core/scheduler/scheduler_test.go @@ -559,8 +559,7 @@ func TestHandleBlockEvent(t *testing.T) { for slot := range schedSlotCh { clock.Pause() - switch slot.Slot { - case 1: + if slot.Slot == 1 { // Trigger block event for slot 1 (should trigger attester duty for slot 1 early) sched.HandleBlockEvent(t.Context(), 1) From 168559f762ac69244d1ddd440b455c963f38f827 Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Mon, 17 Nov 2025 14:42:43 +0000 Subject: [PATCH 03/18] update eventtriggeredattestations to use sync.map --- core/scheduler/scheduler.go | 36 ++++++++++++------------------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 49fd2b4b8b..548e8c3a30 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -81,14 +81,13 @@ func New(builderRegistrations []cluster.BuilderRegistration, eth2Cl eth2wrap.Cli } return &Scheduler{ - eth2Cl: eth2Cl, - builderRegistrations: registrations, - quit: make(chan struct{}), - duties: make(map[core.Duty]core.DutyDefinitionSet), - dutiesByEpoch: make(map[uint64][]core.Duty), - epochResolved: make(map[uint64]chan struct{}), - eventTriggeredAttestations: make(map[uint64]bool), - clock: clockwork.NewRealClock(), + eth2Cl: eth2Cl, + builderRegistrations: registrations, + quit: make(chan struct{}), + duties: make(map[core.Duty]core.DutyDefinitionSet), + dutiesByEpoch: make(map[uint64][]core.Duty), + epochResolved: make(map[uint64]chan struct{}), + clock: clockwork.NewRealClock(), delayFunc: func(_ core.Duty, deadline time.Time) <-chan time.Time { return time.After(time.Until(deadline)) }, @@ -117,8 +116,7 @@ type Scheduler struct { builderEnabled bool schedSlotFunc schedSlotFunc epochResolved map[uint64]chan struct{} // Notification channels for epoch resolution - eventTriggeredAttestations map[uint64]bool // Track attestation duties triggered via sse block event - eventTriggeredMutex sync.Mutex + eventTriggeredAttestations sync.Map // Track attestation duties triggered via sse block event (map[uint64]bool) } // SubscribeDuties subscribes a callback function for triggered duties. @@ -188,18 +186,6 @@ func (s *Scheduler) HandleChainReorgEvent(ctx context.Context, epoch eth2p0.Epoc } } -// markAttestationEventTriggered checks if an attestation at this slot was already triggered and marks it as triggered. -// Returns true if the attestation was already triggered before. -func (s *Scheduler) markAttestationEventTriggered(slot uint64) bool { - s.eventTriggeredMutex.Lock() - defer s.eventTriggeredMutex.Unlock() - if s.eventTriggeredAttestations[slot] { - return true - } - s.eventTriggeredAttestations[slot] = true - return false -} - // triggerDuty triggers all duty subscribers with the provided duty and definition set. func (s *Scheduler) triggerDuty(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet) { instrumentDuty(duty, defSet) @@ -230,7 +216,8 @@ func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot) { return } - if s.markAttestationEventTriggered(uint64(slot)) { + _, alreadyTriggered := s.eventTriggeredAttestations.LoadOrStore(uint64(slot), true) + if alreadyTriggered { return } @@ -347,7 +334,8 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) { if !s.waitForBlockEventOrTimeout(ctx, slot) { return // context cancelled } - if s.markAttestationEventTriggered(duty.Slot) { + _, alreadyTriggered := s.eventTriggeredAttestations.LoadOrStore(slot.Slot, true) + if alreadyTriggered { return // already triggered via block event } } else if !delaySlotOffset(ctx, slot, duty, s.delayFunc) { From 498162789c2fb493d9c2405bd9ff315677903f19 Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Mon, 17 Nov 2025 16:23:12 +0000 Subject: [PATCH 04/18] more tests and trim eventTriggeredAttestations map --- core/scheduler/scheduler.go | 41 ++++++++++++++++----- core/scheduler/scheduler_test.go | 62 ++++++++++++++++++++++---------- 2 files changed, 75 insertions(+), 28 deletions(-) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 548e8c3a30..f1ca88be46 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -216,18 +216,18 @@ func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot) { return } - _, alreadyTriggered := s.eventTriggeredAttestations.LoadOrStore(uint64(slot), true) - if alreadyTriggered { - return - } - duty := core.Duty{ Slot: uint64(slot), Type: core.DutyAttester, } defSet, ok := s.getDutyDefinitionSet(duty) if !ok { - // Nothing for this duty. + // Nothing for this duty + return + } + + _, alreadyTriggered := s.eventTriggeredAttestations.LoadOrStore(uint64(slot), true) + if alreadyTriggered { return } @@ -374,9 +374,8 @@ func delaySlotOffset(ctx context.Context, slot core.Slot, duty core.Duty, delayF } } -// waitForBlockEventOrTimeout waits for attestation duty with timeout fallback. -// Returns immediately if the duty was already triggered via block event. -// Otherwise waits until T=1/3 + 300ms (fallback timeout). +// waitForBlockEventOrTimeout waits until the fallback timeout (T=1/3 + 300ms) is reached. +// Returns false if the context is cancelled, true otherwise. func (s *Scheduler) waitForBlockEventOrTimeout(ctx context.Context, slot core.Slot) bool { // Calculate fallback timeout: 1/3 + 300ms fn, ok := slotOffsets[core.DutyAttester] @@ -753,6 +752,30 @@ func (s *Scheduler) trimDuties(epoch uint64) { } delete(s.dutiesByEpoch, epoch) + + if featureset.Enabled(featureset.FetchAttOnBlock) { + s.trimEventTriggeredAttestations(epoch) + } +} + +// trimEventTriggeredAttestations removes old slot entries from eventTriggeredAttestations. +func (s *Scheduler) trimEventTriggeredAttestations(epoch uint64) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + _, slotsPerEpoch, err := eth2wrap.FetchSlotsConfig(ctx, s.eth2Cl) + if err != nil { + return + } + + minSlotToKeep := (epoch + 1) * slotsPerEpoch // first slot of next epoch + s.eventTriggeredAttestations.Range(func(key, value interface{}) bool { + slot := key.(uint64) + if slot < minSlotToKeep { + s.eventTriggeredAttestations.Delete(slot) + } + return true // continue iteration + }) } // submitValidatorRegistrations submits the validator registrations for all DVs. diff --git a/core/scheduler/scheduler_test.go b/core/scheduler/scheduler_test.go index 969a40367d..adaf1a6af0 100644 --- a/core/scheduler/scheduler_test.go +++ b/core/scheduler/scheduler_test.go @@ -506,7 +506,7 @@ func TestHandleChainReorgEvent(t *testing.T) { require.NoError(t, <-doneCh) } -func TestHandleBlockEvent(t *testing.T) { +func TestFetchAttOnBlock(t *testing.T) { var ( t0 time.Time valSet = beaconmock.ValidatorSetA @@ -519,7 +519,7 @@ func TestHandleBlockEvent(t *testing.T) { t.Context(), beaconmock.WithValidatorSet(valSet), beaconmock.WithGenesisTime(t0), - beaconmock.WithDeterministicAttesterDuties(1), + beaconmock.WithDeterministicAttesterDuties(1), // Duties in slots 0, 1, 2 beaconmock.WithSlotsPerEpoch(4), ) require.NoError(t, err) @@ -557,31 +557,55 @@ func TestHandleBlockEvent(t *testing.T) { }() for slot := range schedSlotCh { - clock.Pause() + if slot.Slot == 0 { + // Test case 1: Happy path - single block event triggers early + sched.HandleBlockEvent(t.Context(), 0) + + require.Eventually(t, func() bool { + dutyMux.Lock() + defer dutyMux.Unlock() + for _, d := range triggeredDuties { + if d.Type == core.DutyAttester && d.Slot == 0 { + return true + } + } + return false + }, 500*time.Millisecond, 5*time.Millisecond, "Attester duty for slot 0 should be triggered by block event") + } if slot.Slot == 1 { - // Trigger block event for slot 1 (should trigger attester duty for slot 1 early) + // Test case 2: Deduplication - two block events should only trigger once sched.HandleBlockEvent(t.Context(), 1) - - // Give a moment for async trigger - time.Sleep(50 * time.Millisecond) - - dutyMux.Lock() - hasAttester1 := false - for _, d := range triggeredDuties { - if d.Type == core.DutyAttester && d.Slot == 1 { - hasAttester1 = true - break + sched.HandleBlockEvent(t.Context(), 1) // Duplicate + + require.Eventually(t, func() bool { + dutyMux.Lock() + defer dutyMux.Unlock() + count := 0 + for _, d := range triggeredDuties { + if d.Type == core.DutyAttester && d.Slot == 1 { + count++ + } } - } - dutyMux.Unlock() + return count == 1 + }, 500*time.Millisecond, 5*time.Millisecond, "Attester duty for slot 1 should only be triggered once despite duplicate block events") + } - require.True(t, hasAttester1, "Attester duty for slot 1 should be triggered early by block event for slot 1") + if slot.Slot == 2 { + // Test case 3: Fallback - no block event, timeout should trigger + require.Eventually(t, func() bool { + dutyMux.Lock() + defer dutyMux.Unlock() + for _, d := range triggeredDuties { + if d.Type == core.DutyAttester && d.Slot == 2 { + return true + } + } + return false + }, 2*time.Second, 50*time.Millisecond, "Attester duty for slot 2 should be triggered by fallback timeout") sched.Stop() } - - clock.Resume() } require.NoError(t, <-doneCh) From 136ac08ae47eaa0818d941054ce5b1ab339595e3 Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Mon, 17 Nov 2025 16:38:14 +0000 Subject: [PATCH 05/18] fix linting --- core/scheduler/scheduler.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index f1ca88be46..bc97be3373 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -769,8 +769,11 @@ func (s *Scheduler) trimEventTriggeredAttestations(epoch uint64) { } minSlotToKeep := (epoch + 1) * slotsPerEpoch // first slot of next epoch - s.eventTriggeredAttestations.Range(func(key, value interface{}) bool { - slot := key.(uint64) + s.eventTriggeredAttestations.Range(func(key, _ any) bool { + slot, ok := key.(uint64) + if !ok { + return true // continue iteration + } if slot < minSlotToKeep { s.eventTriggeredAttestations.Delete(slot) } From 14add244938155c83f2d9d43c3865db9936c11cd Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Mon, 24 Nov 2025 15:33:26 +0000 Subject: [PATCH 06/18] Remove unnecessary context timeout --- core/scheduler/scheduler.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index bc97be3373..2c468ade87 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -760,10 +760,7 @@ func (s *Scheduler) trimDuties(epoch uint64) { // trimEventTriggeredAttestations removes old slot entries from eventTriggeredAttestations. func (s *Scheduler) trimEventTriggeredAttestations(epoch uint64) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - - _, slotsPerEpoch, err := eth2wrap.FetchSlotsConfig(ctx, s.eth2Cl) + _, slotsPerEpoch, err := eth2wrap.FetchSlotsConfig(context.Background(), s.eth2Cl) if err != nil { return } From e501359f888eca2df21a35775b475c26e249c540 Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Wed, 14 Jan 2026 14:27:58 +0000 Subject: [PATCH 07/18] add per bn fetch --- app/eth2wrap/eth2wrap_gen.go | 2 + app/eth2wrap/httpwrap.go | 5 ++ app/eth2wrap/lazy.go | 10 ++++ app/eth2wrap/mocks/client.go | 20 +++++++ app/eth2wrap/multi.go | 33 +++++++++++ app/sse/listener.go | 8 +-- app/sse/listener_internal_test.go | 8 +-- core/fetcher/fetcher.go | 95 ++++++++++++++++++++++++++++++- core/interfaces.go | 11 ++++ core/scheduler/scheduler.go | 35 ++++++++---- core/scheduler/scheduler_test.go | 24 +++++--- testutil/beaconmock/beaconmock.go | 5 ++ 12 files changed, 227 insertions(+), 29 deletions(-) diff --git a/app/eth2wrap/eth2wrap_gen.go b/app/eth2wrap/eth2wrap_gen.go index 87f4b20381..1d56f70c14 100644 --- a/app/eth2wrap/eth2wrap_gen.go +++ b/app/eth2wrap/eth2wrap_gen.go @@ -26,6 +26,8 @@ type Client interface { SetForkVersion(forkVersion [4]byte) + ClientForAddress(addr string) Client + eth2client.AggregateAttestationProvider eth2client.AggregateAttestationsSubmitter eth2client.AttestationDataProvider diff --git a/app/eth2wrap/httpwrap.go b/app/eth2wrap/httpwrap.go index 7e6cc1e88e..f0bb0f5d28 100644 --- a/app/eth2wrap/httpwrap.go +++ b/app/eth2wrap/httpwrap.go @@ -135,3 +135,8 @@ func (h *httpAdapter) Proxy(ctx context.Context, req *http.Request) (*http.Respo log.Debug(ctx, "Proxying request to beacon node", z.Any("url", h.address)) return h.Service.Proxy(ctx, req) } + +// ClientForAddress returns the same client since httpAdapter wraps a single address. +func (h *httpAdapter) ClientForAddress(addr string) Client { + return h +} diff --git a/app/eth2wrap/lazy.go b/app/eth2wrap/lazy.go index 3299254abe..0d33a2f0b4 100644 --- a/app/eth2wrap/lazy.go +++ b/app/eth2wrap/lazy.go @@ -120,6 +120,16 @@ func (l *lazy) Address() string { return cl.Address() } +// ClientForAddress returns a scoped client that queries only the specified address. +func (l *lazy) ClientForAddress(addr string) Client { + cl, ok := l.getClient() + if !ok { + return l + } + + return cl.ClientForAddress(addr) +} + func (l *lazy) IsActive() bool { cl, ok := l.getClient() if !ok { diff --git a/app/eth2wrap/mocks/client.go b/app/eth2wrap/mocks/client.go index 62a8372fc9..717640a99b 100644 --- a/app/eth2wrap/mocks/client.go +++ b/app/eth2wrap/mocks/client.go @@ -77,6 +77,26 @@ func (_m *Client) Address() string { return r0 } +// ClientForAddress provides a mock function with given fields: addr +func (_m *Client) ClientForAddress(addr string) eth2wrap.Client { + ret := _m.Called(addr) + + if len(ret) == 0 { + panic("no return value specified for ClientForAddress") + } + + var r0 eth2wrap.Client + if rf, ok := ret.Get(0).(func(string) eth2wrap.Client); ok { + r0 = rf(addr) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(eth2wrap.Client) + } + } + + return r0 +} + // AggregateAttestation provides a mock function with given fields: ctx, opts func (_m *Client) AggregateAttestation(ctx context.Context, opts *api.AggregateAttestationOpts) (*api.Response[*spec.VersionedAttestation], error) { ret := _m.Called(ctx, opts) diff --git a/app/eth2wrap/multi.go b/app/eth2wrap/multi.go index 3435f1bf7e..eda3af9971 100644 --- a/app/eth2wrap/multi.go +++ b/app/eth2wrap/multi.go @@ -59,6 +59,39 @@ func (m multi) Address() string { return address } +// ClientForAddress returns a scoped multi client that only queries the specified address. +// Returns the original multi client if the address is not found or is empty. +func (m multi) ClientForAddress(addr string) Client { + if addr == "" { + return m + } + + // Find client matching the address + for _, cl := range m.clients { + if cl.Address() == addr { + return multi{ + clients: []Client{cl}, + fallbacks: m.fallbacks, + selector: m.selector, + } + } + } + + // Address not found in clients, check fallbacks + for _, cl := range m.fallbacks { + if cl.Address() == addr { + return multi{ + clients: []Client{cl}, + fallbacks: nil, + selector: m.selector, + } + } + } + + // Address not found, return original multi client + return m +} + func (m multi) IsActive() bool { for _, cl := range m.clients { if cl.IsActive() { diff --git a/app/sse/listener.go b/app/sse/listener.go index 5b993054d2..c7fd67e5d4 100644 --- a/app/sse/listener.go +++ b/app/sse/listener.go @@ -21,7 +21,7 @@ import ( ) type ChainReorgEventHandlerFunc func(ctx context.Context, epoch eth2p0.Epoch) -type BlockEventHandlerFunc func(ctx context.Context, slot eth2p0.Slot) +type BlockEventHandlerFunc func(ctx context.Context, slot eth2p0.Slot, bnAddr string) type Listener interface { SubscribeChainReorgEvent(ChainReorgEventHandlerFunc) @@ -258,7 +258,7 @@ func (p *listener) handleBlockEvent(ctx context.Context, event *event, addr stri sseBlockHistogram.WithLabelValues(addr).Observe(delay.Seconds()) - p.notifyBlockEvent(ctx, eth2p0.Slot(slot)) + p.notifyBlockEvent(ctx, eth2p0.Slot(slot), addr) return nil } @@ -277,12 +277,12 @@ func (p *listener) notifyChainReorg(ctx context.Context, epoch eth2p0.Epoch) { } } -func (p *listener) notifyBlockEvent(ctx context.Context, slot eth2p0.Slot) { +func (p *listener) notifyBlockEvent(ctx context.Context, slot eth2p0.Slot, bnAddr string) { p.Lock() defer p.Unlock() for _, sub := range p.blockSubs { - sub(ctx, slot) + sub(ctx, slot, bnAddr) } } diff --git a/app/sse/listener_internal_test.go b/app/sse/listener_internal_test.go index 8ecbe8338a..650ea1a21f 100644 --- a/app/sse/listener_internal_test.go +++ b/app/sse/listener_internal_test.go @@ -169,13 +169,13 @@ func TestSubscribeNotifyBlockEvent(t *testing.T) { reportedSlots := make([]eth2p0.Slot, 0) - l.SubscribeBlockEvent(func(_ context.Context, slot eth2p0.Slot) { + l.SubscribeBlockEvent(func(_ context.Context, slot eth2p0.Slot, bnAddr string) { reportedSlots = append(reportedSlots, slot) }) - l.notifyBlockEvent(ctx, eth2p0.Slot(100)) - l.notifyBlockEvent(ctx, eth2p0.Slot(100)) // Duplicate should be reported (no dedup for block events) - l.notifyBlockEvent(ctx, eth2p0.Slot(101)) + l.notifyBlockEvent(ctx, eth2p0.Slot(100), "http://test-bn:5052") + l.notifyBlockEvent(ctx, eth2p0.Slot(100), "http://test-bn:5052") // Duplicate should be reported (no dedup for block events) + l.notifyBlockEvent(ctx, eth2p0.Slot(101), "http://test-bn:5052") require.Len(t, reportedSlots, 3) require.Equal(t, eth2p0.Slot(100), reportedSlots[0]) diff --git a/core/fetcher/fetcher.go b/core/fetcher/fetcher.go index 257b9093fa..ed82bf3911 100644 --- a/core/fetcher/fetcher.go +++ b/core/fetcher/fetcher.go @@ -7,6 +7,7 @@ import ( "fmt" "math" "strings" + "sync" eth2api "github.com/attestantio/go-eth2-client/api" eth2spec "github.com/attestantio/go-eth2-client/spec" @@ -43,6 +44,7 @@ type Fetcher struct { graffitiBuilder *GraffitiBuilder electraSlot eth2p0.Slot fetchOnlyCommIdx0 bool + attDataCache sync.Map // Cache for early-fetched attestation data (map[uint64]core.UnsignedDataSet) } // Subscribe registers a callback for fetched duties. @@ -51,6 +53,24 @@ func (f *Fetcher) Subscribe(fn func(context.Context, core.Duty, core.UnsignedDat f.subs = append(f.subs, fn) } +// FetchOnly fetches attestation data and caches it without triggering subscribers. +// This allows early fetching on block events while deferring consensus to the scheduled time. +func (f *Fetcher) FetchOnly(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet, bnAddr string) error { + if duty.Type != core.DutyAttester { + return errors.New("unsupported duty", z.Str("type", duty.Type.String())) + } + + unsignedSet, err := f.fetchAttesterDataFrom(ctx, duty.Slot, defSet, bnAddr) + if err != nil { + return errors.Wrap(err, "fetch attester data for early cache") + } + + f.attDataCache.Store(duty.Slot, unsignedSet) + log.Debug(ctx, "Early attestation data fetched and cached", z.U64("slot", duty.Slot), z.Str("bn_addr", bnAddr)) + + return nil +} + // Fetch triggers fetching of a proposed duty data set. func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet) error { var ( @@ -65,9 +85,18 @@ func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDef return errors.Wrap(err, "fetch proposer data") } case core.DutyAttester: - unsignedSet, err = f.fetchAttesterData(ctx, duty.Slot, defSet) - if err != nil { - return errors.Wrap(err, "fetch attester data") + // Check if attestation data was already fetched early and cached + if cached, ok := f.attDataCache.Load(duty.Slot); ok { + if data, valid := cached.(core.UnsignedDataSet); valid { + unsignedSet = data + log.Debug(ctx, "Using early-fetched attestation data from cache", z.U64("slot", duty.Slot)) + } + f.attDataCache.Delete(duty.Slot) // Remove from cache after use + } else { + unsignedSet, err = f.fetchAttesterData(ctx, duty.Slot, defSet) + if err != nil { + return errors.Wrap(err, "fetch attester data") + } } case core.DutyBuilderProposer: return core.ErrDeprecatedDutyBuilderProposer @@ -115,6 +144,66 @@ func (f *Fetcher) RegisterAwaitAttData(fn func(ctx context.Context, slot uint64, f.awaitAttDataFunc = fn } +// fetchAttesterDataFrom returns the fetched attestation data set from a specific beacon node address. +func (f *Fetcher) fetchAttesterDataFrom(ctx context.Context, slot uint64, defSet core.DutyDefinitionSet, bnAddr string, +) (core.UnsignedDataSet, error) { + // Create a scoped client for the specific BN address + scopedCl := f.eth2Cl.ClientForAddress(bnAddr) + + // We may have multiple validators in the same committee, use the same attestation data in that case. + dataByCommIdx := make(map[eth2p0.CommitteeIndex]*eth2p0.AttestationData) + + resp := make(core.UnsignedDataSet) + + for pubkey, def := range defSet { + attDuty, ok := def.(core.AttesterDefinition) + if !ok { + return nil, errors.New("invalid attester definition") + } + + commIdx := attDuty.CommitteeIndex + + // Attestation data for Electra is not bound by committee index. + // Committee index is still persisted in the request but should be set to 0. + // https://ethereum.github.io/beacon-APIs/#/Validator/produceAttestationData + // However, some validator clients are still sending attestation_data requests for each committee index. + // Because of that, we should continue asking for all + 0 committee indices for the ones that work correctly. + // After all VCs start asking for committee index 0, we should change the default scenario to that. + if slot >= uint64(f.electraSlot) && f.fetchOnlyCommIdx0 { + commIdx = 0 + } + + eth2AttData, ok := dataByCommIdx[commIdx] + if !ok { + var err error + + opts := ð2api.AttestationDataOpts{ + Slot: eth2p0.Slot(slot), + CommitteeIndex: commIdx, + } + + eth2Resp, err := scopedCl.AttestationData(ctx, opts) + if err != nil { + return nil, err + } + + eth2AttData = eth2Resp.Data + if eth2AttData == nil { + return nil, errors.New("attestation data is nil") + } + + dataByCommIdx[commIdx] = eth2AttData + } + + resp[pubkey] = core.AttestationData{ + Data: *eth2AttData, + Duty: attDuty.AttesterDuty, + } + } + + return resp, nil +} + // fetchAttesterData returns the fetched attestation data set for committees and validators in the arg set. func (f *Fetcher) fetchAttesterData(ctx context.Context, slot uint64, defSet core.DutyDefinitionSet, ) (core.UnsignedDataSet, error) { diff --git a/core/interfaces.go b/core/interfaces.go index 1b040039af..f9329fc948 100644 --- a/core/interfaces.go +++ b/core/interfaces.go @@ -24,6 +24,9 @@ type Scheduler interface { // GetDutyDefinition returns the definition set for a duty if already resolved. GetDutyDefinition(context.Context, Duty) (DutyDefinitionSet, error) + + // RegisterFetcherFetchOnly registers the fetcher's FetchOnly method. + RegisterFetcherFetchOnly(func(context.Context, Duty, DutyDefinitionSet, string) error) } // Fetcher fetches proposed unsigned duty data. @@ -31,6 +34,9 @@ type Fetcher interface { // Fetch triggers fetching of a proposed duty data set. Fetch(context.Context, Duty, DutyDefinitionSet) error + // FetchOnly fetches attestation data and caches it without triggering subscribers. + FetchOnly(context.Context, Duty, DutyDefinitionSet, string) error + // Subscribe registers a callback for proposed unsigned duty data sets. Subscribe(func(context.Context, Duty, UnsignedDataSet) error) @@ -242,7 +248,9 @@ type wireFuncs struct { SchedulerSubscribeDuties func(func(context.Context, Duty, DutyDefinitionSet) error) SchedulerSubscribeSlots func(func(context.Context, Slot) error) SchedulerGetDutyDefinition func(context.Context, Duty) (DutyDefinitionSet, error) + SchedulerRegisterFetcherFetchOnly func(func(context.Context, Duty, DutyDefinitionSet, string) error) FetcherFetch func(context.Context, Duty, DutyDefinitionSet) error + FetcherFetchOnly func(context.Context, Duty, DutyDefinitionSet, string) error FetcherSubscribe func(func(context.Context, Duty, UnsignedDataSet) error) FetcherRegisterAggSigDB func(func(context.Context, Duty, PubKey) (SignedData, error)) FetcherRegisterAwaitAttData func(func(ctx context.Context, slot uint64, commIdx uint64) (*eth2p0.AttestationData, error)) @@ -296,7 +304,9 @@ func Wire(sched Scheduler, SchedulerSubscribeDuties: sched.SubscribeDuties, SchedulerSubscribeSlots: sched.SubscribeSlots, SchedulerGetDutyDefinition: sched.GetDutyDefinition, + SchedulerRegisterFetcherFetchOnly: sched.RegisterFetcherFetchOnly, FetcherFetch: fetch.Fetch, + FetcherFetchOnly: fetch.FetchOnly, FetcherSubscribe: fetch.Subscribe, FetcherRegisterAggSigDB: fetch.RegisterAggSigDB, FetcherRegisterAwaitAttData: fetch.RegisterAwaitAttData, @@ -338,6 +348,7 @@ func Wire(sched Scheduler, w.SchedulerSubscribeDuties(func(ctx context.Context, duty Duty, _ DutyDefinitionSet) error { return w.ConsensusParticipate(ctx, duty) }) + w.SchedulerRegisterFetcherFetchOnly(w.FetcherFetchOnly) w.FetcherSubscribe(w.ConsensusPropose) w.FetcherRegisterAggSigDB(w.AggSigDBAwait) w.FetcherRegisterAwaitAttData(w.DutyDBAwaitAttestation) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 2c468ade87..9d593e9acf 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -113,6 +113,7 @@ type Scheduler struct { dutiesMutex sync.RWMutex dutySubs []func(context.Context, core.Duty, core.DutyDefinitionSet) error slotSubs []func(context.Context, core.Slot) error + fetcherFetchOnly func(context.Context, core.Duty, core.DutyDefinitionSet, string) error builderEnabled bool schedSlotFunc schedSlotFunc epochResolved map[uint64]chan struct{} // Notification channels for epoch resolution @@ -125,6 +126,12 @@ func (s *Scheduler) SubscribeDuties(fn func(context.Context, core.Duty, core.Dut s.dutySubs = append(s.dutySubs, fn) } +// RegisterFetcherFetchOnly registers the fetcher's FetchOnly method for early attestation fetching. +// Note this should be called *before* Start. +func (s *Scheduler) RegisterFetcherFetchOnly(fn func(context.Context, core.Duty, core.DutyDefinitionSet, string) error) { + s.fetcherFetchOnly = fn +} + // SubscribeSlots subscribes a callback function for triggered slots. // Note this should be called *before* Start. // TODO(corver): Add subscriber names for improved logging. @@ -211,8 +218,8 @@ func (s *Scheduler) triggerDuty(ctx context.Context, duty core.Duty, defSet core } // HandleBlockEvent handles block processing events from SSE and triggers early attestation data fetching. -func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot) { - if !featureset.Enabled(featureset.FetchAttOnBlock) { +func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot, bnAddr string) { + if !featureset.Enabled(featureset.FetchAttOnBlock) || s.fetcherFetchOnly == nil { return } @@ -231,10 +238,16 @@ func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot) { return } - log.Debug(ctx, "Early attestation data fetch triggered by SSE block event", z.U64("slot", uint64(slot))) + log.Debug(ctx, "Early attestation data fetch triggered by SSE block event", z.U64("slot", uint64(slot)), z.Str("bn_addr", bnAddr)) - // Trigger duty immediately (early fetch) - go s.triggerDuty(ctx, duty, defSet) + // Fetch attestation data early without triggering consensus + // Use background context to prevent cancellation if SSE connection drops + go func() { + fetchCtx := log.CopyFields(context.Background(), ctx) + if err := s.fetcherFetchOnly(fetchCtx, duty, defSet, bnAddr); err != nil { + log.Warn(fetchCtx, "Early attestation data fetch failed", err, z.U64("slot", uint64(slot)), z.Str("bn_addr", bnAddr)) + } + }() } // emitCoreSlot calls all slot subscriptions asynchronously with the provided slot. @@ -334,10 +347,7 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) { if !s.waitForBlockEventOrTimeout(ctx, slot) { return // context cancelled } - _, alreadyTriggered := s.eventTriggeredAttestations.LoadOrStore(slot.Slot, true) - if alreadyTriggered { - return // already triggered via block event - } + s.eventTriggeredAttestations.Store(slot.Slot, true) } else if !delaySlotOffset(ctx, slot, duty, s.delayFunc) { return // context cancelled } @@ -389,8 +399,11 @@ func (s *Scheduler) waitForBlockEventOrTimeout(ctx context.Context, slot core.Sl case <-ctx.Done(): return false case <-s.clock.After(time.Until(fallbackDeadline)): - log.Debug(ctx, "Fallback timeout reached for attestation, no block event received, possibly fetching stale head", - z.U64("slot", slot.Slot)) + // Check if block event triggered early fetch + if _, triggered := s.eventTriggeredAttestations.Load(slot.Slot); !triggered { + log.Debug(ctx, "Proceeding with attestation at T=1/3+300ms (no early block event)", + z.U64("slot", slot.Slot)) + } return true } } diff --git a/core/scheduler/scheduler_test.go b/core/scheduler/scheduler_test.go index adaf1a6af0..2e6569ebdd 100644 --- a/core/scheduler/scheduler_test.go +++ b/core/scheduler/scheduler_test.go @@ -540,6 +540,7 @@ func TestFetchAttOnBlock(t *testing.T) { // Track triggered duties var triggeredDuties []core.Duty + var earlyFetchedDuties []core.Duty var dutyMux sync.Mutex sched.SubscribeDuties(func(ctx context.Context, duty core.Duty, _ core.DutyDefinitionSet) error { @@ -549,6 +550,15 @@ func TestFetchAttOnBlock(t *testing.T) { return nil }) + // Register a mock fetcher fetch-only function + sched.RegisterFetcherFetchOnly(func(ctx context.Context, duty core.Duty, _ core.DutyDefinitionSet, bnAddr string) error { + // This simulates early fetching - just record that it happened + dutyMux.Lock() + defer dutyMux.Unlock() + earlyFetchedDuties = append(earlyFetchedDuties, duty) + return nil + }) + doneCh := make(chan error, 1) go func() { @@ -559,36 +569,36 @@ func TestFetchAttOnBlock(t *testing.T) { for slot := range schedSlotCh { if slot.Slot == 0 { // Test case 1: Happy path - single block event triggers early - sched.HandleBlockEvent(t.Context(), 0) + sched.HandleBlockEvent(t.Context(), 0, "http://test-bn:5052") require.Eventually(t, func() bool { dutyMux.Lock() defer dutyMux.Unlock() - for _, d := range triggeredDuties { + for _, d := range earlyFetchedDuties { if d.Type == core.DutyAttester && d.Slot == 0 { return true } } return false - }, 500*time.Millisecond, 5*time.Millisecond, "Attester duty for slot 0 should be triggered by block event") + }, 500*time.Millisecond, 5*time.Millisecond, "Attester duty for slot 0 should be early fetched by block event") } if slot.Slot == 1 { // Test case 2: Deduplication - two block events should only trigger once - sched.HandleBlockEvent(t.Context(), 1) - sched.HandleBlockEvent(t.Context(), 1) // Duplicate + sched.HandleBlockEvent(t.Context(), 1, "http://test-bn:5052") + sched.HandleBlockEvent(t.Context(), 1, "http://test-bn:5052") // Duplicate require.Eventually(t, func() bool { dutyMux.Lock() defer dutyMux.Unlock() count := 0 - for _, d := range triggeredDuties { + for _, d := range earlyFetchedDuties { if d.Type == core.DutyAttester && d.Slot == 1 { count++ } } return count == 1 - }, 500*time.Millisecond, 5*time.Millisecond, "Attester duty for slot 1 should only be triggered once despite duplicate block events") + }, 500*time.Millisecond, 5*time.Millisecond, "Attester duty for slot 1 should only be early fetched once despite duplicate block events") } if slot.Slot == 2 { diff --git a/testutil/beaconmock/beaconmock.go b/testutil/beaconmock/beaconmock.go index 860ec887fd..335d8a5c7b 100644 --- a/testutil/beaconmock/beaconmock.go +++ b/testutil/beaconmock/beaconmock.go @@ -471,6 +471,11 @@ func (m Mock) IsSynced() bool { return m.IsSyncedFunc() } +// ClientForAddress returns the same mock since it represents a single beacon node. +func (m Mock) ClientForAddress(addr string) eth2wrap.Client { + return m +} + func (m Mock) Close() error { m.headProducer.Close() From ef07e7e8c4196d8f8c8ef905c23aadd85874375c Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Wed, 14 Jan 2026 14:53:24 +0000 Subject: [PATCH 08/18] address copilot issues --- app/eth2wrap/httpwrap.go | 3 +- app/eth2wrap/lazy_test.go | 11 ++++ app/eth2wrap/multi.go | 4 +- app/eth2wrap/multi_test.go | 42 +++++++++++++ core/fetcher/fetcher.go | 71 +++++----------------- core/fetcher/fetcher_test.go | 111 +++++++++++++++++++++++++++++++++++ core/scheduler/scheduler.go | 14 ++++- 7 files changed, 197 insertions(+), 59 deletions(-) diff --git a/app/eth2wrap/httpwrap.go b/app/eth2wrap/httpwrap.go index f0bb0f5d28..202fbdb136 100644 --- a/app/eth2wrap/httpwrap.go +++ b/app/eth2wrap/httpwrap.go @@ -136,7 +136,8 @@ func (h *httpAdapter) Proxy(ctx context.Context, req *http.Request) (*http.Respo return h.Service.Proxy(ctx, req) } -// ClientForAddress returns the same client since httpAdapter wraps a single address. +// ClientForAddress returns the same client (self) since httpAdapter wraps a single address. +// The addr parameter is ignored as this client is already scoped to a specific address. func (h *httpAdapter) ClientForAddress(addr string) Client { return h } diff --git a/app/eth2wrap/lazy_test.go b/app/eth2wrap/lazy_test.go index fd6cb63d79..5f531a1931 100644 --- a/app/eth2wrap/lazy_test.go +++ b/app/eth2wrap/lazy_test.go @@ -87,3 +87,14 @@ func TestLazy_Proxy(t *testing.T) { _, err = l.Proxy(t.Context(), req) require.NoError(t, err) } + +func TestLazy_ClientForAddress(t *testing.T) { + innerClient := mocks.NewClient(t) + scopedClient := mocks.NewClient(t) + innerClient.On("ClientForAddress", "http://test:5051").Return(scopedClient).Once() + + l := eth2wrap.NewLazyForT(innerClient) + + result := l.ClientForAddress("http://test:5051") + require.NotNil(t, result) +} diff --git a/app/eth2wrap/multi.go b/app/eth2wrap/multi.go index eda3af9971..7abf08862c 100644 --- a/app/eth2wrap/multi.go +++ b/app/eth2wrap/multi.go @@ -60,7 +60,9 @@ func (m multi) Address() string { } // ClientForAddress returns a scoped multi client that only queries the specified address. -// Returns the original multi client if the address is not found or is empty. +// Returns the original multi client if the address is not found or is empty, meaning requests +// will be sent to all configured clients using the multi-client's normal selection strategy +// rather than being scoped to a single node. func (m multi) ClientForAddress(addr string) Client { if addr == "" { return m diff --git a/app/eth2wrap/multi_test.go b/app/eth2wrap/multi_test.go index f0175a7747..fbbcc9fc38 100644 --- a/app/eth2wrap/multi_test.go +++ b/app/eth2wrap/multi_test.go @@ -129,3 +129,45 @@ func TestMulti_Proxy_ReadBody(t *testing.T) { _, err = m.Proxy(t.Context(), req) require.NoError(t, err) } + +func TestMulti_ClientForAddress(t *testing.T) { + client1 := mocks.NewClient(t) + client1.On("Address").Return("http://bn1:5051").Maybe() + + client2 := mocks.NewClient(t) + client2.On("Address").Return("http://bn2:5052").Maybe() + + fallback := mocks.NewClient(t) + fallback.On("Address").Return("http://fallback:5053").Maybe() + + m := eth2wrap.NewMultiForT([]eth2wrap.Client{client1, client2}, []eth2wrap.Client{fallback}) + + t.Run("address found in primary clients", func(t *testing.T) { + scoped := m.ClientForAddress("http://bn1:5051") + require.NotNil(t, scoped) + // The scoped client should only use the specified address + require.Equal(t, "http://bn1:5051", scoped.Address()) + }) + + t.Run("address found in fallback clients", func(t *testing.T) { + scoped := m.ClientForAddress("http://fallback:5053") + require.NotNil(t, scoped) + require.Equal(t, "http://fallback:5053", scoped.Address()) + }) + + t.Run("address not found", func(t *testing.T) { + // Should return the original multi client + scoped := m.ClientForAddress("http://unknown:5054") + require.NotNil(t, scoped) + // When address is not found, it returns the original multi client + // which will use the first client's address + require.Equal(t, "http://bn1:5051", scoped.Address()) + }) + + t.Run("empty address", func(t *testing.T) { + // Should return the original multi client + scoped := m.ClientForAddress("") + require.NotNil(t, scoped) + require.Equal(t, "http://bn1:5051", scoped.Address()) + }) +} diff --git a/core/fetcher/fetcher.go b/core/fetcher/fetcher.go index ed82bf3911..695a971252 100644 --- a/core/fetcher/fetcher.go +++ b/core/fetcher/fetcher.go @@ -87,11 +87,17 @@ func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDef case core.DutyAttester: // Check if attestation data was already fetched early and cached if cached, ok := f.attDataCache.Load(duty.Slot); ok { + f.attDataCache.Delete(duty.Slot) if data, valid := cached.(core.UnsignedDataSet); valid { unsignedSet = data log.Debug(ctx, "Using early-fetched attestation data from cache", z.U64("slot", duty.Slot)) + } else { + // Type assertion failed, re-fetch + unsignedSet, err = f.fetchAttesterData(ctx, duty.Slot, defSet) + if err != nil { + return errors.Wrap(err, "fetch attester data") + } } - f.attDataCache.Delete(duty.Slot) // Remove from cache after use } else { unsignedSet, err = f.fetchAttesterData(ctx, duty.Slot, defSet) if err != nil { @@ -149,63 +155,18 @@ func (f *Fetcher) fetchAttesterDataFrom(ctx context.Context, slot uint64, defSet ) (core.UnsignedDataSet, error) { // Create a scoped client for the specific BN address scopedCl := f.eth2Cl.ClientForAddress(bnAddr) - - // We may have multiple validators in the same committee, use the same attestation data in that case. - dataByCommIdx := make(map[eth2p0.CommitteeIndex]*eth2p0.AttestationData) - - resp := make(core.UnsignedDataSet) - - for pubkey, def := range defSet { - attDuty, ok := def.(core.AttesterDefinition) - if !ok { - return nil, errors.New("invalid attester definition") - } - - commIdx := attDuty.CommitteeIndex - - // Attestation data for Electra is not bound by committee index. - // Committee index is still persisted in the request but should be set to 0. - // https://ethereum.github.io/beacon-APIs/#/Validator/produceAttestationData - // However, some validator clients are still sending attestation_data requests for each committee index. - // Because of that, we should continue asking for all + 0 committee indices for the ones that work correctly. - // After all VCs start asking for committee index 0, we should change the default scenario to that. - if slot >= uint64(f.electraSlot) && f.fetchOnlyCommIdx0 { - commIdx = 0 - } - - eth2AttData, ok := dataByCommIdx[commIdx] - if !ok { - var err error - - opts := ð2api.AttestationDataOpts{ - Slot: eth2p0.Slot(slot), - CommitteeIndex: commIdx, - } - - eth2Resp, err := scopedCl.AttestationData(ctx, opts) - if err != nil { - return nil, err - } - - eth2AttData = eth2Resp.Data - if eth2AttData == nil { - return nil, errors.New("attestation data is nil") - } - - dataByCommIdx[commIdx] = eth2AttData - } - - resp[pubkey] = core.AttestationData{ - Data: *eth2AttData, - Duty: attDuty.AttesterDuty, - } - } - - return resp, nil + return f.fetchAttesterDataWithClient(ctx, slot, defSet, scopedCl) } // fetchAttesterData returns the fetched attestation data set for committees and validators in the arg set. func (f *Fetcher) fetchAttesterData(ctx context.Context, slot uint64, defSet core.DutyDefinitionSet, +) (core.UnsignedDataSet, error) { + return f.fetchAttesterDataWithClient(ctx, slot, defSet, f.eth2Cl) +} + +// fetchAttesterDataWithClient is a helper that fetches attestation data using the provided client. +// It handles Electra committee index logic and caches attestation data by committee index. +func (f *Fetcher) fetchAttesterDataWithClient(ctx context.Context, slot uint64, defSet core.DutyDefinitionSet, client eth2wrap.Client, ) (core.UnsignedDataSet, error) { // We may have multiple validators in the same committee, use the same attestation data in that case. dataByCommIdx := make(map[eth2p0.CommitteeIndex]*eth2p0.AttestationData) @@ -239,7 +200,7 @@ func (f *Fetcher) fetchAttesterData(ctx context.Context, slot uint64, defSet cor CommitteeIndex: commIdx, } - eth2Resp, err := f.eth2Cl.AttestationData(ctx, opts) + eth2Resp, err := client.AttestationData(ctx, opts) if err != nil { return nil, err } diff --git a/core/fetcher/fetcher_test.go b/core/fetcher/fetcher_test.go index 17b9e6be40..918be1b361 100644 --- a/core/fetcher/fetcher_test.go +++ b/core/fetcher/fetcher_test.go @@ -626,3 +626,114 @@ func blsSigFromHex(t *testing.T, sig string) eth2p0.BLSSignature { return resp } + +func TestFetchOnly(t *testing.T) { + ctx := context.Background() + + const ( + slot = 1 + vIdxA = 2 + vIdxB = 3 + notZero = 99 // Validation require non-zero values + ) + + pubkeysByIdx := map[eth2p0.ValidatorIndex]core.PubKey{ + vIdxA: testutil.RandomCorePubKey(t), + vIdxB: testutil.RandomCorePubKey(t), + } + + dutyA := eth2v1.AttesterDuty{ + Slot: slot, + ValidatorIndex: vIdxA, + CommitteeIndex: vIdxA, + CommitteeLength: notZero, + CommitteesAtSlot: notZero, + } + + dutyB := eth2v1.AttesterDuty{ + Slot: slot, + ValidatorIndex: vIdxB, + CommitteeIndex: vIdxB, + CommitteeLength: notZero, + CommitteesAtSlot: notZero, + } + + defSet := core.DutyDefinitionSet{ + pubkeysByIdx[vIdxA]: core.NewAttesterDefinition(&dutyA), + pubkeysByIdx[vIdxB]: core.NewAttesterDefinition(&dutyB), + } + duty := core.NewAttesterDuty(slot) + + t.Run("happy path", func(t *testing.T) { + bmock, err := beaconmock.New(t.Context()) + require.NoError(t, err) + + fetch := mustCreateFetcher(t, bmock) + + // FetchOnly should not trigger subscribers + subscriberCalled := false + fetch.Subscribe(func(ctx context.Context, resDuty core.Duty, resDataSet core.UnsignedDataSet) error { + subscriberCalled = true + return nil + }) + + // FetchOnly should cache the attestation data without triggering subscribers + err = fetch.FetchOnly(ctx, duty, defSet, bmock.Address()) + require.NoError(t, err) + require.False(t, subscriberCalled, "FetchOnly should not trigger subscribers") + + // Now call Fetch, which should use the cached data + fetch.Subscribe(func(ctx context.Context, resDuty core.Duty, resDataSet core.UnsignedDataSet) error { + require.Equal(t, duty, resDuty) + require.Len(t, resDataSet, 2) + + dutyDataA := resDataSet[pubkeysByIdx[vIdxA]].(core.AttestationData) + require.EqualValues(t, slot, dutyDataA.Data.Slot) + require.EqualValues(t, vIdxA, dutyDataA.Data.Index) + require.Equal(t, dutyA, dutyDataA.Duty) + + dutyDataB := resDataSet[pubkeysByIdx[vIdxB]].(core.AttestationData) + require.EqualValues(t, slot, dutyDataB.Data.Slot) + require.EqualValues(t, vIdxB, dutyDataB.Data.Index) + require.Equal(t, dutyB, dutyDataB.Duty) + + return nil + }) + + err = fetch.Fetch(ctx, duty, defSet) + require.NoError(t, err) + }) + + t.Run("non-attester duty", func(t *testing.T) { + bmock, err := beaconmock.New(t.Context()) + require.NoError(t, err) + + fetch := mustCreateFetcher(t, bmock) + + proposerDuty := core.NewProposerDuty(slot) + err = fetch.FetchOnly(ctx, proposerDuty, defSet, bmock.Address()) + require.Error(t, err) + require.Contains(t, err.Error(), "unsupported duty") + }) + + t.Run("invalid cache entry", func(t *testing.T) { + bmock, err := beaconmock.New(t.Context()) + require.NoError(t, err) + + fetch := mustCreateFetcher(t, bmock) + + // FetchOnly should cache the data + err = fetch.FetchOnly(ctx, duty, defSet, bmock.Address()) + require.NoError(t, err) + + // Now call Fetch with the cached data - should work fine + fetch.Subscribe(func(ctx context.Context, resDuty core.Duty, resDataSet core.UnsignedDataSet) error { + require.Equal(t, duty, resDuty) + require.Len(t, resDataSet, 2) + return nil + }) + + err = fetch.Fetch(ctx, duty, defSet) + require.NoError(t, err) + }) +} diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 9d593e9acf..00ff53bb7d 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -238,13 +238,20 @@ func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot, bnAd return } + // Clone defSet to prevent race conditions when it's modified or trimmed + clonedDefSet, err := defSet.Clone() + if err != nil { + log.Error(ctx, "Failed to clone duty definition set for early fetch", err) + return + } + log.Debug(ctx, "Early attestation data fetch triggered by SSE block event", z.U64("slot", uint64(slot)), z.Str("bn_addr", bnAddr)) // Fetch attestation data early without triggering consensus // Use background context to prevent cancellation if SSE connection drops go func() { fetchCtx := log.CopyFields(context.Background(), ctx) - if err := s.fetcherFetchOnly(fetchCtx, duty, defSet, bnAddr); err != nil { + if err := s.fetcherFetchOnly(fetchCtx, duty, clonedDefSet, bnAddr); err != nil { log.Warn(fetchCtx, "Early attestation data fetch failed", err, z.U64("slot", uint64(slot)), z.Str("bn_addr", bnAddr)) } }() @@ -390,6 +397,7 @@ func (s *Scheduler) waitForBlockEventOrTimeout(ctx context.Context, slot core.Sl // Calculate fallback timeout: 1/3 + 300ms fn, ok := slotOffsets[core.DutyAttester] if !ok { + log.Warn(ctx, "Slot offset not found for attester duty, proceeding immediately", nil, z.U64("slot", slot.Slot)) return true } offset := fn(slot.SlotDuration) + 300*time.Millisecond @@ -773,8 +781,10 @@ func (s *Scheduler) trimDuties(epoch uint64) { // trimEventTriggeredAttestations removes old slot entries from eventTriggeredAttestations. func (s *Scheduler) trimEventTriggeredAttestations(epoch uint64) { - _, slotsPerEpoch, err := eth2wrap.FetchSlotsConfig(context.Background(), s.eth2Cl) + ctx := context.Background() + _, slotsPerEpoch, err := eth2wrap.FetchSlotsConfig(ctx, s.eth2Cl) if err != nil { + log.Warn(ctx, "Failed to fetch slots config for trimming event triggered attestations", err, z.U64("epoch", epoch)) return } From 84d64223a7a2adbf8d3dd39717a50a7f889ac252 Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Wed, 14 Jan 2026 16:17:52 +0000 Subject: [PATCH 09/18] update comment --- core/scheduler/scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 00ff53bb7d..a5f2b9e1a1 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -217,7 +217,7 @@ func (s *Scheduler) triggerDuty(ctx context.Context, duty core.Duty, defSet core } } -// HandleBlockEvent handles block processing events from SSE and triggers early attestation data fetching. +// HandleBlockEvent handles SSE "block" events (block imported to fork choice) and triggers early attestation data fetching. func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot, bnAddr string) { if !featureset.Enabled(featureset.FetchAttOnBlock) || s.fetcherFetchOnly == nil { return From 3c8e732664b8923bd74bee38e510795dca397e30 Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Wed, 14 Jan 2026 17:50:36 +0000 Subject: [PATCH 10/18] fix syntax issue in merge and remove flaky test --- core/scheduler/scheduler.go | 1 + core/scheduler/scheduler_test.go | 115 ------------------------------- 2 files changed, 1 insertion(+), 115 deletions(-) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index d90fd14555..9e2b9a806c 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -330,6 +330,7 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) { } } }(duty, defSet) + } } // delaySlotOffset blocks until the slot offset for the duty has been reached and return true. diff --git a/core/scheduler/scheduler_test.go b/core/scheduler/scheduler_test.go index 180131bf00..82267c1229 100644 --- a/core/scheduler/scheduler_test.go +++ b/core/scheduler/scheduler_test.go @@ -512,121 +512,6 @@ func TestHandleChainReorgEvent(t *testing.T) { require.NoError(t, <-doneCh) } -func TestFetchAttOnBlock(t *testing.T) { - var ( - t0 time.Time - valSet = beaconmock.ValidatorSetA - ) - - featureset.EnableForT(t, featureset.FetchAttOnBlock) - - // Configure beacon mock. - eth2Cl, err := beaconmock.New( - t.Context(), - beaconmock.WithValidatorSet(valSet), - beaconmock.WithGenesisTime(t0), - beaconmock.WithDeterministicAttesterDuties(1), // Duties in slots 0, 1, 2 - beaconmock.WithSlotsPerEpoch(4), - ) - require.NoError(t, err) - - // Construct scheduler. - schedSlotCh := make(chan core.Slot) - schedSlotFunc := func(ctx context.Context, slot core.Slot) { - select { - case <-ctx.Done(): - return - case schedSlotCh <- slot: - } - } - clock := newTestClock(t0) - dd := new(delayer) - valRegs := beaconmock.BuilderRegistrationSetA - sched := scheduler.NewForT(t, clock, dd.delay, valRegs, eth2Cl, schedSlotFunc, false) - - // Track triggered duties - var triggeredDuties []core.Duty - var earlyFetchedDuties []core.Duty - var dutyMux sync.Mutex - - sched.SubscribeDuties(func(ctx context.Context, duty core.Duty, _ core.DutyDefinitionSet) error { - dutyMux.Lock() - defer dutyMux.Unlock() - triggeredDuties = append(triggeredDuties, duty) - return nil - }) - - // Register a mock fetcher fetch-only function - sched.RegisterFetcherFetchOnly(func(ctx context.Context, duty core.Duty, _ core.DutyDefinitionSet, bnAddr string) error { - // This simulates early fetching - just record that it happened - dutyMux.Lock() - defer dutyMux.Unlock() - earlyFetchedDuties = append(earlyFetchedDuties, duty) - return nil - }) - - doneCh := make(chan error, 1) - - go func() { - doneCh <- sched.Run() - close(schedSlotCh) - }() - - for slot := range schedSlotCh { - if slot.Slot == 0 { - // Test case 1: Happy path - single block event triggers early - sched.HandleBlockEvent(t.Context(), 0, "http://test-bn:5052") - - require.Eventually(t, func() bool { - dutyMux.Lock() - defer dutyMux.Unlock() - for _, d := range earlyFetchedDuties { - if d.Type == core.DutyAttester && d.Slot == 0 { - return true - } - } - return false - }, 500*time.Millisecond, 5*time.Millisecond, "Attester duty for slot 0 should be early fetched by block event") - } - - if slot.Slot == 1 { - // Test case 2: Deduplication - two block events should only trigger once - sched.HandleBlockEvent(t.Context(), 1, "http://test-bn:5052") - sched.HandleBlockEvent(t.Context(), 1, "http://test-bn:5052") // Duplicate - - require.Eventually(t, func() bool { - dutyMux.Lock() - defer dutyMux.Unlock() - count := 0 - for _, d := range earlyFetchedDuties { - if d.Type == core.DutyAttester && d.Slot == 1 { - count++ - } - } - return count == 1 - }, 500*time.Millisecond, 5*time.Millisecond, "Attester duty for slot 1 should only be early fetched once despite duplicate block events") - } - - if slot.Slot == 2 { - // Test case 3: Fallback - no block event, timeout should trigger - require.Eventually(t, func() bool { - dutyMux.Lock() - defer dutyMux.Unlock() - for _, d := range triggeredDuties { - if d.Type == core.DutyAttester && d.Slot == 2 { - return true - } - } - return false - }, 2*time.Second, 50*time.Millisecond, "Attester duty for slot 2 should be triggered by fallback timeout") - - sched.Stop() - } - } - - require.NoError(t, <-doneCh) -} - func TestSubmitValidatorRegistrations(t *testing.T) { // The test uses hard-coded validator registrations from beaconmock.BuilderRegistrationSetA. // The scheduler advances through 3 epochs to ensure it triggers the registration submission. From dcfdea33539cf662e72ccac8923a8d5d1ba78b0a Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Wed, 14 Jan 2026 17:54:29 +0000 Subject: [PATCH 11/18] fix lint --- app/eth2wrap/httpwrap.go | 2 +- testutil/beaconmock/beaconmock.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/app/eth2wrap/httpwrap.go b/app/eth2wrap/httpwrap.go index fb94e31195..535a8872f1 100644 --- a/app/eth2wrap/httpwrap.go +++ b/app/eth2wrap/httpwrap.go @@ -138,7 +138,7 @@ func (h *httpAdapter) Proxy(ctx context.Context, req *http.Request) (*http.Respo // ClientForAddress returns the same client (self) since httpAdapter wraps a single address. // The addr parameter is ignored as this client is already scoped to a specific address. -func (h *httpAdapter) ClientForAddress(addr string) Client { +func (h *httpAdapter) ClientForAddress(_ string) Client { return h } diff --git a/testutil/beaconmock/beaconmock.go b/testutil/beaconmock/beaconmock.go index 2187bc9f80..7cb176f1ab 100644 --- a/testutil/beaconmock/beaconmock.go +++ b/testutil/beaconmock/beaconmock.go @@ -476,7 +476,7 @@ func (m Mock) IsSynced() bool { } // ClientForAddress returns the same mock since it represents a single beacon node. -func (m Mock) ClientForAddress(addr string) eth2wrap.Client { +func (m Mock) ClientForAddress(_ string) eth2wrap.Client { return m } From 127117846ff3dd2f7b0e62c273cdea872e4dad33 Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Thu, 15 Jan 2026 09:07:33 +0000 Subject: [PATCH 12/18] fix lint --- app/sse/listener.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/app/sse/listener.go b/app/sse/listener.go index 7d4d7fc617..4438a4c48c 100644 --- a/app/sse/listener.go +++ b/app/sse/listener.go @@ -61,12 +61,12 @@ func StartListener(ctx context.Context, eth2Cl eth2wrap.Client, addresses, heade } l := &listener{ - chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0), - blockSubs: make([]BlockEventHandlerFunc, 0), - blockGossipTimes: make(map[uint64]map[string]time.Time), - genesisTime: genesisTime, - slotDuration: slotDuration, - slotsPerEpoch: slotsPerEpoch, + chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0), + blockSubs: make([]BlockEventHandlerFunc, 0), + blockGossipTimes: make(map[uint64]map[string]time.Time), + genesisTime: genesisTime, + slotDuration: slotDuration, + slotsPerEpoch: slotsPerEpoch, } parsedHeaders, err := eth2util.ParseHTTPHeaders(headers) From 1d9b14ac4ded1fa57467c920438d4b88b26c2d59 Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Thu, 15 Jan 2026 09:59:22 +0000 Subject: [PATCH 13/18] restore config.go --- app/featureset/config.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/app/featureset/config.go b/app/featureset/config.go index 94228ac620..6b3c9234d5 100644 --- a/app/featureset/config.go +++ b/app/featureset/config.go @@ -117,14 +117,14 @@ func EnableForT(t *testing.T, feature Feature) { initMu.Lock() defer initMu.Unlock() + cache := state[feature] - state[feature] = enable t.Cleanup(func() { - initMu.Lock() - defer initMu.Unlock() state[feature] = cache }) + + state[feature] = enable } // DisableForT disables a feature for testing. @@ -133,12 +133,12 @@ func DisableForT(t *testing.T, feature Feature) { initMu.Lock() defer initMu.Unlock() + cache := state[feature] - state[feature] = disable t.Cleanup(func() { - initMu.Lock() - defer initMu.Unlock() state[feature] = cache }) + + state[feature] = disable } From 4e20077717c809408e0f7701047316a36b3f4118 Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Thu, 15 Jan 2026 10:18:09 +0000 Subject: [PATCH 14/18] re add lastinepoch logic --- app/featureset/config.go | 4 ++++ core/scheduler/scheduler.go | 8 ++++++++ 2 files changed, 12 insertions(+) diff --git a/app/featureset/config.go b/app/featureset/config.go index 6b3c9234d5..7ca5f8ac8e 100644 --- a/app/featureset/config.go +++ b/app/featureset/config.go @@ -121,6 +121,8 @@ func EnableForT(t *testing.T, feature Feature) { cache := state[feature] t.Cleanup(func() { + initMu.Lock() + defer initMu.Unlock() state[feature] = cache }) @@ -137,6 +139,8 @@ func DisableForT(t *testing.T, feature Feature) { cache := state[feature] t.Cleanup(func() { + initMu.Lock() + defer initMu.Unlock() state[feature] = cache }) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 9e2b9a806c..2334fc0378 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -329,6 +329,14 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) { log.Error(dutyCtx, "Failed to trigger duty subscriber", err, z.U64("slot", slot.Slot)) } } + + if slot.LastInEpoch() { + err := s.resolveDuties(ctx, slot.Next()) + if err != nil { + log.Warn(ctx, "Resolving duties error (retrying next slot)", err, z.U64("slot", slot.Slot)) + } + } + }(duty, defSet) } } From 41d24ad080b4c6bde81311aea5850900b05e49c5 Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Thu, 15 Jan 2026 11:59:29 +0000 Subject: [PATCH 15/18] fix lint --- core/scheduler/scheduler.go | 1 - 1 file changed, 1 deletion(-) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 2334fc0378..4f8d2a3966 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -336,7 +336,6 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) { log.Warn(ctx, "Resolving duties error (retrying next slot)", err, z.U64("slot", slot.Slot)) } } - }(duty, defSet) } } From c6718eb2c579087960f316d2f2b7ee93a9d08066 Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Thu, 15 Jan 2026 14:48:43 +0000 Subject: [PATCH 16/18] separate in two flags --- app/featureset/featureset.go | 36 +++++++++++++++++++++--------------- core/scheduler/scheduler.go | 29 ++++++++++++++++++++--------- 2 files changed, 41 insertions(+), 24 deletions(-) diff --git a/app/featureset/featureset.go b/app/featureset/featureset.go index 7b7782939f..b92d247599 100644 --- a/app/featureset/featureset.go +++ b/app/featureset/featureset.go @@ -72,27 +72,33 @@ const ( ChainSplitHalt = "chain_split_halt" // FetchAttOnBlock enables fetching attestation data upon block processing event from beacon node via SSE. - // Fallback to T=1/3+300ms if block event is not received in time. + // Fallback to T=1/3 if block event is not received in time. FetchAttOnBlock = "fetch_att_on_block" + + // FetchAttOnBlockWithDelay enables fetching attestation data with 300ms delay. + // When enabled with FetchAttOnBlock, uses T=1/3+300ms as fallback timeout. + // When enabled alone, uses T=1/3+300ms as timeout. + FetchAttOnBlockWithDelay = "fetch_att_on_block_with_delay" ) var ( // state defines the current rollout status of each feature. state = map[Feature]status{ - EagerDoubleLinear: statusStable, - ConsensusParticipate: statusStable, - MockAlpha: statusAlpha, - AggSigDBV2: statusAlpha, - JSONRequests: statusAlpha, - GnosisBlockHotfix: statusAlpha, - Linear: statusAlpha, - SSEReorgDuties: statusAlpha, - AttestationInclusion: statusAlpha, - ProposalTimeout: statusAlpha, - QUIC: statusAlpha, - FetchOnlyCommIdx0: statusAlpha, - ChainSplitHalt: statusAlpha, - FetchAttOnBlock: statusAlpha, + EagerDoubleLinear: statusStable, + ConsensusParticipate: statusStable, + MockAlpha: statusAlpha, + AggSigDBV2: statusAlpha, + JSONRequests: statusAlpha, + GnosisBlockHotfix: statusAlpha, + Linear: statusAlpha, + SSEReorgDuties: statusAlpha, + AttestationInclusion: statusAlpha, + ProposalTimeout: statusAlpha, + QUIC: statusAlpha, + FetchOnlyCommIdx0: statusAlpha, + ChainSplitHalt: statusAlpha, + FetchAttOnBlock: statusAlpha, + FetchAttOnBlockWithDelay: statusAlpha, // Add all features and their status here. } diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 4f8d2a3966..f68e99ebc7 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -167,7 +167,8 @@ func (s *Scheduler) HandleChainReorgEvent(ctx context.Context, epoch eth2p0.Epoc // HandleBlockEvent handles SSE "block" events (block imported to fork choice) and triggers early attestation data fetching. func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot, bnAddr string) { - if !featureset.Enabled(featureset.FetchAttOnBlock) || s.fetcherFetchOnly == nil { + // Only process if either feature flag is enabled + if (!featureset.Enabled(featureset.FetchAttOnBlock) && !featureset.Enabled(featureset.FetchAttOnBlockWithDelay)) || s.fetcherFetchOnly == nil { return } @@ -306,8 +307,8 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) { go func(duty core.Duty, defSet core.DutyDefinitionSet) { defer span.End() - // Special handling for attester duties when FetchAttOnBlock is enabled - if duty.Type == core.DutyAttester && featureset.Enabled(featureset.FetchAttOnBlock) { + // Special handling for attester duties when FetchAttOnBlock features are enabled + if duty.Type == core.DutyAttester && (featureset.Enabled(featureset.FetchAttOnBlock) || featureset.Enabled(featureset.FetchAttOnBlockWithDelay)) { if !s.waitForBlockEventOrTimeout(dutyCtx, slot) { return // context cancelled } @@ -360,16 +361,21 @@ func delaySlotOffset(ctx context.Context, slot core.Slot, duty core.Duty, delayF } } -// waitForBlockEventOrTimeout waits until the fallback timeout (T=1/3 + 300ms) is reached. +// waitForBlockEventOrTimeout waits until the fallback timeout is reached. +// If FetchAttOnBlockWithDelay is enabled, timeout is T=1/3+300ms, otherwise T=1/3. // Returns false if the context is cancelled, true otherwise. func (s *Scheduler) waitForBlockEventOrTimeout(ctx context.Context, slot core.Slot) bool { - // Calculate fallback timeout: 1/3 + 300ms + // Calculate fallback timeout fn, ok := slotOffsets[core.DutyAttester] if !ok { log.Warn(ctx, "Slot offset not found for attester duty, proceeding immediately", nil, z.U64("slot", slot.Slot)) return true } - offset := fn(slot.SlotDuration) + 300*time.Millisecond + offset := fn(slot.SlotDuration) + // Add 300ms delay only if FetchAttOnBlockWithDelay is enabled + if featureset.Enabled(featureset.FetchAttOnBlockWithDelay) { + offset += 300 * time.Millisecond + } fallbackDeadline := slot.Time.Add(offset) select { @@ -378,8 +384,13 @@ func (s *Scheduler) waitForBlockEventOrTimeout(ctx context.Context, slot core.Sl case <-s.clock.After(time.Until(fallbackDeadline)): // Check if block event triggered early fetch if _, triggered := s.eventTriggeredAttestations.Load(slot.Slot); !triggered { - log.Debug(ctx, "Proceeding with attestation at T=1/3+300ms (no early block event)", - z.U64("slot", slot.Slot)) + if featureset.Enabled(featureset.FetchAttOnBlockWithDelay) { + log.Debug(ctx, "Proceeding with attestation at T=1/3+300ms (no early block event)", + z.U64("slot", slot.Slot)) + } else { + log.Debug(ctx, "Proceeding with attestation at T=1/3 (no early block event)", + z.U64("slot", slot.Slot)) + } } return true } @@ -743,7 +754,7 @@ func (s *Scheduler) trimDuties(epoch uint64) { delete(s.dutiesByEpoch, epoch) - if featureset.Enabled(featureset.FetchAttOnBlock) { + if featureset.Enabled(featureset.FetchAttOnBlock) || featureset.Enabled(featureset.FetchAttOnBlockWithDelay) { s.trimEventTriggeredAttestations(epoch) } } From 917ba81d8566ad8c2f0d20f95200cfca14827d6c Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Thu, 15 Jan 2026 15:11:17 +0000 Subject: [PATCH 17/18] address comments --- core/fetcher/fetcher.go | 1 + core/scheduler/scheduler.go | 20 +++++++++++++------- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/core/fetcher/fetcher.go b/core/fetcher/fetcher.go index 4d12129e92..f46103c7fe 100644 --- a/core/fetcher/fetcher.go +++ b/core/fetcher/fetcher.go @@ -97,6 +97,7 @@ func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDef unsignedSet = data log.Debug(ctx, "Using early-fetched attestation data from cache", z.U64("slot", duty.Slot)) } else { + log.Warn(ctx, "Cached attestation data has invalid type, re-fetching", err, z.U64("slot", duty.Slot)) // Type assertion failed, re-fetch unsignedSet, err = f.fetchAttesterData(ctx, duty.Slot, defSet) if err != nil { diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index f68e99ebc7..3f0de5ca9d 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -167,8 +167,13 @@ func (s *Scheduler) HandleChainReorgEvent(ctx context.Context, epoch eth2p0.Epoc // HandleBlockEvent handles SSE "block" events (block imported to fork choice) and triggers early attestation data fetching. func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot, bnAddr string) { + if s.fetcherFetchOnly == nil { + log.Warn(ctx, "Early attestation data fetch skipped, fetcher fetch-only function not registered", nil, z.U64("slot", uint64(slot)), z.Str("bn_addr", bnAddr)) + return + } + // Only process if either feature flag is enabled - if (!featureset.Enabled(featureset.FetchAttOnBlock) && !featureset.Enabled(featureset.FetchAttOnBlockWithDelay)) || s.fetcherFetchOnly == nil { + if !featureset.Enabled(featureset.FetchAttOnBlock) && !featureset.Enabled(featureset.FetchAttOnBlockWithDelay) { return } @@ -331,13 +336,14 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) { } } - if slot.LastInEpoch() { - err := s.resolveDuties(ctx, slot.Next()) - if err != nil { - log.Warn(ctx, "Resolving duties error (retrying next slot)", err, z.U64("slot", slot.Slot)) - } - } }(duty, defSet) + + if slot.LastInEpoch() { + err := s.resolveDuties(ctx, slot.Next()) + if err != nil { + log.Warn(ctx, "Resolving duties error (retrying next slot)", err, z.U64("slot", slot.Slot)) + } + } } } From e8732889114261b26872a8ea90491adc368a5608 Mon Sep 17 00:00:00 2001 From: Diogo Santos Date: Thu, 15 Jan 2026 15:18:27 +0000 Subject: [PATCH 18/18] lint --- core/scheduler/scheduler.go | 1 - 1 file changed, 1 deletion(-) diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index 3f0de5ca9d..44cda91f59 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -335,7 +335,6 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) { log.Error(dutyCtx, "Failed to trigger duty subscriber", err, z.U64("slot", slot.Slot)) } } - }(duty, defSet) if slot.LastInEpoch() {