diff --git a/app/app.go b/app/app.go
index 922ae3179..7e3dee8ee 100644
--- a/app/app.go
+++ b/app/app.go
@@ -469,6 +469,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
 	}
 
 	sseListener.SubscribeChainReorgEvent(sched.HandleChainReorgEvent)
+	sseListener.SubscribeBlockEvent(sched.HandleBlockEvent)
 
 	feeRecipientFunc := func(pubkey core.PubKey) string {
 		return feeRecipientAddrByCorePubkey[pubkey]
diff --git a/app/eth2wrap/eth2wrap_gen.go b/app/eth2wrap/eth2wrap_gen.go
index 4de49a6fe..a72176f9c 100644
--- a/app/eth2wrap/eth2wrap_gen.go
+++ b/app/eth2wrap/eth2wrap_gen.go
@@ -26,6 +26,7 @@ type Client interface {
 	SetForkVersion(forkVersion [4]byte)
 
+	ClientForAddress(addr string) Client
 	// Address returns the address of the beacon node.
 	Address() string
 	// Headers returns custom headers to include in requests to the beacon node.
diff --git a/app/eth2wrap/httpwrap.go b/app/eth2wrap/httpwrap.go
index 8ff04af28..535a8872f 100644
--- a/app/eth2wrap/httpwrap.go
+++ b/app/eth2wrap/httpwrap.go
@@ -136,6 +136,12 @@ func (h *httpAdapter) Proxy(ctx context.Context, req *http.Request) (*http.Respo
 	return h.Service.Proxy(ctx, req)
 }
 
+// ClientForAddress returns the same client (self) since httpAdapter wraps a single address.
+// The addr parameter is ignored as this client is already scoped to a specific address.
+func (h *httpAdapter) ClientForAddress(_ string) Client {
+	return h
+}
+
 func (h *httpAdapter) Headers() map[string]string {
 	return h.headers
 }
diff --git a/app/eth2wrap/lazy.go b/app/eth2wrap/lazy.go
index 18a91ccd3..65c82a3f9 100644
--- a/app/eth2wrap/lazy.go
+++ b/app/eth2wrap/lazy.go
@@ -120,6 +120,16 @@ func (l *lazy) Address() string {
 	return cl.Address()
 }
 
+// ClientForAddress returns a scoped client that queries only the specified address.
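+// If the underlying client has not been initialized yet, the lazy wrapper itself is returned.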
+func (l *lazy) ClientForAddress(addr string) Client {
+	cl, ok := l.getClient()
+	if !ok {
+		return l
+	}
+
+	return cl.ClientForAddress(addr)
+}
+
 func (l *lazy) Headers() map[string]string {
 	cl, ok := l.getClient()
 	if !ok {
diff --git a/app/eth2wrap/lazy_test.go b/app/eth2wrap/lazy_test.go
index 0687a3181..536ca8c98 100644
--- a/app/eth2wrap/lazy_test.go
+++ b/app/eth2wrap/lazy_test.go
@@ -87,3 +87,14 @@ func TestLazy_Proxy(t *testing.T) {
 	_, err = l.Proxy(t.Context(), req)
 	require.NoError(t, err)
 }
+
+func TestLazy_ClientForAddress(t *testing.T) {
+	innerClient := mocks.NewClient(t)
+	scopedClient := mocks.NewClient(t)
+	innerClient.On("ClientForAddress", "http://test:5051").Return(scopedClient).Once()
+
+	l := eth2wrap.NewLazyForT(innerClient)
+
+	result := l.ClientForAddress("http://test:5051")
+	require.Same(t, scopedClient, result)
+}
diff --git a/app/eth2wrap/mocks/client.go b/app/eth2wrap/mocks/client.go
index f2e1a54ae..a4d218724 100644
--- a/app/eth2wrap/mocks/client.go
+++ b/app/eth2wrap/mocks/client.go
@@ -59,6 +59,26 @@ func (_m *Client) ActiveValidators(_a0 context.Context) (eth2wrap.ActiveValidato
 	return r0, r1
 }
 
+// ClientForAddress provides a mock function with given fields: addr
+func (_m *Client) ClientForAddress(addr string) eth2wrap.Client {
+	ret := _m.Called(addr)
+
+	if len(ret) == 0 {
+		panic("no return value specified for ClientForAddress")
+	}
+
+	var r0 eth2wrap.Client
+	if rf, ok := ret.Get(0).(func(string) eth2wrap.Client); ok {
+		r0 = rf(addr)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(eth2wrap.Client)
+		}
+	}
+
+	return r0
+}
+
 // Address provides a mock function with no fields
 func (_m *Client) Address() string {
 	ret := _m.Called()
diff --git a/app/eth2wrap/multi.go b/app/eth2wrap/multi.go
index 219c38102..08bd13b21 100644
--- a/app/eth2wrap/multi.go
+++ b/app/eth2wrap/multi.go
@@ -59,6 +59,41 @@ func (m multi) Address() string {
 	return address
 }
 
+// ClientForAddress returns a scoped multi client that only queries the specified address.
+// Returns the original multi client if the address is not found or is empty, meaning requests
+// will be sent to all configured clients using the multi-client's normal selection strategy
+// rather than being scoped to a single node.
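+// Fallback clients are retained when scoping to a primary client and dropped when scoping to a fallback.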
+func (m multi) ClientForAddress(addr string) Client {
+	if addr == "" {
+		return m
+	}
+
+	// Find client matching the address
+	for _, cl := range m.clients {
+		if cl.Address() == addr {
+			return multi{
+				clients:   []Client{cl},
+				fallbacks: m.fallbacks,
+				selector:  m.selector,
+			}
+		}
+	}
+
+	// Address not found in clients, check fallbacks
+	for _, cl := range m.fallbacks {
+		if cl.Address() == addr {
+			return multi{
+				clients:   []Client{cl},
+				fallbacks: nil,
+				selector:  m.selector,
+			}
+		}
+	}
+
+	// Address not found, return original multi client
+	return m
+}
+
 func (m multi) Headers() map[string]string {
 	if len(m.clients) == 0 {
 		return nil
diff --git a/app/eth2wrap/multi_test.go b/app/eth2wrap/multi_test.go
index bfdd3aae4..4a68a20af 100644
--- a/app/eth2wrap/multi_test.go
+++ b/app/eth2wrap/multi_test.go
@@ -129,3 +129,45 @@ func TestMulti_Proxy_ReadBody(t *testing.T) {
 	_, err = m.Proxy(t.Context(), req)
 	require.NoError(t, err)
 }
+
+func TestMulti_ClientForAddress(t *testing.T) {
+	client1 := mocks.NewClient(t)
+	client1.On("Address").Return("http://bn1:5051").Maybe()
+
+	client2 := mocks.NewClient(t)
+	client2.On("Address").Return("http://bn2:5052").Maybe()
+
+	fallback := mocks.NewClient(t)
+	fallback.On("Address").Return("http://fallback:5053").Maybe()
+
+	m := eth2wrap.NewMultiForT([]eth2wrap.Client{client1, client2}, []eth2wrap.Client{fallback})
+
+	t.Run("address found in primary clients", func(t *testing.T) {
+		scoped := m.ClientForAddress("http://bn1:5051")
+		require.NotNil(t, scoped)
+		// The scoped client should only use the specified address
+		require.Equal(t, "http://bn1:5051", scoped.Address())
+	})
+
+	t.Run("address found in fallback clients", func(t *testing.T) {
+		scoped := m.ClientForAddress("http://fallback:5053")
+		require.NotNil(t, scoped)
+		require.Equal(t, "http://fallback:5053", scoped.Address())
+	})
+
+	t.Run("address not found", func(t *testing.T) {
+		// Should return the original multi client,
+		// which will report the first client's address
+		scoped := m.ClientForAddress("http://unknown:5054")
+		require.NotNil(t, scoped)
+		require.Equal(t, "http://bn1:5051", scoped.Address())
+	})
+
+	t.Run("empty address", func(t *testing.T) {
+		// Should return the original multi client
+		scoped := m.ClientForAddress("")
+		require.NotNil(t, scoped)
+		require.Equal(t, "http://bn1:5051", scoped.Address())
+	})
+}
diff --git a/app/featureset/config.go b/app/featureset/config.go
index 6b3c9234d..7ca5f8ac8 100644
--- a/app/featureset/config.go
+++ b/app/featureset/config.go
@@ -121,6 +121,8 @@ func EnableForT(t *testing.T, feature Feature) {
 	cache := state[feature]
 
 	t.Cleanup(func() {
+		initMu.Lock()
+		defer initMu.Unlock()
 		state[feature] = cache
 	})
 
@@ -137,6 +139,8 @@ func DisableForT(t *testing.T, feature Feature) {
 	cache := state[feature]
 
 	t.Cleanup(func() {
+		initMu.Lock()
+		defer initMu.Unlock()
 		state[feature] = cache
 	})
 
diff --git a/app/featureset/featureset.go b/app/featureset/featureset.go
index 3cbe35357..7b7782939 100644
--- a/app/featureset/featureset.go
+++ b/app/featureset/featureset.go
@@ -70,6 +70,10 @@ const (
 	// ChainSplitHalt compares locally fetched attestation's target and source to leader's proposed target and source attestation.
 	// In case they differ, Charon does not sign the attestation.
 	ChainSplitHalt = "chain_split_halt"
+
+	// FetchAttOnBlock enables fetching attestation data upon a block processing event from the beacon node via SSE.
+	// Falls back to T=1/3+300ms if the block event is not received in time.
+	FetchAttOnBlock = "fetch_att_on_block"
 )
 
 var (
@@ -88,6 +92,7 @@
 		QUIC:              statusAlpha,
 		FetchOnlyCommIdx0: statusAlpha,
 		ChainSplitHalt:    statusAlpha,
+		FetchAttOnBlock:   statusAlpha,
 		// Add all features and their status here.
 	}
 
diff --git a/app/sse/listener.go b/app/sse/listener.go
index 89f7e65b8..4438a4c48 100644
--- a/app/sse/listener.go
+++ b/app/sse/listener.go
@@ -21,15 +21,18 @@ import (
 )
 
 type ChainReorgEventHandlerFunc func(ctx context.Context, epoch eth2p0.Epoch)
+type BlockEventHandlerFunc func(ctx context.Context, slot eth2p0.Slot, bnAddr string)
 
 type Listener interface {
 	SubscribeChainReorgEvent(ChainReorgEventHandlerFunc)
+	SubscribeBlockEvent(BlockEventHandlerFunc)
 }
 
 type listener struct {
 	sync.Mutex
 
 	chainReorgSubs []ChainReorgEventHandlerFunc
+	blockSubs      []BlockEventHandlerFunc
 	lastReorgEpoch eth2p0.Epoch
 	// blockGossipTimes stores timestamps of block gossip events per slot and beacon node address
@@ -59,6 +62,7 @@ func StartListener(ctx context.Context, eth2Cl eth2wrap.Client, addresses, heade
 	l := &listener{
 		chainReorgSubs:   make([]ChainReorgEventHandlerFunc, 0),
+		blockSubs:        make([]BlockEventHandlerFunc, 0),
 		blockGossipTimes: make(map[uint64]map[string]time.Time),
 		genesisTime:      genesisTime,
 		slotDuration:     slotDuration,
@@ -99,6 +103,13 @@ func (p *listener) SubscribeChainReorgEvent(handler ChainReorgEventHandlerFunc)
 	p.chainReorgSubs = append(p.chainReorgSubs, handler)
 }
 
+func (p *listener) SubscribeBlockEvent(handler BlockEventHandlerFunc) {
+	p.Lock()
+	defer p.Unlock()
+
+	p.blockSubs = append(p.blockSubs, handler)
+}
+
 func (p *listener) eventHandler(ctx context.Context, event *event, addr string) error {
 	switch event.Event {
 	case sseHeadEvent:
@@ -257,6 +268,8 @@ func (p *listener) handleBlockEvent(ctx context.Context, event *event, addr stri
 
 	sseBlockHistogram.WithLabelValues(addr).Observe(delay.Seconds())
 
+	p.notifyBlockEvent(ctx, eth2p0.Slot(slot), addr)
+
 	return nil
 }
 
@@ -274,6 +287,15 @@ func (p *listener) notifyChainReorg(ctx context.Context, epoch eth2p0.Epoch) {
 	}
 }
 
+func (p *listener) notifyBlockEvent(ctx context.Context, slot eth2p0.Slot, bnAddr string) {
+	p.Lock()
+	defer p.Unlock()
+
+	for _, sub := range p.blockSubs {
+		sub(ctx, slot, bnAddr)
+	}
+}
+
 // computeDelay computes the delay between start of the slot and receiving the event.
 func (p *listener) computeDelay(slot uint64, eventTS time.Time, delayOKFunc func(delay time.Duration) bool) (time.Duration, bool) {
 	slotStartTime := p.genesisTime.Add(time.Duration(slot) * p.slotDuration)
diff --git a/app/sse/listener_internal_test.go b/app/sse/listener_internal_test.go
index 8ca0bd590..05f2dc0e3 100644
--- a/app/sse/listener_internal_test.go
+++ b/app/sse/listener_internal_test.go
@@ -83,12 +83,40 @@ func TestHandleEvents(t *testing.T) {
 			},
 			err: errors.New("parse depth to uint64"),
 		},
+		{
+			name: "block event happy path",
+			event: &event{
+				Event:     sseBlockEvent,
+				Data:      []byte(`{"slot":"42", "block":"0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "execution_optimistic": false}`),
+				Timestamp: time.Now(),
+			},
+			err: nil,
+		},
+		{
+			name: "block event incompatible data payload",
+			event: &event{
+				Event:     sseBlockEvent,
+				Data:      []byte(`"error"`),
+				Timestamp: time.Now(),
+			},
+			err: errors.New("unmarshal SSE block event"),
+		},
+		{
+			name: "block event parse slot",
+			event: &event{
+				Event:     sseBlockEvent,
+				Data:      []byte(`{"slot":"invalid", "block":"0x9a2fefd2fdb57f74993c7780ea5b9030d2897b615b89f808011ca5aebed54eaf", "execution_optimistic": false}`),
+				Timestamp: time.Now(),
+			},
+			err: errors.New("parse slot to uint64"),
+		},
 	}
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			l := &listener{
 				chainReorgSubs: make([]ChainReorgEventHandlerFunc, 0),
+				blockSubs:      make([]BlockEventHandlerFunc, 0),
 				slotDuration:   12 * time.Second,
 				slotsPerEpoch:  32,
 				genesisTime:    time.Date(2020, 12, 1, 12, 0, 23, 0, time.UTC),
@@ -133,6 +161,28 @@ func TestSubscribeNotifyChainReorg(t *testing.T) {
 	require.Equal(t, eth2p0.Epoch(10), reportedEpochs[1])
 }
 
+func TestSubscribeNotifyBlockEvent(t *testing.T) {
+	ctx := t.Context()
+	l := &listener{
+		blockSubs: make([]BlockEventHandlerFunc, 0),
+	}
+
+	reportedSlots := make([]eth2p0.Slot, 0)
+
+	l.SubscribeBlockEvent(func(_ context.Context, slot eth2p0.Slot, _ string) {
+		reportedSlots = append(reportedSlots, slot)
+	})
+
+	l.notifyBlockEvent(ctx, eth2p0.Slot(100), "http://test-bn:5052")
+	l.notifyBlockEvent(ctx, eth2p0.Slot(100), "http://test-bn:5052") // Duplicate should be reported (no dedup for block events)
+	l.notifyBlockEvent(ctx, eth2p0.Slot(101), "http://test-bn:5052")
+
+	require.Len(t, reportedSlots, 3)
+	require.Equal(t, eth2p0.Slot(100), reportedSlots[0])
+	require.Equal(t, eth2p0.Slot(100), reportedSlots[1])
+	require.Equal(t, eth2p0.Slot(101), reportedSlots[2])
+}
+
 func TestComputeDelay(t *testing.T) {
 	genesisTimeString := "2020-12-01T12:00:23+00:00"
 	genesisTime, err := time.Parse(time.RFC3339, genesisTimeString)
diff --git a/core/fetcher/fetcher.go b/core/fetcher/fetcher.go
index c0d691d74..4d12129e9 100644
--- a/core/fetcher/fetcher.go
+++ b/core/fetcher/fetcher.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"math"
 	"strings"
+	"sync"
 
 	eth2api "github.com/attestantio/go-eth2-client/api"
 	eth2spec "github.com/attestantio/go-eth2-client/spec"
@@ -44,6 +45,7 @@ type Fetcher struct {
 	graffitiBuilder   *GraffitiBuilder
 	electraSlot       eth2p0.Slot
 	fetchOnlyCommIdx0 bool
+	attDataCache      sync.Map // Cache for early-fetched attestation data (map[uint64]core.UnsignedDataSet)
 }
 
 // Subscribe registers a callback for fetched duties.
@@ -52,6 +54,24 @@ func (f *Fetcher) Subscribe(fn func(context.Context, core.Duty, core.UnsignedDat
 	f.subs = append(f.subs, fn)
 }
 
+// FetchOnly fetches attestation data and caches it without triggering subscribers.
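+// Data is stored in attDataCache keyed by slot and consumed by the next Fetch call for the same slot.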
+// This allows early fetching on block events while deferring consensus to the scheduled time.
+func (f *Fetcher) FetchOnly(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet, bnAddr string) error {
+	if duty.Type != core.DutyAttester {
+		return errors.New("unsupported duty", z.Str("type", duty.Type.String()))
+	}
+
+	unsignedSet, err := f.fetchAttesterDataFrom(ctx, duty.Slot, defSet, bnAddr)
+	if err != nil {
+		return errors.Wrap(err, "fetch attester data for early cache")
+	}
+
+	f.attDataCache.Store(duty.Slot, unsignedSet)
+	log.Debug(ctx, "Early attestation data fetched and cached", z.U64("slot", duty.Slot), z.Str("bn_addr", bnAddr))
+
+	return nil
+}
+
 // Fetch triggers fetching of a proposed duty data set.
 func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet) error {
 	var (
@@ -70,9 +90,24 @@ func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDef
 			return errors.Wrap(err, "fetch proposer data")
 		}
 	case core.DutyAttester:
-		unsignedSet, err = f.fetchAttesterData(ctx, duty.Slot, defSet)
-		if err != nil {
-			return errors.Wrap(err, "fetch attester data")
+		// Check if attestation data was already fetched early and cached
+		if cached, ok := f.attDataCache.Load(duty.Slot); ok {
+			f.attDataCache.Delete(duty.Slot)
+			if data, valid := cached.(core.UnsignedDataSet); valid {
+				unsignedSet = data
+				log.Debug(ctx, "Using early-fetched attestation data from cache", z.U64("slot", duty.Slot))
+			} else {
+				// Type assertion failed, re-fetch
+				unsignedSet, err = f.fetchAttesterData(ctx, duty.Slot, defSet)
+				if err != nil {
+					return errors.Wrap(err, "fetch attester data")
+				}
+			}
+		} else {
+			unsignedSet, err = f.fetchAttesterData(ctx, duty.Slot, defSet)
+			if err != nil {
+				return errors.Wrap(err, "fetch attester data")
+			}
 		}
 	case core.DutyBuilderProposer:
 		return core.ErrDeprecatedDutyBuilderProposer
@@ -120,8 +155,23 @@ func (f *Fetcher) RegisterAwaitAttData(fn func(ctx context.Context, slot uint64,
 	f.awaitAttDataFunc = fn
 }
 
+// fetchAttesterDataFrom returns the fetched attestation data set from a specific beacon node address.
+func (f *Fetcher) fetchAttesterDataFrom(ctx context.Context, slot uint64, defSet core.DutyDefinitionSet, bnAddr string,
+) (core.UnsignedDataSet, error) {
+	// Create a scoped client for the specific BN address
+	scopedCl := f.eth2Cl.ClientForAddress(bnAddr)
+	return f.fetchAttesterDataWithClient(ctx, slot, defSet, scopedCl)
+}
+
 // fetchAttesterData returns the fetched attestation data set for committees and validators in the arg set.
 func (f *Fetcher) fetchAttesterData(ctx context.Context, slot uint64, defSet core.DutyDefinitionSet,
+) (core.UnsignedDataSet, error) {
+	return f.fetchAttesterDataWithClient(ctx, slot, defSet, f.eth2Cl)
+}
+
+// fetchAttesterDataWithClient is a helper that fetches attestation data using the provided client.
+// It handles Electra committee index logic and caches attestation data by committee index.
+func (f *Fetcher) fetchAttesterDataWithClient(ctx context.Context, slot uint64, defSet core.DutyDefinitionSet, client eth2wrap.Client,
 ) (core.UnsignedDataSet, error) {
 	// We may have multiple validators in the same committee, use the same attestation data in that case.
 	dataByCommIdx := make(map[eth2p0.CommitteeIndex]*eth2p0.AttestationData)
@@ -155,7 +205,7 @@ func (f *Fetcher) fetchAttesterData(ctx context.Context, slot uint64, defSet cor
 			CommitteeIndex: commIdx,
 		}
 
-		eth2Resp, err := f.eth2Cl.AttestationData(ctx, opts)
+		eth2Resp, err := client.AttestationData(ctx, opts)
 		if err != nil {
 			return nil, err
 		}
diff --git a/core/fetcher/fetcher_test.go b/core/fetcher/fetcher_test.go
index f67fd2e61..437fa0bf5 100644
--- a/core/fetcher/fetcher_test.go
+++ b/core/fetcher/fetcher_test.go
@@ -626,3 +626,114 @@ func blsSigFromHex(t *testing.T, sig string) eth2p0.BLSSignature {
 
 	return resp
 }
+
+func TestFetchOnly(t *testing.T) {
+	ctx := context.Background()
+
+	const (
+		slot    = 1
+		vIdxA   = 2
+		vIdxB   = 3
+		notZero = 99 // Validation requires non-zero values
+	)
+
+	pubkeysByIdx := map[eth2p0.ValidatorIndex]core.PubKey{
+		vIdxA: testutil.RandomCorePubKey(t),
+		vIdxB: testutil.RandomCorePubKey(t),
+	}
+
+	dutyA := eth2v1.AttesterDuty{
+		Slot:             slot,
+		ValidatorIndex:   vIdxA,
+		CommitteeIndex:   vIdxA,
+		CommitteeLength:  notZero,
+		CommitteesAtSlot: notZero,
+	}
+
+	dutyB := eth2v1.AttesterDuty{
+		Slot:             slot,
+		ValidatorIndex:   vIdxB,
+		CommitteeIndex:   vIdxB,
+		CommitteeLength:  notZero,
+		CommitteesAtSlot: notZero,
+	}
+
+	defSet := core.DutyDefinitionSet{
+		pubkeysByIdx[vIdxA]: core.NewAttesterDefinition(&dutyA),
+		pubkeysByIdx[vIdxB]: core.NewAttesterDefinition(&dutyB),
+	}
+	duty := core.NewAttesterDuty(slot)
+
+	t.Run("happy path", func(t *testing.T) {
+		bmock, err := beaconmock.New(t.Context())
+		require.NoError(t, err)
+
+		fetch := mustCreateFetcher(t, bmock)
+
+		// FetchOnly should not trigger subscribers
+		subscriberCalled := false
+		fetch.Subscribe(func(ctx context.Context, resDuty core.Duty, resDataSet core.UnsignedDataSet) error {
+			subscriberCalled = true
+			return nil
+		})
+
+		// FetchOnly should cache the attestation data without triggering subscribers
+		err = fetch.FetchOnly(ctx, duty, defSet, bmock.Address())
+		require.NoError(t, err)
+		require.False(t, subscriberCalled, "FetchOnly should not trigger subscribers")
+
+		// Now call Fetch, which should use the cached data
+		fetch.Subscribe(func(ctx context.Context, resDuty core.Duty, resDataSet core.UnsignedDataSet) error {
+			require.Equal(t, duty, resDuty)
+			require.Len(t, resDataSet, 2)
+
+			dutyDataA := resDataSet[pubkeysByIdx[vIdxA]].(core.AttestationData)
+			require.EqualValues(t, slot, dutyDataA.Data.Slot)
+			require.EqualValues(t, vIdxA, dutyDataA.Data.Index)
+			require.Equal(t, dutyA, dutyDataA.Duty)
+
+			dutyDataB := resDataSet[pubkeysByIdx[vIdxB]].(core.AttestationData)
+			require.EqualValues(t, slot, dutyDataB.Data.Slot)
+			require.EqualValues(t, vIdxB, dutyDataB.Data.Index)
+			require.Equal(t, dutyB, dutyDataB.Duty)
+
+			return nil
+		})
+
+		err = fetch.Fetch(ctx, duty, defSet)
+		require.NoError(t, err)
+	})
+
+	t.Run("non-attester duty", func(t *testing.T) {
+		bmock, err := beaconmock.New(t.Context())
+		require.NoError(t, err)
+
+		fetch := mustCreateFetcher(t, bmock)
+
+		proposerDuty := core.NewProposerDuty(slot)
+		err = fetch.FetchOnly(ctx, proposerDuty, defSet, bmock.Address())
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "unsupported duty")
+	})
+
+	t.Run("reuse cached entry", func(t *testing.T) {
+		bmock, err := beaconmock.New(t.Context())
+		require.NoError(t, err)
+
+		fetch := mustCreateFetcher(t, bmock)
+
+		// FetchOnly should cache the data
+		err = fetch.FetchOnly(ctx, duty, defSet, bmock.Address())
+		require.NoError(t, err)
+
+		// Now call Fetch with the cached data - should work fine
+		fetch.Subscribe(func(ctx context.Context, resDuty core.Duty, resDataSet core.UnsignedDataSet) error {
+			require.Equal(t, duty, resDuty)
+			require.Len(t, resDataSet, 2)
+			return nil
+		})
+
+		err = fetch.Fetch(ctx, duty, defSet)
+		require.NoError(t, err)
+	})
+}
diff --git a/core/interfaces.go b/core/interfaces.go
index ca53ed086..f407e2e5b 100644
--- a/core/interfaces.go
+++ b/core/interfaces.go
@@ -24,6 +24,9 @@ type Scheduler interface {
 
 	// GetDutyDefinition returns the definition set for a duty if already resolved.
 	GetDutyDefinition(context.Context, Duty) (DutyDefinitionSet, error)
+
+	// RegisterFetcherFetchOnly registers the fetcher's FetchOnly method.
+	RegisterFetcherFetchOnly(func(context.Context, Duty, DutyDefinitionSet, string) error)
 }
 
 // Fetcher fetches proposed unsigned duty data.
@@ -31,6 +34,9 @@ type Fetcher interface {
 	// Fetch triggers fetching of a proposed duty data set.
 	Fetch(context.Context, Duty, DutyDefinitionSet) error
 
+	// FetchOnly fetches attestation data and caches it without triggering subscribers.
+	FetchOnly(context.Context, Duty, DutyDefinitionSet, string) error
+
 	// Subscribe registers a callback for proposed unsigned duty data sets.
 	Subscribe(func(context.Context, Duty, UnsignedDataSet) error)
 
@@ -242,7 +248,9 @@ type wireFuncs struct {
 	SchedulerSubscribeDuties          func(func(context.Context, Duty, DutyDefinitionSet) error)
 	SchedulerSubscribeSlots           func(func(context.Context, Slot) error)
 	SchedulerGetDutyDefinition        func(context.Context, Duty) (DutyDefinitionSet, error)
+	SchedulerRegisterFetcherFetchOnly func(func(context.Context, Duty, DutyDefinitionSet, string) error)
 	FetcherFetch                      func(context.Context, Duty, DutyDefinitionSet) error
+	FetcherFetchOnly                  func(context.Context, Duty, DutyDefinitionSet, string) error
 	FetcherSubscribe                  func(func(context.Context, Duty, UnsignedDataSet) error)
 	FetcherRegisterAggSigDB           func(func(context.Context, Duty, PubKey) (SignedData, error))
 	FetcherRegisterAwaitAttData       func(func(ctx context.Context, slot uint64, commIdx uint64) (*eth2p0.AttestationData, error))
@@ -296,7 +304,9 @@ func Wire(sched Scheduler,
 		SchedulerSubscribeDuties:          sched.SubscribeDuties,
 		SchedulerSubscribeSlots:           sched.SubscribeSlots,
 		SchedulerGetDutyDefinition:        sched.GetDutyDefinition,
+		SchedulerRegisterFetcherFetchOnly: sched.RegisterFetcherFetchOnly,
 		FetcherFetch:                      fetch.Fetch,
+		FetcherFetchOnly:                  fetch.FetchOnly,
 		FetcherSubscribe:                  fetch.Subscribe,
 		FetcherRegisterAggSigDB:           fetch.RegisterAggSigDB,
 		FetcherRegisterAwaitAttData:       fetch.RegisterAwaitAttData,
@@ -338,6 +348,7 @@ func Wire(sched Scheduler,
 	w.SchedulerSubscribeDuties(func(ctx context.Context, duty Duty, _ DutyDefinitionSet) error {
 		return w.ConsensusParticipate(ctx, duty)
 	})
+	w.SchedulerRegisterFetcherFetchOnly(w.FetcherFetchOnly)
 	w.FetcherSubscribe(w.ConsensusPropose)
 	w.FetcherRegisterAggSigDB(w.AggSigDBAwait)
 	w.FetcherRegisterAwaitAttData(w.DutyDBAwaitAttestation)
diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go
index 026ea82df..4f8d2a396 100644
--- a/core/scheduler/scheduler.go
+++ b/core/scheduler/scheduler.go
@@ -85,9 +85,11 @@ type Scheduler struct {
 	dutiesMutex    sync.RWMutex
 	dutySubs       []func(context.Context, core.Duty, core.DutyDefinitionSet) error
 	slotSubs       []func(context.Context, core.Slot) error
+	fetcherFetchOnly func(context.Context, core.Duty, core.DutyDefinitionSet, string) error
 	builderEnabled bool
 	schedSlotFunc  schedSlotFunc
 	epochResolved  map[uint64]chan struct{} // Notification channels for epoch resolution
+	eventTriggeredAttestations sync.Map // Track attestation duties triggered via SSE block event (map[uint64]bool)
 }
 
 // SubscribeDuties subscribes a callback function for triggered duties.
@@ -96,6 +98,12 @@ func (s *Scheduler) SubscribeDuties(fn func(context.Context, core.Duty, core.Dut
 	s.dutySubs = append(s.dutySubs, fn)
 }
 
+// RegisterFetcherFetchOnly registers the fetcher's FetchOnly method for early attestation fetching.
+// Note this should be called *before* Start.
+func (s *Scheduler) RegisterFetcherFetchOnly(fn func(context.Context, core.Duty, core.DutyDefinitionSet, string) error) {
+	s.fetcherFetchOnly = fn
+}
+
 // SubscribeSlots subscribes a callback function for triggered slots.
 // Note this should be called *before* Start.
 // TODO(corver): Add subscriber names for improved logging.
@@ -157,6 +165,46 @@ func (s *Scheduler) HandleChainReorgEvent(ctx context.Context, epoch eth2p0.Epoc
 	}
 }
 
+// HandleBlockEvent handles SSE "block" events (block imported to fork choice) and triggers early attestation data fetching.
+func (s *Scheduler) HandleBlockEvent(ctx context.Context, slot eth2p0.Slot, bnAddr string) {
+	if !featureset.Enabled(featureset.FetchAttOnBlock) || s.fetcherFetchOnly == nil {
+		return
+	}
+
+	duty := core.Duty{
+		Slot: uint64(slot),
+		Type: core.DutyAttester,
+	}
+	defSet, ok := s.getDutyDefinitionSet(duty)
+	if !ok {
+		// Nothing for this duty
+		return
+	}
+
+	_, alreadyTriggered := s.eventTriggeredAttestations.LoadOrStore(uint64(slot), true)
+	if alreadyTriggered {
+		return
+	}
+
+	// Clone defSet to prevent race conditions when it's modified or trimmed
+	clonedDefSet, err := defSet.Clone()
+	if err != nil {
+		log.Error(ctx, "Failed to clone duty definition set for early fetch", err)
+		return
+	}
+
+	log.Debug(ctx, "Early attestation data fetch triggered by SSE block event", z.U64("slot", uint64(slot)), z.Str("bn_addr", bnAddr))
+
+	// Fetch attestation data early without triggering consensus.
+	// Use background context to prevent cancellation if SSE connection drops.
+	go func() {
+		fetchCtx := log.CopyFields(context.Background(), ctx)
+		if err := s.fetcherFetchOnly(fetchCtx, duty, clonedDefSet, bnAddr); err != nil {
+			log.Warn(fetchCtx, "Early attestation data fetch failed", err, z.U64("slot", uint64(slot)), z.Str("bn_addr", bnAddr))
+		}
+	}()
+}
+
 // emitCoreSlot calls all slot subscriptions asynchronously with the provided slot.
 func (s *Scheduler) emitCoreSlot(ctx context.Context, slot core.Slot) {
 	for _, sub := range s.slotSubs {
@@ -255,10 +303,16 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot core.Slot) {
 		}
 
 		// Trigger duty async
-		go func() {
+		go func(duty core.Duty, defSet core.DutyDefinitionSet) {
 			defer span.End()
 
-			if !delaySlotOffset(ctx, slot, duty, s.delayFunc) {
+			// Special handling for attester duties when FetchAttOnBlock is enabled
+			if duty.Type == core.DutyAttester && featureset.Enabled(featureset.FetchAttOnBlock) {
+				if !s.waitForBlockEventOrTimeout(dutyCtx, slot) {
+					return // context cancelled
+				}
+				s.eventTriggeredAttestations.Store(slot.Slot, true)
+			} else if !delaySlotOffset(dutyCtx, slot, duty, s.delayFunc) {
 				return // context cancelled
 			}
 
@@ -275,14 +329,14 @@
 					log.Error(dutyCtx, "Failed to trigger duty subscriber", err, z.U64("slot", slot.Slot))
 				}
 			}
-		}()
-	}
 
-	if slot.LastInEpoch() {
-		err := s.resolveDuties(ctx, slot.Next())
-		if err != nil {
-			log.Warn(ctx, "Resolving duties error (retrying next slot)", err, z.U64("slot", slot.Slot))
-		}
+			if slot.LastInEpoch() {
+				err := s.resolveDuties(ctx, slot.Next())
+				if err != nil {
+					log.Warn(ctx, "Resolving duties error (retrying next slot)", err, z.U64("slot", slot.Slot))
+				}
+			}
+		}(duty, defSet)
 	}
 }
 
@@ -306,6 +360,31 @@ func delaySlotOffset(ctx context.Context, slot core.Slot, duty core.Duty, delayF
 	}
 }
 
+// waitForBlockEventOrTimeout waits until the fallback timeout (T=1/3 + 300ms) is reached.
+// Returns false if the context is cancelled, true otherwise.
+func (s *Scheduler) waitForBlockEventOrTimeout(ctx context.Context, slot core.Slot) bool {
+	// Calculate fallback timeout: 1/3 + 300ms
+	fn, ok := slotOffsets[core.DutyAttester]
+	if !ok {
+		log.Warn(ctx, "Slot offset not found for attester duty, proceeding immediately", nil, z.U64("slot", slot.Slot))
+		return true
+	}
+	offset := fn(slot.SlotDuration) + 300*time.Millisecond
+	fallbackDeadline := slot.Time.Add(offset)
+
+	select {
+	case <-ctx.Done():
+		return false
+	case <-s.clock.After(time.Until(fallbackDeadline)):
+		// Check if block event triggered early fetch
+		if _, triggered := s.eventTriggeredAttestations.Load(slot.Slot); !triggered {
+			log.Debug(ctx, "Proceeding with attestation at T=1/3+300ms (no early block event)",
+				z.U64("slot", slot.Slot))
+		}
+		return true
+	}
+}
+
 // resolveDuties resolves the duties for the slot's epoch, caching the results.
 func (s *Scheduler) resolveDuties(ctx context.Context, slot core.Slot) error {
 	s.setResolvingEpoch(slot.Epoch())
@@ -663,6 +742,32 @@ func (s *Scheduler) trimDuties(epoch uint64) {
 	}
 
 	delete(s.dutiesByEpoch, epoch)
+
+	if featureset.Enabled(featureset.FetchAttOnBlock) {
+		s.trimEventTriggeredAttestations(epoch)
+	}
 }
 
+// trimEventTriggeredAttestations removes old slot entries from eventTriggeredAttestations.
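+// All entries for slots before the first slot of the epoch following the trimmed epoch are deleted.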
+func (s *Scheduler) trimEventTriggeredAttestations(epoch uint64) {
+	ctx := context.Background()
+	_, slotsPerEpoch, err := eth2wrap.FetchSlotsConfig(ctx, s.eth2Cl)
+	if err != nil {
+		log.Warn(ctx, "Failed to fetch slots config for trimming event triggered attestations", err, z.U64("epoch", epoch))
+		return
+	}
+
+	minSlotToKeep := (epoch + 1) * slotsPerEpoch // first slot of next epoch
+	s.eventTriggeredAttestations.Range(func(key, _ any) bool {
+		slot, ok := key.(uint64)
+		if !ok {
+			return true // continue iteration
+		}
+		if slot < minSlotToKeep {
+			s.eventTriggeredAttestations.Delete(slot)
+		}
+		return true // continue iteration
+	})
+}
 
 // submitValidatorRegistrations submits the validator registrations for all DVs.
diff --git a/testutil/beaconmock/beaconmock.go b/testutil/beaconmock/beaconmock.go
index 7f2a261cf..7cb176f1a 100644
--- a/testutil/beaconmock/beaconmock.go
+++ b/testutil/beaconmock/beaconmock.go
@@ -475,6 +475,11 @@ func (m Mock) IsSynced() bool {
 	return m.IsSyncedFunc()
 }
 
+// ClientForAddress returns the same mock since it represents a single beacon node.
+func (m Mock) ClientForAddress(_ string) eth2wrap.Client {
+	return m
+}
+
 func (m Mock) Close() error {
 	m.headProducer.Close()