diff --git a/sentry_sdk/integrations/openai_agents/__init__.py b/sentry_sdk/integrations/openai_agents/__init__.py index deb136de01..f5516e908f 100644 --- a/sentry_sdk/integrations/openai_agents/__init__.py +++ b/sentry_sdk/integrations/openai_agents/__init__.py @@ -1,11 +1,21 @@ from sentry_sdk.integrations import DidNotEnable, Integration +from sentry_sdk.utils import parse_version from .patches import ( - _create_get_model_wrapper, - _create_get_all_tools_wrapper, + _create_runner_get_model_wrapper, + _create_turn_preparation_get_model_wrapper, + _create_runner_get_all_tools_wrapper, + _create_run_loop_get_all_tools_wrapper, _create_run_wrapper, _create_run_streamed_wrapper, - _patch_agent_run, + _patch_agent_runner_run_single_turn, + _patch_run_loop_run_single_turn, + _patch_agent_runner_run_single_turn_streamed, + _patch_run_loop_run_single_turn_streamed, + _patch_run_impl_execute_handoffs, + _patch_turn_resolution_execute_handoffs, + _patch_run_impl_execute_final_output, + _patch_turn_resolution_execute_final_output, _patch_error_tracing, ) @@ -17,37 +27,42 @@ # after it, even if we don't use it. import agents from agents.run import DEFAULT_AGENT_RUNNER + from agents.version import __version__ as OPENAI_AGENTS_VERSION except ImportError: raise DidNotEnable("OpenAI Agents not installed") -def _patch_runner() -> None: - # Create the root span for one full agent run (including eventual handoffs) - # Note agents.run.DEFAULT_AGENT_RUNNER.run_sync is a wrapper around - # agents.run.DEFAULT_AGENT_RUNNER.run. It does not need to be wrapped separately. - agents.run.DEFAULT_AGENT_RUNNER.run = _create_run_wrapper( - agents.run.DEFAULT_AGENT_RUNNER.run - ) +try: + # AgentRunner methods moved in v0.8 + # https://github.com/openai/openai-agents-python/commit/3ce7c24d349b77bb750062b7e0e856d9ff48a5d5#diff-7470b3a5c5cbe2fcbb2703dc24f326f45a5819d853be2b1f395d122d278cd911 + from agents.run_internal import run_loop, turn_preparation +except ImportError: + run_loop = None + turn_preparation = None - # Patch streaming runner - agents.run.DEFAULT_AGENT_RUNNER.run_streamed = _create_run_streamed_wrapper( - agents.run.DEFAULT_AGENT_RUNNER.run_streamed - ) - # Creating the actual spans for each agent run (works for both streaming and non-streaming). - _patch_agent_run() +def _patch_agent_runner_get_model() -> None: + agents.run.AgentRunner._get_model = classmethod( + _create_runner_get_model_wrapper(agents.run.AgentRunner._get_model), + ) -def _patch_model() -> None: - agents.run.AgentRunner._get_model = classmethod( - _create_get_model_wrapper(agents.run.AgentRunner._get_model), +def _patch_run_internal_get_model() -> None: + agents.run_internal.run_loop.get_model = _create_turn_preparation_get_model_wrapper( + turn_preparation.get_model ) -def _patch_tools() -> None: +def _patch_agent_runner_get_all_tools() -> None: agents.run.AgentRunner._get_all_tools = classmethod( - _create_get_all_tools_wrapper(agents.run.AgentRunner._get_all_tools), + _create_runner_get_all_tools_wrapper(agents.run.AgentRunner._get_all_tools), + ) + + +def _patch_run_get_all_tools() -> None: + agents.run.get_all_tools = _create_run_loop_get_all_tools_wrapper( + run_loop.get_all_tools ) @@ -56,7 +71,39 @@ class OpenAIAgentsIntegration(Integration): @staticmethod def setup_once() -> None: + # Create the root span for one full agent run (including eventual handoffs) + # Note agents.run.DEFAULT_AGENT_RUNNER.run_sync is a wrapper around + # agents.run.DEFAULT_AGENT_RUNNER.run. It does not need to be wrapped separately. + agents.run.DEFAULT_AGENT_RUNNER.run = _create_run_wrapper( + agents.run.DEFAULT_AGENT_RUNNER.run + ) + + # Patch streaming runner + agents.run.DEFAULT_AGENT_RUNNER.run_streamed = _create_run_streamed_wrapper( + agents.run.DEFAULT_AGENT_RUNNER.run_streamed + ) + + library_version = parse_version(OPENAI_AGENTS_VERSION) + if library_version is not None and library_version >= ( + 0, + 8, + ): + _patch_error_tracing() + _patch_run_get_all_tools() + _patch_run_internal_get_model() + + _patch_run_loop_run_single_turn() + _patch_run_loop_run_single_turn_streamed() + _patch_turn_resolution_execute_handoffs() + _patch_turn_resolution_execute_final_output() + + return + _patch_error_tracing() - _patch_tools() - _patch_model() - _patch_runner() + _patch_agent_runner_get_all_tools() + _patch_agent_runner_get_model() + + _patch_agent_runner_run_single_turn() + _patch_agent_runner_run_single_turn_streamed() + _patch_run_impl_execute_handoffs() + _patch_run_impl_execute_final_output() diff --git a/sentry_sdk/integrations/openai_agents/patches/__init__.py b/sentry_sdk/integrations/openai_agents/patches/__init__.py index b53ca79e19..3bfb3b0d12 100644 --- a/sentry_sdk/integrations/openai_agents/patches/__init__.py +++ b/sentry_sdk/integrations/openai_agents/patches/__init__.py @@ -1,5 +1,20 @@ -from .models import _create_get_model_wrapper # noqa: F401 -from .tools import _create_get_all_tools_wrapper # noqa: F401 +from .models import ( + _create_runner_get_model_wrapper, + _create_turn_preparation_get_model_wrapper, +) # noqa: F401 +from .tools import ( + _create_runner_get_all_tools_wrapper, + _create_run_loop_get_all_tools_wrapper, +) # noqa: F401 from .runner import _create_run_wrapper, _create_run_streamed_wrapper # noqa: F401 -from .agent_run import _patch_agent_run # noqa: F401 +from .agent_run import ( + _patch_agent_runner_run_single_turn, + _patch_run_loop_run_single_turn, + _patch_agent_runner_run_single_turn_streamed, + _patch_run_loop_run_single_turn_streamed, + _patch_run_impl_execute_handoffs, + _patch_turn_resolution_execute_handoffs, + _patch_run_impl_execute_final_output, + _patch_turn_resolution_execute_final_output, +) # noqa: F401 from .error_tracing import _patch_error_tracing # noqa: F401 diff --git a/sentry_sdk/integrations/openai_agents/patches/agent_run.py b/sentry_sdk/integrations/openai_agents/patches/agent_run.py index eeb821d42a..0bf8a55ff9 100644 --- a/sentry_sdk/integrations/openai_agents/patches/agent_run.py +++ b/sentry_sdk/integrations/openai_agents/patches/agent_run.py @@ -14,77 +14,109 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from typing import Any, Optional + from typing import Any, Optional, Callable, Awaitable from sentry_sdk.tracing import Span + from agents.run_internal.run_steps import SingleStepResult + try: import agents except ImportError: raise DidNotEnable("OpenAI Agents not installed") -def _patch_agent_run() -> None: +try: + # AgentRunner methods moved in v0.8 + # https://github.com/openai/openai-agents-python/commit/3ce7c24d349b77bb750062b7e0e856d9ff48a5d5#diff-7470b3a5c5cbe2fcbb2703dc24f326f45a5819d853be2b1f395d122d278cd911 + from agents.run_internal import run_loop, turn_resolution + +except ImportError: + run_loop = None + turn_resolution = None + + +def _has_active_agent_span(context_wrapper: "agents.RunContextWrapper") -> bool: + """Check if there's an active agent span for this context""" + return getattr(context_wrapper, "_sentry_current_agent", None) is not None + + +def _get_current_agent( + context_wrapper: "agents.RunContextWrapper", +) -> "Optional[agents.Agent]": + """Get the current agent from context wrapper""" + return getattr(context_wrapper, "_sentry_current_agent", None) + + +def _close_streaming_workflow_span(agent: "Optional[agents.Agent]") -> None: + """Close the workflow span for streaming executions if it exists.""" + if agent and hasattr(agent, "_sentry_workflow_span"): + workflow_span = agent._sentry_workflow_span + workflow_span.__exit__(*sys.exc_info()) + delattr(agent, "_sentry_workflow_span") + + +def _maybe_start_agent_span( + context_wrapper: "agents.RunContextWrapper", + agent: "agents.Agent", + should_run_agent_start_hooks: bool, + span_kwargs: "dict[str, Any]", + is_streaming: bool = False, +) -> "Optional[Span]": """ - Patches AgentRunner methods to create agent invocation spans. - This directly patches the execution flow to track when agents start and stop. + Start an agent invocation span if conditions are met. + Handles ending any existing span for a different agent. + + Returns the new span if started, or the existing span if conditions aren't met. """ + if not (should_run_agent_start_hooks and agent and context_wrapper): + return getattr(context_wrapper, "_sentry_agent_span", None) + + # End any existing span for a different agent + if _has_active_agent_span(context_wrapper): + current_agent = _get_current_agent(context_wrapper) + if current_agent and current_agent != agent: + end_invoke_agent_span(context_wrapper, current_agent) + + # Store the agent on the context wrapper so we can access it later + context_wrapper._sentry_current_agent = agent + span = invoke_agent_span(context_wrapper, agent, span_kwargs) + context_wrapper._sentry_agent_span = span + agent._sentry_agent_span = span + + if is_streaming: + span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) + + return span + + +async def _single_turn( + original_run_single_turn: "Callable[..., Awaitable[SingleStepResult]]", + *args: "Any", + **kwargs: "Any", +) -> "SingleStepResult": + """Patched _run_single_turn that creates agent invocation spans""" + agent = kwargs.get("agent") + context_wrapper = kwargs.get("context_wrapper") + should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks", False) + + span = _maybe_start_agent_span( + context_wrapper, agent, should_run_agent_start_hooks, kwargs + ) - # Store original methods - original_run_single_turn = agents.run.AgentRunner._run_single_turn - original_run_single_turn_streamed = agents.run.AgentRunner._run_single_turn_streamed - original_execute_handoffs = agents._run_impl.RunImpl.execute_handoffs - original_execute_final_output = agents._run_impl.RunImpl.execute_final_output + try: + result = await original_run_single_turn(*args, **kwargs) + except Exception as exc: + if span is not None and span.timestamp is None: + _record_exception_on_span(span, exc) + end_invoke_agent_span(context_wrapper, agent) + reraise(*sys.exc_info()) - def _has_active_agent_span(context_wrapper: "agents.RunContextWrapper") -> bool: - """Check if there's an active agent span for this context""" - return getattr(context_wrapper, "_sentry_current_agent", None) is not None - - def _get_current_agent( - context_wrapper: "agents.RunContextWrapper", - ) -> "Optional[agents.Agent]": - """Get the current agent from context wrapper""" - return getattr(context_wrapper, "_sentry_current_agent", None) - - def _close_streaming_workflow_span(agent: "Optional[agents.Agent]") -> None: - """Close the workflow span for streaming executions if it exists.""" - if agent and hasattr(agent, "_sentry_workflow_span"): - workflow_span = agent._sentry_workflow_span - workflow_span.__exit__(*sys.exc_info()) - delattr(agent, "_sentry_workflow_span") - - def _maybe_start_agent_span( - context_wrapper: "agents.RunContextWrapper", - agent: "agents.Agent", - should_run_agent_start_hooks: bool, - span_kwargs: "dict[str, Any]", - is_streaming: bool = False, - ) -> "Optional[Span]": - """ - Start an agent invocation span if conditions are met. - Handles ending any existing span for a different agent. - - Returns the new span if started, or the existing span if conditions aren't met. - """ - if not (should_run_agent_start_hooks and agent and context_wrapper): - return getattr(context_wrapper, "_sentry_agent_span", None) - - # End any existing span for a different agent - if _has_active_agent_span(context_wrapper): - current_agent = _get_current_agent(context_wrapper) - if current_agent and current_agent != agent: - end_invoke_agent_span(context_wrapper, current_agent) - - # Store the agent on the context wrapper so we can access it later - context_wrapper._sentry_current_agent = agent - span = invoke_agent_span(context_wrapper, agent, span_kwargs) - context_wrapper._sentry_agent_span = span - agent._sentry_agent_span = span - - if is_streaming: - span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) - - return span + return result + + +def _patch_agent_runner_run_single_turn() -> None: + original_run_single_turn = agents.run.AgentRunner._run_single_turn @wraps( original_run_single_turn.__func__ @@ -95,23 +127,144 @@ async def patched_run_single_turn( cls: "agents.Runner", *args: "Any", **kwargs: "Any" ) -> "Any": """Patched _run_single_turn that creates agent invocation spans""" - agent = kwargs.get("agent") - context_wrapper = kwargs.get("context_wrapper") - should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks", False) + return await _single_turn(original_run_single_turn, *args, **kwargs) + + agents.run.AgentRunner._run_single_turn = classmethod(patched_run_single_turn) - span = _maybe_start_agent_span( - context_wrapper, agent, should_run_agent_start_hooks, kwargs - ) - try: - result = await original_run_single_turn(*args, **kwargs) - except Exception as exc: +def _patch_run_loop_run_single_turn() -> None: + original_run_single_turn = run_loop.run_single_turn + + @wraps(original_run_single_turn) + async def patched_run_single_turn(*args: "Any", **kwargs: "Any") -> "Any": + """Patched _run_single_turn that creates agent invocation spans""" + return await _single_turn(original_run_single_turn, *args, **kwargs) + + agents.run.run_single_turn = patched_run_single_turn + + +async def _single_turn_streamed( + original_run_single_turn_streamed: "Callable[..., Awaitable[SingleStepResult]]", + *args: "Any", + **kwargs: "Any", +) -> "SingleStepResult": + """Patched _run_single_turn_streamed that creates agent invocation spans for streaming. + + Note: Unlike _run_single_turn which uses keyword-only arguments (*,), + _run_single_turn_streamed uses positional arguments. The call signature is: + _run_single_turn_streamed( + streamed_result, # args[0] + agent, # args[1] + hooks, # args[2] + context_wrapper, # args[3] + run_config, # args[4] + should_run_agent_start_hooks, # args[5] + tool_use_tracker, # args[6] + all_tools, # args[7] + server_conversation_tracker, # args[8] (optional) + ) + """ + streamed_result = args[0] if len(args) > 0 else kwargs.get("streamed_result") + agent = args[1] if len(args) > 1 else kwargs.get("agent") + context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper") + should_run_agent_start_hooks = bool( + args[5] if len(args) > 5 else kwargs.get("should_run_agent_start_hooks", False) + ) + + span_kwargs: "dict[str, Any]" = {} + if streamed_result and hasattr(streamed_result, "input"): + span_kwargs["original_input"] = streamed_result.input + + span = _maybe_start_agent_span( + context_wrapper, + agent, + should_run_agent_start_hooks, + span_kwargs, + is_streaming=True, + ) + + try: + result = await original_run_single_turn_streamed(*args, **kwargs) + except Exception as exc: + exc_info = sys.exc_info() + with capture_internal_exceptions(): if span is not None and span.timestamp is None: _record_exception_on_span(span, exc) end_invoke_agent_span(context_wrapper, agent) - reraise(*sys.exc_info()) + _close_streaming_workflow_span(agent) + reraise(*exc_info) - return result + return result + + +def _patch_agent_runner_run_single_turn_streamed() -> None: + original_run_single_turn_streamed = agents.run.AgentRunner._run_single_turn_streamed + + @wraps( + original_run_single_turn_streamed.__func__ + if hasattr(original_run_single_turn_streamed, "__func__") + else original_run_single_turn_streamed + ) + async def patched_run_single_turn_streamed( + cls: "agents.Runner", *args: "Any", **kwargs: "Any" + ) -> "Any": + return await _single_turn_streamed( + original_run_single_turn_streamed, *args, **kwargs + ) + + agents.run.AgentRunner._run_single_turn_streamed = classmethod( + patched_run_single_turn_streamed + ) + + +def _patch_run_loop_run_single_turn_streamed() -> None: + original_run_single_turn_streamed = run_loop.run_single_turn_streamed + + @wraps(original_run_single_turn_streamed) + async def patched_run_single_turn_streamed(*args: "Any", **kwargs: "Any") -> "Any": + return await _single_turn_streamed( + original_run_single_turn_streamed, *args, **kwargs + ) + + agents.run_internal.run_loop.run_single_turn_streamed = ( + patched_run_single_turn_streamed + ) + + +async def execute_handoffs( + original_execute_handoffs: "Callable[..., Awaitable[SingleStepResult]]", + *args: "Any", + **kwargs: "Any", +) -> "SingleStepResult": + """Patched execute_handoffs that creates handoff spans and ends agent span for handoffs""" + context_wrapper = kwargs.get("context_wrapper") + run_handoffs = kwargs.get("run_handoffs") + agent = kwargs.get("agent") + + # Create Sentry handoff span for the first handoff (agents library only processes the first one) + if run_handoffs: + first_handoff = run_handoffs[0] + handoff_agent_name = first_handoff.handoff.agent_name + handoff_span(context_wrapper, agent, handoff_agent_name) + + # Call original method with all parameters + try: + result = await original_execute_handoffs(*args, **kwargs) + except Exception: + exc_info = sys.exc_info() + with capture_internal_exceptions(): + _close_streaming_workflow_span(agent) + reraise(*exc_info) + finally: + # End span for current agent after handoff processing is complete + if agent and context_wrapper and _has_active_agent_span(context_wrapper): + end_invoke_agent_span(context_wrapper, agent) + + return result + + +def _patch_run_impl_execute_handoffs() -> None: + original_execute_handoffs = agents._run_impl.RunImpl.execute_handoffs @wraps( original_execute_handoffs.__func__ @@ -121,32 +274,46 @@ async def patched_run_single_turn( async def patched_execute_handoffs( cls: "agents.Runner", *args: "Any", **kwargs: "Any" ) -> "Any": - """Patched execute_handoffs that creates handoff spans and ends agent span for handoffs""" - - context_wrapper = kwargs.get("context_wrapper") - run_handoffs = kwargs.get("run_handoffs") - agent = kwargs.get("agent") - - # Create Sentry handoff span for the first handoff (agents library only processes the first one) - if run_handoffs: - first_handoff = run_handoffs[0] - handoff_agent_name = first_handoff.handoff.agent_name - handoff_span(context_wrapper, agent, handoff_agent_name) - - # Call original method with all parameters - try: - result = await original_execute_handoffs(*args, **kwargs) - except Exception: - exc_info = sys.exc_info() - with capture_internal_exceptions(): - _close_streaming_workflow_span(agent) - reraise(*exc_info) - finally: - # End span for current agent after handoff processing is complete + return await execute_handoffs(original_execute_handoffs, *args, **kwargs) + + agents._run_impl.RunImpl.execute_handoffs = classmethod(patched_execute_handoffs) + + +def _patch_turn_resolution_execute_handoffs() -> None: + original_execute_handoffs = turn_resolution.execute_handoffs + + @wraps(original_execute_handoffs) + async def patched_execute_handoffs(*args: "Any", **kwargs: "Any") -> "Any": + return await execute_handoffs(original_execute_handoffs, *args, **kwargs) + + agents.run_internal.turn_resolution.execute_handoffs = patched_execute_handoffs + + +async def execute_final_output( + original_execute_final_output: "Callable[..., Awaitable[SingleStepResult]]", + *args: "Any", + **kwargs: "Any", +) -> "SingleStepResult": + """Patched execute_final_output that ends agent span for final outputs""" + + agent = kwargs.get("agent") + context_wrapper = kwargs.get("context_wrapper") + final_output = kwargs.get("final_output") + + try: + result = await original_execute_final_output(*args, **kwargs) + finally: + with capture_internal_exceptions(): if agent and context_wrapper and _has_active_agent_span(context_wrapper): - end_invoke_agent_span(context_wrapper, agent) + end_invoke_agent_span(context_wrapper, agent, final_output) + # For streaming, close the workflow span (non-streaming uses context manager in _create_run_wrapper) + _close_streaming_workflow_span(agent) + + return result - return result + +def _patch_run_impl_execute_final_output() -> None: + original_execute_final_output = agents._run_impl.RunImpl.execute_final_output @wraps( original_execute_final_output.__func__ @@ -156,91 +323,24 @@ async def patched_execute_handoffs( async def patched_execute_final_output( cls: "agents.Runner", *args: "Any", **kwargs: "Any" ) -> "Any": - """Patched execute_final_output that ends agent span for final outputs""" - - agent = kwargs.get("agent") - context_wrapper = kwargs.get("context_wrapper") - final_output = kwargs.get("final_output") - - try: - result = await original_execute_final_output(*args, **kwargs) - finally: - with capture_internal_exceptions(): - if ( - agent - and context_wrapper - and _has_active_agent_span(context_wrapper) - ): - end_invoke_agent_span(context_wrapper, agent, final_output) - # For streaming, close the workflow span (non-streaming uses context manager in _create_run_wrapper) - _close_streaming_workflow_span(agent) - - return result - - @wraps( - original_run_single_turn_streamed.__func__ - if hasattr(original_run_single_turn_streamed, "__func__") - else original_run_single_turn_streamed - ) - async def patched_run_single_turn_streamed( - cls: "agents.Runner", *args: "Any", **kwargs: "Any" - ) -> "Any": - """Patched _run_single_turn_streamed that creates agent invocation spans for streaming. - - Note: Unlike _run_single_turn which uses keyword-only arguments (*,), - _run_single_turn_streamed uses positional arguments. The call signature is: - _run_single_turn_streamed( - streamed_result, # args[0] - agent, # args[1] - hooks, # args[2] - context_wrapper, # args[3] - run_config, # args[4] - should_run_agent_start_hooks, # args[5] - tool_use_tracker, # args[6] - all_tools, # args[7] - server_conversation_tracker, # args[8] (optional) - ) - """ - streamed_result = args[0] if len(args) > 0 else kwargs.get("streamed_result") - agent = args[1] if len(args) > 1 else kwargs.get("agent") - context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper") - should_run_agent_start_hooks = bool( - args[5] - if len(args) > 5 - else kwargs.get("should_run_agent_start_hooks", False) + return await execute_final_output( + original_execute_final_output, *args, **kwargs ) - span_kwargs: "dict[str, Any]" = {} - if streamed_result and hasattr(streamed_result, "input"): - span_kwargs["original_input"] = streamed_result.input + agents._run_impl.RunImpl.execute_final_output = classmethod( + patched_execute_final_output + ) - span = _maybe_start_agent_span( - context_wrapper, - agent, - should_run_agent_start_hooks, - span_kwargs, - is_streaming=True, - ) - try: - result = await original_run_single_turn_streamed(*args, **kwargs) - except Exception as exc: - exc_info = sys.exc_info() - with capture_internal_exceptions(): - if span is not None and span.timestamp is None: - _record_exception_on_span(span, exc) - end_invoke_agent_span(context_wrapper, agent) - _close_streaming_workflow_span(agent) - reraise(*exc_info) +def _patch_turn_resolution_execute_final_output() -> None: + original_execute_final_output = turn_resolution.execute_final_output - return result + @wraps(original_execute_final_output) + async def patched_execute_final_output(*args: "Any", **kwargs: "Any") -> "Any": + return await execute_final_output( + original_execute_final_output, *args, **kwargs + ) - # Apply patches - agents.run.AgentRunner._run_single_turn = classmethod(patched_run_single_turn) - agents.run.AgentRunner._run_single_turn_streamed = classmethod( - patched_run_single_turn_streamed - ) - agents._run_impl.RunImpl.execute_handoffs = classmethod(patched_execute_handoffs) - agents._run_impl.RunImpl.execute_final_output = classmethod( + agents.run_internal.turn_resolution.execute_final_output = ( patched_execute_final_output ) diff --git a/sentry_sdk/integrations/openai_agents/patches/models.py b/sentry_sdk/integrations/openai_agents/patches/models.py index b6a69ae9f7..fea712e1ed 100644 --- a/sentry_sdk/integrations/openai_agents/patches/models.py +++ b/sentry_sdk/integrations/openai_agents/patches/models.py @@ -1,6 +1,6 @@ import copy import time -from functools import wraps +from functools import wraps, partial from sentry_sdk.integrations import DidNotEnable @@ -66,136 +66,150 @@ def _inject_trace_propagation_headers( headers[key] = value -def _create_get_model_wrapper( - original_get_model: "Callable[..., Any]", -) -> "Callable[..., Any]": - """ - Wraps the agents.Runner._get_model method to wrap the get_response method of the model to create a AI client span. - """ +def _get_model( + original_get_model: "Callable[..., agents.Model]", + agent: "agents.Agent", + run_config: "agents.RunConfig", +) -> "agents.Model": + # copy the model to double patching its methods. We use copy on purpose here (instead of deepcopy) + # because we only patch its direct methods, all underlying data can remain unchanged. + model = copy.copy(original_get_model(agent, run_config)) - @wraps( - original_get_model.__func__ - if hasattr(original_get_model, "__func__") - else original_get_model - ) - def wrapped_get_model( - cls: "agents.Runner", agent: "agents.Agent", run_config: "agents.RunConfig" - ) -> "agents.Model": - # copy the model to double patching its methods. We use copy on purpose here (instead of deepcopy) - # because we only patch its direct methods, all underlying data can remain unchanged. - model = copy.copy(original_get_model(agent, run_config)) + # Capture the request model name for spans (agent.model can be None when using defaults) + request_model_name = model.model if hasattr(model, "model") else str(model) + agent._sentry_request_model = request_model_name + + # Wrap _fetch_response if it exists (for OpenAI models) to capture response model + if hasattr(model, "_fetch_response"): + original_fetch_response = model._fetch_response - # Capture the request model name for spans (agent.model can be None when using defaults) - request_model_name = model.model if hasattr(model, "model") else str(model) - agent._sentry_request_model = request_model_name + @wraps(original_fetch_response) + async def wrapped_fetch_response(*args: "Any", **kwargs: "Any") -> "Any": + response = await original_fetch_response(*args, **kwargs) + if hasattr(response, "model") and response.model: + agent._sentry_response_model = str(response.model) + return response - # Wrap _fetch_response if it exists (for OpenAI models) to capture response model - if hasattr(model, "_fetch_response"): - original_fetch_response = model._fetch_response + model._fetch_response = wrapped_fetch_response - @wraps(original_fetch_response) - async def wrapped_fetch_response(*args: "Any", **kwargs: "Any") -> "Any": - response = await original_fetch_response(*args, **kwargs) - if hasattr(response, "model") and response.model: - agent._sentry_response_model = str(response.model) - return response + original_get_response = model.get_response - model._fetch_response = wrapped_fetch_response + @wraps(original_get_response) + async def wrapped_get_response(*args: "Any", **kwargs: "Any") -> "Any": + mcp_tools = kwargs.get("tools") + hosted_tools = [] + if mcp_tools is not None: + hosted_tools = [ + tool for tool in mcp_tools if isinstance(tool, HostedMCPTool) + ] - original_get_response = model.get_response + with ai_client_span(agent, kwargs) as span: + for hosted_tool in hosted_tools: + _inject_trace_propagation_headers(hosted_tool, span=span) + + result = await original_get_response(*args, **kwargs) + + # Get response model captured from _fetch_response and clean up + response_model = getattr(agent, "_sentry_response_model", None) + if response_model: + delattr(agent, "_sentry_response_model") + + _set_response_model_on_agent_span(agent, response_model) + update_ai_client_span(span, result, response_model, agent) + + return result + + model.get_response = wrapped_get_response + + # Also wrap stream_response for streaming support + if hasattr(model, "stream_response"): + original_stream_response = model.stream_response + + @wraps(original_stream_response) + async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any": + # Uses explicit try/finally instead of context manager to ensure cleanup + # even if the consumer abandons the stream (GeneratorExit). + span_kwargs = dict(kwargs) + if len(args) > 0: + span_kwargs["system_instructions"] = args[0] + if len(args) > 1: + span_kwargs["input"] = args[1] - @wraps(original_get_response) - async def wrapped_get_response(*args: "Any", **kwargs: "Any") -> "Any": - mcp_tools = kwargs.get("tools") hosted_tools = [] - if mcp_tools is not None: - hosted_tools = [ - tool for tool in mcp_tools if isinstance(tool, HostedMCPTool) - ] + if len(args) > 3: + mcp_tools = args[3] + + if mcp_tools is not None: + hosted_tools = [ + tool for tool in mcp_tools if isinstance(tool, HostedMCPTool) + ] - with ai_client_span(agent, kwargs) as span: + with ai_client_span(agent, span_kwargs) as span: for hosted_tool in hosted_tools: _inject_trace_propagation_headers(hosted_tool, span=span) - result = await original_get_response(*args, **kwargs) - - # Get response model captured from _fetch_response and clean up - response_model = getattr(agent, "_sentry_response_model", None) - if response_model: - delattr(agent, "_sentry_response_model") - - _set_response_model_on_agent_span(agent, response_model) - update_ai_client_span(span, result, response_model, agent) - - return result - - model.get_response = wrapped_get_response - - # Also wrap stream_response for streaming support - if hasattr(model, "stream_response"): - original_stream_response = model.stream_response - - @wraps(original_stream_response) - async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any": - # Uses explicit try/finally instead of context manager to ensure cleanup - # even if the consumer abandons the stream (GeneratorExit). - span_kwargs = dict(kwargs) - if len(args) > 0: - span_kwargs["system_instructions"] = args[0] - if len(args) > 1: - span_kwargs["input"] = args[1] - - hosted_tools = [] - if len(args) > 3: - mcp_tools = args[3] - - if mcp_tools is not None: - hosted_tools = [ - tool - for tool in mcp_tools - if isinstance(tool, HostedMCPTool) - ] - - with ai_client_span(agent, span_kwargs) as span: - for hosted_tool in hosted_tools: - _inject_trace_propagation_headers(hosted_tool, span=span) - - span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) - - streaming_response = None - ttft_recorded = False - # Capture start time locally to avoid race conditions with concurrent requests - start_time = time.perf_counter() - - async for event in original_stream_response(*args, **kwargs): - # Detect first content token (text delta event) - if not ttft_recorded and hasattr(event, "delta"): - ttft = time.perf_counter() - start_time - span.set_data( - SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft - ) - ttft_recorded = True - - # Capture the full response from ResponseCompletedEvent - if hasattr(event, "response"): - streaming_response = event.response - yield event - - # Update span with response data (usage, output, model) - if streaming_response: - response_model = ( - str(streaming_response.model) - if hasattr(streaming_response, "model") - and streaming_response.model - else None - ) - _set_response_model_on_agent_span(agent, response_model) - update_ai_client_span( - span, streaming_response, response_model, agent + span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) + + streaming_response = None + ttft_recorded = False + # Capture start time locally to avoid race conditions with concurrent requests + start_time = time.perf_counter() + + async for event in original_stream_response(*args, **kwargs): + # Detect first content token (text delta event) + if not ttft_recorded and hasattr(event, "delta"): + ttft = time.perf_counter() - start_time + span.set_data( + SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN, ttft ) + ttft_recorded = True - model.stream_response = wrapped_stream_response + # Capture the full response from ResponseCompletedEvent + if hasattr(event, "response"): + streaming_response = event.response + yield event - return model + # Update span with response data (usage, output, model) + if streaming_response: + response_model = ( + str(streaming_response.model) + if hasattr(streaming_response, "model") + and streaming_response.model + else None + ) + _set_response_model_on_agent_span(agent, response_model) + update_ai_client_span( + span, streaming_response, response_model, agent + ) + + model.stream_response = wrapped_stream_response + + return model + + +def _create_runner_get_model_wrapper( + original_get_model: "Callable[..., Any]", +) -> "Callable[..., Any]": + @wraps( + original_get_model.__func__ + if hasattr(original_get_model, "__func__") + else original_get_model + ) + def wrapped_get_model( + cls: "agents.Runner", agent: "agents.Agent", run_config: "agents.RunConfig" + ) -> "agents.Model": + return _get_model(original_get_model, agent, run_config) + + return wrapped_get_model + + +def _create_turn_preparation_get_model_wrapper( + original_get_model: "Callable[..., Any]", +) -> "Callable[..., Any]": + @wraps(original_get_model) + def wrapped_get_model( + agent: "agents.Agent", run_config: "agents.RunConfig" + ) -> "agents.Model": + return _get_model(original_get_model, agent, run_config) return wrapped_get_model diff --git a/sentry_sdk/integrations/openai_agents/patches/tools.py b/sentry_sdk/integrations/openai_agents/patches/tools.py index d14a3019aa..1ff2f6e705 100644 --- a/sentry_sdk/integrations/openai_agents/patches/tools.py +++ b/sentry_sdk/integrations/openai_agents/patches/tools.py @@ -1,4 +1,4 @@ -from functools import wraps +from functools import wraps, partial from sentry_sdk.integrations import DidNotEnable @@ -7,7 +7,7 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from typing import Any, Callable + from typing import Any, Callable, Awaitable try: import agents @@ -15,7 +15,60 @@ raise DidNotEnable("OpenAI Agents not installed") -def _create_get_all_tools_wrapper( +async def _get_all_tools( + original_get_all_tools: "Callable[..., Awaitable[list[agents.Tool]]]", + agent: "agents.Agent", + context_wrapper: "agents.RunContextWrapper", +) -> "list[agents.Tool]": + # Get the original tools + tools = await original_get_all_tools(agent, context_wrapper) + + wrapped_tools = [] + for tool in tools: + # Wrap only the function tools (for now) + if tool.__class__.__name__ != "FunctionTool": + wrapped_tools.append(tool) + continue + + # Create a new FunctionTool with our wrapped invoke method + original_on_invoke = tool.on_invoke_tool + + def create_wrapped_invoke( + current_tool: "agents.Tool", current_on_invoke: "Callable[..., Any]" + ) -> "Callable[..., Any]": + @wraps(current_on_invoke) + async def sentry_wrapped_on_invoke_tool( + *args: "Any", **kwargs: "Any" + ) -> "Any": + with execute_tool_span(current_tool, *args, **kwargs) as span: + # We can not capture exceptions in tool execution here because + # `_on_invoke_tool` is swallowing the exception here: + # https://github.com/openai/openai-agents-python/blob/main/src/agents/tool.py#L409-L422 + # And because function_tool is a decorator with `default_tool_error_function` set as a default parameter + # I was unable to monkey patch it because those are evaluated at module import time + # and the SDK is too late to patch it. I was also unable to patch `_on_invoke_tool_impl` + # because it is nested inside this import time code. As if they made it hard to patch on purpose... + result = await current_on_invoke(*args, **kwargs) + update_execute_tool_span(span, agent, current_tool, result) + + return result + + return sentry_wrapped_on_invoke_tool + + wrapped_tool = agents.FunctionTool( + name=tool.name, + description=tool.description, + params_json_schema=tool.params_json_schema, + on_invoke_tool=create_wrapped_invoke(tool, original_on_invoke), + strict_json_schema=tool.strict_json_schema, + is_enabled=tool.is_enabled, + ) + wrapped_tools.append(wrapped_tool) + + return wrapped_tools + + +def _create_runner_get_all_tools_wrapper( original_get_all_tools: "Callable[..., Any]", ) -> "Callable[..., Any]": """ @@ -32,51 +85,23 @@ async def wrapped_get_all_tools( agent: "agents.Agent", context_wrapper: "agents.RunContextWrapper", ) -> "list[agents.Tool]": - # Get the original tools - tools = await original_get_all_tools(agent, context_wrapper) - - wrapped_tools = [] - for tool in tools: - # Wrap only the function tools (for now) - if tool.__class__.__name__ != "FunctionTool": - wrapped_tools.append(tool) - continue - - # Create a new FunctionTool with our wrapped invoke method - original_on_invoke = tool.on_invoke_tool - - def create_wrapped_invoke( - current_tool: "agents.Tool", current_on_invoke: "Callable[..., Any]" - ) -> "Callable[..., Any]": - @wraps(current_on_invoke) - async def sentry_wrapped_on_invoke_tool( - *args: "Any", **kwargs: "Any" - ) -> "Any": - with execute_tool_span(current_tool, *args, **kwargs) as span: - # We can not capture exceptions in tool execution here because - # `_on_invoke_tool` is swallowing the exception here: - # https://github.com/openai/openai-agents-python/blob/main/src/agents/tool.py#L409-L422 - # And because function_tool is a decorator with `default_tool_error_function` set as a default parameter - # I was unable to monkey patch it because those are evaluated at module import time - # and the SDK is too late to patch it. I was also unable to patch `_on_invoke_tool_impl` - # because it is nested inside this import time code. As if they made it hard to patch on purpose... - result = await current_on_invoke(*args, **kwargs) - update_execute_tool_span(span, agent, current_tool, result) - - return result - - return sentry_wrapped_on_invoke_tool - - wrapped_tool = agents.FunctionTool( - name=tool.name, - description=tool.description, - params_json_schema=tool.params_json_schema, - on_invoke_tool=create_wrapped_invoke(tool, original_on_invoke), - strict_json_schema=tool.strict_json_schema, - is_enabled=tool.is_enabled, - ) - wrapped_tools.append(wrapped_tool) - - return wrapped_tools + return await _get_all_tools(original_get_all_tools, agent, context_wrapper) + + return wrapped_get_all_tools + + +def _create_run_loop_get_all_tools_wrapper( + original_get_all_tools: "Callable[..., Any]", +) -> "Callable[..., Any]": + """ + Wraps the agents.Runner._get_all_tools method of the Runner class to wrap all function tools with Sentry instrumentation. + """ + + @wraps(original_get_all_tools) + async def wrapped_get_all_tools( + agent: "agents.Agent", + context_wrapper: "agents.RunContextWrapper", + ) -> "list[agents.Tool]": + return await _get_all_tools(original_get_all_tools, agent, context_wrapper) return wrapped_get_all_tools