Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ public static ServerAddress getSecondary() {
return serverDescriptions.get(0).getAddress();
}

public static void sleep(final int sleepMS) {
public static void sleep(final long sleepMS) {
try {
Thread.sleep(sleepMS);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,9 +331,10 @@ static Stream<Arguments> shouldChooseTimeoutMsWhenItIsLessThenConnectTimeoutMS()
);
}

@ParameterizedTest
@MethodSource
@DisplayName("should choose timeoutMS when timeoutMS is less than connectTimeoutMS")
@ParameterizedTest(name = "should choose timeoutMS when timeoutMS is less than connectTimeoutMS. "
+ "Parameters: connectTimeoutMS: {0}, timeoutMS: {1}, expected: {2}")
@MethodSource
void shouldChooseTimeoutMsWhenItIsLessThenConnectTimeoutMS(final Long connectTimeoutMS,
final Long timeoutMS,
final long expected) {
Expand All @@ -345,7 +346,7 @@ void shouldChooseTimeoutMsWhenItIsLessThenConnectTimeoutMS(final Long connectTim
0));

long calculatedTimeoutMS = timeoutContext.getConnectTimeoutMs();
assertTrue(expected - calculatedTimeoutMS <= 1);
assertTrue(expected - calculatedTimeoutMS <= 2);
}

private TimeoutContextTest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,29 @@ public static <T> MongoCollection<T> collectionWithTimeout(final MongoCollection

public static <T> Mono<MongoCollection<T>> collectionWithTimeoutMono(final MongoCollection<T> collection,
@Nullable final Timeout timeout) {
return collectionWithTimeoutMono(collection, timeout, DEFAULT_TIMEOUT_MESSAGE);
}

public static <T> Mono<MongoCollection<T>> collectionWithTimeoutMono(final MongoCollection<T> collection,
@Nullable final Timeout timeout,
final String message) {
try {
return Mono.just(collectionWithTimeout(collection, timeout));
return Mono.just(collectionWithTimeout(collection, timeout, message));
} catch (MongoOperationTimeoutException e) {
return Mono.error(e);
}
}

public static <T> Mono<MongoCollection<T>> collectionWithTimeoutDeferred(final MongoCollection<T> collection,
@Nullable final Timeout timeout) {
return Mono.defer(() -> collectionWithTimeoutMono(collection, timeout));
return collectionWithTimeoutDeferred(collection, timeout, DEFAULT_TIMEOUT_MESSAGE);
}

public static <T> Mono<MongoCollection<T>> collectionWithTimeoutDeferred(final MongoCollection<T> collection,
@Nullable final Timeout timeout,
final String message) {
return Mono.defer(() -> collectionWithTimeoutMono(collection, timeout, message));
}

public static MongoDatabase databaseWithTimeout(final MongoDatabase database,
@Nullable final Timeout timeout) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@
*/
public final class GridFSUploadPublisherImpl implements GridFSUploadPublisher<Void> {

private static final String TIMEOUT_ERROR_MESSAGE = "Saving chunks exceeded the timeout limit.";
private static final String TIMEOUT_ERROR_MESSAGE_CHUNKS_SAVING = "Saving chunks exceeded the timeout limit.";
private static final String TIMEOUT_ERROR_MESSAGE_UPLOAD_CANCELLATION = "Upload cancellation exceeded the timeout limit.";
private static final Document PROJECTION = new Document("_id", 1);
private static final Document FILES_INDEX = new Document("filename", 1).append("uploadDate", 1);
private static final Document CHUNKS_INDEX = new Document("files_id", 1).append("n", 1);
Expand Down Expand Up @@ -226,8 +227,8 @@ private Mono<Long> createSaveChunksMono(final AtomicBoolean terminated, @Nullabl
.append("data", data);

Publisher<InsertOneResult> insertOnePublisher = clientSession == null
? collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE).insertOne(chunkDocument)
: collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE)
? collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE_CHUNKS_SAVING).insertOne(chunkDocument)
: collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE_CHUNKS_SAVING)
.insertOne(clientSession, chunkDocument);

return Mono.from(insertOnePublisher).thenReturn(data.length());
Expand Down Expand Up @@ -270,7 +271,8 @@ private Mono<InsertOneResult> createSaveFileDataMono(final AtomicBoolean termina
}

