Changes from all commits
23 commits
7eb2044
Refactor coordinator logic and add sharded support
bootjp Feb 4, 2026
7916d7d
Refactor SSH key handling and update .gitignore
bootjp Feb 5, 2026
454b342
Fix go build command in Dockerfile
bootjp Feb 5, 2026
607edc7
Refactor error messages into package-level variables
bootjp Feb 5, 2026
a081a09
Fix alignment of error variable declarations
bootjp Feb 5, 2026
122eff4
Update jepsen/src/elastickv/db.clj
bootjp Feb 5, 2026
f19cf76
Initial plan
Copilot Feb 5, 2026
4ea2ac6
Initial plan
Copilot Feb 5, 2026
13d2237
Update main.go
bootjp Feb 5, 2026
bcde8d2
Update kv/shard_store.go
bootjp Feb 5, 2026
f16c4bd
Fix ScanAt to only query groups with intersecting shard ranges
Copilot Feb 5, 2026
a35bc1a
Add .cache/ to .gitignore and remove from repository
Copilot Feb 5, 2026
18933b5
Fix resource leaks in newRaftGroup error paths
Copilot Feb 5, 2026
4b9881b
Add cache directories to .gitignore
Copilot Feb 5, 2026
ec0e6a5
Refactor cleanup logic to use helper function
Copilot Feb 5, 2026
c8c2d9f
Remove accidentally committed cache files
Copilot Feb 5, 2026
2d344c9
Merge branch 'feature/multi-raft' into copilot/sub-pr-304
bootjp Feb 5, 2026
f3f2f58
Merge branch 'feature/multi-raft' into copilot/sub-pr-304-again
bootjp Feb 5, 2026
5e43d88
Merge pull request #305 from bootjp/copilot/sub-pr-304
bootjp Feb 5, 2026
61712fb
Merge branch 'feature/multi-raft' into copilot/sub-pr-304-again
bootjp Feb 5, 2026
b989eab
Update multiraft_runtime.go
bootjp Feb 5, 2026
c812a8b
Add comments clarifying resource cleanup logic
Copilot Feb 5, 2026
21b8b9a
Merge pull request #306 from bootjp/copilot/sub-pr-304-again
bootjp Feb 5, 2026
8 changes: 8 additions & 0 deletions .gitignore
@@ -29,3 +29,11 @@ jepsen/.lein-*
jepsen/.nrepl-port
.m2/
jepsen/store/

# Jepsen local SSH keys (generated locally; never commit)
jepsen/docker/id_rsa
jepsen/.ssh/

# Build and lint cache directories
.cache/
.golangci-cache/
2 changes: 1 addition & 1 deletion Dockerfile
@@ -3,7 +3,7 @@ FROM golang:latest AS build
WORKDIR $GOPATH/src/app
COPY . .

-RUN CGO_ENABLED=0 go build -o /app main.go
+RUN CGO_ENABLED=0 go build -o /app .

FROM gcr.io/distroless/static:latest
COPY --from=build /app /app
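Building the package path (`.`) rather than the single file `main.go` compiles every Go file in the main package; `go build main.go` fails as soon as the package spans more than one file.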
2 changes: 1 addition & 1 deletion adapter/dynamodb.go
@@ -32,7 +32,7 @@ type DynamoDBServer struct {
httpServer *http.Server
}

-func NewDynamoDBServer(listen net.Listener, st store.MVCCStore, coordinate *kv.Coordinate) *DynamoDBServer {
+func NewDynamoDBServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordinator) *DynamoDBServer {
d := &DynamoDBServer{
listen: listen,
store: st,
6 changes: 3 additions & 3 deletions adapter/grpc.go
@@ -28,7 +28,7 @@ type GRPCServer struct {
pb.UnimplementedTransactionalKVServer
}

-func NewGRPCServer(store store.MVCCStore, coordinate *kv.Coordinate) *GRPCServer {
+func NewGRPCServer(store store.MVCCStore, coordinate kv.Coordinator) *GRPCServer {
return &GRPCServer{
log: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelWarn,
@@ -45,7 +45,7 @@ func (r GRPCServer) RawGet(ctx context.Context, req *pb.RawGetRequest) (*pb.RawG
readTS = snapshotTS(r.coordinator.Clock(), r.store)
}

-if r.coordinator.IsLeader() {
+if r.coordinator.IsLeaderForKey(req.Key) {
v, err := r.store.GetAt(ctx, req.Key, readTS)
if err != nil {
switch {
@@ -83,7 +83,7 @@ func (r GRPCServer) RawGet(ctx context.Context, req *pb.RawGetRequest) (*pb.RawG
}

func (r GRPCServer) tryLeaderGet(key []byte) ([]byte, error) {
-addr := r.coordinator.RaftLeader()
+addr := r.coordinator.RaftLeaderForKey(key)
if addr == "" {
return nil, ErrLeaderNotFound
}
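Across the adapters, the concrete `*kv.Coordinate` parameter is replaced by a `kv.Coordinator` interface, and leader checks become per-key so each request can be routed to the Raft group that owns the key's shard. The interface definition itself is not shown in this diff; the following is a sketch inferred purely from the call sites (the method set, the `Clock` interface, and the return types are assumptions, not the real `kv` package):

```go
package kv // sketch only; inferred from call sites in this PR, not the actual file

import "github.com/hashicorp/raft"

// Clock is inferred from the calls Clock().Next() and Clock().Observe(ts)
// seen in the adapters.
type Clock interface {
	Next() uint64      // issue a new read/start timestamp
	Observe(ts uint64) // fold an externally observed commit timestamp into the clock
}

// Coordinator is inferred from the adapter call sites; the per-key
// variants presumably route through the shard map to the owning group.
type Coordinator interface {
	IsLeaderForKey(key []byte) bool                 // does this node lead key's shard group?
	VerifyLeaderForKey(key []byte) error            // confirm leadership via Raft before a local read
	RaftLeaderForKey(key []byte) raft.ServerAddress // leader address for key's shard, "" if unknown
	RaftLeader() raft.ServerAddress                 // leader for non-keyed paths (e.g. proxying a whole EXEC)
	Clock() Clock                                   // timestamp source for MVCC reads
}
```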
163 changes: 111 additions & 52 deletions adapter/redis.go
@@ -21,19 +21,34 @@ import (
"google.golang.org/grpc/credentials/insecure"
)

const (
cmdGet = "GET"
cmdSet = "SET"
cmdDel = "DEL"
cmdExists = "EXISTS"
cmdPing = "PING"
cmdKeys = "KEYS"
cmdMulti = "MULTI"
cmdExec = "EXEC"
cmdDiscard = "DISCARD"
cmdLRange = "LRANGE"
cmdRPush = "RPUSH"
minKeyedArgs = 2
)

//nolint:mnd
var argsLen = map[string]int{
"GET": 2,
"SET": 3,
"DEL": 2,
"EXISTS": 2,
"PING": 1,
"KEYS": 2,
"MULTI": 1,
"EXEC": 1,
"DISCARD": 1,
"LRANGE": 4,
"RPUSH": -3, // negative means minimum number of args
cmdGet: 2,
cmdSet: 3,
cmdDel: 2,
cmdExists: 2,
cmdPing: 1,
cmdKeys: 2,
cmdMulti: 1,
cmdExec: 1,
cmdDiscard: 1,
cmdLRange: 4,
cmdRPush: -3, // negative means minimum number of args
}
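The negative-entry convention ("minimum number of args") implies an arity check along these lines; `validateArity` is a hypothetical helper for illustration, not the adapter's actual code:

```go
// validateArity interprets the argsLen convention: a positive entry
// requires an exact argument count, a negative entry -n requires at
// least n arguments (e.g. RPUSH key value [value ...] has cmdRPush: -3).
func validateArity(name string, argc int) bool {
	want, ok := argsLen[name]
	if !ok {
		return false // unknown command
	}
	if want < 0 {
		return argc >= -want // minimum arity
	}
	return argc == want // exact arity
}
```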

type RedisServer struct {
@@ -72,7 +87,7 @@ type redisResult struct {
err error
}

-func NewRedisServer(listen net.Listener, store store.MVCCStore, coordinate *kv.Coordinate, leaderRedis map[raft.ServerAddress]string) *RedisServer {
+func NewRedisServer(listen net.Listener, store store.MVCCStore, coordinate kv.Coordinator, leaderRedis map[raft.ServerAddress]string) *RedisServer {
r := &RedisServer{
listen: listen,
store: store,
@@ -82,17 +97,17 @@ func NewRedisServer(listen net.Listener, store store.MVCCStore, coordinate *kv.C
}

r.route = map[string]func(conn redcon.Conn, cmd redcon.Command){
"PING": r.ping,
"SET": r.set,
"GET": r.get,
"DEL": r.del,
"EXISTS": r.exists,
"KEYS": r.keys,
"MULTI": r.multi,
"EXEC": r.exec,
"DISCARD": r.discard,
"RPUSH": r.rpush,
"LRANGE": r.lrange,
cmdPing: r.ping,
cmdSet: r.set,
cmdGet: r.get,
cmdDel: r.del,
cmdExists: r.exists,
cmdKeys: r.keys,
cmdMulti: r.multi,
cmdExec: r.exec,
cmdDiscard: r.discard,
cmdRPush: r.rpush,
cmdLRange: r.lrange,
}

return r
@@ -129,7 +144,7 @@ func (r *RedisServer) Run() error {
}

name := strings.ToUpper(string(cmd.Args[0]))
-if state.inTxn && name != "EXEC" && name != "DISCARD" && name != "MULTI" {
+if state.inTxn && name != cmdExec && name != cmdDiscard && name != cmdMulti {
state.queue = append(state.queue, cmd)
conn.WriteString("QUEUED")
return
@@ -255,7 +270,7 @@ func (r *RedisServer) del(conn redcon.Conn, cmd redcon.Command) {
}

func (r *RedisServer) exists(conn redcon.Conn, cmd redcon.Command) {
-if !r.coordinator.IsLeader() {
+if !r.coordinator.IsLeaderForKey(cmd.Args[1]) {
res, err := r.proxyExists(cmd.Args[1])
if err != nil {
conn.WriteError(err.Error())
Expand All @@ -265,7 +280,7 @@ func (r *RedisServer) exists(conn redcon.Conn, cmd redcon.Command) {
return
}

-if err := r.coordinator.VerifyLeader(); err != nil {
+if err := r.coordinator.VerifyLeaderForKey(cmd.Args[1]); err != nil {
conn.WriteError(err.Error())
return
}
@@ -517,17 +532,17 @@ func (t *txnContext) listLength(st *listTxnState) int64 {

func (t *txnContext) apply(cmd redcon.Command) (redisResult, error) {
switch strings.ToUpper(string(cmd.Args[0])) {
case "SET":
case cmdSet:
return t.applySet(cmd)
case "DEL":
case cmdDel:
return t.applyDel(cmd)
case "GET":
case cmdGet:
return t.applyGet(cmd)
case "EXISTS":
case cmdExists:
return t.applyExists(cmd)
case "RPUSH":
case cmdRPush:
return t.applyRPush(cmd)
case "LRANGE":
case cmdLRange:
return t.applyLRange(cmd)
default:
return redisResult{}, errors.WithStack(errors.Newf("ERR unsupported command '%s'", cmd.Args[0]))
@@ -770,14 +785,7 @@ func (t *txnContext) buildListElems() ([]*kv.Elem[kv.OP], error) {
}

func (r *RedisServer) runTransaction(queue []redcon.Command) ([]redisResult, error) {
-if err := r.coordinator.VerifyLeader(); err != nil {
-return nil, errors.WithStack(err)
-}
-
-startTS := r.coordinator.Clock().Next()
-if last := r.store.LastCommitTS(); last > startTS {
-startTS = last
-}
+startTS := r.txnStartTS(queue)

ctx := &txnContext{
server: r,
@@ -802,6 +810,57 @@ func (r *RedisServer) runTransaction(queue []redcon.Command) ([]redisResult, err
return results, nil
}

func (r *RedisServer) txnStartTS(queue []redcon.Command) uint64 {
maxTS := r.maxLatestCommitTS(queue)
if r.coordinator != nil && r.coordinator.Clock() != nil && maxTS > 0 {
r.coordinator.Clock().Observe(maxTS)
}
if r.coordinator == nil || r.coordinator.Clock() == nil {
return maxTS
}
return r.coordinator.Clock().Next()
}

func (r *RedisServer) maxLatestCommitTS(queue []redcon.Command) uint64 {
var maxTS uint64
if r.store == nil {
return maxTS
}
seen := make(map[string]struct{})
for _, cmd := range queue {
if len(cmd.Args) < minKeyedArgs {
continue
}
name := strings.ToUpper(string(cmd.Args[0]))
switch name {
case cmdSet, cmdGet, cmdDel, cmdExists, cmdRPush, cmdLRange:
key := cmd.Args[1]
r.bumpLatestCommitTS(&maxTS, key, seen)
// Also account for list metadata keys to avoid stale typing decisions.
r.bumpLatestCommitTS(&maxTS, listMetaKey(key), seen)
}
}
return maxTS
}
Comment on lines +824 to +844
Contributor

Severity: high

The maxLatestCommitTS function relies on r.store.LatestCommitTS() to determine the start timestamp for a transaction. However, the underlying kv.ShardStore.LatestCommitTS implementation performs a local read without verifying leadership or proxying to the leader. This can result in reading a stale commit timestamp if the current node is a follower for a shard. Using a stale timestamp to calculate the transaction's startTS can break snapshot isolation guarantees, as the transaction might read data from a point in time before a more recent write occurred.

References
  1. In an MVCC system, all operations within a single transaction must share the same start timestamp to ensure atomicity and a consistent snapshot view. Using a stale start timestamp can violate snapshot isolation.
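One way to address this, sketched here as a hypothetical leadership-aware wrapper (the helper name is invented; it only uses methods that appear elsewhere in this diff):

```go
// readLatestCommitTSVerified trusts a local LatestCommitTS read only
// after confirming this node leads the shard that owns key; otherwise it
// reports the timestamp as unknown so the caller can fall back to the
// clock or proxy the transaction to the leader.
func (r *RedisServer) readLatestCommitTSVerified(ctx context.Context, key []byte) (uint64, bool) {
	if !r.coordinator.IsLeaderForKey(key) {
		return 0, false // follower: the local MVCC state may be stale
	}
	if err := r.coordinator.VerifyLeaderForKey(key); err != nil {
		return 0, false // leadership could not be confirmed via Raft
	}
	ts, exists, err := r.store.LatestCommitTS(ctx, key)
	if err != nil || !exists {
		return 0, false
	}
	return ts, true
}
```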


func (r *RedisServer) bumpLatestCommitTS(maxTS *uint64, key []byte, seen map[string]struct{}) {
if len(key) == 0 {
return
}
k := string(key)
if _, ok := seen[k]; ok {
return
}
seen[k] = struct{}{}
latest, exists, err := r.store.LatestCommitTS(context.Background(), key)
if err != nil || !exists {
return
}
if latest > *maxTS {
*maxTS = latest
}
}

func (r *RedisServer) proxyExec(conn redcon.Conn, queue []redcon.Command) error {
leader := r.coordinator.RaftLeader()
if leader == "" {
@@ -909,13 +968,13 @@ func newProxyCmd(name string, args []string, ctx context.Context) redis.Cmder {
}

switch name {
case "SET":
case cmdSet:
return redis.NewStatusCmd(ctx, argv...)
case "DEL", "EXISTS", "RPUSH":
case cmdDel, cmdExists, cmdRPush:
return redis.NewIntCmd(ctx, argv...)
case "GET":
case cmdGet:
return redis.NewStringCmd(ctx, argv...)
case "LRANGE":
case cmdLRange:
return redis.NewStringSliceCmd(ctx, argv...)
default:
return redis.NewCmd(ctx, argv...)
@@ -1082,11 +1141,11 @@ func (r *RedisServer) fetchListRange(ctx context.Context, key []byte, meta store

func (r *RedisServer) rangeList(key []byte, startRaw, endRaw []byte) ([]string, error) {
readTS := r.readTS()
-if !r.coordinator.IsLeader() {
+if !r.coordinator.IsLeaderForKey(key) {
return r.proxyLRange(key, startRaw, endRaw)
}

-if err := r.coordinator.VerifyLeader(); err != nil {
+if err := r.coordinator.VerifyLeaderForKey(key); err != nil {
return nil, errors.WithStack(err)
}

@@ -1116,7 +1175,7 @@ func (r *RedisServer) rangeList(key []byte, startRaw, endRaw []byte) ([]string,
}

func (r *RedisServer) proxyLRange(key []byte, startRaw, endRaw []byte) ([]string, error) {
-leader := r.coordinator.RaftLeader()
+leader := r.coordinator.RaftLeaderForKey(key)
if leader == "" {
return nil, ErrLeaderNotFound
}
Expand All @@ -1142,7 +1201,7 @@ func (r *RedisServer) proxyLRange(key []byte, startRaw, endRaw []byte) ([]string
}

func (r *RedisServer) proxyRPush(key []byte, values [][]byte) (int64, error) {
-leader := r.coordinator.RaftLeader()
+leader := r.coordinator.RaftLeaderForKey(key)
if leader == "" {
return 0, ErrLeaderNotFound
}
@@ -1171,7 +1230,7 @@ func parseInt(b []byte) (int, error) {
// tryLeaderGet proxies a GET to the current Raft leader, returning the value and
// whether the proxy succeeded.
func (r *RedisServer) tryLeaderGetAt(key []byte, ts uint64) ([]byte, error) {
-addr := r.coordinator.RaftLeader()
+addr := r.coordinator.RaftLeaderForKey(key)
if addr == "" {
return nil, ErrLeaderNotFound
}
@@ -1195,8 +1254,8 @@ func (r *RedisServer) tryLeaderGetAt(key []byte, ts uint64) ([]byte, error) {
}

func (r *RedisServer) readValueAt(key []byte, readTS uint64) ([]byte, error) {
-if r.coordinator.IsLeader() {
-if err := r.coordinator.VerifyLeader(); err != nil {
+if r.coordinator.IsLeaderForKey(key) {
+if err := r.coordinator.VerifyLeaderForKey(key); err != nil {
return nil, errors.WithStack(err)
}
v, err := r.store.GetAt(context.Background(), key, readTS)
@@ -1210,7 +1269,7 @@ func (r *RedisServer) rpush(conn redcon.Conn, cmd redcon.Command) {

var length int64
var err error
-if r.coordinator.IsLeader() {
+if r.coordinator.IsLeaderForKey(cmd.Args[1]) {
length, err = r.listRPush(ctx, cmd.Args[1], cmd.Args[2:])
} else {
length, err = r.proxyRPush(cmd.Args[1], cmd.Args[2:])
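For reference, the MULTI/EXEC queueing implemented in Run above corresponds to the standard client-side transaction flow; a minimal go-redis sketch (the listen address is an assumption):

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // assumed adapter address

	// TxPipelined sends MULTI, queues the commands (the server replies
	// QUEUED, as in Run above), then sends EXEC and collects the results.
	cmds, err := rdb.TxPipelined(ctx, func(p redis.Pipeliner) error {
		p.Set(ctx, "k", "v", 0)
		p.Get(ctx, "k")
		return nil
	})
	if err != nil {
		panic(err)
	}
	for _, c := range cmds {
		fmt.Println(c.String())
	}
}
```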
2 changes: 1 addition & 1 deletion adapter/redis_proxy.go
@@ -8,7 +8,7 @@ import (
)

func (r *RedisServer) proxyExists(key []byte) (int, error) {
-leader := r.coordinator.RaftLeader()
+leader := r.coordinator.RaftLeaderForKey(key)
if leader == "" {
return 0, ErrLeaderNotFound
}
30 changes: 30 additions & 0 deletions distribution/engine.go
@@ -121,6 +121,36 @@ func (e *Engine) Stats() []Route {
return stats
}

// GetIntersectingRoutes returns all routes whose key ranges intersect with [start, end).
// A route [rStart, rEnd) intersects with [start, end) if:
// - rStart < end (or end is nil, meaning unbounded scan)
// - start < rEnd (or rEnd is nil, meaning unbounded route)
func (e *Engine) GetIntersectingRoutes(start, end []byte) []Route {
e.mu.RLock()
defer e.mu.RUnlock()

var result []Route
for _, r := range e.routes {
// Check if route intersects with [start, end)
// Route ends before scan starts: rEnd != nil && rEnd <= start
if r.End != nil && bytes.Compare(r.End, start) <= 0 {
continue
}
// Route starts at or after scan ends: end != nil && rStart >= end
if end != nil && bytes.Compare(r.Start, end) >= 0 {
continue
}
// Route intersects with scan range
result = append(result, Route{
Start: cloneBytes(r.Start),
End: cloneBytes(r.End),
GroupID: r.GroupID,
Load: atomic.LoadUint64(&r.Load),
})
}
return result
}

func (e *Engine) routeIndex(key []byte) int {
if len(e.routes) == 0 {
return -1
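A self-contained restatement of the half-open intersection predicate used by GetIntersectingRoutes, with a few worked cases (illustrative only; it mirrors, not replaces, the method above):

```go
package main

import (
	"bytes"
	"fmt"
)

// intersects reports whether route [rStart, rEnd) overlaps scan
// [start, end); a nil rEnd or end means "unbounded above", and a nil or
// empty rStart/start means "from the beginning of the keyspace".
func intersects(rStart, rEnd, start, end []byte) bool {
	if rEnd != nil && bytes.Compare(rEnd, start) <= 0 {
		return false // route ends at or before the scan starts
	}
	if end != nil && bytes.Compare(rStart, end) >= 0 {
		return false // route starts at or after the scan ends
	}
	return true
}

func main() {
	// Two routes splitting the keyspace at "m": ["", "m") and ["m", nil).
	fmt.Println(intersects(nil, []byte("m"), []byte("k"), []byte("z"))) // true: "k" < "m"
	fmt.Println(intersects([]byte("m"), nil, []byte("k"), []byte("z"))) // true: "m" < "z"
	fmt.Println(intersects(nil, []byte("m"), []byte("m"), nil))         // false: scan starts at "m"
}
```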