@ThomasK33
Member

Summary

Migrates the browser frontend from WebSocket to HTTP/fetch transport by first reducing concurrent onChat subscriptions from N (all workspaces) to 1 (selected workspace only), then replacing the WebSocket transport with stateless HTTP/fetch.

Background

The browser frontend previously used a WebSocket transport (@orpc/client/websocket) and subscribed to onChat for every workspace simultaneously. With N workspaces this meant:

  • N concurrent streaming subscriptions (each with full chat.jsonl replay)
  • A persistent WebSocket connection required to multiplex all streams
  • High memory usage from N aggregators holding full message histories

The mobile client already uses @orpc/client/fetch and subscribes to only the viewed workspace — this PR aligns the browser with that pattern.

Implementation

1. Single-workspace onChat subscription (WorkspaceStore.ts)

  • Added selectedWorkspaceId tracking with setSelectedWorkspaceId() method
  • New suspendWorkspace() — caches sidebar state before freeing aggregator/subscription (lighter than removeWorkspace)
  • Modified syncWorkspaces() to only subscribe the selected workspace; all others remain suspended
  • getWorkspaceSidebarState() falls back to cached state for non-subscribed workspaces
  • Added activity stream subscription (workspace.activity.subscribe) that updates cached sidebar state in real-time when streaming status changes for non-selected workspaces

2. Wiring from WorkspaceContext/AppLoader

  • WorkspaceContext.tsx: Calls setSelectedWorkspaceId() before syncWorkspaces() using a stable ref, plus a useEffect for navigation-driven selection changes
  • AppLoader.tsx: Passes selected workspace ID during initial sync

3. HTTP/fetch transport (API.tsx)

  • Replaced @orpc/client/websocket with @orpc/client/fetch (using RPCLink with HTTP endpoint at /orpc)
  • Removed all WebSocket-specific code: closeWebSocketSafely(), createWebSocket prop, ws.addEventListener() handlers
  • Removed degraded connection state (no persistent connection to degrade with HTTP)
  • Simplified connect() to Promise-based auth-check ping
  • Simplified liveness check (no degraded recovery path)
  • Updated ConnectionStatusToast.tsx to remove degraded status

Validation

| Check | Result |
| --- | --- |
| make typecheck | ✅ Pass |
| make lint | ✅ Pass |
| make fmt-check | ✅ Pass |
| make static-check | ✅ Pass |
| API.test.tsx | ✅ 5 pass |
| WorkspaceStore.test.ts | ✅ 27 pass |
| WorkspaceContext.test.tsx | ✅ 26 pass |

Risks

  • SSE streaming reliability: Mobile already validates the fetch/SSE path; backend serves both /orpc (HTTP) and /orpc/ws (WebSocket). The WebSocket endpoint can be removed in a future cleanup.
  • Sidebar state staleness: agentStatus and awaitingUserQuestion for non-selected workspaces use cached values from when the workspace was last active. canInterrupt (streaming indicator) updates in real-time via the activity stream.
  • Workspace switch latency: Switching workspaces replays onChat (reads local chat.jsonl); this is fast but there may be a brief loading moment.
  • getWorkspaceState() assertions: Only called for the selected workspace (verified all callers). Non-selected workspaces go through getWorkspaceSidebarState() which has graceful fallbacks.

📋 Implementation Plan

Plan: Migrate from WebSocket to HTTP Transport by Subscribing Only to the Active Workspace

Context & Why

The browser frontend currently uses a WebSocket transport (@orpc/client/websocket) to communicate with the backend. WebSocket was chosen because the frontend subscribes to all workspace chat streams simultaneously, with one onChat async generator per workspace. With N workspaces, this means:

  • N concurrent streaming subscriptions (each with 5s heartbeat + 2s stall watchdog)
  • N full chat.jsonl replays loaded into memory on connect
  • A persistent WebSocket connection required to multiplex all these streams

If we change the frontend to only subscribe to the currently selected workspace's onChat stream (like mobile already does), we eliminate the need for persistent multiplexed streaming and can switch to HTTP/fetch transport (@orpc/client/fetch) which uses SSE for the remaining streams.

Benefits:

  • Simpler connection lifecycle (no WebSocket reconnection/heartbeat management)
  • Dramatically less data streamed (1 workspace stream vs N)
  • Lower memory usage (only one aggregator with full message history)
  • Aligns browser with mobile/VSCode which already use fetch transport
  • Server already supports HTTP transport at /orpc — no server changes needed for the transport itself

Evidence

| Source | Key finding |
| --- | --- |
| src/browser/contexts/API.tsx:95-119 | Browser client uses RPCLink from @orpc/client/websocket, connects to /orpc/ws |
| src/browser/stores/WorkspaceStore.ts:1839-1856 | syncWorkspaces() calls addWorkspace() for every workspace → N onChat subscriptions |
| src/browser/stores/WorkspaceStore.ts:1592-1717 | runOnChatSubscription() — replay + live stream + 5s heartbeat + stall watchdog per workspace |
| src/browser/contexts/WorkspaceContext.tsx:788-794 | selectedWorkspace derived from URL route — canonical "active workspace" |
| mobile/src/orpc/client.ts | Mobile already uses @orpc/client/fetch with SSE streaming successfully |
| mobile/src/screens/WorkspaceScreen.tsx:705-729 | Mobile subscribes to onChat for only the viewed workspace |
| src/node/orpc/server.ts:711-723 | HTTP RPCHandler already mounted at /orpc, handles streaming via SSE |
| src/common/orpc/schemas/workspace.ts:100-107 | WorkspaceActivitySnapshot has recency, streaming, lastModel, lastThinkingLevel |
| src/browser/components/WorkspaceListItem.tsx:304-305 | Sidebar needs canInterrupt, awaitingUserQuestion, isStarting, agentStatus |
| src/desktop/main.ts:667-677 | Electron uses MessagePort transport (unaffected by this change) |

Implementation

Phase 1: Enrich the global activity stream for sidebar needs (~80 LoC)

The sidebar currently derives canInterrupt, isStarting, awaitingUserQuestion, agentStatus, and loadedSkills from the per-workspace onChat stream. To unsubscribe from non-visible workspaces, we need an alternative source for these fields.

1a. Extend WorkspaceActivitySnapshot schema (src/common/orpc/schemas/workspace.ts)

Add sidebar-relevant fields to the activity snapshot:

export const WorkspaceActivitySnapshotSchema = z.object({
  recency: z.number().meta({ description: "Unix ms timestamp of last user interaction" }),
  streaming: z.boolean().meta({ description: "Whether workspace currently has an active stream" }),
  lastModel: z.string().nullable().meta({ description: "Last model sent from this workspace" }),
  lastThinkingLevel: ThinkingLevelSchema.nullable().meta({ ... }),
  // New fields for sidebar display:
  canInterrupt: z.boolean(),
  isStarting: z.boolean(),
  awaitingUserQuestion: z.boolean(),
  agentStatus: z.object({
    emoji: z.string(),
    message: z.string(),
    url: z.string().optional(),
  }).nullable(),
});

1b. Emit enriched activity snapshots from workspaceService (src/node/services/workspaceService.ts)

Update emitActivity() to include the new fields, derived from the agent session state. These fields are already tracked by the session — we just need to include them in the snapshot.

1c. Consume activity stream in WorkspaceStore for sidebar state

Add a subscription to workspace.activity.subscribe in WorkspaceStore that updates a new sidebarActivity map. The existing WorkspaceSidebarState derivation for non-selected workspaces will read from this map instead of the aggregator.

Phase 2: Subscribe to onChat only for the selected workspace (~120 LoC)

2a. Pass selectedWorkspaceId to WorkspaceStore

WorkspaceStore currently has no concept of the selected workspace. Add a setSelectedWorkspaceId(id: string | null) method that:

  • Calls addWorkspace() for the new workspace (starts onChat subscription + replay)
  • Calls a new suspendWorkspace() for the previously selected workspace

2b. Add suspendWorkspace(workspaceId) method (src/browser/stores/WorkspaceStore.ts)

A lighter version of removeWorkspace() that:

  • Aborts the onChat subscription (via ipcUnsubscribers)
  • Frees the aggregator (full message history)
  • Frees chatTransientState
  • Preserves: workspaceMetadata, recencyCache, sidebarStateCache, sessionUsage
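
A minimal sketch of the suspend step described above, using a stripped-down stand-in for the store. The class name MiniWorkspaceStore, the toSidebarState() helper, and the field shapes are assumptions made for illustration; only the map names and the free/preserve split come from the plan.

```typescript
// Hypothetical, simplified shapes; the real WorkspaceStore tracks more fields.
interface SidebarState {
  canInterrupt: boolean;
  isStarting: boolean;
  awaitingUserQuestion: boolean;
}

interface Aggregator {
  messages: string[];
  // Assumed helper that derives the sidebar view from the full history.
  toSidebarState(): SidebarState;
}

class MiniWorkspaceStore {
  readonly aggregators = new Map<string, Aggregator>();
  readonly ipcUnsubscribers = new Map<string, () => void>();
  readonly chatTransientState = new Map<string, unknown>();
  readonly workspaceMetadata = new Map<string, { name: string }>();
  readonly sidebarStateCache = new Map<string, SidebarState>();

  suspendWorkspace(workspaceId: string): void {
    const aggregator = this.aggregators.get(workspaceId);
    if (aggregator) {
      // Snapshot the sidebar state before the aggregator is dropped.
      this.sidebarStateCache.set(workspaceId, aggregator.toSidebarState());
    }

    // Abort the onChat subscription, if one is active.
    const unsubscribe = this.ipcUnsubscribers.get(workspaceId);
    if (unsubscribe) {
      unsubscribe();
      this.ipcUnsubscribers.delete(workspaceId);
    }

    // Free the heavy per-workspace state.
    this.aggregators.delete(workspaceId);
    this.chatTransientState.delete(workspaceId);

    // workspaceMetadata and sidebarStateCache are intentionally left in place.
  }
}
```

Unlike removeWorkspace(), this leaves the workspace listed in the sidebar, so a later addWorkspace() call only has to replay the chat history.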

2c. Update syncWorkspaces() to not auto-subscribe

Change syncWorkspaces() to only call addWorkspace() for the selected workspace, not all workspaces. Non-selected workspaces get their sidebar state from the activity stream (Phase 1c).

syncWorkspaces(
  workspaceMetadata: Map<string, FrontendWorkspaceMetadata>,
  selectedWorkspaceId: string | null,
): void {
  // Update metadata for all workspaces
  for (const [id, meta] of workspaceMetadata) {
    this.workspaceMetadata.set(id, meta);
  }
  // Only subscribe to onChat for the selected workspace
  if (selectedWorkspaceId && !this.ipcUnsubscribers.has(selectedWorkspaceId)) {
    const meta = workspaceMetadata.get(selectedWorkspaceId);
    if (meta) this.addWorkspace(meta);
  }
  // Suspend any previously-subscribed workspace that's no longer selected
  for (const id of this.ipcUnsubscribers.keys()) {
    if (id !== selectedWorkspaceId) {
      this.suspendWorkspace(id);
    }
  }
  // Remove workspaces that no longer exist
  // ...
}

2d. Wire selectedWorkspaceId from WorkspaceContext to WorkspaceStore

In the AppLoader component (or wherever syncWorkspaces is called), pass through the selectedWorkspace.workspaceId from the router. When the user switches workspaces, syncWorkspaces is called again with the new selection, triggering suspend/subscribe.

2e. Update useWorkspaceSidebarState for non-subscribed workspaces

For workspaces without an active aggregator, derive sidebar state from the activity stream data:

// If no aggregator exists (workspace is suspended), use activity data
const activity = this.sidebarActivity.get(workspaceId);
if (!aggregator && activity) {
  return {
    canInterrupt: activity.canInterrupt,
    isStarting: activity.isStarting,
    awaitingUserQuestion: activity.awaitingUserQuestion,
    agentStatus: activity.agentStatus,
    recencyTimestamp: activity.recency,
    currentModel: activity.lastModel,
    loadedSkills: [], // Not needed for sidebar rendering of non-selected workspaces
  };
}

Phase 3: Switch browser transport from WebSocket to HTTP/fetch (~40 LoC)

3a. Replace createBrowserClient in API.tsx

import { RPCLink as FetchLink } from "@orpc/client/fetch";

function createBrowserClient(
  authToken: string | null,
): { client: APIClient; cleanup: () => void } {
  const apiBaseUrl = getBrowserBackendBaseUrl();
  const link = new FetchLink({
    url: `${apiBaseUrl}/orpc`,
    headers: authToken ? { Authorization: `Bearer ${authToken}` } : {},
  });
  return { client: createClient(link), cleanup: () => {} };
}

3b. Simplify connection lifecycle in APIProvider

Remove WebSocket-specific code:

  • ws.addEventListener("open" / "close" / "error") handlers
  • WebSocket heartbeat/pong tracking
  • closeWebSocketSafely()
  • createWebSocket factory prop
  • ConnectionState.degraded (no persistent connection to degrade)

Keep:

  • Auth check via client.general.ping("auth-check") on mount
  • Periodic liveness ping (can remain as HTTP request)
  • Reconnection concept changes to "re-auth" — if ping fails, re-fetch auth token
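
One way to picture the simplified lifecycle is as a small state transition driven by ping outcomes. The sketch below is illustrative only: the state names, the PingOutcome type, and nextConnectionState() are hypothetical and are not taken from API.tsx. It only encodes the two ideas stated in this PR: there is no degraded state, and non-auth failures lead to a retry rather than a terminal error.

```typescript
// Hypothetical connection states; note there is no "degraded" variant.
type ConnectionState =
  | { status: "connected" }
  | { status: "reconnecting"; attempt: number }
  | { status: "auth_required" };

// Hypothetical classification of a liveness or auth-check ping result.
type PingOutcome = "ok" | "unauthorized" | "unreachable";

function nextConnectionState(prev: ConnectionState, outcome: PingOutcome): ConnectionState {
  switch (outcome) {
    case "ok":
      return { status: "connected" };
    case "unauthorized":
      // Auth failures need a fresh token, not a blind retry.
      return { status: "auth_required" };
    case "unreachable":
      // Non-auth failures schedule another attempt; the counter can drive backoff.
      return {
        status: "reconnecting",
        attempt: prev.status === "reconnecting" ? prev.attempt + 1 : 0,
      };
  }
}
```

Because each HTTP request stands alone, a single successful ping is enough to return to connected from any state.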

3c. Remove /orpc/ws WebSocket endpoint (optional, future cleanup)

The WebSocket server setup in src/node/orpc/server.ts:774-823 can be removed once all clients use HTTP. However, this can be deferred since it doesn't hurt to keep it.

Phase 4: Handle edge cases (~60 LoC)

4a. Streaming workspace notification in sidebar

When a non-selected workspace starts streaming (e.g., a task workspace), the activity stream will update the sidebar state. The user sees the "working" indicator without needing an onChat subscription.

4b. Background workspace completion

When a non-selected workspace's stream ends, the activity stream updates sidebar state. If the user switches to that workspace, onChat replays the full history including the completed stream.

4c. Race condition on workspace switch

When switching workspaces rapidly, ensure the previous onChat subscription is aborted before starting a new one. The existing AbortController pattern in addWorkspace handles this — suspendWorkspace aborts the controller, and addWorkspace creates a new one.
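
The abort-then-create ordering can be sketched in isolation as follows. ChatSubscriptionSwitcher is a hypothetical helper written for this example and is not part of WorkspaceStore; it assumes the standard AbortController available in browsers and recent Node.js versions.

```typescript
// Hypothetical helper: at most one live subscription signal at a time.
class ChatSubscriptionSwitcher {
  private current: { workspaceId: string; controller: AbortController } | null = null;

  /** Returns the signal for the newly selected workspace, or null if none is selected. */
  select(workspaceId: string | null): AbortSignal | null {
    // Re-selecting the same workspace keeps the existing subscription.
    if (this.current !== null && this.current.workspaceId === workspaceId) {
      return this.current.controller.signal;
    }

    // Abort the previous subscription before anything new starts.
    if (this.current !== null) {
      this.current.controller.abort();
      this.current = null;
    }

    if (workspaceId === null) {
      return null;
    }

    const controller = new AbortController();
    this.current = { workspaceId, controller };
    return controller.signal;
  }
}
```

A streaming loop that receives the returned signal can check signal.aborted between events and exit, so a rapid A → B → C switch leaves only the subscription for C running.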

4d. BackgroundBashStore subscriptions

BackgroundBashStore subscriptions are already per-workspace and lazy: they start only when the UI needs them. No change is needed beyond confirming that only the selected workspace subscribes, which UI component mount/unmount already guarantees.

LoC Estimate

| Phase | Product code LoC (net) |
| --- | --- |
| Phase 1: Enrich activity stream | ~80 |
| Phase 2: Single-workspace onChat | ~120 |
| Phase 3: HTTP transport | ~-40 (net removal) |
| Phase 4: Edge cases | ~60 |
| Total | ~220 net LoC |

Risks & Mitigations

| Risk | Mitigation |
| --- | --- |
| SSE streaming reliability vs WebSocket | Mobile already validates this path; HTTP RPCHandler at /orpc is battle-tested |
| Sidebar state lag for non-selected workspaces | Activity stream is event-driven (not polling), so updates are near-instant |
| Workspace switch latency (replay delay) | onChat replay is fast (reads local chat.jsonl); show loading skeleton during replay |
| loadedSkills not available for non-selected workspaces | Skills are only displayed for the selected workspace; sidebar doesn't render them |
| Electron MessagePort transport unaffected | Phase 3 only changes the browser (non-Electron) path; desktop app continues using MessagePort |

Generated with mux • Model: anthropic:claude-opus-4-6 • Thinking: xhigh • Cost: $151.18

…provider

Replace @orpc/client/websocket with @orpc/client/fetch for the browser's
RPC transport layer. This simplifies the connection lifecycle since
HTTP/fetch has no persistent connection to manage (no open/close/error
events).

Key changes:
- createBrowserClient now uses RPCLink from @orpc/client/fetch with
  Authorization header instead of WebSocket with token query param
- connect() verifies reachability via auth-check ping (Promise-based)
  instead of WebSocket open/close event listeners
- Remove 'degraded' state from APIState — with stateless HTTP requests,
  there's no half-open connection to degrade. Liveness pings still
  detect backend downtime and trigger reconnection.
- Remove closeWebSocketSafely(), createWebSocket prop, wsFactory memo,
  forceReconnectInProgressRef
- Non-auth connection errors now schedule reconnection with backoff
  (previously went to error state for non-WS-close failures)
- Update ConnectionStatusToast to remove degraded state handling
- Rewrite API.test.tsx to mock ping() responses instead of MockWebSocket
- Electron MessagePort transport path is completely unchanged

The backend already serves /orpc for HTTP (used by mobile), so no
backend changes are needed.

Only subscribe to onChat for the currently selected workspace instead of
all workspaces. Non-selected workspaces get sidebar state from a cached
snapshot updated by the backend activity stream.

Changes:
- WorkspaceStore: Add selectedWorkspaceId tracking, suspendWorkspace(),
  cacheSidebarStateFromAggregator(), subscribeToActivityStream(),
  updateCachedSidebarFromActivity()
- WorkspaceStore.syncWorkspaces: Only subscribe selected workspace,
  suspend all others (preserving metadata + cached sidebar state)
- WorkspaceStore.getWorkspaceSidebarState: Fall back to cached state
  for suspended workspaces, with safe default when no cache exists
- WorkspaceStore.getWorkspaceRecency: Include suspended workspaces
  via recencyCache
- WorkspaceStore.setClient: Restart activity stream on reconnect
- WorkspaceStore.dispose: Clean up activity subscription + cached state
- WorkspaceContext: Wire setSelectedWorkspaceId via ref + useEffect
- AppLoader: Set selected workspace before syncing stores
- Tests: Update syncWorkspaces test to set selectedWorkspaceId,
  add test for non-selected workspace behavior

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: e61e7f5d32

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

- Cache default WorkspaceSidebarState for suspended workspaces to return
  referentially stable objects (required by useSyncExternalStore)
- Use shared EMPTY_SKILLS constant instead of inline [] allocations
- Replace useEffect with synchronous render-time store sync for workspace
  selection to ensure aggregator exists before children render
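
The referential-stability point can be illustrated with a small cache. This is a sketch under assumptions: SuspendedSidebarDefaults and the trimmed WorkspaceSidebarState shape are made up for the example, while EMPTY_SKILLS mirrors the constant named in the commit message. useSyncExternalStore compares snapshots by identity, so returning a fresh object (or a fresh []) on every read would look like a change on every render.

```typescript
// Trimmed, hypothetical version of the sidebar state shape.
interface WorkspaceSidebarState {
  canInterrupt: boolean;
  isStarting: boolean;
  awaitingUserQuestion: boolean;
  loadedSkills: readonly string[];
}

// Shared constant so that every default state points at the same array.
const EMPTY_SKILLS: readonly string[] = Object.freeze([]);

class SuspendedSidebarDefaults {
  private readonly cache = new Map<string, WorkspaceSidebarState>();

  /** Returns the same object on every call for a given workspace. */
  get(workspaceId: string): WorkspaceSidebarState {
    let state = this.cache.get(workspaceId);
    if (state === undefined) {
      state = {
        canInterrupt: false,
        isStarting: false,
        awaitingUserQuestion: false,
        loadedSkills: EMPTY_SKILLS,
      };
      this.cache.set(workspaceId, state);
    }
    return state;
  }
}
```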
@ThomasK33
Member Author

@codex review

Fixed the P1 issue in commit ff39838 — replaced the useEffect with synchronous render-time store sync using a ref-guarded pattern. This ensures the aggregator exists before child components render, preventing the assertGet() crash on workspace navigation.

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: ff39838ef3

getWorkspaceState() now lazily creates the aggregator if metadata exists
but the aggregator is missing. This handles two edge cases:
1. Storybook: singleton store persists across story remounts but
   aggregators may be torn down when the client changes
2. Race conditions during navigation where the component renders
   before the useEffect fires setSelectedWorkspaceId
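
A rough sketch of the lazy-creation fallback, with hypothetical names (LazyAggregatorStore, ChatAggregator) and a deliberately tiny aggregator. It assumes, as the commit message describes, that metadata is the precondition: an unknown workspace still fails loudly, while a known workspace without an aggregator gets an empty one instead of a crash.

```typescript
// Hypothetical stand-in for the real aggregator.
interface ChatAggregator {
  messages: string[];
}

class LazyAggregatorStore {
  readonly workspaceMetadata = new Map<string, { name: string }>();
  readonly aggregators = new Map<string, ChatAggregator>();

  getWorkspaceState(workspaceId: string): ChatAggregator {
    let aggregator = this.aggregators.get(workspaceId);
    if (aggregator === undefined) {
      if (!this.workspaceMetadata.has(workspaceId)) {
        // No metadata means the workspace is genuinely unknown.
        throw new Error(`Unknown workspace: ${workspaceId}`);
      }
      // Metadata exists but the aggregator was torn down or not yet created.
      aggregator = { messages: [] };
      this.aggregators.set(workspaceId, aggregator);
    }
    return aggregator;
  }
}
```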
Address Codex review feedback:
1. Retry activity stream with exponential backoff (1s-30s) after
   transient failures, so sidebar updates resume automatically
2. Subscribe to the event stream before fetching the initial snapshot
   to close the race window where events could be lost between the
   two calls (duplicates are harmless — change detection deduplicates)
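
For reference, a capped exponential backoff over the 1s to 30s range could be computed like this. The function name, the doubling factor, and the zero-based attempt counter are assumptions; the commit message only states the bounds.

```typescript
/**
 * Delay before retry number `attempt` (0-based): doubles each time,
 * starting at baseMs and never exceeding maxMs.
 */
function backoffDelayMs(attempt: number, baseMs = 1_000, maxMs = 30_000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}
```

With these defaults the delays run 1s, 2s, 4s, 8s, 16s and then stay at 30s. Resetting the attempt counter after a successful subscription keeps a later transient failure from starting at the cap.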
@ThomasK33
Member Author

@codex review

Addressed both P2 comments in commit 15f42cb:

  1. Activity stream retry: Added exponential backoff retry loop (1s–30s) so the activity stream automatically reconnects after transient failures
  2. Subscribe-before-list ordering: Swapped the order to subscribe first, then seed initial state via activity.list(). Events arriving during the gap are deduplicated by updateCachedSidebarFromActivity's change detection.

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 15f42cbf19

Comment on lines +2155 to +2157
for (const id of subscribedIds) {
  if (id !== this.selectedWorkspaceId && metadataIds.has(id)) {
    this.suspendWorkspace(id);

P1: Keep suspended workspaces visible to resume manager

Suspending every non-selected workspace here removes its aggregator-backed state, and getAllStates() still only iterates aggregators; this means background workspaces disappear from the state map that useResumeManager polls (src/browser/hooks/useResumeManager.ts), so interrupted streams in workspaces you navigated away from no longer auto-resume. This is a functional regression from the previous “all workspaces monitored” behavior and leaves recovery dependent on manually re-opening each affected workspace.

Comment on lines +918 to +922
  existing.canInterrupt === canInterrupt &&
  existing.currentModel === activity.lastModel &&
  existing.recencyTimestamp === activity.recency
) {
  return;

P2: Clear stale isStarting state for suspended workspaces

This dedupe guard ignores isStarting, so if a workspace is suspended while isStarting is true and the activity snapshot later reports streaming=false with unchanged lastModel/recency (common when a stream fails before first token), the function returns early and never clears the starting flag. The sidebar can then show that workspace as perpetually “working” until it is reopened and replayed.
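
One possible shape for a guard that avoids this, shown as a standalone function rather than the actual updateCachedSidebarFromActivity. It rests on two assumptions that are not confirmed by the diff: canInterrupt mirrors activity.streaming, and a workspace that is not streaming can no longer be starting.

```typescript
// Trimmed, hypothetical versions of the cached state and activity snapshot.
interface CachedSidebarState {
  canInterrupt: boolean;
  isStarting: boolean;
  currentModel: string | null;
  recencyTimestamp: number;
}

interface ActivitySnapshot {
  streaming: boolean;
  lastModel: string | null;
  recency: number;
}

/** Returns `existing` unchanged when nothing differs, otherwise a new object. */
function applyActivity(existing: CachedSidebarState, activity: ActivitySnapshot): CachedSidebarState {
  const canInterrupt = activity.streaming; // assumption: streaming implies interruptible
  // Assumption: once streaming is false, any cached "starting" flag is stale.
  const isStarting = activity.streaming ? existing.isStarting : false;

  if (
    existing.canInterrupt === canInterrupt &&
    existing.isStarting === isStarting &&
    existing.currentModel === activity.lastModel &&
    existing.recencyTimestamp === activity.recency
  ) {
    return existing; // no change: keep the same reference
  }

  return {
    canInterrupt,
    isStarting,
    currentModel: activity.lastModel,
    recencyTimestamp: activity.recency,
  };
}
```

Including isStarting in the comparison means a snapshot with streaming=false and unchanged lastModel/recency still produces an update when the cached flag was true.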