private Mono<DeleteResult> createCancellationMono(final AtomicBoolean terminated, @Nullable final Timeout timeout) {
Mono<MongoCollection<Document>> chunksCollectionMono = collectionWithTimeoutDeferred(chunksCollection, timeout);
Mono<MongoCollection<Document>> chunksCollectionMono = collectionWithTimeoutDeferred(chunksCollection, timeout,
TIMEOUT_ERROR_MESSAGE_UPLOAD_CANCELLATION);
Comment on lines +274 to +275
Copy link
Member Author

@vbabanin vbabanin Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: this isn’t required for the main change in this PR, but it came up while debugging timeouts.

The current generic error message wasn’t actionable: in the reactive stack trace it points only to a lambda of deferred Mono and doesn’t help identify where the deferred Mono was created. This change adds more context to the timeout failure message.

if (terminated.compareAndSet(false, true)) {
if (clientSession != null) {
return chunksCollectionMono.flatMap(collection -> Mono.from(collection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@

package com.mongodb.reactivestreams.client;

import com.mongodb.ClusterFixture;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCommandException;
import com.mongodb.MongoNamespace;
import com.mongodb.MongoOperationTimeoutException;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.client.AbstractClientSideOperationsTimeoutProseTest;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.event.CommandFailedEvent;
import com.mongodb.event.CommandStartedEvent;
Expand All @@ -43,6 +41,7 @@
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.nio.ByteBuffer;
Expand All @@ -58,12 +57,16 @@

import static com.mongodb.ClusterFixture.TIMEOUT_DURATION;
import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet;
import static com.mongodb.ClusterFixture.isStandalone;
import static com.mongodb.ClusterFixture.serverVersionAtLeast;
import static com.mongodb.ClusterFixture.sleep;
import static com.mongodb.assertions.Assertions.assertTrue;
import static java.util.Collections.singletonList;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assumptions.assumeFalse;
import static org.junit.jupiter.api.Assumptions.assumeTrue;


Expand Down Expand Up @@ -104,7 +107,6 @@ protected boolean isAsync() {
@Override
public void testGridFSUploadViaOpenUploadStreamTimeout() {
assumeTrue(serverVersionAtLeast(4, 4));
long rtt = ClusterFixture.getPrimaryRTT();

//given
collectionHelper.runAdminCommand("{"
Expand All @@ -113,12 +115,12 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() {
+ " data: {"
+ " failCommands: [\"insert\"],"
+ " blockConnection: true,"
+ " blockTimeMS: " + (rtt + 405)
+ " blockTimeMS: " + 600
+ " }"
+ "}");

try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
.timeout(rtt + 400, TimeUnit.MILLISECONDS))) {
.timeout(600, TimeUnit.MILLISECONDS))) {
MongoDatabase database = client.getDatabase(gridFsFileNamespace.getDatabaseName());
GridFSBucket gridFsBucket = createReaciveGridFsBucket(database, GRID_FS_BUCKET_NAME);

Expand Down Expand Up @@ -158,7 +160,6 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() {
@Override
public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, InterruptedException, TimeoutException {
assumeTrue(serverVersionAtLeast(4, 4));
long rtt = ClusterFixture.getPrimaryRTT();

//given
CompletableFuture<Throwable> droppedErrorFuture = new CompletableFuture<>();
Expand All @@ -170,12 +171,12 @@ public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, I
+ " data: {"
+ " failCommands: [\"delete\"],"
+ " blockConnection: true,"
+ " blockTimeMS: " + (rtt + 405)
+ " blockTimeMS: " + 405
+ " }"
+ "}");

try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
.timeout(rtt + 400, TimeUnit.MILLISECONDS))) {
.timeout(400, TimeUnit.MILLISECONDS))) {
MongoDatabase database = client.getDatabase(gridFsFileNamespace.getDatabaseName());
GridFSBucket gridFsBucket = createReaciveGridFsBucket(database, GRID_FS_BUCKET_NAME);

Expand All @@ -198,12 +199,25 @@ public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, I
//then
Throwable droppedError = droppedErrorFuture.get(TIMEOUT_DURATION.toMillis(), TimeUnit.MILLISECONDS);
Throwable commandError = droppedError.getCause();
assertInstanceOf(MongoOperationTimeoutException.class, commandError);

CommandFailedEvent deleteFailedEvent = commandListener.getCommandFailedEvent("delete");
assertNotNull(deleteFailedEvent);

assertEquals(commandError, commandListener.getCommandFailedEvent("delete").getThrowable());
CommandStartedEvent deleteStartedEvent = commandListener.getCommandStartedEvent("delete");
assertTrue(deleteStartedEvent.getCommand().containsKey("maxTimeMS"), "Expected delete command to have maxTimeMS");
long deleteMaxTimeMS = deleteStartedEvent
.getCommand()
.get("maxTimeMS")
.asNumber()
.longValue();

assertTrue(deleteMaxTimeMS <= 420
// some leeway for timing variations, when compression is used it is often lees then 300.
// Without it, it is more than 300.
&& deleteMaxTimeMS >= 150,
"Expected maxTimeMS for delete command to be between 150s and 420ms, " + "but was: " + deleteMaxTimeMS + "ms");
assertEquals(commandError, deleteFailedEvent.getThrowable());

// When subscription is cancelled, we should not receive any more events.
testSubscriber.assertNoTerminalEvent();
}
Expand All @@ -219,9 +233,8 @@ public void testTimeoutMSAppliesToFullResumeAttemptInNextCall() {
assumeTrue(isDiscoverableReplicaSet());

//given
long rtt = ClusterFixture.getPrimaryRTT();
try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
.timeout(rtt + 500, TimeUnit.MILLISECONDS))) {
.timeout(500, TimeUnit.MILLISECONDS))) {

MongoNamespace namespace = generateNamespace();
MongoCollection<Document> collection = client.getDatabase(namespace.getDatabaseName())
Expand Down Expand Up @@ -273,9 +286,8 @@ public void testTimeoutMSAppliedToInitialAggregate() {
assumeTrue(isDiscoverableReplicaSet());

//given
long rtt = ClusterFixture.getPrimaryRTT();
try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
.timeout(rtt + 200, TimeUnit.MILLISECONDS))) {
.timeout(200, TimeUnit.MILLISECONDS))) {

MongoNamespace namespace = generateNamespace();
MongoCollection<Document> collection = client.getDatabase(namespace.getDatabaseName())
Expand All @@ -290,7 +302,7 @@ public void testTimeoutMSAppliedToInitialAggregate() {
+ " data: {"
+ " failCommands: [\"aggregate\" ],"
+ " blockConnection: true,"
+ " blockTimeMS: " + (rtt + 201)
+ " blockTimeMS: " + 201
+ " }"
+ "}");

Expand Down Expand Up @@ -321,13 +333,10 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() {

//given
BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0);
collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions());
sleep(2000);


long rtt = ClusterFixture.getPrimaryRTT();
try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
.timeout(rtt + 300, TimeUnit.MILLISECONDS))) {
.timeout(500, TimeUnit.MILLISECONDS))) {
Copy link
Member Author

@vbabanin vbabanin Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


MongoCollection<Document> collection = client.getDatabase(namespace.getDatabaseName())
.getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary());
Expand All @@ -338,7 +347,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() {
+ " data: {"
+ " failCommands: [\"getMore\", \"aggregate\"],"
+ " blockConnection: true,"
+ " blockTimeMS: " + (rtt + 200)
+ " blockTimeMS: " + 200
+ " }"
+ "}");

