Open
Conversation
…rallel step support
…l agent calls, and add concurrency safety to the SQLite session store.
…y safety for workflow execution, add `OPENAI_API_KEY` to `.env`, and update workflow documentation.
Open
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Cagent Workflow Module
This document designs the three core workflow execution patterns in docker/cagent and answers implementation-specific use cases.
Overview
Workflows define declarative pipelines of agents and conditions. Execution is driven by the runtime: each step runs an agent (or evaluates a condition), and step outputs flow to the next step according to the pattern.
1. Sequential Step Execution
Description: Agents execute one after another in a linear chain. Each agent's output becomes available as input context for the next agent.
Example:
Behavior:
generatorruns first and completes.translatorreceivesgenerator's output as context in its prompt and processes it.publisherreceivestranslator's output as context and finalizes.Output propagation: Each step automatically receives all prior step outputs injected as context into its user message. The executor collects the last assistant message content from each completed step and formats it as a structured context block:
Outputs are also accessible via template expressions:
{{ $steps.<step_id>.output }}.2. Conditional Branching & Loops
Description: The workflow branches based on condition evaluation. When a condition's branch routes back to an earlier step (by step ID or index), it creates a loop. Conditions reference step outputs via templates.
Example:
Behavior:
translator, theqa_checkcondition runs (usingqa_agentoutput when referenced by$steps.qa.output).is_approved == true: workflow proceeds topublisher.is_approved == false: workflow routes to the step that runstranslatoragain (retry loop).Condition schema: Conditions are evaluated after the step(s) that produce the referenced output. The condition expression uses a small expression language (e.g.
{{ $steps.<id>.output.<path> }}) and must resolve to a boolean. Schema validation ensures referenced step IDs exist and that structured output (e.g.is_approved) is declared where needed (e.g. via agentstructured_output).3. Parallel Step Execution
Description: Multiple steps run concurrently. The workflow waits for all parallel steps to complete before moving to the next sequential step.
Example:
Behavior:
generatoragents run concurrently in separate goroutines.translatorstarts.translatorreceives outputs from all parallel steps as context in its prompt (see "Output structure from parallel steps" below).Concurrency safety: Parallel steps use two mechanisms to avoid races:
runnerMumutex on the executor serializesSetCurrentAgent+RunStreamcalls so each goroutine's internal runtime captures the correct agent name.ParentIDset), causingPersistentRuntimeto skip all SQLite persistence for those sessions.SQLiteSessionStorehas async.Mutexon all write methods as an additional safety net.Error handling: If any agent in a parallel block fails, the entire workflow fails immediately (all-or-nothing). No partial success; this keeps data consistency and avoids downstream agents seeing incomplete data.
Use Case: How deep can loops go? (max iteration count)
Answer: Loops are bounded by a max loop iterations setting.
workflow.max_loop_iterations(default:100). Optional per-workflow override:workflow.overrides.max_loop_iterations.max_loop_iterations, the workflow fails with a deterministic error (e.g.workflow: max loop iterations exceeded (step: trans, limit: 100)).This prevents infinite loops while allowing retries (e.g. QA reject → translator) up to a clear limit.
Use Case: Can we nest parallel blocks?
Answer: Yes. Parallel steps are just steps; their children can be any step type, including another
parallel.Example:
Behavior:
generatorruns in parallel with the inner parallel block (researcherandsummarizer). All three agent outputs are available topublisher(see output structure below). Failure of any of the three fails the whole workflow.Use Case: How are outputs from multiple parallel agents structured when passed to the next step?
Answer: Outputs from a parallel block are passed as a keyed map by step ID (and optionally by index for backwards compatibility).
Structure:
{ "steps": { "gen_1": { "output": "<last assistant message content>", "agent": "generator" }, "gen_2": { "output": "<last assistant message content>", "agent": "generator" } }, "order": ["gen_1", "gen_2"] }{{ $steps.par_gen.outputs.gen_1.output }}— output of parallel stepgen_1{{ $steps.par_gen.outputs.gen_2.output }}{{ $steps.par_gen.outputs[0].output }}(usingorderfor deterministic indexing).So: one structured object keyed by step ID (and ordered list for index-based access), passed as context to the next step.
Use Case: What retry behavior exists for failed steps?
Answer: Configurable per-step retry with optional backoff.
retry.max_attempts(default: 0 = no retry)retry.backoff(optional):fixed(e.g. 1s) orexponential(e.g. 1s, 2s, 4s)retry.on(optional): list of error patterns or exit conditions to retry on (e.g.["timeout", "rate_limit"]); if absent, retry on any error.Behavior:
max_attemptstimes on failure.Loops vs retries: Loops (condition → back to earlier step) are logical workflow branches. Retries are transient error handling for the same step. Both can be used: e.g. retry a step 2 times, then continue to a condition that may send the workflow back to an earlier step (e.g. QA reject → translator).
Use Case: How do we access outputs from parallel steps in subsequent agents?
Answer: Two mechanisms:
Automatic context injection: The executor injects a context blob into the next step's session (e.g. as a system or user message) containing:
$steps.<parallel_id>.outputs— the keyed map of step ID →{ output, agent }$steps.<parallel_id>.order— deterministic order for index-based access.Templates in config: In agent instructions or in condition expressions, use:
{{ $steps.par_gen.outputs.gen_1.output }}— output of parallel stepgen_1{{ $steps.par_gen.outputs[0].output }}— first output byorder{{ $steps.outer.outputs.inner.outputs.researcher.output }}(or a flatter key likeinner.researcherby convention).So: structured access by step ID (and by index via
order), both in injected context and in templates.Summary Table
max_loop_iterations(default 100); per-cycle count per step IDorderarray; one blob to next stepretry.max_attempts+ optional backoff; workflow fails after$steps.<id>.outputs.<step_id>.outputand$steps.<id>.outputs[n]How to run workflow via CLI
When your agent config defines a
workflowsection, use exec (non-TUI) to run the workflow:Workflow execution is only wired for exec mode. The
runcommand (TUI) still uses single-agent mode even when the config has a workflow.Implementation Notes
pkg/workflowholds workflow and step types (Config, Step, StepContext, loop counter, condition evaluation). No dependency on runtime or session to avoid import cycles.StepContextis concurrency-safe (sync.RWMutex) and exposes aSnapshot()method for serialization/debugging.pkg/workflowrunholds the executor: runs the workflow DAG (sequential/conditional/parallel), calls runtimeRunStreamper agent step, maintains step outputs and loop counters, evaluates conditions, and injects output context into sessions.workflowrun.NewLocalExecutor(runtime)andExecutor.Run(ctx, cfg, sess, events)which returns(*workflow.StepContext, error).--- Step Context ---).buildPriorContext()collects all prior step outputs and injects them as a structured text block into the next step's user message.runnerMuserializesSetCurrentAgent+RunStreamto prevent agent name races; sub-sessions skip SQLite persistence.SQLiteSessionStorehas async.Mutexprotecting all write methods (AddMessage,UpdateMessage,AddSession,UpdateSession, etc.) to prevent concurrent write panics.pkg/config/latestasConfig.Workflow(type*workflow.Config). Validation invalidate.goensures agent names exist, step types are valid, and condition steps have a condition expression.Config.Workflowis set,cagent execuses the workflow executor and streams events to stdout;cagent run(TUI) still uses single-agent mode.Developer Certificate of Origin
Version 1.1
Copyright (C) 2004, 2006 The Linux Foundation and its contributors.
1 Letterman Drive
Suite D4700
San Francisco, CA, 94129
Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.
Developer's Certificate of Origin 1.1
By making a contribution to this project, I certify that:
(a) The contribution was created in whole or in part by me and I
have the right to submit it under the open source license
indicated in the file; or
(b) The contribution is based upon previous work that, to the best
of my knowledge, is covered under an appropriate open source
license and I have the right under that license to submit that
work with modifications, whether created in whole or in part
by me, under the same open source license (unless I am
permitted to submit under a different license), as indicated
in the file; or
(c) The contribution was provided directly to me by some other
person who certified (a), (b) or (c) and I have not modified
it.
(d) I understand and agree that this project and the contribution
are public and that a record of the contribution (including all
personal information I submit with it, including my sign-off) is
maintained indefinitely and may be redistributed consistent with
this project or the open source license(s) involved.