From 23c803d1cdf09e5ff7b712c12a3a02ec758e25a1 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Thu, 5 Feb 2026 14:28:18 +0100 Subject: [PATCH 1/3] Draft of dynamic memory snapshots --- src/crawlee/_autoscaling/_types.py | 24 +++++-- src/crawlee/_autoscaling/snapshotter.py | 35 +++++++--- src/crawlee/_utils/byte_size.py | 10 ++- tests/unit/_autoscaling/test_snapshotter.py | 75 ++++++++++++++++++++- 4 files changed, 124 insertions(+), 20 deletions(-) diff --git a/src/crawlee/_autoscaling/_types.py b/src/crawlee/_autoscaling/_types.py index b231c9062d..aa59e45fb0 100644 --- a/src/crawlee/_autoscaling/_types.py +++ b/src/crawlee/_autoscaling/_types.py @@ -2,11 +2,9 @@ from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from crawlee._utils.byte_size import ByteSize +from crawlee._utils.byte_size import ByteSize, Ratio +from crawlee._utils.system import get_memory_info SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD = 0.97 @@ -97,8 +95,12 @@ class MemorySnapshot: system_wide_used_size: ByteSize | None """Memory usage of all processes, system-wide.""" - max_memory_size: ByteSize - """The maximum memory that can be used by `AutoscaledPool`.""" + max_memory_size: ByteSize | Ratio + """The maximum memory that can be used by `AutoscaledPool`. + + When of type `ByteSize` then it is used as fixed memory size. When of type `Ratio` then it allows for dynamic memory + scaling based on the available system memory. + """ system_wide_memory_size: ByteSize | None """Total memory available in the whole system.""" @@ -117,7 +119,15 @@ def is_overloaded(self) -> bool: if system_wide_utilization > SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD: return True - return (self.current_size / self.max_memory_size) > self.max_used_memory_ratio + if isinstance(self.max_memory_size, Ratio): + # The snapshot overload is decided not when the snapshot was taken, but when `is_overload` property is + # accessed. This allows for dynamic memory scaling. The same memory snapshot that used to be overloaded in + # the past can become non-overloaded if the available memory was increased. + max_memory_size = ByteSize(int(get_memory_info().total_size.bytes * self.max_memory_size.value)) + else: + max_memory_size = self.max_memory_size + + return (self.current_size / max_memory_size) > self.max_used_memory_ratio @dataclass diff --git a/src/crawlee/_autoscaling/snapshotter.py b/src/crawlee/_autoscaling/snapshotter.py index 55af9da1dd..219f6605e8 100644 --- a/src/crawlee/_autoscaling/snapshotter.py +++ b/src/crawlee/_autoscaling/snapshotter.py @@ -9,7 +9,7 @@ from crawlee import service_locator from crawlee._autoscaling._types import ClientSnapshot, CpuSnapshot, EventLoopSnapshot, MemorySnapshot, Snapshot -from crawlee._utils.byte_size import ByteSize +from crawlee._utils.byte_size import ByteSize, Ratio from crawlee._utils.context import ensure_context from crawlee._utils.docs import docs_group from crawlee._utils.recurring_task import RecurringTask @@ -69,7 +69,7 @@ def __init__( max_used_memory_ratio: float, max_event_loop_delay: timedelta, max_client_errors: int, - max_memory_size: ByteSize, + max_memory_size: ByteSize | Ratio, ) -> None: """Initialize a new instance. @@ -85,7 +85,9 @@ def __init__( value, the event loop is considered overloaded. max_client_errors: Sets the maximum number of client errors (HTTP 429). When the number of client errors is higher than the provided number, the client is considered overloaded. - max_memory_size: Sets the maximum amount of system memory to be used by the `AutoscaledPool`. + max_memory_size: Sets the maximum amount of system memory to be used by the `AutoscaledPool`. When of type + `ByteSize` then it is used as fixed memory size. When of type `Ratio` then it allows for dynamic memory + scaling based on the available system memory. """ self._max_used_cpu_ratio = max_used_cpu_ratio self._max_used_memory_ratio = max_used_memory_ratio @@ -121,7 +123,7 @@ def from_config(cls, config: Configuration | None = None) -> Snapshotter: max_memory_size = ( ByteSize.from_mb(config.memory_mbytes) if config.memory_mbytes - else ByteSize(int(get_memory_info().total_size.bytes * config.available_memory_ratio)) + else Ratio(value=config.available_memory_ratio) ) return cls( @@ -296,7 +298,17 @@ def _snapshot_memory(self, event_data: EventSystemInfoData) -> None: snapshots = cast('list[Snapshot]', self._memory_snapshots) self._prune_snapshots(snapshots, snapshot.created_at) self._memory_snapshots.add(snapshot) - self._evaluate_memory_load(event_data.memory_info.current_size, event_data.memory_info.created_at) + + if isinstance(self._max_memory_size, Ratio): + max_memory_size = ByteSize(int(get_memory_info().total_size.bytes * self._max_memory_size.value)) + else: + max_memory_size = self._max_memory_size + + self._evaluate_memory_load( + event_data.memory_info.current_size, + event_data.memory_info.created_at, + max_memory_size=max_memory_size, + ) def _snapshot_event_loop(self) -> None: """Capture a snapshot of the current event loop usage. @@ -364,27 +376,30 @@ def _prune_snapshots(self, snapshots: list[Snapshot], now: datetime) -> None: else: snapshots.clear() - def _evaluate_memory_load(self, current_memory_usage_size: ByteSize, snapshot_timestamp: datetime) -> None: + def _evaluate_memory_load( + self, current_memory_usage_size: ByteSize, snapshot_timestamp: datetime, max_memory_size: ByteSize + ) -> None: """Evaluate and logs critical memory load conditions based on the system information. Args: current_memory_usage_size: The current memory usage. snapshot_timestamp: The time at which the memory snapshot was taken. + max_memory_size: The maximum memory size to be used for evaluation. """ # Check if the warning has been logged recently to avoid spamming if snapshot_timestamp < self._timestamp_of_last_memory_warning + self._MEMORY_WARNING_COOLDOWN_PERIOD: return - threshold_memory_size = self._max_used_memory_ratio * self._max_memory_size - buffer_memory_size = self._max_memory_size * (1 - self._max_used_memory_ratio) * self._RESERVE_MEMORY_RATIO + threshold_memory_size = self._max_used_memory_ratio * max_memory_size + buffer_memory_size = max_memory_size * (1 - self._max_used_memory_ratio) * self._RESERVE_MEMORY_RATIO overload_memory_threshold_size = threshold_memory_size + buffer_memory_size # Log a warning if current memory usage exceeds the critical overload threshold if current_memory_usage_size > overload_memory_threshold_size: - memory_usage_percentage = round((current_memory_usage_size.bytes / self._max_memory_size.bytes) * 100) + memory_usage_percentage = round((current_memory_usage_size.bytes / max_memory_size.bytes) * 100) logger.warning( f'Memory is critically overloaded. Using {current_memory_usage_size} of ' - f'{self._max_memory_size} ({memory_usage_percentage}%). ' + f'{max_memory_size} ({memory_usage_percentage}%). ' 'Consider increasing available memory.' ) self._timestamp_of_last_memory_warning = snapshot_timestamp diff --git a/src/crawlee/_utils/byte_size.py b/src/crawlee/_utils/byte_size.py index da4ba84b28..e51ea3e9e4 100644 --- a/src/crawlee/_utils/byte_size.py +++ b/src/crawlee/_utils/byte_size.py @@ -1,7 +1,9 @@ from __future__ import annotations from dataclasses import dataclass -from typing import Any +from typing import Annotated, Any + +from pydantic import BaseModel, Field _BYTES_PER_KB = 1024 _BYTES_PER_MB = _BYTES_PER_KB**2 @@ -9,6 +11,12 @@ _BYTES_PER_TB = _BYTES_PER_KB**4 +class Ratio(BaseModel): + """Represents ratio of memory.""" + + value: Annotated[float, Field(gt=0.0, le=1.0)] + + @dataclass(frozen=True) class ByteSize: """Represents a byte size.""" diff --git a/tests/unit/_autoscaling/test_snapshotter.py b/tests/unit/_autoscaling/test_snapshotter.py index 7b3d50d75d..1b6587d8b9 100644 --- a/tests/unit/_autoscaling/test_snapshotter.py +++ b/tests/unit/_autoscaling/test_snapshotter.py @@ -3,16 +3,22 @@ from datetime import datetime, timedelta, timezone from logging import getLogger from typing import TYPE_CHECKING, cast +from unittest import mock from unittest.mock import MagicMock import pytest from crawlee import service_locator from crawlee._autoscaling import Snapshotter -from crawlee._autoscaling._types import ClientSnapshot, CpuSnapshot, MemorySnapshot +from crawlee._autoscaling._types import ( + SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD, + ClientSnapshot, + CpuSnapshot, + MemorySnapshot, +) from crawlee._autoscaling.snapshotter import SortedSnapshotList from crawlee._utils.byte_size import ByteSize -from crawlee._utils.system import CpuInfo, MemoryInfo +from crawlee._utils.system import CpuInfo, MemoryInfo, get_memory_info from crawlee.configuration import Configuration from crawlee.events import LocalEventManager from crawlee.events._types import Event, EventSystemInfoData @@ -354,3 +360,68 @@ def test_sorted_snapshot_list_add_maintains_order() -> None: prev_time = sorted_list[i - 1].created_at curr_time = snapshot.created_at assert prev_time <= curr_time, f'Items at indices {i - 1} and {i} are not in chronological order' + + +async def test_dynamic_memory(default_cpu_info: CpuInfo, event_manager: LocalEventManager) -> None: + """Test dynamic memory scaling scenario where the system-wide memory can change. + + Create one memory snapshot. In the initial setup the snapshot is overloaded due to system memory not being + sufficient. Then simulate a scale-up event by increasing the total system memory. The same memory snapshot should + no longer be considered overloaded after the scale-up. Then go back to initial memory and the same snapshot becomes + overloaded again + """ + + initial_memory = get_memory_info() + # Big memory usage ratio, that does not trigger system-wide overload + big_usage_ratio = 0.99 * SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD + + service_locator.set_event_manager(event_manager) + + async with Snapshotter.from_config(Configuration(available_memory_ratio=big_usage_ratio)) as snapshotter: + # Default state, memory usage exactly at the overload threshold -> overloaded, but not system-wide overloaded + memory_info_default = MemoryInfo( + total_size=initial_memory.total_size, + current_size=initial_memory.total_size * big_usage_ratio, + system_wide_used_size=initial_memory.total_size * big_usage_ratio, + ) + + event_manager.emit( + event=Event.SYSTEM_INFO, + event_data=EventSystemInfoData( + cpu_info=default_cpu_info, + memory_info=memory_info_default, + ), + ) + await event_manager.wait_for_all_listeners_to_complete() + + with ( + mock.patch('crawlee._autoscaling.snapshotter.get_memory_info', return_value=memory_info_default), + mock.patch('crawlee._autoscaling._types.get_memory_info', return_value=memory_info_default), + ): + memory_samples = snapshotter.get_memory_sample() + assert len(memory_samples) == 1 + assert memory_samples[0].is_overloaded + + # Scaled-up state, memory usage same, but more memory, so no longer overloaded + scaled_up_info = MemoryInfo( + total_size=initial_memory.total_size * 2, # Simulate increased total memory + current_size=initial_memory.total_size * big_usage_ratio, # Simulate memory usage + system_wide_used_size=initial_memory.total_size * big_usage_ratio, + ) + + with ( + mock.patch('crawlee._autoscaling.snapshotter.get_memory_info', return_value=scaled_up_info), + mock.patch('crawlee._autoscaling._types.get_memory_info', return_value=scaled_up_info), + ): + memory_samples = snapshotter.get_memory_sample() + assert len(memory_samples) == 1 + assert not memory_samples[0].is_overloaded + + # Back to default state, should be overloaded again + with ( + mock.patch('crawlee._autoscaling.snapshotter.get_memory_info', return_value=memory_info_default), + mock.patch('crawlee._autoscaling._types.get_memory_info', return_value=memory_info_default), + ): + memory_samples = snapshotter.get_memory_sample() + assert len(memory_samples) == 1 + assert memory_samples[0].is_overloaded From 3f27a149b65b5c39261f0acf32ac6b9566ae588b Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Wed, 11 Feb 2026 10:55:26 +0100 Subject: [PATCH 2/3] Review comments and simplification. --- src/crawlee/_autoscaling/_types.py | 15 +-- src/crawlee/_autoscaling/snapshotter.py | 44 +++++--- src/crawlee/configuration.py | 7 +- tests/unit/_autoscaling/test_snapshotter.py | 118 +++++++++++--------- 4 files changed, 104 insertions(+), 80 deletions(-) diff --git a/src/crawlee/_autoscaling/_types.py b/src/crawlee/_autoscaling/_types.py index aa59e45fb0..1f0a1383ae 100644 --- a/src/crawlee/_autoscaling/_types.py +++ b/src/crawlee/_autoscaling/_types.py @@ -2,9 +2,10 @@ from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone +from typing import TYPE_CHECKING -from crawlee._utils.byte_size import ByteSize, Ratio -from crawlee._utils.system import get_memory_info +if TYPE_CHECKING: + from crawlee._utils.byte_size import ByteSize, Ratio SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD = 0.97 @@ -119,15 +120,7 @@ def is_overloaded(self) -> bool: if system_wide_utilization > SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD: return True - if isinstance(self.max_memory_size, Ratio): - # The snapshot overload is decided not when the snapshot was taken, but when `is_overload` property is - # accessed. This allows for dynamic memory scaling. The same memory snapshot that used to be overloaded in - # the past can become non-overloaded if the available memory was increased. - max_memory_size = ByteSize(int(get_memory_info().total_size.bytes * self.max_memory_size.value)) - else: - max_memory_size = self.max_memory_size - - return (self.current_size / max_memory_size) > self.max_used_memory_ratio + return (self.current_size / self.max_memory_size) > self.max_used_memory_ratio @dataclass diff --git a/src/crawlee/_autoscaling/snapshotter.py b/src/crawlee/_autoscaling/snapshotter.py index 219f6605e8..6505f3e263 100644 --- a/src/crawlee/_autoscaling/snapshotter.py +++ b/src/crawlee/_autoscaling/snapshotter.py @@ -13,7 +13,7 @@ from crawlee._utils.context import ensure_context from crawlee._utils.docs import docs_group from crawlee._utils.recurring_task import RecurringTask -from crawlee._utils.system import MemoryInfo, get_memory_info +from crawlee._utils.system import MemoryInfo, MemoryUsageInfo, get_memory_info from crawlee.events._types import Event, EventSystemInfoData if TYPE_CHECKING: @@ -282,28 +282,46 @@ def _snapshot_memory(self, event_data: EventSystemInfoData) -> None: Args: event_data: System info data from which memory usage is read. """ + match event_data.memory_info, self._max_memory_size: + case MemoryInfo() as memory_info, Ratio() as ratio: + max_memory_size = memory_info.total_size * ratio.value + system_wide_used_size = memory_info.system_wide_used_size + system_wide_memory_size = memory_info.total_size + + case MemoryUsageInfo(), Ratio() as ratio: + # This is just hypothetical case, that should not happen in practice. + # `LocalEvenManager` should always provide `MemoryInfo` in the event data. + # When running on Apify, `self._max_memory_size` is always `ByteSize`, not `Ratio`. + max_memory_size = get_memory_info().total_size * ratio.value + system_wide_used_size = None + system_wide_memory_size = None + + case MemoryInfo() as memory_info, ByteSize() as byte_size: + max_memory_size = byte_size + system_wide_used_size = memory_info.system_wide_used_size + system_wide_memory_size = memory_info.total_size + + case MemoryUsageInfo(), ByteSize() as byte_size: + max_memory_size = byte_size + system_wide_used_size = None + system_wide_memory_size = None + + case _, _: + raise NotImplementedError('Unsupported combination of memory info and max memory size types.') + snapshot = MemorySnapshot( current_size=event_data.memory_info.current_size, - max_memory_size=self._max_memory_size, + max_memory_size=max_memory_size, max_used_memory_ratio=self._max_used_memory_ratio, created_at=event_data.memory_info.created_at, - system_wide_used_size=None, - system_wide_memory_size=None, + system_wide_used_size=system_wide_used_size, + system_wide_memory_size=system_wide_memory_size, ) - if isinstance(memory_info := event_data.memory_info, MemoryInfo): - snapshot.system_wide_used_size = memory_info.system_wide_used_size - snapshot.system_wide_memory_size = memory_info.total_size - snapshots = cast('list[Snapshot]', self._memory_snapshots) self._prune_snapshots(snapshots, snapshot.created_at) self._memory_snapshots.add(snapshot) - if isinstance(self._max_memory_size, Ratio): - max_memory_size = ByteSize(int(get_memory_info().total_size.bytes * self._max_memory_size.value)) - else: - max_memory_size = self._max_memory_size - self._evaluate_memory_load( event_data.memory_info.current_size, event_data.memory_info.created_at, diff --git a/src/crawlee/configuration.py b/src/crawlee/configuration.py index d6dc6b071e..85d4b8c560 100644 --- a/src/crawlee/configuration.py +++ b/src/crawlee/configuration.py @@ -177,11 +177,14 @@ class Configuration(BaseSettings): validation_alias=AliasChoices( 'apify_available_memory_ratio', 'crawlee_available_memory_ratio', - ) + ), + gt=0.0, + le=1.0, ), ] = 0.25 """The maximum proportion of system memory to use. If `memory_mbytes` is not provided, this ratio is used to - calculate the maximum memory. This option is utilized by the `Snapshotter`.""" + calculate the maximum memory. This option is utilized by the `Snapshotter` and supports the dynamic system memory + scaling.""" storage_dir: Annotated[ str, diff --git a/tests/unit/_autoscaling/test_snapshotter.py b/tests/unit/_autoscaling/test_snapshotter.py index 8197087406..b618d06a2e 100644 --- a/tests/unit/_autoscaling/test_snapshotter.py +++ b/tests/unit/_autoscaling/test_snapshotter.py @@ -3,8 +3,8 @@ import asyncio from datetime import datetime, timedelta, timezone from logging import getLogger +from math import floor from typing import TYPE_CHECKING, cast -from unittest import mock from unittest.mock import MagicMock import pytest @@ -28,6 +28,9 @@ from collections.abc import AsyncGenerator +MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD = 0.99 * SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD + + @pytest.fixture async def event_manager() -> AsyncGenerator[LocalEventManager, None]: # Use a long interval to avoid interference from periodic system info events during tests and ensure the first @@ -377,66 +380,73 @@ def test_sorted_snapshot_list_add_maintains_order() -> None: assert prev_time <= curr_time, f'Items at indices {i - 1} and {i} are not in chronological order' -async def test_dynamic_memory(default_cpu_info: CpuInfo, event_manager: LocalEventManager) -> None: +_initial_memory_info = get_memory_info() + + +@pytest.mark.parametrize( + ('available_memory_ratio', 'memory_mbytes', 'overloaded_after_scale_up'), + [ + pytest.param(MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD, None, False, id='Ratio-based memory limit'), + pytest.param( + MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD, + floor(_initial_memory_info.total_size.to_mb()), + True, + id='Fixed memory limit', + ), + ], +) +async def test_dynamic_memory( + *, + default_cpu_info: CpuInfo, + event_manager: LocalEventManager, + available_memory_ratio: float, + memory_mbytes: int | None, + overloaded_after_scale_up: bool, +) -> None: """Test dynamic memory scaling scenario where the system-wide memory can change. - Create one memory snapshot. In the initial setup the snapshot is overloaded due to system memory not being - sufficient. Then simulate a scale-up event by increasing the total system memory. The same memory snapshot should - no longer be considered overloaded after the scale-up. Then go back to initial memory and the same snapshot becomes - overloaded again - """ + Create two memory snapshots. They have same memory usage, but different available memory. + First snapshot is created with insufficient memory, so it is overloaded. + Second snapshot is created with sufficient memory. - initial_memory = get_memory_info() - # Big memory usage ratio, that does not trigger system-wide overload - big_usage_ratio = 0.99 * SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD + Based on the Snapshotter configuration, it will either take into account the increased available memory or not. + """ service_locator.set_event_manager(event_manager) - async with Snapshotter.from_config(Configuration(available_memory_ratio=big_usage_ratio)) as snapshotter: + async with Snapshotter.from_config( + Configuration(memory_mbytes=memory_mbytes, available_memory_ratio=available_memory_ratio) + ) as snapshotter: # Default state, memory usage exactly at the overload threshold -> overloaded, but not system-wide overloaded - memory_info_default = MemoryInfo( - total_size=initial_memory.total_size, - current_size=initial_memory.total_size * big_usage_ratio, - system_wide_used_size=initial_memory.total_size * big_usage_ratio, - ) - - event_manager.emit( - event=Event.SYSTEM_INFO, - event_data=EventSystemInfoData( - cpu_info=default_cpu_info, - memory_info=memory_info_default, + memory_infos = [ + # Overloaded sample + MemoryInfo( + total_size=_initial_memory_info.total_size, + current_size=_initial_memory_info.total_size * MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD, + system_wide_used_size=_initial_memory_info.total_size * MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD, ), - ) - await event_manager.wait_for_all_listeners_to_complete() + # Same as first sample, with twice as memory available in the system + MemoryInfo( + total_size=_initial_memory_info.total_size * 2, # Simulate increased total memory + current_size=_initial_memory_info.total_size * MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD, + system_wide_used_size=_initial_memory_info.total_size * MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD, + ), + ] - with ( - mock.patch('crawlee._autoscaling.snapshotter.get_memory_info', return_value=memory_info_default), - mock.patch('crawlee._autoscaling._types.get_memory_info', return_value=memory_info_default), - ): - memory_samples = snapshotter.get_memory_sample() - assert len(memory_samples) == 1 - assert memory_samples[0].is_overloaded - - # Scaled-up state, memory usage same, but more memory, so no longer overloaded - scaled_up_info = MemoryInfo( - total_size=initial_memory.total_size * 2, # Simulate increased total memory - current_size=initial_memory.total_size * big_usage_ratio, # Simulate memory usage - system_wide_used_size=initial_memory.total_size * big_usage_ratio, - ) + for memory_info in memory_infos: + event_manager.emit( + event=Event.SYSTEM_INFO, + event_data=EventSystemInfoData( + cpu_info=default_cpu_info, + memory_info=memory_info, + ), + ) + + await event_manager.wait_for_all_listeners_to_complete() - with ( - mock.patch('crawlee._autoscaling.snapshotter.get_memory_info', return_value=scaled_up_info), - mock.patch('crawlee._autoscaling._types.get_memory_info', return_value=scaled_up_info), - ): - memory_samples = snapshotter.get_memory_sample() - assert len(memory_samples) == 1 - assert not memory_samples[0].is_overloaded - - # Back to default state, should be overloaded again - with ( - mock.patch('crawlee._autoscaling.snapshotter.get_memory_info', return_value=memory_info_default), - mock.patch('crawlee._autoscaling._types.get_memory_info', return_value=memory_info_default), - ): - memory_samples = snapshotter.get_memory_sample() - assert len(memory_samples) == 1 - assert memory_samples[0].is_overloaded + memory_samples = snapshotter.get_memory_sample() + assert len(memory_samples) == 2 + # First sample will be overloaded. + assert memory_samples[0].is_overloaded + # Second sample can reflect the increased available memory based on the configuration used to create Snapshotter + assert memory_samples[1].is_overloaded == overloaded_after_scale_up From 8a1976ece5025d39c96eb2ead67976a03418f8ad Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Fri, 13 Feb 2026 09:05:36 +0100 Subject: [PATCH 3/3] Review comments --- src/crawlee/_autoscaling/_types.py | 22 +++++++----- src/crawlee/_autoscaling/snapshotter.py | 19 +++++++--- src/crawlee/_utils/byte_size.py | 10 +----- tests/unit/_autoscaling/test_snapshotter.py | 39 +++++++-------------- 4 files changed, 42 insertions(+), 48 deletions(-) diff --git a/src/crawlee/_autoscaling/_types.py b/src/crawlee/_autoscaling/_types.py index 1f0a1383ae..f321214313 100644 --- a/src/crawlee/_autoscaling/_types.py +++ b/src/crawlee/_autoscaling/_types.py @@ -2,10 +2,13 @@ from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Annotated + +from pydantic import Field +from pydantic.dataclasses import dataclass as pydantic_dataclass if TYPE_CHECKING: - from crawlee._utils.byte_size import ByteSize, Ratio + from crawlee._utils.byte_size import ByteSize SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD = 0.97 @@ -96,12 +99,8 @@ class MemorySnapshot: system_wide_used_size: ByteSize | None """Memory usage of all processes, system-wide.""" - max_memory_size: ByteSize | Ratio - """The maximum memory that can be used by `AutoscaledPool`. - - When of type `ByteSize` then it is used as fixed memory size. When of type `Ratio` then it allows for dynamic memory - scaling based on the available system memory. - """ + max_memory_size: ByteSize + """The maximum memory that can be used by `AutoscaledPool`.""" system_wide_memory_size: ByteSize | None """Total memory available in the whole system.""" @@ -170,3 +169,10 @@ def is_overloaded(self) -> bool: Snapshot = MemorySnapshot | CpuSnapshot | EventLoopSnapshot | ClientSnapshot + + +@pydantic_dataclass +class Ratio: + """Represents ratio of memory.""" + + value: Annotated[float, Field(gt=0.0, le=1.0)] diff --git a/src/crawlee/_autoscaling/snapshotter.py b/src/crawlee/_autoscaling/snapshotter.py index 6505f3e263..87328f658e 100644 --- a/src/crawlee/_autoscaling/snapshotter.py +++ b/src/crawlee/_autoscaling/snapshotter.py @@ -3,13 +3,14 @@ from __future__ import annotations import bisect +import functools from datetime import datetime, timedelta, timezone from logging import getLogger from typing import TYPE_CHECKING, TypeVar, cast from crawlee import service_locator -from crawlee._autoscaling._types import ClientSnapshot, CpuSnapshot, EventLoopSnapshot, MemorySnapshot, Snapshot -from crawlee._utils.byte_size import ByteSize, Ratio +from crawlee._autoscaling._types import ClientSnapshot, CpuSnapshot, EventLoopSnapshot, MemorySnapshot, Ratio, Snapshot +from crawlee._utils.byte_size import ByteSize from crawlee._utils.context import ensure_context from crawlee._utils.docs import docs_group from crawlee._utils.recurring_task import RecurringTask @@ -26,6 +27,12 @@ T = TypeVar('T', bound=Snapshot) +@functools.lru_cache +def _warn_once(warning_message: str) -> None: + """Log a warning message only once.""" + logger.warning(warning_message) + + class SortedSnapshotList(list[T]): """A list that maintains sorted order by `created_at` attribute for snapshot objects.""" @@ -289,9 +296,13 @@ def _snapshot_memory(self, event_data: EventSystemInfoData) -> None: system_wide_memory_size = memory_info.total_size case MemoryUsageInfo(), Ratio() as ratio: - # This is just hypothetical case, that should not happen in practice. - # `LocalEvenManager` should always provide `MemoryInfo` in the event data. + # This is just hypothetical case, that will most likely not happen in practice. + # `LocalEventManager` should always provide `MemoryInfo` in the event data. # When running on Apify, `self._max_memory_size` is always `ByteSize`, not `Ratio`. + _warn_once( + 'It is recommended that a custom implementation of `LocalEventManager` emits `SYSTEM_INFO` events ' + 'with `MemoryInfo` and not just `MemoryUsageInfo`.' + ) max_memory_size = get_memory_info().total_size * ratio.value system_wide_used_size = None system_wide_memory_size = None diff --git a/src/crawlee/_utils/byte_size.py b/src/crawlee/_utils/byte_size.py index e51ea3e9e4..da4ba84b28 100644 --- a/src/crawlee/_utils/byte_size.py +++ b/src/crawlee/_utils/byte_size.py @@ -1,9 +1,7 @@ from __future__ import annotations from dataclasses import dataclass -from typing import Annotated, Any - -from pydantic import BaseModel, Field +from typing import Any _BYTES_PER_KB = 1024 _BYTES_PER_MB = _BYTES_PER_KB**2 @@ -11,12 +9,6 @@ _BYTES_PER_TB = _BYTES_PER_KB**4 -class Ratio(BaseModel): - """Represents ratio of memory.""" - - value: Annotated[float, Field(gt=0.0, le=1.0)] - - @dataclass(frozen=True) class ByteSize: """Represents a byte size.""" diff --git a/tests/unit/_autoscaling/test_snapshotter.py b/tests/unit/_autoscaling/test_snapshotter.py index b618d06a2e..a4682fce43 100644 --- a/tests/unit/_autoscaling/test_snapshotter.py +++ b/tests/unit/_autoscaling/test_snapshotter.py @@ -28,9 +28,6 @@ from collections.abc import AsyncGenerator -MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD = 0.99 * SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD - - @pytest.fixture async def event_manager() -> AsyncGenerator[LocalEventManager, None]: # Use a long interval to avoid interference from periodic system info events during tests and ensure the first @@ -380,28 +377,12 @@ def test_sorted_snapshot_list_add_maintains_order() -> None: assert prev_time <= curr_time, f'Items at indices {i - 1} and {i} are not in chronological order' -_initial_memory_info = get_memory_info() - - -@pytest.mark.parametrize( - ('available_memory_ratio', 'memory_mbytes', 'overloaded_after_scale_up'), - [ - pytest.param(MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD, None, False, id='Ratio-based memory limit'), - pytest.param( - MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD, - floor(_initial_memory_info.total_size.to_mb()), - True, - id='Fixed memory limit', - ), - ], -) +@pytest.mark.parametrize('dynamic_memory', [True, False]) async def test_dynamic_memory( *, default_cpu_info: CpuInfo, event_manager: LocalEventManager, - available_memory_ratio: float, - memory_mbytes: int | None, - overloaded_after_scale_up: bool, + dynamic_memory: bool, ) -> None: """Test dynamic memory scaling scenario where the system-wide memory can change. @@ -411,25 +392,29 @@ async def test_dynamic_memory( Based on the Snapshotter configuration, it will either take into account the increased available memory or not. """ + _initial_memory_info = get_memory_info() + ratio_just_below_system_wide_overload = 0.99 * SYSTEM_WIDE_MEMORY_OVERLOAD_THRESHOLD + + memory_mbytes = 0 if dynamic_memory else floor(_initial_memory_info.total_size.to_mb()) service_locator.set_event_manager(event_manager) async with Snapshotter.from_config( - Configuration(memory_mbytes=memory_mbytes, available_memory_ratio=available_memory_ratio) + Configuration(memory_mbytes=memory_mbytes, available_memory_ratio=ratio_just_below_system_wide_overload) ) as snapshotter: # Default state, memory usage exactly at the overload threshold -> overloaded, but not system-wide overloaded memory_infos = [ # Overloaded sample MemoryInfo( total_size=_initial_memory_info.total_size, - current_size=_initial_memory_info.total_size * MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD, - system_wide_used_size=_initial_memory_info.total_size * MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD, + current_size=_initial_memory_info.total_size * ratio_just_below_system_wide_overload, + system_wide_used_size=_initial_memory_info.total_size * ratio_just_below_system_wide_overload, ), # Same as first sample, with twice as memory available in the system MemoryInfo( total_size=_initial_memory_info.total_size * 2, # Simulate increased total memory - current_size=_initial_memory_info.total_size * MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD, - system_wide_used_size=_initial_memory_info.total_size * MEMORY_LOAD_JUST_BELOW_SYSTEM_WIDE_OVERLOAD, + current_size=_initial_memory_info.total_size * ratio_just_below_system_wide_overload, + system_wide_used_size=_initial_memory_info.total_size * ratio_just_below_system_wide_overload, ), ] @@ -449,4 +434,4 @@ async def test_dynamic_memory( # First sample will be overloaded. assert memory_samples[0].is_overloaded # Second sample can reflect the increased available memory based on the configuration used to create Snapshotter - assert memory_samples[1].is_overloaded == overloaded_after_scale_up + assert memory_samples[1].is_overloaded == (not dynamic_memory)