From e48f897850b3944b4c35c91d730bf117d1701ed6 Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Thu, 19 Feb 2026 02:28:30 -0800 Subject: [PATCH 1/6] Serialize Linux URLSession request paths to mitigate _MultiHandle race --- .../Extensions/URLSession+Extensions.swift | 147 +++++++++++++++--- 1 file changed, 125 insertions(+), 22 deletions(-) diff --git a/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift b/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift index 4c6d0cd..f4b07da 100644 --- a/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift +++ b/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift @@ -13,6 +13,56 @@ enum HTTP { } } +#if canImport(FoundationNetworking) + /// Serializes Linux URLSession operations to mitigate a FoundationNetworking race. + /// + /// AnyLanguageModel performs many concurrent HTTP requests across model implementations. + /// On Linux, `FoundationNetworking` routes `URLSession` through a shared + /// `_MultiHandle`, which has a known thread-safety bug that can crash under + /// concurrent access (`URLSession._MultiHandle.endOperation(for:)`). + /// + /// This gate intentionally allows only one in-flight request path at a time on Linux. + /// Keep this scoped to Linux-only code paths until the upstream issue is resolved. + /// + /// See: https://github.com/swiftlang/swift-corelibs-foundation/issues/4791 + private actor LinuxURLSessionRequestGate { + static let shared = LinuxURLSessionRequestGate() + + private var isLocked = false + private var waiters: [CheckedContinuation] = [] + + private func acquire() async { + if !isLocked { + isLocked = true + return + } + + await withCheckedContinuation { continuation in + waiters.append(continuation) + } + } + + private func release() { + if waiters.isEmpty { + isLocked = false + return + } + + let continuation = waiters.removeFirst() + continuation.resume() + } + + /// Executes an async operation while holding the gate lock. + func withLock( + _ operation: () async throws -> T + ) async rethrows -> T { + await acquire() + defer { release() } + return try await operation() + } + } +#endif + extension URLSession { func fetch( _ method: HTTP.Method, @@ -34,7 +84,14 @@ extension URLSession { request.addValue("application/json", forHTTPHeaderField: "Content-Type") } - let (data, response) = try await data(for: request) + #if canImport(FoundationNetworking) + let dataAndResponse = try await LinuxURLSessionRequestGate.shared.withLock { + try await data(for: request) + } + let (data, response) = dataAndResponse + #else + let (data, response) = try await data(for: request) + #endif guard let httpResponse = response as? HTTPURLResponse else { throw URLSessionError.invalidResponse @@ -83,7 +140,14 @@ extension URLSession { request.addValue("application/json", forHTTPHeaderField: "Content-Type") } - let (data, response) = try await self.data(for: request) + #if canImport(FoundationNetworking) + let dataAndResponse = try await LinuxURLSessionRequestGate.shared.withLock { + try await self.data(for: request) + } + let (data, response) = dataAndResponse + #else + let (data, response) = try await self.data(for: request) + #endif guard let httpResponse = response as? HTTPURLResponse else { throw URLSessionError.invalidResponse @@ -143,34 +207,73 @@ extension URLSession { } #if canImport(FoundationNetworking) - let (asyncBytes, response) = try await self.linuxBytes(for: request) + try await LinuxURLSessionRequestGate.shared.withLock { + let asyncBytesAndResponse = try await self.linuxBytes(for: request) + let (asyncBytes, response) = asyncBytesAndResponse + + guard let httpResponse = response as? HTTPURLResponse else { + throw URLSessionError.invalidResponse + } + + guard (200 ..< 300).contains(httpResponse.statusCode) else { + var errorData = Data() + for try await byte in asyncBytes { + errorData.append(byte) + } + if let errorString = String(data: errorData, encoding: .utf8) { + throw URLSessionError.httpError( + statusCode: httpResponse.statusCode, + detail: errorString + ) + } + throw URLSessionError.httpError( + statusCode: httpResponse.statusCode, + detail: "Invalid response" + ) + } + + let decoder = JSONDecoder() + + for try await event in asyncBytes.events { + guard let data = event.data.data(using: .utf8) else { continue } + if let decoded = try? decoder.decode(T.self, from: data) { + continuation.yield(decoded) + } + } + } #else let (asyncBytes, response) = try await self.bytes(for: request) - #endif - - guard let httpResponse = response as? HTTPURLResponse else { - throw URLSessionError.invalidResponse - } - guard (200 ..< 300).contains(httpResponse.statusCode) else { - var errorData = Data() - for try await byte in asyncBytes { - errorData.append(byte) + guard let httpResponse = response as? HTTPURLResponse else { + throw URLSessionError.invalidResponse } - if let errorString = String(data: errorData, encoding: .utf8) { - throw URLSessionError.httpError(statusCode: httpResponse.statusCode, detail: errorString) + + guard (200 ..< 300).contains(httpResponse.statusCode) else { + var errorData = Data() + for try await byte in asyncBytes { + errorData.append(byte) + } + if let errorString = String(data: errorData, encoding: .utf8) { + throw URLSessionError.httpError( + statusCode: httpResponse.statusCode, + detail: errorString + ) + } + throw URLSessionError.httpError( + statusCode: httpResponse.statusCode, + detail: "Invalid response" + ) } - throw URLSessionError.httpError(statusCode: httpResponse.statusCode, detail: "Invalid response") - } - let decoder = JSONDecoder() + let decoder = JSONDecoder() - for try await event in asyncBytes.events { - guard let data = event.data.data(using: .utf8) else { continue } - if let decoded = try? decoder.decode(T.self, from: data) { - continuation.yield(decoded) + for try await event in asyncBytes.events { + guard let data = event.data.data(using: .utf8) else { continue } + if let decoded = try? decoder.decode(T.self, from: data) { + continuation.yield(decoded) + } } - } + #endif continuation.finish() } catch { From 2027d9ea633b4728d62319e72f09913ad9306f56 Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Thu, 19 Feb 2026 02:44:33 -0800 Subject: [PATCH 2/6] Incorporate feedback from review --- .../Extensions/URLSession+Extensions.swift | 108 +++++++----------- .../URLSessionExtensionsTests.swift | 87 ++++++++++++++ 2 files changed, 130 insertions(+), 65 deletions(-) diff --git a/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift b/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift index f4b07da..aa17292 100644 --- a/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift +++ b/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift @@ -22,10 +22,12 @@ enum HTTP { /// concurrent access (`URLSession._MultiHandle.endOperation(for:)`). /// /// This gate intentionally allows only one in-flight request path at a time on Linux. + /// This fully serializes HTTP request setup paths on Linux and reduces request-level + /// parallelism, which can lower throughput for heavily concurrent workloads. /// Keep this scoped to Linux-only code paths until the upstream issue is resolved. /// /// See: https://github.com/swiftlang/swift-corelibs-foundation/issues/4791 - private actor LinuxURLSessionRequestGate { + actor LinuxURLSessionRequestGate { static let shared = LinuxURLSessionRequestGate() private var isLocked = false @@ -207,74 +209,17 @@ extension URLSession { } #if canImport(FoundationNetworking) - try await LinuxURLSessionRequestGate.shared.withLock { - let asyncBytesAndResponse = try await self.linuxBytes(for: request) - let (asyncBytes, response) = asyncBytesAndResponse - - guard let httpResponse = response as? HTTPURLResponse else { - throw URLSessionError.invalidResponse - } - - guard (200 ..< 300).contains(httpResponse.statusCode) else { - var errorData = Data() - for try await byte in asyncBytes { - errorData.append(byte) - } - if let errorString = String(data: errorData, encoding: .utf8) { - throw URLSessionError.httpError( - statusCode: httpResponse.statusCode, - detail: errorString - ) - } - throw URLSessionError.httpError( - statusCode: httpResponse.statusCode, - detail: "Invalid response" - ) - } - - let decoder = JSONDecoder() - - for try await event in asyncBytes.events { - guard let data = event.data.data(using: .utf8) else { continue } - if let decoded = try? decoder.decode(T.self, from: data) { - continuation.yield(decoded) - } - } + let asyncBytes = try await LinuxURLSessionRequestGate.shared.withLock { + let (bytes, response) = try await self.linuxBytes(for: request) + try await self.validateEventStreamResponse(response, asyncBytes: bytes) + return bytes } + try await decodeAndYieldEventStream(asyncBytes, to: continuation) #else let (asyncBytes, response) = try await self.bytes(for: request) - - guard let httpResponse = response as? HTTPURLResponse else { - throw URLSessionError.invalidResponse - } - - guard (200 ..< 300).contains(httpResponse.statusCode) else { - var errorData = Data() - for try await byte in asyncBytes { - errorData.append(byte) - } - if let errorString = String(data: errorData, encoding: .utf8) { - throw URLSessionError.httpError( - statusCode: httpResponse.statusCode, - detail: errorString - ) - } - throw URLSessionError.httpError( - statusCode: httpResponse.statusCode, - detail: "Invalid response" - ) - } - - let decoder = JSONDecoder() - - for try await event in asyncBytes.events { - guard let data = event.data.data(using: .utf8) else { continue } - if let decoded = try? decoder.decode(T.self, from: data) { - continuation.yield(decoded) - } - } + try await validateEventStreamResponse(response, asyncBytes: asyncBytes) + try await decodeAndYieldEventStream(asyncBytes, to: continuation) #endif - continuation.finish() } catch { continuation.finish(throwing: error) @@ -286,6 +231,39 @@ extension URLSession { } } } + + private func validateEventStreamResponse( + _ response: URLResponse, + asyncBytes: Bytes + ) async throws where Bytes: AsyncSequence, Bytes.Element == UInt8 { + guard let httpResponse = response as? HTTPURLResponse else { + throw URLSessionError.invalidResponse + } + + guard (200 ..< 300).contains(httpResponse.statusCode) else { + var errorData = Data() + for try await byte in asyncBytes { + errorData.append(byte) + } + if let errorString = String(data: errorData, encoding: .utf8) { + throw URLSessionError.httpError(statusCode: httpResponse.statusCode, detail: errorString) + } + throw URLSessionError.httpError(statusCode: httpResponse.statusCode, detail: "Invalid response") + } + } + + private func decodeAndYieldEventStream( + _ asyncBytes: Bytes, + to continuation: AsyncThrowingStream.Continuation + ) async throws where Bytes: AsyncSequence, Bytes.Element == UInt8 { + let decoder = JSONDecoder() + for try await event in asyncBytes.events { + guard let data = event.data.data(using: .utf8) else { continue } + if let decoded = try? decoder.decode(T.self, from: data) { + continuation.yield(decoded) + } + } + } } #if canImport(FoundationNetworking) diff --git a/Tests/AnyLanguageModelTests/URLSessionExtensionsTests.swift b/Tests/AnyLanguageModelTests/URLSessionExtensionsTests.swift index b672f92..d87a40a 100644 --- a/Tests/AnyLanguageModelTests/URLSessionExtensionsTests.swift +++ b/Tests/AnyLanguageModelTests/URLSessionExtensionsTests.swift @@ -19,3 +19,90 @@ struct URLSessionExtensionsTests { #expect(error.description == "Decoding error: keyNotFound") } } + +#if canImport(FoundationNetworking) + private actor GateCounter { + private(set) var current = 0 + private(set) var maxConcurrent = 0 + + func enter() { + current += 1 + maxConcurrent = max(maxConcurrent, current) + } + + func leave() { + current -= 1 + } + } + + private enum GateTestError: Error { + case expected + } + + extension URLSessionExtensionsTests { + @Test func linuxGateSerializesConcurrentOperations() async throws { + let gate = LinuxURLSessionRequestGate() + let counter = GateCounter() + + try await withThrowingTaskGroup(of: Void.self) { group in + for _ in 0 ..< 8 { + group.addTask { + try await gate.withLock { + await counter.enter() + do { + try await Task.sleep(for: .milliseconds(20)) + await counter.leave() + } catch { + await counter.leave() + throw error + } + } + } + } + try await group.waitForAll() + } + + #expect(await counter.maxConcurrent == 1) + } + + @Test func linuxGateReleasesAfterError() async throws { + let gate = LinuxURLSessionRequestGate() + + do { + _ = try await gate.withLock { + throw GateTestError.expected + } + Issue.record("Expected error was not thrown") + } catch GateTestError.expected { + // expected + } + + var ranSecondOperation = false + _ = try await gate.withLock { + ranSecondOperation = true + } + #expect(ranSecondOperation) + } + + @Test func linuxGateReleasesAfterCancellation() async throws { + let gate = LinuxURLSessionRequestGate() + + let longTask = Task { + try await gate.withLock { + try await Task.sleep(for: .seconds(10)) + } + } + + try await Task.sleep(for: .milliseconds(30)) + longTask.cancel() + _ = await longTask.result + + var acquiredAfterCancellation = false + _ = try await gate.withLock { + acquiredAfterCancellation = true + } + + #expect(acquiredAfterCancellation) + } + } +#endif From 260abcf1ec1196b2039a04d798b0a7f457b73f54 Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Thu, 19 Feb 2026 02:49:03 -0800 Subject: [PATCH 3/6] Replace withLock instance method with top-level withLinuxRequestLock helper --- .../Extensions/URLSession+Extensions.swift | 31 ++++++++++++------- .../URLSessionExtensionsTests.swift | 15 +++------ 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift b/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift index aa17292..70cd714 100644 --- a/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift +++ b/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift @@ -33,7 +33,7 @@ enum HTTP { private var isLocked = false private var waiters: [CheckedContinuation] = [] - private func acquire() async { + func acquire() async { if !isLocked { isLocked = true return @@ -44,7 +44,7 @@ enum HTTP { } } - private func release() { + func release() { if waiters.isEmpty { isLocked = false return @@ -54,13 +54,20 @@ enum HTTP { continuation.resume() } - /// Executes an async operation while holding the gate lock. - func withLock( - _ operation: () async throws -> T - ) async rethrows -> T { - await acquire() - defer { release() } - return try await operation() + } + + func withLinuxRequestLock( + _ operation: () async throws -> T + ) async rethrows -> T { + let gate = LinuxURLSessionRequestGate.shared + await gate.acquire() + do { + let result = try await operation() + await gate.release() + return result + } catch { + await gate.release() + throw error } } #endif @@ -87,7 +94,7 @@ extension URLSession { } #if canImport(FoundationNetworking) - let dataAndResponse = try await LinuxURLSessionRequestGate.shared.withLock { + let dataAndResponse = try await withLinuxRequestLock { try await data(for: request) } let (data, response) = dataAndResponse @@ -143,7 +150,7 @@ extension URLSession { } #if canImport(FoundationNetworking) - let dataAndResponse = try await LinuxURLSessionRequestGate.shared.withLock { + let dataAndResponse = try await withLinuxRequestLock { try await self.data(for: request) } let (data, response) = dataAndResponse @@ -209,7 +216,7 @@ extension URLSession { } #if canImport(FoundationNetworking) - let asyncBytes = try await LinuxURLSessionRequestGate.shared.withLock { + let asyncBytes = try await withLinuxRequestLock { let (bytes, response) = try await self.linuxBytes(for: request) try await self.validateEventStreamResponse(response, asyncBytes: bytes) return bytes diff --git a/Tests/AnyLanguageModelTests/URLSessionExtensionsTests.swift b/Tests/AnyLanguageModelTests/URLSessionExtensionsTests.swift index d87a40a..d8fb92b 100644 --- a/Tests/AnyLanguageModelTests/URLSessionExtensionsTests.swift +++ b/Tests/AnyLanguageModelTests/URLSessionExtensionsTests.swift @@ -41,13 +41,12 @@ struct URLSessionExtensionsTests { extension URLSessionExtensionsTests { @Test func linuxGateSerializesConcurrentOperations() async throws { - let gate = LinuxURLSessionRequestGate() let counter = GateCounter() try await withThrowingTaskGroup(of: Void.self) { group in for _ in 0 ..< 8 { group.addTask { - try await gate.withLock { + try await withLinuxRequestLock { await counter.enter() do { try await Task.sleep(for: .milliseconds(20)) @@ -66,10 +65,8 @@ struct URLSessionExtensionsTests { } @Test func linuxGateReleasesAfterError() async throws { - let gate = LinuxURLSessionRequestGate() - do { - _ = try await gate.withLock { + try await withLinuxRequestLock { throw GateTestError.expected } Issue.record("Expected error was not thrown") @@ -78,17 +75,15 @@ struct URLSessionExtensionsTests { } var ranSecondOperation = false - _ = try await gate.withLock { + try await withLinuxRequestLock { ranSecondOperation = true } #expect(ranSecondOperation) } @Test func linuxGateReleasesAfterCancellation() async throws { - let gate = LinuxURLSessionRequestGate() - let longTask = Task { - try await gate.withLock { + try await withLinuxRequestLock { try await Task.sleep(for: .seconds(10)) } } @@ -98,7 +93,7 @@ struct URLSessionExtensionsTests { _ = await longTask.result var acquiredAfterCancellation = false - _ = try await gate.withLock { + try await withLinuxRequestLock { acquiredAfterCancellation = true } From f94d23375fcff0fd686c68d50fecc19533e4091f Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Thu, 19 Feb 2026 02:54:52 -0800 Subject: [PATCH 4/6] Fix Linux compiler bug around generic returning lock helper --- .../Extensions/URLSession+Extensions.swift | 33 ++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift b/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift index 70cd714..ede16cc 100644 --- a/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift +++ b/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift @@ -56,15 +56,14 @@ enum HTTP { } - func withLinuxRequestLock( - _ operation: () async throws -> T - ) async rethrows -> T { + func withLinuxRequestLock( + _ operation: () async throws -> Void + ) async throws { let gate = LinuxURLSessionRequestGate.shared await gate.acquire() do { - let result = try await operation() + try await operation() await gate.release() - return result } catch { await gate.release() throw error @@ -94,8 +93,12 @@ extension URLSession { } #if canImport(FoundationNetworking) - let dataAndResponse = try await withLinuxRequestLock { - try await data(for: request) + var dataAndResponse: (Data, URLResponse)? + try await withLinuxRequestLock { + dataAndResponse = try await data(for: request) + } + guard let dataAndResponse else { + throw URLSessionError.invalidResponse } let (data, response) = dataAndResponse #else @@ -150,8 +153,12 @@ extension URLSession { } #if canImport(FoundationNetworking) - let dataAndResponse = try await withLinuxRequestLock { - try await self.data(for: request) + var dataAndResponse: (Data, URLResponse)? + try await withLinuxRequestLock { + dataAndResponse = try await self.data(for: request) + } + guard let dataAndResponse else { + throw URLSessionError.invalidResponse } let (data, response) = dataAndResponse #else @@ -216,10 +223,14 @@ extension URLSession { } #if canImport(FoundationNetworking) - let asyncBytes = try await withLinuxRequestLock { + var asyncBytes: AsyncThrowingStream? + try await withLinuxRequestLock { let (bytes, response) = try await self.linuxBytes(for: request) try await self.validateEventStreamResponse(response, asyncBytes: bytes) - return bytes + asyncBytes = bytes + } + guard let asyncBytes else { + throw URLSessionError.invalidResponse } try await decodeAndYieldEventStream(asyncBytes, to: continuation) #else From 6edc374bc6aa7e3f290c38e5b5b3d3b40fa9d515 Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Thu, 19 Feb 2026 02:59:25 -0800 Subject: [PATCH 5/6] More workarounds for Linux compiler bugs --- .../Extensions/URLSession+Extensions.swift | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift b/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift index ede16cc..87efc63 100644 --- a/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift +++ b/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift @@ -93,14 +93,16 @@ extension URLSession { } #if canImport(FoundationNetworking) - var dataAndResponse: (Data, URLResponse)? + var lockedData: Data? + var lockedResponse: URLResponse? try await withLinuxRequestLock { - dataAndResponse = try await data(for: request) + let (data, response) = try await data(for: request) + lockedData = data + lockedResponse = response } - guard let dataAndResponse else { + guard let data = lockedData, let response = lockedResponse else { throw URLSessionError.invalidResponse } - let (data, response) = dataAndResponse #else let (data, response) = try await data(for: request) #endif @@ -153,14 +155,16 @@ extension URLSession { } #if canImport(FoundationNetworking) - var dataAndResponse: (Data, URLResponse)? + var lockedData: Data? + var lockedResponse: URLResponse? try await withLinuxRequestLock { - dataAndResponse = try await self.data(for: request) + let (data, response) = try await self.data(for: request) + lockedData = data + lockedResponse = response } - guard let dataAndResponse else { + guard let data = lockedData, let response = lockedResponse else { throw URLSessionError.invalidResponse } - let (data, response) = dataAndResponse #else let (data, response) = try await self.data(for: request) #endif @@ -223,13 +227,13 @@ extension URLSession { } #if canImport(FoundationNetworking) - var asyncBytes: AsyncThrowingStream? + var lockedAsyncBytes: AsyncThrowingStream? try await withLinuxRequestLock { let (bytes, response) = try await self.linuxBytes(for: request) try await self.validateEventStreamResponse(response, asyncBytes: bytes) - asyncBytes = bytes + lockedAsyncBytes = bytes } - guard let asyncBytes else { + guard let asyncBytes = lockedAsyncBytes else { throw URLSessionError.invalidResponse } try await decodeAndYieldEventStream(asyncBytes, to: continuation) From f13a1366129fbfad940213ef433428a88807981c Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Thu, 19 Feb 2026 03:49:25 -0800 Subject: [PATCH 6/6] Incorporate feedback from review --- .../Extensions/URLSession+Extensions.swift | 57 +++++++++++++++---- .../URLSessionExtensionsTests.swift | 37 ++++++++++++ 2 files changed, 82 insertions(+), 12 deletions(-) diff --git a/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift b/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift index 87efc63..8a1e111 100644 --- a/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift +++ b/Sources/AnyLanguageModel/Extensions/URLSession+Extensions.swift @@ -21,26 +21,48 @@ enum HTTP { /// `_MultiHandle`, which has a known thread-safety bug that can crash under /// concurrent access (`URLSession._MultiHandle.endOperation(for:)`). /// - /// This gate intentionally allows only one in-flight request path at a time on Linux. - /// This fully serializes HTTP request setup paths on Linux and reduces request-level - /// parallelism, which can lower throughput for heavily concurrent workloads. + /// This gate intentionally allows only one in-flight request setup path at a time on Linux. + /// For non-streaming requests, callers typically hold this lock for the entire + /// request/response cycle, effectively serializing those operations and reducing + /// request-level parallelism (which can lower throughput for heavily concurrent + /// workloads). + /// + /// For streaming requests, callers usually acquire the gate only during initial + /// request setup and then release it once the stream has been established; stream + /// consumption itself is not serialized by this gate. /// Keep this scoped to Linux-only code paths until the upstream issue is resolved. /// /// See: https://github.com/swiftlang/swift-corelibs-foundation/issues/4791 actor LinuxURLSessionRequestGate { + private struct Waiter { + let id: UUID + let continuation: CheckedContinuation + } + static let shared = LinuxURLSessionRequestGate() private var isLocked = false - private var waiters: [CheckedContinuation] = [] + private var waiters: [Waiter] = [] + + func acquire() async throws { + if Task.isCancelled { + throw CancellationError() + } - func acquire() async { if !isLocked { isLocked = true return } - await withCheckedContinuation { continuation in - waiters.append(continuation) + let waiterID = UUID() + try await withTaskCancellationHandler { + try await withCheckedThrowingContinuation { continuation in + waiters.append(Waiter(id: waiterID, continuation: continuation)) + } + } onCancel: { + Task { + await self.cancelWaiter(id: waiterID) + } } } @@ -50,8 +72,17 @@ enum HTTP { return } - let continuation = waiters.removeFirst() - continuation.resume() + let waiter = waiters.removeFirst() + waiter.continuation.resume() + } + + private func cancelWaiter(id: UUID) { + guard let index = waiters.firstIndex(where: { $0.id == id }) else { + return + } + + let waiter = waiters.remove(at: index) + waiter.continuation.resume(throwing: CancellationError()) } } @@ -60,7 +91,7 @@ enum HTTP { _ operation: () async throws -> Void ) async throws { let gate = LinuxURLSessionRequestGate.shared - await gate.acquire() + try await gate.acquire() do { try await operation() await gate.release() @@ -228,14 +259,16 @@ extension URLSession { #if canImport(FoundationNetworking) var lockedAsyncBytes: AsyncThrowingStream? + var lockedResponse: URLResponse? try await withLinuxRequestLock { let (bytes, response) = try await self.linuxBytes(for: request) - try await self.validateEventStreamResponse(response, asyncBytes: bytes) lockedAsyncBytes = bytes + lockedResponse = response } - guard let asyncBytes = lockedAsyncBytes else { + guard let asyncBytes = lockedAsyncBytes, let response = lockedResponse else { throw URLSessionError.invalidResponse } + try await self.validateEventStreamResponse(response, asyncBytes: asyncBytes) try await decodeAndYieldEventStream(asyncBytes, to: continuation) #else let (asyncBytes, response) = try await self.bytes(for: request) diff --git a/Tests/AnyLanguageModelTests/URLSessionExtensionsTests.swift b/Tests/AnyLanguageModelTests/URLSessionExtensionsTests.swift index d8fb92b..c1ac266 100644 --- a/Tests/AnyLanguageModelTests/URLSessionExtensionsTests.swift +++ b/Tests/AnyLanguageModelTests/URLSessionExtensionsTests.swift @@ -39,6 +39,14 @@ struct URLSessionExtensionsTests { case expected } + private actor GateFlag { + private(set) var value = false + + func setTrue() { + value = true + } + } + extension URLSessionExtensionsTests { @Test func linuxGateSerializesConcurrentOperations() async throws { let counter = GateCounter() @@ -99,5 +107,34 @@ struct URLSessionExtensionsTests { #expect(acquiredAfterCancellation) } + + @Test func linuxGateCancelledWaiterDoesNotExecute() async throws { + let ranCancelledOperation = GateFlag() + + let holder = Task { + try await withLinuxRequestLock { + try await Task.sleep(for: .milliseconds(200)) + } + } + + try await Task.sleep(for: .milliseconds(20)) + + let waiter = Task { + do { + try await withLinuxRequestLock { + await ranCancelledOperation.setTrue() + } + } catch { + // Cancellation is expected. + } + } + + waiter.cancel() + _ = await waiter.result + try await holder.value + try await Task.sleep(for: .milliseconds(20)) + + #expect(await ranCancelledOperation.value == false) + } } #endif