Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 163 additions & 0 deletions core/services/arbiter/arbiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package arbiter

import (
"context"
"net"
"sync"

"google.golang.org/grpc"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types"

// TODO: Update this import path once proto is generated
pb "github.com/smartcontractkit/chainlink/v2/core/services/arbiter/proto"

"github.com/smartcontractkit/chainlink/v2/core/logger"
)

const (
// DefaultGRPCPort is the default port for the gRPC server.
DefaultGRPCPort = ":9090"
)

// Arbiter is the main service interface.
type Arbiter interface {
services.Service
HealthReport() map[string]error
}

type arbiter struct {
services.StateMachine

grpcServer *grpc.Server
grpcHandler *GRPCServer
state *State
decision DecisionEngine
shardConfig ShardConfigReader
lggr logger.Logger

grpcAddr string
stopCh services.StopChan
wg sync.WaitGroup
}

var _ Arbiter = (*arbiter)(nil)

// New creates a new Arbiter service.
func New(
lggr logger.Logger,
contractReader types.ContractReader,
shardConfigAddr string,
) (Arbiter, error) {
lggr = lggr.Named("Arbiter")

// Create state
state := NewState()

// Create ShardConfig reader
shardConfig := NewShardConfigReader(contractReader, shardConfigAddr, lggr)

// Create decision engine
decision := NewDecisionEngine(shardConfig, lggr)

Check failure on line 62 in core/services/arbiter/arbiter.go

View workflow job for this annotation

GitHub Actions / Analyze (go)

cannot use lggr (variable of interface type "github.com/smartcontractkit/chainlink/v2/core/logger".Logger) as "github.com/smartcontractkit/chainlink/v2/core/logger".SugaredLogger value in argument to NewDecisionEngine: "github.com/smartcontractkit/chainlink/v2/core/logger".Logger does not implement "github.com/smartcontractkit/chainlink/v2/core/logger".SugaredLogger (missing method AssumptionViolation)

Check failure on line 62 in core/services/arbiter/arbiter.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

cannot use lggr (variable of interface type "github.com/smartcontractkit/chainlink/v2/core/logger".Logger) as "github.com/smartcontractkit/chainlink/v2/core/logger".SugaredLogger value in argument to NewDecisionEngine: "github.com/smartcontractkit/chainlink/v2/core/logger".Logger does not implement "github.com/smartcontractkit/chainlink/v2/core/logger".SugaredLogger (missing method AssumptionViolation)

Check failure on line 62 in core/services/arbiter/arbiter.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

cannot use lggr (variable of interface type "github.com/smartcontractkit/chainlink/v2/core/logger".Logger) as "github.com/smartcontractkit/chainlink/v2/core/logger".SugaredLogger value in argument to NewDecisionEngine: "github.com/smartcontractkit/chainlink/v2/core/logger".Logger does not implement "github.com/smartcontractkit/chainlink/v2/core/logger".SugaredLogger (missing method AssumptionViolation)

Check failure on line 62 in core/services/arbiter/arbiter.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

cannot use lggr (variable of interface type "github.com/smartcontractkit/chainlink/v2/core/logger".Logger) as "github.com/smartcontractkit/chainlink/v2/core/logger".SugaredLogger value in argument to NewDecisionEngine: "github.com/smartcontractkit/chainlink/v2/core/logger".Logger does not implement "github.com/smartcontractkit/chainlink/v2/core/logger".SugaredLogger (missing method AssumptionViolation)

// Create gRPC handler
grpcHandler := NewGRPCServer(state, decision, lggr)

// Create gRPC server
grpcServer := grpc.NewServer()
pb.RegisterArbiterServiceServer(grpcServer, grpcHandler)

return &arbiter{
grpcServer: grpcServer,
grpcHandler: grpcHandler,
state: state,
decision: decision,
shardConfig: shardConfig,
lggr: lggr,
grpcAddr: DefaultGRPCPort,
stopCh: make(services.StopChan),
}, nil
}

