Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,25 @@
import com.azure.cosmos.TestObject;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.models.CosmosBatch;
import com.azure.cosmos.models.CosmosBulkOperationResponse;
import com.azure.cosmos.models.CosmosBulkOperations;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosItemIdentity;
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.CosmosPatchOperations;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.rx.TestSuiteBase;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

public abstract class FaultInjectionTestBase extends TestSuiteBase {
public FaultInjectionTestBase(CosmosClientBuilder cosmosClientBuilder) {
super(cosmosClientBuilder);
Expand All @@ -32,6 +39,24 @@ protected CosmosDiagnostics performDocumentOperation(
OperationType operationType,
TestObject createdItem,
boolean isReadMany) {

return performDocumentOperation(
cosmosAsyncContainer,
operationType,
createdItem,
isReadMany,
true,
false);
}

protected CosmosDiagnostics performDocumentOperation(
CosmosAsyncContainer cosmosAsyncContainer,
OperationType operationType,
TestObject createdItem,
boolean isReadMany,
boolean fetchFeedRangesBeforehandForChangeFeed,
boolean isBulkOperation) {

try {
if (operationType == OperationType.Query && !isReadMany) {
CosmosQueryRequestOptions queryRequestOptions = new CosmosQueryRequestOptions();
Expand Down Expand Up @@ -87,24 +112,61 @@ protected CosmosDiagnostics performDocumentOperation(
}

if (operationType == OperationType.Batch) {
CosmosBatch batch = CosmosBatch.createCosmosBatch(new PartitionKey(createdItem.getId()));

batch.upsertItemOperation(createdItem);
batch.readItemOperation(createdItem.getId());
if (isBulkOperation) {

List<CosmosItemOperation> cosmosItemOperations = new ArrayList<>();

CosmosItemOperation cosmosItemOperation = CosmosBulkOperations.getReadItemOperation(
createdItem.getId(),
new PartitionKey(createdItem.getId()),
TestObject.class);

cosmosItemOperations.add(cosmosItemOperation);

return cosmosAsyncContainer.executeCosmosBatch(batch).block().getDiagnostics();
Flux<CosmosItemOperation> operationsFlux = Flux.fromIterable(cosmosItemOperations);

CosmosBulkOperationResponse<Object> response = cosmosAsyncContainer.executeBulkOperations(operationsFlux).blockLast();

assertThat(response).isNotNull();

return response.getResponse().getCosmosDiagnostics();
} else {
CosmosBatch batch = CosmosBatch.createCosmosBatch(new PartitionKey(createdItem.getId()));

batch.upsertItemOperation(createdItem);
batch.readItemOperation(createdItem.getId());

return cosmosAsyncContainer.executeCosmosBatch(batch).block().getDiagnostics();
}
}
}

if (operationType == OperationType.ReadFeed) {
List<FeedRange> feedRanges = cosmosAsyncContainer.getFeedRanges().block();

if (fetchFeedRangesBeforehandForChangeFeed) {
List<FeedRange> feedRanges = cosmosAsyncContainer.getFeedRanges().block();

assertThat(feedRanges).isNotNull();
assertThat(feedRanges.size()).isGreaterThan(0);

CosmosChangeFeedRequestOptions changeFeedRequestOptions =
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(feedRanges.get(0));

FeedResponse<TestObject> firstPage = cosmosAsyncContainer
.queryChangeFeed(changeFeedRequestOptions, TestObject.class)
.byPage()
.blockLast();
return firstPage.getCosmosDiagnostics();
}

CosmosChangeFeedRequestOptions changeFeedRequestOptions =
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(feedRanges.get(0));
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange());

FeedResponse<TestObject> firstPage = cosmosAsyncContainer
.queryChangeFeed(changeFeedRequestOptions, TestObject.class)
.byPage()
.blockFirst();
.blockLast();
return firstPage.getCosmosDiagnostics();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void staledException(int statusCode, int subStatusCode, boolean expectRet

RxCollectionCache rxCollectionCache = Mockito.mock(RxCollectionCache.class);
Mockito
.when(rxCollectionCache.resolveByNameAsync(Mockito.any(), Mockito.any(), Mockito.isNull()))
.when(rxCollectionCache.resolveByNameAsync(Mockito.any(), Mockito.any(), Mockito.isNull(), Mockito.isNull(), Mockito.isNull(), Mockito.isNull()))
.thenReturn(Mono.just(documentCollection));
doNothing().when(rxCollectionCache).refresh(Mockito.any(), Mockito.any(), Mockito.isNull());

Expand All @@ -51,7 +51,8 @@ public void staledException(int statusCode, int subStatusCode, boolean expectRet
null,
null,
sessionContainer,
TestUtils.mockDiagnosticsClientContext()
TestUtils.mockDiagnosticsClientContext(),
null
);

CosmosException exception = BridgeInternal.createCosmosException(statusCode);
Expand All @@ -72,7 +73,7 @@ public void suppressRetryForExternalCollectionRid() {

RxCollectionCache rxCollectionCache = Mockito.mock(RxCollectionCache.class);
Mockito
.when(rxCollectionCache.resolveByNameAsync(Mockito.any(), Mockito.any(), Mockito.isNull()))
.when(rxCollectionCache.resolveByNameAsync(Mockito.any(), Mockito.any(), Mockito.isNull(), Mockito.isNull(), Mockito.isNull(), Mockito.isNull()))
.thenReturn(Mono.just(documentCollection));
doNothing().when(rxCollectionCache).refresh(Mockito.any(), Mockito.any(), Mockito.isNull());

Expand All @@ -88,7 +89,8 @@ public void suppressRetryForExternalCollectionRid() {
null,
customHeaders,
sessionContainer,
TestUtils.mockDiagnosticsClientContext()
TestUtils.mockDiagnosticsClientContext(),
null
);

InvalidPartitionException invalidPartitionException = new InvalidPartitionException();
Expand All @@ -109,7 +111,7 @@ public void cleanSessionToken() {

RxCollectionCache rxCollectionCache = Mockito.mock(RxCollectionCache.class);
Mockito
.when(rxCollectionCache.resolveByNameAsync(Mockito.any(), Mockito.any(), Mockito.isNull()))
.when(rxCollectionCache.resolveByNameAsync(Mockito.any(), Mockito.any(), Mockito.isNull(), Mockito.isNull(), Mockito.isNull(), Mockito.isNull()))
.thenReturn(Mono.just(documentCollection))
.thenReturn(Mono.just(documentCollectionAfterRefresh));

Expand All @@ -125,7 +127,8 @@ public void cleanSessionToken() {
null,
null,
sessionContainer,
TestUtils.mockDiagnosticsClientContext()
TestUtils.mockDiagnosticsClientContext(),
null
);

InvalidPartitionException invalidPartitionException = new InvalidPartitionException();
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#### Bugs Fixed

#### Other Changes
* Remaps sub-status to 1003 for requests to child resources against non-existent container. - [PR 47604](https://github.com/Azure/azure-sdk-for-java/pull/47604)

### 4.76.0 (2025-12-09)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,16 @@ public void setRequestUri(CosmosException cosmosException, Uri requestUri) {
public Uri getRequestUri(CosmosException cosmosException) {
return cosmosException.getRequestUri();
}

@Override
public void setSubStatusCode(CosmosException cosmosException, int subStatusCode) {
Map<String, String> responseHeaders = cosmosException.getResponseHeaders();

responseHeaders.put(HttpConstants.HttpHeaders.SUB_STATUS, String.valueOf(subStatusCode));
cosmosException.setSubStatusCode(subStatusCode);
}


});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1596,6 +1596,7 @@ public interface CosmosExceptionAccessor {
List<String> getFaultInjectionEvaluationResults(CosmosException cosmosException);
void setRequestUri(CosmosException cosmosException, Uri requestUri);
Uri getRequestUri(CosmosException cosmosException);
void setSubStatusCode(CosmosException cosmosException, int subStatusCode);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization
private static final ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper.CosmosBulkExecutionOptionsAccessor bulkExecutionOptionsAccessor =
ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper.getCosmosBulkExecutionOptionsAccessor();

private static final ImplementationBridgeHelpers.CosmosExceptionHelper.CosmosExceptionAccessor cosmosExceptionAccessor =
ImplementationBridgeHelpers.CosmosExceptionHelper.getCosmosExceptionAccessor();

private static final String tempMachineId = "uuid:" + UUIDs.nonBlockingRandomUUID();
private static final AtomicInteger activeClientsCnt = new AtomicInteger(0);
private static final Map<String, Integer> clientMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -1272,7 +1275,9 @@ private <T> Flux<FeedResponse<T>> createQuery(
qryOptAccessor.getProperties(nonNullQueryOptions),
qryOptAccessor.getHeaders(nonNullQueryOptions),
this.sessionContainer,
diagnosticsFactory);
diagnosticsFactory,
ResourceType.Document
);

return
ObservableHelper.fluxInlineIfPossibleAsObs(
Expand Down Expand Up @@ -2609,7 +2614,8 @@ private Mono<ResourceResponse<Document>> createDocumentCore(
this.getRetryPolicyForPointOperation(
scopedDiagnosticsFactory,
nonNullRequestOptions,
collectionLink);
collectionLink
);

AtomicReference<RxDocumentServiceRequest> requestReference = new AtomicReference<>();

Expand Down Expand Up @@ -2996,7 +3002,8 @@ private Mono<ResourceResponse<Document>> upsertDocumentCore(
this.getRetryPolicyForPointOperation(
scopedDiagnosticsFactory,
nonNullRequestOptions,
collectionLink);;
collectionLink
);
AtomicReference<RxDocumentServiceRequest> requestReference = new AtomicReference<>();

Consumer<CosmosException> gwModeE2ETimeoutDiagnosticHandler
Expand Down Expand Up @@ -3138,7 +3145,8 @@ private Mono<ResourceResponse<Document>> replaceDocumentCore(
this.getRetryPolicyForPointOperation(
scopedDiagnosticsFactory,
nonNullRequestOptions,
Utils.getCollectionName(documentLink));
Utils.getCollectionName(documentLink)
);
AtomicReference<RxDocumentServiceRequest> requestReference = new AtomicReference<>();

Consumer<CosmosException> gwModeE2ETimeoutDiagnosticHandler
Expand Down Expand Up @@ -3471,7 +3479,8 @@ private Mono<ResourceResponse<Document>> patchDocumentCore(
this.getRetryPolicyForPointOperation(
scopedDiagnosticsFactory,
nonNullRequestOptions,
Utils.getCollectionName(documentLink));
Utils.getCollectionName(documentLink)
);

AtomicReference<RxDocumentServiceRequest> requestReference = new AtomicReference<>();

Expand Down Expand Up @@ -3684,7 +3693,8 @@ private Mono<ResourceResponse<Document>> deleteDocumentCore(
this.getRetryPolicyForPointOperation(
scopedDiagnosticsFactory,
nonNullRequestOptions,
Utils.getCollectionName(documentLink));
Utils.getCollectionName(documentLink)
);

AtomicReference<RxDocumentServiceRequest> requestReference = new AtomicReference<>();

Expand Down Expand Up @@ -3876,7 +3886,8 @@ private Mono<ResourceResponse<Document>> readDocumentCore(
this.getRetryPolicyForPointOperation(
scopedDiagnosticsFactory,
nonNullRequestOptions,
Utils.getCollectionName(documentLink));
Utils.getCollectionName(documentLink)
);

AtomicReference<RxDocumentServiceRequest> requestReference = new AtomicReference<>();

Expand Down Expand Up @@ -4025,7 +4036,9 @@ public <T> Mono<FeedResponse<T>> readMany(
qryOptAccessor.getProperties(state.getQueryOptions()),
qryOptAccessor.getHeaders(state.getQueryOptions()),
this.sessionContainer,
diagnosticsFactory);
diagnosticsFactory,
ResourceType.Document
);

return ObservableHelper
.inlineIfPossibleAsObs(
Expand Down Expand Up @@ -4722,7 +4735,9 @@ public <T> Flux<FeedResponse<T>> queryDocumentChangeFeedFromPagedFlux(
changeFeedOptionsAccessor.getProperties(state.getChangeFeedOptions()),
changeFeedOptionsAccessor.getHeaders(state.getChangeFeedOptions()),
this.sessionContainer,
diagnosticsFactory);
diagnosticsFactory,
ResourceType.Document
);

return ObservableHelper
.fluxInlineIfPossibleAsObs(
Expand All @@ -4747,6 +4762,20 @@ private <T> Flux<FeedResponse<T>> queryDocumentChangeFeedFromPagedFluxInternal(

return this.getCollectionCache()
.resolveByNameAsync(null, collectionLink, null)
.onErrorMap(throwable -> {
if (throwable instanceof CosmosException) {

CosmosException ce = Utils.as(throwable, CosmosException.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

possible to use the resolveByNameAsync which has the exception remapping? then no need for remapping here


if (HttpConstants.StatusCodes.NOTFOUND == ce.getStatusCode() && HttpConstants.SubStatusCodes.UNKNOWN == ce.getSubStatusCode()) {
cosmosExceptionAccessor.setSubStatusCode(ce, HttpConstants.SubStatusCodes.OWNER_RESOURCE_NOT_EXISTS);
}

return ce;
}

return throwable;
})
.flatMapMany(collection -> {
if (collection == null) {
throw new IllegalStateException("Collection can not be null");
Expand Down Expand Up @@ -4867,7 +4896,9 @@ public <T> Flux<FeedResponse<T>> readAllDocuments(
qryOptAccessor.getProperties(effectiveOptions),
qryOptAccessor.getHeaders(effectiveOptions),
this.sessionContainer,
diagnosticsFactory);
diagnosticsFactory,
ResourceType.Document
);

Flux<FeedResponse<T>> innerFlux = ObservableHelper.fluxInlineIfPossibleAsObs(
() -> {
Expand Down Expand Up @@ -5196,7 +5227,9 @@ public Mono<CosmosBatchResponse> executeBatchRequest(String collectionLink,
nonNullRequestOptions.getProperties(),
nonNullRequestOptions.getHeaders(),
this.sessionContainer,
scopedDiagnosticsFactory);
scopedDiagnosticsFactory,
ResourceType.Document
);
}

final DocumentClientRetryPolicy finalRetryPolicy = documentClientRetryPolicy;
Expand Down Expand Up @@ -6633,7 +6666,9 @@ public Mono<List<FeedRange>> getFeedRanges(String collectionLink, boolean forceR
new HashMap<>(),
new HashMap<>(),
this.sessionContainer,
null);
null,
ResourceType.PartitionKeyRange
);

RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
this,
Expand Down Expand Up @@ -8020,7 +8055,9 @@ private DocumentClientRetryPolicy getRetryPolicyForPointOperation(
requestOptions.getProperties(),
requestOptions.getHeaders(),
this.sessionContainer,
diagnosticsClientContext);
diagnosticsClientContext,
ResourceType.Document
);

return requestRetryPolicy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import com.azure.cosmos.implementation.http.HttpTransportSerializer;
import com.azure.cosmos.implementation.http.ReactorNettyRequestRecord;
import com.azure.cosmos.implementation.interceptor.ITransportClientInterceptor;
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker;
import com.azure.cosmos.implementation.perPartitionCircuitBreaker.LocationSpecificHealthContext;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
Expand Down Expand Up @@ -635,7 +633,7 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
CrossRegionAvailabilityContextForRxDocumentServiceRequest availabilityStrategyContextForReq =
request.requestContext.getCrossRegionAvailabilityContext();

if (availabilityStrategyContextForReq.getAvailabilityStrategyContext().isAvailabilityStrategyEnabled() && !availabilityStrategyContextForReq.getAvailabilityStrategyContext().isHedgedRequest()) {
if (availabilityStrategyContextForReq.getAvailabilityStrategyContext() != null && availabilityStrategyContextForReq.getAvailabilityStrategyContext().isAvailabilityStrategyEnabled() && !availabilityStrategyContextForReq.getAvailabilityStrategyContext().isHedgedRequest()) {

BridgeInternal.setRequestTimeline(oce, reactorNettyRequestRecord.takeTimelineSnapshot());

Expand Down
Loading
Loading