Refactor coordinator logic and add sharded support #304
Conversation
Summary of Changes

Hello @bootjp, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers get up to speed. This pull request introduces a fundamental architectural shift by implementing sharding within the system. It abstracts the coordination logic behind a flexible interface, enabling operations to be routed to specific Raft groups based on key ranges. This change affects how data is stored and retrieved and how transactions are processed, supporting scalability and distributed consistency. The accompanying updates to the Jepsen test suite validate the new sharding mechanisms.
Code Review
This pull request introduces a significant architectural change by adding sharding support to the system. It refactors the coordinator logic, introducing a ShardedCoordinator, ShardStore, and ShardRouter to manage distributed data and operations across multiple Raft groups. The adapters for gRPC, Redis, and DynamoDB have been updated to be shard-aware. While this is a comprehensive effort to enable horizontal scaling, the review has identified several critical and high-severity issues. These include a major security vulnerability due to a committed private SSH key, hardcoded paths that break environment portability, and multiple instances of stale data reads that could compromise correctness guarantees like linearizability and snapshot isolation. These issues should be addressed before merging.
func (p *LeaderProxy) Commit(reqs []*pb.Request) (*TransactionResponse, error) {
    if p.raft.State() == raft.Leader {
        return p.tm.Commit(reqs)
    }
    return p.forward(reqs)
}
The leadership check p.raft.State() == raft.Leader is not sufficient to prevent writes on a stale leader. A deposed leader might still think it's the leader for a short period, which can lead to split-brain scenarios where both the old and new leader accept writes. To ensure linearizability, you should use p.raft.VerifyLeader(), which confirms leadership with a quorum of nodes before proceeding with the operation. The same issue exists in the Abort method.
func (p *LeaderProxy) Commit(reqs []*pb.Request) (*TransactionResponse, error) {
if p.raft.State() != raft.Leader {
return p.forward(reqs)
}
if err := p.raft.VerifyLeader().Error(); err != nil {
return p.forward(reqs)
}
return p.tm.Commit(reqs)
}

func (r *RedisServer) maxLatestCommitTS(queue []redcon.Command) uint64 {
    var maxTS uint64
    if r.store == nil {
        return maxTS
    }
    seen := make(map[string]struct{})
    for _, cmd := range queue {
        if len(cmd.Args) < minKeyedArgs {
            continue
        }
        name := strings.ToUpper(string(cmd.Args[0]))
        switch name {
        case cmdSet, cmdGet, cmdDel, cmdExists, cmdRPush, cmdLRange:
            key := cmd.Args[1]
            r.bumpLatestCommitTS(&maxTS, key, seen)
            // Also account for list metadata keys to avoid stale typing decisions.
            r.bumpLatestCommitTS(&maxTS, listMetaKey(key), seen)
        }
    }
    return maxTS
}
The maxLatestCommitTS function relies on r.store.LatestCommitTS() to determine the start timestamp for a transaction. However, the underlying kv.ShardStore.LatestCommitTS implementation performs a local read without verifying leadership or proxying to the leader. This can result in reading a stale commit timestamp if the current node is a follower for a shard. Using a stale timestamp to calculate the transaction's startTS can break snapshot isolation guarantees, as the transaction might read data from a point in time before a more recent write occurred.
References
- In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view. Using a stale start timestamp can violate snapshot isolation.
func (s *ShardStore) LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error) {
    g, ok := s.groupForKey(key)
    if !ok || g.Store == nil {
        return 0, false, nil
    }
    ts, exists, err := g.Store.LatestCommitTS(ctx, key)
    if err != nil {
        return 0, false, errors.WithStack(err)
    }
    return ts, exists, nil
}
The LatestCommitTS method reads from the local store (g.Store) without verifying leadership or proxying the request to the leader. If this node is a follower for the shard containing the key, the returned commit timestamp can be stale. This can cause transactions that rely on this method (like those from the Redis adapter) to start with a startTS that is too old, violating snapshot isolation guarantees. To ensure correctness, this method should either verify leadership before reading or proxy the request to the shard's leader to get an up-to-date commit timestamp.
References
- In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view. A stale commit timestamp can lead to an incorrect start timestamp, violating snapshot isolation.
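A minimal sketch of a leader-verified variant is shown below. It assumes each group also exposes its raft handle (called g.Raft here) and a gRPC proxy helper (readTSFromLeader); both names are hypothetical and not part of this PR, and the sketch is meant to live in shard_store.go alongside the existing method.

// Sketch only: g.Raft and readTSFromLeader are assumed names, not code in this PR.
func (s *ShardStore) LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error) {
    g, ok := s.groupForKey(key)
    if !ok || g.Store == nil {
        return 0, false, nil
    }
    // Answer from the local store only when this node is the verified leader
    // for the shard; otherwise ask the leader so the timestamp cannot be stale.
    if g.Raft.State() == raft.Leader && g.Raft.VerifyLeader().Error() == nil {
        ts, exists, err := g.Store.LatestCommitTS(ctx, key)
        if err != nil {
            return 0, false, errors.WithStack(err)
        }
        return ts, exists, nil
    }
    return s.readTSFromLeader(ctx, g, key) // hypothetical proxy path
}

VerifyLeader confirms leadership with a quorum, so a deposed leader cannot answer with an outdated timestamp.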
if reqs.IsTxn && reqs.StartTS == 0 {
    reqs.StartTS = c.clock.Next()
}
Assigning a transaction's startTS with just c.clock.Next() is not sufficient to guarantee snapshot isolation. A transaction must start with a timestamp greater than the commit timestamp of any data it might read. The current implementation does not account for this, which can lead to read-write conflicts. A more robust approach, as demonstrated in the Redis adapter's txnStartTS function, is to determine the startTS based on the latest commit timestamps of all keys involved in the transaction. This logic should be generalized and moved into the coordinator to be used by all transactional clients.
References
- In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view. The start timestamp must be chosen carefully to ensure snapshot isolation.
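A hedged sketch of what that generalization could look like inside the coordinator follows; c.store, the keysOf helper, and attaching this method to Coordinate are all assumptions for illustration, not code from this PR.

// Sketch only: generalizes the Redis adapter's txnStartTS into the coordinator.
func (c *Coordinate) txnStartTS(ctx context.Context, reqs []*pb.Request) (uint64, error) {
    ts := c.clock.Next()
    for _, key := range keysOf(reqs) { // hypothetical: extract user keys from the batch
        latest, exists, err := c.store.LatestCommitTS(ctx, key)
        if err != nil {
            return 0, errors.WithStack(err)
        }
        // The snapshot must start after the newest committed version of every
        // key the transaction touches, or it could miss recent writes.
        if exists && latest >= ts {
            ts = latest + 1
        }
    }
    return ts, nil
}

With something like this in place, all transactional clients would derive startTS the same way instead of each adapter reimplementing the logic.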
Pull request overview
This PR refactors the coordinator architecture to support multi-raft group sharding, enabling horizontal scaling through key-range partitioning. The changes introduce a pluggable coordinator interface with two implementations: the original single-group Coordinate and a new ShardedCoordinator that routes operations across multiple raft groups based on configurable key ranges.
Changes:
- Introduces sharded coordinator infrastructure with key-range based routing via a new distribution engine
- Refactors main.go to support multi-raft group initialization with configurable shard ranges
- Updates all adapters (gRPC, Redis, DynamoDB) to use the abstract Coordinator interface with key-aware leader operations
- Adds Jepsen test enhancements for sharded deployment validation
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 13 comments.
Summary per file
| File | Description |
|---|---|
| shard_config.go | Parses and validates raft group and shard range configuration from command-line flags |
| multiraft_runtime.go | Initializes multiple raft groups with per-group data directories |
| main.go | Refactored server initialization to build sharded infrastructure and start per-group gRPC servers |
| kv/sharded_coordinator.go | New coordinator implementation routing operations to shards via distribution engine |
| kv/shard_store.go | MVCC store wrapper routing reads/writes to appropriate shard stores with leader proxying |
| kv/shard_router.go | Routes requests to shard-specific transactional managers, now using routeKey normalization |
| kv/shard_key.go | Normalizes internal keys (list metadata/items) to user keys for routing |
| kv/leader_proxy.go | Forwards transactional requests to raft leaders when local node is follower |
| kv/coordinator.go | Extended interface with key-aware leader methods; refactored transaction log building |
| adapter/grpc.go | Updated to use Coordinator interface with IsLeaderForKey/RaftLeaderForKey |
| adapter/redis.go | Updated to use Coordinator interface; refactored transaction timestamp logic |
| adapter/dynamodb.go | Updated to use Coordinator interface |
| adapter/redis_proxy.go | Updated to use RaftLeaderForKey for key-aware proxying |
| kv/sharded_integration_test.go | Integration test validating multi-shard coordinator dispatch |
| kv/shard_router_test.go | New test validating list key routing with routeKey normalization |
| jepsen/src/elastickv/db.clj | Enhanced to support multi-group raft configuration and validation |
| jepsen/src/elastickv/redis_workload.clj | Added local mode and sharding configuration options |
| jepsen/docker/ssh_config | SSH configuration for local Docker-based Jepsen testing |
| jepsen/docker/run-in-docker.sh | Docker entrypoint script for containerized Jepsen tests |
| jepsen/docker/id_rsa | Standard Vagrant insecure private key for test environments |
func (s *ShardStore) ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error) {
    if limit <= 0 {
        return []*store.KVPair{}, nil
    }
    var out []*store.KVPair
    for _, g := range s.groups {
        if g == nil || g.Store == nil {
            continue
        }
        kvs, err := g.Store.ScanAt(ctx, start, end, limit, ts)
        if err != nil {
            return nil, errors.WithStack(err)
        }
        out = append(out, kvs...)
    }
    sort.Slice(out, func(i, j int) bool {
        return bytes.Compare(out[i].Key, out[j].Key) < 0
    })
    if len(out) > limit {
        out = out[:limit]
    }
    return out, nil
}
The ScanAt method iterates through all groups and gathers results without considering the key range boundaries. This means it will fetch results from groups that don't actually contain keys in the requested range [start, end). For a truly sharded implementation, this should only query groups whose shard ranges intersect with the scan range to avoid unnecessary work and incorrect results.
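For reference, a minimal intersection predicate for half-open ranges (nil end meaning unbounded, matching rangeSpec) could look like the sketch below; the helper name is illustrative and it relies only on the bytes package that shard_store.go already uses.

// Sketch only: reports whether a shard's [shardStart, shardEnd) range can
// contain any key in the scan range [scanStart, scanEnd). nil end = unbounded.
func rangesIntersect(scanStart, scanEnd, shardStart, shardEnd []byte) bool {
    // The shard ends before the scan begins.
    if shardEnd != nil && bytes.Compare(shardEnd, scanStart) <= 0 {
        return false
    }
    // The scan ends before the shard begins.
    if scanEnd != nil && bytes.Compare(scanEnd, shardStart) <= 0 {
        return false
    }
    return true
}

ScanAt would then skip any group whose shard range fails this check before issuing the per-group scan; a later commit on this PR adds a GetIntersectingRoutes method to the Engine for this purpose.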
@copilot open a new pull request to apply changes based on this feedback
Added GetIntersectingRoutes method to Engine to find routes whose key ranges intersect with the scan range [start, end). Modified ScanAt in ShardStore to only query groups whose shard ranges intersect with the requested scan range, avoiding unnecessary work and incorrect results.
Fix ScanAt to only query shards with intersecting key ranges
Fix resource leaks in multiraft_runtime.go error paths
Pull request overview
Copilot reviewed 24 out of 25 changed files in this pull request and generated 5 comments.
conn, err := grpc.NewClient(string(addr),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
)
if err != nil {
    return nil, errors.WithStack(err)
}
defer conn.Close()
Creating a new gRPC connection for every request can cause significant performance overhead and connection exhaustion under high load. Consider implementing connection pooling or reusing connections, especially since these proxy calls happen on every read/write operation when not the leader. A connection pool or cached connections per leader address would reduce latency and resource consumption significantly.
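A rough sketch of a per-leader-address cache is shown below; the connCache type and its wiring into the proxy are assumptions, not code in this PR.

// Sketch only: cache one client connection per leader address.
type connCache struct {
    mu    sync.Mutex
    conns map[raft.ServerAddress]*grpc.ClientConn
}

func (c *connCache) get(addr raft.ServerAddress) (*grpc.ClientConn, error) {
    c.mu.Lock()
    defer c.mu.Unlock()
    if conn, ok := c.conns[addr]; ok {
        return conn, nil
    }
    conn, err := grpc.NewClient(string(addr),
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
    )
    if err != nil {
        return nil, errors.WithStack(err)
    }
    if c.conns == nil {
        c.conns = make(map[raft.ServerAddress]*grpc.ClientConn)
    }
    c.conns[addr] = conn
    return conn, nil
}

Callers would then stop deferring conn.Close() per request and instead close cached connections once at shutdown (or when a leader change invalidates the address).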
conn, err := grpc.NewClient(string(addr),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
)
if err != nil {
    return nil, errors.WithStack(err)
}
defer conn.Close()
Creating a new gRPC connection for every proxied read operation can cause significant performance overhead and connection exhaustion under high load. Consider implementing connection pooling or reusing connections, especially since these proxy calls happen on every read operation from a follower node. A connection pool or cached connections per leader address would reduce latency and resource consumption significantly.
func parseRaftGroups(raw, defaultAddr string) ([]groupSpec, error) {
    if raw == "" {
        if defaultAddr == "" {
            return nil, ErrAddressRequired
        }
        return []groupSpec{{id: 1, address: defaultAddr}}, nil
    }
    parts := strings.Split(raw, ",")
    groups := make([]groupSpec, 0, len(parts))
    seen := map[uint64]struct{}{}
    for _, part := range parts {
        part = strings.TrimSpace(part)
        if part == "" {
            continue
        }
        kv := strings.SplitN(part, "=", splitParts)
        if len(kv) != splitParts {
            return nil, errors.WithStack(errors.Newf("invalid raftGroups entry: %q", part))
        }
        id, err := strconv.ParseUint(kv[0], 10, 64)
        if err != nil {
            return nil, errors.Wrapf(err, "invalid group id %q", kv[0])
        }
        addr := strings.TrimSpace(kv[1])
        if addr == "" {
            return nil, errors.WithStack(errors.Newf("empty address for group %d", id))
        }
        if _, ok := seen[id]; ok {
            return nil, errors.WithStack(errors.Newf("duplicate group id %d", id))
        }
        seen[id] = struct{}{}
        groups = append(groups, groupSpec{id: id, address: addr})
    }
    if len(groups) == 0 {
        return nil, ErrNoRaftGroupsConfigured
    }
    return groups, nil
}

func parseShardRanges(raw string, defaultGroup uint64) ([]rangeSpec, error) {
    if raw == "" {
        return []rangeSpec{{start: []byte(""), end: nil, groupID: defaultGroup}}, nil
    }
    parts := strings.Split(raw, ",")
    ranges := make([]rangeSpec, 0, len(parts))
    for _, part := range parts {
        part = strings.TrimSpace(part)
        if part == "" {
            continue
        }
        kv := strings.SplitN(part, "=", splitParts)
        if len(kv) != splitParts {
            return nil, errors.WithStack(errors.Newf("invalid shardRanges entry: %q", part))
        }
        groupID, err := strconv.ParseUint(kv[1], 10, 64)
        if err != nil {
            return nil, errors.Wrapf(err, "invalid group id in %q", part)
        }
        rangePart := kv[0]
        bounds := strings.SplitN(rangePart, ":", splitParts)
        if len(bounds) != splitParts {
            return nil, errors.WithStack(errors.Newf("invalid range %q (expected start:end)", rangePart))
        }
        start := []byte(bounds[0])
        var end []byte
        if bounds[1] != "" {
            end = []byte(bounds[1])
        }
        ranges = append(ranges, rangeSpec{start: start, end: end, groupID: groupID})
    }
    if len(ranges) == 0 {
        return nil, ErrNoShardRangesConfigured
    }
    return ranges, nil
}

func parseRaftRedisMap(raw string) map[raft.ServerAddress]string {
    out := make(map[raft.ServerAddress]string)
    if raw == "" {
        return out
    }
    parts := strings.Split(raw, ",")
    for _, part := range parts {
        part = strings.TrimSpace(part)
        if part == "" {
            continue
        }
        kv := strings.SplitN(part, "=", splitParts)
        if len(kv) != splitParts {
            continue
        }
        out[raft.ServerAddress(kv[0])] = kv[1]
    }
    return out
}

func defaultGroupID(groups []groupSpec) uint64 {
    min := uint64(0)
    for _, g := range groups {
        if min == 0 || g.id < min {
            min = g.id
        }
    }
    if min == 0 {
        return 1
    }
    return min
}

func validateShardRanges(ranges []rangeSpec, groups []groupSpec) error {
    ids := map[uint64]struct{}{}
    for _, g := range groups {
        ids[g.id] = struct{}{}
    }
    for _, r := range ranges {
        if _, ok := ids[r.groupID]; !ok {
            return fmt.Errorf("shard range references unknown group %d", r.groupID)
        }
    }
    return nil
}
The parseRaftGroups, parseShardRanges, parseRaftRedisMap, defaultGroupID, and validateShardRanges functions lack unit tests. Given that these functions handle critical configuration parsing and validation logic that could lead to runtime failures if incorrect, they should have comprehensive test coverage. Consider adding tests for valid inputs, edge cases (empty strings, malformed entries), invalid formats, and boundary conditions (e.g., duplicate group IDs, missing addresses, references to unknown groups).
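As a starting point, a table-driven test for parseRaftGroups might look like the sketch below; it assumes the parser lives in package main next to shard_config.go, and the other parsing and validation helpers could be covered in the same style.

// Sketch only: package placement is an assumption about where shard_config.go lives.
package main

import "testing"

func TestParseRaftGroups(t *testing.T) {
    cases := []struct {
        name    string
        raw     string
        addr    string
        want    int
        wantErr bool
    }{
        {name: "default single group", raw: "", addr: "127.0.0.1:50051", want: 1},
        {name: "empty without default", raw: "", addr: "", wantErr: true},
        {name: "two groups", raw: "1=127.0.0.1:50051,2=127.0.0.1:50052", want: 2},
        {name: "duplicate id", raw: "1=a:1,1=b:2", wantErr: true},
        {name: "missing address", raw: "1=", wantErr: true},
        {name: "malformed entry", raw: "no-equals", wantErr: true},
    }
    for _, tc := range cases {
        t.Run(tc.name, func(t *testing.T) {
            groups, err := parseRaftGroups(tc.raw, tc.addr)
            if tc.wantErr {
                if err == nil {
                    t.Fatalf("expected error, got %v", groups)
                }
                return
            }
            if err != nil {
                t.Fatalf("unexpected error: %v", err)
            }
            if len(groups) != tc.want {
                t.Fatalf("expected %d groups, got %d", tc.want, len(groups))
            }
        })
    }
}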
func groupDataDir(baseDir, raftID string, groupID uint64, multi bool) string {
    if !multi {
        return filepath.Join(baseDir, raftID)
    }
    return filepath.Join(baseDir, raftID, fmt.Sprintf("group-%d", groupID))
}

func newRaftGroup(raftID string, group groupSpec, baseDir string, multi bool, bootstrap bool, fsm raft.FSM) (*raft.Raft, *transport.Manager, error) {
    c := raft.DefaultConfig()
    c.LocalID = raft.ServerID(raftID)
    c.HeartbeatTimeout = heartbeatTimeout
    c.ElectionTimeout = electionTimeout
    c.LeaderLeaseTimeout = leaderLease

    dir := groupDataDir(baseDir, raftID, group.id, multi)
    if err := os.MkdirAll(dir, raftDirPerm); err != nil {
        return nil, nil, errors.WithStack(err)
    }

    ldb, err := boltdb.NewBoltStore(filepath.Join(dir, "logs.dat"))
    if err != nil {
        // No cleanup needed here - ldb creation failed, so no resource was allocated
        return nil, nil, errors.WithStack(err)
    }

    // Define cleanup function immediately after ldb is created to ensure
    // proper cleanup in all error paths. If we return before this point,
    // no cleanup is needed since ldb creation failed.
    var sdb *boltdb.BoltStore
    var r *raft.Raft
    cleanup := func() {
        if ldb != nil {
            _ = ldb.Close()
        }
        if sdb != nil {
            _ = sdb.Close()
        }
    }

    sdb, err = boltdb.NewBoltStore(filepath.Join(dir, "stable.dat"))
    if err != nil {
        cleanup()
        return nil, nil, errors.WithStack(err)
    }

    fss, err := raft.NewFileSnapshotStore(dir, snapshotRetainCount, os.Stderr)
    if err != nil {
        cleanup()
        return nil, nil, errors.WithStack(err)
    }

    tm := transport.New(raft.ServerAddress(group.address), []grpc.DialOption{
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    })

    r, err = raft.NewRaft(c, fsm, ldb, sdb, fss, tm.Transport())
    if err != nil {
        cleanup()
        return nil, nil, errors.WithStack(err)
    }

    if bootstrap {
        cfg := raft.Configuration{
            Servers: []raft.Server{
                {
                    Suffrage: raft.Voter,
                    ID:       raft.ServerID(raftID),
                    Address:  raft.ServerAddress(group.address),
                },
            },
        }
        f := r.BootstrapCluster(cfg)
        if err := f.Error(); err != nil {
            _ = r.Shutdown().Error()
            cleanup()
            return nil, nil, errors.WithStack(err)
        }
    }

    return r, tm, nil
}
The groupDataDir and newRaftGroup functions lack unit tests. While newRaftGroup has complex resource management logic (which was previously fixed), it should have tests to verify proper cleanup on errors, correct directory creation, and bootstrap behavior. Consider adding tests using in-memory stores and mocked filesystems where appropriate.
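A small sketch for groupDataDir is shown below (the package name and the paths used are assumptions); newRaftGroup would need a t.TempDir()-based test in a similar spirit to exercise directory creation, bootstrap, and the error-path cleanup.

// Sketch only: verifies the single- vs multi-group directory layout.
package main

import (
    "path/filepath"
    "testing"
)

func TestGroupDataDir(t *testing.T) {
    single := groupDataDir("/var/lib/elastickv", "node-1", 1, false)
    if single != filepath.Join("/var/lib/elastickv", "node-1") {
        t.Fatalf("unexpected single-group dir: %s", single)
    }
    multi := groupDataDir("/var/lib/elastickv", "node-1", 2, true)
    if multi != filepath.Join("/var/lib/elastickv", "node-1", "group-2") {
        t.Fatalf("unexpected multi-group dir: %s", multi)
    }
}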
  curl -fsSL "https://go.dev/dl/go${GO_VERSION}.linux-${ARCH}.tar.gz" -o go.tar.gz
  tar -C /usr/local -xzf go.tar.gz
  export PATH=$PATH:/usr/local/go/bin
fi

# Install Leiningen
if ! command -v lein >/dev/null 2>&1; then
  curl -L https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein > /usr/local/bin/lein
  chmod +x /usr/local/bin/lein
This script downloads and executes build tooling (the Go toolchain tarball and especially the Leiningen bootstrap script from raw.githubusercontent.com) at runtime without any integrity verification or pinning to an immutable reference. If an attacker compromises the download endpoint, TLS path, or the stable reference of the Leiningen repository, they could serve a modified script and gain arbitrary code execution as root inside the test container. To mitigate this supply-chain risk, pin these downloads to specific versioned artifacts or commit SHAs and validate them with checksums or signatures, or prefer base images that already include the required tools instead of curling and executing them during container startup.