-
Notifications
You must be signed in to change notification settings - Fork 61
🤖 refactor: migrate browser from WebSocket to HTTP transport with single-workspace subscription #2234
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…provider Replace @orpc/client/websocket with @orpc/client/fetch for the browser's RPC transport layer. This simplifies the connection lifecycle since HTTP/fetch has no persistent connection to manage (no open/close/error events). Key changes: - createBrowserClient now uses RPCLink from @orpc/client/fetch with Authorization header instead of WebSocket with token query param - connect() verifies reachability via auth-check ping (Promise-based) instead of WebSocket open/close event listeners - Remove 'degraded' state from APIState — with stateless HTTP requests, there's no half-open connection to degrade. Liveness pings still detect backend downtime and trigger reconnection. - Remove closeWebSocketSafely(), createWebSocket prop, wsFactory memo, forceReconnectInProgressRef - Non-auth connection errors now schedule reconnection with backoff (previously went to error state for non-WS-close failures) - Update ConnectionStatusToast to remove degraded state handling - Rewrite API.test.tsx to mock ping() responses instead of MockWebSocket - Electron MessagePort transport path is completely unchanged The backend already serves /orpc for HTTP (used by mobile), so no backend changes are needed.
Only subscribe to onChat for the currently selected workspace instead of all workspaces. Non-selected workspaces get sidebar state from a cached snapshot updated by the backend activity stream. Changes: - WorkspaceStore: Add selectedWorkspaceId tracking, suspendWorkspace(), cacheSidebarStateFromAggregator(), subscribeToActivityStream(), updateCachedSidebarFromActivity() - WorkspaceStore.syncWorkspaces: Only subscribe selected workspace, suspend all others (preserving metadata + cached sidebar state) - WorkspaceStore.getWorkspaceSidebarState: Fall back to cached state for suspended workspaces, with safe default when no cache exists - WorkspaceStore.getWorkspaceRecency: Include suspended workspaces via recencyCache - WorkspaceStore.setClient: Restart activity stream on reconnect - WorkspaceStore.dispose: Clean up activity subscription + cached state - WorkspaceContext: Wire setSelectedWorkspaceId via ref + useEffect - AppLoader: Set selected workspace before syncing stores - Tests: Update syncWorkspaces test to set selectedWorkspaceId, add test for non-selected workspace behavior
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: e61e7f5d32
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
- Cache default WorkspaceSidebarState for suspended workspaces to return referentially stable objects (required by useSyncExternalStore) - Use shared EMPTY_SKILLS constant instead of inline [] allocations - Replace useEffect with synchronous render-time store sync for workspace selection to ensure aggregator exists before children render
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ff39838ef3
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
getWorkspaceState() now lazily creates the aggregator if metadata exists but the aggregator is missing. This handles two edge cases: 1. Storybook: singleton store persists across story remounts but aggregators may be torn down when the client changes 2. Race conditions during navigation where the component renders before the useEffect fires setSelectedWorkspaceId
Address Codex review feedback: 1. Retry activity stream with exponential backoff (1s-30s) after transient failures, so sidebar updates resume automatically 2. Subscribe to the event stream before fetching the initial snapshot to close the race window where events could be lost between the two calls (duplicates are harmless — change detection deduplicates)
|
@codex review Addressed both P2 comments in commit 15f42cb:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 15f42cbf19
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| for (const id of subscribedIds) { | ||
| if (id !== this.selectedWorkspaceId && metadataIds.has(id)) { | ||
| this.suspendWorkspace(id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep suspended workspaces visible to resume manager
Suspending every non-selected workspace here removes its aggregator-backed state, and getAllStates() still only iterates aggregators; this means background workspaces disappear from the state map that useResumeManager polls (src/browser/hooks/useResumeManager.ts), so interrupted streams in workspaces you navigated away from no longer auto-resume. This is a functional regression from the previous “all workspaces monitored” behavior and leaves recovery dependent on manually re-opening each affected workspace.
Useful? React with 👍 / 👎.
| existing.canInterrupt === canInterrupt && | ||
| existing.currentModel === activity.lastModel && | ||
| existing.recencyTimestamp === activity.recency | ||
| ) { | ||
| return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clear stale isStarting state for suspended workspaces
This dedupe guard ignores isStarting, so if a workspace is suspended while isStarting is true and the activity snapshot later reports streaming=false with unchanged lastModel/recency (common when a stream fails before first token), the function returns early and never clears the starting flag. The sidebar can then show that workspace as perpetually “working” until it is reopened and replayed.
Useful? React with 👍 / 👎.
Summary
Migrates the browser frontend from WebSocket to HTTP/fetch transport by first reducing concurrent
onChatsubscriptions from N (all workspaces) to 1 (selected workspace only), then replacing the WebSocket transport with stateless HTTP/fetch.Background
The browser frontend previously used a WebSocket transport (
@orpc/client/websocket) and subscribed toonChatfor every workspace simultaneously. With N workspaces this meant:chat.jsonlreplay)The mobile client already uses
@orpc/client/fetchand subscribes to only the viewed workspace — this PR aligns the browser with that pattern.Implementation
1. Single-workspace
onChatsubscription (WorkspaceStore.ts)selectedWorkspaceIdtracking withsetSelectedWorkspaceId()methodsuspendWorkspace()— caches sidebar state before freeing aggregator/subscription (lighter thanremoveWorkspace)syncWorkspaces()to only subscribe the selected workspace; all others remain suspendedgetWorkspaceSidebarState()falls back to cached state for non-subscribed workspacesworkspace.activity.subscribe) that updates cached sidebar state in real-time whenstreamingstatus changes for non-selected workspaces2. Wiring from WorkspaceContext/AppLoader
WorkspaceContext.tsx: CallssetSelectedWorkspaceId()beforesyncWorkspaces()using a stable ref, plus auseEffectfor navigation-driven selection changesAppLoader.tsx: Passes selected workspace ID during initial sync3. HTTP/fetch transport (
API.tsx)@orpc/client/websocket→@orpc/client/fetch(usingRPCLinkwith HTTP endpoint at/orpc)closeWebSocketSafely(),createWebSocketprop,ws.addEventListener()handlersdegradedconnection state (no persistent connection to degrade with HTTP)connect()to Promise-based auth-check pingConnectionStatusToast.tsxto remove degraded statusValidation
make typecheckmake lintmake fmt-checkmake static-checkAPI.test.tsxWorkspaceStore.test.tsWorkspaceContext.test.tsxRisks
/orpc(HTTP) and/orpc/ws(WebSocket). The WebSocket endpoint can be removed in a future cleanup.agentStatusandawaitingUserQuestionfor non-selected workspaces use cached values from when the workspace was last active.canInterrupt(streaming indicator) updates in real-time via the activity stream.onChat(reads localchat.jsonl); this is fast but there may be a brief loading moment.getWorkspaceState()assertions: Only called for the selected workspace (verified all callers). Non-selected workspaces go throughgetWorkspaceSidebarState()which has graceful fallbacks.📋 Implementation Plan
Plan: Migrate from WebSocket to HTTP Transport by Subscribing Only to the Active Workspace
Context & Why
The browser frontend currently uses a WebSocket transport (
@orpc/client/websocket) to communicate with the backend. The reason WebSocket was chosen is that the frontend subscribes to all workspace chat streams simultaneously — oneonChatasync generator per workspace. With N workspaces, this means:chat.jsonlreplays loaded into memory on connectIf we change the frontend to only subscribe to the currently selected workspace's
onChatstream (like mobile already does), we eliminate the need for persistent multiplexed streaming and can switch to HTTP/fetch transport (@orpc/client/fetch) which uses SSE for the remaining streams.Benefits:
/orpc— no server changes needed for the transport itselfEvidence
src/browser/contexts/API.tsx:95-119RPCLinkfrom@orpc/client/websocket, connects to/orpc/wssrc/browser/stores/WorkspaceStore.ts:1839-1856syncWorkspaces()callsaddWorkspace()for every workspace → NonChatsubscriptionssrc/browser/stores/WorkspaceStore.ts:1592-1717runOnChatSubscription()— replay + live stream + 5s heartbeat + stall watchdog per workspacesrc/browser/contexts/WorkspaceContext.tsx:788-794selectedWorkspacederived from URL route — canonical "active workspace"mobile/src/orpc/client.ts@orpc/client/fetchwith SSE streaming successfullymobile/src/screens/WorkspaceScreen.tsx:705-729onChatfor only the viewed workspacesrc/node/orpc/server.ts:711-723RPCHandleralready mounted at/orpc, handles streaming via SSEsrc/common/orpc/schemas/workspace.ts:100-107WorkspaceActivitySnapshothasrecency,streaming,lastModel,lastThinkingLevelsrc/browser/components/WorkspaceListItem.tsx:304-305canInterrupt,awaitingUserQuestion,isStarting,agentStatussrc/desktop/main.ts:667-677Implementation
Phase 1: Enrich the global activity stream for sidebar needs (~80 LoC)
The sidebar currently derives
canInterrupt,isStarting,awaitingUserQuestion,agentStatus, andloadedSkillsfrom the per-workspaceonChatstream. To unsubscribe from non-visible workspaces, we need an alternative source for these fields.1a. Extend
WorkspaceActivitySnapshotschema (src/common/orpc/schemas/workspace.ts)Add sidebar-relevant fields to the activity snapshot:
1b. Emit enriched activity snapshots from
workspaceService(src/node/services/workspaceService.ts)Update
emitActivity()to include the new fields, derived from the agent session state. These fields are already tracked by the session — we just need to include them in the snapshot.1c. Consume activity stream in
WorkspaceStorefor sidebar stateAdd a subscription to
workspace.activity.subscribeinWorkspaceStorethat updates a newsidebarActivitymap. The existingWorkspaceSidebarStatederivation for non-selected workspaces will read from this map instead of the aggregator.Phase 2: Subscribe to
onChatonly for the selected workspace (~120 LoC)2a. Pass
selectedWorkspaceIdtoWorkspaceStoreWorkspaceStorecurrently has no concept of the selected workspace. Add asetSelectedWorkspaceId(id: string | null)method that:addWorkspace()for the new workspace (startsonChatsubscription + replay)suspendWorkspace()for the previously selected workspace2b. Add
suspendWorkspace(workspaceId)method (src/browser/stores/WorkspaceStore.ts)A lighter version of
removeWorkspace()that:onChatsubscription (viaipcUnsubscribers)chatTransientStateworkspaceMetadata,recencyCache,sidebarStateCache,sessionUsage2c. Update
syncWorkspaces()to not auto-subscribeChange
syncWorkspaces()to only calladdWorkspace()for the selected workspace, not all workspaces. Non-selected workspaces get their sidebar state from the activity stream (Phase 1c).2d. Wire
selectedWorkspaceIdfromWorkspaceContexttoWorkspaceStoreIn the
AppLoadercomponent (or whereversyncWorkspacesis called), pass through theselectedWorkspace.workspaceIdfrom the router. When the user switches workspaces,syncWorkspacesis called again with the new selection, triggering suspend/subscribe.2e. Update
useWorkspaceSidebarStatefor non-subscribed workspacesFor workspaces without an active aggregator, derive sidebar state from the activity stream data:
Phase 3: Switch browser transport from WebSocket to HTTP/fetch (~40 LoC)
3a. Replace
createBrowserClientinAPI.tsx3b. Simplify connection lifecycle in
APIProviderRemove WebSocket-specific code:
ws.addEventListener("open" / "close" / "error")handlerscloseWebSocketSafely()createWebSocketfactory propConnectionState.degraded(no persistent connection to degrade)Keep:
client.general.ping("auth-check")on mount3c. Remove
/orpc/wsWebSocket endpoint (optional, future cleanup)The WebSocket server setup in
src/node/orpc/server.ts:774-823can be removed once all clients use HTTP. However, this can be deferred since it doesn't hurt to keep it.Phase 4: Handle edge cases (~60 LoC)
4a. Streaming workspace notification in sidebar
When a non-selected workspace starts streaming (e.g., a task workspace), the activity stream will update the sidebar state. The user sees the "working" indicator without needing an
onChatsubscription.4b. Background workspace completion
When a non-selected workspace's stream ends, the activity stream updates sidebar state. If the user switches to that workspace,
onChatreplays the full history including the completed stream.4c. Race condition on workspace switch
When switching workspaces rapidly, ensure the previous
onChatsubscription is aborted before starting a new one. The existingAbortControllerpattern inaddWorkspacehandles this —suspendWorkspaceaborts the controller, andaddWorkspacecreates a new one.4d.
BackgroundBashStoresubscriptionsCurrently per-workspace and lazy — already only subscribes when UI needs it. No change needed, but ensure it only subscribes for the selected workspace (it already does via UI component mount/unmount).
LoC Estimate
Risks & Mitigations
/orpcis battle-testedonChatreplay is fast (reads localchat.jsonl); show loading skeleton during replayloadedSkillsnot available for non-selected workspacesGenerated with
mux• Model:anthropic:claude-opus-4-6• Thinking:xhigh• Cost:$151.18