diff --git a/pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql b/pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql index 927e4564b..2a86d4861 100644 --- a/pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql +++ b/pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql @@ -197,21 +197,30 @@ BEGIN false ) AS _broadcast_result ), - -- NEW: Update dependent steps (decrement remaining_deps, set initial_tasks=0 for maps) + -- NEW: Update dependent steps (decrement remaining_deps by count of skipped parents, set initial_tasks=0 for maps) + skipped_parent_counts AS ( + -- Count how many skipped parents each child has + SELECT + dep.step_slug AS child_step_slug, + dep.flow_slug AS child_flow_slug, + COUNT(*) AS skipped_parent_count + FROM skipped_steps parent + JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug + GROUP BY dep.step_slug, dep.flow_slug + ), dependent_updates AS ( UPDATE pgflow.step_states child_state - SET remaining_deps = child_state.remaining_deps - 1, + SET remaining_deps = child_state.remaining_deps - spc.skipped_parent_count, -- If child is a map step and this skipped step is its only dependency, -- set initial_tasks = 0 (skipped dep = empty array) initial_tasks = CASE WHEN child_step.step_type = 'map' AND child_step.deps_count = 1 THEN 0 ELSE child_state.initial_tasks END - FROM skipped_steps parent - JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug - JOIN pgflow.steps child_step ON child_step.flow_slug = dep.flow_slug AND child_step.step_slug = dep.step_slug + FROM skipped_parent_counts spc + JOIN pgflow.steps child_step ON child_step.flow_slug = spc.child_flow_slug AND child_step.step_slug = spc.child_step_slug WHERE child_state.run_id = cascade_resolve_conditions.run_id - AND child_state.step_slug = dep.step_slug + AND child_state.step_slug = spc.child_step_slug ), run_update AS ( UPDATE pgflow.runs r diff --git a/pkgs/core/supabase/migrations/20260203101513_pgflow_step_conditions.sql b/pkgs/core/supabase/migrations/20260206115746_pgflow_step_conditions.sql similarity index 99% rename from pkgs/core/supabase/migrations/20260203101513_pgflow_step_conditions.sql rename to pkgs/core/supabase/migrations/20260206115746_pgflow_step_conditions.sql index 1b20a7e9d..7550a3c86 100644 --- a/pkgs/core/supabase/migrations/20260203101513_pgflow_step_conditions.sql +++ b/pkgs/core/supabase/migrations/20260206115746_pgflow_step_conditions.sql @@ -614,21 +614,30 @@ BEGIN false ) AS _broadcast_result ), - -- NEW: Update dependent steps (decrement remaining_deps, set initial_tasks=0 for maps) + -- NEW: Update dependent steps (decrement remaining_deps by count of skipped parents, set initial_tasks=0 for maps) + skipped_parent_counts AS ( + -- Count how many skipped parents each child has + SELECT + dep.step_slug AS child_step_slug, + dep.flow_slug AS child_flow_slug, + COUNT(*) AS skipped_parent_count + FROM skipped_steps parent + JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug + GROUP BY dep.step_slug, dep.flow_slug + ), dependent_updates AS ( UPDATE pgflow.step_states child_state - SET remaining_deps = child_state.remaining_deps - 1, + SET remaining_deps = child_state.remaining_deps - spc.skipped_parent_count, -- If child is a map step and this skipped step is its only dependency, -- set initial_tasks = 0 (skipped dep = empty array) initial_tasks = CASE WHEN child_step.step_type = 'map' AND child_step.deps_count = 1 THEN 0 ELSE child_state.initial_tasks END - FROM skipped_steps parent - JOIN pgflow.deps dep ON dep.flow_slug = parent.flow_slug AND dep.dep_slug = parent.step_slug - JOIN pgflow.steps child_step ON child_step.flow_slug = dep.flow_slug AND child_step.step_slug = dep.step_slug + FROM skipped_parent_counts spc + JOIN pgflow.steps child_step ON child_step.flow_slug = spc.child_flow_slug AND child_step.step_slug = spc.child_step_slug WHERE child_state.run_id = cascade_resolve_conditions.run_id - AND child_state.step_slug = dep.step_slug + AND child_state.step_slug = spc.child_step_slug ), run_update AS ( UPDATE pgflow.runs r diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index 564a96a6e..c98885fa9 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:C7dqsqxBJfaYoFyWUWyOwQEqSTbZm2ob13rGC3EvYOs= +h1:ThrUAu9izqXh7CYZpi1VC17rNHGXdQh4yX5fwrTmygU= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -18,4 +18,4 @@ h1:C7dqsqxBJfaYoFyWUWyOwQEqSTbZm2ob13rGC3EvYOs= 20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o= 20260120205547_pgflow_requeue_stalled_tasks.sql h1:4wCBBvjtETCgJf1eXmlH5wCTKDUhiLi0uzsFG1V528E= 20260124113408_pgflow_auth_secret_support.sql h1:i/s1JkBqRElN6FOYFQviJt685W08SuSo30aP25lNlLc= -20260203101513_pgflow_step_conditions.sql h1:pom2NuFU0JsCKvdaeQzdGcVF20ZlDUd94bJmZ98hI5A= +20260206115746_pgflow_step_conditions.sql h1:rIoXVl0SoVFGHdCFpAQnD6DRSHugzQODZa+UjAhA0ow= diff --git a/pkgs/core/supabase/tests/condition_evaluation/multi_parent_skip_decrements_remaining_deps.test.sql b/pkgs/core/supabase/tests/condition_evaluation/multi_parent_skip_decrements_remaining_deps.test.sql new file mode 100644 index 000000000..bab137ac6 --- /dev/null +++ b/pkgs/core/supabase/tests/condition_evaluation/multi_parent_skip_decrements_remaining_deps.test.sql @@ -0,0 +1,92 @@ +-- Test: Multiple parents skipped in same iteration should decrement remaining_deps for each +-- This tests the bug where remaining_deps is only decremented once even when multiple parents +-- are skipped simultaneously in cascade_resolve_conditions +-- +-- Flow structure: +-- root_a (skip) \ +-- -> join (depends on both) +-- root_b (skip) / +-- +-- Expected behavior: +-- 1. Both root_a and root_b are skipped due to unmet conditions +-- 2. join.remaining_deps should be decremented by 2 (from 2 to 0) +-- 3. join should become ready and start +-- +-- Bug behavior (current): +-- 1. Both root_a and root_b are skipped +-- 2. join.remaining_deps is only decremented by 1 (from 2 to 1) +-- 3. join stays stuck with remaining_deps = 1 forever + +begin; +select plan(5); +select pgflow_tests.reset_db(); + +-- Create flow with two conditional root steps that will both be skipped +select pgflow.create_flow('multi_root_skip'); +select pgflow.add_step( + flow_slug => 'multi_root_skip', + step_slug => 'root_a', + required_input_pattern => '{"go": true}'::jsonb, + when_unmet => 'skip' +); +select pgflow.add_step( + flow_slug => 'multi_root_skip', + step_slug => 'root_b', + required_input_pattern => '{"go": true}'::jsonb, + when_unmet => 'skip' +); +select pgflow.add_step( + flow_slug => 'multi_root_skip', + step_slug => 'join', + deps_slugs => ARRAY['root_a', 'root_b'] +); + +-- Start flow with input that does NOT match either condition +with flow as ( + select * from pgflow.start_flow('multi_root_skip', '{"go": false}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Test 1: root_a should be skipped +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'root_a'), + 'skipped', + 'root_a should be skipped (condition unmet)' +); + +-- Test 2: root_b should be skipped +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'root_b'), + 'skipped', + 'root_b should be skipped (condition unmet)' +); + +-- Test 3: join.remaining_deps should be 0 (both parents skipped) +select is( + (select remaining_deps from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'join'), + 0, + 'join.remaining_deps should be 0 when both parents are skipped' +); + +-- Test 4: join should be started (ready to run) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'join'), + 'started', + 'join should start after both parents are skipped' +); + +-- Test 5: join should have a task created +select is( + (select count(*)::int from pgflow.step_tasks + where run_id = (select run_id from run_ids) and step_slug = 'join'), + 1, + 'join should have one task created' +); + +drop table if exists run_ids; +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/condition_evaluation/multi_parent_skip_then_complete.test.sql b/pkgs/core/supabase/tests/condition_evaluation/multi_parent_skip_then_complete.test.sql new file mode 100644 index 000000000..c8d7e2457 --- /dev/null +++ b/pkgs/core/supabase/tests/condition_evaluation/multi_parent_skip_then_complete.test.sql @@ -0,0 +1,111 @@ +-- Test: Multiple parents skipped then one completes should properly decrement remaining_deps +-- This tests the bug where remaining_deps under-decrementing causes the join step to never start +-- +-- Flow structure: +-- branch_a (conditional, will run) \ +-- branch_b (skip) -> join (depends on all three) +-- branch_c (skip) / +-- +-- Expected behavior: +-- 1. branch_a starts (condition met), branch_b and branch_c are skipped +-- 2. join.remaining_deps should be decremented by 2 (for skipped branches) to 1 +-- 3. After branch_a completes, join.remaining_deps goes from 1 to 0 +-- 4. join should start +-- +-- Bug behavior (current): +-- 1. branch_a runs, branch_b and branch_c are skipped +-- 2. join.remaining_deps is only decremented by 1 (from 3 to 2) instead of 2 (to 1) +-- 3. After branch_a completes, join.remaining_deps goes from 2 to 1 +-- 4. join stays stuck with remaining_deps = 1 forever + +begin; +select plan(6); +select pgflow_tests.reset_db(); + +-- Create flow with three conditional branches +select pgflow.create_flow('multi_skip_partial'); +select pgflow.add_step( + flow_slug => 'multi_skip_partial', + step_slug => 'branch_a', + required_input_pattern => '{"route": "a"}'::jsonb, + when_unmet => 'skip' +); +select pgflow.add_step( + flow_slug => 'multi_skip_partial', + step_slug => 'branch_b', + required_input_pattern => '{"route": "b"}'::jsonb, + when_unmet => 'skip' +); +select pgflow.add_step( + flow_slug => 'multi_skip_partial', + step_slug => 'branch_c', + required_input_pattern => '{"route": "c"}'::jsonb, + when_unmet => 'skip' +); +select pgflow.add_step( + flow_slug => 'multi_skip_partial', + step_slug => 'join', + deps_slugs => ARRAY['branch_a', 'branch_b', 'branch_c'] +); + +-- Start flow with input that only matches branch_a's condition +with flow as ( + select * from pgflow.start_flow('multi_skip_partial', '{"route": "a"}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Test 1: branch_a should be started (condition met) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'branch_a'), + 'started', + 'branch_a should start (condition met)' +); + +-- Test 2: branch_b should be skipped +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'branch_b'), + 'skipped', + 'branch_b should be skipped (condition unmet)' +); + +-- Test 3: branch_c should be skipped +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'branch_c'), + 'skipped', + 'branch_c should be skipped (condition unmet)' +); + +-- Test 4: join.remaining_deps should be 1 (one running, two skipped) +-- After skips: should be 3 - 2 = 1 +select is( + (select remaining_deps from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'join'), + 1, + 'join.remaining_deps should be 1 after two parents are skipped (one still running)' +); + +-- Complete branch_a +select pgflow_tests.poll_and_complete('multi_skip_partial'); + +-- Test 5: After branch_a completes, join.remaining_deps should be 0 +select is( + (select remaining_deps from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'join'), + 0, + 'join.remaining_deps should be 0 after branch_a completes' +); + +-- Test 6: join should be started +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'join'), + 'started', + 'join should start after all dependencies are resolved' +); + +drop table if exists run_ids; +select finish(); +rollback; diff --git a/x.md b/x.md deleted file mode 100644 index 04632a2e8..000000000 --- a/x.md +++ /dev/null @@ -1 +0,0 @@ -please go through all the docs we have updated in this PR and make sure that we are adhering to the writing-pgflow-flows skill - there is no input.run anymore, but flowInput. and dependent steps need to await ctx.flowInput as they dont have the access to it directly anymore, input.run is no longer available