
Refactor coordinator logic and add sharded support #304

Open
bootjp wants to merge 23 commits into main from feature/multi-raft

Conversation

@bootjp (Owner) commented Feb 5, 2026

No description provided.

Copilot AI review requested due to automatic review settings February 5, 2026 12:54
@gemini-code-assist (Contributor)

Summary of Changes

Hello @bootjp, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a fundamental architectural shift by adding sharding to the system. It abstracts the coordination logic behind a flexible interface so that operations can be routed to specific Raft groups based on key ranges. The change affects how data is stored and retrieved and how transactions are processed, with the goal of horizontal scalability while preserving distributed consistency. The accompanying updates to the Jepsen test suite validate the new sharding mechanisms.

Highlights

  • Sharding Support: Introduced sharding capabilities by refactoring the coordinator logic to support multiple Raft groups and key-based routing. New components like LeaderProxy, ShardStore, and ShardedCoordinator manage distributed operations.
  • Coordinator Interface Refactoring: The kv.Coordinate type has been replaced with a kv.Coordinator interface, which now includes key-aware leader checking methods (IsLeaderForKey, VerifyLeaderForKey, RaftLeaderForKey) to facilitate sharding. A sketch of these methods follows this list.
  • Transaction Handling Improvements: Transaction dispatching logic has been streamlined to explicitly handle PREPARE and COMMIT phases using a new elemToMutation helper, simplifying the creation of transactional requests.
  • Redis Adapter Updates: The Redis adapter now uses constants for commands, improving readability and maintainability. It also integrates the new key-aware coordinator methods for sharded operations like EXISTS, LRANGE, and RPUSH.
  • Jepsen Test Suite Enhancement: The Jepsen test suite has been significantly updated to support and validate multi-group Raft configurations and shard ranges, including new Docker files for test environment setup and execution.
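
To make the shape of this refactoring concrete, here is a minimal sketch of the key-aware portion of the interface. The exact signatures are not shown in this summary, so the parameter and return types below are assumptions, not the code from this PR.

// Hypothetical sketch of the key-aware methods added to kv.Coordinator.
// Signatures are assumed; raft.ServerAddress is from github.com/hashicorp/raft.
type Coordinator interface {
	// IsLeaderForKey reports whether this node currently leads the Raft
	// group owning the shard that contains key.
	IsLeaderForKey(key []byte) bool
	// VerifyLeaderForKey confirms leadership for that group (e.g. with a
	// quorum check) before a linearizable operation proceeds.
	VerifyLeaderForKey(key []byte) error
	// RaftLeaderForKey returns the address of that group's current leader,
	// used by the adapters to proxy requests from followers.
	RaftLeaderForKey(key []byte) raft.ServerAddress
	// ...plus the pre-existing coordination methods (not shown).
}
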
Changelog
  • adapter/dynamodb.go
    • Updated NewDynamoDBServer to use the kv.Coordinator interface.
  • adapter/grpc.go
    • Updated NewGRPCServer to use the kv.Coordinator interface.
    • Modified RawGet to perform key-specific leader checks using IsLeaderForKey.
    • Modified tryLeaderGet to retrieve key-specific Raft leaders using RaftLeaderForKey.
  • adapter/redis.go
    • Introduced constants for Redis commands (e.g., cmdGet, cmdSet) for improved code clarity.
    • Updated NewRedisServer to accept the kv.Coordinator interface.
    • Replaced hardcoded Redis command strings with new constants in command routing and transaction logic.
    • Implemented key-specific leader checks (IsLeaderForKey, VerifyLeaderForKey) for commands like EXISTS, LRANGE, and RPUSH.
    • Refactored transaction timestamp generation with new txnStartTS, maxLatestCommitTS, and bumpLatestCommitTS helpers.
    • Updated proxy functions (proxyLRange, proxyRPush, tryLeaderGetAt, readValueAt) to use RaftLeaderForKey.
  • adapter/redis_proxy.go
    • Modified proxyExists to use RaftLeaderForKey for key-specific leader retrieval.
  • jepsen/docker/id_rsa
    • Added a new RSA private key for Jepsen control node SSH access.
  • jepsen/docker/run-in-docker.sh
    • Added a new script to set up and run Jepsen tests within a Docker container, including Go and Leiningen installation, and SSH configuration.
  • jepsen/docker/ssh_config
    • Added a new SSH configuration file for Jepsen nodes.
  • jepsen/src/elastickv/db.clj
    • Added functions (group-ids, group-addr, build-raft-groups-arg) to manage multiple Raft groups.
    • Updated build-raft-redis-map to support mapping Redis addresses to specific Raft group addresses.
    • Modified start-node! to accept raft-groups and shard-ranges options, enabling group-specific gRPC addresses.
    • Added validation to ensure shard-ranges are provided when multiple raft-groups are configured.
    • Updated wait-for-grpc! to handle waiting on multiple gRPC ports for sharded setups.
    • Modified join-node! to allow peers to join specific Raft groups.
    • Updated db function documentation to reflect new raft-groups and shard-ranges options.
  • jepsen/src/elastickv/redis_workload.clj
    • Imported additional Jepsen modules (jepsen.db, jepsen.os, jepsen.nemesis).
    • Introduced a --local CLI option to run tests without SSH or nemesis for local development.
    • Updated db initialization to pass raft-groups and shard-ranges to the database configuration.
    • Conditionalized OS, network, and nemesis setup based on the --local flag.
    • Added new CLI options: --local, --host, --ports, --raft-groups, --shard-ranges.
    • Modified the main function to process new CLI options and dynamically generate node lists for local testing.
  • kv/coordinator.go
    • Extended the Coordinator interface with IsLeaderForKey, VerifyLeaderForKey, and RaftLeaderForKey methods.
    • Provided default implementations for the new key-aware leader methods in the Coordinate struct.
    • Refactored dispatchTxn to use elemToMutation and explicitly construct PREPARE and COMMIT phase requests.
    • Removed the toTxnRequests helper function.
    • Added ErrLeaderNotFound error constant.
    • Updated redirect to use elemToMutation for transactional requests.
    • Added elemToMutation helper function to convert operation elements to protobuf mutations.
  • kv/leader_proxy.go
    • New file: Implements LeaderProxy to forward transactional requests to the current Raft leader, handling cases where the local node is not the leader.
  • kv/shard_key.go
    • New file: Provides the routeKey function to normalize internal keys (e.g., list metadata) to their logical user key for consistent shard routing.
  • kv/shard_router.go
    • Modified groupRequests and Get methods to utilize routeKey for proper key-based routing to shards.
  • kv/shard_router_test.go
    • Added TestShardRouterRoutesListKeys to verify that list metadata keys are correctly routed to their respective shards.
  • kv/shard_store.go
    • New file: Implements ShardStore, an MVCC store wrapper that routes read operations to the correct shard-specific store and proxies requests to leaders when necessary.
  • kv/sharded_coordinator.go
    • New file: Implements ShardedCoordinator, which routes operations to shard-specific Raft groups, manages a shared Hybrid Logical Clock (HLC), and dispatches requests via the ShardRouter.
  • kv/sharded_integration_test.go
    • New file: Contains integration tests for the sharded coordinator, including setup for multiple Raft groups and verification of key distribution.
  • main.go
    • Removed direct NewRaft call and related imports, transitioning to a multi-Raft group setup.
    • Added new command-line flags: raftGroups, shardRanges, and raftRedisMap for configuring sharding.
    • Refactored the main execution flow into a run function for better error handling and modularity.
    • Introduced helper functions (parseRaftGroups, parseShardRanges, buildEngine, buildLeaderRedis, buildShardGroups, startRaftServers, startRedisServer) to manage the initialization of sharded components.
    • Updated gRPC server registration to use ShardStore and ShardedCoordinator, and registered the DistributionServer.
  • multiraft_runtime.go
    • New file: Defines raftGroupRuntime and newRaftGroup for managing the lifecycle and configuration of individual Raft groups in a multi-raft deployment.
  • shard_config.go
    • New file: Defines groupSpec and rangeSpec structs, along with parsing and validation functions for Raft group configurations and shard ranges.
Activity
  • The pull request introduces a major architectural change by implementing sharding, moving from a single Raft group to a multi-Raft group setup.
  • Existing coordinator logic has been abstracted into an interface (kv.Coordinator) to support key-aware routing for sharded operations.
  • New core components (LeaderProxy, ShardStore, ShardedCoordinator, ShardRouter) have been added to manage key distribution, transaction forwarding, and MVCC reads across shards.
  • The Redis adapter has been updated to integrate with the new sharding logic, ensuring Redis commands are routed to the correct Raft group.
  • The Jepsen test suite has been extended with new configurations and Docker support to rigorously test the consistency and fault tolerance of the sharded system.

@gemini-code-assist (Contributor) left a comment

Code Review

This pull request introduces a significant architectural change by adding sharding support to the system. It refactors the coordinator logic, introducing a ShardedCoordinator, ShardStore, and ShardRouter to manage distributed data and operations across multiple Raft groups. The adapters for gRPC, Redis, and DynamoDB have been updated to be shard-aware. While this is a comprehensive effort to enable horizontal scaling, the review has identified several critical and high-severity issues. These include a major security vulnerability due to a committed private SSH key, hardcoded paths that break environment portability, and multiple instances of stale data reads that could compromise correctness guarantees like linearizability and snapshot isolation. These issues should be addressed before merging.

Comment on lines +28 to +33
func (p *LeaderProxy) Commit(reqs []*pb.Request) (*TransactionResponse, error) {
	if p.raft.State() == raft.Leader {
		return p.tm.Commit(reqs)
	}
	return p.forward(reqs)
}

critical

The leadership check p.raft.State() == raft.Leader is not sufficient to prevent writes on a stale leader. A deposed leader might still think it's the leader for a short period, which can lead to split-brain scenarios where both the old and new leader accept writes. To ensure linearizability, you should use p.raft.VerifyLeader(), which confirms leadership with a quorum of nodes before proceeding with the operation. The same issue exists in the Abort method.

func (p *LeaderProxy) Commit(reqs []*pb.Request) (*TransactionResponse, error) {
	if p.raft.State() != raft.Leader {
		return p.forward(reqs)
	}
	if err := p.raft.VerifyLeader().Error(); err != nil {
		return p.forward(reqs)
	}
	return p.tm.Commit(reqs)
}
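
The same pattern would apply to Abort, as noted above. A minimal sketch, assuming Abort mirrors Commit's signature and delegates to p.tm.Abort (both assumptions, since that hunk is not shown here):

func (p *LeaderProxy) Abort(reqs []*pb.Request) (*TransactionResponse, error) {
	if p.raft.State() != raft.Leader {
		return p.forward(reqs)
	}
	// Confirm leadership with a quorum before aborting locally; forward to
	// the real leader if verification fails.
	if err := p.raft.VerifyLeader().Error(); err != nil {
		return p.forward(reqs)
	}
	return p.tm.Abort(reqs)
}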

Comment on lines +824 to +844
func (r *RedisServer) maxLatestCommitTS(queue []redcon.Command) uint64 {
	var maxTS uint64
	if r.store == nil {
		return maxTS
	}
	seen := make(map[string]struct{})
	for _, cmd := range queue {
		if len(cmd.Args) < minKeyedArgs {
			continue
		}
		name := strings.ToUpper(string(cmd.Args[0]))
		switch name {
		case cmdSet, cmdGet, cmdDel, cmdExists, cmdRPush, cmdLRange:
			key := cmd.Args[1]
			r.bumpLatestCommitTS(&maxTS, key, seen)
			// Also account for list metadata keys to avoid stale typing decisions.
			r.bumpLatestCommitTS(&maxTS, listMetaKey(key), seen)
		}
	}
	return maxTS
}

high

The maxLatestCommitTS function relies on r.store.LatestCommitTS() to determine the start timestamp for a transaction. However, the underlying kv.ShardStore.LatestCommitTS implementation performs a local read without verifying leadership or proxying to the leader. This can result in reading a stale commit timestamp if the current node is a follower for a shard. Using a stale timestamp to calculate the transaction's startTS can break snapshot isolation guarantees, as the transaction might read data from a point in time before a more recent write occurred.

References
  1. In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view. Using a stale start timestamp can violate snapshot isolation.

Comment on lines +114 to +124
func (s *ShardStore) LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error) {
	g, ok := s.groupForKey(key)
	if !ok || g.Store == nil {
		return 0, false, nil
	}
	ts, exists, err := g.Store.LatestCommitTS(ctx, key)
	if err != nil {
		return 0, false, errors.WithStack(err)
	}
	return ts, exists, nil
}

high

The LatestCommitTS method reads from the local store (g.Store) without verifying leadership or proxying the request to the leader. If this node is a follower for the shard containing the key, the returned commit timestamp can be stale. This can cause transactions that rely on this method (like those from the Redis adapter) to start with a startTS that is too old, violating snapshot isolation guarantees. To ensure correctness, this method should either verify leadership before reading or proxy the request to the shard's leader to get an up-to-date commit timestamp.

References
  1. In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view. A stale commit timestamp can lead to an incorrect start timestamp, violating snapshot isolation.
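
One way to address both findings, as a rough sketch: verify leadership for the key's group before trusting the local store, and otherwise fetch the timestamp from the leader. The g.Raft and g.Proxy fields below are hypothetical; the actual shard group struct in this PR is not shown here.

func (s *ShardStore) LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error) {
	g, ok := s.groupForKey(key)
	if !ok || g.Store == nil {
		return 0, false, nil
	}
	// Only trust the local MVCC store when this node is the verified leader
	// of the shard's Raft group; otherwise ask the leader for the timestamp.
	if err := g.Raft.VerifyLeader().Error(); err != nil {
		return g.Proxy.LatestCommitTS(ctx, key)
	}
	ts, exists, err := g.Store.LatestCommitTS(ctx, key)
	if err != nil {
		return 0, false, errors.WithStack(err)
	}
	return ts, exists, nil
}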

Comment on lines +52 to +54
if reqs.IsTxn && reqs.StartTS == 0 {
	reqs.StartTS = c.clock.Next()
}

high

Assigning a transaction's startTS with just c.clock.Next() is not sufficient to guarantee snapshot isolation. A transaction must start with a timestamp greater than the commit timestamp of any data it might read. The current implementation does not account for this, which can lead to read-write conflicts. A more robust approach, as demonstrated in the Redis adapter's txnStartTS function, is to determine the startTS based on the latest commit timestamps of all keys involved in the transaction. This logic should be generalized and moved into the coordinator to be used by all transactional clients.

References
  1. In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view. The start timestamp must be chosen carefully to ensure snapshot isolation.
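
The core of the suggested generalization is small: the start timestamp must land strictly above the newest commit timestamp of any key the transaction may touch. A sketch with hypothetical names (the coordinator's clock is assumed to expose Next(), as in the hunk above):

func snapshotSafeStartTS(clock interface{ Next() uint64 }, latestCommitTS uint64) uint64 {
	ts := clock.Next()
	if ts <= latestCommitTS {
		// Start after the newest relevant commit so the transaction's
		// snapshot cannot miss that write.
		ts = latestCommitTS + 1
	}
	return ts
}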

Copilot AI left a comment

Pull request overview

This PR refactors the coordinator architecture to support multi-raft group sharding, enabling horizontal scaling through key-range partitioning. The changes introduce a pluggable coordinator interface with two implementations: the original single-group Coordinate and a new ShardedCoordinator that routes operations across multiple raft groups based on configurable key ranges.

Changes:

  • Introduces sharded coordinator infrastructure with key-range based routing via a new distribution engine
  • Refactors main.go to support multi-raft group initialization with configurable shard ranges
  • Updates all adapters (gRPC, Redis, DynamoDB) to use the abstract Coordinator interface with key-aware leader operations
  • Adds Jepsen test enhancements for sharded deployment validation

Reviewed changes

Copilot reviewed 20 out of 20 changed files in this pull request and generated 13 comments.

Summary per file:
  • shard_config.go: Parses and validates raft group and shard range configuration from command-line flags
  • multiraft_runtime.go: Initializes multiple raft groups with per-group data directories
  • main.go: Refactored server initialization to build sharded infrastructure and start per-group gRPC servers
  • kv/sharded_coordinator.go: New coordinator implementation routing operations to shards via distribution engine
  • kv/shard_store.go: MVCC store wrapper routing reads/writes to appropriate shard stores with leader proxying
  • kv/shard_router.go: Routes requests to shard-specific transactional managers, now using routeKey normalization
  • kv/shard_key.go: Normalizes internal keys (list metadata/items) to user keys for routing
  • kv/leader_proxy.go: Forwards transactional requests to raft leaders when local node is follower
  • kv/coordinator.go: Extended interface with key-aware leader methods; refactored transaction log building
  • adapter/grpc.go: Updated to use Coordinator interface with IsLeaderForKey/RaftLeaderForKey
  • adapter/redis.go: Updated to use Coordinator interface; refactored transaction timestamp logic
  • adapter/dynamodb.go: Updated to use Coordinator interface
  • adapter/redis_proxy.go: Updated to use RaftLeaderForKey for key-aware proxying
  • kv/sharded_integration_test.go: Integration test validating multi-shard coordinator dispatch
  • kv/shard_router_test.go: New test validating list key routing with routeKey normalization
  • jepsen/src/elastickv/db.clj: Enhanced to support multi-group raft configuration and validation
  • jepsen/src/elastickv/redis_workload.clj: Added local mode and sharding configuration options
  • jepsen/docker/ssh_config: SSH configuration for local Docker-based Jepsen testing
  • jepsen/docker/run-in-docker.sh: Docker entrypoint script for containerized Jepsen tests
  • jepsen/docker/id_rsa: Standard Vagrant insecure private key for test environments


Comment on lines 58 to 79
func (s *ShardStore) ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error) {
	if limit <= 0 {
		return []*store.KVPair{}, nil
	}
	var out []*store.KVPair
	for _, g := range s.groups {
		if g == nil || g.Store == nil {
			continue
		}
		kvs, err := g.Store.ScanAt(ctx, start, end, limit, ts)
		if err != nil {
			return nil, errors.WithStack(err)
		}
		out = append(out, kvs...)
	}
	sort.Slice(out, func(i, j int) bool {
		return bytes.Compare(out[i].Key, out[j].Key) < 0
	})
	if len(out) > limit {
		out = out[:limit]
	}
	return out, nil
Copilot AI commented Feb 5, 2026

The ScanAt method iterates through all groups and gathers results without considering the key range boundaries. This means it will fetch results from groups that don't actually contain keys in the requested range [start, end). For a truly sharded implementation, this should only query groups whose shard ranges intersect with the scan range to avoid unnecessary work and incorrect results.

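
A later commit in this PR addresses this by adding GetIntersectingRoutes to the engine (see the commit notes below). The underlying predicate is a half-open range intersection; here is a sketch of that check, with a hypothetical helper name and nil meaning "unbounded above", as in the shard range parsing:

package kv

import "bytes"

// rangesOverlap reports whether [aStart, aEnd) and [bStart, bEnd) intersect.
// A nil end bound is treated as unbounded above.
func rangesOverlap(aStart, aEnd, bStart, bEnd []byte) bool {
	if aEnd != nil && bytes.Compare(aEnd, bStart) <= 0 {
		return false // a ends at or before b starts
	}
	if bEnd != nil && bytes.Compare(bEnd, aStart) <= 0 {
		return false // b ends at or before a starts
	}
	return true
}
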
@bootjp (Owner, Author) commented

@copilot open a new pull request to apply changes based on this feedback


Copilot AI commented Feb 5, 2026

@bootjp I've opened a new pull request, #305, to work on those changes. Once the pull request is ready, I'll request review from you.


Copilot AI commented Feb 5, 2026

@bootjp I've opened a new pull request, #306, to work on those changes. Once the pull request is ready, I'll request review from you.

bootjp and others added 13 commits February 5, 2026 22:52
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Added GetIntersectingRoutes method to Engine to find routes whose key ranges
intersect with the scan range [start, end). Modified ScanAt in ShardStore to
only query groups whose shard ranges intersect with the requested scan range,
avoiding unnecessary work and incorrect results.

Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
Fix ScanAt to only query shards with intersecting key ranges
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Copilot AI and others added 2 commits February 5, 2026 14:27
Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
Fix resource leaks in multiraft_runtime.go error paths

Copilot AI left a comment

Pull request overview

Copilot reviewed 24 out of 25 changed files in this pull request and generated 5 comments.



Comment on lines +51 to +58
conn, err := grpc.NewClient(string(addr),
	grpc.WithTransportCredentials(insecure.NewCredentials()),
	grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
)
if err != nil {
	return nil, errors.WithStack(err)
}
defer conn.Close()
Copilot AI commented Feb 5, 2026

Creating a new gRPC connection for every request can cause significant performance overhead and connection exhaustion under high load. Consider implementing connection pooling or reusing connections, especially since these proxy calls happen on every read/write operation when not the leader. A connection pool or cached connections per leader address would reduce latency and resource consumption significantly.

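
One possible shape for this, sketched with illustrative names only (connCache is not part of this PR): cache a single shared *grpc.ClientConn per leader address, since gRPC client connections are safe for concurrent use.

package kv

import (
	"sync"

	"github.com/hashicorp/raft"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// connCache lazily creates and reuses one client connection per leader
// address instead of dialing on every proxied request.
type connCache struct {
	mu    sync.Mutex
	conns map[raft.ServerAddress]*grpc.ClientConn
}

func (c *connCache) get(addr raft.ServerAddress) (*grpc.ClientConn, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if conn, ok := c.conns[addr]; ok {
		return conn, nil
	}
	conn, err := grpc.NewClient(string(addr),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
	)
	if err != nil {
		return nil, err
	}
	if c.conns == nil {
		c.conns = make(map[raft.ServerAddress]*grpc.ClientConn)
	}
	c.conns[addr] = conn
	return conn, nil
}

Callers would then drop the per-request defer conn.Close() and close any cached connections once at shutdown.
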
Comment on lines +218 to +225
conn, err := grpc.NewClient(string(addr),
	grpc.WithTransportCredentials(insecure.NewCredentials()),
	grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
)
if err != nil {
	return nil, errors.WithStack(err)
}
defer conn.Close()
Copilot AI commented Feb 5, 2026

Creating a new gRPC connection for every proxied read operation can cause significant performance overhead and connection exhaustion under high load. Consider implementing connection pooling or reusing connections, especially since these proxy calls happen on every read operation from a follower node. A connection pool or cached connections per leader address would reduce latency and resource consumption significantly.

Comment on lines +31 to +151
func parseRaftGroups(raw, defaultAddr string) ([]groupSpec, error) {
	if raw == "" {
		if defaultAddr == "" {
			return nil, ErrAddressRequired
		}
		return []groupSpec{{id: 1, address: defaultAddr}}, nil
	}
	parts := strings.Split(raw, ",")
	groups := make([]groupSpec, 0, len(parts))
	seen := map[uint64]struct{}{}
	for _, part := range parts {
		part = strings.TrimSpace(part)
		if part == "" {
			continue
		}
		kv := strings.SplitN(part, "=", splitParts)
		if len(kv) != splitParts {
			return nil, errors.WithStack(errors.Newf("invalid raftGroups entry: %q", part))
		}
		id, err := strconv.ParseUint(kv[0], 10, 64)
		if err != nil {
			return nil, errors.Wrapf(err, "invalid group id %q", kv[0])
		}
		addr := strings.TrimSpace(kv[1])
		if addr == "" {
			return nil, errors.WithStack(errors.Newf("empty address for group %d", id))
		}
		if _, ok := seen[id]; ok {
			return nil, errors.WithStack(errors.Newf("duplicate group id %d", id))
		}
		seen[id] = struct{}{}
		groups = append(groups, groupSpec{id: id, address: addr})
	}
	if len(groups) == 0 {
		return nil, ErrNoRaftGroupsConfigured
	}
	return groups, nil
}

func parseShardRanges(raw string, defaultGroup uint64) ([]rangeSpec, error) {
	if raw == "" {
		return []rangeSpec{{start: []byte(""), end: nil, groupID: defaultGroup}}, nil
	}
	parts := strings.Split(raw, ",")
	ranges := make([]rangeSpec, 0, len(parts))
	for _, part := range parts {
		part = strings.TrimSpace(part)
		if part == "" {
			continue
		}
		kv := strings.SplitN(part, "=", splitParts)
		if len(kv) != splitParts {
			return nil, errors.WithStack(errors.Newf("invalid shardRanges entry: %q", part))
		}
		groupID, err := strconv.ParseUint(kv[1], 10, 64)
		if err != nil {
			return nil, errors.Wrapf(err, "invalid group id in %q", part)
		}
		rangePart := kv[0]
		bounds := strings.SplitN(rangePart, ":", splitParts)
		if len(bounds) != splitParts {
			return nil, errors.WithStack(errors.Newf("invalid range %q (expected start:end)", rangePart))
		}
		start := []byte(bounds[0])
		var end []byte
		if bounds[1] != "" {
			end = []byte(bounds[1])
		}
		ranges = append(ranges, rangeSpec{start: start, end: end, groupID: groupID})
	}
	if len(ranges) == 0 {
		return nil, ErrNoShardRangesConfigured
	}
	return ranges, nil
}

func parseRaftRedisMap(raw string) map[raft.ServerAddress]string {
	out := make(map[raft.ServerAddress]string)
	if raw == "" {
		return out
	}
	parts := strings.Split(raw, ",")
	for _, part := range parts {
		part = strings.TrimSpace(part)
		if part == "" {
			continue
		}
		kv := strings.SplitN(part, "=", splitParts)
		if len(kv) != splitParts {
			continue
		}
		out[raft.ServerAddress(kv[0])] = kv[1]
	}
	return out
}

func defaultGroupID(groups []groupSpec) uint64 {
	min := uint64(0)
	for _, g := range groups {
		if min == 0 || g.id < min {
			min = g.id
		}
	}
	if min == 0 {
		return 1
	}
	return min
}

func validateShardRanges(ranges []rangeSpec, groups []groupSpec) error {
	ids := map[uint64]struct{}{}
	for _, g := range groups {
		ids[g.id] = struct{}{}
	}
	for _, r := range ranges {
		if _, ok := ids[r.groupID]; !ok {
			return fmt.Errorf("shard range references unknown group %d", r.groupID)
		}
	}
	return nil
}
Copilot AI commented Feb 5, 2026

The parseRaftGroups, parseShardRanges, parseRaftRedisMap, defaultGroupID, and validateShardRanges functions lack unit tests. Given that these functions handle critical configuration parsing and validation logic that could lead to runtime failures if incorrect, they should have comprehensive test coverage. Consider adding tests for valid inputs, edge cases (empty strings, malformed entries), invalid formats, and boundary conditions (e.g., duplicate group IDs, missing addresses, references to unknown groups).

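
As an illustration only, a table-driven test for the parsing helpers could start along these lines. This is a sketch under the assumption that these functions live in package main alongside the groupSpec and rangeSpec types; the cases are derived from the validation rules visible in shard_config.go above.

package main

import "testing"

// Sketch of a table-driven test for parseRaftGroups.
func TestParseRaftGroups(t *testing.T) {
	cases := []struct {
		name    string
		raw     string
		addr    string
		wantLen int
		wantErr bool
	}{
		{name: "empty falls back to default address", raw: "", addr: "127.0.0.1:50051", wantLen: 1},
		{name: "empty with no default is an error", raw: "", addr: "", wantErr: true},
		{name: "two groups", raw: "1=127.0.0.1:50051,2=127.0.0.1:50052", wantLen: 2},
		{name: "duplicate group id", raw: "1=a:1,1=b:2", wantErr: true},
		{name: "malformed entry", raw: "1", wantErr: true},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			groups, err := parseRaftGroups(tc.raw, tc.addr)
			if tc.wantErr {
				if err == nil {
					t.Fatalf("expected error, got %v", groups)
				}
				return
			}
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}
			if len(groups) != tc.wantLen {
				t.Fatalf("got %d groups, want %d", len(groups), tc.wantLen)
			}
		})
	}
}
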
Comment on lines +26 to +106
func groupDataDir(baseDir, raftID string, groupID uint64, multi bool) string {
	if !multi {
		return filepath.Join(baseDir, raftID)
	}
	return filepath.Join(baseDir, raftID, fmt.Sprintf("group-%d", groupID))
}

func newRaftGroup(raftID string, group groupSpec, baseDir string, multi bool, bootstrap bool, fsm raft.FSM) (*raft.Raft, *transport.Manager, error) {
	c := raft.DefaultConfig()
	c.LocalID = raft.ServerID(raftID)
	c.HeartbeatTimeout = heartbeatTimeout
	c.ElectionTimeout = electionTimeout
	c.LeaderLeaseTimeout = leaderLease

	dir := groupDataDir(baseDir, raftID, group.id, multi)
	if err := os.MkdirAll(dir, raftDirPerm); err != nil {
		return nil, nil, errors.WithStack(err)
	}

	ldb, err := boltdb.NewBoltStore(filepath.Join(dir, "logs.dat"))
	if err != nil {
		// No cleanup needed here - ldb creation failed, so no resource was allocated
		return nil, nil, errors.WithStack(err)
	}

	// Define cleanup function immediately after ldb is created to ensure
	// proper cleanup in all error paths. If we return before this point,
	// no cleanup is needed since ldb creation failed.
	var sdb *boltdb.BoltStore
	var r *raft.Raft
	cleanup := func() {
		if ldb != nil {
			_ = ldb.Close()
		}
		if sdb != nil {
			_ = sdb.Close()
		}
	}

	sdb, err = boltdb.NewBoltStore(filepath.Join(dir, "stable.dat"))
	if err != nil {
		cleanup()
		return nil, nil, errors.WithStack(err)
	}

	fss, err := raft.NewFileSnapshotStore(dir, snapshotRetainCount, os.Stderr)
	if err != nil {
		cleanup()
		return nil, nil, errors.WithStack(err)
	}

	tm := transport.New(raft.ServerAddress(group.address), []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	})

	r, err = raft.NewRaft(c, fsm, ldb, sdb, fss, tm.Transport())
	if err != nil {
		cleanup()
		return nil, nil, errors.WithStack(err)
	}

	if bootstrap {
		cfg := raft.Configuration{
			Servers: []raft.Server{
				{
					Suffrage: raft.Voter,
					ID:       raft.ServerID(raftID),
					Address:  raft.ServerAddress(group.address),
				},
			},
		}
		f := r.BootstrapCluster(cfg)
		if err := f.Error(); err != nil {
			_ = r.Shutdown().Error()
			cleanup()
			return nil, nil, errors.WithStack(err)
		}
	}

	return r, tm, nil
}
Copilot AI commented Feb 5, 2026

The groupDataDir and newRaftGroup functions lack unit tests. While newRaftGroup has complex resource management logic (which was previously fixed), it should have tests to verify proper cleanup on errors, correct directory creation, and bootstrap behavior. Consider adding tests using in-memory stores and mocked filesystems where appropriate.

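
For the simpler of the two, a starting point could be a sketch like the following, again assuming the function lives in package main. The expectations follow directly from groupDataDir's definition above: single-group mode ignores the group ID, multi-group mode appends "group-<id>".

package main

import (
	"path/filepath"
	"testing"
)

// Sketch of a unit test for groupDataDir.
func TestGroupDataDir(t *testing.T) {
	single := groupDataDir("/data", "node-1", 2, false)
	if single != filepath.Join("/data", "node-1") {
		t.Fatalf("single-group dir = %q", single)
	}
	multi := groupDataDir("/data", "node-1", 2, true)
	if multi != filepath.Join("/data", "node-1", "group-2") {
		t.Fatalf("multi-group dir = %q", multi)
	}
}
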
Comment on lines +16 to +24
  curl -fsSL "https://go.dev/dl/go${GO_VERSION}.linux-${ARCH}.tar.gz" -o go.tar.gz
  tar -C /usr/local -xzf go.tar.gz
  export PATH=$PATH:/usr/local/go/bin
fi

# Install Leiningen
if ! command -v lein >/dev/null 2>&1; then
  curl -L https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein > /usr/local/bin/lein
  chmod +x /usr/local/bin/lein
Copilot AI commented Feb 5, 2026

This script downloads and executes build tooling (the Go toolchain tarball and especially the Leiningen bootstrap script from raw.githubusercontent.com) at runtime without any integrity verification or pinning to an immutable reference. If an attacker compromises the download endpoint, TLS path, or the stable reference of the Leiningen repository, they could serve a modified script and gain arbitrary code execution as root inside the test container. To mitigate this supply-chain risk, pin these downloads to specific versioned artifacts or commit SHAs and validate them with checksums or signatures, or prefer base images that already include the required tools instead of curling and executing them during container startup.
