From bbc66a881d49fd635577bba4fbb9d167b7dba164 Mon Sep 17 00:00:00 2001 From: bambriz Date: Tue, 24 Feb 2026 14:42:13 -0800 Subject: [PATCH 1/2] Add Otel Integrations Adding new decorators and tests to allow cosmos specific attributes to be added into otel spans. --- .../azure-cosmos/azure/cosmos/_constants.py | 52 +++ .../azure/cosmos/_cosmos_span_attributes.py | 394 ++++++++++++++++++ .../azure/cosmos/aio/_container.py | 21 + .../azure/cosmos/aio/_cosmos_client.py | 5 + .../aio/_cosmos_span_attributes_async.py | 89 ++++ .../azure/cosmos/aio/_database.py | 17 + .../azure-cosmos/azure/cosmos/aio/_user.py | 10 + .../azure-cosmos/azure/cosmos/container.py | 21 + .../azure/cosmos/cosmos_client.py | 4 +- .../azure-cosmos/azure/cosmos/database.py | 18 + sdk/cosmos/azure-cosmos/azure/cosmos/user.py | 10 + .../samples/tracing_open_telemetry.py | 147 +++++-- .../tests/test_query_sanitization.py | 294 +++++++++++++ .../tests/test_query_sanitization_async.py | 118 ++++++ .../tests/test_telemetry_integration.py | 195 +++++++++ .../tests/test_telemetry_integration_async.py | 196 +++++++++ 16 files changed, 1564 insertions(+), 27 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_span_attributes.py create mode 100644 sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_span_attributes_async.py create mode 100644 sdk/cosmos/azure-cosmos/tests/test_query_sanitization.py create mode 100644 sdk/cosmos/azure-cosmos/tests/test_query_sanitization_async.py create mode 100644 sdk/cosmos/azure-cosmos/tests/test_telemetry_integration.py create mode 100644 sdk/cosmos/azure-cosmos/tests/test_telemetry_integration_async.py diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py index 9902cd1d31ab..f0a983f16dda 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py @@ -112,6 +112,58 @@ class Kwargs: AVAILABILITY_STRATEGY_CONFIG: 
Literal["availabilityStrategyConfig"] = "availabilityStrategyConfig" """Availability strategy config. Used either at client level or request level""" + class OpenTelemetryAttributes: + """OpenTelemetry Semantic Convention attributes for Cosmos DB. + Based on OpenTelemetry semantic conventions. + Reference: https://github.com/devopsleague/opentelemetry-semantic-conventions/blob/main/docs/database/cosmosdb.md + """ + + # Core database attributes (required) + DB_SYSTEM: Literal["db.system"] = "db.system" + DB_NAMESPACE: Literal["db.namespace"] = "db.namespace" # Database name + DB_OPERATION_NAME: Literal["db.operation.name"] = "db.operation.name" # Operation name + DB_COLLECTION_NAME: Literal["db.collection.name"] = "db.collection.name" # Container name + + # Query attributes + DB_QUERY_TEXT: Literal["db.query.text"] = "db.query.text" + + # Cosmos DB specific attributes + DB_COSMOSDB_CLIENT_ID: Literal["db.cosmosdb.client_id"] = "db.cosmosdb.client_id" + DB_COSMOSDB_CONNECTION_MODE: Literal["db.cosmosdb.connection_mode"] = "db.cosmosdb.connection_mode" + DB_COSMOSDB_OPERATION_TYPE: Literal["db.cosmosdb.operation_type"] = "db.cosmosdb.operation_type" + DB_COSMOSDB_REQUEST_CHARGE: Literal["db.cosmosdb.request_charge"] = "db.cosmosdb.request_charge" + DB_COSMOSDB_REQUEST_CONTENT_LENGTH: Literal["db.cosmosdb.request_content_length"] = "db.cosmosdb.request_content_length" + DB_COSMOSDB_RESPONSE_CONTENT_LENGTH: Literal["db.cosmosdb.response_content_length"] = "db.cosmosdb.response_content_length" + DB_COSMOSDB_STATUS_CODE: Literal["db.cosmosdb.status_code"] = "db.cosmosdb.status_code" + DB_COSMOSDB_SUB_STATUS_CODE: Literal["db.cosmosdb.sub_status_code"] = "db.cosmosdb.sub_status_code" + DB_COSMOSDB_REQUEST_DIAGNOSTICS_ID: Literal["db.cosmosdb.request_diagnostics_id"] = "db.cosmosdb.request_diagnostics_id" + DB_COSMOSDB_ITEM_COUNT: Literal["db.cosmosdb.item_count"] = "db.cosmosdb.item_count" + DB_COSMOSDB_REGIONS_CONTACTED: Literal["db.cosmosdb.regions_contacted"] = 
"db.cosmosdb.regions_contacted" + + class OpenTelemetryOperationTypes: + """db.cosmosdb.operation_type values per OpenTelemetry semantic conventions. + + These values are used in the db.cosmosdb.operation_type attribute to indicate + the type of operation being performed on Cosmos DB. + + Reference: https://github.com/devopsleague/opentelemetry-semantic-conventions/blob/main/docs/database/cosmosdb.md + """ + BATCH: Literal["batch"] = "batch" + CREATE: Literal["create"] = "create" + DELETE: Literal["delete"] = "delete" + EXECUTE: Literal["execute"] = "execute" + EXECUTE_JAVASCRIPT: Literal["execute_javascript"] = "execute_javascript" + HEAD: Literal["head"] = "head" + HEAD_FEED: Literal["head_feed"] = "head_feed" + INVALID: Literal["invalid"] = "invalid" + PATCH: Literal["patch"] = "patch" + QUERY: Literal["query"] = "query" + QUERY_PLAN: Literal["query_plan"] = "query_plan" + READ: Literal["read"] = "read" + READ_FEED: Literal["read_feed"] = "read_feed" + REPLACE: Literal["replace"] = "replace" + UPSERT: Literal["upsert"] = "upsert" + class UserAgentFeatureFlags(IntEnum): """ User agent feature flags. 
diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_span_attributes.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_span_attributes.py new file mode 100644 index 000000000000..c67cb95bddbb --- /dev/null +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_span_attributes.py @@ -0,0 +1,394 @@ +# The MIT License (MIT) +# Copyright (c) 2024 Microsoft Corporation + +"""Decorator to add Cosmos DB semantic convention attributes to OpenTelemetry spans.""" + +import functools +import re +from typing import Any, Callable, Mapping, Optional, TypeVar, cast +from azure.core.settings import settings +from azure.core.tracing import AbstractSpan +from ._cosmos_responses import CosmosDict, CosmosList +from ._constants import _Constants +from .http_constants import HttpHeaders + +__all__ = ["cosmos_span_attributes", "sanitize_query"] + +F = TypeVar("F", bound=Callable[..., Any]) + + +def sanitize_query(query: str, parameters: Optional[list]) -> str: + """ + Sanitize query text according to OpenTelemetry semantic conventions. + + Per the spec: https://github.com/devopsleague/opentelemetry-semantic-conventions/blob/main/docs/database/database-spans.md#sanitization-of-dbquerytext + + - If the query uses parameterized placeholders (e.g., @param), it's safe to log as-is + - Otherwise, replace all literal values with '?' placeholder + + :param query: The SQL query text + :type query: str + :param parameters: Optional list of query parameters + :type parameters: Optional[list] + :return: Sanitized query text + :rtype: str + """ + if not query: + return query + + # If query uses parameters, it's already safe (values are in parameters, not query text) + if parameters: + return query + + # Sanitize literal values in non-parameterized queries + # Replace string literals (single quotes) + sanitized = re.sub(r"'[^']*'", "'?'", query) + + # Replace numeric literals (integers and floats) + # Match numbers not preceded by @ (to avoid matching @param names) + sanitized = re.sub(r'(? 
Any: + """ + Decorator that adds Cosmos DB semantic convention attributes to the current OpenTelemetry span. + + The operation name (db.operation.name) is automatically derived from the function name. + The operation_type (db.cosmosdb.operation_type) should be provided from the spec table values. + + This decorator should be used in conjunction with @distributed_trace to enrich + the trace span with Cosmos-specific attributes following the OpenTelemetry semantic conventions. + + :param __func: The function to decorate + :type __func: Optional[Callable] + :keyword name_of_span: Not used, for signature compatibility + :paramtype name_of_span: Optional[str] + :keyword kind: Not used, for signature compatibility + :paramtype kind: Optional[Any] + :keyword tracing_attributes: Not used, for signature compatibility + :paramtype tracing_attributes: Optional[Mapping[str, Any]] + :keyword operation_type: The Cosmos DB operation type from the spec table (e.g., "create", "read", "query") + :paramtype operation_type: Optional[str] + :return: The decorated function + :rtype: Any + + Example usage:: + + @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.CREATE) + def create_item(self, body, **kwargs): + # ... 
implementation + return result + """ + + def decorator(func: F) -> F: + @functools.wraps(func) + def wrapper(*args: Any, **func_kwargs: Any) -> Any: + # Auto-derive method name from the function name + method_name = func.__name__ + + # Extract query and parameters BEFORE the function runs (for telemetry) + query_for_telemetry = func_kwargs.get('query') + parameters_for_telemetry = func_kwargs.get('parameters') + + # Execute the function (the parent @distributed_trace will create the span) + try: + result = func(*args, **func_kwargs) + + # Add Cosmos-specific attributes (only if span exists) + # Create a copy of kwargs with the query/parameters we saved + telemetry_kwargs = dict(func_kwargs) + if query_for_telemetry is not None: + telemetry_kwargs['query'] = query_for_telemetry + if parameters_for_telemetry is not None: + telemetry_kwargs['parameters'] = parameters_for_telemetry + + _add_cosmos_telemetry(method_name, operation_type, args, telemetry_kwargs, result) + + return result + except Exception as error: + # Add error attributes if we have a span + _add_cosmos_error_telemetry(error) + raise + + return cast(F, wrapper) + + return decorator if __func is None else decorator(__func) + + +def _add_cosmos_error_telemetry(error: Exception) -> None: + """ + Add Cosmos DB-specific error telemetry to the current span. 
+ + :param error: The exception that occurred + :type error: Exception + """ + try: + # Get the tracing implementation + span_impl_type = settings.tracing_implementation() + + if span_impl_type is None: + return + + # Get the current span + try: + raw_span = span_impl_type.get_current_span() + except Exception: + return + + if raw_span is None: + return + + # Skip if NonRecordingSpan + if hasattr(raw_span, 'is_recording'): + if not raw_span.is_recording(): + return + + # Wrap the span + span: AbstractSpan = span_impl_type(span=raw_span) + + # Always add db.system for Cosmos DB + span.add_attribute(_Constants.OpenTelemetryAttributes.DB_SYSTEM, "cosmosdb") + + # Add error attributes + if hasattr(error, "status_code"): + span.add_attribute(_Constants.OpenTelemetryAttributes.DB_COSMOSDB_STATUS_CODE, error.status_code) + + if hasattr(error, "sub_status_code"): + span.add_attribute(_Constants.OpenTelemetryAttributes.DB_COSMOSDB_SUB_STATUS_CODE, error.sub_status_code) + + if hasattr(error, "headers") and error.headers: + _extract_headers(span, error.headers) + + except Exception: + pass + + +def _add_cosmos_telemetry( + method_name: Optional[str], + operation_type: Optional[str], + args: tuple, + kwargs: dict, + result: Any +) -> None: + """ + Add Cosmos DB-specific telemetry attributes to the current span. 
+ + :param method_name: The actual method name from the function (e.g., "create_item") + :type method_name: Optional[str] + :param operation_type: The Cosmos DB operation type from spec (e.g., "create") + :type operation_type: Optional[str] + :param args: Function arguments + :type args: tuple + :param kwargs: Function keyword arguments + :type kwargs: dict + :param result: The function result + :type result: Any + """ + try: + # Get the tracing implementation from azure-core settings + span_impl_type = settings.tracing_implementation() + + if span_impl_type is None: + return + + # Get the current span + try: + raw_span = span_impl_type.get_current_span() + except Exception: + return + + # Skip if no span + if raw_span is None: + return + + # Skip if NonRecordingSpan + if hasattr(raw_span, 'is_recording'): + is_recording = raw_span.is_recording() + if not is_recording: + return + + # Wrap the raw span in the AbstractSpan implementation + span: AbstractSpan = span_impl_type(span=raw_span) + + # Add Cosmos DB attributes + _add_cosmos_attributes(span, method_name, operation_type, args, kwargs) + _add_response_attributes(span, result) + + except Exception: + # Silently fail - telemetry should never break functionality + pass + + +def _add_cosmos_attributes( + span: AbstractSpan, + method_name: Optional[str], + operation_type: Optional[str], + args: tuple, + kwargs: dict +) -> None: + """Add Cosmos DB client-level attributes to the span per OpenTelemetry semantic conventions.""" + try: + # Required: db.system + span.add_attribute(_Constants.OpenTelemetryAttributes.DB_SYSTEM, "cosmosdb") + + # db.operation.name: The actual method name (e.g., "create_item") + # Per spec: "capture the value as provided by the application" + if method_name: + span.add_attribute(_Constants.OpenTelemetryAttributes.DB_OPERATION_NAME, method_name) + + # db.cosmosdb.operation_type: Standardized value from spec table (e.g., "create") + if operation_type: + 
span.add_attribute(_Constants.OpenTelemetryAttributes.DB_COSMOSDB_OPERATION_TYPE, operation_type) + + # Extract container and database info from self (first arg) + if args and hasattr(args[0], "id"): + instance = args[0] + + # Required: db.collection.name (container name) + container_name = instance.id + span.add_attribute(_Constants.OpenTelemetryAttributes.DB_COLLECTION_NAME, container_name) + + # Required: db.namespace (database name) + # Extract database name from container_link: /dbs/{db}/colls/{container} + if hasattr(instance, "container_link"): + container_link = instance.container_link + if "dbs/" in container_link and "/colls/" in container_link: + parts = container_link.split("/") + try: + db_index = parts.index("dbs") + if db_index + 1 < len(parts): + db_name = parts[db_index + 1] + span.add_attribute(_Constants.OpenTelemetryAttributes.DB_NAMESPACE, db_name) + except (ValueError, IndexError): + pass + + # Extract connection mode and client ID from client_connection + if hasattr(instance, "client_connection"): + client_conn = instance.client_connection + + # db.cosmosdb.connection_mode + if hasattr(client_conn, "connection_policy"): + policy = client_conn.connection_policy + if hasattr(policy, "ConnectionMode"): + # 0 = Gateway, 1 = Direct + mode = "direct" if policy.ConnectionMode == 1 else "gateway" + span.add_attribute(_Constants.OpenTelemetryAttributes.DB_COSMOSDB_CONNECTION_MODE, mode) + + # db.cosmosdb.client_id + if hasattr(client_conn, "client_id"): + span.add_attribute(_Constants.OpenTelemetryAttributes.DB_COSMOSDB_CLIENT_ID, str(client_conn.client_id)) + + # db.query.text for query operations + query_text = None + parameters = None + + if "query" in kwargs: + query = kwargs["query"] + if isinstance(query, str): + query_text = query + elif isinstance(query, dict) and "query" in query: + query_text = query["query"] + parameters = query.get("parameters") + + # Also check for parameters passed separately + if "parameters" in kwargs: + parameters = 
kwargs["parameters"] + + if query_text: + # Sanitize query text per semantic conventions + sanitized_query = sanitize_query(query_text, parameters) + span.add_attribute(_Constants.OpenTelemetryAttributes.DB_QUERY_TEXT, sanitized_query) + + # Note: Query parameter VALUES are opt-in per semantic conventions + # Since we don't have a standard configuration mechanism for this, + # we do NOT log parameter values to avoid exposing sensitive data + + + except Exception: + pass + + +def _add_response_attributes(span: AbstractSpan, result: Any) -> None: + """Add Cosmos DB attributes from the response.""" + try: + if result is None: + return + + headers = None + + # Handle CosmosDict responses (single item operations) + if isinstance(result, CosmosDict): + headers = result.get_response_headers() + # Handle CosmosList responses (query operations) + elif isinstance(result, CosmosList): + headers = result.get_response_headers() + span.add_attribute(_Constants.OpenTelemetryAttributes.DB_COSMOSDB_ITEM_COUNT, len(result)) + # Handle other types with get_response_headers method + elif hasattr(result, "get_response_headers"): + headers = result.get_response_headers() + + if headers: + _extract_headers(span, headers) + + except Exception: + pass + + +def _extract_headers(span: AbstractSpan, headers: dict) -> None: + """Extract Cosmos DB client-level telemetry from response headers per OpenTelemetry semantic conventions.""" + try: + # db.cosmosdb.request_charge - Request Units consumed + if HttpHeaders.RequestCharge in headers: + charge = float(headers[HttpHeaders.RequestCharge]) + span.add_attribute(_Constants.OpenTelemetryAttributes.DB_COSMOSDB_REQUEST_CHARGE, charge) # type: ignore[arg-type] + + # db.cosmosdb.request_diagnostics_id - Request correlation ID + if HttpHeaders.ActivityId in headers: + activity_id = headers[HttpHeaders.ActivityId] + span.add_attribute(_Constants.OpenTelemetryAttributes.DB_COSMOSDB_REQUEST_DIAGNOSTICS_ID, activity_id) + + # db.cosmosdb.sub_status_code - 
Cosmos-specific error details + if HttpHeaders.SubStatus in headers: + sub_status = int(headers[HttpHeaders.SubStatus]) + span.add_attribute(_Constants.OpenTelemetryAttributes.DB_COSMOSDB_SUB_STATUS_CODE, sub_status) + + # db.cosmosdb.item_count - Number of items in response + if HttpHeaders.ItemCount in headers: + item_count = int(headers[HttpHeaders.ItemCount]) + span.add_attribute(_Constants.OpenTelemetryAttributes.DB_COSMOSDB_ITEM_COUNT, item_count) + + # db.cosmosdb.response_content_length - these are response headers, so Content-Length is the response body size + if HttpHeaders.ContentLength in headers: + try: + response_length = int(headers[HttpHeaders.ContentLength]) + span.add_attribute(_Constants.OpenTelemetryAttributes.DB_COSMOSDB_RESPONSE_CONTENT_LENGTH, response_length) + except (ValueError, TypeError): + pass + + # db.cosmosdb.regions_contacted - Regions contacted for hedging/multi-region scenarios + for key in ["x-ms-regions-contacted", "x-ms-cosmos-regions-contacted"]: + if key in headers: + regions_header = headers[key] + if isinstance(regions_header, str): + span.add_attribute(_Constants.OpenTelemetryAttributes.DB_COSMOSDB_REGIONS_CONTACTED, regions_header) + break + + except Exception: + pass diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py index 2ce07c61c9c9..961b100aa0e7 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py @@ -43,6 +43,7 @@ from .._change_feed.feed_range_internal import FeedRangeInternalEpk from .._cosmos_responses import CosmosDict, CosmosList +from ._cosmos_span_attributes_async import cosmos_span_attributes_async from .._constants import _Constants as Constants, TimeoutScope from .._routing.routing_range import Range from .._session_token_helpers import get_latest_session_token @@ -166,6 +167,7 @@ async def _get_epk_range_for_partition_key( return partition_key._get_epk_range_for_partition_key(partition_key_value)
@distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.READ) async def read( self, *, @@ -214,6 +216,7 @@ async def read( return container @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.CREATE) async def create_item( self, body: dict[str, Any], @@ -312,6 +315,7 @@ async def create_item( return result @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.READ) async def read_item( self, item: Union[str, Mapping[str, Any]], @@ -392,6 +396,7 @@ async def read_item( return await self.client_connection.ReadItem(document_link=doc_link, options=request_options, **kwargs) @distributed_trace + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.QUERY) def read_all_items( self, *, @@ -458,6 +463,7 @@ def read_all_items( return items @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.BATCH) async def read_items( self, items: Sequence[Tuple[str, PartitionKeyType]], @@ -779,6 +785,7 @@ def query_items( ... @distributed_trace + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.QUERY) def query_items( self, *args: Any, @@ -1091,6 +1098,7 @@ def query_items_change_feed( ... 
@distributed_trace + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.QUERY) def query_items_change_feed( # pylint: disable=unused-argument self, **kwargs: Any @@ -1177,6 +1185,7 @@ def query_items_change_feed( # pylint: disable=unused-argument return result @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.UPSERT) async def upsert_item( self, body: dict[str, Any], @@ -1312,6 +1321,7 @@ async def semantic_rerank( return result @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.REPLACE) async def replace_item( self, item: Union[str, Mapping[str, Any]], @@ -1404,6 +1414,7 @@ async def replace_item( return result @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.PATCH) async def patch_item( self, item: Union[str, dict[str, Any]], @@ -1503,6 +1514,7 @@ async def patch_item( return result @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.DELETE) async def delete_item( self, item: Union[str, Mapping[str, Any]], @@ -1588,6 +1600,7 @@ async def delete_item( await self.client_connection.DeleteItem(document_link=document_link, options=request_options, **kwargs) @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.READ) async def get_throughput( self, *, @@ -1622,6 +1635,7 @@ async def get_throughput( return _deserialize_throughput(throughput=throughput_properties) @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.REPLACE) async def replace_throughput( self, throughput: Union[int, ThroughputProperties], @@ -1661,6 +1675,7 @@ async def replace_throughput( return ThroughputProperties(offer_throughput=data["content"]["offerThroughput"], properties=data) @distributed_trace + 
@cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.QUERY) def list_conflicts( self, *, @@ -1690,6 +1705,7 @@ def list_conflicts( return result @distributed_trace + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.QUERY) def query_conflicts( self, query: str, @@ -1736,6 +1752,7 @@ def query_conflicts( return result @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.READ) async def get_conflict( self, conflict: Union[str, Mapping[str, Any]], @@ -1767,6 +1784,7 @@ async def get_conflict( return result @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.DELETE) async def delete_conflict( self, conflict: Union[str, Mapping[str, Any]], @@ -1798,6 +1816,7 @@ async def delete_conflict( ) @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.DELETE) async def delete_all_items_by_partition_key( self, partition_key: PartitionKeyType, @@ -1860,6 +1879,7 @@ async def delete_all_items_by_partition_key( options=request_options, **kwargs) @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.BATCH) async def execute_item_batch( self, batch_operations: Sequence[Union[Tuple[str, Tuple[Any, ...]], Tuple[str, Tuple[Any, ...], dict[str, Any]]]], @@ -1943,6 +1963,7 @@ async def execute_item_batch( collection_link=self.container_link, batch_operations=batch_operations, options=request_options, **kwargs) @distributed_trace + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.READ) def read_feed_ranges( self, *, diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py index 86b6c33ccf2b..08fef6acd735 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py +++ 
b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py @@ -34,6 +34,7 @@ from azure.cosmos.offer import ThroughputProperties from ._cosmos_client_connection_async import CosmosClientConnection, CredentialDict +from ._cosmos_span_attributes_async import cosmos_span_attributes_async from ._database import DatabaseProxy, _get_database_link from ._retry_utility_async import _ConnectionRetryPolicy from .._base import build_options as _build_options, _set_throughput_options @@ -344,6 +345,7 @@ async def create_database( ... @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.CREATE) async def create_database( # pylint:disable=docstring-should-be-keyword self, *args: Any, @@ -486,6 +488,7 @@ async def create_database_if_not_exists( # pylint: disable=redefined-builtin ... @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.CREATE) async def create_database_if_not_exists( # pylint:disable=docstring-should-be-keyword self, *args: Any, @@ -663,6 +666,7 @@ def query_databases( return result @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.DELETE) async def delete_database( self, database: Union[str, DatabaseProxy, dict[str, Any]], @@ -730,3 +734,4 @@ async def _get_database_account( if response_hook: response_hook(self.client_connection.last_response_headers) return result + diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_span_attributes_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_span_attributes_async.py new file mode 100644 index 000000000000..4798eda1069d --- /dev/null +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_span_attributes_async.py @@ -0,0 +1,89 @@ +# The MIT License (MIT) +# Copyright (c) 2024 Microsoft Corporation + +"""Async decorator to add Cosmos DB semantic convention attributes to OpenTelemetry spans.""" + +from .._cosmos_span_attributes import _add_cosmos_telemetry, _add_cosmos_error_telemetry
+import functools +import inspect +from typing import Any, Callable, Mapping, Optional, TypeVar, cast + +__all__ = ["cosmos_span_attributes_async"] + +F = TypeVar("F", bound=Callable[..., Any]) + + +def cosmos_span_attributes_async( + __func: Optional[Callable] = None, + *, + name_of_span: Optional[str] = None, + kind: Optional[Any] = None, + tracing_attributes: Optional[Mapping[str, Any]] = None, + operation_type: Optional[str] = None, + **kwargs: Any, +) -> Any: + """ + Async decorator that adds Cosmos DB semantic convention attributes to the current OpenTelemetry span. + + The operation name (db.operation.name) is automatically derived from the function name. + The operation_type (db.cosmosdb.operation_type) should be provided from the spec table values. + + Use with @distributed_trace_async on ``async def`` methods, or with @distributed_trace on plain + ``def`` methods; a plain ``def`` is wrapped synchronously so its return value is not turned into a coroutine. + + :param __func: The function to decorate (``async def`` or plain ``def``) + :type __func: Optional[Callable] + :keyword operation_type: The Cosmos DB operation type from the spec table (e.g., "create", "read") + :paramtype operation_type: Optional[str] + :return: The decorated function (a coroutine function only when the input is one) + :rtype: Any + + Example usage:: + + @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.CREATE) + async def create_item(self, body, **kwargs): + # ...
implementation + return result + """ + + def decorator(func: F) -> F: + def _snapshot(func_kwargs: dict) -> dict: + # Copy kwargs before the call and unpack a dict-style query into text + parameters + snapshot = dict(func_kwargs) + query = snapshot.get('query') + if isinstance(query, dict) and query.get('query') is not None: + if snapshot.get('parameters') is None and query.get('parameters') is not None: + snapshot['parameters'] = query['parameters'] + snapshot['query'] = query['query'] + return snapshot + + if inspect.iscoroutinefunction(func): + @functools.wraps(func) + async def async_wrapper(*args: Any, **func_kwargs: Any) -> Any: + telemetry_kwargs = _snapshot(func_kwargs) + try: + result = await func(*args, **func_kwargs) + _add_cosmos_telemetry(func.__name__, operation_type, args, telemetry_kwargs, result) + return result + except Exception as error: + _add_cosmos_error_telemetry(error) + raise + + return cast(F, async_wrapper) + + # Plain def: keep the call synchronous so the return value is not turned into a coroutine + @functools.wraps(func) + def sync_wrapper(*args: Any, **func_kwargs: Any) -> Any: + telemetry_kwargs = _snapshot(func_kwargs) + try: + result = func(*args, **func_kwargs) + _add_cosmos_telemetry(func.__name__, operation_type, args, telemetry_kwargs, result) + return result + except Exception as error: + _add_cosmos_error_telemetry(error) + raise + + return cast(F, sync_wrapper) + + return decorator if __func is None else decorator(__func) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py index 37c13438ae75..233beb0d8e4d 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py @@ -30,8 +30,10 @@ from azure.core.tracing.decorator import distributed_trace from ._cosmos_client_connection_async import CosmosClientConnection +from
._cosmos_span_attributes_async import cosmos_span_attributes_async from .._base import build_options as _build_options, _set_throughput_options, _deserialize_throughput, \ _replace_throughput +from .._constants import _Constants as Constants from ._container import ContainerProxy from ..offer import ThroughputProperties from ..http_constants import StatusCodes @@ -128,6 +130,7 @@ async def _get_properties(self) -> dict[str, Any]: return self._properties @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.READ) async def read( self, *, @@ -316,6 +319,7 @@ async def create_container( # pylint: disable=too-many-statements ... @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.CREATE) async def create_container( # pylint:disable=docstring-should-be-keyword, too-many-statements self, *args: Any, @@ -573,6 +577,7 @@ async def create_container_if_not_exists( ... @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.CREATE) async def create_container_if_not_exists( # pylint:disable=docstring-should-be-keyword self, *args: Any, @@ -711,6 +716,7 @@ def get_container_client(self, container: Union[str, ContainerProxy, dict[str, A return ContainerProxy(self.client_connection, self.database_link, id_value) @distributed_trace + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.READ_FEED) def list_containers( self, *, @@ -758,6 +764,7 @@ def list_containers( return result @distributed_trace + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.QUERY) def query_containers( self, query: str, @@ -927,6 +934,7 @@ async def replace_container( # pylint:disable=docstring-missing-param ... 
@distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.REPLACE) async def replace_container( # pylint:disable=docstring-should-be-keyword self, *args: Any, @@ -1046,6 +1054,7 @@ async def replace_container( # pylint:disable=docstring-should-be-keyword properties=container_properties), container_properties @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.DELETE) async def delete_container( self, container: Union[str, ContainerProxy, Mapping[str, Any]], @@ -1091,6 +1100,7 @@ async def delete_container( await self.client_connection.DeleteContainer(collection_link, options=request_options, **kwargs) @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.CREATE) async def create_user( self, body: dict[str, Any], @@ -1149,6 +1159,7 @@ def get_user_client( return UserProxy(client_connection=self.client_connection, id=id_value, database_link=self.database_link) @distributed_trace + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.READ_FEED) def list_users( self, *, @@ -1176,6 +1187,7 @@ def list_users( return result @distributed_trace + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.QUERY) def query_users( self, query: str, @@ -1213,6 +1225,7 @@ def query_users( return result @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.UPSERT) async def upsert_user( self, body: dict[str, Any], @@ -1272,6 +1285,7 @@ async def replace_user( ) @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.DELETE) async def delete_user( self, user: Union[str, UserProxy, Mapping[str, Any]], @@ -1295,6 +1309,7 @@ async def delete_user( ) @distributed_trace_async + 
@cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.READ) async def get_throughput( self, *, @@ -1330,6 +1345,7 @@ async def get_throughput( return _deserialize_throughput(throughput=throughput_properties) @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.REPLACE) async def replace_throughput( self, throughput: Union[int, ThroughputProperties], @@ -1367,3 +1383,4 @@ async def replace_throughput( offer=throughput_properties[0], **kwargs) return ThroughputProperties(offer_throughput=data["content"]["offerThroughput"], properties=data) + diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_user.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_user.py index 7014b0a090e9..d650531643c6 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_user.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_user.py @@ -33,6 +33,8 @@ from ._cosmos_client_connection_async import CosmosClientConnection from .._base import build_options +from .._constants import _Constants as Constants +from ._cosmos_span_attributes_async import cosmos_span_attributes_async from ..permission import Permission # pylint: disable=docstring-keyword-should-match-keyword-only @@ -77,6 +79,7 @@ async def _get_properties( return self._properties @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.READ) async def read( self, **kwargs: Any @@ -99,6 +102,7 @@ async def read( return self._properties @distributed_trace + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.READ_FEED) def list_permissions( self, *, @@ -126,6 +130,7 @@ def list_permissions( return result @distributed_trace + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.QUERY) def query_permissions( self, query: str, @@ -163,6 +168,7 @@ def query_permissions( return result @distributed_trace_async + 
@cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.READ) async def get_permission( self, permission: Union[str, Mapping[str, Any], Permission], @@ -193,6 +199,7 @@ async def get_permission( ) @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.CREATE) async def create_permission(self, body: dict[str, Any], **kwargs: Any) -> Permission: """Create a permission for the user. @@ -221,6 +228,7 @@ async def create_permission(self, body: dict[str, Any], **kwargs: Any) -> Permis ) @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.UPSERT) async def upsert_permission(self, body: dict[str, Any], **kwargs: Any) -> Permission: """Insert or update the specified permission. @@ -250,6 +258,7 @@ async def upsert_permission(self, body: dict[str, Any], **kwargs: Any) -> Permis ) @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.REPLACE) async def replace_permission( self, permission: Union[str, Mapping[str, Any], Permission], @@ -287,6 +296,7 @@ async def replace_permission( ) @distributed_trace_async + @cosmos_span_attributes_async(operation_type=Constants.OpenTelemetryOperationTypes.DELETE) async def delete_permission( self, permission: Union[str, Mapping[str, Any], Permission], diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py index 1bb4e95caa8a..62c0a7fceae2 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py @@ -40,6 +40,7 @@ from ._constants import _Constants as Constants, TimeoutScope from ._cosmos_client_connection import CosmosClientConnection from ._cosmos_responses import CosmosDict, CosmosList +from ._cosmos_span_attributes import cosmos_span_attributes from ._routing.routing_range import Range from ._session_token_helpers import 
get_latest_session_token from .exceptions import CosmosHttpResponseError @@ -156,6 +157,7 @@ def __get_client_container_caches(self) -> dict[str, dict[str, Any]]: return self.client_connection._container_properties_cache @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.READ) def read( # pylint:disable=docstring-missing-param self, populate_query_metrics: Optional[bool] = None, @@ -213,6 +215,7 @@ def read( # pylint:disable=docstring-missing-param return container @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.READ) def read_item( # pylint:disable=docstring-missing-param self, item: Union[str, Mapping[str, Any]], @@ -300,6 +303,7 @@ def read_item( # pylint:disable=docstring-missing-param return self.client_connection.ReadItem(document_link=doc_link, options=request_options, **kwargs) @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.READ) def read_items( self, items: Sequence[Tuple[str, PartitionKeyType]], @@ -377,6 +381,7 @@ def read_items( @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.READ_FEED) def read_all_items( # pylint:disable=docstring-missing-param self, max_item_count: Optional[int] = None, @@ -627,6 +632,7 @@ def query_items_change_feed( ... @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.QUERY) def query_items_change_feed( self, *args: Any, @@ -893,6 +899,7 @@ def query_items( ... 
@distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.QUERY) def query_items( # pylint:disable=docstring-missing-param self, *args: Any, @@ -1087,6 +1094,7 @@ def semantic_rerank( return result @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.REPLACE) def replace_item( # pylint:disable=docstring-missing-param self, item: Union[str, Mapping[str, Any]], @@ -1191,6 +1199,7 @@ def replace_item( # pylint:disable=docstring-missing-param return result @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.UPSERT) def upsert_item( # pylint:disable=docstring-missing-param self, body: dict[str, Any], @@ -1291,6 +1300,7 @@ def upsert_item( # pylint:disable=docstring-missing-param return result @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.CREATE) def create_item( # pylint:disable=docstring-missing-param self, body: dict[str, Any], @@ -1398,6 +1408,7 @@ def create_item( # pylint:disable=docstring-missing-param return result @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.PATCH) def patch_item( self, item: Union[str, dict[str, Any]], @@ -1502,6 +1513,7 @@ def patch_item( return result @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.BATCH) def execute_item_batch( self, batch_operations: Sequence[Union[Tuple[str, Tuple[Any, ...]], Tuple[str, Tuple[Any, ...], dict[str, Any]]]], @@ -1588,6 +1600,7 @@ def execute_item_batch( collection_link=self.container_link, batch_operations=batch_operations, options=request_options, **kwargs) @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.DELETE) def delete_item( # pylint:disable=docstring-missing-param self, item: Union[Mapping[str, Any], str], @@ -1681,6 +1694,7 @@ def delete_item( # 
pylint:disable=docstring-missing-param self.client_connection.DeleteItem(document_link=document_link, options=request_options, **kwargs) @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.READ) def read_offer(self, **kwargs: Any) -> Offer: """Get the ThroughputProperties object for this container. If no ThroughputProperties already exist for the container, an exception is raised. @@ -1730,6 +1744,7 @@ def get_throughput( return _deserialize_throughput(throughput=throughput_properties) @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.REPLACE) def replace_throughput( self, throughput: Union[int, ThroughputProperties], @@ -1767,6 +1782,7 @@ def replace_throughput( return ThroughputProperties(offer_throughput=data["content"]["offerThroughput"], properties=data) @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.READ_FEED) def list_conflicts( self, max_item_count: Optional[int] = None, @@ -1796,6 +1812,7 @@ def list_conflicts( return result @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.QUERY) def query_conflicts( self, query: str, @@ -1846,6 +1863,7 @@ def query_conflicts( return result @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.READ) def get_conflict( self, conflict: Union[str, Mapping[str, Any]], @@ -1876,6 +1894,7 @@ def get_conflict( ) @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.DELETE) def delete_conflict( self, conflict: Union[str, Mapping[str, Any]], @@ -1908,6 +1927,7 @@ def delete_conflict( ) @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.DELETE) def delete_all_items_by_partition_key( self, partition_key: PartitionKeyType, @@ -1974,6 +1994,7 @@ def delete_all_items_by_partition_key( 
collection_link=self.container_link, options=request_options, **kwargs) @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.READ_FEED) def read_feed_ranges( self, *, diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py b/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py index 879675522a6d..b8c7fe860e4b 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py @@ -1,4 +1,4 @@ -# The MIT License (MIT) +# The MIT License (MIT) # Copyright (c) 2014 Microsoft Corporation # Permission is hereby granted, free of charge, to any person obtaining a copy @@ -35,6 +35,7 @@ from ._constants import _Constants as Constants from ._cosmos_client_connection import CosmosClientConnection, CredentialDict from ._cosmos_responses import CosmosDict +from ._cosmos_span_attributes import cosmos_span_attributes from ._retry_utility import ConnectionRetryPolicy from .database import DatabaseProxy, _get_database_link from .documents import ConnectionPolicy, DatabaseAccount @@ -359,6 +360,7 @@ def create_database( # pylint:disable=docstring-missing-param ... 
@distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.CREATE) def create_database( # pylint:disable=docstring-missing-param, docstring-should-be-keyword self, *args: Any, diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py index 22ae16a04457..5fd9ed8b8614 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py @@ -31,6 +31,8 @@ from ._cosmos_client_connection import CosmosClientConnection from ._base import build_options, _set_throughput_options, _deserialize_throughput, _replace_throughput +from ._constants import _Constants as Constants +from ._cosmos_span_attributes import cosmos_span_attributes from .container import ContainerProxy from .offer import Offer, ThroughputProperties from .http_constants import StatusCodes as _StatusCodes @@ -122,6 +124,7 @@ def _get_properties(self) -> dict[str, Any]: return self._properties @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.READ) def read( # pylint:disable=docstring-missing-param self, populate_query_metrics: Optional[bool] = None, @@ -305,6 +308,7 @@ def create_container( # pylint:disable=docstring-missing-param ... @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.CREATE) def create_container( # pylint:disable=docstring-missing-param, too-many-statements, docstring-should-be-keyword self, *args: Any, @@ -559,6 +563,7 @@ def create_container_if_not_exists( # pylint:disable=docstring-missing-param ... 
@distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.CREATE) def create_container_if_not_exists( # pylint:disable=docstring-missing-param, docstring-should-be-keyword self, *args: Any, @@ -671,6 +676,7 @@ def create_container_if_not_exists( # pylint:disable=docstring-missing-param, d ) @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.DELETE) def delete_container( # pylint:disable=docstring-missing-param self, container: Union[str, ContainerProxy, Mapping[str, Any]], @@ -751,6 +757,7 @@ def get_container_client(self, container: Union[str, ContainerProxy, Mapping[str return ContainerProxy(self.client_connection, self.database_link, id_value) @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.READ_FEED) def list_containers( # pylint:disable=docstring-missing-param self, max_item_count: Optional[int] = None, @@ -804,6 +811,7 @@ def list_containers( # pylint:disable=docstring-missing-param return result @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.QUERY) def query_containers( # pylint:disable=docstring-missing-param self, query: Optional[str] = None, @@ -975,6 +983,7 @@ def replace_container( # pylint:disable=docstring-missing-param ... 
@distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.REPLACE) def replace_container( # pylint:disable=docstring-missing-param, docstring-should-be-keyword self, *args: Any, @@ -1098,6 +1107,7 @@ def replace_container( # pylint:disable=docstring-missing-param, docstring-shou properties=container_properties), container_properties @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.READ_FEED) def list_users( self, max_item_count: Optional[int] = None, @@ -1125,6 +1135,7 @@ def list_users( return result @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.QUERY) def query_users( self, query: str, @@ -1177,6 +1188,7 @@ def get_user_client(self, user: Union[str, UserProxy, Mapping[str, Any]]) -> Use return UserProxy(client_connection=self.client_connection, id=id_value, database_link=self.database_link) @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.CREATE) def create_user(self, body: dict[str, Any], **kwargs: Any) -> UserProxy: """Create a new user in the container. @@ -1209,6 +1221,7 @@ def create_user(self, body: dict[str, Any], **kwargs: Any) -> UserProxy: ) @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.UPSERT) def upsert_user(self, body: dict[str, Any], **kwargs: Any) -> UserProxy: """Insert or update the specified user. @@ -1263,6 +1276,7 @@ def replace_user( ) @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.DELETE) def delete_user(self, user: Union[str, UserProxy, Mapping[str, Any]], **kwargs: Any) -> None: """Delete the specified user from the container. 
@@ -1279,6 +1293,7 @@ def delete_user(self, user: Union[str, UserProxy, Mapping[str, Any]], **kwargs: self.client_connection.DeleteUser(user_link=self._get_user_link(user), options=request_options, **kwargs) @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.READ) def read_offer(self, **kwargs: Any) -> Offer: """Get the ThroughputProperties object for this database. @@ -1297,6 +1312,7 @@ def read_offer(self, **kwargs: Any) -> Offer: return self.get_throughput(**kwargs) @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.READ) def get_throughput( self, *, @@ -1331,6 +1347,7 @@ def get_throughput( return _deserialize_throughput(throughput=throughput_properties) @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.REPLACE) def replace_throughput( self, throughput: Union[int, ThroughputProperties], @@ -1365,3 +1382,4 @@ def replace_throughput( **kwargs ) return ThroughputProperties(offer_throughput=data["content"]["offerThroughput"], properties=data) + diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/user.py b/sdk/cosmos/azure-cosmos/azure/cosmos/user.py index 220d15fe455f..a5e4c612b9a6 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/user.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/user.py @@ -32,6 +32,8 @@ from ._cosmos_client_connection import CosmosClientConnection from ._base import build_options +from ._constants import _Constants as Constants +from ._cosmos_span_attributes import cosmos_span_attributes from .permission import Permission @@ -75,6 +77,7 @@ def _get_properties( return self._properties @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.READ) def read( self, **kwargs: Any @@ -95,6 +98,7 @@ def read( return self._properties @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.READ_FEED) def list_permissions( self, 
max_item_count: Optional[int] = None, @@ -126,6 +130,7 @@ def list_permissions( return result @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.QUERY) def query_permissions( self, query: str, @@ -164,6 +169,7 @@ def query_permissions( return result @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.READ) def get_permission( self, permission: Union[str, Permission, Mapping[str, Any]], @@ -194,6 +200,7 @@ def get_permission( ) @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.CREATE) def create_permission(self, body: dict[str, Any], **kwargs: Any) -> Permission: """Create a permission for the user. @@ -221,6 +228,7 @@ def create_permission(self, body: dict[str, Any], **kwargs: Any) -> Permission: ) @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.UPSERT) def upsert_permission(self, body: dict[str, Any], **kwargs: Any) -> Permission: """Insert or update the specified permission. @@ -246,6 +254,7 @@ def upsert_permission(self, body: dict[str, Any], **kwargs: Any) -> Permission: ) @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.REPLACE) def replace_permission( self, permission: Union[str, Permission, Mapping[str, Any]], @@ -282,6 +291,7 @@ def replace_permission( ) @distributed_trace + @cosmos_span_attributes(operation_type=Constants.OpenTelemetryOperationTypes.DELETE) def delete_permission( self, permission: Union[str, Permission, Mapping[str, Any]], diff --git a/sdk/cosmos/azure-cosmos/samples/tracing_open_telemetry.py b/sdk/cosmos/azure-cosmos/samples/tracing_open_telemetry.py index 85e1e2f7f3c6..3ab56707a152 100644 --- a/sdk/cosmos/azure-cosmos/samples/tracing_open_telemetry.py +++ b/sdk/cosmos/azure-cosmos/samples/tracing_open_telemetry.py @@ -14,46 +14,141 @@ # 3. 
Azure Core Tracing OpenTelemetry plugin and OpenTelemetry Python SDK # pip install azure-core-tracing-opentelemetry opentelemetry-sdk # ---------------------------------------------------------------------------------------------------------- -# Sample - demonstrates tracing using OpenTelemetry +# Sample - demonstrates tracing using OpenTelemetry with Cosmos DB +# Shows both basic setup and head sampling configuration # ---------------------------------------------------------------------------------------------------------- +import os +from azure.cosmos.cosmos_client import CosmosClient + # Declare OpenTelemetry as enabled tracing plugin for Azure SDKs from azure.core.settings import settings from azure.core.tracing.ext.opentelemetry_span import OpenTelemetrySpan settings.tracing_implementation = OpenTelemetrySpan -# In the below example, we use a simple console exporter, uncomment these lines to use -# the OpenTelemetry exporter for Azure Monitor. -# Example of a trace exporter for Azure Monitor, but you can use anything OpenTelemetry supports -# from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter -# exporter = AzureMonitorTraceExporter( -# connection_string="the connection string used for your Application Insights resource" -# ) - # Regular open telemetry usage from here, see https://github.com/open-telemetry/opentelemetry-python -# for details from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter +from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased -# Simple console exporter -exporter = ConsoleSpanExporter() +# ------------------------------------ +# Example 1: Basic Setup with Console Exporter +# ------------------------------------ +def basic_tracing_example(): + """Basic OpenTelemetry setup with console output.""" + # Simple console exporter + exporter = ConsoleSpanExporter() 
-trace.set_tracer_provider(TracerProvider()) -tracer = trace.get_tracer(__name__) -trace.get_tracer_provider().add_span_processor( - SimpleSpanProcessor(exporter) -) + trace.set_tracer_provider(TracerProvider()) + tracer = trace.get_tracer(__name__) + trace.get_tracer_provider().add_span_processor( + SimpleSpanProcessor(exporter) + ) -# Example with Cosmos SDK -import os -from azure.cosmos.cosmos_client import CosmosClient + # Example with Cosmos SDK + account_url = os.environ["COSMOS_ACCOUNT_URL"] + credential = os.environ["COSMOS_CREDENTIAL"] + database_name = os.environ["DATABASE_NAME"] + container_name = os.environ["CONTAINER_NAME"] + + with tracer.start_as_current_span(name="MyApplication"): + client = CosmosClient(url=account_url, credential=credential) + database = client.get_database_client(database_name) + container = database.get_container_client(container_name) + + # Create an item - will be traced with Cosmos DB semantic convention attributes + # Captured attributes include: + # - db.system = "cosmosdb" + # - db.operation.name = "create_item" (actual SDK method name) + # - db.cosmosdb.operation_type = "create" (standardized operation type) + # - db.namespace = database name + # - db.collection.name = container name + # - db.cosmosdb.connection_mode = "gateway" or "direct" + # - db.cosmosdb.request_charge = RU cost + # - db.cosmosdb.client_id = unique client identifier + container.create_item({"id": "item1", "value": "test"}) + + +# ------------------------------------ +# Example 2: Head Sampling Configuration +# ------------------------------------ +def head_sampling_example(): + """ + OpenTelemetry setup with head sampling (probabilistic sampling). + + Head sampling decides at the start of a trace whether to sample it. + This reduces overhead by only tracing a percentage of requests. 
+ """ + + # Configure head sampling to sample 10% of traces + # TraceIdRatioBased(0.1) means 10% sampling rate + sampler = ParentBased( + root=TraceIdRatioBased(0.1) # Sample 10% of root spans + ) + + # Create tracer provider with the sampler + trace.set_tracer_provider(TracerProvider(sampler=sampler)) + + # Set up exporter + exporter = ConsoleSpanExporter() + trace.get_tracer_provider().add_span_processor( + SimpleSpanProcessor(exporter) + ) + + tracer = trace.get_tracer(__name__) + + # Example with Cosmos SDK + account_url = os.environ["COSMOS_ACCOUNT_URL"] + credential = os.environ["COSMOS_CREDENTIAL"] + database_name = os.environ["DATABASE_NAME"] + container_name = os.environ["CONTAINER_NAME"] + + with tracer.start_as_current_span(name="MyApplication"): + client = CosmosClient(url=account_url, credential=credential) + database = client.get_database_client(database_name) + container = database.get_container_client(container_name) + + # ParentBased: the root span's ~10% sampling decision is inherited, so a run traces all 100 reads or none + for i in range(100): + container.read_item(item=f"item{i}", partition_key=f"pk{i}") + + +# ------------------------------------ +# Example 3: Azure Monitor Exporter +# ------------------------------------ +def azure_monitor_example(): + """ + Example using Azure Monitor as the exporter. + Uncomment and configure with your Application Insights connection string. + """ + # from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter + # + # exporter = AzureMonitorTraceExporter( + # connection_string="InstrumentationKey=00000000-0000-0000-0000-000000000000;..." + # ) + # + # # Optional: Configure sampling for Azure Monitor + # sampler = ParentBased(root=TraceIdRatioBased(0.5)) # 50% sampling + # + # trace.set_tracer_provider(TracerProvider(sampler=sampler)) + # trace.get_tracer_provider().add_span_processor( + # SimpleSpanProcessor(exporter) + # ) + # + # # Your Cosmos DB operations here + # client = CosmosClient(url=account_url, credential=credential) + # # ... 
operations will be sent to Azure Monitor + pass + + +if __name__ == "__main__": + # Run basic example + print("Running basic tracing example...") + basic_tracing_example() -account_url = os.environ["COSMOS_ACCOUNT_URL"] -credential = os.environ["COSMOS_CREDENTIAL"] -database_name = os.environ["DATABASE_NAME"] + # Uncomment to run head sampling example + # print("\nRunning head sampling example...") + # head_sampling_example() -with tracer.start_as_current_span(name="MyApplication"): - client = CosmosClient(url=account_url, credential=credential) - client.create_database(database_name) # Call will be traced \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos/tests/test_query_sanitization.py b/sdk/cosmos/azure-cosmos/tests/test_query_sanitization.py new file mode 100644 index 000000000000..f63e054004f1 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/test_query_sanitization.py @@ -0,0 +1,294 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. 
+ +"""Tests for Cosmos DB OpenTelemetry semantic convention attributes.""" + +import unittest +from unittest.mock import Mock, patch +from azure.cosmos._cosmos_span_attributes import sanitize_query, _add_cosmos_attributes +from azure.cosmos._constants import _Constants +from azure.cosmos._cosmos_span_attributes import cosmos_span_attributes + + +class TestQuerySanitization(unittest.TestCase): + """Test query sanitization logic per OpenTelemetry semantic conventions.""" + + def test_sanitize_parameterized_query(self): + """Parameterized queries should NOT be sanitized.""" + query = "SELECT * FROM c WHERE c.userId = @userId AND c.age > @minAge" + parameters = [{"name": "@userId", "value": "12345"}, {"name": "@minAge", "value": 25}] + + result = sanitize_query(query, parameters) + + # Should return unchanged since it uses parameters + self.assertEqual(result, query) + + def test_sanitize_string_literals(self): + """String literals should be replaced with '?'.""" + query = "SELECT * FROM c WHERE c.name = 'John Doe' AND c.city = 'Seattle'" + + result = sanitize_query(query, None) + + expected = "SELECT * FROM c WHERE c.name = '?' AND c.city = '?'" + self.assertEqual(result, expected) + + def test_sanitize_numeric_literals_integer(self): + """Integer literals should be replaced with ?.""" + query = "SELECT * FROM c WHERE c.age = 25 AND c.count > 100" + + result = sanitize_query(query, None) + + expected = "SELECT * FROM c WHERE c.age = ? AND c.count > ?" + self.assertEqual(result, expected) + + def test_sanitize_numeric_literals_float(self): + """Float literals should be replaced with ?.""" + query = "SELECT * FROM c WHERE c.price = 19.99 AND c.discount > 0.5" + + result = sanitize_query(query, None) + + expected = "SELECT * FROM c WHERE c.price = ? AND c.discount > ?" 
+ self.assertEqual(result, expected) + + def test_sanitize_boolean_literals(self): + """Boolean literals should be replaced with ?.""" + query = "SELECT * FROM c WHERE c.active = true AND c.deleted = false" + + result = sanitize_query(query, None) + + expected = "SELECT * FROM c WHERE c.active = ? AND c.deleted = ?" + self.assertEqual(result, expected) + + def test_sanitize_boolean_literals_case_insensitive(self): + """Boolean literals in any case should be replaced.""" + query = "SELECT * FROM c WHERE c.flag1 = TRUE AND c.flag2 = False" + + result = sanitize_query(query, None) + + expected = "SELECT * FROM c WHERE c.flag1 = ? AND c.flag2 = ?" + self.assertEqual(result, expected) + + def test_sanitize_null_literals(self): + """Null literals should be replaced with ?.""" + query = "SELECT * FROM c WHERE c.optional = null" + + result = sanitize_query(query, None) + + expected = "SELECT * FROM c WHERE c.optional = ?" + self.assertEqual(result, expected) + + def test_sanitize_mixed_literals(self): + """Query with multiple types of literals should all be sanitized.""" + query = "SELECT * FROM c WHERE c.name = 'Alice' AND c.age = 30 AND c.active = true AND c.score = 95.5" + + result = sanitize_query(query, None) + + expected = "SELECT * FROM c WHERE c.name = '?' AND c.age = ? AND c.active = ? AND c.score = ?" + self.assertEqual(result, expected) + + def test_sanitize_preserves_parameter_placeholders(self): + """Should not replace @param placeholders even without parameter list.""" + query = "SELECT * FROM c WHERE c.id = @id AND c.value = 123" + + result = sanitize_query(query, None) + + # @id should remain, but 123 should be sanitized + expected = "SELECT * FROM c WHERE c.id = @id AND c.value = ?" 
+ self.assertEqual(result, expected) + + def test_sanitize_empty_string(self): + """Empty query should return empty.""" + result = sanitize_query("", None) + self.assertEqual(result, "") + + def test_sanitize_complex_query(self): + """Complex query with nested conditions.""" + query = """ + SELECT * FROM c + WHERE c.category = 'electronics' + AND c.price > 100 + AND (c.inStock = true OR c.onOrder = false) + AND c.rating >= 4.5 + """ + + result = sanitize_query(query, None) + + # All literals should be sanitized + self.assertIn("'?'", result) + self.assertNotIn("'electronics'", result) + self.assertNotIn("100", result) + self.assertNotIn("4.5", result) + self.assertIn("?", result) + + +class TestCosmosSpanAttributes(unittest.TestCase): + """Test Cosmos DB span attribute addition logic.""" + + def setUp(self): + """Set up mocks for each test.""" + self.mock_span = Mock() + self.mock_span_impl = Mock() + self.mock_span_impl.get_current_span.return_value = self.mock_span + self.mock_span.is_recording.return_value = True + + # Mock the span wrapper + self.mock_span_wrapper = Mock() + self.mock_span_impl.return_value = self.mock_span_wrapper + + @patch('azure.cosmos._cosmos_span_attributes.settings') + def test_add_cosmos_attributes_basic(self, mock_settings): + """Test basic attribute addition.""" + mock_settings.tracing_implementation.return_value = self.mock_span_impl + + # Create mock container + mock_container = Mock() + mock_container.id = "test-container" + mock_container.container_link = "/dbs/test-db/colls/test-container" + mock_container.client_connection = Mock() + + # Call the function + _add_cosmos_attributes( + self.mock_span_wrapper, + "create_item", + _Constants.OpenTelemetryOperationTypes.CREATE, + (mock_container,), + {} + ) + + # Verify core attributes were added + calls = {call[0][0]: call[0][1] for call in self.mock_span_wrapper.add_attribute.call_args_list} + + self.assertEqual(calls.get(_Constants.OpenTelemetryAttributes.DB_SYSTEM), "cosmosdb") + 
self.assertEqual(calls.get(_Constants.OpenTelemetryAttributes.DB_OPERATION_NAME), "create_item") + self.assertEqual(calls.get(_Constants.OpenTelemetryAttributes.DB_COSMOSDB_OPERATION_TYPE), + _Constants.OpenTelemetryOperationTypes.CREATE) + self.assertEqual(calls.get(_Constants.OpenTelemetryAttributes.DB_COLLECTION_NAME), "test-container") + self.assertEqual(calls.get(_Constants.OpenTelemetryAttributes.DB_NAMESPACE), "test-db") + + @patch('azure.cosmos._cosmos_span_attributes.settings') + def test_add_cosmos_attributes_with_query(self, mock_settings): + """Test attribute addition with query (should be sanitized).""" + mock_settings.tracing_implementation.return_value = self.mock_span_impl + + mock_container = Mock() + mock_container.id = "test-container" + mock_container.container_link = "/dbs/test-db/colls/test-container" + mock_container.client_connection = Mock() + + query = "SELECT * FROM c WHERE c.id = 'test123' AND c.value = 100" + + _add_cosmos_attributes( + self.mock_span_wrapper, + "query_items", + _Constants.OpenTelemetryOperationTypes.QUERY, + (mock_container,), + {"query": query} + ) + + # Find the query text attribute + calls = {call[0][0]: call[0][1] for call in self.mock_span_wrapper.add_attribute.call_args_list} + query_text = calls.get(_Constants.OpenTelemetryAttributes.DB_QUERY_TEXT) + + # Query should be sanitized + self.assertIsNotNone(query_text) + self.assertNotIn("test123", query_text) + self.assertNotIn("100", query_text) + self.assertIn("'?'", query_text) + self.assertIn("?", query_text) + + @patch('azure.cosmos._cosmos_span_attributes.settings') + def test_add_cosmos_attributes_with_parameterized_query(self, mock_settings): + """Test attribute addition with parameterized query (should NOT be sanitized).""" + mock_settings.tracing_implementation.return_value = self.mock_span_impl + + mock_container = Mock() + mock_container.id = "test-container" + mock_container.container_link = "/dbs/test-db/colls/test-container" + 
mock_container.client_connection = Mock() + + query = "SELECT * FROM c WHERE c.id = @id AND c.value = @value" + parameters = [{"name": "@id", "value": "test123"}, {"name": "@value", "value": 100}] + + _add_cosmos_attributes( + self.mock_span_wrapper, + "query_items", + _Constants.OpenTelemetryOperationTypes.QUERY, + (mock_container,), + {"query": query, "parameters": parameters} + ) + + # Find the query text attribute + calls = {call[0][0]: call[0][1] for call in self.mock_span_wrapper.add_attribute.call_args_list} + query_text = calls.get(_Constants.OpenTelemetryAttributes.DB_QUERY_TEXT) + + # Query should NOT be sanitized (has parameters) + self.assertIsNotNone(query_text) + self.assertEqual(query_text, query) + self.assertIn("@id", query_text) + self.assertIn("@value", query_text) + + # Parameter VALUES should NOT be logged + for key in calls.keys(): + self.assertNotIn("db.query.parameter", key) + + @patch('azure.cosmos._cosmos_span_attributes.settings') + def test_add_cosmos_attributes_no_span(self, mock_settings): + """Test that function handles gracefully when no span exists.""" + mock_settings.tracing_implementation.return_value = None + + mock_container = Mock() + mock_container.id = "test-container" + + # Should not raise exception + try: + _add_cosmos_attributes( + self.mock_span_wrapper, + "create_item", + _Constants.OpenTelemetryOperationTypes.CREATE, + (mock_container,), + {} + ) + except Exception as e: + self.fail(f"Function raised exception when no span: {e}") + + +class TestDecoratorIntegration(unittest.TestCase): + """Test the decorator integration.""" + + @patch('azure.cosmos._cosmos_span_attributes.settings') + def test_sync_decorator_execution(self, mock_settings): + """Test sync decorator executes function correctly.""" + mock_span_impl = Mock() + mock_span = Mock() + mock_span.is_recording.return_value = True + mock_span_impl.get_current_span.return_value = mock_span + mock_settings.tracing_implementation.return_value = mock_span_impl + + 
@cosmos_span_attributes(operation_type=_Constants.OpenTelemetryOperationTypes.READ) + def test_function(): + return "test_result" + + result = test_function() + self.assertEqual(result, "test_result") + + @patch('azure.cosmos._cosmos_span_attributes.settings') + def test_sync_decorator_with_exception(self, mock_settings): + """Test sync decorator handles exceptions correctly.""" + mock_span_impl = Mock() + mock_span = Mock() + mock_span.is_recording.return_value = True + mock_span_impl.get_current_span.return_value = mock_span + mock_settings.tracing_implementation.return_value = mock_span_impl + + @cosmos_span_attributes(operation_type=_Constants.OpenTelemetryOperationTypes.READ) + def test_function(): + raise ValueError("Test error") + + with self.assertRaises(ValueError): + test_function() + + +if __name__ == '__main__': + unittest.main() + + diff --git a/sdk/cosmos/azure-cosmos/tests/test_query_sanitization_async.py b/sdk/cosmos/azure-cosmos/tests/test_query_sanitization_async.py new file mode 100644 index 000000000000..b74cc49df860 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/test_query_sanitization_async.py @@ -0,0 +1,118 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. 
+ +"""Unit tests for query sanitization in async cosmos_span_attributes_async decorator.""" + +import unittest +from azure.cosmos.aio._cosmos_span_attributes_async import _sanitize_query + + +class TestQuerySanitizationAsync(unittest.TestCase): + """Test query sanitization for async operations.""" + + def test_parameterized_query_not_sanitized(self): + """Parameterized queries with @param should NOT be sanitized.""" + query = "SELECT * FROM c WHERE c.id = @id" + result = _sanitize_query(query) + self.assertEqual(result, query, "Parameterized query should not be modified") + + def test_parameterized_query_multiple_params(self): + """Multiple @params should be preserved.""" + query = "SELECT * FROM c WHERE c.price > @minPrice AND c.category = @cat" + result = _sanitize_query(query) + self.assertEqual(result, query) + + def test_string_literal_sanitized(self): + """String literals should be replaced with '?'.""" + query = "SELECT * FROM c WHERE c.name = 'John Doe'" + result = _sanitize_query(query) + self.assertEqual(result, "SELECT * FROM c WHERE c.name = '?'") + + def test_multiple_string_literals(self): + """Multiple string literals should all be sanitized.""" + query = "SELECT * FROM c WHERE c.name = 'John' AND c.city = 'Seattle'" + result = _sanitize_query(query) + self.assertEqual(result, "SELECT * FROM c WHERE c.name = '?' 
AND c.city = '?'") + + def test_numeric_literal_sanitized(self): + """Numeric literals should be replaced with ?.""" + query = "SELECT * FROM c WHERE c.age = 25" + result = _sanitize_query(query) + self.assertEqual(result, "SELECT * FROM c WHERE c.age = ?") + + def test_float_literal_sanitized(self): + """Float literals should be sanitized.""" + query = "SELECT * FROM c WHERE c.price = 19.99" + result = _sanitize_query(query) + self.assertEqual(result, "SELECT * FROM c WHERE c.price = ?") + + def test_boolean_literals_sanitized(self): + """Boolean literals (true/false) should be sanitized.""" + query = "SELECT * FROM c WHERE c.active = true AND c.deleted = false" + result = _sanitize_query(query) + self.assertEqual(result, "SELECT * FROM c WHERE c.active = ? AND c.deleted = ?") + + def test_null_literal_sanitized(self): + """Null literals should be sanitized.""" + query = "SELECT * FROM c WHERE c.value = null" + result = _sanitize_query(query) + self.assertEqual(result, "SELECT * FROM c WHERE c.value = ?") + + def test_mixed_literals(self): + """Mix of different literal types should all be sanitized.""" + query = "SELECT * FROM c WHERE c.name = 'Alice' AND c.age = 30 AND c.active = true" + result = _sanitize_query(query) + self.assertEqual(result, "SELECT * FROM c WHERE c.name = '?' AND c.age = ? 
AND c.active = ?") + + def test_parameterized_with_literals(self): + """Parameterized queries can have literals that should be sanitized.""" + query = "SELECT * FROM c WHERE c.id = @id AND c.status = 'active'" + result = _sanitize_query(query) + self.assertEqual(result, "SELECT * FROM c WHERE c.id = @id AND c.status = '?'") + + def test_complex_query_with_functions(self): + """Complex queries with functions and literals.""" + query = "SELECT * FROM c WHERE CONTAINS(c.name, 'test') AND c.count > 100" + result = _sanitize_query(query) + self.assertEqual(result, "SELECT * FROM c WHERE CONTAINS(c.name, '?') AND c.count > ?") + + def test_in_clause_with_literals(self): + """IN clause with multiple literals.""" + query = "SELECT * FROM c WHERE c.category IN ('A', 'B', 'C')" + result = _sanitize_query(query) + self.assertEqual(result, "SELECT * FROM c WHERE c.category IN ('?', '?', '?')") + + def test_empty_string_literal(self): + """Empty string literals should be sanitized.""" + query = "SELECT * FROM c WHERE c.name = ''" + result = _sanitize_query(query) + self.assertEqual(result, "SELECT * FROM c WHERE c.name = '?'") + + def test_string_with_escaped_quotes(self): + """String literals with escaped quotes.""" + query = "SELECT * FROM c WHERE c.message = 'She said \\'hello\\''" + result = _sanitize_query(query) + self.assertEqual(result, "SELECT * FROM c WHERE c.message = '?'") + + def test_negative_numbers(self): + """Negative numbers should be sanitized.""" + query = "SELECT * FROM c WHERE c.temperature = -5.5 AND c.balance = -100" + result = _sanitize_query(query) + self.assertEqual(result, "SELECT * FROM c WHERE c.temperature = ? AND c.balance = ?") + + def test_real_world_auth_query(self): + """Real-world authentication query with sensitive data.""" + query = "SELECT * FROM c WHERE c.username = 'admin@example.com' AND c.password = 'P@ssw0rd123'" + result = _sanitize_query(query) + self.assertEqual(result, "SELECT * FROM c WHERE c.username = '?' 
AND c.password = '?'") + + def test_real_world_financial_query(self): + """Real-world financial query with sensitive amounts.""" + query = "SELECT * FROM c WHERE c.accountId = 'ACC-12345' AND c.balance > 50000.00" + result = _sanitize_query(query) + self.assertEqual(result, "SELECT * FROM c WHERE c.accountId = '?' AND c.balance > ?") + + +if __name__ == '__main__': + unittest.main() + diff --git a/sdk/cosmos/azure-cosmos/tests/test_telemetry_integration.py b/sdk/cosmos/azure-cosmos/tests/test_telemetry_integration.py new file mode 100644 index 000000000000..0d8454c051d6 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/test_telemetry_integration.py @@ -0,0 +1,195 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. + +"""Integration tests for Cosmos DB OpenTelemetry with actual operations. + +This test verifies: +1. Query sanitization works correctly for different query types +2. Cosmos-specific attributes are added to spans +3. Parameter values are NOT logged (opt-in per semantic conventions) +4. db.system attribute is added even on errors +5. 
All operation types are correctly set +""" + +import unittest +import uuid +from azure.cosmos import CosmosClient, PartitionKey, exceptions +from azure.cosmos._constants import _Constants +import test_config + +# OpenTelemetry setup for testing +from azure.core.settings import settings +from azure.core.tracing.ext.opentelemetry_span import OpenTelemetrySpan + +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult, SimpleSpanProcessor + + +class InMemorySpanExporter(SpanExporter): + """Captures spans in memory for testing.""" + + def __init__(self): + self.spans = [] + + def export(self, span_data): + for span in span_data: + span_dict = { + "name": span.name, + "attributes": dict(span.attributes) if span.attributes else {} + } + self.spans.append(span_dict) + return SpanExportResult.SUCCESS + + def shutdown(self): + pass + + def clear(self): + self.spans = [] + + +class TestTelemetryIntegration(unittest.TestCase): + """Integration tests for OpenTelemetry with Cosmos DB operations.""" + + @classmethod + def setUpClass(cls): + """Set up OpenTelemetry and Cosmos client.""" + # Enable OpenTelemetry tracing + settings.tracing_implementation = OpenTelemetrySpan + + # Set up in-memory exporter + cls.exporter = InMemorySpanExporter() + trace.set_tracer_provider(TracerProvider()) + trace.get_tracer_provider().add_span_processor(SimpleSpanProcessor(cls.exporter)) + + # Use test config like other cosmos tests + cls.client = CosmosClient(test_config.TestConfig.host, test_config.TestConfig.masterKey) + cls.test_db_name = f"TestTelemetryDB_{uuid.uuid4().hex[:8]}" + cls.test_container_name = "TestTelemetryContainer" + + try: + cls.database = cls.client.create_database(cls.test_db_name) + cls.container = cls.database.create_container( + id=cls.test_container_name, + partition_key=PartitionKey(path="/pk") + ) + except Exception as e: + print(f"Setup failed: {e}") + raise + + 
@classmethod + def tearDownClass(cls): + """Clean up test resources.""" + try: + cls.client.delete_database(cls.test_db_name) + except: + pass + + def setUp(self): + """Clear captured spans before each test.""" + self.exporter.clear() + + def test_parameterized_query_not_sanitized(self): + """Verify parameterized queries are NOT sanitized in telemetry.""" + # Create test item + self.container.create_item({"id": "test1", "pk": "pk1", "value": 100}) + self.exporter.clear() # Clear the create span + + # Query with parameters + query = "SELECT * FROM c WHERE c.value > @minValue" + parameters = [{"name": "@minValue", "value": 50}] + + list(self.container.query_items(query=query, parameters=parameters, enable_cross_partition_query=True)) + + # Find the query span + query_spans = [s for s in self.exporter.spans if "query" in s.get("name", "").lower()] + + self.assertGreater(len(query_spans), 0, "Should have at least one query span") + attrs = query_spans[0]["attributes"] + query_text = attrs.get(_Constants.OpenTelemetryAttributes.DB_QUERY_TEXT, "") + + # Should NOT be sanitized + self.assertIn("@minValue", query_text, "Parameter placeholder should remain") + self.assertNotIn("50", query_text, "Parameter VALUE should not be in query") + + # Verify parameter values are NOT logged as attributes + param_keys = [k for k in attrs.keys() if "db.query.parameter" in k] + self.assertEqual(len(param_keys), 0, "Parameter values should NOT be logged") + + # Verify db.system is present + self.assertEqual(attrs.get(_Constants.OpenTelemetryAttributes.DB_SYSTEM), "cosmosdb") + + # Verify operation type + self.assertEqual(attrs.get(_Constants.OpenTelemetryAttributes.DB_COSMOSDB_OPERATION_TYPE), "query") + + def test_literal_query_is_sanitized(self): + """Verify queries with literal values ARE sanitized in telemetry.""" + self.exporter.clear() + + # Query with literal values + query = "SELECT * FROM c WHERE c.value = 100 AND c.name = 'test'" + + list(self.container.query_items(query=query, 
enable_cross_partition_query=True)) + + # Find the query span + query_spans = [s for s in self.exporter.spans if "query" in s.get("name", "").lower()] + + self.assertGreater(len(query_spans), 0) + attrs = query_spans[0]["attributes"] + query_text = attrs.get(_Constants.OpenTelemetryAttributes.DB_QUERY_TEXT, "") + + # Should be sanitized + self.assertNotIn("100", query_text, "Numeric literal should be sanitized") + self.assertNotIn("'test'", query_text, "String literal should be sanitized") + self.assertIn("?", query_text, "Literals should be replaced with ?") + + def test_cosmos_attributes_present(self): + """Verify Cosmos-specific attributes are added to spans.""" + self.exporter.clear() + + # Create an item + self.container.create_item({"id": f"attr_test_{uuid.uuid4().hex[:4]}", "pk": "pk1", "value": 200}) + + # Find the create span + create_spans = [s for s in self.exporter.spans if "create" in s.get("name", "").lower()] + + self.assertGreater(len(create_spans), 0) + attrs = create_spans[0]["attributes"] + + # Verify required attributes + self.assertEqual(attrs.get(_Constants.OpenTelemetryAttributes.DB_SYSTEM), "cosmosdb") + self.assertEqual(attrs.get(_Constants.OpenTelemetryAttributes.DB_OPERATION_NAME), "create_item") + self.assertEqual(attrs.get(_Constants.OpenTelemetryAttributes.DB_COSMOSDB_OPERATION_TYPE), "create") + self.assertIn(self.test_container_name, attrs.get(_Constants.OpenTelemetryAttributes.DB_COLLECTION_NAME, "")) + self.assertIn(self.test_db_name, attrs.get(_Constants.OpenTelemetryAttributes.DB_NAMESPACE, "")) + + # Verify response attributes + self.assertIn(_Constants.OpenTelemetryAttributes.DB_COSMOSDB_REQUEST_CHARGE, attrs) + + def test_error_has_db_system_attribute(self): + """Verify db.system attribute is added even when operations fail.""" + self.exporter.clear() + + try: + # Try to read non-existent item (will fail) + self.container.read_item(item="nonexistent", partition_key="nonexistent") + except 
exceptions.CosmosResourceNotFoundError: + pass # Expected + + # Find spans + spans = [s for s in self.exporter.spans if s.get("attributes")] + + self.assertGreater(len(spans), 0, "Should have captured error span") + + # Check if db.system is in any span + has_db_system = any( + s["attributes"].get(_Constants.OpenTelemetryAttributes.DB_SYSTEM) == "cosmosdb" + for s in spans + ) + self.assertTrue(has_db_system, "db.system should be present even on error") + + +if __name__ == '__main__': + unittest.main() + + diff --git a/sdk/cosmos/azure-cosmos/tests/test_telemetry_integration_async.py b/sdk/cosmos/azure-cosmos/tests/test_telemetry_integration_async.py new file mode 100644 index 000000000000..1755627b84ab --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/test_telemetry_integration_async.py @@ -0,0 +1,196 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. + +"""Integration tests for Cosmos DB OpenTelemetry with actual async operations.""" + +import unittest +import uuid +import asyncio +from azure.cosmos.aio import CosmosClient +from azure.cosmos import PartitionKey, exceptions +from azure.cosmos._constants import _Constants +import test_config + +# OpenTelemetry setup for testing +from azure.core.settings import settings +from azure.core.tracing.ext.opentelemetry_span import OpenTelemetrySpan + +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult, SimpleSpanProcessor + + +class InMemorySpanExporter(SpanExporter): + """Captures spans in memory for testing.""" + + def __init__(self): + self.spans = [] + + def export(self, span_data): + for span in span_data: + span_dict = { + "name": span.name, + "attributes": dict(span.attributes) if span.attributes else {} + } + self.spans.append(span_dict) + return SpanExportResult.SUCCESS + + def shutdown(self): + pass + + def clear(self): + self.spans = [] + + +class 
TestTelemetryIntegrationAsync(unittest.IsolatedAsyncioTestCase): + """Integration tests for OpenTelemetry with async Cosmos DB operations.""" + + @classmethod + def setUpClass(cls): + """Set up OpenTelemetry and Cosmos client.""" + # Enable OpenTelemetry tracing + settings.tracing_implementation = OpenTelemetrySpan + + # Set up in-memory exporter + cls.exporter = InMemorySpanExporter() + trace.set_tracer_provider(TracerProvider()) + trace.get_tracer_provider().add_span_processor(SimpleSpanProcessor(cls.exporter)) + + cls.test_db_name = f"TestTelemetryAsyncDB_{uuid.uuid4().hex[:8]}" + cls.test_container_name = "TestTelemetryAsyncContainer" + + async def asyncSetUp(self): + """Set up async resources.""" + self.client = CosmosClient(test_config.TestConfig.host, test_config.TestConfig.masterKey) + self.database = await self.client.create_database(self.test_db_name) + self.container = await self.database.create_container( + id=self.test_container_name, + partition_key=PartitionKey(path="/pk") + ) + self.exporter.clear() + + async def asyncTearDown(self): + """Clean up async resources.""" + await self.client.close() + + @classmethod + def tearDownClass(cls): + """Clean up test resources.""" + # Need to run async cleanup + async def cleanup(): + client = CosmosClient(test_config.TestConfig.host, test_config.TestConfig.masterKey) + try: + await client.delete_database(cls.test_db_name) + except: + pass + await client.close() + + asyncio.run(cleanup()) + + async def test_parameterized_query_not_sanitized(self): + """Verify parameterized queries are NOT sanitized in telemetry.""" + # Create test item + await self.container.create_item({"id": "test1", "pk": "pk1", "value": 100}) + self.exporter.clear() + + # Query with parameters + query = "SELECT * FROM c WHERE c.value > @minValue" + parameters = [{"name": "@minValue", "value": 50}] + + items = [] + async for item in self.container.query_items(query=query, parameters=parameters, enable_cross_partition_query=True): + 
items.append(item) + + # Find the query span + query_spans = [s for s in self.exporter.spans if "query" in s.get("name", "").lower()] + + self.assertGreater(len(query_spans), 0, "Should have at least one query span") + attrs = query_spans[0]["attributes"] + query_text = attrs.get(_Constants.OpenTelemetryAttributes.DB_QUERY_TEXT, "") + + # Should NOT be sanitized + self.assertIn("@minValue", query_text, "Parameter placeholder should remain") + self.assertNotIn("50", query_text, "Parameter VALUE should not be in query") + + # Verify parameter values are NOT logged as attributes + param_keys = [k for k in attrs.keys() if "db.query.parameter" in k] + self.assertEqual(len(param_keys), 0, "Parameter values should NOT be logged") + + # Verify db.system is present + self.assertEqual(attrs.get(_Constants.OpenTelemetryAttributes.DB_SYSTEM), "cosmosdb") + + # Verify operation type + self.assertEqual(attrs.get(_Constants.OpenTelemetryAttributes.DB_COSMOSDB_OPERATION_TYPE), "query") + + async def test_literal_query_is_sanitized(self): + """Verify queries with literal values ARE sanitized in telemetry.""" + self.exporter.clear() + + # Query with literal values + query = "SELECT * FROM c WHERE c.value = 100 AND c.name = 'test'" + + items = [] + async for item in self.container.query_items(query=query, enable_cross_partition_query=True): + items.append(item) + + # Find the query span + query_spans = [s for s in self.exporter.spans if "query" in s.get("name", "").lower()] + + self.assertGreater(len(query_spans), 0) + attrs = query_spans[0]["attributes"] + query_text = attrs.get(_Constants.OpenTelemetryAttributes.DB_QUERY_TEXT, "") + + # Should be sanitized + self.assertNotIn("100", query_text, "Numeric literal should be sanitized") + self.assertNotIn("'test'", query_text, "String literal should be sanitized") + self.assertIn("?", query_text, "Literals should be replaced with ?") + + async def test_cosmos_attributes_present(self): + """Verify Cosmos-specific attributes are added to 
spans.""" + self.exporter.clear() + + # Create an item + await self.container.create_item({"id": f"attr_test_{uuid.uuid4().hex[:4]}", "pk": "pk1", "value": 200}) + + # Find the create span + create_spans = [s for s in self.exporter.spans if "create" in s.get("name", "").lower()] + + self.assertGreater(len(create_spans), 0) + attrs = create_spans[0]["attributes"] + + # Verify required attributes + self.assertEqual(attrs.get(_Constants.OpenTelemetryAttributes.DB_SYSTEM), "cosmosdb") + self.assertEqual(attrs.get(_Constants.OpenTelemetryAttributes.DB_OPERATION_NAME), "create_item") + self.assertEqual(attrs.get(_Constants.OpenTelemetryAttributes.DB_COSMOSDB_OPERATION_TYPE), "create") + self.assertIn(self.test_container_name, attrs.get(_Constants.OpenTelemetryAttributes.DB_COLLECTION_NAME, "")) + self.assertIn(self.test_db_name, attrs.get(_Constants.OpenTelemetryAttributes.DB_NAMESPACE, "")) + + # Verify response attributes + self.assertIn(_Constants.OpenTelemetryAttributes.DB_COSMOSDB_REQUEST_CHARGE, attrs) + + async def test_error_has_db_system_attribute(self): + """Verify db.system attribute is added even when operations fail.""" + self.exporter.clear() + + try: + # Try to read non-existent item (will fail) + await self.container.read_item(item="nonexistent", partition_key="nonexistent") + except exceptions.CosmosResourceNotFoundError: + pass # Expected + + # Find spans + spans = [s for s in self.exporter.spans if s.get("attributes")] + + self.assertGreater(len(spans), 0, "Should have captured error span") + + # Check if db.system is in any span + has_db_system = any( + s["attributes"].get(_Constants.OpenTelemetryAttributes.DB_SYSTEM) == "cosmosdb" + for s in spans + ) + self.assertTrue(has_db_system, "db.system should be present even on error") + + +if __name__ == '__main__': + unittest.main() + From 958cd2650f9b445d493a0f0956a8b402e1cc76a2 Mon Sep 17 00:00:00 2001 From: bambriz Date: Tue, 24 Feb 2026 14:52:44 -0800 Subject: [PATCH 2/2] Update CHANGELOG.md --- 
sdk/cosmos/azure-cosmos/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 946468d67b01..ee283382f16e 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -10,6 +10,7 @@ #### Other Changes * Added tests for multi-language support for full text search. See [PR 44254](https://github.com/Azure/azure-sdk-for-python/pull/44254) +* Added Cosmos-specific attributes to operation-level spans for OpenTelemetry. See [PR 45343](https://github.com/Azure/azure-sdk-for-python/pull/45343) ### 4.15.0b2 (2025-12-16)