-
Notifications
You must be signed in to change notification settings - Fork 462
fix(streamable-http): prevent SSE reconnect loops with shadow channels #660
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8bd424e
0d03eb5
a7bb822
7cf5406
a7df58c
c3a557e
14eddf4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -293,6 +293,12 @@ pub struct LocalSessionWorker { | |
| tx_router: HashMap<HttpRequestId, HttpRequestWise>, | ||
| resource_router: HashMap<ResourceKey, HttpRequestId>, | ||
| common: CachedTx, | ||
| /// Shadow senders for secondary SSE streams (e.g. from POST EventSource | ||
| /// reconnections). These keep the HTTP connections alive via SSE keep-alive | ||
| /// without receiving notifications, preventing clients like Cursor from | ||
| /// entering infinite reconnect loops when multiple EventSource connections | ||
| /// compete to replace the common channel. | ||
| shadow_txs: Vec<Sender<ServerSseMessage>>, | ||
| event_rx: Receiver<SessionEvent>, | ||
| session_config: SessionConfig, | ||
| } | ||
|
|
@@ -315,6 +321,8 @@ pub enum SessionError { | |
| SessionServiceTerminated, | ||
| #[error("Invalid event id")] | ||
| InvalidEventId, | ||
| #[error("Conflict: Only one standalone SSE stream is allowed per session")] | ||
| Conflict, | ||
|
Comment on lines
+324
to
+325
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where is this used? |
||
| #[error("IO error: {0}")] | ||
| Io(#[from] std::io::Error), | ||
| } | ||
|
|
@@ -513,36 +521,69 @@ impl LocalSessionWorker { | |
| &mut self, | ||
| last_event_id: EventId, | ||
| ) -> Result<StreamableHttpMessageReceiver, SessionError> { | ||
| // Clean up closed shadow senders before processing | ||
| self.shadow_txs.retain(|tx| !tx.is_closed()); | ||
|
|
||
| match last_event_id.http_request_id { | ||
| Some(http_request_id) => { | ||
| let request_wise = self | ||
| .tx_router | ||
| .get_mut(&http_request_id) | ||
| .ok_or(SessionError::ChannelClosed(Some(http_request_id)))?; | ||
| let channel = tokio::sync::mpsc::channel(self.session_config.channel_capacity); | ||
| let (tx, rx) = channel; | ||
| request_wise.tx.tx = tx; | ||
| let index = last_event_id.index; | ||
| // sync messages after index | ||
| request_wise.tx.sync(index).await?; | ||
| Ok(StreamableHttpMessageReceiver { | ||
| http_request_id: Some(http_request_id), | ||
| inner: rx, | ||
| }) | ||
| } | ||
| None => { | ||
| let channel = tokio::sync::mpsc::channel(self.session_config.channel_capacity); | ||
| let (tx, rx) = channel; | ||
| self.common.tx = tx; | ||
| let index = last_event_id.index; | ||
| // sync messages after index | ||
| self.common.sync(index).await?; | ||
| Ok(StreamableHttpMessageReceiver { | ||
| http_request_id: None, | ||
| inner: rx, | ||
| }) | ||
| if let Some(request_wise) = self.tx_router.get_mut(&http_request_id) { | ||
| // Resume existing request-wise channel | ||
| let channel = tokio::sync::mpsc::channel(self.session_config.channel_capacity); | ||
| let (tx, rx) = channel; | ||
| request_wise.tx.tx = tx; | ||
| let index = last_event_id.index; | ||
| // sync messages after index | ||
| request_wise.tx.sync(index).await?; | ||
| Ok(StreamableHttpMessageReceiver { | ||
| http_request_id: Some(http_request_id), | ||
| inner: rx, | ||
| }) | ||
| } else { | ||
| // Request-wise channel completed (POST response already delivered). | ||
| // The client's EventSource is reconnecting after the POST SSE stream | ||
| // ended. Fall through to common channel handling below. | ||
| tracing::debug!( | ||
| http_request_id, | ||
| "Request-wise channel completed, falling back to common channel" | ||
| ); | ||
| self.resume_or_shadow_common() | ||
| } | ||
| } | ||
| None => self.resume_or_shadow_common(), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The old |
||
| } | ||
| } | ||
|
|
||
| /// Resume the common channel, or create a shadow stream if the primary is | ||
| /// still active. | ||
| /// | ||
| /// When the primary common channel is dead (receiver dropped), replace it | ||
| /// so this stream becomes the new primary notification channel. | ||
| /// | ||
| /// When the primary is still active, create a "shadow" stream — an idle SSE | ||
| /// connection kept alive by keep-alive pings. This prevents multiple | ||
| /// EventSource connections (e.g. from POST response reconnections) from | ||
| /// killing each other by repeatedly replacing the common channel sender. | ||
| fn resume_or_shadow_common(&mut self) -> Result<StreamableHttpMessageReceiver, SessionError> { | ||
| let (tx, rx) = tokio::sync::mpsc::channel(self.session_config.channel_capacity); | ||
| if self.common.tx.is_closed() { | ||
| // Primary common channel is dead — replace it. | ||
| tracing::debug!("Replacing dead common channel with new primary"); | ||
| self.common.tx = tx; | ||
| } else { | ||
| // Primary common channel is still active. Create a shadow stream | ||
| // that stays alive via SSE keep-alive but doesn't receive | ||
| // notifications. This prevents competing EventSource connections | ||
| // from killing each other's channels. | ||
| tracing::debug!( | ||
| shadow_count = self.shadow_txs.len(), | ||
| "Common channel active, creating shadow stream" | ||
| ); | ||
| self.shadow_txs.push(tx); | ||
| } | ||
| Ok(StreamableHttpMessageReceiver { | ||
| http_request_id: None, | ||
| inner: rx, | ||
| }) | ||
| } | ||
|
|
||
| async fn close_sse_stream( | ||
|
|
@@ -584,6 +625,9 @@ impl LocalSessionWorker { | |
| let (tx, _rx) = tokio::sync::mpsc::channel(1); | ||
| self.common.tx = tx; | ||
|
|
||
| // Also close all shadow streams | ||
| self.shadow_txs.clear(); | ||
|
|
||
| tracing::debug!("closed standalone SSE stream for server-initiated disconnection"); | ||
| Ok(()) | ||
| } | ||
|
|
@@ -1036,6 +1080,7 @@ pub fn create_local_session( | |
| tx_router: HashMap::new(), | ||
| resource_router: HashMap::new(), | ||
| common, | ||
| shadow_txs: Vec::new(), | ||
| event_rx, | ||
| session_config: config.clone(), | ||
| }; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest describing what the tests verify rather than what triggered them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I agree with this