1- """
2- StreamableHTTP Client Transport Module
1+ """Implements StreamableHTTP transport for MCP clients."""
32
4- This module implements the StreamableHTTP transport for MCP clients,
5- providing support for HTTP POST requests with optional SSE streaming responses
6- and session management.
7- """
3+ from __future__ import annotations as _annotations
84
95import contextlib
106import logging
117from collections .abc import AsyncGenerator , Awaitable , Callable
128from contextlib import asynccontextmanager
139from dataclasses import dataclass
14- from datetime import timedelta
15- from typing import Any , overload
16- from warnings import warn
1710
1811import anyio
1912import httpx
2013from anyio .abc import TaskGroup
2114from anyio .streams .memory import MemoryObjectReceiveStream , MemoryObjectSendStream
2215from httpx_sse import EventSource , ServerSentEvent , aconnect_sse
23- from typing_extensions import deprecated
2416
25- from mcp .shared ._httpx_utils import (
26- McpHttpClientFactory ,
27- create_mcp_http_client ,
28- )
17+ from mcp .shared ._httpx_utils import create_mcp_http_client
2918from mcp .shared .message import ClientMessageMetadata , SessionMessage
3019from mcp .types import (
3120 ErrorData ,
5342# Reconnection defaults
5443DEFAULT_RECONNECTION_DELAY_MS = 1000 # 1 second fallback when server doesn't provide retry
5544MAX_RECONNECTION_ATTEMPTS = 2 # Max retry attempts before giving up
56- CONTENT_TYPE = "content-type"
57- ACCEPT = "accept"
58-
59-
60- JSON = "application/json"
61- SSE = "text/event-stream"
62-
63- # Sentinel value for detecting unset optional parameters
64- _UNSET = object ()
6545
6646
6747class StreamableHTTPError (Exception ):
@@ -81,80 +61,31 @@ class RequestContext:
8161 session_message : SessionMessage
8262 metadata : ClientMessageMetadata | None
8363 read_stream_writer : StreamWriter
84- headers : dict [str , str ] | None = None # Deprecated - no longer used
85- sse_read_timeout : float | None = None # Deprecated - no longer used
8664
8765
8866class StreamableHTTPTransport :
8967 """StreamableHTTP client transport implementation."""
9068
91- @overload
92- def __init__ (self , url : str ) -> None : ...
93-
94- @overload
95- @deprecated (
96- "Parameters headers, timeout, sse_read_timeout, and auth are deprecated. "
97- "Configure these on the httpx.AsyncClient instead."
98- )
99- def __init__ (
100- self ,
101- url : str ,
102- headers : dict [str , str ] | None = None ,
103- timeout : float = 30.0 ,
104- sse_read_timeout : float = 300.0 ,
105- auth : httpx .Auth | None = None ,
106- ) -> None : ...
107-
108- def __init__ (
109- self ,
110- url : str ,
111- headers : Any = _UNSET ,
112- timeout : Any = _UNSET ,
113- sse_read_timeout : Any = _UNSET ,
114- auth : Any = _UNSET ,
115- ) -> None :
69+ def __init__ (self , url : str ) -> None :
11670 """Initialize the StreamableHTTP transport.
11771
11872 Args:
11973 url: The endpoint URL.
120- headers: Optional headers to include in requests.
121- timeout: HTTP timeout for regular operations (in seconds).
122- sse_read_timeout: Timeout for SSE read operations (in seconds).
123- auth: Optional HTTPX authentication handler.
12474 """
125- # Check for deprecated parameters and issue runtime warning
126- deprecated_params : list [str ] = []
127- if headers is not _UNSET :
128- deprecated_params .append ("headers" )
129- if timeout is not _UNSET :
130- deprecated_params .append ("timeout" )
131- if sse_read_timeout is not _UNSET :
132- deprecated_params .append ("sse_read_timeout" )
133- if auth is not _UNSET :
134- deprecated_params .append ("auth" )
135-
136- if deprecated_params :
137- warn (
138- f"Parameters { ', ' .join (deprecated_params )} are deprecated and will be ignored. "
139- "Configure these on the httpx.AsyncClient instead." ,
140- DeprecationWarning ,
141- stacklevel = 2 ,
142- )
143-
14475 self .url = url
145- self .session_id = None
146- self .protocol_version = None
76+ self .session_id : str | None = None
77+ self .protocol_version : str | None = None
14778
14879 def _prepare_headers (self ) -> dict [str , str ]:
14980 """Build MCP-specific request headers.
15081
15182 These headers will be merged with the httpx.AsyncClient's default headers,
15283 with these MCP-specific headers taking precedence.
15384 """
154- headers : dict [str , str ] = {}
155- # Add MCP protocol headers
156- headers [ ACCEPT ] = f" { JSON } , { SSE } "
157- headers [ CONTENT_TYPE ] = JSON
85+ headers : dict [str , str ] = {
86+ "accept" : "application/json, text/event-stream" ,
87+ "content-type" : "application/json" ,
88+ }
15889 # Add session headers if available
15990 if self .session_id :
16091 headers [MCP_SESSION_ID ] = self .session_id
@@ -170,31 +101,23 @@ def _is_initialized_notification(self, message: JSONRPCMessage) -> bool:
170101 """Check if the message is an initialized notification."""
171102 return isinstance (message .root , JSONRPCNotification ) and message .root .method == "notifications/initialized"
172103
173- def _maybe_extract_session_id_from_response (
174- self ,
175- response : httpx .Response ,
176- ) -> None :
104+ def _maybe_extract_session_id_from_response (self , response : httpx .Response ) -> None :
177105 """Extract and store session ID from response headers."""
178106 new_session_id = response .headers .get (MCP_SESSION_ID )
179107 if new_session_id :
180108 self .session_id = new_session_id
181109 logger .info (f"Received session ID: { self .session_id } " )
182110
183- def _maybe_extract_protocol_version_from_message (
184- self ,
185- message : JSONRPCMessage ,
186- ) -> None :
111+ def _maybe_extract_protocol_version_from_message (self , message : JSONRPCMessage ) -> None :
187112 """Extract protocol version from initialization response message."""
188113 if isinstance (message .root , JSONRPCResponse ) and message .root .result : # pragma: no branch
189114 try :
190115 # Parse the result as InitializeResult for type safety
191116 init_result = InitializeResult .model_validate (message .root .result )
192117 self .protocol_version = str (init_result .protocolVersion )
193118 logger .info (f"Negotiated protocol version: { self .protocol_version } " )
194- except Exception as exc : # pragma: no cover
195- logger .warning (
196- f"Failed to parse initialization response as InitializeResult: { exc } "
197- ) # pragma: no cover
119+ except Exception : # pragma: no cover
120+ logger .warning ("Failed to parse initialization response as InitializeResult" , exc_info = True )
198121 logger .warning (f"Raw result: { message .root .result } " )
199122
200123 async def _handle_sse_event (
@@ -244,11 +167,7 @@ async def _handle_sse_event(
244167 logger .warning (f"Unknown SSE event: { sse .event } " )
245168 return False
246169
247- async def handle_get_stream (
248- self ,
249- client : httpx .AsyncClient ,
250- read_stream_writer : StreamWriter ,
251- ) -> None :
170+ async def handle_get_stream (self , client : httpx .AsyncClient , read_stream_writer : StreamWriter ) -> None :
252171 """Handle GET stream for server-initiated messages with auto-reconnect."""
253172 last_event_id : str | None = None
254173 retry_interval_ms : int | None = None
@@ -263,12 +182,7 @@ async def handle_get_stream(
263182 if last_event_id :
264183 headers [LAST_EVENT_ID ] = last_event_id # pragma: no cover
265184
266- async with aconnect_sse (
267- client ,
268- "GET" ,
269- self .url ,
270- headers = headers ,
271- ) as event_source :
185+ async with aconnect_sse (client , "GET" , self .url , headers = headers ) as event_source :
272186 event_source .response .raise_for_status ()
273187 logger .debug ("GET SSE connection established" )
274188
@@ -311,12 +225,7 @@ async def _handle_resumption_request(self, ctx: RequestContext) -> None:
311225 if isinstance (ctx .session_message .message .root , JSONRPCRequest ): # pragma: no branch
312226 original_request_id = ctx .session_message .message .root .id
313227
314- async with aconnect_sse (
315- ctx .client ,
316- "GET" ,
317- self .url ,
318- headers = headers ,
319- ) as event_source :
228+ async with aconnect_sse (ctx .client , "GET" , self .url , headers = headers ) as event_source :
320229 event_source .response .raise_for_status ()
321230 logger .debug ("Resumption GET SSE connection established" )
322231
@@ -362,10 +271,10 @@ async def _handle_post_request(self, ctx: RequestContext) -> None:
362271 # Per https://modelcontextprotocol.io/specification/2025-06-18/basic#notifications:
363272 # The server MUST NOT send a response to notifications.
364273 if isinstance (message .root , JSONRPCRequest ):
365- content_type = response .headers .get (CONTENT_TYPE , "" ).lower ()
366- if content_type .startswith (JSON ):
274+ content_type = response .headers .get ("content-type" , "" ).lower ()
275+ if content_type .startswith ("application/json" ):
367276 await self ._handle_json_response (response , ctx .read_stream_writer , is_initialization )
368- elif content_type .startswith (SSE ):
277+ elif content_type .startswith ("text/event-stream" ):
369278 await self ._handle_sse_response (response , ctx , is_initialization )
370279 else :
371280 await self ._handle_unexpected_content_type ( # pragma: no cover
@@ -460,12 +369,7 @@ async def _handle_reconnection(
460369 original_request_id = ctx .session_message .message .root .id
461370
462371 try :
463- async with aconnect_sse (
464- ctx .client ,
465- "GET" ,
466- self .url ,
467- headers = headers ,
468- ) as event_source :
372+ async with aconnect_sse (ctx .client , "GET" , self .url , headers = headers ) as event_source :
469373 event_source .response .raise_for_status ()
470374 logger .info ("Reconnected to SSE stream" )
471375
@@ -498,20 +402,14 @@ async def _handle_reconnection(
498402 await self ._handle_reconnection (ctx , last_event_id , retry_interval_ms , attempt + 1 )
499403
500404 async def _handle_unexpected_content_type (
501- self ,
502- content_type : str ,
503- read_stream_writer : StreamWriter ,
405+ self , content_type : str , read_stream_writer : StreamWriter
504406 ) -> None : # pragma: no cover
505407 """Handle unexpected content type in response."""
506408 error_msg = f"Unexpected content type: { content_type } " # pragma: no cover
507409 logger .error (error_msg ) # pragma: no cover
508410 await read_stream_writer .send (ValueError (error_msg )) # pragma: no cover
509411
510- async def _send_session_terminated_error (
511- self ,
512- read_stream_writer : StreamWriter ,
513- request_id : RequestId ,
514- ) -> None :
412+ async def _send_session_terminated_error (self , read_stream_writer : StreamWriter , request_id : RequestId ) -> None :
515413 """Send a session terminated error response."""
516414 jsonrpc_error = JSONRPCError (
517415 jsonrpc = "2.0" ,
@@ -619,8 +517,7 @@ async def streamable_http_client(
619517 http_client: Optional pre-configured httpx.AsyncClient. If None, a default
620518 client with recommended MCP timeouts will be created. To configure headers,
621519 authentication, or other HTTP settings, create an httpx.AsyncClient and pass it here.
622- terminate_on_close: If True, send a DELETE request to terminate the session
623- when the context exits.
520+ terminate_on_close: If True, send a DELETE request to terminate the session when the context exits.
624521
625522 Yields:
626523 Tuple containing:
@@ -667,56 +564,11 @@ def start_get_stream() -> None:
667564 )
668565
669566 try :
670- yield (
671- read_stream ,
672- write_stream ,
673- transport .get_session_id ,
674- )
567+ yield (read_stream , write_stream , transport .get_session_id )
675568 finally :
676569 if transport .session_id and terminate_on_close :
677570 await transport .terminate_session (client )
678571 tg .cancel_scope .cancel ()
679572 finally :
680573 await read_stream_writer .aclose ()
681574 await write_stream .aclose ()
682-
683-
684- @asynccontextmanager
685- @deprecated ("Use `streamable_http_client` instead." )
686- async def streamablehttp_client (
687- url : str ,
688- headers : dict [str , str ] | None = None ,
689- timeout : float | timedelta = 30 ,
690- sse_read_timeout : float | timedelta = 60 * 5 ,
691- terminate_on_close : bool = True ,
692- httpx_client_factory : McpHttpClientFactory = create_mcp_http_client ,
693- auth : httpx .Auth | None = None ,
694- ) -> AsyncGenerator [
695- tuple [
696- MemoryObjectReceiveStream [SessionMessage | Exception ],
697- MemoryObjectSendStream [SessionMessage ],
698- GetSessionIdCallback ,
699- ],
700- None ,
701- ]:
702- # Convert timeout parameters
703- timeout_seconds = timeout .total_seconds () if isinstance (timeout , timedelta ) else timeout
704- sse_read_timeout_seconds = (
705- sse_read_timeout .total_seconds () if isinstance (sse_read_timeout , timedelta ) else sse_read_timeout
706- )
707-
708- # Create httpx client using the factory with old-style parameters
709- client = httpx_client_factory (
710- headers = headers ,
711- timeout = httpx .Timeout (timeout_seconds , read = sse_read_timeout_seconds ),
712- auth = auth ,
713- )
714-
715- # Manage client lifecycle since we created it
716- async with client :
717- async with streamable_http_client (
718- url ,
719- http_client = client ,
720- terminate_on_close = terminate_on_close ,
721- ) as streams :
722- yield streams
0 commit comments