Expand Down Expand Up @@ -389,12 +398,10 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() {

//given
BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0);
collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions());
sleep(2000);

long rtt = ClusterFixture.getPrimaryRTT();
try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
.timeout(rtt + 300, TimeUnit.MILLISECONDS))) {
.timeout(500, TimeUnit.MILLISECONDS))) {
Copy link
Member Author

@vbabanin vbabanin Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still wait >500ms (currently thenAwait(Duration.ofMillis(600))) before issuing the next getMore, so the test continues to validate that the timeout is refreshed for the follow-up getMore operation.

The longer await is purely to reduce flakiness under slower setups (e.g., occasional TLS handshake / connection establishment overhead and thus consuming more of the allocated timeout time).


MongoCollection<Document> collection = client.getDatabase(namespace.getDatabaseName())
.getCollection(namespace.getCollectionName())
Expand All @@ -406,7 +413,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() {
+ " data: {"
+ " failCommands: [\"aggregate\", \"getMore\"],"
+ " blockConnection: true,"
+ " blockTimeMS: " + (rtt + 200)
+ " blockTimeMS: " + 200
+ " }"
+ "}");

Expand Down Expand Up @@ -449,9 +456,8 @@ public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInt
assumeTrue(isDiscoverableReplicaSet());

//given
long rtt = ClusterFixture.getPrimaryRTT();
try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
.timeout(rtt + 2500, TimeUnit.MILLISECONDS))) {
.timeout(2500, TimeUnit.MILLISECONDS))) {

MongoCollection<Document> collection = client.getDatabase(namespace.getDatabaseName())
.getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary());
Expand All @@ -468,7 +474,78 @@ public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInt
List<CommandStartedEvent> commandStartedEvents = commandListener.getCommandStartedEvents();
assertCommandStartedEventsInOder(Arrays.asList("aggregate", "getMore", "getMore", "getMore", "killCursors"),
commandStartedEvents);
assertOnlyOneCommandTimeoutFailure("getMore");
Copy link
Member Author

