From 9c44ffc54dd2d426c4f53cabf317ae8895f764f2 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Wed, 20 Aug 2025 22:41:25 -0700 Subject: [PATCH 1/9] Adjust timeout handling in client-side operations to account for RTT variations. JAVA-5375 --- .../com/mongodb/ClusterFixture.java | 6 +- .../mongodb/internal/TimeoutContextTest.java | 7 +- .../ClientSideOperationTimeoutProseTest.java | 36 ++--- ...tClientSideOperationsTimeoutProseTest.java | 133 +++++++++++------- ...eOperationsEncryptionTimeoutProseTest.java | 6 +- .../unified/UnifiedTestModifications.java | 12 ++ 6 files changed, 115 insertions(+), 85 deletions(-) diff --git a/driver-core/src/test/functional/com/mongodb/ClusterFixture.java b/driver-core/src/test/functional/com/mongodb/ClusterFixture.java index 6bbf9233cb1..30a28027002 100644 --- a/driver-core/src/test/functional/com/mongodb/ClusterFixture.java +++ b/driver-core/src/test/functional/com/mongodb/ClusterFixture.java @@ -475,7 +475,9 @@ private static Cluster createCluster(final MongoCredential credential, final Str } private static Cluster createCluster(final ConnectionString connectionString, final StreamFactory streamFactory) { - MongoClientSettings mongoClientSettings = MongoClientSettings.builder().applyConnectionString(connectionString).build(); + MongoClientSettings mongoClientSettings = MongoClientSettings.builder().applyConnectionString(connectionString) + .applyToServerSettings(builder -> builder.heartbeatFrequency(1, SECONDS).minHeartbeatFrequency(1, MILLISECONDS)) + .build(); return new DefaultClusterFactory().createCluster(mongoClientSettings.getClusterSettings(), mongoClientSettings.getServerSettings(), mongoClientSettings.getConnectionPoolSettings(), @@ -570,7 +572,7 @@ public static ServerAddress getSecondary() { return serverDescriptions.get(0).getAddress(); } - public static void sleep(final int sleepMS) { + public static void sleep(final long sleepMS) { try { Thread.sleep(sleepMS); } catch (InterruptedException e) { diff --git a/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java b/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java index 130d408076e..545dc4b5458 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java @@ -331,9 +331,10 @@ static Stream shouldChooseTimeoutMsWhenItIsLessThenConnectTimeoutMS() ); } - @ParameterizedTest - @MethodSource @DisplayName("should choose timeoutMS when timeoutMS is less than connectTimeoutMS") + @ParameterizedTest(name = "should choose timeoutMS when timeoutMS is less than connectTimeoutMS. " + + "Parameters: connectTimeoutMS: {0}, timeoutMS: {1}, expected: {2}") + @MethodSource void shouldChooseTimeoutMsWhenItIsLessThenConnectTimeoutMS(final Long connectTimeoutMS, final Long timeoutMS, final long expected) { @@ -345,7 +346,7 @@ void shouldChooseTimeoutMsWhenItIsLessThenConnectTimeoutMS(final Long connectTim 0)); long calculatedTimeoutMS = timeoutContext.getConnectTimeoutMs(); - assertTrue(expected - calculatedTimeoutMS <= 1); + assertTrue(expected - calculatedTimeoutMS <= 2); } private TimeoutContextTest() { diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java index 5613e6dbcd8..9823fda2479 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java @@ -16,7 +16,6 @@ package com.mongodb.reactivestreams.client; -import com.mongodb.ClusterFixture; import com.mongodb.MongoClientSettings; import com.mongodb.MongoCommandException; import com.mongodb.MongoNamespace; @@ -25,7 +24,6 @@ import com.mongodb.ReadPreference; import com.mongodb.WriteConcern; import com.mongodb.client.AbstractClientSideOperationsTimeoutProseTest; -import com.mongodb.client.model.CreateCollectionOptions; import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.event.CommandFailedEvent; import com.mongodb.event.CommandStartedEvent; @@ -108,7 +106,6 @@ protected boolean isAsync() { @Override public void testGridFSUploadViaOpenUploadStreamTimeout() { assumeTrue(serverVersionAtLeast(4, 4)); - long rtt = ClusterFixture.getPrimaryRTT(); //given collectionHelper.runAdminCommand("{" @@ -117,12 +114,12 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() { + " data: {" + " failCommands: [\"insert\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 405) + + " blockTimeMS: " + 405 + " }" + "}"); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(rtt + 400, TimeUnit.MILLISECONDS))) { + .timeout(withRttAdjustment(400, 2), TimeUnit.MILLISECONDS))) { MongoDatabase database = client.getDatabase(gridFsFileNamespace.getDatabaseName()); GridFSBucket gridFsBucket = createReaciveGridFsBucket(database, GRID_FS_BUCKET_NAME); @@ -164,7 +161,6 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() { @Override public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, InterruptedException, TimeoutException { assumeTrue(serverVersionAtLeast(4, 4)); - long rtt = ClusterFixture.getPrimaryRTT(); //given CompletableFuture droppedErrorFuture = new CompletableFuture<>(); @@ -176,12 +172,12 @@ public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, I + " data: {" + " failCommands: [\"delete\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 405) + + " blockTimeMS: " + 405 + " }" + "}"); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(rtt + 400, TimeUnit.MILLISECONDS))) { + .timeout(withRttAdjustment(400, 2), TimeUnit.MILLISECONDS))) { MongoDatabase database = client.getDatabase(gridFsFileNamespace.getDatabaseName()); GridFSBucket gridFsBucket = createReaciveGridFsBucket(database, GRID_FS_BUCKET_NAME); @@ -228,9 +224,8 @@ public void testTimeoutMSAppliesToFullResumeAttemptInNextCall() { assumeTrue(isDiscoverableReplicaSet()); //given - long rtt = ClusterFixture.getPrimaryRTT(); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(rtt + 500, TimeUnit.MILLISECONDS))) { + .timeout(withRttAdjustment(500), TimeUnit.MILLISECONDS))) { MongoNamespace namespace = generateNamespace(); MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) @@ -282,9 +277,8 @@ public void testTimeoutMSAppliedToInitialAggregate() { assumeTrue(isDiscoverableReplicaSet()); //given - long rtt = ClusterFixture.getPrimaryRTT(); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(rtt + 200, TimeUnit.MILLISECONDS))) { + .timeout(200, TimeUnit.MILLISECONDS))) { MongoNamespace namespace = generateNamespace(); MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) @@ -299,7 +293,7 @@ public void testTimeoutMSAppliedToInitialAggregate() { + " data: {" + " failCommands: [\"aggregate\" ]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 201) + + " blockTimeMS: " + 201 + " }" + "}"); @@ -330,13 +324,10 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() { //given BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0); - collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions()); sleep(2000); - - long rtt = ClusterFixture.getPrimaryRTT(); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(rtt + 300, TimeUnit.MILLISECONDS))) { + .timeout(withRttAdjustment(500), TimeUnit.MILLISECONDS))) { MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); @@ -347,7 +338,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() { + " data: {" + " failCommands: [\"getMore\", \"aggregate\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 200) + + " blockTimeMS: " + 200 + " }" + "}"); @@ -398,12 +389,10 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() { //given BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0); - collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions()); sleep(2000); - long rtt = ClusterFixture.getPrimaryRTT(); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(rtt + 300, TimeUnit.MILLISECONDS))) { + .timeout(withRttAdjustment(500), TimeUnit.MILLISECONDS))) { MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()) @@ -415,7 +404,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() { + " data: {" + " failCommands: [\"aggregate\", \"getMore\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 200) + + " blockTimeMS: " + 200 + " }" + "}"); @@ -458,9 +447,8 @@ public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInt assumeTrue(isDiscoverableReplicaSet()); //given - long rtt = ClusterFixture.getPrimaryRTT(); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(rtt + 2500, TimeUnit.MILLISECONDS))) { + .timeout(withRttAdjustment(2500), TimeUnit.MILLISECONDS))) { MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java index 5cb042eaad4..9caff3a073f 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java @@ -51,11 +51,8 @@ import com.mongodb.internal.connection.TestCommandListener; import com.mongodb.internal.connection.TestConnectionPoolListener; import com.mongodb.test.FlakyTest; -import org.bson.BsonArray; -import org.bson.BsonBoolean; import org.bson.BsonDocument; import org.bson.BsonInt32; -import org.bson.BsonString; import org.bson.BsonTimestamp; import org.bson.Document; import org.bson.codecs.BsonDocumentCodec; @@ -246,7 +243,6 @@ public void testBlockingIterationMethodsChangeStream() { assumeFalse(isAsync()); // Async change stream cursor is non-deterministic for cursor::next BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0); - collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions()); sleep(2000); collectionHelper.insertDocuments(singletonList(BsonDocument.parse("{x: 1}")), WriteConcern.MAJORITY); @@ -288,7 +284,6 @@ public void testBlockingIterationMethodsChangeStream() { @FlakyTest(maxAttempts = 3) public void testGridFSUploadViaOpenUploadStreamTimeout() { assumeTrue(serverVersionAtLeast(4, 4)); - long rtt = ClusterFixture.getPrimaryRTT(); collectionHelper.runAdminCommand("{" + " configureFailPoint: \"failCommand\"," @@ -296,7 +291,7 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() { + " data: {" + " failCommands: [\"insert\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 205) + + " blockTimeMS: " + 205 + " }" + "}"); @@ -304,7 +299,7 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() { filesCollectionHelper.create(); try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder() - .timeout(rtt + 200, TimeUnit.MILLISECONDS))) { + .timeout(withRttAdjustment(200, 2), TimeUnit.MILLISECONDS))) { MongoDatabase database = client.getDatabase(namespace.getDatabaseName()); GridFSBucket gridFsBucket = createGridFsBucket(database, GRID_FS_BUCKET_NAME); @@ -319,7 +314,6 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() { @Test public void testAbortingGridFsUploadStreamTimeout() throws Throwable { assumeTrue(serverVersionAtLeast(4, 4)); - long rtt = ClusterFixture.getPrimaryRTT(); collectionHelper.runAdminCommand("{" + " configureFailPoint: \"failCommand\"," @@ -327,7 +321,7 @@ public void testAbortingGridFsUploadStreamTimeout() throws Throwable { + " data: {" + " failCommands: [\"delete\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 305) + + " blockTimeMS: " + 320 + " }" + "}"); @@ -335,7 +329,7 @@ public void testAbortingGridFsUploadStreamTimeout() throws Throwable { filesCollectionHelper.create(); try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder() - .timeout(rtt + 300, TimeUnit.MILLISECONDS))) { + .timeout(withRttAdjustment(300, 2), TimeUnit.MILLISECONDS))) { MongoDatabase database = client.getDatabase(namespace.getDatabaseName()); GridFSBucket gridFsBucket = createGridFsBucket(database, GRID_FS_BUCKET_NAME).withChunkSizeBytes(2); @@ -350,7 +344,6 @@ public void testAbortingGridFsUploadStreamTimeout() throws Throwable { @Test public void testGridFsDownloadStreamTimeout() { assumeTrue(serverVersionAtLeast(4, 4)); - long rtt = ClusterFixture.getPrimaryRTT(); chunksCollectionHelper.create(); filesCollectionHelper.create(); @@ -372,18 +365,19 @@ public void testGridFsDownloadStreamTimeout() { + " metadata: {}" + "}" )), WriteConcern.MAJORITY); + collectionHelper.runAdminCommand("{" + " configureFailPoint: \"failCommand\"," + " mode: { skip: 1 }," + " data: {" + " failCommands: [\"find\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 95) + + " blockTimeMS: " + 500 + " }" + "}"); try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder() - .timeout(rtt + 100, TimeUnit.MILLISECONDS))) { + .timeout(withRttAdjustment(300, 2), TimeUnit.MILLISECONDS))) { MongoDatabase database = client.getDatabase(namespace.getDatabaseName()); GridFSBucket gridFsBucket = createGridFsBucket(database, GRID_FS_BUCKET_NAME).withChunkSizeBytes(2); @@ -391,7 +385,9 @@ public void testGridFsDownloadStreamTimeout() { assertThrows(MongoOperationTimeoutException.class, downloadStream::read); List events = commandListener.getCommandStartedEvents(); - List findCommands = events.stream().filter(e -> e.getCommandName().equals("find")).collect(Collectors.toList()); + List findCommands = events.stream() + .filter(e -> e.getCommandName().equals("find")) + .collect(Collectors.toList()); assertEquals(2, findCommands.size()); assertEquals(gridFsFileNamespace.getCollectionName(), findCommands.get(0).getCommand().getString("find").getValue()); @@ -404,7 +400,7 @@ public void testGridFsDownloadStreamTimeout() { @ParameterizedTest(name = "[{index}] {0}") @MethodSource("test8ServerSelectionArguments") public void test8ServerSelection(final String connectionString) { - int timeoutBuffer = 100; // 5 in spec, Java is slower + int timeoutBuffer = 150; // 5 in spec, Java is slower // 1. Create a MongoClient try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder() .applyConnectionString(new ConnectionString(connectionString))) @@ -417,6 +413,7 @@ public void test8ServerSelection(final String connectionString) { // Expect this to fail with a server selection timeout error after no more than 15ms [this is increased] long elapsed = msElapsedSince(start); assertTrue(throwable.getMessage().contains("while waiting for a server")); + System.err.println("Elapsed time: " + elapsed + "ms"); assertTrue(elapsed < 10 + timeoutBuffer, "Took too long to time out, elapsedMS: " + elapsed); } } @@ -440,7 +437,7 @@ public void test8ServerSelectionHandshake(final String ignoredTestName, final in + " data: {" + " failCommands: [\"saslContinue\"]," + " blockConnection: true," - + " blockTimeMS: 350" + + " blockTimeMS: 600" + " }" + "}"); @@ -456,7 +453,8 @@ public void test8ServerSelectionHandshake(final String ignoredTestName, final in .insertOne(new Document("x", 1)); }); long elapsed = msElapsedSince(start); - assertTrue(elapsed <= 310, "Took too long to time out, elapsedMS: " + elapsed); + System.err.println("test8ServerSelectionHandshake elapsed " + elapsed + "ms"); + assertTrue(elapsed <= 350, "Took too long to time out, elapsedMS: " + elapsed); } } @@ -473,23 +471,23 @@ public void test9EndSessionClientTimeout() { + " data: {" + " failCommands: [\"abortTransaction\"]," + " blockConnection: true," - + " blockTimeMS: " + 150 + + " blockTimeMS: " + 700 + " }" + "}"); try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder().retryWrites(false) - .timeout(100, TimeUnit.MILLISECONDS))) { - MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) + .timeout(withRttAdjustment(500, 2), TimeUnit.MILLISECONDS))) { + MongoDatabase database = mongoClient.getDatabase(namespace.getDatabaseName()); + MongoCollection collection = database .getCollection(namespace.getCollectionName()); try (ClientSession session = mongoClient.startSession()) { session.startTransaction(); collection.insertOne(session, new Document("x", 1)); - long start = System.nanoTime(); session.close(); long elapsed = msElapsedSince(start) - postSessionCloseSleep(); - assertTrue(elapsed <= 150, "Took too long to time out, elapsedMS: " + elapsed); + assertTrue(elapsed <= 700, "Took too long to time out, elapsedMS: " + elapsed); } } CommandFailedEvent abortTransactionEvent = assertDoesNotThrow(() -> @@ -510,7 +508,7 @@ public void test9EndSessionSessionTimeout() { + " data: {" + " failCommands: [\"abortTransaction\"]," + " blockConnection: true," - + " blockTimeMS: " + 150 + + " blockTimeMS: " + 400 + " }" + "}"); @@ -519,14 +517,14 @@ public void test9EndSessionSessionTimeout() { .getCollection(namespace.getCollectionName()); try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder() - .defaultTimeout(100, TimeUnit.MILLISECONDS).build())) { + .defaultTimeout(withRttAdjustment(300, 2), TimeUnit.MILLISECONDS).build())) { session.startTransaction(); collection.insertOne(session, new Document("x", 1)); long start = System.nanoTime(); session.close(); long elapsed = msElapsedSince(start) - postSessionCloseSleep(); - assertTrue(elapsed <= 150, "Took too long to time out, elapsedMS: " + elapsed); + assertTrue(elapsed <= withRttAdjustment(400, 2), "Took too long to time out, elapsedMS: " + elapsed); } } CommandFailedEvent abortTransactionEvent = assertDoesNotThrow(() -> @@ -553,11 +551,12 @@ public void test9EndSessionCustomTesEachOperationHasItsOwnTimeoutWithCommit() { MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()); + long defaultTimeout = withRttAdjustment(300); try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder() - .defaultTimeout(200, TimeUnit.MILLISECONDS).build())) { + .defaultTimeout(defaultTimeout, TimeUnit.MILLISECONDS).build())) { session.startTransaction(); collection.insertOne(session, new Document("x", 1)); - sleep(200); + sleep(defaultTimeout); assertDoesNotThrow(session::commitTransaction); } @@ -584,11 +583,12 @@ public void test9EndSessionCustomTesEachOperationHasItsOwnTimeoutWithAbort() { MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()); + long defaultTimeout = withRttAdjustment(300); try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder() - .defaultTimeout(200, TimeUnit.MILLISECONDS).build())) { + .defaultTimeout(defaultTimeout, TimeUnit.MILLISECONDS).build())) { session.startTransaction(); collection.insertOne(session, new Document("x", 1)); - sleep(200); + sleep(defaultTimeout); assertDoesNotThrow(session::close); } @@ -608,12 +608,12 @@ public void test10ConvenientTransactions() { + " data: {" + " failCommands: [\"insert\", \"abortTransaction\"]," + " blockConnection: true," - + " blockTimeMS: " + 150 + + " blockTimeMS: " + 200 + " }" + "}"); try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder() - .timeout(100, TimeUnit.MILLISECONDS))) { + .timeout(150, TimeUnit.MILLISECONDS))) { MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()); @@ -651,12 +651,13 @@ public void test10CustomTestWithTransactionUsesASingleTimeout() { MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()); + long defaultTimeout = withRttAdjustment(200); try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder() - .defaultTimeout(200, TimeUnit.MILLISECONDS).build())) { + .defaultTimeout(defaultTimeout, TimeUnit.MILLISECONDS).build())) { assertThrows(MongoOperationTimeoutException.class, () -> session.withTransaction(() -> { collection.insertOne(session, new Document("x", 1)); - sleep(200); + sleep(defaultTimeout); return true; }) ); @@ -686,12 +687,13 @@ public void test10CustomTestWithTransactionUsesASingleTimeoutWithLock() { MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()); + long defaultTimeout = withRttAdjustment(200); try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder() - .defaultTimeout(200, TimeUnit.MILLISECONDS).build())) { + .defaultTimeout(defaultTimeout, TimeUnit.MILLISECONDS).build())) { assertThrows(MongoOperationTimeoutException.class, () -> session.withTransaction(() -> { collection.insertOne(session, new Document("x", 1)); - sleep(200); + sleep(defaultTimeout); return true; }) ); @@ -700,7 +702,7 @@ public void test10CustomTestWithTransactionUsesASingleTimeoutWithLock() { } @DisplayName("11. Multi-batch bulkWrites") - @Test + @FlakyTest(maxAttempts = 3) @SuppressWarnings("try") protected void test11MultiBatchBulkWrites() throws InterruptedException { assumeTrue(serverVersionAtLeast(8, 0)); @@ -708,12 +710,22 @@ protected void test11MultiBatchBulkWrites() throws InterruptedException { // a workaround for https://jira.mongodb.org/browse/DRIVERS-2997, remove this block when the aforementioned bug is fixed client.getDatabase(namespace.getDatabaseName()).drop(); } - BsonDocument failPointDocument = new BsonDocument("configureFailPoint", new BsonString("failCommand")) - .append("mode", new BsonDocument("times", new BsonInt32(2))) - .append("data", new BsonDocument("failCommands", new BsonArray(singletonList(new BsonString("bulkWrite")))) - .append("blockConnection", BsonBoolean.TRUE) - .append("blockTimeMS", new BsonInt32(2020))); - try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder().timeout(4000, TimeUnit.MILLISECONDS)); + BsonDocument failPointDocument = BsonDocument.parse("{" + + " configureFailPoint: \"failCommand\"," + + " mode: { times: 2}," + + " data: {" + + " failCommands: [\"bulkWrite\" ]," + + " blockConnection: true," + + " blockTimeMS: " + 2020 + + " }" + + "}"); + + /* + We use 2 operation count to adjust the timeout by RTT, even though we have only one successful insert to account for, because + the payload is larger than measured RTT on "hello" command. + */ + long timeout = withRttAdjustment(4000, 2); + try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder().timeout(timeout, TimeUnit.MILLISECONDS)); FailPoint ignored = FailPoint.enable(failPointDocument, getPrimary())) { MongoDatabase db = client.getDatabase(namespace.getDatabaseName()); db.drop(); @@ -736,8 +748,8 @@ protected void test11MultiBatchBulkWrites() throws InterruptedException { * Not a prose spec test. However, it is additional test case for better coverage. */ @Test - @DisplayName("Should ignore wTimeoutMS of WriteConcern to initial and subsequent commitTransaction operations") - public void shouldIgnoreWtimeoutMsOfWriteConcernToInitialAndSubsequentCommitTransactionOperations() { + @DisplayName("Should not include wTimeoutMS of WriteConcern to initial and subsequent commitTransaction operations") + public void shouldNotIncludeWtimeoutMsOfWriteConcernToInitialAndSubsequentCommitTransactionOperations() { assumeTrue(serverVersionAtLeast(4, 4)); assumeFalse(isStandalone()); @@ -745,14 +757,15 @@ public void shouldIgnoreWtimeoutMsOfWriteConcernToInitialAndSubsequentCommitTran MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()); + long defaultTimeout = withRttAdjustment(200); try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder() - .defaultTimeout(200, TimeUnit.MILLISECONDS) + .defaultTimeout(defaultTimeout, TimeUnit.MILLISECONDS) .build())) { session.startTransaction(TransactionOptions.builder() .writeConcern(WriteConcern.ACKNOWLEDGED.withWTimeout(100, TimeUnit.MILLISECONDS)) .build()); collection.insertOne(session, new Document("x", 1)); - sleep(200); + sleep(defaultTimeout); assertDoesNotThrow(session::commitTransaction); //repeat commit. @@ -781,7 +794,6 @@ public void testKillCursorsIsNotExecutedAfterGetMoreNetworkErrorWhenTimeoutIsNot assumeTrue(serverVersionAtLeast(4, 4)); assumeTrue(isLoadBalanced()); - long rtt = ClusterFixture.getPrimaryRTT(); collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions()); collectionHelper.insertDocuments(new Document(), new Document()); collectionHelper.runAdminCommand("{" @@ -790,7 +802,7 @@ public void testKillCursorsIsNotExecutedAfterGetMoreNetworkErrorWhenTimeoutIsNot + " data: {" + " failCommands: [\"getMore\" ]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 600) + + " blockTimeMS: " + 600 + " }" + "}"); @@ -828,7 +840,6 @@ public void testKillCursorsIsNotExecutedAfterGetMoreNetworkError() { assumeTrue(serverVersionAtLeast(4, 4)); assumeTrue(isLoadBalanced()); - long rtt = ClusterFixture.getPrimaryRTT(); collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions()); collectionHelper.insertDocuments(new Document(), new Document()); collectionHelper.runAdminCommand("{" @@ -837,7 +848,7 @@ public void testKillCursorsIsNotExecutedAfterGetMoreNetworkError() { + " data: {" + " failCommands: [\"getMore\" ]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 600) + + " blockTimeMS: " + 600 + " }" + "}"); @@ -923,9 +934,10 @@ private static Stream test8ServerSelectionArguments() { } private static Stream test8ServerSelectionHandshakeArguments() { + return Stream.of( - Arguments.of("timeoutMS honored for connection handshake commands if it's lower than serverSelectionTimeoutMS", 200, 300), - Arguments.of("serverSelectionTimeoutMS honored for connection handshake commands if it's lower than timeoutMS", 300, 200) + Arguments.of("timeoutMS honored for connection handshake commands if it's lower than serverSelectionTimeoutMS", 200, 500), + Arguments.of("serverSelectionTimeoutMS honored for connection handshake commands if it's lower than timeoutMS", 500, 200) ); } @@ -936,7 +948,9 @@ protected MongoNamespace generateNamespace() { protected MongoClientSettings.Builder getMongoClientSettingsBuilder() { commandListener.reset(); - return Fixture.getMongoClientSettingsBuilder() + System.err.println("CONNECTION STRING TO USE" + ClusterFixture.getConnectionString()); + MongoClientSettings.Builder mongoClientSettingsBuilder = Fixture.getMongoClientSettingsBuilder(); + return mongoClientSettingsBuilder .readConcern(ReadConcern.MAJORITY) .writeConcern(WriteConcern.MAJORITY) .readPreference(ReadPreference.primary()) @@ -950,6 +964,9 @@ public void setUp() { gridFsChunksNamespace = new MongoNamespace(getDefaultDatabaseName(), GRID_FS_BUCKET_NAME + ".chunks"); collectionHelper = new CollectionHelper<>(new BsonDocumentCodec(), namespace); + // in some test collection might not have been created yet, thus dropping it in afterEach will throw an error + collectionHelper.create(); + filesCollectionHelper = new CollectionHelper<>(new BsonDocumentCodec(), gridFsFileNamespace); chunksCollectionHelper = new CollectionHelper<>(new BsonDocumentCodec(), gridFsChunksNamespace); commandListener = new TestCommandListener(); @@ -983,4 +1000,14 @@ private MongoClient createMongoClient(final MongoClientSettings.Builder builder) private long msElapsedSince(final long t1) { return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t1); } + + protected long withRttAdjustment(int timeout, int countOfOperations) { + long primaryRTT = ClusterFixture.getPrimaryRTT(); + long adjustedTimeout = timeout + (primaryRTT * countOfOperations); + return adjustedTimeout; + } + + protected long withRttAdjustment(int timeout) { + return withRttAdjustment(timeout, 1); + } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/csot/AbstractClientSideOperationsEncryptionTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/csot/AbstractClientSideOperationsEncryptionTimeoutProseTest.java index dd45bc8ae2c..e5dd140cdda 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/csot/AbstractClientSideOperationsEncryptionTimeoutProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/csot/AbstractClientSideOperationsEncryptionTimeoutProseTest.java @@ -173,12 +173,12 @@ void shouldThrowOperationTimeoutExceptionWhenDecryptData() { try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(rtt + 400))) { keyVaultCollectionHelper.runAdminCommand("{" - + " configureFailPoint: \"" + FAIL_COMMAND_NAME + "\"," + + " configureFailPoint: \"" + FAIL_COMMAND_NAME + "\"," + " mode: { times: 1 }," + " data: {" + " failCommands: [\"find\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 500) + + " blockTimeMS: " + 500 + " }" + "}"); commandListener.reset(); @@ -206,7 +206,7 @@ void shouldDecreaseOperationTimeoutForSubsequentOperations() { + " data: {" + " failCommands: [\"insert\", \"find\", \"listCollections\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 10) + + " blockTimeMS: " + 10 + " }" + "}"); diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java index 6d559e0d666..b6d39e54bdd 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java @@ -63,6 +63,18 @@ public static void applyCustomizations(final TestDef def) { .test("change-streams", "change-streams-errors", "The watch helper must not throw a custom exception when executed against a single server topology, but instead depend on a server error"); // client-side-operation-timeout (CSOT) + def.retry("Unified CSOT tests do not account for RTT which varies in TLS vs non-TLS runs") + .whenFailureContains("timeout") + .test("client-side-operations-timeout", + "timeoutMS behaves correctly for non-tailable cursors", + "timeoutMS is refreshed for getMore if timeoutMode is iteration - success"); + + def.retry("Unified CSOT tests do not account for RTT which varies in TLS vs non-TLS runs") + .whenFailureContains("timeout") + .test("client-side-operations-timeout", + "timeoutMS behaves correctly for tailable non-awaitData cursors", + "timeoutMS is refreshed for getMore - success"); + def.skipNoncompliantReactive("No good way to fulfill tryNext() requirement with a Publisher") .test("client-side-operations-timeout", "timeoutMS behaves correctly for tailable awaitData cursors", From 750e44c7361371a0f23ccce6651e25ed83765993 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Sun, 24 Aug 2025 23:43:13 -0700 Subject: [PATCH 2/9] Include MongoExecutionTimeoutException in tests. --- .../ClientSideOperationTimeoutProseTest.java | 14 ++++++++++++-- ...stractClientSideOperationsTimeoutProseTest.java | 1 - 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java index 95614b0559a..aa1b995ebce 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java @@ -18,6 +18,7 @@ import com.mongodb.MongoClientSettings; import com.mongodb.MongoCommandException; +import com.mongodb.MongoExecutionTimeoutException; import com.mongodb.MongoNamespace; import com.mongodb.MongoOperationTimeoutException; import com.mongodb.MongoSocketReadTimeoutException; @@ -59,6 +60,7 @@ import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet; import static com.mongodb.ClusterFixture.serverVersionAtLeast; import static com.mongodb.ClusterFixture.sleep; +import static com.mongodb.assertions.Assertions.assertTrue; import static java.util.Collections.singletonList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -148,7 +150,11 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() { Throwable commandError = onErrorEvents.get(0); Throwable operationTimeoutErrorCause = commandError.getCause(); assertInstanceOf(MongoOperationTimeoutException.class, commandError); - assertInstanceOf(MongoSocketReadTimeoutException.class, operationTimeoutErrorCause); + assertTrue(operationTimeoutErrorCause instanceof MongoSocketReadTimeoutException + || operationTimeoutErrorCause instanceof MongoExecutionTimeoutException, + "Expected operationTimeoutErrorCause to be either MongoSocketReadTimeoutException" + + " or MongoExecutionTimeoutException, but was: " + + operationTimeoutErrorCause.getClass().getName()); CommandFailedEvent chunkInsertFailedEvent = commandListener.getCommandFailedEvent("insert"); assertNotNull(chunkInsertFailedEvent); @@ -203,7 +209,11 @@ public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, I Throwable operationTimeoutErrorCause = commandError.getCause(); assertInstanceOf(MongoOperationTimeoutException.class, commandError); - assertInstanceOf(MongoSocketReadTimeoutException.class, operationTimeoutErrorCause); + assertTrue(operationTimeoutErrorCause instanceof MongoSocketReadTimeoutException + || operationTimeoutErrorCause instanceof MongoExecutionTimeoutException, + "Expected operationTimeoutErrorCause to be either MongoSocketReadTimeoutException" + + " or MongoExecutionTimeoutException, but was: " + + operationTimeoutErrorCause.getClass().getName()); CommandFailedEvent deleteFailedEvent = commandListener.getCommandFailedEvent("delete"); assertNotNull(deleteFailedEvent); diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java index 5b3f8de8e7e..ea3c82d9510 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java @@ -1058,7 +1058,6 @@ protected MongoNamespace generateNamespace() { protected MongoClientSettings.Builder getMongoClientSettingsBuilder() { commandListener.reset(); - System.err.println("CONNECTION STRING TO USE" + ClusterFixture.getConnectionString()); MongoClientSettings.Builder mongoClientSettingsBuilder = Fixture.getMongoClientSettingsBuilder(); return mongoClientSettingsBuilder .readConcern(ReadConcern.MAJORITY) From 48173e0319fa28f5378720e698c9c486bb6f81bc Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Sat, 3 Jan 2026 23:00:21 -0800 Subject: [PATCH 3/9] Remove Cluster.getPrimaryRTT(). --- .../internal/ClientSessionPublisherImpl.java | 13 +- .../client/internal/TimeoutHelper.java | 15 +- .../gridfs/GridFSUploadPublisherImpl.java | 10 +- .../ClientSideOperationTimeoutProseTest.java | 129 ++++++++++++++---- ...tClientSideOperationsTimeoutProseTest.java | 85 ++++++++---- .../unified/UnifiedTestModifications.java | 6 + 6 files changed, 193 insertions(+), 65 deletions(-) diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java index 5cf0ea103bd..26e896332a3 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java @@ -37,6 +37,8 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; +import java.util.concurrent.atomic.AtomicBoolean; + import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL; import static com.mongodb.MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL; import static com.mongodb.assertions.Assertions.assertNotNull; @@ -52,6 +54,7 @@ final class ClientSessionPublisherImpl extends BaseClientSessionImpl implements private boolean messageSentInCurrentTransaction; private boolean commitInProgress; private TransactionOptions transactionOptions; + private final AtomicBoolean closeInvoked = new AtomicBoolean(false); ClientSessionPublisherImpl(final ServerSessionPool serverSessionPool, final MongoClientImpl mongoClient, @@ -221,10 +224,12 @@ private void clearTransactionContextOnError(final MongoException e) { @Override public void close() { - if (transactionState == TransactionState.IN) { - Mono.from(abortTransaction()).doFinally(it -> super.close()).subscribe(); - } else { - super.close(); + if (closeInvoked.compareAndSet(false, true)) { + if (transactionState == TransactionState.IN) { + Mono.from(abortTransaction()).doFinally(it -> super.close()).subscribe(); + } else { + super.close(); + } } } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/TimeoutHelper.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/TimeoutHelper.java index bc4da3026a9..cefdf7184d8 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/TimeoutHelper.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/TimeoutHelper.java @@ -55,8 +55,14 @@ public static MongoCollection collectionWithTimeout(final MongoCollection public static Mono> collectionWithTimeoutMono(final MongoCollection collection, @Nullable final Timeout timeout) { + return collectionWithTimeoutMono(collection, timeout, DEFAULT_TIMEOUT_MESSAGE); + } + + public static Mono> collectionWithTimeoutMono(final MongoCollection collection, + @Nullable final Timeout timeout, + final String message) { try { - return Mono.just(collectionWithTimeout(collection, timeout)); + return Mono.just(collectionWithTimeout(collection, timeout, message)); } catch (MongoOperationTimeoutException e) { return Mono.error(e); } @@ -64,9 +70,14 @@ public static Mono> collectionWithTimeoutMono(final Mongo public static Mono> collectionWithTimeoutDeferred(final MongoCollection collection, @Nullable final Timeout timeout) { - return Mono.defer(() -> collectionWithTimeoutMono(collection, timeout)); + return collectionWithTimeoutDeferred(collection, timeout, DEFAULT_TIMEOUT_MESSAGE); } + public static Mono> collectionWithTimeoutDeferred(final MongoCollection collection, + @Nullable final Timeout timeout, + final String message) { + return Mono.defer(() -> collectionWithTimeoutMono(collection, timeout, message)); + } public static MongoDatabase databaseWithTimeout(final MongoDatabase database, @Nullable final Timeout timeout) { diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java index 7d9a46cdf3f..50586e92102 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java @@ -54,7 +54,8 @@ */ public final class GridFSUploadPublisherImpl implements GridFSUploadPublisher { - private static final String TIMEOUT_ERROR_MESSAGE = "Saving chunks exceeded the timeout limit."; + private static final String TIMEOUT_ERROR_MESSAGE_CHUNKS_SAVING = "Saving chunks exceeded the timeout limit."; + private static final String TIMEOUT_ERROR_MESSAGE_UPLOAD_CANCELLATION = "Upload cancellation exceeded the timeout limit."; private static final Document PROJECTION = new Document("_id", 1); private static final Document FILES_INDEX = new Document("filename", 1).append("uploadDate", 1); private static final Document CHUNKS_INDEX = new Document("files_id", 1).append("n", 1); @@ -226,8 +227,8 @@ private Mono createSaveChunksMono(final AtomicBoolean terminated, @Nullabl .append("data", data); Publisher insertOnePublisher = clientSession == null - ? collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE).insertOne(chunkDocument) - : collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE) + ? collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE_CHUNKS_SAVING).insertOne(chunkDocument) + : collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE_CHUNKS_SAVING) .insertOne(clientSession, chunkDocument); return Mono.from(insertOnePublisher).thenReturn(data.length()); @@ -270,7 +271,8 @@ private Mono createSaveFileDataMono(final AtomicBoolean termina } private Mono createCancellationMono(final AtomicBoolean terminated, @Nullable final Timeout timeout) { - Mono> chunksCollectionMono = collectionWithTimeoutDeferred(chunksCollection, timeout); + Mono> chunksCollectionMono = collectionWithTimeoutDeferred(chunksCollection, timeout, + TIMEOUT_ERROR_MESSAGE_UPLOAD_CANCELLATION); if (terminated.compareAndSet(false, true)) { if (clientSession != null) { return chunksCollectionMono.flatMap(collection -> Mono.from(collection diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java index 0e6345c5548..68c1096e8d7 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java @@ -18,7 +18,6 @@ import com.mongodb.MongoClientSettings; import com.mongodb.MongoCommandException; -import com.mongodb.MongoExecutionTimeoutException; import com.mongodb.MongoNamespace; import com.mongodb.MongoOperationTimeoutException; import com.mongodb.ReadPreference; @@ -29,6 +28,7 @@ import com.mongodb.event.CommandStartedEvent; import com.mongodb.reactivestreams.client.gridfs.GridFSBucket; import com.mongodb.reactivestreams.client.gridfs.GridFSBuckets; +import com.mongodb.reactivestreams.client.gridfs.GridFSUploadPublisher; import com.mongodb.reactivestreams.client.syncadapter.SyncGridFSBucket; import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient; import org.bson.BsonDocument; @@ -42,6 +42,7 @@ import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Hooks; +import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import java.nio.ByteBuffer; @@ -57,13 +58,16 @@ import static com.mongodb.ClusterFixture.TIMEOUT_DURATION; import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet; +import static com.mongodb.ClusterFixture.isStandalone; import static com.mongodb.ClusterFixture.serverVersionAtLeast; import static com.mongodb.ClusterFixture.sleep; import static com.mongodb.assertions.Assertions.assertTrue; import static java.util.Collections.singletonList; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assumptions.assumeFalse; import static org.junit.jupiter.api.Assumptions.assumeTrue; @@ -112,12 +116,12 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() { + " data: {" + " failCommands: [\"insert\"]," + " blockConnection: true," - + " blockTimeMS: " + 405 + + " blockTimeMS: " + 600 + " }" + "}"); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(withRttAdjustment(400, 2), TimeUnit.MILLISECONDS))) { + .timeout(600, TimeUnit.MILLISECONDS))) { MongoDatabase database = client.getDatabase(gridFsFileNamespace.getDatabaseName()); GridFSBucket gridFsBucket = createReaciveGridFsBucket(database, GRID_FS_BUCKET_NAME); @@ -125,8 +129,8 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() { TestEventPublisher eventPublisher = new TestEventPublisher<>(); TestSubscriber testSubscriber = new TestSubscriber<>(); - gridFsBucket.uploadFromPublisher("filename", eventPublisher.getEventStream()) - .subscribe(testSubscriber); + GridFSUploadPublisher filename = gridFsBucket.uploadFromPublisher("filename", eventPublisher.getEventStream()); + filename.subscribe(testSubscriber); //when eventPublisher.sendEvent(ByteBuffer.wrap(new byte[]{0x12})); @@ -146,13 +150,6 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() { Throwable commandError = onErrorEvents.get(0); assertInstanceOf(MongoOperationTimeoutException.class, commandError); - //TODO-merge (this line was removed on main, why?) - assertTrue(operationTimeoutErrorCause instanceof MongoSocketReadTimeoutException - || operationTimeoutErrorCause instanceof MongoExecutionTimeoutException, - "Expected operationTimeoutErrorCause to be either MongoSocketReadTimeoutException" - + " or MongoExecutionTimeoutException, but was: " - + operationTimeoutErrorCause.getClass().getName()); - CommandFailedEvent chunkInsertFailedEvent = commandListener.getCommandFailedEvent("insert"); assertNotNull(chunkInsertFailedEvent); assertEquals(commandError, commandListener.getCommandFailedEvent("insert").getThrowable()); @@ -180,7 +177,7 @@ public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, I + "}"); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(withRttAdjustment(400, 2), TimeUnit.MILLISECONDS))) { + .timeout(400, TimeUnit.MILLISECONDS))) { MongoDatabase database = client.getDatabase(gridFsFileNamespace.getDatabaseName()); GridFSBucket gridFsBucket = createReaciveGridFsBucket(database, GRID_FS_BUCKET_NAME); @@ -203,18 +200,25 @@ public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, I //then Throwable droppedError = droppedErrorFuture.get(TIMEOUT_DURATION.toMillis(), TimeUnit.MILLISECONDS); Throwable commandError = droppedError.getCause(); - assertInstanceOf(MongoOperationTimeoutException.class, commandError); - //TODO-merge (this lien was removed on main why?) - assertTrue(operationTimeoutErrorCause instanceof MongoSocketReadTimeoutException - || operationTimeoutErrorCause instanceof MongoExecutionTimeoutException, - "Expected operationTimeoutErrorCause to be either MongoSocketReadTimeoutException" - + " or MongoExecutionTimeoutException, but was: " - + operationTimeoutErrorCause.getClass().getName()); CommandFailedEvent deleteFailedEvent = commandListener.getCommandFailedEvent("delete"); assertNotNull(deleteFailedEvent); - assertEquals(commandError, commandListener.getCommandFailedEvent("delete").getThrowable()); + CommandStartedEvent deleteStartedEvent = commandListener.getCommandStartedEvent("delete"); + assertTrue(deleteStartedEvent.getCommand().containsKey("maxTimeMS"), "Expected delete command to have maxTimeMS"); + long deleteMaxTimeMS = deleteStartedEvent + .getCommand() + .get("maxTimeMS") + .asNumber() + .longValue(); + + assertTrue(deleteMaxTimeMS <= 420 + // some leeway for timing variations, when compression is used it is often lees then 300. + // Without it, it is more than 300. + && deleteMaxTimeMS >= 150, + "Expected maxTimeMS for delete command to be between 150s and 420ms, " + "but was: " + deleteMaxTimeMS + "ms"); + assertEquals(commandError, deleteFailedEvent.getThrowable()); + // When subscription is cancelled, we should not receive any more events. testSubscriber.assertNoTerminalEvent(); } @@ -231,7 +235,7 @@ public void testTimeoutMSAppliesToFullResumeAttemptInNextCall() { //given try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(withRttAdjustment(500), TimeUnit.MILLISECONDS))) { + .timeout(500, TimeUnit.MILLISECONDS))) { MongoNamespace namespace = generateNamespace(); MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) @@ -333,7 +337,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() { sleep(2000); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(withRttAdjustment(500), TimeUnit.MILLISECONDS))) { + .timeout(500, TimeUnit.MILLISECONDS))) { MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); @@ -398,7 +402,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() { sleep(2000); try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(withRttAdjustment(500), TimeUnit.MILLISECONDS))) { + .timeout(500, TimeUnit.MILLISECONDS))) { MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()) @@ -454,7 +458,7 @@ public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInt //given try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() - .timeout(withRttAdjustment(2500), TimeUnit.MILLISECONDS))) { + .timeout(2500, TimeUnit.MILLISECONDS))) { MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); @@ -471,7 +475,78 @@ public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInt List commandStartedEvents = commandListener.getCommandStartedEvents(); assertCommandStartedEventsInOder(Arrays.asList("aggregate", "getMore", "getMore", "getMore", "killCursors"), commandStartedEvents); - assertOnlyOneCommandTimeoutFailure("getMore"); + + } + } + + @DisplayName("9. End Session. The timeout specified via the MongoClient timeoutMS option") + @Test + @Override + public void test9EndSessionClientTimeout() { + assumeTrue(serverVersionAtLeast(4, 4)); + assumeFalse(isStandalone()); + + collectionHelper.runAdminCommand("{" + + " configureFailPoint: \"failCommand\"," + + " mode: { times: 1 }," + + " data: {" + + " failCommands: [\"abortTransaction\"]," + + " blockConnection: true," + + " blockTimeMS: " + 400 + + " }" + + "}"); + + try (MongoClient mongoClient = createReactiveClient(getMongoClientSettingsBuilder().retryWrites(false) + .timeout(300, TimeUnit.MILLISECONDS))) { + MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) + .getCollection(namespace.getCollectionName()); + + try (ClientSession session = Mono.from(mongoClient.startSession()).block()) { + session.startTransaction(); + Mono.from(collection.insertOne(session, new Document("x", 1))).block(); + } + + sleep(postSessionCloseSleep()); + CommandFailedEvent abortTransactionEvent = assertDoesNotThrow(() -> commandListener.getCommandFailedEvent("abortTransaction")); + long elapsedTime = abortTransactionEvent.getElapsedTime(TimeUnit.MILLISECONDS); + assertInstanceOf(MongoOperationTimeoutException.class, abortTransactionEvent.getThrowable()); + assertTrue(elapsedTime <= 400, "Took too long to time out, elapsedMS: " + elapsedTime); + } + } + + @Test + @DisplayName("9. End Session. The timeout specified via the ClientSession defaultTimeoutMS option") + @Override + public void test9EndSessionSessionTimeout() { + assumeTrue(serverVersionAtLeast(4, 4)); + assumeFalse(isStandalone()); + + collectionHelper.runAdminCommand("{" + + " configureFailPoint: \"failCommand\"," + + " mode: { times: 1 }," + + " data: {" + + " failCommands: [\"abortTransaction\"]," + + " blockConnection: true," + + " blockTimeMS: " + 400 + + " }" + + "}"); + + try (MongoClient mongoClient = createReactiveClient(getMongoClientSettingsBuilder())) { + MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) + .getCollection(namespace.getCollectionName()); + + try (ClientSession session = Mono.from(mongoClient.startSession(com.mongodb.ClientSessionOptions.builder() + .defaultTimeout(300, TimeUnit.MILLISECONDS).build())).block()) { + + session.startTransaction(); + Mono.from(collection.insertOne(session, new Document("x", 1))).block(); + } + + sleep(postSessionCloseSleep()); + CommandFailedEvent abortTransactionEvent = assertDoesNotThrow(() -> commandListener.getCommandFailedEvent("abortTransaction")); + long elapsedTime = abortTransactionEvent.getElapsedTime(TimeUnit.MILLISECONDS); + assertInstanceOf(MongoOperationTimeoutException.class, abortTransactionEvent.getThrowable()); + assertTrue(elapsedTime <= 400, "Took too long to time out, elapsedMS: " + elapsedTime); } } @@ -515,6 +590,6 @@ public void tearDown() throws InterruptedException { @Override protected int postSessionCloseSleep() { - return 256; + return 1000; } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java index b57d1ec1ea9..da7c4f49291 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java @@ -129,6 +129,31 @@ public abstract class AbstractClientSideOperationsTimeoutProseTest { protected TestCommandListener commandListener; +// @RegisterExtension +// public TestWatcher commandEventPrinter = new TestWatcher() { +// @Override +// public void testFailed(ExtensionContext context, Throwable cause) { +// System.err.println("\n=== TEST FAILED: " + context.getDisplayName() + " ==="); +// System.err.println("Failure: " + cause.getMessage()); +// if (commandListener != null) { +// System.err.println("\n--- Command Events ---"); +// commandListener.getEvents().forEach(event -> { +// if (event instanceof CommandStartedEvent) { +// CommandStartedEvent e = (CommandStartedEvent) event; +// System.err.println("STARTED: " + e.getCommandName() + " - " + e.getCommand().toJson()); +// } else if (event instanceof CommandSucceededEvent) { +// CommandSucceededEvent e = (CommandSucceededEvent) event; +// System.err.println("SUCCEEDED: " + e.getCommandName() + " (elapsed: " + e.getElapsedTime(TimeUnit.MILLISECONDS) + "ms)"); +// } else if (event instanceof CommandFailedEvent) { +// CommandFailedEvent e = (CommandFailedEvent) event; +// System.err.println("FAILED: " + e.getCommandName() + " (elapsed: " + e.getElapsedTime(TimeUnit.MILLISECONDS) + "ms) - " + e.getThrowable().getMessage()); +// } +// }); +// System.err.println("=== END COMMAND EVENTS ===\n"); +// } +// } +// }; + protected abstract MongoClient createMongoClient(MongoClientSettings mongoClientSettings); protected abstract GridFSBucket createGridFsBucket(MongoDatabase mongoDatabase, String bucketName); @@ -309,7 +334,7 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() { filesCollectionHelper.create(); try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder() - .timeout(withRttAdjustment(200, 2), TimeUnit.MILLISECONDS))) { + .timeout(200, TimeUnit.MILLISECONDS))) { MongoDatabase database = client.getDatabase(namespace.getDatabaseName()); GridFSBucket gridFsBucket = createGridFsBucket(database, GRID_FS_BUCKET_NAME); @@ -339,7 +364,7 @@ public void testAbortingGridFsUploadStreamTimeout() throws Throwable { filesCollectionHelper.create(); try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder() - .timeout(withRttAdjustment(300, 2), TimeUnit.MILLISECONDS))) { + .timeout(300, TimeUnit.MILLISECONDS))) { MongoDatabase database = client.getDatabase(namespace.getDatabaseName()); GridFSBucket gridFsBucket = createGridFsBucket(database, GRID_FS_BUCKET_NAME).withChunkSizeBytes(2); @@ -387,7 +412,7 @@ public void testGridFsDownloadStreamTimeout() { + "}"); try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder() - .timeout(withRttAdjustment(300, 2), TimeUnit.MILLISECONDS))) { + .timeout(300, TimeUnit.MILLISECONDS))) { MongoDatabase database = client.getDatabase(namespace.getDatabaseName()); GridFSBucket gridFsBucket = createGridFsBucket(database, GRID_FS_BUCKET_NAME).withChunkSizeBytes(2); @@ -481,12 +506,12 @@ public void test9EndSessionClientTimeout() { + " data: {" + " failCommands: [\"abortTransaction\"]," + " blockConnection: true," - + " blockTimeMS: " + 700 + + " blockTimeMS: " + 500 + " }" + "}"); try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder().retryWrites(false) - .timeout(withRttAdjustment(500, 2), TimeUnit.MILLISECONDS))) { + .timeout(250, TimeUnit.MILLISECONDS))) { MongoDatabase database = mongoClient.getDatabase(namespace.getDatabaseName()); MongoCollection collection = database .getCollection(namespace.getCollectionName()); @@ -496,8 +521,8 @@ public void test9EndSessionClientTimeout() { collection.insertOne(session, new Document("x", 1)); long start = System.nanoTime(); session.close(); - long elapsed = msElapsedSince(start) - postSessionCloseSleep(); - assertTrue(elapsed <= 700, "Took too long to time out, elapsedMS: " + elapsed); + long elapsed = msElapsedSince(start); + assertTrue(elapsed <= 300, "Took too long to time out, elapsedMS: " + elapsed); } } CommandFailedEvent abortTransactionEvent = assertDoesNotThrow(() -> @@ -527,14 +552,14 @@ public void test9EndSessionSessionTimeout() { .getCollection(namespace.getCollectionName()); try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder() - .defaultTimeout(withRttAdjustment(300, 2), TimeUnit.MILLISECONDS).build())) { + .defaultTimeout(300, TimeUnit.MILLISECONDS).build())) { session.startTransaction(); collection.insertOne(session, new Document("x", 1)); long start = System.nanoTime(); session.close(); - long elapsed = msElapsedSince(start) - postSessionCloseSleep(); - assertTrue(elapsed <= withRttAdjustment(400, 2), "Took too long to time out, elapsedMS: " + elapsed); + long elapsed = msElapsedSince(start); + assertTrue(elapsed <= 400, "Took too long to time out, elapsedMS: " + elapsed); } } CommandFailedEvent abortTransactionEvent = assertDoesNotThrow(() -> @@ -561,7 +586,7 @@ public void test9EndSessionCustomTesEachOperationHasItsOwnTimeoutWithCommit() { MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()); - long defaultTimeout = withRttAdjustment(300); + long defaultTimeout = 300; try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder() .defaultTimeout(defaultTimeout, TimeUnit.MILLISECONDS).build())) { session.startTransaction(); @@ -593,7 +618,7 @@ public void test9EndSessionCustomTesEachOperationHasItsOwnTimeoutWithAbort() { MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()); - long defaultTimeout = withRttAdjustment(300); + long defaultTimeout = 300; try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder() .defaultTimeout(defaultTimeout, TimeUnit.MILLISECONDS).build())) { session.startTransaction(); @@ -661,7 +686,7 @@ public void test10CustomTestWithTransactionUsesASingleTimeout() { MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()); - long defaultTimeout = withRttAdjustment(200); + long defaultTimeout = 200; try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder() .defaultTimeout(defaultTimeout, TimeUnit.MILLISECONDS).build())) { assertThrows(MongoOperationTimeoutException.class, @@ -697,7 +722,7 @@ public void test10CustomTestWithTransactionUsesASingleTimeoutWithLock() { MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()); - long defaultTimeout = withRttAdjustment(200); + long defaultTimeout = 200; try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder() .defaultTimeout(defaultTimeout, TimeUnit.MILLISECONDS).build())) { assertThrows(MongoOperationTimeoutException.class, @@ -730,11 +755,7 @@ protected void test11MultiBatchBulkWrites() throws InterruptedException { + " }" + "}"); - /* - We use 2 operation count to adjust the timeout by RTT, even though we have only one successful insert to account for, because - the payload is larger than measured RTT on "hello" command. - */ - long timeout = withRttAdjustment(4000, 2); + long timeout = 4000; try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder().timeout(timeout, TimeUnit.MILLISECONDS)); FailPoint ignored = FailPoint.enable(failPointDocument, getPrimary())) { MongoDatabase db = client.getDatabase(namespace.getDatabaseName()); @@ -767,7 +788,7 @@ public void shouldNotIncludeWtimeoutMsOfWriteConcernToInitialAndSubsequentCommit MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) .getCollection(namespace.getCollectionName()); - long defaultTimeout = withRttAdjustment(200); + long defaultTimeout = 200; try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder() .defaultTimeout(defaultTimeout, TimeUnit.MILLISECONDS) .build())) { @@ -818,12 +839,12 @@ public void shouldIgnoreWaitQueueTimeoutMSWhenTimeoutMsIsSet() { + " data: {" + " failCommands: [\"find\" ]," + " blockConnection: true," - + " blockTimeMS: " + 300 + + " blockTimeMS: " + 450 + " }" + "}"); executor.submit(() -> collection.find().first()); - sleep(100); + sleep(150); //when && then assertDoesNotThrow(() -> collection.find().first()); @@ -876,7 +897,7 @@ public void shouldUseWaitQueueTimeoutMSWhenTimeoutIsNotSet() { //given try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder() .applyToConnectionPoolSettings(builder -> builder - .maxWaitTime(100, TimeUnit.MILLISECONDS) + .maxWaitTime(20, TimeUnit.MILLISECONDS) .maxSize(1) ))) { MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) @@ -888,12 +909,12 @@ public void shouldUseWaitQueueTimeoutMSWhenTimeoutIsNotSet() { + " data: {" + " failCommands: [\"find\" ]," + " blockConnection: true," - + " blockTimeMS: " + 300 + + " blockTimeMS: " + 400 + " }" + "}"); executor.submit(() -> collection.find().first()); - sleep(100); + sleep(200); //when & then assertThrows(MongoTimeoutException.class, () -> collection.find().first()); @@ -1051,11 +1072,16 @@ public void shouldUseConnectTimeoutMsWhenEstablishingConnectionInBackground() { + " data: {" + " failCommands: [\"hello\", \"isMaster\"]," + " blockConnection: true," - + " blockTimeMS: " + 500 + + " blockTimeMS: " + 500 + "," + // The appName is unique to prevent this failpoint from affecting ClusterFixture's ServerMonitor. + // Without the appName, ClusterFixture's heartbeats would be blocked, polluting RTT measurements with 500ms values, + // which would cause flakiness in other prose tests that use ClusterFixture.getPrimaryRTT() for timeout adjustments. + + " appName: \"connectTimeoutBackgroundTest\"" + " }" + "}"); try (MongoClient ignored = createMongoClient(getMongoClientSettingsBuilder() + .applicationName("connectTimeoutBackgroundTest") .applyToConnectionPoolSettings(builder -> builder.minSize(1)) // Use a very short timeout to ensure that the connection establishment will fail on the first handshake command. .timeout(10, TimeUnit.MILLISECONDS))) { @@ -1128,10 +1154,13 @@ public void setUp() { public void tearDown() throws InterruptedException { ClusterFixture.disableFailPoint(FAIL_COMMAND_NAME); if (collectionHelper != null) { + // Due to testing abortTransaction via failpoint, there may be open transactions + // after the test finishes, thus drop() command hangs for 60 seconds until transaction + // is automatically rolled back. + collectionHelper.runAdminCommand("{killAllSessions: []}"); collectionHelper.drop(); filesCollectionHelper.drop(); chunksCollectionHelper.drop(); - commandListener.reset(); try { ServerHelper.checkPool(getPrimary()); } catch (InterruptedException e) { @@ -1155,7 +1184,7 @@ private MongoClient createMongoClient(final MongoClientSettings.Builder builder) return createMongoClient(builder.build()); } - private long msElapsedSince(final long t1) { + protected long msElapsedSince(final long t1) { return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t1); } diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java index 7fe4eccbf43..428d6210313 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java @@ -75,6 +75,12 @@ public static void applyCustomizations(final TestDef def) { "timeoutMS behaves correctly for tailable non-awaitData cursors", "timeoutMS is refreshed for getMore - success"); + def.retry("Unified CSOT tests do not account for RTT which varies in TLS vs non-TLS runs") + .whenFailureContains("timeout") + .test("client-side-operations-timeout", + "timeoutMS behaves correctly for tailable non-awaitData cursors", + "timeoutMS is refreshed for getMore - success"); + //TODO-invistigate /* As to the background connection pooling section: From 992563c1be2b24e3b5fd099bf894e462345c1c1a Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Sun, 4 Jan 2026 20:48:33 -0800 Subject: [PATCH 4/9] Remove ClusterFixture from encryption tests. --- ...eOperationsEncryptionTimeoutProseTest.java | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/driver-sync/src/test/functional/com/mongodb/client/csot/AbstractClientSideOperationsEncryptionTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/csot/AbstractClientSideOperationsEncryptionTimeoutProseTest.java index e5dd140cdda..04303833bf5 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/csot/AbstractClientSideOperationsEncryptionTimeoutProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/csot/AbstractClientSideOperationsEncryptionTimeoutProseTest.java @@ -93,14 +93,13 @@ public abstract class AbstractClientSideOperationsEncryptionTimeoutProseTest { @Test void shouldThrowOperationTimeoutExceptionWhenCreateDataKey() { assumeTrue(serverVersionAtLeast(4, 4)); - long rtt = ClusterFixture.getPrimaryRTT(); Map> kmsProviders = new HashMap<>(); Map localProviderMap = new HashMap<>(); localProviderMap.put("key", Base64.getDecoder().decode(MASTER_KEY)); kmsProviders.put("local", localProviderMap); - try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(rtt + 100))) { + try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(100))) { keyVaultCollectionHelper.runAdminCommand("{" + " configureFailPoint: \"" + FAIL_COMMAND_NAME + "\"," @@ -108,7 +107,7 @@ void shouldThrowOperationTimeoutExceptionWhenCreateDataKey() { + " data: {" + " failCommands: [\"insert\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 100) + + " blockTimeMS: " + 100 + " }" + "}"); @@ -126,9 +125,8 @@ void shouldThrowOperationTimeoutExceptionWhenCreateDataKey() { @Test void shouldThrowOperationTimeoutExceptionWhenEncryptData() { assumeTrue(serverVersionAtLeast(4, 4)); - long rtt = ClusterFixture.getPrimaryRTT(); - try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(rtt + 150))) { + try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(150))) { clientEncryption.createDataKey("local"); @@ -138,7 +136,7 @@ void shouldThrowOperationTimeoutExceptionWhenEncryptData() { + " data: {" + " failCommands: [\"find\"]," + " blockConnection: true," - + " blockTimeMS: " + (rtt + 150) + + " blockTimeMS: " + 150 + " }" + "}"); @@ -160,10 +158,9 @@ void shouldThrowOperationTimeoutExceptionWhenEncryptData() { @Test void shouldThrowOperationTimeoutExceptionWhenDecryptData() { assumeTrue(serverVersionAtLeast(4, 4)); - long rtt = ClusterFixture.getPrimaryRTT(); BsonBinary encrypted; - try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(rtt + 400))) { + try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(400))) { clientEncryption.createDataKey("local"); BsonBinary dataKey = clientEncryption.createDataKey("local"); EncryptOptions encryptOptions = new EncryptOptions("AEAD_AES_256_CBC_HMAC_SHA_512-Deterministic"); @@ -171,7 +168,7 @@ void shouldThrowOperationTimeoutExceptionWhenDecryptData() { encrypted = clientEncryption.encrypt(new BsonString("hello"), encryptOptions); } - try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(rtt + 400))) { + try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder(400))) { keyVaultCollectionHelper.runAdminCommand("{" + " configureFailPoint: \"" + FAIL_COMMAND_NAME + "\"," + " mode: { times: 1 }," @@ -197,8 +194,7 @@ void shouldThrowOperationTimeoutExceptionWhenDecryptData() { @Test void shouldDecreaseOperationTimeoutForSubsequentOperations() { assumeTrue(serverVersionAtLeast(4, 4)); - long rtt = ClusterFixture.getPrimaryRTT(); - long initialTimeoutMS = rtt + 2500; + long initialTimeoutMS = 2500; keyVaultCollectionHelper.runAdminCommand("{" + " configureFailPoint: \"" + FAIL_COMMAND_NAME + "\"," @@ -272,8 +268,7 @@ void shouldDecreaseOperationTimeoutForSubsequentOperations() { void shouldThrowTimeoutExceptionWhenCreateEncryptedCollection(final String commandToTimeout) { assumeTrue(serverVersionAtLeast(7, 0)); //given - long rtt = ClusterFixture.getPrimaryRTT(); - long initialTimeoutMS = rtt + 200; + long initialTimeoutMS = 200; try (ClientEncryption clientEncryption = createClientEncryption(getClientEncryptionSettingsBuilder() .timeout(initialTimeoutMS, MILLISECONDS))) { From c48053d252f94c38084df6aa954e713eed2415ae Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Sun, 4 Jan 2026 20:49:55 -0800 Subject: [PATCH 5/9] Revert changes to ClusterFixture. --- .../src/test/functional/com/mongodb/ClusterFixture.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/driver-core/src/test/functional/com/mongodb/ClusterFixture.java b/driver-core/src/test/functional/com/mongodb/ClusterFixture.java index bd4142407ba..c89d5f51e01 100644 --- a/driver-core/src/test/functional/com/mongodb/ClusterFixture.java +++ b/driver-core/src/test/functional/com/mongodb/ClusterFixture.java @@ -484,9 +484,7 @@ private static Cluster createCluster(final MongoCredential credential, final Str } private static Cluster createCluster(final ConnectionString connectionString, final StreamFactory streamFactory) { - MongoClientSettings mongoClientSettings = MongoClientSettings.builder().applyConnectionString(connectionString) - .applyToServerSettings(builder -> builder.heartbeatFrequency(1, SECONDS).minHeartbeatFrequency(1, MILLISECONDS)) - .build(); + MongoClientSettings mongoClientSettings = MongoClientSettings.builder().applyConnectionString(connectionString).build(); return new DefaultClusterFactory().createCluster(mongoClientSettings.getClusterSettings(), mongoClientSettings.getServerSettings(), mongoClientSettings.getConnectionPoolSettings(), From 8af26b467c3368333e76231bb0dfaad68d2dde33 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Wed, 7 Jan 2026 23:01:41 -0800 Subject: [PATCH 6/9] Fix static checks. --- .../test/unit/com/mongodb/internal/TimeoutContextTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java b/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java index 4b31ac77710..5f736f421c2 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java @@ -332,8 +332,8 @@ static Stream shouldChooseTimeoutMsWhenItIsLessThenConnectTimeoutMS() } @DisplayName("should choose timeoutMS when timeoutMS is less than connectTimeoutMS") - @ParameterizedTest(name = "should choose timeoutMS when timeoutMS is less than connectTimeoutMS. " + - "Parameters: connectTimeoutMS: {0}, timeoutMS: {1}, expected: {2}") + @ParameterizedTest(name = "should choose timeoutMS when timeoutMS is less than connectTimeoutMS. " + + "Parameters: connectTimeoutMS: {0}, timeoutMS: {1}, expected: {2}") @MethodSource void shouldChooseTimeoutMsWhenItIsLessThenConnectTimeoutMS(final Long connectTimeoutMS, final Long timeoutMS, From bc80aca2655b9d3ec1cc6526f7ec22c4d9ed722e Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Tue, 13 Jan 2026 22:59:52 -0800 Subject: [PATCH 7/9] Remove debug logging. --- ...AbstractClientSideOperationsTimeoutProseTest.java | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java index da7c4f49291..c61aa08c68e 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java @@ -448,7 +448,6 @@ public void test8ServerSelection(final String connectionString) { // Expect this to fail with a server selection timeout error after no more than 15ms [this is increased] long elapsed = msElapsedSince(start); assertTrue(throwable.getMessage().contains("while waiting for a server")); - System.err.println("Elapsed time: " + elapsed + "ms"); assertTrue(elapsed < 10 + timeoutBuffer, "Took too long to time out, elapsedMS: " + elapsed); } } @@ -488,7 +487,6 @@ public void test8ServerSelectionHandshake(final String ignoredTestName, final in .insertOne(new Document("x", 1)); }); long elapsed = msElapsedSince(start); - System.err.println("test8ServerSelectionHandshake elapsed " + elapsed + "ms"); assertTrue(elapsed <= 350, "Took too long to time out, elapsedMS: " + elapsed); } } @@ -1195,14 +1193,4 @@ protected long msElapsedSince(final long t1) { private String getHandshakeCommandName() { return ClusterFixture.getServerApi() == null ? LEGACY_HELLO : HELLO; } - - protected long withRttAdjustment(int timeout, int countOfOperations) { - long primaryRTT = ClusterFixture.getPrimaryRTT(); - long adjustedTimeout = timeout + (primaryRTT * countOfOperations); - return adjustedTimeout; - } - - protected long withRttAdjustment(int timeout) { - return withRttAdjustment(timeout, 1); - } } From 9663a626c49956a616ab05d0b053bb3bfbc5dab3 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Wed, 14 Jan 2026 14:59:15 -0800 Subject: [PATCH 8/9] Remove changes related to idempotent close. --- .../internal/ClientSessionPublisherImpl.java | 15 +++++---------- .../ClientSideOperationTimeoutProseTest.java | 5 ++--- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java index 26e896332a3..b4faa2a1081 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java @@ -37,8 +37,6 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; -import java.util.concurrent.atomic.AtomicBoolean; - import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL; import static com.mongodb.MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL; import static com.mongodb.assertions.Assertions.assertNotNull; @@ -54,11 +52,10 @@ final class ClientSessionPublisherImpl extends BaseClientSessionImpl implements private boolean messageSentInCurrentTransaction; private boolean commitInProgress; private TransactionOptions transactionOptions; - private final AtomicBoolean closeInvoked = new AtomicBoolean(false); ClientSessionPublisherImpl(final ServerSessionPool serverSessionPool, final MongoClientImpl mongoClient, - final ClientSessionOptions options, final OperationExecutor executor) { + final ClientSessionOptions options, final OperationExecutor executor) { super(serverSessionPool, mongoClient, options); this.executor = executor; this.mongoClient = mongoClient; @@ -224,12 +221,10 @@ private void clearTransactionContextOnError(final MongoException e) { @Override public void close() { - if (closeInvoked.compareAndSet(false, true)) { - if (transactionState == TransactionState.IN) { - Mono.from(abortTransaction()).doFinally(it -> super.close()).subscribe(); - } else { - super.close(); - } + if (transactionState == TransactionState.IN) { + Mono.from(abortTransaction()).doFinally(it -> super.close()).subscribe(); + } else { + super.close(); } } diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java index 68c1096e8d7..afde47cd0a7 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java @@ -28,7 +28,6 @@ import com.mongodb.event.CommandStartedEvent; import com.mongodb.reactivestreams.client.gridfs.GridFSBucket; import com.mongodb.reactivestreams.client.gridfs.GridFSBuckets; -import com.mongodb.reactivestreams.client.gridfs.GridFSUploadPublisher; import com.mongodb.reactivestreams.client.syncadapter.SyncGridFSBucket; import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient; import org.bson.BsonDocument; @@ -129,8 +128,8 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() { TestEventPublisher eventPublisher = new TestEventPublisher<>(); TestSubscriber testSubscriber = new TestSubscriber<>(); - GridFSUploadPublisher filename = gridFsBucket.uploadFromPublisher("filename", eventPublisher.getEventStream()); - filename.subscribe(testSubscriber); + gridFsBucket.uploadFromPublisher("filename", eventPublisher.getEventStream()) + .subscribe(testSubscriber); //when eventPublisher.sendEvent(ByteBuffer.wrap(new byte[]{0x12})); From b73c6679291bf3497e9fac9de3228c262aadacae Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Wed, 14 Jan 2026 15:00:09 -0800 Subject: [PATCH 9/9] Fix formatting in ClientSessionPublisherImpl constructor --- .../client/internal/ClientSessionPublisherImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java index b4faa2a1081..5cf0ea103bd 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java @@ -55,7 +55,7 @@ final class ClientSessionPublisherImpl extends BaseClientSessionImpl implements ClientSessionPublisherImpl(final ServerSessionPool serverSessionPool, final MongoClientImpl mongoClient, - final ClientSessionOptions options, final OperationExecutor executor) { + final ClientSessionOptions options, final OperationExecutor executor) { super(serverSessionPool, mongoClient, options); this.executor = executor; this.mongoClient = mongoClient;