// Start starts the Arbiter service.
func (a *arbiter) Start(ctx context.Context) error {
return a.StartOnce("Arbiter", func() error {
a.lggr.Info("Starting Arbiter service")

// Start gRPC server in a goroutine
a.wg.Add(1)
go func() {
defer a.wg.Done()
a.runGRPCServer()
}()

a.lggr.Infow("Arbiter service started",
"grpcAddr", a.grpcAddr,
)

return nil
})
}

// runGRPCServer starts the gRPC server and blocks until stopped.
func (a *arbiter) runGRPCServer() {
lis, err := net.Listen("tcp", a.grpcAddr)
if err != nil {
a.lggr.Errorw("Failed to listen for gRPC",
"addr", a.grpcAddr,
"error", err,
)
return
}

a.lggr.Infow("gRPC server listening",
"addr", a.grpcAddr,
)

if err := a.grpcServer.Serve(lis); err != nil {
// Check if this is a normal shutdown
select {
case <-a.stopCh:
// Normal shutdown, don't log as error
a.lggr.Debug("gRPC server stopped")
default:
a.lggr.Errorw("gRPC server error",
"error", err,
)
}
}
}

// Close stops the Arbiter service.
func (a *arbiter) Close() error {
return a.StopOnce("Arbiter", func() (err error) {
a.lggr.Info("Stopping Arbiter service")

// Signal stop
close(a.stopCh)

// Graceful shutdown of gRPC server
a.grpcServer.GracefulStop()
a.lggr.Debug("gRPC server stopped gracefully")

// Wait for goroutines
a.wg.Wait()

a.lggr.Info("Arbiter service stopped")

return nil
})
}

// HealthReport returns the health status of the service.
func (a *arbiter) HealthReport() map[string]error {
return map[string]error{
a.Name(): a.Ready(),
}
}

// Name returns the service name.
func (a *arbiter) Name() string {
return a.lggr.Name()
}
234 changes: 234 additions & 0 deletions core/services/arbiter/arbiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
package arbiter

