From 11fbd75dc9437145bae9cf9deb09ab3320e72ece Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 30 Jan 2026 04:40:59 +0000 Subject: [PATCH 1/4] Initial plan From 8d1d4679a044c0457daed71d86a1ab2151382c74 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 30 Jan 2026 04:50:41 +0000 Subject: [PATCH 2/4] Add ILoggerFactory parameter to StreamableHttpServerTransport constructor Co-authored-by: stephentoub <2642209+stephentoub@users.noreply.github.com> --- .../StreamableHttpHandler.cs | 4 +-- .../Server/StreamableHttpPostTransport.cs | 25 ++++++++++++++++--- .../Server/StreamableHttpServerTransport.cs | 18 +++++++++++-- 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs b/src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs index c50e51388..22c861326 100644 --- a/src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs +++ b/src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs @@ -238,7 +238,7 @@ private async ValueTask StartNewSessionAsync(HttpContext if (!HttpServerTransportOptions.Stateless) { sessionId = MakeNewSessionId(); - transport = new() + transport = new(loggerFactory) { SessionId = sessionId, FlowExecutionContextFromRequests = !HttpServerTransportOptions.PerSessionExecutionContext, @@ -252,7 +252,7 @@ private async ValueTask StartNewSessionAsync(HttpContext // If in the future we support resuming stateless requests, we should populate // the event stream store and retry interval here as well. sessionId = ""; - transport = new() + transport = new(loggerFactory) { Stateless = true, }; diff --git a/src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs b/src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs index f0f94c270..bc8d52fbb 100644 --- a/src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs +++ b/src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs @@ -1,3 +1,5 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using ModelContextProtocol.Protocol; using System.Diagnostics; using System.Net.ServerSentEvents; @@ -10,7 +12,11 @@ namespace ModelContextProtocol.Server; /// Handles processing the request/response body pairs for the Streamable HTTP transport. /// This is typically used via . /// -internal sealed class StreamableHttpPostTransport(StreamableHttpServerTransport parentTransport, Stream responseStream, CancellationToken sessionCancellationToken) : ITransport +internal sealed partial class StreamableHttpPostTransport( + StreamableHttpServerTransport parentTransport, + Stream responseStream, + CancellationToken sessionCancellationToken, + ILogger logger) : ITransport { private readonly SemaphoreSlim _messageLock = new(1, 1); private readonly TaskCompletionSource _httpResponseTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); @@ -195,11 +201,21 @@ async Task HandleStoreStreamDisposalAsync(Task streamTask) { await streamTask.WaitAsync(sessionCancellationToken).ConfigureAwait(false); } + catch (OperationCanceledException) + { + } finally { - using var _ = await _messageLock.LockAsync().ConfigureAwait(false); + try + { + using var _ = await _messageLock.LockAsync().ConfigureAwait(false); - await _storeSseWriter!.DisposeAsync().ConfigureAwait(false); + await _storeSseWriter!.DisposeAsync().ConfigureAwait(false); + } + catch (Exception ex) + { + LogStoreStreamDisposalFailed(ex); + } } } } @@ -222,4 +238,7 @@ public async ValueTask DisposeAsync() // Don't dispose the event stream writer here, as we may continue to write to the event store // after disposal if there are pending messages. } + + [LoggerMessage(Level = LogLevel.Warning, Message = "Failed to dispose SSE event stream writer.")] + private partial void LogStoreStreamDisposalFailed(Exception exception); } diff --git a/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs b/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs index 55b6650b1..6719c4f98 100644 --- a/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs +++ b/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs @@ -1,3 +1,5 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using ModelContextProtocol.Protocol; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; @@ -21,7 +23,7 @@ namespace ModelContextProtocol.Server; /// such as when streaming completion results or providing progress updates during long-running operations. /// /// -public sealed class StreamableHttpServerTransport : ITransport +public sealed partial class StreamableHttpServerTransport : ITransport { /// /// The stream ID used for unsolicited messages sent via the standalone GET SSE stream. @@ -35,6 +37,8 @@ public sealed class StreamableHttpServerTransport : ITransport }); private readonly CancellationTokenSource _transportDisposedCts = new(); private readonly SemaphoreSlim _unsolicitedMessageLock = new(1, 1); + private readonly ILoggerFactory? _loggerFactory; + private readonly ILogger _logger; private SseEventWriter? _httpSseWriter; private ISseEventStreamWriter? _storeSseWriter; @@ -42,6 +46,16 @@ public sealed class StreamableHttpServerTransport : ITransport private bool _getHttpRequestStarted; private bool _getHttpResponseCompleted; + /// + /// Initializes a new instance of the class. + /// + /// Optional logger factory used for logging employed by the transport. + public StreamableHttpServerTransport(ILoggerFactory? loggerFactory = null) + { + _loggerFactory = loggerFactory; + _logger = loggerFactory?.CreateLogger() ?? NullLogger.Instance; + } + /// public string? SessionId { get; init; } @@ -161,7 +175,7 @@ public async Task HandlePostRequestAsync(JsonRpcMessage message, Stream re Throw.IfNull(message); Throw.IfNull(responseStream); - var postTransport = new StreamableHttpPostTransport(this, responseStream, _transportDisposedCts.Token); + var postTransport = new StreamableHttpPostTransport(this, responseStream, _transportDisposedCts.Token, _logger); using var postCts = CancellationTokenSource.CreateLinkedTokenSource(_transportDisposedCts.Token, cancellationToken); await using (postTransport.ConfigureAwait(false)) { From f141d0421621397ea496584eb28933620a5668db Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 30 Jan 2026 04:57:34 +0000 Subject: [PATCH 3/4] Address code review feedback: fix logger field capture and remove unused field Co-authored-by: stephentoub <2642209+stephentoub@users.noreply.github.com> --- .../Server/StreamableHttpPostTransport.cs | 47 ++++++++++++------- .../Server/StreamableHttpServerTransport.cs | 2 - 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs b/src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs index bc8d52fbb..794e79029 100644 --- a/src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs +++ b/src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs @@ -12,15 +12,14 @@ namespace ModelContextProtocol.Server; /// Handles processing the request/response body pairs for the Streamable HTTP transport. /// This is typically used via . /// -internal sealed partial class StreamableHttpPostTransport( - StreamableHttpServerTransport parentTransport, - Stream responseStream, - CancellationToken sessionCancellationToken, - ILogger logger) : ITransport +internal sealed partial class StreamableHttpPostTransport : ITransport { + private readonly StreamableHttpServerTransport _parentTransport; + private readonly CancellationToken _sessionCancellationToken; + private readonly ILogger _logger; private readonly SemaphoreSlim _messageLock = new(1, 1); private readonly TaskCompletionSource _httpResponseTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); - private readonly SseEventWriter _httpSseWriter = new(responseStream); + private readonly SseEventWriter _httpSseWriter; private TaskCompletionSource? _storeStreamTcs; private ISseEventStreamWriter? _storeSseWriter; @@ -29,9 +28,21 @@ internal sealed partial class StreamableHttpPostTransport( private bool _finalResponseMessageSent; private bool _httpResponseCompleted; + public StreamableHttpPostTransport( + StreamableHttpServerTransport parentTransport, + Stream responseStream, + CancellationToken sessionCancellationToken, + ILogger logger) + { + _parentTransport = parentTransport; + _sessionCancellationToken = sessionCancellationToken; + _logger = logger; + _httpSseWriter = new(responseStream); + } + public ChannelReader MessageReader => throw new NotSupportedException("JsonRpcMessage.Context.RelatedTransport should only be used for sending messages."); - string? ITransport.SessionId => parentTransport.SessionId; + string? ITransport.SessionId => _parentTransport.SessionId; /// /// True, if data was written to the response body. @@ -50,21 +61,21 @@ public async ValueTask HandlePostAsync(JsonRpcMessage message, Cancellatio if (request.Method == RequestMethods.Initialize) { var initializeRequest = JsonSerializer.Deserialize(request.Params, McpJsonUtilities.JsonContext.Default.InitializeRequestParams); - await parentTransport.HandleInitRequestAsync(initializeRequest).ConfigureAwait(false); + await _parentTransport.HandleInitRequestAsync(initializeRequest).ConfigureAwait(false); } } message.Context ??= new JsonRpcMessageContext(); message.Context.RelatedTransport = this; - if (parentTransport.FlowExecutionContextFromRequests) + if (_parentTransport.FlowExecutionContextFromRequests) { message.Context.ExecutionContext = ExecutionContext.Capture(); } if (_pendingRequest.Id is null) { - await parentTransport.MessageWriter.WriteAsync(message, cancellationToken).ConfigureAwait(false); + await _parentTransport.MessageWriter.WriteAsync(message, cancellationToken).ConfigureAwait(false); return false; } @@ -77,7 +88,7 @@ public async ValueTask HandlePostAsync(JsonRpcMessage message, Cancellatio } // Ensure that we've sent the priming event before processing the incoming request. - await parentTransport.MessageWriter.WriteAsync(message, cancellationToken).ConfigureAwait(false); + await _parentTransport.MessageWriter.WriteAsync(message, cancellationToken).ConfigureAwait(false); } // Wait for the response to be written before returning from the handler. @@ -91,7 +102,7 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can { Throw.IfNull(message); - if (parentTransport.Stateless && message is JsonRpcRequest) + if (_parentTransport.Stateless && message is JsonRpcRequest) { throw new InvalidOperationException("Server to client requests are not supported in stateless mode."); } @@ -105,7 +116,7 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can { // The final response message has already been sent. // Rather than drop the message, fall back to sending it via the parent transport. - await parentTransport.SendMessageAsync(message, cancellationToken).ConfigureAwait(false); + await _parentTransport.SendMessageAsync(message, cancellationToken).ConfigureAwait(false); return; } @@ -144,7 +155,7 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can public async ValueTask EnablePollingAsync(TimeSpan retryInterval, CancellationToken cancellationToken) { - if (parentTransport.Stateless) + if (_parentTransport.Stateless) { throw new InvalidOperationException("Polling is not supported in stateless mode."); } @@ -180,9 +191,9 @@ public async ValueTask EnablePollingAsync(TimeSpan retryInterval, CancellationTo { Debug.Assert(_storeSseWriter is null); - _storeSseWriter = await parentTransport.TryCreateEventStreamAsync( + _storeSseWriter = await _parentTransport.TryCreateEventStreamAsync( streamId: requestId.Id!.ToString()!, - cancellationToken: sessionCancellationToken) + cancellationToken: _sessionCancellationToken) .ConfigureAwait(false); if (_storeSseWriter is null) @@ -193,13 +204,13 @@ public async ValueTask EnablePollingAsync(TimeSpan retryInterval, CancellationTo _storeStreamTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _ = HandleStoreStreamDisposalAsync(_storeStreamTcs.Task); - return await _storeSseWriter.WriteEventAsync(SseItem.Prime(), sessionCancellationToken).ConfigureAwait(false); + return await _storeSseWriter.WriteEventAsync(SseItem.Prime(), _sessionCancellationToken).ConfigureAwait(false); async Task HandleStoreStreamDisposalAsync(Task streamTask) { try { - await streamTask.WaitAsync(sessionCancellationToken).ConfigureAwait(false); + await streamTask.WaitAsync(_sessionCancellationToken).ConfigureAwait(false); } catch (OperationCanceledException) { diff --git a/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs b/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs index 6719c4f98..58227757b 100644 --- a/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs +++ b/src/ModelContextProtocol.Core/Server/StreamableHttpServerTransport.cs @@ -37,7 +37,6 @@ public sealed partial class StreamableHttpServerTransport : ITransport }); private readonly CancellationTokenSource _transportDisposedCts = new(); private readonly SemaphoreSlim _unsolicitedMessageLock = new(1, 1); - private readonly ILoggerFactory? _loggerFactory; private readonly ILogger _logger; private SseEventWriter? _httpSseWriter; @@ -52,7 +51,6 @@ public sealed partial class StreamableHttpServerTransport : ITransport /// Optional logger factory used for logging employed by the transport. public StreamableHttpServerTransport(ILoggerFactory? loggerFactory = null) { - _loggerFactory = loggerFactory; _logger = loggerFactory?.CreateLogger() ?? NullLogger.Instance; } From cff6d2ede51fc3a234d3f116f95d6cfbd0120558 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 30 Jan 2026 16:23:15 +0000 Subject: [PATCH 4/4] Restore primary constructor and fix HandleStoreStreamDisposalAsync Co-authored-by: stephentoub <2642209+stephentoub@users.noreply.github.com> --- .../Server/StreamableHttpPostTransport.cs | 55 +++++++------------ 1 file changed, 20 insertions(+), 35 deletions(-) diff --git a/src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs b/src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs index 794e79029..b6f6b663e 100644 --- a/src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs +++ b/src/ModelContextProtocol.Core/Server/StreamableHttpPostTransport.cs @@ -1,5 +1,4 @@ using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Logging.Abstractions; using ModelContextProtocol.Protocol; using System.Diagnostics; using System.Net.ServerSentEvents; @@ -12,14 +11,15 @@ namespace ModelContextProtocol.Server; /// Handles processing the request/response body pairs for the Streamable HTTP transport. /// This is typically used via . /// -internal sealed partial class StreamableHttpPostTransport : ITransport +internal sealed partial class StreamableHttpPostTransport( + StreamableHttpServerTransport parentTransport, + Stream responseStream, + CancellationToken sessionCancellationToken, + ILogger logger) : ITransport { - private readonly StreamableHttpServerTransport _parentTransport; - private readonly CancellationToken _sessionCancellationToken; - private readonly ILogger _logger; private readonly SemaphoreSlim _messageLock = new(1, 1); private readonly TaskCompletionSource _httpResponseTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); - private readonly SseEventWriter _httpSseWriter; + private readonly SseEventWriter _httpSseWriter = new(responseStream); private TaskCompletionSource? _storeStreamTcs; private ISseEventStreamWriter? _storeSseWriter; @@ -28,21 +28,9 @@ internal sealed partial class StreamableHttpPostTransport : ITransport private bool _finalResponseMessageSent; private bool _httpResponseCompleted; - public StreamableHttpPostTransport( - StreamableHttpServerTransport parentTransport, - Stream responseStream, - CancellationToken sessionCancellationToken, - ILogger logger) - { - _parentTransport = parentTransport; - _sessionCancellationToken = sessionCancellationToken; - _logger = logger; - _httpSseWriter = new(responseStream); - } - public ChannelReader MessageReader => throw new NotSupportedException("JsonRpcMessage.Context.RelatedTransport should only be used for sending messages."); - string? ITransport.SessionId => _parentTransport.SessionId; + string? ITransport.SessionId => parentTransport.SessionId; /// /// True, if data was written to the response body. @@ -61,21 +49,21 @@ public async ValueTask HandlePostAsync(JsonRpcMessage message, Cancellatio if (request.Method == RequestMethods.Initialize) { var initializeRequest = JsonSerializer.Deserialize(request.Params, McpJsonUtilities.JsonContext.Default.InitializeRequestParams); - await _parentTransport.HandleInitRequestAsync(initializeRequest).ConfigureAwait(false); + await parentTransport.HandleInitRequestAsync(initializeRequest).ConfigureAwait(false); } } message.Context ??= new JsonRpcMessageContext(); message.Context.RelatedTransport = this; - if (_parentTransport.FlowExecutionContextFromRequests) + if (parentTransport.FlowExecutionContextFromRequests) { message.Context.ExecutionContext = ExecutionContext.Capture(); } if (_pendingRequest.Id is null) { - await _parentTransport.MessageWriter.WriteAsync(message, cancellationToken).ConfigureAwait(false); + await parentTransport.MessageWriter.WriteAsync(message, cancellationToken).ConfigureAwait(false); return false; } @@ -88,7 +76,7 @@ public async ValueTask HandlePostAsync(JsonRpcMessage message, Cancellatio } // Ensure that we've sent the priming event before processing the incoming request. - await _parentTransport.MessageWriter.WriteAsync(message, cancellationToken).ConfigureAwait(false); + await parentTransport.MessageWriter.WriteAsync(message, cancellationToken).ConfigureAwait(false); } // Wait for the response to be written before returning from the handler. @@ -102,7 +90,7 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can { Throw.IfNull(message); - if (_parentTransport.Stateless && message is JsonRpcRequest) + if (parentTransport.Stateless && message is JsonRpcRequest) { throw new InvalidOperationException("Server to client requests are not supported in stateless mode."); } @@ -116,7 +104,7 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can { // The final response message has already been sent. // Rather than drop the message, fall back to sending it via the parent transport. - await _parentTransport.SendMessageAsync(message, cancellationToken).ConfigureAwait(false); + await parentTransport.SendMessageAsync(message, cancellationToken).ConfigureAwait(false); return; } @@ -155,7 +143,7 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can public async ValueTask EnablePollingAsync(TimeSpan retryInterval, CancellationToken cancellationToken) { - if (_parentTransport.Stateless) + if (parentTransport.Stateless) { throw new InvalidOperationException("Polling is not supported in stateless mode."); } @@ -191,9 +179,9 @@ public async ValueTask EnablePollingAsync(TimeSpan retryInterval, CancellationTo { Debug.Assert(_storeSseWriter is null); - _storeSseWriter = await _parentTransport.TryCreateEventStreamAsync( + _storeSseWriter = await parentTransport.TryCreateEventStreamAsync( streamId: requestId.Id!.ToString()!, - cancellationToken: _sessionCancellationToken) + cancellationToken: sessionCancellationToken) .ConfigureAwait(false); if (_storeSseWriter is null) @@ -204,23 +192,20 @@ public async ValueTask EnablePollingAsync(TimeSpan retryInterval, CancellationTo _storeStreamTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _ = HandleStoreStreamDisposalAsync(_storeStreamTcs.Task); - return await _storeSseWriter.WriteEventAsync(SseItem.Prime(), _sessionCancellationToken).ConfigureAwait(false); + return await _storeSseWriter.WriteEventAsync(SseItem.Prime(), sessionCancellationToken).ConfigureAwait(false); async Task HandleStoreStreamDisposalAsync(Task streamTask) { try { - await streamTask.WaitAsync(_sessionCancellationToken).ConfigureAwait(false); - } - catch (OperationCanceledException) - { + await streamTask.WaitAsync(sessionCancellationToken).ConfigureAwait(false); } finally { + using var _ = await _messageLock.LockAsync().ConfigureAwait(false); + try { - using var _ = await _messageLock.LockAsync().ConfigureAwait(false); - await _storeSseWriter!.DisposeAsync().ConfigureAwait(false); } catch (Exception ex)