diff --git a/Lib/profiling/sampling/heatmap_collector.py b/Lib/profiling/sampling/heatmap_collector.py index bb810fa485be63..022e94d014f9b7 100644 --- a/Lib/profiling/sampling/heatmap_collector.py +++ b/Lib/profiling/sampling/heatmap_collector.py @@ -472,6 +472,10 @@ def __init__(self, *args, **kwargs): self.callers_graph = collections.defaultdict(set) self.function_definitions = {} + # Map each sampled line to its function for proper caller lookup + # (filename, lineno) -> funcname + self.line_to_function = {} + # Edge counting for call path analysis self.edge_samples = collections.Counter() @@ -596,6 +600,10 @@ def _record_line_sample(self, filename, lineno, funcname, is_leaf=False, if funcname and (filename, funcname) not in self.function_definitions: self.function_definitions[(filename, funcname)] = lineno + # Map this line to its function for caller/callee navigation + if funcname: + self.line_to_function[(filename, lineno)] = funcname + def _record_bytecode_sample(self, filename, lineno, opcode, end_lineno=None, col_offset=None, end_col_offset=None, weight=1): @@ -1150,13 +1158,36 @@ def _format_specialization_color(self, spec_pct: int) -> str: return f"rgba({r}, {g}, {b}, {alpha})" def _build_navigation_buttons(self, filename: str, line_num: int) -> str: - """Build navigation buttons for callers/callees.""" + """Build navigation buttons for callers/callees. + + - Callers: All lines in a function show who calls this function + - Callees: Only actual call site lines show what they call + """ line_key = (filename, line_num) - caller_list = self._deduplicate_by_function(self.callers_graph.get(line_key, set())) + + funcname = self.line_to_function.get(line_key) + + # Get callers: look up by function definition line, not current line + # This ensures all lines in a function show who calls this function + if funcname: + func_def_line = self.function_definitions.get((filename, funcname), line_num) + func_def_key = (filename, func_def_line) + caller_list = self._deduplicate_by_function(self.callers_graph.get(func_def_key, set())) + else: + caller_list = self._deduplicate_by_function(self.callers_graph.get(line_key, set())) + + # Get callees: only show for actual call site lines (not every line in function) callee_list = self._deduplicate_by_function(self.call_graph.get(line_key, set())) # Get edge counts for each caller/callee - callers_with_counts = self._get_edge_counts(line_key, caller_list, is_caller=True) + # For callers, use the function definition key for edge lookup + if funcname: + func_def_line = self.function_definitions.get((filename, funcname), line_num) + caller_edge_key = (filename, func_def_line) + else: + caller_edge_key = line_key + callers_with_counts = self._get_edge_counts(caller_edge_key, caller_list, is_caller=True) + # For callees, use the actual line key since that's where the call happens callees_with_counts = self._get_edge_counts(line_key, callee_list, is_caller=False) # Build navigation buttons with counts diff --git a/Lib/test/test_profiling/test_heatmap.py b/Lib/test/test_profiling/test_heatmap.py index b1bfdf868b085a..b2acb1cf577341 100644 --- a/Lib/test/test_profiling/test_heatmap.py +++ b/Lib/test/test_profiling/test_heatmap.py @@ -367,6 +367,96 @@ def test_process_frames_with_file_samples_dict(self): self.assertEqual(collector.file_samples['test.py'][10], 1) +def frame(filename, line, func): + """Create a frame tuple: (filename, location, funcname, opcode).""" + return (filename, (line, line, -1, -1), func, None) + + +class TestHeatmapCollectorNavigationButtons(unittest.TestCase): + """Test navigation button behavior for caller/callee relationships. + + For every call stack: + - Root frames (entry points): only DOWN arrow (callees) + - Middle frames: both UP and DOWN arrows + - Leaf frames: only UP arrow (callers) + """ + + def collect(self, *stacks): + """Create collector and process frame stacks.""" + collector = HeatmapCollector(sample_interval_usec=100) + for stack in stacks: + collector.process_frames(stack, thread_id=1) + return collector + + def test_deep_call_stack_relationships(self): + """Test root/middle/leaf navigation in a 5-level call stack.""" + # Stack: root -> A -> B -> C -> leaf + stack = [ + frame('leaf.py', 5, 'leaf'), + frame('c.py', 10, 'func_c'), + frame('b.py', 15, 'func_b'), + frame('a.py', 20, 'func_a'), + frame('root.py', 25, 'root'), + ] + c = self.collect(stack) + + # Root: only callees (no one calls it) + self.assertIn(('root.py', 25), c.call_graph) + self.assertNotIn(('root.py', 25), c.callers_graph) + + # Middle frames: both callers and callees + for key in [('a.py', 20), ('b.py', 15), ('c.py', 10)]: + self.assertIn(key, c.call_graph) + self.assertIn(key, c.callers_graph) + + # Leaf: only callers (doesn't call anyone) + self.assertNotIn(('leaf.py', 5), c.call_graph) + self.assertIn(('leaf.py', 5), c.callers_graph) + + def test_all_lines_in_function_see_callers(self): + """Test that interior lines map to their function for caller lookup.""" + # Same function sampled at different lines (12, 15, 10) + c = self.collect( + [frame('mod.py', 12, 'my_func'), frame('caller.py', 100, 'caller')], + [frame('mod.py', 15, 'my_func'), frame('caller.py', 100, 'caller')], + [frame('mod.py', 10, 'my_func'), frame('caller.py', 100, 'caller')], + ) + + # All lines should map to same function + for line in [10, 12, 15]: + self.assertEqual(c.line_to_function[('mod.py', line)], 'my_func') + + # Function definition line should have callers + func_def = c.function_definitions[('mod.py', 'my_func')] + self.assertIn(('mod.py', func_def), c.callers_graph) + + def test_multiple_callers_and_callees(self): + """Test multiple callers/callees are recorded correctly.""" + # Two callers -> target, and caller -> two callees + c = self.collect( + [frame('target.py', 10, 'target'), frame('caller1.py', 20, 'c1')], + [frame('target.py', 10, 'target'), frame('caller2.py', 30, 'c2')], + [frame('callee1.py', 5, 'f1'), frame('dispatcher.py', 40, 'dispatch')], + [frame('callee2.py', 6, 'f2'), frame('dispatcher.py', 40, 'dispatch')], + ) + + # Target has 2 callers + callers = c.callers_graph[('target.py', 10)] + self.assertEqual({x[0] for x in callers}, {'caller1.py', 'caller2.py'}) + + # Dispatcher has 2 callees + callees = c.call_graph[('dispatcher.py', 40)] + self.assertEqual({x[0] for x in callees}, {'callee1.py', 'callee2.py'}) + + def test_edge_samples_counted(self): + """Test that repeated calls accumulate edge counts.""" + stack = [frame('callee.py', 10, 'callee'), frame('caller.py', 20, 'caller')] + c = self.collect(stack, stack, stack) + + edge_key = (('caller.py', 20), ('callee.py', 10)) + self.assertEqual(c.edge_samples[edge_key], 3) + + class TestHeatmapCollectorExport(unittest.TestCase): """Test HeatmapCollector.export() method."""