diff --git a/src/apify/_configuration.py b/src/apify/_configuration.py
index ddf55513..61110b8b 100644
--- a/src/apify/_configuration.py
+++ b/src/apify/_configuration.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import dataclasses
 import json
 from datetime import datetime, timedelta
 from decimal import Decimal
@@ -34,6 +35,29 @@ def _transform_to_list(value: Any) -> list[str] | None:
     return value if isinstance(value, list) else str(value).split(',')
 
 
+@dataclasses.dataclass
+class ActorStorages:
+    """Storage IDs for the different storage types used by an Actor."""
+
+    key_value_stores: dict[str, str]
+    datasets: dict[str, str]
+    request_queues: dict[str, str]
+
+
+def _load_storage_keys(data: None | str | dict | ActorStorages) -> ActorStorages | None:
+    """Parse the storage ID mapping from its raw environment representation."""
+    if data is None:
+        return None
+    if isinstance(data, ActorStorages):
+        return data
+    storage_mapping = data if isinstance(data, dict) else json.loads(data)
+    return ActorStorages(
+        key_value_stores=storage_mapping.get('keyValueStores', {}),
+        datasets=storage_mapping.get('datasets', {}),
+        request_queues=storage_mapping.get('requestQueues', {}),
+    )
+
+
 @docs_group('Configuration')
 class Configuration(CrawleeConfiguration):
     """A class for specifying the configuration of an Actor.
@@ -446,6 +470,15 @@ class Configuration(CrawleeConfiguration):
         BeforeValidator(lambda data: json.loads(data) if isinstance(data, str) else data or None),
     ] = None
 
+    actor_storages: Annotated[
+        ActorStorages | None,
+        Field(
+            alias='actor_storages_json',
+            description='Storage IDs for the Actor',
+        ),
+        BeforeValidator(_load_storage_keys),
+    ] = None
+
     @model_validator(mode='after')
     def disable_browser_sandbox_on_platform(self) -> Self:
         """Disable the browser sandbox mode when running on the Apify platform.
diff --git a/src/apify/storage_clients/_apify/_alias_resolving.py b/src/apify/storage_clients/_apify/_alias_resolving.py
index e357333f..b2ad3df4 100644
--- a/src/apify/storage_clients/_apify/_alias_resolving.py
+++ b/src/apify/storage_clients/_apify/_alias_resolving.py
@@ -5,6 +5,8 @@
 from logging import getLogger
 from typing import TYPE_CHECKING, ClassVar, Literal, overload
 
+from propcache import cached_property
+
 from apify_client import ApifyClientAsync
 
 from ._utils import hash_api_base_url_and_token
@@ -139,7 +141,6 @@ def __init__(
         self._storage_type = storage_type
         self._alias = alias
         self._configuration = configuration
-        self._additional_cache_key = hash_api_base_url_and_token(configuration)
 
     async def __aenter__(self) -> AliasResolver:
         """Context manager to prevent race condition in alias creation."""
@@ -183,15 +184,7 @@ async def _get_alias_map(cls, configuration: Configuration) -> dict[str, str]:
             default_kvs_client = await cls._get_default_kvs_client(configuration)
 
             record = await default_kvs_client.get_record(cls._ALIAS_MAPPING_KEY)
-
-            # get_record can return {key: ..., value: ..., content_type: ...}
-            if isinstance(record, dict):
-                if 'value' in record and isinstance(record['value'], dict):
-                    cls._alias_map = record['value']
-                else:
-                    cls._alias_map = record
-            else:
-                cls._alias_map = dict[str, str]()
+            cls._alias_map = record.get('value', {}) if record else {}
 
         return cls._alias_map
 
@@ -201,6 +194,18 @@ async def resolve_id(self) -> str | None:
         Returns:
             Storage id if it exists, None otherwise.
         """
+        # First try to find the alias in the configuration mapping, avoiding any API calls.
+        # This mapping is provided by the Apify platform, so it does not have to be kept in the default KVS.
+        if self._configuration.actor_storages and self._alias != 'default':
+            storage_maps = {
+                'Dataset': self._configuration.actor_storages.datasets,
+                'KeyValueStore': self._configuration.actor_storages.key_value_stores,
+                'RequestQueue': self._configuration.actor_storages.request_queues,
+            }
+            if storage_id := storage_maps.get(self._storage_type, {}).get(self._alias):
+                return storage_id
+
+        # Fall back to the mapping stored in the default KVS.
         return (await self._get_alias_map(self._configuration)).get(self._storage_key, None)
 
     async def store_mapping(self, storage_id: str) -> None:
@@ -220,30 +225,21 @@
 
         try:
             record = await default_kvs_client.get_record(self._ALIAS_MAPPING_KEY)
-
-            # get_record can return {key: ..., value: ..., content_type: ...}
-            if isinstance(record, dict) and 'value' in record:
-                record = record['value']
-
-            # Update or create the record with the new alias mapping
-            if isinstance(record, dict):
-                record[self._storage_key] = storage_id
-            else:
-                record = {self._storage_key: storage_id}
+            value = record.get('value', {}) if record else {}
+            value[self._storage_key] = storage_id
 
             # Store the mapping back in the KVS.
-            await default_kvs_client.set_record(self._ALIAS_MAPPING_KEY, record)
+            await default_kvs_client.set_record(key=self._ALIAS_MAPPING_KEY, value=value)
         except Exception as exc:
             logger.warning(f'Error storing alias mapping for {self._alias}: {exc}')
 
-    @property
+    @cached_property
     def _storage_key(self) -> str:
-        """Get a unique storage key used for storing the alias in the mapping."""
         return self._ALIAS_STORAGE_KEY_SEPARATOR.join(
             [
                 self._storage_type,
                 self._alias,
-                self._additional_cache_key,
+                hash_api_base_url_and_token(self._configuration),
             ]
         )
 
diff --git a/tests/e2e/test_schema_storages/__init__.py b/tests/e2e/test_schema_storages/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/e2e/test_schema_storages/actor_source/actor.json b/tests/e2e/test_schema_storages/actor_source/actor.json
new file mode 100644
index 00000000..8501bd99
--- /dev/null
+++ b/tests/e2e/test_schema_storages/actor_source/actor.json
@@ -0,0 +1,24 @@
+{
+  "actorSpecification": 1,
+  "version": "0.0",
+  "storages": {
+    "datasets": {
+      "default": {
+        "actorSpecification": 1,
+        "fields": {
+          "properties": {
+            "id": { "type": "string" }
+          }
+        }
+      },
+      "custom": {
+        "actorSpecification": 1,
+        "fields": {
+          "properties": {
+            "id": { "type": "string" }
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/tests/e2e/test_schema_storages/actor_source/main.py b/tests/e2e/test_schema_storages/actor_source/main.py
new file mode 100644
index 00000000..ebed9ba4
--- /dev/null
+++ b/tests/e2e/test_schema_storages/actor_source/main.py
@@ -0,0 +1,7 @@
+from apify import Actor
+
+
+async def main() -> None:
+    async with Actor:
+        assert Actor.configuration.actor_storages
+        assert (await Actor.open_dataset(alias='custom')).id == Actor.configuration.actor_storages.datasets['custom']
diff --git a/tests/e2e/test_schema_storages/test_schema_storages.py b/tests/e2e/test_schema_storages/test_schema_storages.py
new file mode 100644
index 00000000..8ad98b7a
--- /dev/null
+++ b/tests/e2e/test_schema_storages/test_schema_storages.py
@@ -0,0 +1,26 @@
+from __future__ import annotations
+
+from pathlib import Path
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from ..conftest import MakeActorFunction, RunActorFunction
+
+_ACTOR_SOURCE_DIR = Path(__file__).parent / 'actor_source'
+
+
+def read_actor_source(filename: str) -> str:
+    return (_ACTOR_SOURCE_DIR / filename).read_text()
+
+
+async def test_configuration_storages(make_actor: MakeActorFunction, run_actor: RunActorFunction) -> None:
+    actor = await make_actor(
+        label='schema_storages',
+        source_files={
+            'src/main.py': read_actor_source('main.py'),
+            '.actor/actor.json': read_actor_source('actor.json'),
+        },
+    )
+    run_result = await run_actor(actor)
+
+    assert run_result.status == 'SUCCEEDED'
diff --git a/tests/unit/actor/test_configuration.py b/tests/unit/actor/test_configuration.py
index ccd20849..461a103a 100644
--- a/tests/unit/actor/test_configuration.py
+++ b/tests/unit/actor/test_configuration.py
@@ -300,3 +300,26 @@ def test_actor_pricing_info_from_json_env_var(monkeypatch: pytest.MonkeyPatch) -> None:
     config = ApifyConfiguration()
     assert config.actor_pricing_info is not None
     assert config.actor_pricing_info.pricing_model == 'PAY_PER_EVENT'
+
+
+def test_actor_storage_json_env_var(monkeypatch: pytest.MonkeyPatch) -> None:
+    """Test that `actor_storages` is parsed from the `ACTOR_STORAGES_JSON` env var."""
+    import json
+
+    datasets = {'default': 'default_dataset_id', 'custom': 'custom_dataset_id'}
+    request_queues = {'default': 'default_request_queue_id', 'custom': 'custom_request_queue_id'}
+    key_value_stores = {'default': 'default_key_value_store_id', 'custom': 'custom_key_value_store_id'}
+
+    actor_storages_json = json.dumps(
+        {
+            'datasets': datasets,
+            'requestQueues': request_queues,
+            'keyValueStores': key_value_stores,
+        }
+    )
+    monkeypatch.setenv('ACTOR_STORAGES_JSON', actor_storages_json)
+    config = ApifyConfiguration()
+    assert config.actor_storages
+    assert config.actor_storages.datasets == datasets
+    assert config.actor_storages.request_queues == request_queues
+    assert config.actor_storages.key_value_stores == key_value_stores
diff --git a/tests/unit/storage_clients/test_alias_resolver.py b/tests/unit/storage_clients/test_alias_resolver.py
index 56821ca1..28a757b8 100644
--- a/tests/unit/storage_clients/test_alias_resolver.py
+++ b/tests/unit/storage_clients/test_alias_resolver.py
@@ -1,6 +1,6 @@
 from __future__ import annotations
 
-from apify._configuration import Configuration
+from apify._configuration import ActorStorages, Configuration
 
 from apify.storage_clients._apify._alias_resolving import AliasResolver
 
@@ -76,3 +76,26 @@ async def test_get_alias_map_returns_in_memory_map() -> None:
     AliasResolver._alias_map = {}
     result = await AliasResolver._get_alias_map(config)
     assert result == {}
+
+
+async def test_configuration_storages_alias_resolving() -> None:
+    """Test that `AliasResolver` resolves IDs of storages registered in the Configuration."""
+
+    # Actor storages
+    datasets = {'default': 'default_dataset_id', 'custom': 'custom_Dataset_id'}
+    request_queues = {'default': 'default_request_queue_id', 'custom': 'custom_RequestQueue_id'}
+    key_value_stores = {'default': 'default_key_value_store_id', 'custom': 'custom_KeyValueStore_id'}
+
+    # Set up the configuration with the storage mapping
+    configuration = Configuration(
+        actor_storages=ActorStorages(
+            datasets=datasets, request_queues=request_queues, key_value_stores=key_value_stores
+        ),
+    )
+
+    # Check that the ID of each non-default storage in the mapping is resolved
+    for storage_type in ('Dataset', 'KeyValueStore', 'RequestQueue'):
+        assert (
+            await AliasResolver(storage_type=storage_type, alias='custom', configuration=configuration).resolve_id()
+            == f'custom_{storage_type}_id'
+        )
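
Taken together, these changes let an Actor resolve a storage alias without any API call whenever the platform has already registered the storage. A minimal sketch of that flow, assuming only what this diff introduces (the `ACTOR_STORAGES_JSON` env var, the `actor_storages` field, and the `ActorStorages` dataclass); the storage IDs below are invented for illustration:

import json
import os

from apify import Configuration

# Simulate the alias -> storage ID mapping that the Apify platform injects
# for storages declared in .actor/actor.json (IDs invented for illustration).
os.environ['ACTOR_STORAGES_JSON'] = json.dumps(
    {
        'datasets': {'default': 'DATASET_ID_1', 'custom': 'DATASET_ID_2'},
        'keyValueStores': {'default': 'KVS_ID_1'},
        'requestQueues': {'default': 'RQ_ID_1'},
    }
)

# The `actor_storages_json` alias routes the env var through the
# `_load_storage_keys` validator, yielding a typed `ActorStorages` instance.
config = Configuration()
assert config.actor_storages is not None
assert config.actor_storages.datasets['custom'] == 'DATASET_ID_2'

Because the platform-provided mapping is consulted first, `resolve_id` short-circuits for aliases declared in the Actor schema and only falls back to the mapping persisted in the default key-value store for aliases created at run time.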