Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
iterationContainerId: iterationContext.iterationContainerId,
}),
},
})
Expand Down Expand Up @@ -787,6 +788,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
iterationContainerId: iterationContext.iterationContainerId,
}),
},
})
Expand Down Expand Up @@ -815,6 +817,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
iterationContainerId: iterationContext.iterationContainerId,
}),
},
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ export interface ExecutionGroup {
*/
interface IterationGroup {
iterationType: string
iterationContainerId: string
iterationCurrent: number
iterationTotal?: number
blocks: ConsoleEntry[]
Expand All @@ -169,7 +170,7 @@ interface IterationGroup {

/**
* Builds a tree structure from flat entries.
* Groups iteration entries by (iterationType, iterationCurrent), showing all blocks
* Groups iteration entries by (iterationType, iterationContainerId, iterationCurrent), showing all blocks
* that executed within each iteration.
* Sorts by start time to ensure chronological order.
*/
Expand All @@ -186,16 +187,18 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
}
}

// Group iteration entries by (iterationType, iterationCurrent)
// Group iteration entries by (iterationType, iterationContainerId, iterationCurrent)
const iterationGroupsMap = new Map<string, IterationGroup>()
for (const entry of iterationEntries) {
const key = `${entry.iterationType}-${entry.iterationCurrent}`
const iterationContainerId = entry.iterationContainerId || 'unknown'
const key = `${entry.iterationType}-${iterationContainerId}-${entry.iterationCurrent}`
let group = iterationGroupsMap.get(key)
const entryStartMs = new Date(entry.startedAt || entry.timestamp).getTime()

if (!group) {
group = {
iterationType: entry.iterationType!,
iterationContainerId,
iterationCurrent: entry.iterationCurrent!,
iterationTotal: entry.iterationTotal,
blocks: [],
Expand All @@ -220,26 +223,34 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
group.blocks.sort((a, b) => a.executionOrder - b.executionOrder)
}

// Group iterations by iterationType to create subflow parents
const subflowGroups = new Map<string, IterationGroup[]>()
// Group iterations by (iterationType, iterationContainerId) to create subflow parents
const subflowGroups = new Map<
string,
{ iterationType: string; iterationContainerId: string; groups: IterationGroup[] }
>()
for (const group of iterationGroupsMap.values()) {
const type = group.iterationType
let groups = subflowGroups.get(type)
if (!groups) {
groups = []
subflowGroups.set(type, groups)
const key = `${group.iterationType}-${group.iterationContainerId}`
let subflowGroup = subflowGroups.get(key)
if (!subflowGroup) {
subflowGroup = {
iterationType: group.iterationType,
iterationContainerId: group.iterationContainerId,
groups: [],
}
subflowGroups.set(key, subflowGroup)
}
groups.push(group)
subflowGroup.groups.push(group)
}

// Sort iterations within each subflow by iteration number
for (const groups of subflowGroups.values()) {
groups.sort((a, b) => a.iterationCurrent - b.iterationCurrent)
for (const subflowGroup of subflowGroups.values()) {
subflowGroup.groups.sort((a, b) => a.iterationCurrent - b.iterationCurrent)
}

// Build subflow nodes with iteration children
const subflowNodes: EntryNode[] = []
for (const [iterationType, iterationGroups] of subflowGroups.entries()) {
for (const subflowGroup of subflowGroups.values()) {
const { iterationType, iterationContainerId, groups: iterationGroups } = subflowGroup
// Calculate subflow timing from all its iterations
const firstIteration = iterationGroups[0]
const allBlocks = iterationGroups.flatMap((g) => g.blocks)
Expand All @@ -255,10 +266,10 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
// Use the minimum executionOrder from all child blocks for proper ordering
const subflowExecutionOrder = Math.min(...allBlocks.map((b) => b.executionOrder))
const syntheticSubflow: ConsoleEntry = {
id: `subflow-${iterationType}-${firstIteration.blocks[0]?.executionId || 'unknown'}`,
id: `subflow-${iterationType}-${iterationContainerId}-${firstIteration.blocks[0]?.executionId || 'unknown'}`,
timestamp: new Date(subflowStartMs).toISOString(),
workflowId: firstIteration.blocks[0]?.workflowId || '',
blockId: `${iterationType}-container`,
blockId: `${iterationType}-container-${iterationContainerId}`,
blockName: iterationType.charAt(0).toUpperCase() + iterationType.slice(1),
blockType: iterationType,
executionId: firstIteration.blocks[0]?.executionId,
Expand All @@ -284,10 +295,10 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
// Use the minimum executionOrder from blocks in this iteration
const iterExecutionOrder = Math.min(...iterBlocks.map((b) => b.executionOrder))
const syntheticIteration: ConsoleEntry = {
id: `iteration-${iterationType}-${iterGroup.iterationCurrent}-${iterBlocks[0]?.executionId || 'unknown'}`,
id: `iteration-${iterationType}-${iterGroup.iterationContainerId}-${iterGroup.iterationCurrent}-${iterBlocks[0]?.executionId || 'unknown'}`,
timestamp: new Date(iterStartMs).toISOString(),
workflowId: iterBlocks[0]?.workflowId || '',
blockId: `iteration-${iterGroup.iterationCurrent}`,
blockId: `iteration-${iterGroup.iterationContainerId}-${iterGroup.iterationCurrent}`,
blockName: `Iteration ${iterGroup.iterationCurrent}${iterGroup.iterationTotal !== undefined ? ` / ${iterGroup.iterationTotal}` : ''}`,
blockType: iterationType,
executionId: iterBlocks[0]?.executionId,
Expand All @@ -299,6 +310,7 @@ function buildEntryTree(entries: ConsoleEntry[]): EntryNode[] {
iterationCurrent: iterGroup.iterationCurrent,
iterationTotal: iterGroup.iterationTotal,
iterationType: iterationType as 'loop' | 'parallel',
iterationContainerId: iterGroup.iterationContainerId,
}

// Block nodes within this iteration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ export function useWorkflowExecution() {
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
iterationContainerId: data.iterationContainerId,
})
}

Expand All @@ -387,13 +388,15 @@ export function useWorkflowExecution() {
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
iterationContainerId: data.iterationContainerId,
})
}

const updateConsoleEntry = (data: BlockCompletedData) => {
updateConsole(
data.blockId,
{
executionOrder: data.executionOrder,
input: data.input || {},
replaceOutput: data.output,
success: true,
Expand All @@ -404,6 +407,7 @@ export function useWorkflowExecution() {
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
iterationContainerId: data.iterationContainerId,
},
executionId
)
Expand All @@ -413,6 +417,7 @@ export function useWorkflowExecution() {
updateConsole(
data.blockId,
{
executionOrder: data.executionOrder,
input: data.input || {},
replaceOutput: {},
success: false,
Expand All @@ -424,6 +429,7 @@ export function useWorkflowExecution() {
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
iterationContainerId: data.iterationContainerId,
},
executionId
)
Expand Down Expand Up @@ -453,6 +459,7 @@ export function useWorkflowExecution() {
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
iterationContainerId: data.iterationContainerId,
})
}

Expand Down Expand Up @@ -921,6 +928,7 @@ export function useWorkflowExecution() {
useTerminalConsoleStore.getState().updateConsole(
log.blockId,
{
executionOrder: log.executionOrder,
replaceOutput: log.output,
success: true,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ export async function executeWorkflowWithFullLogging(
iterationCurrent: event.data.iterationCurrent,
iterationTotal: event.data.iterationTotal,
iterationType: event.data.iterationType,
iterationContainerId: event.data.iterationContainerId,
})

if (options.onBlockComplete) {
Expand Down Expand Up @@ -167,6 +168,7 @@ export async function executeWorkflowWithFullLogging(
iterationCurrent: event.data.iterationCurrent,
iterationTotal: event.data.iterationTotal,
iterationType: event.data.iterationType,
iterationContainerId: event.data.iterationContainerId,
})
break

Expand Down
51 changes: 34 additions & 17 deletions apps/sim/executor/execution/block-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ import {
} from '@/executor/constants'
import type { DAGNode } from '@/executor/dag/builder'
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
import type {
BlockStateWriter,
ContextExtensions,
IterationContext,
} from '@/executor/execution/types'
import {
generatePauseContextId,
mapNodeMetadataToPauseScopes,
Expand Down Expand Up @@ -473,28 +477,41 @@ export class BlockExecutor {
}
}

private getIterationContext(
ctx: ExecutionContext,
node: DAGNode
): { iterationCurrent: number; iterationTotal: number; iterationType: SubflowType } | undefined {
private createIterationContext(
iterationCurrent: number,
iterationType: SubflowType,
iterationContainerId?: string,
iterationTotal?: number
): IterationContext {
return {
iterationCurrent,
iterationTotal,
iterationType,
iterationContainerId,
}
}

private getIterationContext(ctx: ExecutionContext, node: DAGNode): IterationContext | undefined {
if (!node?.metadata) return undefined

if (node.metadata.branchIndex !== undefined && node.metadata.branchTotal) {
return {
iterationCurrent: node.metadata.branchIndex,
iterationTotal: node.metadata.branchTotal,
iterationType: 'parallel',
}
if (node.metadata.branchIndex !== undefined && node.metadata.branchTotal !== undefined) {
return this.createIterationContext(
node.metadata.branchIndex,
'parallel',
node.metadata.parallelId,
node.metadata.branchTotal
)
}

if (node.metadata.isLoopNode && node.metadata.loopId) {
const loopScope = ctx.loopExecutions?.get(node.metadata.loopId)
if (loopScope && loopScope.iteration !== undefined && loopScope.maxIterations) {
return {
iterationCurrent: loopScope.iteration,
iterationTotal: loopScope.maxIterations,
iterationType: 'loop',
}
if (loopScope && loopScope.iteration !== undefined) {
return this.createIterationContext(
loopScope.iteration,
'loop',
node.metadata.loopId,
loopScope.maxIterations
)
}
}

Expand Down
3 changes: 2 additions & 1 deletion apps/sim/executor/execution/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ export interface SerializableExecutionState {

export interface IterationContext {
iterationCurrent: number
iterationTotal: number
iterationTotal?: number
iterationType: SubflowType
iterationContainerId?: string
}

export interface ExecutionCallbacks {
Expand Down
19 changes: 17 additions & 2 deletions apps/sim/lib/workflows/executor/execution-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ export interface BlockStartedEvent extends BaseExecutionEvent {
iterationCurrent?: number
iterationTotal?: number
iterationType?: SubflowType
iterationContainerId?: string
}
}

Expand All @@ -102,6 +103,7 @@ export interface BlockCompletedEvent extends BaseExecutionEvent {
iterationCurrent?: number
iterationTotal?: number
iterationType?: SubflowType
iterationContainerId?: string
}
}

Expand All @@ -124,6 +126,7 @@ export interface BlockErrorEvent extends BaseExecutionEvent {
iterationCurrent?: number
iterationTotal?: number
iterationType?: SubflowType
iterationContainerId?: string
}
}

Expand Down Expand Up @@ -219,7 +222,12 @@ export function createSSECallbacks(options: SSECallbackOptions) {
blockName: string,
blockType: string,
executionOrder: number,
iterationContext?: { iterationCurrent: number; iterationTotal: number; iterationType: string }
iterationContext?: {
iterationCurrent: number
iterationTotal?: number
iterationType: string
iterationContainerId?: string
}
) => {
sendEvent({
type: 'block:started',
Expand All @@ -235,6 +243,7 @@ export function createSSECallbacks(options: SSECallbackOptions) {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType as any,
iterationContainerId: iterationContext.iterationContainerId,
}),
},
})
Expand All @@ -252,14 +261,20 @@ export function createSSECallbacks(options: SSECallbackOptions) {
executionOrder: number
endedAt: string
},
iterationContext?: { iterationCurrent: number; iterationTotal: number; iterationType: string }
iterationContext?: {
iterationCurrent: number
iterationTotal?: number
iterationType: string
iterationContainerId?: string
}
) => {
const hasError = callbackData.output?.error
const iterationData = iterationContext
? {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType as any,
iterationContainerId: iterationContext.iterationContainerId,
}
: {}

Expand Down
Loading