-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Adjust timeout handling in client-side operations to account for RTT variations #1793
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
9c44ffc
26de0bb
750e44c
acb5ff9
48173e0
a48828d
992563c
c48053d
8af26b4
e0b045b
bc80aca
9663a62
b73c667
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,15 +16,13 @@ | |
|
|
||
| package com.mongodb.reactivestreams.client; | ||
|
|
||
| import com.mongodb.ClusterFixture; | ||
| import com.mongodb.MongoClientSettings; | ||
| import com.mongodb.MongoCommandException; | ||
| import com.mongodb.MongoNamespace; | ||
| import com.mongodb.MongoOperationTimeoutException; | ||
| import com.mongodb.ReadPreference; | ||
| import com.mongodb.WriteConcern; | ||
| import com.mongodb.client.AbstractClientSideOperationsTimeoutProseTest; | ||
| import com.mongodb.client.model.CreateCollectionOptions; | ||
| import com.mongodb.client.model.changestream.FullDocument; | ||
| import com.mongodb.event.CommandFailedEvent; | ||
| import com.mongodb.event.CommandStartedEvent; | ||
|
|
@@ -43,6 +41,7 @@ | |
| import org.junit.jupiter.api.Test; | ||
| import reactor.core.publisher.Flux; | ||
| import reactor.core.publisher.Hooks; | ||
| import reactor.core.publisher.Mono; | ||
| import reactor.test.StepVerifier; | ||
|
|
||
| import java.nio.ByteBuffer; | ||
|
|
@@ -58,12 +57,16 @@ | |
|
|
||
| import static com.mongodb.ClusterFixture.TIMEOUT_DURATION; | ||
| import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet; | ||
| import static com.mongodb.ClusterFixture.isStandalone; | ||
| import static com.mongodb.ClusterFixture.serverVersionAtLeast; | ||
| import static com.mongodb.ClusterFixture.sleep; | ||
| import static com.mongodb.assertions.Assertions.assertTrue; | ||
| import static java.util.Collections.singletonList; | ||
| import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; | ||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
| import static org.junit.jupiter.api.Assertions.assertInstanceOf; | ||
| import static org.junit.jupiter.api.Assertions.assertNotNull; | ||
| import static org.junit.jupiter.api.Assumptions.assumeFalse; | ||
| import static org.junit.jupiter.api.Assumptions.assumeTrue; | ||
|
|
||
|
|
||
|
|
@@ -104,7 +107,6 @@ protected boolean isAsync() { | |
| @Override | ||
| public void testGridFSUploadViaOpenUploadStreamTimeout() { | ||
| assumeTrue(serverVersionAtLeast(4, 4)); | ||
| long rtt = ClusterFixture.getPrimaryRTT(); | ||
|
|
||
| //given | ||
| collectionHelper.runAdminCommand("{" | ||
|
|
@@ -113,12 +115,12 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() { | |
| + " data: {" | ||
| + " failCommands: [\"insert\"]," | ||
| + " blockConnection: true," | ||
| + " blockTimeMS: " + (rtt + 405) | ||
| + " blockTimeMS: " + 600 | ||
| + " }" | ||
| + "}"); | ||
|
|
||
| try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() | ||
| .timeout(rtt + 400, TimeUnit.MILLISECONDS))) { | ||
| .timeout(600, TimeUnit.MILLISECONDS))) { | ||
| MongoDatabase database = client.getDatabase(gridFsFileNamespace.getDatabaseName()); | ||
| GridFSBucket gridFsBucket = createReaciveGridFsBucket(database, GRID_FS_BUCKET_NAME); | ||
|
|
||
|
|
@@ -158,7 +160,6 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() { | |
| @Override | ||
| public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, InterruptedException, TimeoutException { | ||
| assumeTrue(serverVersionAtLeast(4, 4)); | ||
| long rtt = ClusterFixture.getPrimaryRTT(); | ||
|
|
||
| //given | ||
| CompletableFuture<Throwable> droppedErrorFuture = new CompletableFuture<>(); | ||
|
|
@@ -170,12 +171,12 @@ public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, I | |
| + " data: {" | ||
| + " failCommands: [\"delete\"]," | ||
| + " blockConnection: true," | ||
| + " blockTimeMS: " + (rtt + 405) | ||
| + " blockTimeMS: " + 405 | ||
| + " }" | ||
| + "}"); | ||
|
|
||
| try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() | ||
| .timeout(rtt + 400, TimeUnit.MILLISECONDS))) { | ||
| .timeout(400, TimeUnit.MILLISECONDS))) { | ||
| MongoDatabase database = client.getDatabase(gridFsFileNamespace.getDatabaseName()); | ||
| GridFSBucket gridFsBucket = createReaciveGridFsBucket(database, GRID_FS_BUCKET_NAME); | ||
|
|
||
|
|
@@ -198,12 +199,25 @@ public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, I | |
| //then | ||
| Throwable droppedError = droppedErrorFuture.get(TIMEOUT_DURATION.toMillis(), TimeUnit.MILLISECONDS); | ||
| Throwable commandError = droppedError.getCause(); | ||
| assertInstanceOf(MongoOperationTimeoutException.class, commandError); | ||
|
|
||
| CommandFailedEvent deleteFailedEvent = commandListener.getCommandFailedEvent("delete"); | ||
| assertNotNull(deleteFailedEvent); | ||
|
|
||
| assertEquals(commandError, commandListener.getCommandFailedEvent("delete").getThrowable()); | ||
| CommandStartedEvent deleteStartedEvent = commandListener.getCommandStartedEvent("delete"); | ||
| assertTrue(deleteStartedEvent.getCommand().containsKey("maxTimeMS"), "Expected delete command to have maxTimeMS"); | ||
| long deleteMaxTimeMS = deleteStartedEvent | ||
| .getCommand() | ||
| .get("maxTimeMS") | ||
| .asNumber() | ||
| .longValue(); | ||
|
|
||
| assertTrue(deleteMaxTimeMS <= 420 | ||
| // some leeway for timing variations, when compression is used it is often lees then 300. | ||
| // Without it, it is more than 300. | ||
| && deleteMaxTimeMS >= 150, | ||
| "Expected maxTimeMS for delete command to be between 150s and 420ms, " + "but was: " + deleteMaxTimeMS + "ms"); | ||
| assertEquals(commandError, deleteFailedEvent.getThrowable()); | ||
|
|
||
| // When subscription is cancelled, we should not receive any more events. | ||
| testSubscriber.assertNoTerminalEvent(); | ||
| } | ||
|
|
@@ -219,9 +233,8 @@ public void testTimeoutMSAppliesToFullResumeAttemptInNextCall() { | |
| assumeTrue(isDiscoverableReplicaSet()); | ||
|
|
||
| //given | ||
| long rtt = ClusterFixture.getPrimaryRTT(); | ||
| try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() | ||
| .timeout(rtt + 500, TimeUnit.MILLISECONDS))) { | ||
| .timeout(500, TimeUnit.MILLISECONDS))) { | ||
|
|
||
| MongoNamespace namespace = generateNamespace(); | ||
| MongoCollection<Document> collection = client.getDatabase(namespace.getDatabaseName()) | ||
|
|
@@ -273,9 +286,8 @@ public void testTimeoutMSAppliedToInitialAggregate() { | |
| assumeTrue(isDiscoverableReplicaSet()); | ||
|
|
||
| //given | ||
| long rtt = ClusterFixture.getPrimaryRTT(); | ||
| try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() | ||
| .timeout(rtt + 200, TimeUnit.MILLISECONDS))) { | ||
| .timeout(200, TimeUnit.MILLISECONDS))) { | ||
|
|
||
| MongoNamespace namespace = generateNamespace(); | ||
| MongoCollection<Document> collection = client.getDatabase(namespace.getDatabaseName()) | ||
|
|
@@ -290,7 +302,7 @@ public void testTimeoutMSAppliedToInitialAggregate() { | |
| + " data: {" | ||
| + " failCommands: [\"aggregate\" ]," | ||
| + " blockConnection: true," | ||
| + " blockTimeMS: " + (rtt + 201) | ||
| + " blockTimeMS: " + 201 | ||
| + " }" | ||
| + "}"); | ||
|
|
||
|
|
@@ -321,13 +333,10 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() { | |
|
|
||
| //given | ||
| BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0); | ||
| collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions()); | ||
| sleep(2000); | ||
|
|
||
|
|
||
| long rtt = ClusterFixture.getPrimaryRTT(); | ||
| try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() | ||
| .timeout(rtt + 300, TimeUnit.MILLISECONDS))) { | ||
| .timeout(500, TimeUnit.MILLISECONDS))) { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This follows the same approach used in: https://github.com/mongodb/mongo-java-driver/pull/1793/files#r2691975997 |
||
|
|
||
| MongoCollection<Document> collection = client.getDatabase(namespace.getDatabaseName()) | ||
| .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); | ||
|
|
@@ -338,7 +347,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() { | |
| + " data: {" | ||
| + " failCommands: [\"getMore\", \"aggregate\"]," | ||
| + " blockConnection: true," | ||
| + " blockTimeMS: " + (rtt + 200) | ||
| + " blockTimeMS: " + 200 | ||
| + " }" | ||
| + "}"); | ||
|
|
||
|
|
@@ -389,12 +398,10 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() { | |
|
|
||
| //given | ||
| BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0); | ||
| collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions()); | ||
| sleep(2000); | ||
|
|
||
| long rtt = ClusterFixture.getPrimaryRTT(); | ||
| try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() | ||
| .timeout(rtt + 300, TimeUnit.MILLISECONDS))) { | ||
| .timeout(500, TimeUnit.MILLISECONDS))) { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We still wait The longer await is purely to reduce flakiness under slower setups (e.g., occasional TLS handshake / connection establishment overhead and thus consuming more of the allocated timeout time). |
||
|
|
||
| MongoCollection<Document> collection = client.getDatabase(namespace.getDatabaseName()) | ||
| .getCollection(namespace.getCollectionName()) | ||
|
|
@@ -406,7 +413,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() { | |
| + " data: {" | ||
| + " failCommands: [\"aggregate\", \"getMore\"]," | ||
| + " blockConnection: true," | ||
| + " blockTimeMS: " + (rtt + 200) | ||
| + " blockTimeMS: " + 200 | ||
| + " }" | ||
| + "}"); | ||
|
|
||
|
|
@@ -449,9 +456,8 @@ public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInt | |
| assumeTrue(isDiscoverableReplicaSet()); | ||
|
|
||
| //given | ||
| long rtt = ClusterFixture.getPrimaryRTT(); | ||
| try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder() | ||
| .timeout(rtt + 2500, TimeUnit.MILLISECONDS))) { | ||
| .timeout(2500, TimeUnit.MILLISECONDS))) { | ||
|
|
||
| MongoCollection<Document> collection = client.getDatabase(namespace.getDatabaseName()) | ||
| .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); | ||
|
|
@@ -468,7 +474,78 @@ public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInt | |
| List<CommandStartedEvent> commandStartedEvents = commandListener.getCommandStartedEvents(); | ||
| assertCommandStartedEventsInOder(Arrays.asList("aggregate", "getMore", "getMore", "getMore", "killCursors"), | ||
| commandStartedEvents); | ||
| assertOnlyOneCommandTimeoutFailure("getMore"); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no guarantee that the last This means a response can arrive right before expiration (especially when RTT is near 0), and the driver can throw a timeout on the next operation before sending the next Example from logs: This was one of the cases for flakiness in this test. |
||
|
|
||
| } | ||
| } | ||
|
|
||
| @DisplayName("9. End Session. The timeout specified via the MongoClient timeoutMS option") | ||
| @Test | ||
| @Override | ||
| public void test9EndSessionClientTimeout() { | ||
| assumeTrue(serverVersionAtLeast(4, 4)); | ||
| assumeFalse(isStandalone()); | ||
|
|
||
| collectionHelper.runAdminCommand("{" | ||
| + " configureFailPoint: \"failCommand\"," | ||
| + " mode: { times: 1 }," | ||
| + " data: {" | ||
| + " failCommands: [\"abortTransaction\"]," | ||
| + " blockConnection: true," | ||
| + " blockTimeMS: " + 400 | ||
| + " }" | ||
| + "}"); | ||
|
|
||
| try (MongoClient mongoClient = createReactiveClient(getMongoClientSettingsBuilder().retryWrites(false) | ||
| .timeout(300, TimeUnit.MILLISECONDS))) { | ||
| MongoCollection<Document> collection = mongoClient.getDatabase(namespace.getDatabaseName()) | ||
| .getCollection(namespace.getCollectionName()); | ||
|
|
||
| try (ClientSession session = Mono.from(mongoClient.startSession()).block()) { | ||
| session.startTransaction(); | ||
| Mono.from(collection.insertOne(session, new Document("x", 1))).block(); | ||
| } | ||
|
|
||
| sleep(postSessionCloseSleep()); | ||
| CommandFailedEvent abortTransactionEvent = assertDoesNotThrow(() -> commandListener.getCommandFailedEvent("abortTransaction")); | ||
| long elapsedTime = abortTransactionEvent.getElapsedTime(TimeUnit.MILLISECONDS); | ||
| assertInstanceOf(MongoOperationTimeoutException.class, abortTransactionEvent.getThrowable()); | ||
| assertTrue(elapsedTime <= 400, "Took too long to time out, elapsedMS: " + elapsedTime); | ||
|
Comment on lines
+508
to
+512
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Context
Why this moved/split
The change
|
||
| } | ||
| } | ||
|
|
||
| @Test | ||
| @DisplayName("9. End Session. The timeout specified via the ClientSession defaultTimeoutMS option") | ||
| @Override | ||
| public void test9EndSessionSessionTimeout() { | ||
| assumeTrue(serverVersionAtLeast(4, 4)); | ||
| assumeFalse(isStandalone()); | ||
|
|
||
| collectionHelper.runAdminCommand("{" | ||
| + " configureFailPoint: \"failCommand\"," | ||
| + " mode: { times: 1 }," | ||
| + " data: {" | ||
| + " failCommands: [\"abortTransaction\"]," | ||
| + " blockConnection: true," | ||
| + " blockTimeMS: " + 400 | ||
| + " }" | ||
| + "}"); | ||
|
|
||
| try (MongoClient mongoClient = createReactiveClient(getMongoClientSettingsBuilder())) { | ||
| MongoCollection<Document> collection = mongoClient.getDatabase(namespace.getDatabaseName()) | ||
| .getCollection(namespace.getCollectionName()); | ||
|
|
||
| try (ClientSession session = Mono.from(mongoClient.startSession(com.mongodb.ClientSessionOptions.builder() | ||
| .defaultTimeout(300, TimeUnit.MILLISECONDS).build())).block()) { | ||
|
|
||
| session.startTransaction(); | ||
| Mono.from(collection.insertOne(session, new Document("x", 1))).block(); | ||
| } | ||
|
|
||
| sleep(postSessionCloseSleep()); | ||
| CommandFailedEvent abortTransactionEvent = assertDoesNotThrow(() -> commandListener.getCommandFailedEvent("abortTransaction")); | ||
| long elapsedTime = abortTransactionEvent.getElapsedTime(TimeUnit.MILLISECONDS); | ||
| assertInstanceOf(MongoOperationTimeoutException.class, abortTransactionEvent.getThrowable()); | ||
| assertTrue(elapsedTime <= 400, "Took too long to time out, elapsedMS: " + elapsedTime); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -512,6 +589,6 @@ public void tearDown() throws InterruptedException { | |
|
|
||
| @Override | ||
| protected int postSessionCloseSleep() { | ||
| return 256; | ||
| return 1000; | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: this isn’t required for the main change in this PR, but it came up while debugging timeouts.
The current generic error message wasn’t actionable: in the reactive stack trace it points only to a lambda of
deferred Monoand doesn’t help identify where thedeferred Monowas created. This change adds more context to the timeout failure message.