@vbabanin vbabanin Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no guarantee that the last getMore command will fail with a timeout. If maxAwaitTimeMS is higher than the remaining overall timeout, the driver uses the remaining timeout as maxTimeMS.

This means a response can arrive right before expiration (especially when RTT is near 0), and the driver can throw a timeout on the next operation before sending the next getMore command.

Example from logs:

 * [2025/12/30 19:25:05.019] STARTED: aggregate - {"aggregate": "ClientSideOperationTimeoutProseTest_42", "readConcern": {"level": "majority"}, "pipeline": [{"$changeStream": {}}], "cursor": {"batchSize": 2}, "maxTimeMS": 2495, "$db": "JavaDriverTest", "lsid": {"id": {"$binary": {"base64": "nSsnGd4rT3i2tTKN/Xo98A==", "subType": "04"}}}}
 * [2025/12/30 19:25:05.019] SUCCEEDED: aggregate (elapsed: 1ms)
 * [2025/12/30 19:25:05.019] STARTED: getMore - {"getMore": 5528154821143727891, "collection": "ClientSideOperationTimeoutProseTest_42", "batchSize": 2, "maxTimeMS": 1000, "$db": "JavaDriverTest", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1767151095, "i": 4}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "nSsnGd4rT3i2tTKN/Xo98A==", "subType": "04"}}}}
 * [2025/12/30 19:25:05.019] SUCCEEDED: getMore (elapsed: 999ms)
 * [2025/12/30 19:25:05.019] STARTED: getMore - {"getMore": 5528154821143727891, "collection": "ClientSideOperationTimeoutProseTest_42", "batchSize": 2, "maxTimeMS": 1000, "$db": "JavaDriverTest", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1767151095, "i": 4}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "nSsnGd4rT3i2tTKN/Xo98A==", "subType": "04"}}}}
 * [2025/12/30 19:25:05.019] SUCCEEDED: getMore (elapsed: 999ms)
 * [2025/12/30 19:25:05.019] STARTED: getMore - {"getMore": 5528154821143727891, "collection": "ClientSideOperationTimeoutProseTest_42", "batchSize": 2, "maxTimeMS": 499, "$db": "JavaDriverTest", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1767151097, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "nSsnGd4rT3i2tTKN/Xo98A==", "subType": "04"}}}}
 * [2025/12/30 19:25:05.019] SUCCEEDED: getMore (elapsed: 498ms)
 * [2025/12/30 19:25:05.019] STARTED: killCursors - {"killCursors": "ClientSideOperationTimeoutProseTest_42", "cursors": [5528154821143727891], "maxTimeMS": 1000, "$db": "JavaDriverTest", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1767151097, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "nSsnGd4rT3i2tTKN/Xo98A==", "subType": "04"}}}}
 * [2025/12/30 19:25:05.019] SUCCEEDED: killCursors (elapsed: 0ms)
 * [2025/12/30 19:25:05.019] STARTED: endSessions - {"endSessions": [{"id": {"$binary": {"base64": "nSsnGd4rT3i2tTKN/Xo98A==", "subType": "04"}}}], "$db": "admin", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1767151097, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "$readPreference": {"mode": "primaryPreferred"}}
 * [2025/12/30 19:25:05.019] SUCCEEDED: endSessions (elapsed: 0ms)

This was one of the cases for flakiness in this test.


}
}

