From 91d841c7f90e97e998414a494e69bd39e14be207 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Wed, 11 Feb 2026 19:14:09 +0800 Subject: [PATCH 1/3] fix: failover for greptimedb Signed-off-by: Dennis Zhuang --- cmd/devtap/storefactory.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/cmd/devtap/storefactory.go b/cmd/devtap/storefactory.go index 1c609c2..d976aa7 100644 --- a/cmd/devtap/storefactory.go +++ b/cmd/devtap/storefactory.go @@ -153,17 +153,24 @@ func resolveDrainSources(cmd *cobra.Command) ([]mcp.DrainSource, func(), error) return b } - // Configured store unavailable — fall back to local-only single source. + // Configured store unavailable — use lazy wrapper that retries on drain. if configuredErr != nil { - fmt.Fprintf(os.Stderr, "devtap: configured store %q unavailable (%v), using local only\n", + fmt.Fprintf(os.Stderr, "devtap: configured store %q unavailable (%v), will retry on drain\n", configuredBackend, configuredErr) + lazy := newLazyStore(func() (store.Store, error) { + return openStoreStrict(cfg, configuredBackend, storeDir, adapterName) + }) localStore, err := openStoreByBackend(cfg, localBackend, storeDir, adapterName) if err != nil { return nil, nil, fmt.Errorf("open local store: %w", err) } - cleanup := func() { _ = localStore.Close() } + cleanup := func() { + _ = localStore.Close() + _ = lazy.Close() + } return []mcp.DrainSource{ - {Store: localStore, SessionID: localSession, Label: localSession}, + {Store: localStore, SessionID: localSession, Label: "local"}, + {Store: lazy, SessionID: configuredSession, Label: configuredSession}, }, cleanup, nil } From 0fd6c5828b9f1417e792ce4bb0382c68cc6f0726 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Wed, 11 Feb 2026 19:29:53 +0800 Subject: [PATCH 2/3] fix: forgot files Signed-off-by: Dennis Zhuang --- cmd/devtap/lazystore.go | 110 ++++++++++++ cmd/devtap/lazystore_test.go | 337 +++++++++++++++++++++++++++++++++++ 2 files changed, 447 insertions(+) create mode 100644 cmd/devtap/lazystore.go create mode 100644 cmd/devtap/lazystore_test.go diff --git a/cmd/devtap/lazystore.go b/cmd/devtap/lazystore.go new file mode 100644 index 0000000..bc6d3f2 --- /dev/null +++ b/cmd/devtap/lazystore.go @@ -0,0 +1,110 @@ +package main + +import ( + "errors" + "sync" + "time" + + "github.com/killme2008/devtap/internal/store" +) + +const defaultLazyCooldown = 30 * time.Second + +// lazyStore wraps a store.Store that is connected on first use and retried +// with a cooldown on failure. This allows the MCP server to start even when +// the configured store (e.g. GreptimeDB) is temporarily unavailable, and +// reconnect once it becomes reachable. +type lazyStore struct { + mu sync.Mutex + inner store.Store + connectFn func() (store.Store, error) + lastAttempt time.Time + cooldown time.Duration + closed bool +} + +func newLazyStore(connectFn func() (store.Store, error)) *lazyStore { + return &lazyStore{ + connectFn: connectFn, + cooldown: defaultLazyCooldown, + } +} + +// ensureConnected attempts to establish the inner store if not already connected. +// Returns nil if connected, or the connection error if unavailable. +func (ls *lazyStore) ensureConnected() error { + if ls.closed { + return errors.New("store closed") + } + if ls.inner != nil { + return nil + } + if !ls.lastAttempt.IsZero() && time.Since(ls.lastAttempt) < ls.cooldown { + return errors.New("store unavailable (cooldown)") + } + ls.lastAttempt = time.Now() + s, err := ls.connectFn() + if err != nil { + return err + } + ls.inner = s + return nil +} + +// reset closes the inner store and nils it out so the next call retries. +// Does NOT update lastAttempt — the next ensureConnected checks elapsed time +// since the original connection, which is typically >cooldown, allowing an +// immediate retry after detecting failure. +func (ls *lazyStore) reset() { + if ls.inner != nil { + _ = ls.inner.Close() + ls.inner = nil + } +} + +func (ls *lazyStore) Drain(sessionID string, maxLines int) ([]store.LogMessage, error) { + ls.mu.Lock() + defer ls.mu.Unlock() + + if err := ls.ensureConnected(); err != nil { + return nil, err + } + msgs, err := ls.inner.Drain(sessionID, maxLines) + if err != nil { + ls.reset() + return nil, err + } + return msgs, nil +} + +func (ls *lazyStore) Status() (map[string]int, error) { + ls.mu.Lock() + defer ls.mu.Unlock() + + if err := ls.ensureConnected(); err != nil { + return nil, err + } + counts, err := ls.inner.Status() + if err != nil { + ls.reset() + return nil, err + } + return counts, nil +} + +func (ls *lazyStore) Write(_ string, _ store.LogMessage) error { + return errors.New("lazyStore: write not supported (read-only)") +} + +func (ls *lazyStore) Close() error { + ls.mu.Lock() + defer ls.mu.Unlock() + + ls.closed = true + if ls.inner != nil { + err := ls.inner.Close() + ls.inner = nil + return err + } + return nil +} diff --git a/cmd/devtap/lazystore_test.go b/cmd/devtap/lazystore_test.go new file mode 100644 index 0000000..1922a56 --- /dev/null +++ b/cmd/devtap/lazystore_test.go @@ -0,0 +1,337 @@ +package main + +import ( + "errors" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/killme2008/devtap/internal/store" +) + +// mockStore is a minimal store.Store for testing lazyStore behavior. +type mockStore struct { + drainFn func(string, int) ([]store.LogMessage, error) + statusFn func() (map[string]int, error) + closed bool +} + +func (m *mockStore) Write(string, store.LogMessage) error { return nil } +func (m *mockStore) Close() error { m.closed = true; return nil } + +func (m *mockStore) Drain(s string, n int) ([]store.LogMessage, error) { + return m.drainFn(s, n) +} + +func (m *mockStore) Status() (map[string]int, error) { + return m.statusFn() +} + +func TestLazyStore_FirstCallConnects(t *testing.T) { + connectCalls := 0 + ms := &mockStore{ + drainFn: func(string, int) ([]store.LogMessage, error) { return nil, nil }, + statusFn: func() (map[string]int, error) { return map[string]int{"s": 0}, nil }, + } + ls := newLazyStore(func() (store.Store, error) { + connectCalls++ + return ms, nil + }) + + _, err := ls.Drain("s", 10) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if connectCalls != 1 { + t.Fatalf("expected 1 connect call, got %d", connectCalls) + } + + // Second call should reuse cached connection. + _, err = ls.Status() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if connectCalls != 1 { + t.Fatalf("expected still 1 connect call, got %d", connectCalls) + } +} + +func TestLazyStore_CooldownPreventsRapidRetry(t *testing.T) { + connectCalls := 0 + ls := newLazyStore(func() (store.Store, error) { + connectCalls++ + return nil, errors.New("connection refused") + }) + ls.cooldown = 1 * time.Hour // very long cooldown + + // First call: attempts connection, fails. + _, err := ls.Drain("s", 10) + if err == nil { + t.Fatal("expected error") + } + if connectCalls != 1 { + t.Fatalf("expected 1 connect call, got %d", connectCalls) + } + + // Second call within cooldown: should NOT retry. + _, err = ls.Drain("s", 10) + if err == nil { + t.Fatal("expected error") + } + if connectCalls != 1 { + t.Fatalf("expected still 1 connect call (cooldown), got %d", connectCalls) + } +} + +func TestLazyStore_RetriesAfterCooldown(t *testing.T) { + connectCalls := 0 + ls := newLazyStore(func() (store.Store, error) { + connectCalls++ + return nil, errors.New("connection refused") + }) + ls.cooldown = 1 * time.Millisecond + + // First call fails. + _, _ = ls.Drain("s", 10) + if connectCalls != 1 { + t.Fatalf("expected 1 connect call, got %d", connectCalls) + } + + time.Sleep(5 * time.Millisecond) + + // After cooldown: should retry. + _, _ = ls.Drain("s", 10) + if connectCalls != 2 { + t.Fatalf("expected 2 connect calls after cooldown, got %d", connectCalls) + } +} + +func TestLazyStore_DrainErrorTriggersReset(t *testing.T) { + connectCalls := 0 + drainErr := true + ms := &mockStore{ + drainFn: func(string, int) ([]store.LogMessage, error) { + if drainErr { + return nil, errors.New("query failed") + } + return nil, nil + }, + statusFn: func() (map[string]int, error) { return nil, nil }, + } + ls := newLazyStore(func() (store.Store, error) { + connectCalls++ + return ms, nil + }) + ls.cooldown = 1 * time.Millisecond + + // First call: connects successfully, Drain fails → reset. + _, err := ls.Drain("s", 10) + if err == nil { + t.Fatal("expected drain error") + } + if connectCalls != 1 { + t.Fatalf("expected 1 connect call, got %d", connectCalls) + } + if !ms.closed { + t.Fatal("expected inner store to be closed after reset") + } + + time.Sleep(5 * time.Millisecond) + + // Provide a fresh mock for the reconnect. + ms2 := &mockStore{ + drainFn: func(string, int) ([]store.LogMessage, error) { return nil, nil }, + statusFn: func() (map[string]int, error) { return nil, nil }, + } + drainErr = false + ls.mu.Lock() + ls.connectFn = func() (store.Store, error) { + connectCalls++ + return ms2, nil + } + ls.mu.Unlock() + + // After cooldown: reconnects. + _, err = ls.Drain("s", 10) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if connectCalls != 2 { + t.Fatalf("expected 2 connect calls, got %d", connectCalls) + } +} + +func TestLazyStore_StatusErrorTriggersReset(t *testing.T) { + ms := &mockStore{ + drainFn: func(string, int) ([]store.LogMessage, error) { return nil, nil }, + statusFn: func() (map[string]int, error) { + return nil, errors.New("status failed") + }, + } + ls := newLazyStore(func() (store.Store, error) { + return ms, nil + }) + ls.cooldown = 1 * time.Millisecond + + _, err := ls.Status() + if err == nil { + t.Fatal("expected status error") + } + if !ms.closed { + t.Fatal("expected inner store to be closed after reset") + } +} + +func TestLazyStore_WriteReturnsError(t *testing.T) { + ls := newLazyStore(func() (store.Store, error) { + return nil, nil + }) + err := ls.Write("s", store.LogMessage{}) + if err == nil { + t.Fatal("expected write error") + } +} + +func TestLazyStore_CloseWhenNotConnected(t *testing.T) { + ls := newLazyStore(func() (store.Store, error) { + return nil, errors.New("nope") + }) + // Should not panic or error. + if err := ls.Close(); err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +// Test that after reset, if the original connection was established > cooldown ago, +// the next call retries immediately (no extra wait). This is because reset() does +// NOT update lastAttempt — the elapsed time since the original connect is already +// past the cooldown window. +func TestLazyStore_ResetAllowsImmediateRetry(t *testing.T) { + connectCalls := 0 + failDrain := true + + ls := newLazyStore(func() (store.Store, error) { + connectCalls++ + return &mockStore{ + drainFn: func(string, int) ([]store.LogMessage, error) { + if failDrain { + return nil, errors.New("query failed") + } + return []store.LogMessage{{Tag: "ok"}}, nil + }, + statusFn: func() (map[string]int, error) { return nil, nil }, + }, nil + }) + ls.cooldown = 50 * time.Millisecond + + // Connect succeeds, Drain fails → reset. + _, err := ls.Drain("s", 10) + if err == nil { + t.Fatal("expected drain error") + } + if connectCalls != 1 { + t.Fatalf("expected 1 connect call, got %d", connectCalls) + } + + // Wait longer than cooldown so that time.Since(lastAttempt) > cooldown. + time.Sleep(60 * time.Millisecond) + + // Now Drain succeeds. The retry should happen immediately (no extra cooldown) + // because lastAttempt was set during the original connect, >50ms ago. + failDrain = false + msgs, err := ls.Drain("s", 10) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if connectCalls != 2 { + t.Fatalf("expected 2 connect calls (immediate retry after reset), got %d", connectCalls) + } + if len(msgs) != 1 || msgs[0].Tag != "ok" { + t.Fatalf("unexpected messages: %v", msgs) + } +} + +func TestLazyStore_ConcurrentAccess(t *testing.T) { + var connectCalls atomic.Int32 + ls := newLazyStore(func() (store.Store, error) { + connectCalls.Add(1) + return &mockStore{ + drainFn: func(string, int) ([]store.LogMessage, error) { + return []store.LogMessage{{Tag: "test"}}, nil + }, + statusFn: func() (map[string]int, error) { + return map[string]int{"s": 1}, nil + }, + }, nil + }) + ls.cooldown = 1 * time.Millisecond + + const goroutines = 20 + var wg sync.WaitGroup + wg.Add(goroutines) + + for i := 0; i < goroutines; i++ { + go func(i int) { + defer wg.Done() + if i%2 == 0 { + _, _ = ls.Drain("s", 10) + } else { + _, _ = ls.Status() + } + }(i) + } + + wg.Wait() + + // Should have connected exactly once (all goroutines share the cached connection). + if c := connectCalls.Load(); c != 1 { + t.Fatalf("expected 1 connect call, got %d", c) + } +} + +func TestLazyStore_DrainAfterCloseReturnsError(t *testing.T) { + ms := &mockStore{ + drainFn: func(string, int) ([]store.LogMessage, error) { return nil, nil }, + statusFn: func() (map[string]int, error) { return nil, nil }, + } + ls := newLazyStore(func() (store.Store, error) { + return ms, nil + }) + + // Connect, then close, then try to drain. + _, _ = ls.Drain("s", 10) + _ = ls.Close() + + _, err := ls.Drain("s", 10) + if err == nil { + t.Fatal("expected error after close") + } + _, err = ls.Status() + if err == nil { + t.Fatal("expected error after close") + } +} + +func TestLazyStore_CloseDelegatesToInner(t *testing.T) { + ms := &mockStore{ + drainFn: func(string, int) ([]store.LogMessage, error) { return nil, nil }, + statusFn: func() (map[string]int, error) { return nil, nil }, + } + ls := newLazyStore(func() (store.Store, error) { + return ms, nil + }) + + // Force connect. + _, _ = ls.Drain("s", 10) + if ms.closed { + t.Fatal("inner should not be closed yet") + } + + if err := ls.Close(); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !ms.closed { + t.Fatal("expected inner store to be closed") + } +} From fe87eec7f1cd9b3ff2e7a3178b2ccef36112934e Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Wed, 11 Feb 2026 19:43:13 +0800 Subject: [PATCH 3/3] chore: improve error message Signed-off-by: Dennis Zhuang --- cmd/devtap/lazystore.go | 13 +++++++++---- cmd/devtap/lazystore_test.go | 10 +++++++++- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/cmd/devtap/lazystore.go b/cmd/devtap/lazystore.go index bc6d3f2..4ecc788 100644 --- a/cmd/devtap/lazystore.go +++ b/cmd/devtap/lazystore.go @@ -2,6 +2,7 @@ package main import ( "errors" + "fmt" "sync" "time" @@ -19,6 +20,7 @@ type lazyStore struct { inner store.Store connectFn func() (store.Store, error) lastAttempt time.Time + lastErr error cooldown time.Duration closed bool } @@ -40,21 +42,24 @@ func (ls *lazyStore) ensureConnected() error { return nil } if !ls.lastAttempt.IsZero() && time.Since(ls.lastAttempt) < ls.cooldown { - return errors.New("store unavailable (cooldown)") + return fmt.Errorf("store unavailable (cooldown): %w", ls.lastErr) } ls.lastAttempt = time.Now() s, err := ls.connectFn() if err != nil { + ls.lastErr = err return err } + ls.lastErr = nil ls.inner = s return nil } // reset closes the inner store and nils it out so the next call retries. -// Does NOT update lastAttempt — the next ensureConnected checks elapsed time -// since the original connection, which is typically >cooldown, allowing an -// immediate retry after detecting failure. +// Does NOT clear lastAttempt — the cooldown still applies, which prevents +// hammering if the store accepts connections but fails on queries. For +// long-running processes (MCP server), the elapsed time since the original +// connect is typically >cooldown, so the retry is effectively immediate. func (ls *lazyStore) reset() { if ls.inner != nil { _ = ls.inner.Close() diff --git a/cmd/devtap/lazystore_test.go b/cmd/devtap/lazystore_test.go index 1922a56..eaaf3c9 100644 --- a/cmd/devtap/lazystore_test.go +++ b/cmd/devtap/lazystore_test.go @@ -2,6 +2,7 @@ package main import ( "errors" + "strings" "sync" "sync/atomic" "testing" @@ -74,7 +75,8 @@ func TestLazyStore_CooldownPreventsRapidRetry(t *testing.T) { t.Fatalf("expected 1 connect call, got %d", connectCalls) } - // Second call within cooldown: should NOT retry. + // Second call within cooldown: should NOT retry, and error should + // include the original connection error for diagnostics. _, err = ls.Drain("s", 10) if err == nil { t.Fatal("expected error") @@ -82,6 +84,12 @@ func TestLazyStore_CooldownPreventsRapidRetry(t *testing.T) { if connectCalls != 1 { t.Fatalf("expected still 1 connect call (cooldown), got %d", connectCalls) } + if !strings.Contains(err.Error(), "cooldown") { + t.Fatalf("expected cooldown in error, got: %v", err) + } + if !strings.Contains(err.Error(), "connection refused") { + t.Fatalf("expected original error in cooldown message, got: %v", err) + } } func TestLazyStore_RetriesAfterCooldown(t *testing.T) {