diff --git a/driver-core/src/main/com/mongodb/MongoException.java b/driver-core/src/main/com/mongodb/MongoException.java
index a668dd344b..62610a62fe 100644
--- a/driver-core/src/main/com/mongodb/MongoException.java
+++ b/driver-core/src/main/com/mongodb/MongoException.java
@@ -50,6 +50,22 @@ public class MongoException extends RuntimeException {
*/
public static final String UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL = "UnknownTransactionCommitResult";
+ /**
+ * An error label indicating that the server is overloaded.
+ *
+ * @see #hasErrorLabel(String)
+ * @since 5.7
+ */
+ public static final String SYSTEM_OVERLOADED_ERROR_LABEL = "SystemOverloadedError";
+
+ /**
+ * An error label indicating that the operation is safely retryable.
+ *
+ * @see #hasErrorLabel(String)
+ * @since 5.7
+ */
+ public static final String RETRYABLE_ERROR_LABEL = "RetryableError";
+
private static final long serialVersionUID = -4415279469780082174L;
private final int code;
diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java
index af4acd8c03..b6154a00cf 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java
@@ -16,6 +16,7 @@
package com.mongodb.internal.connection;
+import com.mongodb.MongoException;
import com.mongodb.annotations.ThreadSafe;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ServerDescription;
@@ -137,9 +138,28 @@ private void handleException(final SdamIssue sdamIssue, final boolean beforeHand
serverMonitor.connect();
} else if (sdamIssue.relatedToNetworkNotTimeout()
|| (beforeHandshake && (sdamIssue.relatedToNetworkTimeout() || sdamIssue.relatedToAuth()))) {
- updateDescription(sdamIssue.serverDescription());
- connectionPool.invalidate(sdamIssue.exception().orElse(null));
- serverMonitor.cancelCurrentCheck();
+ // Backpressure spec: Don't clear pool or mark server unknown for connection establishment failures
+ // (network errors or timeouts during handshake). Authentication errors after handshake should still
+ // clear the pool as they're not related to overload.
+ // TLS configuration errors (certificate validation, protocol mismatches) should also clear the pool
+ // as they indicate configuration issues, not server overload.
+ if (beforeHandshake && (sdamIssue.relatedToNetworkNotTimeout() || sdamIssue.relatedToNetworkTimeout())
+ && !sdamIssue.relatedToAuth() && !sdamIssue.relatedToTlsConfigurationError()) {
+ // Don't update server description to Unknown
+ // Don't invalidate the connection pool
+ // Apply error labels for backpressure
+ sdamIssue.exception().ifPresent(exception -> {
+ if (exception instanceof MongoException) {
+ MongoException mongoException = (MongoException) exception;
+ mongoException.addLabel(MongoException.SYSTEM_OVERLOADED_ERROR_LABEL);
+ mongoException.addLabel(MongoException.RETRYABLE_ERROR_LABEL);
+ }
+ });
+ } else {
+ updateDescription(sdamIssue.serverDescription());
+ connectionPool.invalidate(sdamIssue.exception().orElse(null));
+ serverMonitor.cancelCurrentCheck();
+ }
} else if (sdamIssue.relatedToWriteConcern() || sdamIssue.relatedToStalePrimary()) {
updateDescription(sdamIssue.serverDescription());
serverMonitor.connect();
diff --git a/driver-core/src/main/com/mongodb/internal/connection/SdamServerDescriptionManager.java b/driver-core/src/main/com/mongodb/internal/connection/SdamServerDescriptionManager.java
index 7f014d7ede..a213508553 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/SdamServerDescriptionManager.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/SdamServerDescriptionManager.java
@@ -30,6 +30,10 @@
import com.mongodb.connection.TopologyVersion;
import com.mongodb.lang.Nullable;
+import javax.net.ssl.SSLHandshakeException;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import java.security.cert.CertPathValidatorException;
+import java.security.cert.CertificateException;
import java.util.Optional;
import static com.mongodb.assertions.Assertions.assertNotNull;
@@ -162,6 +166,53 @@ boolean relatedToWriteConcern() {
return exception instanceof MongoWriteConcernWithResponseException;
}
+ /**
+ * Checks if the exception is related to TLS configuration errors that are NOT due to server overload.
+ * These include certificate validation failures, protocol mismatches, etc.
+ *
+ * @return true if this is a TLS configuration error (not network-related)
+ */
+ boolean relatedToTlsConfigurationError() {
+ if (!(exception instanceof MongoSocketException)) {
+ return false;
+ }
+ Throwable cause = exception.getCause();
+ while (cause != null) {
+ // Check for various certificate validation and TLS configuration errors
+ if (cause instanceof CertificateException
+ || cause instanceof CertPathValidatorException
+ || cause instanceof SSLPeerUnverifiedException) {
+ return true; // Certificate/peer validation failure
+ }
+
+ // Check for SunCertPathBuilderException by class name to avoid compile-time dependency on internal classes
+ String className = cause.getClass().getName();
+ if (className.equals("sun.security.provider.certpath.SunCertPathBuilderException")) {
+ return true; // Certificate path building failure
+ }
+
+ // SSLHandshakeException can be either network or config, so we check the message
+ if (cause instanceof SSLHandshakeException) {
+ String message = cause.getMessage();
+ if (message != null) {
+ String lowerMessage = message.toLowerCase();
+ // These indicate configuration issues, not network issues
+ if (lowerMessage.contains("certificate")
+ || lowerMessage.contains("verify")
+ || lowerMessage.contains("trust")
+ || lowerMessage.contains("hostname")
+ || lowerMessage.contains("protocol")
+ || lowerMessage.contains("cipher")
+ || lowerMessage.contains("handshake_failure")) {
+ return true;
+ }
+ }
+ }
+ cause = cause.getCause();
+ }
+ return false;
+ }
+
private static boolean stale(@Nullable final Throwable t, final ServerDescription currentServerDescription) {
return TopologyVersionHelper.topologyVersion(t)
.map(candidateTopologyVersion -> TopologyVersionHelper.newerOrEqual(
diff --git a/driver-core/src/test/resources/specifications b/driver-core/src/test/resources/specifications
index a8d34be0df..28fdaf9b37 160000
--- a/driver-core/src/test/resources/specifications
+++ b/driver-core/src/test/resources/specifications
@@ -1 +1 @@
-Subproject commit a8d34be0df234365600a9269af5a463f581562fd
+Subproject commit 28fdaf9b37851f8d479a510be9f3717338c94608
diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy
index 3910da575f..a9fb84d4ba 100644
--- a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy
+++ b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy
@@ -234,10 +234,12 @@ class DefaultServerSpecification extends Specification {
]
}
- def 'failed open should invalidate the server'() {
+ def 'network error should not invalidate the pool'() {
given:
def connectionPool = Mock(ConnectionPool)
- connectionPool.get(_) >> { throw exceptionToThrow }
+ connectionPool.get(_) >> {
+ throw exceptionToThrow
+ }
def serverMonitor = Mock(ServerMonitor)
def server = defaultServer(connectionPool, serverMonitor)
@@ -247,8 +249,8 @@ class DefaultServerSpecification extends Specification {
then:
def e = thrown(MongoException)
e.is(exceptionToThrow)
- 1 * connectionPool.invalidate(exceptionToThrow)
- 1 * serverMonitor.cancelCurrentCheck()
+ 0 * connectionPool.invalidate(_)
+ 0 * serverMonitor.cancelCurrentCheck()
where:
exceptionToThrow << [
@@ -281,7 +283,7 @@ class DefaultServerSpecification extends Specification {
]
}
- def 'failed open should invalidate the server asynchronously'() {
+ def 'failed open should not invalidate the pool asynchronously'() {
given:
def connectionPool = Mock(ConnectionPool)
connectionPool.getAsync(_, _) >> { it.last().onResult(null, exceptionToThrow) }
@@ -301,8 +303,8 @@ class DefaultServerSpecification extends Specification {
then:
!receivedConnection
receivedThrowable.is(exceptionToThrow)
- 1 * connectionPool.invalidate(exceptionToThrow)
- 1 * serverMonitor.cancelCurrentCheck()
+ 0 * connectionPool.invalidate(exceptionToThrow)
+ 0 * serverMonitor.cancelCurrentCheck()
where:
diff --git a/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java b/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java
index 18b3b3f4fc..943ee78de3 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java
@@ -18,6 +18,7 @@
import com.mongodb.ClusterFixture;
import com.mongodb.MongoClientSettings;
+import com.mongodb.event.ConnectionCheckOutFailedEvent;
import com.mongodb.event.ConnectionPoolClearedEvent;
import com.mongodb.event.ConnectionPoolListener;
import com.mongodb.event.ConnectionPoolReadyEvent;
@@ -47,7 +48,10 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import static com.mongodb.ClusterFixture.configureFailPoint;
import static com.mongodb.ClusterFixture.disableFailPoint;
@@ -268,6 +272,80 @@ public void shouldEmitHeartbeatStartedBeforeSocketIsConnected() {
// As it requires mocking and package access to `com.mongodb.internal.connection`
}
+ /**
+ * See
+ * Connection Pool Backpressure.
+ */
+ @Test
+ public void testConnectionPoolBackpressure() throws InterruptedException {
+ assumeTrue(serverVersionAtLeast(7, 0));
+
+ AtomicInteger connectionCheckOutFailedEventCount = new AtomicInteger(0);
+ AtomicInteger poolClearedEventCount = new AtomicInteger(0);
+
+ ConnectionPoolListener connectionPoolListener = new ConnectionPoolListener() {
+ @Override
+ public void connectionCheckOutFailed(final ConnectionCheckOutFailedEvent event) {
+ connectionCheckOutFailedEventCount.incrementAndGet();
+ }
+
+ @Override
+ public void connectionPoolCleared(final ConnectionPoolClearedEvent event) {
+ poolClearedEventCount.incrementAndGet();
+ }
+ };
+
+ MongoClientSettings clientSettings = getMongoClientSettingsBuilder()
+ .applyToConnectionPoolSettings(builder -> builder
+ .maxConnecting(100)
+ .addConnectionPoolListener(connectionPoolListener))
+ .build();
+
+ try (MongoClient adminClient = MongoClients.create(getMongoClientSettingsBuilder().build());
+ MongoClient client = MongoClients.create(clientSettings)) {
+
+ MongoDatabase adminDatabase = adminClient.getDatabase("admin");
+ MongoDatabase database = client.getDatabase(getDefaultDatabaseName());
+ MongoCollection collection = database.getCollection("testCollection");
+
+ // Configure rate limiter using admin commands
+ adminDatabase.runCommand(new Document("setParameter", 1)
+ .append("ingressConnectionEstablishmentRateLimiterEnabled", true));
+ adminDatabase.runCommand(new Document("setParameter", 1)
+ .append("ingressConnectionEstablishmentRatePerSec", 20));
+ adminDatabase.runCommand(new Document("setParameter", 1)
+ .append("ingressConnectionEstablishmentBurstCapacitySecs", 1));
+ adminDatabase.runCommand(new Document("setParameter", 1)
+ .append("ingressConnectionEstablishmentMaxQueueDepth", 1));
+
+ // Add a document to the collection
+ collection.insertOne(Document.parse("{}"));// change
+
+ // Run 100 parallel find operations with 2-seconds sleep
+ ExecutorService executor = Executors.newFixedThreadPool(100);
+ for (int i = 0; i < 100; i++) {
+ executor.submit(() -> collection.find(new Document("$where", "function() { sleep(2000); return true; }")).first());
+ }
+
+ // Wait for all operations to complete (max 10 seconds)
+ executor.shutdown();
+ boolean terminated = executor.awaitTermination(10, SECONDS);
+ assertTrue("Executor did not terminate within timeout", terminated);
+
+ // Assert at least 10 ConnectionCheckOutFailedEvents occurred
+ assertTrue("Expected at least 10 ConnectionCheckOutFailedEvents, but got " + connectionCheckOutFailedEventCount.get(),
+ connectionCheckOutFailedEventCount.get() >= 10);
+
+ // Assert 0 PoolClearedEvents occurred
+ assertEquals("Expected 0 PoolClearedEvents", 0, poolClearedEventCount.get());
+
+ // Teardown: sleep 1 second and reset rate limiter
+ Thread.sleep(1000);
+ adminDatabase.runCommand(new Document("setParameter", 1)
+ .append("ingressConnectionEstablishmentRateLimiterEnabled", false));
+ }
+ }
+
private static void assertPoll(final BlockingQueue> queue, @Nullable final Class> allowed, final Set> required)
throws InterruptedException {
assertPoll(queue, allowed, required, Timeout.expiresIn(TEST_WAIT_TIMEOUT_MILLIS, MILLISECONDS, ZERO_DURATION_MEANS_EXPIRED));
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java
index b2718b4b2d..7c069b8d95 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java
@@ -436,9 +436,16 @@ private static boolean serverDescriptionChangedEventMatches(final BsonDocument e
switch (newType) {
case "Unknown":
return event.getNewDescription().getType() == ServerType.UNKNOWN;
- case "LoadBalancer": {
+ case "LoadBalancer":
return event.getNewDescription().getType() == ServerType.LOAD_BALANCER;
- }
+ case "Mongos":
+ return event.getNewDescription().getType() == ServerType.SHARD_ROUTER;
+ case "Standalone":
+ return event.getNewDescription().getType() == ServerType.STANDALONE;
+ case "RSPrimary":
+ return event.getNewDescription().getType() == ServerType.REPLICA_SET_PRIMARY;
+ case "RSSecondary":
+ return event.getNewDescription().getType() == ServerType.REPLICA_SET_SECONDARY;
default:
throw new UnsupportedOperationException();
}