.NET: Fixing issue where OpenTelemetry span is never exported in .NET in-process workflow execution#4196
…ity never stopped in streaming OffThread path

The WorkflowRunActivity_IsStopped_Streaming_OffThread test demonstrates that the workflow.run OpenTelemetry Activity created in StreamingRunEventStream.RunLoopAsync is started but never stopped when using the OffThread/Default streaming execution. The background run loop keeps running after event consumption completes, so the using Activity? declaration never disposes until explicit StopAsync() is called.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

2. Fix workflow.run Activity never stopped in streaming OffThread execution (microsoft#4155)

The workflow.run OpenTelemetry Activity in StreamingRunEventStream.RunLoopAsync was scoped to the method lifetime via 'using'. Since the run loop only exits on cancellation, the Activity was never stopped/exported until explicit disposal.

Fix: Remove 'using' and explicitly dispose the Activity when the workflow reaches Idle status (all supersteps complete). A safety-net disposal in the finally block handles cancellation and error paths.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
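The fix described in the commit message can be illustrated with a minimal sketch. It is not the actual `StreamingRunEventStream` code: the class, the source name `Sketch.RunLoop`, and the simulated "idle" point are all assumptions made for illustration, and it assumes .NET 6 or later. It shows a span being disposed explicitly once the work is done while the loop keeps waiting, with a `finally` block as a safety net.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class RunLoopSketch
{
    // Hypothetical source name; not the one used by the library.
    private static readonly ActivitySource Source = new("Sketch.RunLoop");

    // Stand-in for the run loop: does some work, reaches "idle", then waits
    // for more input until it is cancelled.
    public static async Task RunLoopAsync(CancellationToken cancellationToken)
    {
        // No 'using' here: the loop outlives the run, so disposal is manual.
        Activity? runActivity = Source.StartActivity("workflow.run");
        try
        {
            await Task.Yield(); // pretend to execute supersteps

            // Idle reached: stop the span now so listeners/exporters see it.
            runActivity?.Dispose();
            runActivity = null;

            // The loop itself only ends on cancellation.
            await Task.Delay(Timeout.Infinite, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            // Expected when the stream is stopped.
        }
        finally
        {
            // Safety net for cancellation or errors that happen before idle.
            runActivity?.Dispose();
        }
    }

    // Returns true if the span was stopped while the loop was still running.
    public static async Task<bool> StoppedBeforeCancellationAsync()
    {
        var stopped = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        using var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == "Sketch.RunLoop",
            Sample = (ref ActivityCreationOptions<ActivityContext> _) =>
                ActivitySamplingResult.AllData,
            ActivityStopped = activity => stopped.TrySetResult(true),
        };
        ActivitySource.AddActivityListener(listener);

        using var cts = new CancellationTokenSource();
        Task loop = RunLoopAsync(cts.Token);

        Task first = await Task.WhenAny(stopped.Task, Task.Delay(TimeSpan.FromSeconds(5)));
        bool stoppedWhileRunning = first == stopped.Task && !loop.IsCompleted;

        cts.Cancel();
        await loop;
        return stoppedWhileRunning;
    }
}
```

With a `using` declaration in place of the manual disposal, the span in this sketch would only stop after `cts.Cancel()`, which mirrors the behavior reported in the issue.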
Pull request overview
This PR aims to ensure OpenTelemetry workflow-run spans (Activity) are reliably stopped/disposed (and therefore exported) during .NET in-process workflow execution, including streaming scenarios, and adds regression tests around activity lifecycle behavior.
Changes:
- Updated `StreamingRunEventStream.RunLoopAsync` to manually manage the workflow-run `Activity` lifecycle (stop on `Idle` and ensure disposal on loop exit).
- Added `WorkflowRunActivityStopTests` to assert workflow-run activities are started and stopped across multiple execution modes.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs | Changes workflow-run Activity disposal timing to stop/export spans earlier and adds a safety-net disposal on exit. |
| dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs | Adds regression coverage validating workflow-run activities are stopped/disposed in lockstep, off-thread, and streaming usage. |
…
Implements two-level telemetry hierarchy per PR feedback from lokitoth:
- workflow.session: spans the entire run loop / stream lifetime
- workflow_invoke: per input-to-halt cycle, nested within the session

This ensures the session activity stays open across multiple turns, while individual run activities are created and disposed per cycle.

Also fixes linkedSource CancellationTokenSource disposal leak in StreamingRunEventStream (added using declaration).
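A rough sketch of the two-level hierarchy described in this commit, assuming .NET 6 or later. The class, the source name `Sketch.TwoLevel`, and the `turn` tag are hypothetical; only the span names `workflow.session` and `workflow_invoke` come from the commit message. The session span stays open while one run span is started and stopped per turn.

```csharp
using System.Collections.Generic;
using System.Diagnostics;

public static class TwoLevelSketch
{
    // Hypothetical source name used only for this illustration.
    private static readonly ActivitySource Source = new("Sketch.TwoLevel");

    // Runs two simulated turns and returns span names in the order they stopped.
    public static List<string> RunTwoTurns()
    {
        var stoppedOrder = new List<string>();
        using var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == "Sketch.TwoLevel",
            Sample = (ref ActivityCreationOptions<ActivityContext> _) =>
                ActivitySamplingResult.AllData,
            ActivityStopped = activity => stoppedOrder.Add(activity.OperationName),
        };
        ActivitySource.AddActivityListener(listener);

        // Outer span: covers the whole run loop / stream lifetime.
        using (Activity? session = Source.StartActivity("workflow.session"))
        {
            for (int turn = 0; turn < 2; turn++)
            {
                // Inner span: one per input-to-halt cycle, disposed at the
                // end of each loop iteration while the session stays open.
                using Activity? run = Source.StartActivity("workflow_invoke");
                run?.SetTag("turn", turn); // "turn" is an illustrative tag
            }
        }

        return stoppedOrder;
    }
}
```

In this sketch both run spans stop before the session span does, so a listener sees `workflow_invoke`, `workflow_invoke`, `workflow.session` in that order.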
    CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(this._stopCancellation.Token, cancellationToken);

linkedSource is created with CancellationTokenSource.CreateLinkedTokenSource(...) but never disposed. This can leak registrations/timers over repeated enumerations; wrap it in using (or dispose it in the finally) similar to StreamingRunEventStream.
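A minimal sketch of the suggested pattern, using a hypothetical `WatchAsync` iterator rather than the actual event stream code. The `using` declaration disposes the linked source when enumeration completes or when the enumerator is disposed early, which releases its registrations on the parent tokens.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class LinkedTokenSketch
{
    // Hypothetical iterator standing in for an event stream's watch method.
    public static async IAsyncEnumerable<int> WatchAsync(
        CancellationToken stopToken,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Disposed when the iterator finishes or its enumerator is disposed.
        using CancellationTokenSource linkedSource =
            CancellationTokenSource.CreateLinkedTokenSource(stopToken, cancellationToken);

        for (int i = 0; i < 3; i++)
        {
            linkedSource.Token.ThrowIfCancellationRequested();
            yield return i;
            await Task.Yield();
        }
    }

    // Enumerates the stream once and returns how many items were produced.
    public static async Task<int> CountAsync()
    {
        using var stop = new CancellationTokenSource();
        int count = 0;
        await foreach (int item in WatchAsync(stop.Token))
        {
            count++;
        }
        return count;
    }
}
```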
        runActivity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() {
            { Tags.ErrorType, ex.GetType().FullName },
            { Tags.BuildErrorMessage, ex.Message },
        }));
        runActivity.CaptureException(ex);
    }

    // Record error on the session activity
    if (sessionActivity is not null)
    {
-       activity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() {
+       sessionActivity.AddEvent(new ActivityEvent(EventNames.SessionError, tags: new() {
            { Tags.ErrorType, ex.GetType().FullName },
            { Tags.BuildErrorMessage, ex.Message },
        }));

The error telemetry on workflow/session activities is using Tags.BuildErrorMessage ("build.error.message") for runtime execution failures. This mislabels execution errors as build errors in exported telemetry; consider introducing a workflow/session-specific error message tag (or rely on the standard exception tags produced by CaptureException).
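One possible shape for the second option, sketched with only `System.Diagnostics` APIs and assuming .NET 6 or later. The helper name `RecordFailure` is hypothetical, and the event and tag names (`exception`, `exception.type`, `exception.message`) follow the OpenTelemetry exception conventions rather than anything defined in this repository.

```csharp
using System;
using System.Diagnostics;

public static class ErrorTelemetrySketch
{
    // Hypothetical helper: records a runtime failure on a span without
    // reusing a build-specific tag name.
    public static void RecordFailure(Activity? activity, Exception ex)
    {
        if (activity is null)
        {
            return; // telemetry disabled or span not sampled
        }

        activity.SetStatus(ActivityStatusCode.Error, ex.Message);
        activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection
        {
            { "exception.type", ex.GetType().FullName },
            { "exception.message", ex.Message },
        }));
    }
}
```

A dedicated workflow or session error tag would work equally well; the point of the sketch is only that execution failures do not need to borrow `build.error.message`.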
    public const string WorkflowSession = "workflow.session";
    public const string WorkflowRun = "workflow_invoke";
    public const string MessageSend = "message.send";

ActivityNames.WorkflowRun is set to "workflow_invoke", but the PR/issue description and new tests refer to the span as workflow.run (and Python uses workflow.run). This mismatch makes cross-language telemetry queries harder and reads like a discrepancy between the intent and the emitted span name; consider renaming the activity to workflow.run (or updating the PR/docs/tests to consistently use workflow_invoke if the name is intentionally different).
    /// Starts a workflow session activity if enabled. This is a root-level span
    /// that represents the entire lifetime of a workflow execution (from start
    /// until stop, cancellation, or error). Individual run stages are nested within it.

The XML doc for StartWorkflowSessionActivity calls it a "root-level span", but it is created via ActivitySource.StartActivity(...) and will be parented to Activity.Current when one exists. To avoid misleading consumers, consider rewording to describe it as the outer/parent span for workflow execution within the current trace rather than strictly root-level.
-   /// Starts a workflow session activity if enabled. This is a root-level span
-   /// that represents the entire lifetime of a workflow execution (from start
-   /// until stop, cancellation, or error). Individual run stages are nested within it.
+   /// Starts a workflow session activity if enabled. This is the outer/parent span
+   /// that represents the entire lifetime of a workflow execution (from start
+   /// until stop, cancellation, or error) within the current trace. Individual run
+   /// stages are typically nested within it.
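The parenting behavior behind this wording change can be checked with a small sketch. It assumes .NET 6 or later with the default W3C ID format; the class, the source name `Sketch.Parenting`, and the `caller.request` span are hypothetical. A span started through `ActivitySource.StartActivity` is a root only when no ambient `Activity.Current` exists.

```csharp
using System.Diagnostics;

public static class SessionParentingSketch
{
    // Hypothetical source name used only for this illustration.
    private static readonly ActivitySource Source = new("Sketch.Parenting");

    public static (bool RootWithoutAmbient, bool ChildOfAmbient) Demo()
    {
        using var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == "Sketch.Parenting",
            Sample = (ref ActivityCreationOptions<ActivityContext> _) =>
                ActivitySamplingResult.AllData,
        };
        ActivitySource.AddActivityListener(listener);

        Activity? saved = Activity.Current;
        Activity.Current = null; // start from a clean ambient context
        try
        {
            bool rootWithoutAmbient;
            using (Activity? session = Source.StartActivity("workflow.session"))
            {
                // No ambient activity: the session has no parent span.
                rootWithoutAmbient = session is not null
                    && session.ParentSpanId == default;
            }

            bool childOfAmbient;
            using (Activity ambient = new Activity("caller.request").Start())
            using (Activity? session = Source.StartActivity("workflow.session"))
            {
                // Ambient activity present: the session joins its trace.
                childOfAmbient = session is not null
                    && session.TraceId == ambient.TraceId
                    && session.ParentSpanId == ambient.SpanId;
            }

            return (rootWithoutAmbient, childOfAmbient);
        }
        finally
        {
            Activity.Current = saved;
        }
    }
}
```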
        // Assert
        var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList();
        capturedActivities.Should().NotContain(
            a => a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal),
            "WorkflowRun activity should be disabled.");
        capturedActivities.Should().Contain(
            a => a.OperationName.StartsWith(ActivityNames.WorkflowBuild, StringComparison.Ordinal),
            "Other activities should still be created.");
    }

DisableWorkflowRun_PreventsWorkflowRunActivityAsync now implicitly also controls the new workflow.session activity (since StartWorkflowSessionActivity() checks DisableWorkflowRun). The test currently asserts only that WorkflowRun is absent; consider also asserting that WorkflowSession is absent to prevent regressions where session spans are still emitted when run telemetry is disabled.
@copilot open a new pull request to apply changes based on this feedback
    // Assert - workflow.run should have been stopped
    var stoppedWorkflowRuns = this._stoppedActivities
        .Where(a => a.RootId == testActivity.RootId &&
                    a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal))
        .ToList();
    stoppedWorkflowRuns.Should().HaveCount(1,
        "workflow.run Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)");

In WorkflowRunActivity_IsStopped_Streaming_OffThreadAsync, the assertions run immediately after the await foreach completes, but the run-stage Activity is disposed in the producer loop after writing the halt signal. This can race (consumer exits on the halt signal before the stop callback fires) and make the test flaky. Consider waiting/polling until the stopped Activity is observed (with a timeout), or disposing the StreamingRun before asserting if that still validates the intended behavior.
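The waiting/polling option could look roughly like the helper below. `WaitUntilAsync` is a hypothetical name, not an existing test utility in this repository; the sketch polls a condition until it holds or a timeout elapses.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class PollingSketch
{
    // Hypothetical helper: returns true as soon as the condition holds,
    // or false if the timeout elapses first.
    public static async Task<bool> WaitUntilAsync(
        Func<bool> condition,
        TimeSpan timeout,
        TimeSpan? pollInterval = null)
    {
        TimeSpan interval = pollInterval ?? TimeSpan.FromMilliseconds(10);
        Stopwatch elapsed = Stopwatch.StartNew();

        while (!condition())
        {
            if (elapsed.Elapsed >= timeout)
            {
                return false;
            }

            await Task.Delay(interval).ConfigureAwait(false);
        }

        return true;
    }
}
```

In the test, the condition would check whether the stopped-activity collection already contains the expected run span, and the assertion would run only after the helper returns true. The collection would need to be safe to read while the producer loop is still writing to it.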
    using Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity();
    activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.SessionId, this._stepRunner.SessionId);

LockstepRunEventStream still creates the workflow-run Activity via a using declaration inside an async IAsyncEnumerable that exits via multiple yield break paths. This is the same pattern described in #4155 and can still prevent the Activity from being disposed/stopped (and therefore exported). Consider removing the using and explicitly disposing/stopping the run Activity on all early-exit paths (cancellation, RequestHaltEvent, etc.), with a finally safety net.
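One way the suggestion might be applied, sketched on a hypothetical iterator rather than the actual `LockstepRunEventStream` code and assuming .NET 6 or later. The source name `Sketch.Lockstep` and the `"halt"` marker string are assumptions standing in for the real halt event. The span is stopped explicitly at the halt point, and a `finally` block covers every other exit path.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class LockstepSketch
{
    // Hypothetical source name used only for this illustration.
    private static readonly ActivitySource Source = new("Sketch.Lockstep");

    public static async IAsyncEnumerable<string> WatchAsync(
        IEnumerable<string> events,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        Activity? runActivity = Source.StartActivity("workflow_invoke");
        try
        {
            foreach (string evt in events)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    yield break; // finally block stops the span
                }

                if (evt == "halt") // stand-in for a halt request event
                {
                    runActivity?.Dispose();
                    runActivity = null;
                    yield break;
                }

                yield return evt;
                await Task.Yield();
            }
        }
        finally
        {
            // Safety net: runs on completion, yield break, and early
            // disposal of the enumerator by the consumer.
            runActivity?.Dispose();
        }
    }

    // Enumerates a short event sequence; returns items seen and spans stopped.
    public static async Task<(int Yielded, int Stopped)> DemoAsync()
    {
        var stopped = new ConcurrentQueue<string>();
        using var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == "Sketch.Lockstep",
            Sample = (ref ActivityCreationOptions<ActivityContext> _) =>
                ActivitySamplingResult.AllData,
            ActivityStopped = activity => stopped.Enqueue(activity.OperationName),
        };
        ActivitySource.AddActivityListener(listener);

        int yielded = 0;
        await foreach (string evt in WatchAsync(new[] { "a", "b", "halt", "c" }))
        {
            yielded++;
        }

        return (yielded, stopped.Count);
    }
}
```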
@copilot open a new pull request to apply changes based on the comments in this thread
This pull request addresses the issue where workflow run telemetry spans (`Activity` objects) were not always properly stopped and exported, particularly in streaming and lockstep execution environments. The changes ensure that workflow run activities are disposed as soon as the workflow reaches the idle state or when the run loop exits, preventing telemetry data from being lost. Additionally, comprehensive regression tests are added to verify correct activity lifecycle management.

Improvements to Activity Lifecycle Management:
- Ensures the `workflow.run` `Activity` is disposed immediately when the workflow reaches the `Idle` state, so telemetry spans are promptly exported rather than waiting for cancellation or disposal.
- Adds a safety-net disposal of the `workflow.run` `Activity` if it was not already stopped when the run loop exits, covering cancellation and error scenarios.
- Removes the `using` statement from the activity initialization to allow manual control over the activity's disposal timing.

Testing and Regression Coverage:
- Adds `WorkflowRunActivityStopTests.cs` to verify that workflow run activities are always properly stopped and exported to telemetry backends, covering lockstep, off-thread, and streaming execution environments, as well as ensuring that all started activities are stopped.

Closes #4155
Contribution Checklist