Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 108 additions & 92 deletions tests/integrations/openai_agents/test_openai_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,83 @@ async def EXAMPLE_STREAMED_RESPONSE(*args, **kwargs):
)


async def EXAMPLE_STREAMED_RESPONSE_WITH_DELTA(*args, **kwargs):
    """
    Async generator producing a canned streamed response that has text deltas.

    Yields, in order: one ``response.created`` event, two
    ``response.output_text.delta`` events ("Hello" then " world!"), and one
    ``response.completed`` event whose response carries the assembled
    "Hello world!" assistant message plus token usage. Sequence numbers run
    from 0 through 3.

    Positional and keyword arguments are accepted and ignored, so the
    function can be plugged in as a mock ``side_effect``.
    """
    initial_response = Response(
        id="chat-id",
        output=[],
        parallel_tool_calls=False,
        tool_choice="none",
        tools=[],
        created_at=10000000,
        model="response-model-id",
        object="response",
    )
    yield ResponseCreatedEvent(
        response=initial_response,
        type="response.created",
        sequence_number=0,
    )

    # The delta events differ only in their text fragment and sequence number.
    for sequence_number, text_fragment in enumerate(("Hello", " world!"), start=1):
        yield ResponseTextDeltaEvent(
            type="response.output_text.delta",
            item_id="message-id",
            output_index=0,
            content_index=0,
            delta=text_fragment,
            logprobs=[],
            sequence_number=sequence_number,
        )

    # Final payload: the full message text and the usage figures.
    assistant_message = ResponseOutputMessage(
        id="message-id",
        content=[
            ResponseOutputText(
                annotations=[],
                text="Hello world!",
                type="output_text",
            ),
        ],
        role="assistant",
        status="completed",
        type="message",
    )
    token_usage = ResponseUsage(
        input_tokens=20,
        input_tokens_details=InputTokensDetails(
            cached_tokens=5,
        ),
        output_tokens=10,
        output_tokens_details=OutputTokensDetails(
            reasoning_tokens=8,
        ),
        total_tokens=30,
    )
    final_response = Response(
        id="chat-id",
        output=[assistant_message],
        parallel_tool_calls=False,
        tool_choice="none",
        tools=[],
        created_at=10000000,
        model="response-model-id",
        object="response",
        usage=token_usage,
    )
    yield ResponseCompletedEvent(
        response=final_response,
        type="response.completed",
        sequence_number=3,
    )