import (
"context"
"iter"
"math/big"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"

"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// mockContractReader is a mock implementation of types.ContractReader for testing.
type mockContractReader struct {
types.UnimplementedContractReader
desiredShardCount uint64
err error
}

func (m *mockContractReader) Name() string {
return "mockContractReader"
}

func (m *mockContractReader) Start(ctx context.Context) error {
return nil
}

func (m *mockContractReader) Close() error {
return nil
}

func (m *mockContractReader) Ready() error {
return nil
}

func (m *mockContractReader) HealthReport() map[string]error {
return nil
}

func (m *mockContractReader) Bind(ctx context.Context, bindings []types.BoundContract) error {
return nil
}

func (m *mockContractReader) Unbind(ctx context.Context, bindings []types.BoundContract) error {
return nil
}

func (m *mockContractReader) GetLatestValue(ctx context.Context, readIdentifier string, confidenceLevel primitives.ConfidenceLevel, params any, returnVal any) error {
if m.err != nil {
return m.err
}
// Set the result to our mock value
if ptr, ok := returnVal.(**big.Int); ok {
*ptr = big.NewInt(int64(m.desiredShardCount))
}
return nil
}

func (m *mockContractReader) GetLatestValueWithHeadData(ctx context.Context, readIdentifier string, confidenceLevel primitives.ConfidenceLevel, params any, returnVal any) (head *types.Head, err error) {
err = m.GetLatestValue(ctx, readIdentifier, confidenceLevel, params, returnVal)
return nil, err
}

func (m *mockContractReader) BatchGetLatestValues(ctx context.Context, request types.BatchGetLatestValuesRequest) (types.BatchGetLatestValuesResult, error) {
return nil, nil
}

func (m *mockContractReader) QueryKey(ctx context.Context, contract types.BoundContract, filter query.KeyFilter, limitAndSort query.LimitAndSort, sequenceDataType any) ([]types.Sequence, error) {
return nil, nil
}

func (m *mockContractReader) QueryKeys(ctx context.Context, filters []types.ContractKeyFilter, limitAndSort query.LimitAndSort) (iter.Seq2[string, types.Sequence], error) {
return nil, nil
}

func TestArbiter_New(t *testing.T) {
lggr := logger.TestLogger(t)
mockReader := &mockContractReader{desiredShardCount: 10}

arb, err := New(lggr, mockReader, "0x1234567890abcdef")

require.NoError(t, err)
require.NotNil(t, arb)
assert.Equal(t, "Arbiter", arb.Name())
}

func TestArbiter_StartClose(t *testing.T) {
lggr := logger.TestLogger(t)
mockReader := &mockContractReader{desiredShardCount: 10}

arb, err := New(lggr, mockReader, "0x1234567890abcdef")
require.NoError(t, err)

// Test start
err = arb.Start(context.Background())
require.NoError(t, err)

// Give gRPC server a moment to start
time.Sleep(50 * time.Millisecond)

// Test health after start
healthReport := arb.HealthReport()
require.Contains(t, healthReport, arb.Name())
assert.NoError(t, healthReport[arb.Name()])

// Test close
err = arb.Close()
require.NoError(t, err)
}

func TestArbiter_ServiceTestRun(t *testing.T) {
lggr := logger.TestLogger(t)
mockReader := &mockContractReader{desiredShardCount: 10}

arb, err := New(lggr, mockReader, "0x1234567890abcdef")
require.NoError(t, err)

// Use servicetest.Run to handle lifecycle
// This starts the service and registers cleanup to stop it
servicetest.Run(t, arb)

// Service should be running after servicetest.Run
err = arb.Ready()
require.NoError(t, err)
}

func TestArbiter_HealthReport(t *testing.T) {
lggr := logger.TestLogger(t)
mockReader := &mockContractReader{desiredShardCount: 10}

arb, err := New(lggr, mockReader, "0x1234567890abcdef")
require.NoError(t, err)

t.Run("before start - not ready", func(t *testing.T) {
healthReport := arb.HealthReport()
require.Contains(t, healthReport, arb.Name())
// Before start, Ready() should return an error
assert.Error(t, healthReport[arb.Name()])
})

t.Run("after start - ready", func(t *testing.T) {
err := arb.Start(context.Background())
require.NoError(t, err)

healthReport := arb.HealthReport()
require.Contains(t, healthReport, arb.Name())
assert.NoError(t, healthReport[arb.Name()])

err = arb.Close()
require.NoError(t, err)
})
}

func TestArbiter_DoubleStart(t *testing.T) {
lggr := logger.TestLogger(t)
mockReader := &mockContractReader{desiredShardCount: 10}

arb, err := New(lggr, mockReader, "0x1234567890abcdef")
require.NoError(t, err)

// First start should succeed
err = arb.Start(context.Background())
require.NoError(t, err)

// Second start should return error (StartOnce)
err = arb.Start(context.Background())
assert.Error(t, err)

err = arb.Close()
require.NoError(t, err)
}

func TestArbiter_DoubleClose(t *testing.T) {
lggr := logger.TestLogger(t)
mockReader := &mockContractReader{desiredShardCount: 10}

arb, err := New(lggr, mockReader, "0x1234567890abcdef")
require.NoError(t, err)

err = arb.Start(context.Background())
require.NoError(t, err)

// First close should succeed
err = arb.Close()
require.NoError(t, err)

// Second close should return error (StopOnce)
err = arb.Close()
assert.Error(t, err)
}

func TestArbiter_Name(t *testing.T) {
lggr := logger.TestLogger(t)
mockReader := &mockContractReader{desiredShardCount: 10}

arb, err := New(lggr, mockReader, "0x1234567890abcdef")
require.NoError(t, err)

assert.Equal(t, "Arbiter", arb.Name())
}

func TestArbiter_Ready(t *testing.T) {
lggr := logger.TestLogger(t)
mockReader := &mockContractReader{desiredShardCount: 10}

arb, err := New(lggr, mockReader, "0x1234567890abcdef")
require.NoError(t, err)

// Before start, Ready should return error
err = arb.Ready()
assert.Error(t, err)

// After start, Ready should return nil
err = arb.Start(context.Background())
require.NoError(t, err)

err = arb.Ready()
assert.NoError(t, err)

// After close, Ready should return error
err = arb.Close()
require.NoError(t, err)

err = arb.Ready()
assert.Error(t, err)
}
Loading
Loading