From cd6a2ac93311a54c83ab5558a5c86f8261477c10 Mon Sep 17 00:00:00 2001 From: Nabil Hachicha Date: Thu, 11 Dec 2025 15:35:23 +0000 Subject: [PATCH 1/4] Fixes JAVA-5949 prevent connection churn on backpressure errors when establishing connections --- .../src/main/com/mongodb/MongoException.java | 16 ++++++ .../DefaultSdamServerDescriptionManager.java | 26 ++++++++-- .../InternalStreamConnectionInitializer.java | 2 + .../SdamServerDescriptionManager.java | 51 +++++++++++++++++++ 4 files changed, 92 insertions(+), 3 deletions(-) diff --git a/driver-core/src/main/com/mongodb/MongoException.java b/driver-core/src/main/com/mongodb/MongoException.java index a668dd344b7..62610a62feb 100644 --- a/driver-core/src/main/com/mongodb/MongoException.java +++ b/driver-core/src/main/com/mongodb/MongoException.java @@ -50,6 +50,22 @@ public class MongoException extends RuntimeException { */ public static final String UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL = "UnknownTransactionCommitResult"; + /** + * An error label indicating that the server is overloaded. + * + * @see #hasErrorLabel(String) + * @since 5.7 + */ + public static final String SYSTEM_OVERLOADED_ERROR_LABEL = "SystemOverloadedError"; + + /** + * An error label indicating that the operation is safely retryable. 
+ * + * @see #hasErrorLabel(String) + * @since 5.7 + */ + public static final String RETRYABLE_ERROR_LABEL = "RetryableError"; + private static final long serialVersionUID = -4415279469780082174L; private final int code; diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java index af4acd8c031..b6154a00cf2 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java @@ -16,6 +16,7 @@ package com.mongodb.internal.connection; +import com.mongodb.MongoException; import com.mongodb.annotations.ThreadSafe; import com.mongodb.connection.ClusterConnectionMode; import com.mongodb.connection.ServerDescription; @@ -137,9 +138,28 @@ private void handleException(final SdamIssue sdamIssue, final boolean beforeHand serverMonitor.connect(); } else if (sdamIssue.relatedToNetworkNotTimeout() || (beforeHandshake && (sdamIssue.relatedToNetworkTimeout() || sdamIssue.relatedToAuth()))) { - updateDescription(sdamIssue.serverDescription()); - connectionPool.invalidate(sdamIssue.exception().orElse(null)); - serverMonitor.cancelCurrentCheck(); + // Backpressure spec: Don't clear pool or mark server unknown for connection establishment failures + // (network errors or timeouts during handshake). Authentication errors after handshake should still + // clear the pool as they're not related to overload. + // TLS configuration errors (certificate validation, protocol mismatches) should also clear the pool + // as they indicate configuration issues, not server overload. 
+ if (beforeHandshake && (sdamIssue.relatedToNetworkNotTimeout() || sdamIssue.relatedToNetworkTimeout()) + && !sdamIssue.relatedToAuth() && !sdamIssue.relatedToTlsConfigurationError()) { + // Don't update server description to Unknown + // Don't invalidate the connection pool + // Apply error labels for backpressure + sdamIssue.exception().ifPresent(exception -> { + if (exception instanceof MongoException) { + MongoException mongoException = (MongoException) exception; + mongoException.addLabel(MongoException.SYSTEM_OVERLOADED_ERROR_LABEL); + mongoException.addLabel(MongoException.RETRYABLE_ERROR_LABEL); + } + }); + } else { + updateDescription(sdamIssue.serverDescription()); + connectionPool.invalidate(sdamIssue.exception().orElse(null)); + serverMonitor.cancelCurrentCheck(); + } } else if (sdamIssue.relatedToWriteConcern() || sdamIssue.relatedToStalePrimary()) { updateDescription(sdamIssue.serverDescription()); serverMonitor.connect(); diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java index 574a85669d0..6463a101dfd 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java @@ -198,6 +198,8 @@ private BsonDocument createHelloCommand(final Authenticator authenticator, final helloCommandDocument.append("speculativeAuthenticate", speculativeAuthenticateDocument); } } + // Add backpressure support indication + helloCommandDocument.append("backpressure", BsonBoolean.TRUE); return helloCommandDocument; } diff --git a/driver-core/src/main/com/mongodb/internal/connection/SdamServerDescriptionManager.java b/driver-core/src/main/com/mongodb/internal/connection/SdamServerDescriptionManager.java index 7f014d7ede6..a2135085535 100644 --- 
a/driver-core/src/main/com/mongodb/internal/connection/SdamServerDescriptionManager.java +++ b/driver-core/src/main/com/mongodb/internal/connection/SdamServerDescriptionManager.java @@ -30,6 +30,10 @@ import com.mongodb.connection.TopologyVersion; import com.mongodb.lang.Nullable; +import javax.net.ssl.SSLHandshakeException; +import javax.net.ssl.SSLPeerUnverifiedException; +import java.security.cert.CertPathValidatorException; +import java.security.cert.CertificateException; import java.util.Optional; import static com.mongodb.assertions.Assertions.assertNotNull; @@ -162,6 +166,53 @@ boolean relatedToWriteConcern() { return exception instanceof MongoWriteConcernWithResponseException; } + /** + * Checks if the exception is related to TLS configuration errors that are NOT due to server overload. + * These include certificate validation failures, protocol mismatches, etc. + * + * @return true if this is a TLS configuration error (not network-related) + */ + boolean relatedToTlsConfigurationError() { + if (!(exception instanceof MongoSocketException)) { + return false; + } + Throwable cause = exception.getCause(); + while (cause != null) { + // Check for various certificate validation and TLS configuration errors + if (cause instanceof CertificateException + || cause instanceof CertPathValidatorException + || cause instanceof SSLPeerUnverifiedException) { + return true; // Certificate/peer validation failure + } + + // Check for SunCertPathBuilderException by class name to avoid compile-time dependency on internal classes + String className = cause.getClass().getName(); + if (className.equals("sun.security.provider.certpath.SunCertPathBuilderException")) { + return true; // Certificate path building failure + } + + // SSLHandshakeException can be either network or config, so we check the message + if (cause instanceof SSLHandshakeException) { + String message = cause.getMessage(); + if (message != null) { + String lowerMessage = message.toLowerCase(); + // These 
indicate configuration issues, not network issues + if (lowerMessage.contains("certificate") + || lowerMessage.contains("verify") + || lowerMessage.contains("trust") + || lowerMessage.contains("hostname") + || lowerMessage.contains("protocol") + || lowerMessage.contains("cipher") + || lowerMessage.contains("handshake_failure")) { + return true; + } + } + } + cause = cause.getCause(); + } + return false; + } + private static boolean stale(@Nullable final Throwable t, final ServerDescription currentServerDescription) { return TopologyVersionHelper.topologyVersion(t) .map(candidateTopologyVersion -> TopologyVersionHelper.newerOrEqual( From 0d102c79af0e2dcf79f606782ccb768c5155efd9 Mon Sep 17 00:00:00 2001 From: Nabil Hachicha Date: Mon, 15 Dec 2025 16:08:49 +0000 Subject: [PATCH 2/4] Remove handshake and update submodule including new tests --- .../connection/InternalStreamConnectionInitializer.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java index 6463a101dfd..574a85669d0 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java @@ -198,8 +198,6 @@ private BsonDocument createHelloCommand(final Authenticator authenticator, final helloCommandDocument.append("speculativeAuthenticate", speculativeAuthenticateDocument); } } - // Add backpressure support indication - helloCommandDocument.append("backpressure", BsonBoolean.TRUE); return helloCommandDocument; } From db9eb2037e5bb38ca239bc293f72c71e15cd0d4f Mon Sep 17 00:00:00 2001 From: Nabil Hachicha Date: Tue, 20 Jan 2026 13:02:32 +0000 Subject: [PATCH 3/4] Update spec test; fix test runner --- driver-core/src/test/resources/specifications | 2 +- 
.../connection/DefaultServerSpecification.groovy | 16 +++++++++------- .../com/mongodb/client/unified/EventMatcher.java | 11 +++++++++-- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/driver-core/src/test/resources/specifications b/driver-core/src/test/resources/specifications index a8d34be0df2..28fdaf9b378 160000 --- a/driver-core/src/test/resources/specifications +++ b/driver-core/src/test/resources/specifications @@ -1 +1 @@ -Subproject commit a8d34be0df234365600a9269af5a463f581562fd +Subproject commit 28fdaf9b37851f8d479a510be9f3717338c94608 diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy index 3910da575f0..a9fb84d4ba5 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy @@ -234,10 +234,12 @@ class DefaultServerSpecification extends Specification { ] } - def 'failed open should invalidate the server'() { + def 'network error should not invalidate the pool'() { given: def connectionPool = Mock(ConnectionPool) - connectionPool.get(_) >> { throw exceptionToThrow } + connectionPool.get(_) >> { + throw exceptionToThrow + } def serverMonitor = Mock(ServerMonitor) def server = defaultServer(connectionPool, serverMonitor) @@ -247,8 +249,8 @@ class DefaultServerSpecification extends Specification { then: def e = thrown(MongoException) e.is(exceptionToThrow) - 1 * connectionPool.invalidate(exceptionToThrow) - 1 * serverMonitor.cancelCurrentCheck() + 0 * connectionPool.invalidate(_) + 0 * serverMonitor.cancelCurrentCheck() where: exceptionToThrow << [ @@ -281,7 +283,7 @@ class DefaultServerSpecification extends Specification { ] } - def 'failed open should invalidate the server asynchronously'() { + def 'failed open should not invalidate the pool 
asynchronously'() { given: def connectionPool = Mock(ConnectionPool) connectionPool.getAsync(_, _) >> { it.last().onResult(null, exceptionToThrow) } @@ -301,8 +303,8 @@ class DefaultServerSpecification extends Specification { then: !receivedConnection receivedThrowable.is(exceptionToThrow) - 1 * connectionPool.invalidate(exceptionToThrow) - 1 * serverMonitor.cancelCurrentCheck() + 0 * connectionPool.invalidate(exceptionToThrow) + 0 * serverMonitor.cancelCurrentCheck() where: diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java index b2718b4b2d7..7c069b8d95b 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java @@ -436,9 +436,16 @@ private static boolean serverDescriptionChangedEventMatches(final BsonDocument e switch (newType) { case "Unknown": return event.getNewDescription().getType() == ServerType.UNKNOWN; - case "LoadBalancer": { + case "LoadBalancer": return event.getNewDescription().getType() == ServerType.LOAD_BALANCER; - } + case "Mongos": + return event.getNewDescription().getType() == ServerType.SHARD_ROUTER; + case "Standalone": + return event.getNewDescription().getType() == ServerType.STANDALONE; + case "RSPrimary": + return event.getNewDescription().getType() == ServerType.REPLICA_SET_PRIMARY; + case "RSSecondary": + return event.getNewDescription().getType() == ServerType.REPLICA_SET_SECONDARY; default: throw new UnsupportedOperationException(); } From 2f0f5be0296d8fce18a488f91552e438201a8e67 Mon Sep 17 00:00:00 2001 From: Nabil Hachicha Date: Mon, 26 Jan 2026 16:53:21 +0000 Subject: [PATCH 4/4] Add prose test --- ...erverDiscoveryAndMonitoringProseTests.java | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git 
a/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java b/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java index 18b3b3f4fc5..943ee78de31 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java +++ b/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java @@ -18,6 +18,7 @@ import com.mongodb.ClusterFixture; import com.mongodb.MongoClientSettings; +import com.mongodb.event.ConnectionCheckOutFailedEvent; import com.mongodb.event.ConnectionPoolClearedEvent; import com.mongodb.event.ConnectionPoolListener; import com.mongodb.event.ConnectionPoolReadyEvent; @@ -47,7 +48,10 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; import static com.mongodb.ClusterFixture.configureFailPoint; import static com.mongodb.ClusterFixture.disableFailPoint; @@ -268,6 +272,80 @@ public void shouldEmitHeartbeatStartedBeforeSocketIsConnected() { // As it requires mocking and package access to `com.mongodb.internal.connection` } + /** + * See + * Connection Pool Backpressure. 
+ */ + @Test + public void testConnectionPoolBackpressure() throws InterruptedException { + assumeTrue(serverVersionAtLeast(7, 0)); + + AtomicInteger connectionCheckOutFailedEventCount = new AtomicInteger(0); + AtomicInteger poolClearedEventCount = new AtomicInteger(0); + + ConnectionPoolListener connectionPoolListener = new ConnectionPoolListener() { + @Override + public void connectionCheckOutFailed(final ConnectionCheckOutFailedEvent event) { + connectionCheckOutFailedEventCount.incrementAndGet(); + } + + @Override + public void connectionPoolCleared(final ConnectionPoolClearedEvent event) { + poolClearedEventCount.incrementAndGet(); + } + }; + + MongoClientSettings clientSettings = getMongoClientSettingsBuilder() + .applyToConnectionPoolSettings(builder -> builder + .maxConnecting(100) + .addConnectionPoolListener(connectionPoolListener)) + .build(); + + try (MongoClient adminClient = MongoClients.create(getMongoClientSettingsBuilder().build()); + MongoClient client = MongoClients.create(clientSettings)) { + + MongoDatabase adminDatabase = adminClient.getDatabase("admin"); + MongoDatabase database = client.getDatabase(getDefaultDatabaseName()); + MongoCollection collection = database.getCollection("testCollection"); + + // Configure rate limiter using admin commands + adminDatabase.runCommand(new Document("setParameter", 1) + .append("ingressConnectionEstablishmentRateLimiterEnabled", true)); + adminDatabase.runCommand(new Document("setParameter", 1) + .append("ingressConnectionEstablishmentRatePerSec", 20)); + adminDatabase.runCommand(new Document("setParameter", 1) + .append("ingressConnectionEstablishmentBurstCapacitySecs", 1)); + adminDatabase.runCommand(new Document("setParameter", 1) + .append("ingressConnectionEstablishmentMaxQueueDepth", 1)); + + // Add a document to the collection + collection.insertOne(Document.parse("{}")); + + // Run 100 parallel find operations, each with a 2-second sleep + ExecutorService executor = 
Executors.newFixedThreadPool(100); + for (int i = 0; i < 100; i++) { + executor.submit(() -> collection.find(new Document("$where", "function() { sleep(2000); return true; }")).first()); + } + + // Wait for all operations to complete (max 10 seconds) + executor.shutdown(); + boolean terminated = executor.awaitTermination(10, SECONDS); + assertTrue("Executor did not terminate within timeout", terminated); + + // Assert at least 10 ConnectionCheckOutFailedEvents occurred + assertTrue("Expected at least 10 ConnectionCheckOutFailedEvents, but got " + connectionCheckOutFailedEventCount.get(), + connectionCheckOutFailedEventCount.get() >= 10); + + // Assert 0 PoolClearedEvents occurred + assertEquals("Expected 0 PoolClearedEvents", 0, poolClearedEventCount.get()); + + // Teardown: sleep 1 second and reset rate limiter + Thread.sleep(1000); + adminDatabase.runCommand(new Document("setParameter", 1) + .append("ingressConnectionEstablishmentRateLimiterEnabled", false)); + } + } + private static void assertPoll(final BlockingQueue queue, @Nullable final Class allowed, final Set> required) throws InterruptedException { assertPoll(queue, allowed, required, Timeout.expiresIn(TEST_WAIT_TIMEOUT_MILLIS, MILLISECONDS, ZERO_DURATION_MEANS_EXPIRED));