@DisplayName("9. End Session. The timeout specified via the MongoClient timeoutMS option")
@Test
@Override
public void test9EndSessionClientTimeout() {
assumeTrue(serverVersionAtLeast(4, 4));
assumeFalse(isStandalone());

collectionHelper.runAdminCommand("{"
+ " configureFailPoint: \"failCommand\","
+ " mode: { times: 1 },"
+ " data: {"
+ " failCommands: [\"abortTransaction\"],"
+ " blockConnection: true,"
+ " blockTimeMS: " + 400
+ " }"
+ "}");

try (MongoClient mongoClient = createReactiveClient(getMongoClientSettingsBuilder().retryWrites(false)
.timeout(300, TimeUnit.MILLISECONDS))) {
MongoCollection<Document> collection = mongoClient.getDatabase(namespace.getDatabaseName())
.getCollection(namespace.getCollectionName());

try (ClientSession session = Mono.from(mongoClient.startSession()).block()) {
session.startTransaction();
Mono.from(collection.insertOne(session, new Document("x", 1))).block();
}

sleep(postSessionCloseSleep());
CommandFailedEvent abortTransactionEvent = assertDoesNotThrow(() -> commandListener.getCommandFailedEvent("abortTransaction"));
long elapsedTime = abortTransactionEvent.getElapsedTime(TimeUnit.MILLISECONDS);
assertInstanceOf(MongoOperationTimeoutException.class, abortTransactionEvent.getThrowable());
assertTrue(elapsedTime <= 400, "Took too long to time out, elapsedMS: " + elapsedTime);
Comment on lines +508 to +512
Copy link
Member Author

@vbabanin vbabanin Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Context

  • This “End Session” tests used to live in the shared abstract test, so they ran for both sync and async.
  • postSessionCloseSleep() is used to wait until async close operation is finished, because we can't use Mono.block as publisher is not exposed in reactive close().

Why this moved/split

  • Async computed “elapsed time” as 400ms - postSessionCloseSleep(). That’s just subtracting a constant sleep from a constant budget. It does not measure how long session close actually took, because the async close path doesn’t give us a point where we can say “close finished” (no callback/future is checked here).
  • The sync test also effectively depended on that fixed sleep for async test. Unless 400ms and postSessionCloseSleep() are tuned to match real timing, the assertion becomes sensitive.

The change

  • Sync: measure time around the actual close() (sample before/after).
  • Async: we can’t time close() directly, so we anchor the timeout check to something we can observe: the CommandFailedEvent timestamp (i.e., when the failure is observed on the async event path).

}
}

@Test
@DisplayName("9. End Session. The timeout specified via the ClientSession defaultTimeoutMS option")
@Override
public void test9EndSessionSessionTimeout() {
assumeTrue(serverVersionAtLeast(4, 4));
assumeFalse(isStandalone());

collectionHelper.runAdminCommand("{"
+ " configureFailPoint: \"failCommand\","
+ " mode: { times: 1 },"
+ " data: {"
+ " failCommands: [\"abortTransaction\"],"
+ " blockConnection: true,"
+ " blockTimeMS: " + 400
+ " }"
+ "}");

try (MongoClient mongoClient = createReactiveClient(getMongoClientSettingsBuilder())) {
MongoCollection<Document> collection = mongoClient.getDatabase(namespace.getDatabaseName())
.getCollection(namespace.getCollectionName());

try (ClientSession session = Mono.from(mongoClient.startSession(com.mongodb.ClientSessionOptions.builder()
.defaultTimeout(300, TimeUnit.MILLISECONDS).build())).block()) {

session.startTransaction();
Mono.from(collection.insertOne(session, new Document("x", 1))).block();
}

sleep(postSessionCloseSleep());
CommandFailedEvent abortTransactionEvent = assertDoesNotThrow(() -> commandListener.getCommandFailedEvent("abortTransaction"));
long elapsedTime = abortTransactionEvent.getElapsedTime(TimeUnit.MILLISECONDS);
assertInstanceOf(MongoOperationTimeoutException.class, abortTransactionEvent.getThrowable());
assertTrue(elapsedTime <= 400, "Took too long to time out, elapsedMS: " + elapsedTime);
}
}

Expand Down Expand Up @@ -512,6 +589,6 @@ public void tearDown() throws InterruptedException {

@Override
protected int postSessionCloseSleep() {
return 256;
return 1000;
}
}
Loading