@pytest.fixture
def mock_usage():
return Usage(
Expand Down Expand Up @@ -2692,27 +2769,6 @@ def test_openai_agents_message_truncation(sentry_init, capture_events):
assert "small message 5" in str(parsed_messages[0])


def test_streaming_patches_applied(sentry_init):
    """
    Smoke-check the streaming entry points after enabling the integration.

    Only asserts that the attributes exist and that ``run_streamed`` is not
    None; it does not inspect whether the callables are wrapped.
    """
    sentry_init(
        integrations=[OpenAIAgentsIntegration()],
        traces_sample_rate=1.0,
    )

    import agents

    # Both streaming hooks must be present on the runner objects.
    assert hasattr(agents.run.DEFAULT_AGENT_RUNNER, "run_streamed")
    assert hasattr(agents.run.AgentRunner, "_run_single_turn_streamed")

    # The attribute on the default runner must resolve to an actual object.
    default_runner = agents.run.DEFAULT_AGENT_RUNNER
    assert default_runner.run_streamed is not None


@pytest.mark.asyncio
async def test_streaming_span_update_captures_response_data(
sentry_init, test_agent, mock_usage
Expand Down Expand Up @@ -2777,86 +2833,46 @@ async def test_streaming_ttft_on_chat_span(sentry_init, test_agent):
Events WITHOUT delta (like ResponseCompletedEvent, ResponseCreatedEvent, etc.)
should NOT trigger TTFT.
"""
from sentry_sdk.integrations.openai_agents.patches.models import (
_create_get_model_wrapper,
client = AsyncOpenAI(api_key="z")
client.responses._post = AsyncMock(return_value=EXAMPLE_RESPONSE)

model = OpenAIResponsesModel(model="gpt-4", openai_client=client)

agent_with_tool = test_agent.clone(
model=model,
)

sentry_init(
integrations=[OpenAIAgentsIntegration()],
traces_sample_rate=1.0,
)

# Create a mock model with stream_response and get_response
class MockModel:
model = "gpt-4"

async def get_response(self, *args, **kwargs):
# Not used in this test, but required by the wrapper
pass

async def stream_response(self, *args, **kwargs):
# First event: ResponseCreatedEvent (no delta - should NOT trigger TTFT)
created_event = MagicMock(spec=["type", "sequence_number"])
created_event.type = "response.created"
yield created_event

# Simulate server-side processing delay before first token
await asyncio.sleep(0.05) # 50ms delay

# Second event: ResponseTextDeltaEvent (HAS delta - triggers TTFT)
text_delta_event = MagicMock(spec=["delta", "type", "content_index"])
text_delta_event.delta = "Hello"
text_delta_event.type = "response.output_text.delta"
yield text_delta_event

# Third event: more text content (also has delta, but TTFT already recorded)
text_delta_event2 = MagicMock(spec=["delta", "type", "content_index"])
text_delta_event2.delta = " world!"
text_delta_event2.type = "response.output_text.delta"
yield text_delta_event2

# Final event: ResponseCompletedEvent (has response, no delta)
completed_event = MagicMock(spec=["response", "type", "sequence_number"])
completed_event.response = MagicMock()
completed_event.response.model = "gpt-4"
completed_event.response.usage = Usage(
requests=1,
input_tokens=10,
output_tokens=5,
total_tokens=15,
)
completed_event.response.output = []
yield completed_event

# Create a mock original _get_model that returns our mock model
def mock_get_model(agent, run_config):
return MockModel()

# Wrap it with our integration wrapper
wrapped_get_model = _create_get_model_wrapper(mock_get_model)

with sentry_sdk.start_transaction(name="test_ttft", sampled=True) as transaction:
# Get the wrapped model (this applies the stream_response wrapper)
wrapped_model = wrapped_get_model(None, test_agent, MagicMock())

# Call the wrapped stream_response and consume all events
async for _event in wrapped_model.stream_response():
pass

# Verify TTFT is recorded on the chat span (must be inside transaction context)
chat_spans = [
s for s in transaction._span_recorder.spans if s.op == "gen_ai.chat"
]
assert len(chat_spans) >= 1
chat_span = chat_spans[0]
with patch.object(
model._client.responses,
"create",
side_effect=EXAMPLE_STREAMED_RESPONSE_WITH_DELTA,
) as _:
with sentry_sdk.start_transaction(
name="test_ttft", sampled=True
) as transaction:
result = agents.Runner.run_streamed(
agent_with_tool,
"Please use the simple test tool",
run_config=test_run_config,
)

assert SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN in chat_span._data
ttft_value = chat_span._data[SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN]
# TTFT should be at least 40ms (our simulated delay minus some variance) but reasonable
assert 0.04 < ttft_value < 1.0, f"TTFT {ttft_value} should be around 50ms"
async for event in result.stream_events():
pass

# Verify TTFT is recorded on the chat span (must be inside transaction context)
chat_spans = [
s for s in transaction._span_recorder.spans if s.op == "gen_ai.chat"
]
assert len(chat_spans) >= 1
chat_span = chat_spans[0]

# Verify streaming flag is set
assert chat_span._data.get(SPANDATA.GEN_AI_RESPONSE_STREAMING) is True
assert SPANDATA.GEN_AI_RESPONSE_TIME_TO_FIRST_TOKEN in chat_span._data
assert chat_span._data.get(SPANDATA.GEN_AI_RESPONSE_STREAMING) is True


@pytest.mark.skipif(
Expand Down
Loading