Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -197,21 +197,30 @@ BEGIN
false
) AS _broadcast_result
),
-- NEW: Update dependent steps (decrement remaining_deps, set initial_tasks=0 for maps)
-- NEW: Update dependent steps (decrement remaining_deps by count of skipped parents, set initial_tasks=0 for maps)
skipped_parent_counts AS (
-- Count how many skipped parents each child has
SELECT
dep.step_slug AS child_step_slug,
dep.flow_slug AS child_flow_slug,
COUNT(*) AS skipped_parent_count
FROM skipped_steps parent
JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug
GROUP BY dep.step_slug, dep.flow_slug
),
dependent_updates AS (
UPDATE pgflow.step_states child_state
SET remaining_deps = child_state.remaining_deps - 1,
SET remaining_deps = child_state.remaining_deps - spc.skipped_parent_count,
-- If child is a map step and this skipped step is its only dependency,
-- set initial_tasks = 0 (skipped dep = empty array)
initial_tasks = CASE
WHEN child_step.step_type = 'map' AND child_step.deps_count = 1 THEN 0
ELSE child_state.initial_tasks
END
FROM skipped_steps parent
JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug
JOIN pgflow.steps child_step ON child_step.flow_slug = dep.flow_slug AND child_step.step_slug = dep.step_slug
FROM skipped_parent_counts spc
JOIN pgflow.steps child_step ON child_step.flow_slug = spc.child_flow_slug AND child_step.step_slug = spc.child_step_slug
WHERE child_state.run_id = cascade_resolve_conditions.run_id
AND child_state.step_slug = dep.step_slug
AND child_state.step_slug = spc.child_step_slug
),
run_update AS (
UPDATE pgflow.runs r
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,21 +614,30 @@ BEGIN
false
) AS _broadcast_result
),
-- NEW: Update dependent steps (decrement remaining_deps, set initial_tasks=0 for maps)
-- NEW: Update dependent steps (decrement remaining_deps by count of skipped parents, set initial_tasks=0 for maps)
skipped_parent_counts AS (
-- Count how many skipped parents each child has
SELECT
dep.step_slug AS child_step_slug,
dep.flow_slug AS child_flow_slug,
COUNT(*) AS skipped_parent_count
FROM skipped_steps parent
JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug
GROUP BY dep.step_slug, dep.flow_slug
),
dependent_updates AS (
UPDATE pgflow.step_states child_state
SET remaining_deps = child_state.remaining_deps - 1,
SET remaining_deps = child_state.remaining_deps - spc.skipped_parent_count,
-- If child is a map step and this skipped step is its only dependency,
-- set initial_tasks = 0 (skipped dep = empty array)
initial_tasks = CASE
WHEN child_step.step_type = 'map' AND child_step.deps_count = 1 THEN 0
ELSE child_state.initial_tasks
END
FROM skipped_steps parent
JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug
JOIN pgflow.steps child_step ON child_step.flow_slug = dep.flow_slug AND child_step.step_slug = dep.step_slug
FROM skipped_parent_counts spc
JOIN pgflow.steps child_step ON child_step.flow_slug = spc.child_flow_slug AND child_step.step_slug = spc.child_step_slug
WHERE child_state.run_id = cascade_resolve_conditions.run_id
AND child_state.step_slug = dep.step_slug
AND child_state.step_slug = spc.child_step_slug
),
run_update AS (
UPDATE pgflow.runs r
Expand Down
4 changes: 2 additions & 2 deletions pkgs/core/supabase/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:C7dqsqxBJfaYoFyWUWyOwQEqSTbZm2ob13rGC3EvYOs=
h1:ThrUAu9izqXh7CYZpi1VC17rNHGXdQh4yX5fwrTmygU=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
Expand All @@ -18,4 +18,4 @@ h1:C7dqsqxBJfaYoFyWUWyOwQEqSTbZm2ob13rGC3EvYOs=
20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o=
20260120205547_pgflow_requeue_stalled_tasks.sql h1:4wCBBvjtETCgJf1eXmlH5wCTKDUhiLi0uzsFG1V528E=
20260124113408_pgflow_auth_secret_support.sql h1:i/s1JkBqRElN6FOYFQviJt685W08SuSo30aP25lNlLc=
20260203101513_pgflow_step_conditions.sql h1:pom2NuFU0JsCKvdaeQzdGcVF20ZlDUd94bJmZ98hI5A=
20260206115746_pgflow_step_conditions.sql h1:rIoXVl0SoVFGHdCFpAQnD6DRSHugzQODZa+UjAhA0ow=
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
-- Test: Multiple parents skipped in same iteration should decrement remaining_deps for each
-- This tests the bug where remaining_deps is only decremented once even when multiple parents
-- are skipped simultaneously in cascade_resolve_conditions
--
-- Flow structure:
-- root_a (skip) \
-- -> join (depends on both)
-- root_b (skip) /
--
-- Expected behavior:
-- 1. Both root_a and root_b are skipped due to unmet conditions
-- 2. join.remaining_deps should be decremented by 2 (from 2 to 0)
-- 3. join should become ready and start
--
-- Bug behavior (current):
-- 1. Both root_a and root_b are skipped
-- 2. join.remaining_deps is only decremented by 1 (from 2 to 1)
-- 3. join stays stuck with remaining_deps = 1 forever

begin;
select plan(5);
select pgflow_tests.reset_db();

-- Create flow with two conditional root steps that will both be skipped
select pgflow.create_flow('multi_root_skip');
select pgflow.add_step(
flow_slug => 'multi_root_skip',
step_slug => 'root_a',
required_input_pattern => '{"go": true}'::jsonb,
when_unmet => 'skip'
);
select pgflow.add_step(
flow_slug => 'multi_root_skip',
step_slug => 'root_b',
required_input_pattern => '{"go": true}'::jsonb,
when_unmet => 'skip'
);
select pgflow.add_step(
flow_slug => 'multi_root_skip',
step_slug => 'join',
deps_slugs => ARRAY['root_a', 'root_b']
);

-- Start flow with input that does NOT match either condition
with flow as (
select * from pgflow.start_flow('multi_root_skip', '{"go": false}'::jsonb)
)
select run_id into temporary run_ids from flow;

-- Test 1: root_a should be skipped
select is(
(select status from pgflow.step_states
where run_id = (select run_id from run_ids) and step_slug = 'root_a'),
'skipped',
'root_a should be skipped (condition unmet)'
);

-- Test 2: root_b should be skipped
select is(
(select status from pgflow.step_states
where run_id = (select run_id from run_ids) and step_slug = 'root_b'),
'skipped',
'root_b should be skipped (condition unmet)'
);

-- Test 3: join.remaining_deps should be 0 (both parents skipped)
select is(
(select remaining_deps from pgflow.step_states
where run_id = (select run_id from run_ids) and step_slug = 'join'),
0,
'join.remaining_deps should be 0 when both parents are skipped'
);

-- Test 4: join should be started (ready to run)
select is(
(select status from pgflow.step_states
where run_id = (select run_id from run_ids) and step_slug = 'join'),
'started',
'join should start after both parents are skipped'
);

-- Test 5: join should have a task created
select is(
(select count(*)::int from pgflow.step_tasks
where run_id = (select run_id from run_ids) and step_slug = 'join'),
1,
'join should have one task created'
);

drop table if exists run_ids;
select finish();
rollback;
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
-- Test: Multiple parents skipped then one completes should properly decrement remaining_deps
-- This tests the bug where remaining_deps under-decrementing causes the join step to never start
--
-- Flow structure:
-- branch_a (conditional, will run) \
-- branch_b (skip) -> join (depends on all three)
-- branch_c (skip) /
--
-- Expected behavior:
-- 1. branch_a starts (condition met), branch_b and branch_c are skipped
-- 2. join.remaining_deps should be decremented by 2 (for skipped branches) to 1
-- 3. After branch_a completes, join.remaining_deps goes from 1 to 0
-- 4. join should start
--
-- Bug behavior (current):
-- 1. branch_a runs, branch_b and branch_c are skipped
-- 2. join.remaining_deps is only decremented by 1 (from 3 to 2) instead of 2 (to 1)
-- 3. After branch_a completes, join.remaining_deps goes from 2 to 1
-- 4. join stays stuck with remaining_deps = 1 forever

begin;
select plan(6);
select pgflow_tests.reset_db();

-- Create flow with three conditional branches
select pgflow.create_flow('multi_skip_partial');
select pgflow.add_step(
flow_slug => 'multi_skip_partial',
step_slug => 'branch_a',
required_input_pattern => '{"route": "a"}'::jsonb,
when_unmet => 'skip'
);
select pgflow.add_step(
flow_slug => 'multi_skip_partial',
step_slug => 'branch_b',
required_input_pattern => '{"route": "b"}'::jsonb,
when_unmet => 'skip'
);
select pgflow.add_step(
flow_slug => 'multi_skip_partial',
step_slug => 'branch_c',
required_input_pattern => '{"route": "c"}'::jsonb,
when_unmet => 'skip'
);
select pgflow.add_step(
flow_slug => 'multi_skip_partial',
step_slug => 'join',
deps_slugs => ARRAY['branch_a', 'branch_b', 'branch_c']
);

-- Start flow with input that only matches branch_a's condition
with flow as (
select * from pgflow.start_flow('multi_skip_partial', '{"route": "a"}'::jsonb)
)
select run_id into temporary run_ids from flow;

-- Test 1: branch_a should be started (condition met)
select is(
(select status from pgflow.step_states
where run_id = (select run_id from run_ids) and step_slug = 'branch_a'),
'started',
'branch_a should start (condition met)'
);

-- Test 2: branch_b should be skipped
select is(
(select status from pgflow.step_states
where run_id = (select run_id from run_ids) and step_slug = 'branch_b'),
'skipped',
'branch_b should be skipped (condition unmet)'
);

-- Test 3: branch_c should be skipped
select is(
(select status from pgflow.step_states
where run_id = (select run_id from run_ids) and step_slug = 'branch_c'),
'skipped',
'branch_c should be skipped (condition unmet)'
);

-- Test 4: join.remaining_deps should be 1 (one running, two skipped)
-- After skips: should be 3 - 2 = 1
select is(
(select remaining_deps from pgflow.step_states
where run_id = (select run_id from run_ids) and step_slug = 'join'),
1,
'join.remaining_deps should be 1 after two parents are skipped (one still running)'
);

-- Complete branch_a
select pgflow_tests.poll_and_complete('multi_skip_partial');

-- Test 5: After branch_a completes, join.remaining_deps should be 0
select is(
(select remaining_deps from pgflow.step_states
where run_id = (select run_id from run_ids) and step_slug = 'join'),
0,
'join.remaining_deps should be 0 after branch_a completes'
);

-- Test 6: join should be started
select is(
(select status from pgflow.step_states
where run_id = (select run_id from run_ids) and step_slug = 'join'),
'started',
'join should start after all dependencies are resolved'
);

drop table if exists run_ids;
select finish();
rollback;
1 change: 0 additions & 1 deletion x.md

This file was deleted.