From cf3d6a3eaa520bb5999da9dd9ad991aa8cdaafe4 Mon Sep 17 00:00:00 2001
From: Vlada Dusek
Date: Mon, 16 Feb 2026 16:56:20 +0100
Subject: [PATCH 1/2] fix: Eliminate race condition in
 `_fetch_requests_from_url`
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The previous implementation used `add_done_callback` + `asyncio.create_task`
to process HTTP responses, but `asyncio.gather` only awaited the HTTP request
tasks, not the processing tasks spawned by the callbacks. As a result,
`created_requests` could be returned before processing completed, yielding
empty or incomplete results.

Refactored to gather all HTTP responses first, then process each response
sequentially, ensuring all extracted URLs are collected before returning.

Co-Authored-By: Claude Opus 4.6
---
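Reviewer note (below the fold, so `git am` drops it): a minimal,
self-contained sketch of the race described above, using only the stdlib and
hypothetical fetch/process stand-ins. `gather` returns as soon as the fetch
task finishes; the processing task spawned by the done-callback has not run
yet, so the results list is still empty at that point:

    import asyncio

    async def main() -> None:
        results: list[str] = []

        async def fetch() -> str:
            await asyncio.sleep(0)  # stand-in for the HTTP request
            return 'body'

        async def process(task: asyncio.Task) -> None:
            await asyncio.sleep(0)  # stand-in for reading the response body
            results.append(task.result())

        task = asyncio.create_task(fetch())
        # The callback only schedules processing; nothing ever awaits it.
        task.add_done_callback(lambda t: asyncio.create_task(process(t)))

        await asyncio.gather(task)  # awaits fetch(), not process()
        print(results)  # prints []; the processing task is still pending

    asyncio.run(main())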
 .../request_loaders/_apify_request_list.py | 58 +++++++-------
 1 file changed, 21 insertions(+), 37 deletions(-)

diff --git a/src/apify/request_loaders/_apify_request_list.py b/src/apify/request_loaders/_apify_request_list.py
index 61a54356..7bf41639 100644
--- a/src/apify/request_loaders/_apify_request_list.py
+++ b/src/apify/request_loaders/_apify_request_list.py
@@ -2,7 +2,6 @@
 
 import asyncio
 import re
-from asyncio import Task
 from typing import Annotated, Any
 
 from pydantic import BaseModel, Field, TypeAdapter
@@ -114,49 +113,34 @@ async def _fetch_requests_from_url(
     ) -> list[Request]:
         """Create list of requests from url.
 
-        Send GET requests to urls defined in each requests_from_url of remote_url_requests_inputs. Run extracting
-        callback on each response body and use URL_NO_COMMAS_REGEX regex to find all links. Create list of Requests from
-        collected links and additional inputs stored in other attributes of each remote_url_requests_inputs.
+        Send GET requests to urls defined in each requests_from_url of remote_url_requests_inputs. Extract links from
+        each response body using URL_NO_COMMAS_REGEX regex. Create list of Requests from collected links and additional
+        inputs stored in other attributes of each remote_url_requests_inputs.
         """
         created_requests: list[Request] = []
 
-        async def create_requests_from_response(request_input: _RequestsFromUrlInput, task: Task) -> None:
-            """Extract links from response body and use them to create `Request` objects.
+        # Fetch all remote URLs in parallel.
+        responses = await asyncio.gather(
+            *[
+                http_client.send_request(method='GET', url=remote_url_input.requests_from_url)
+                for remote_url_input in remote_url_requests_inputs
+            ]
+        )
 
-            Use the regular expression to find all matching links in the response body, then create `Request`
-            objects from these links and the provided input attributes.
-            """
-            response = await (task.result()).read()
-            matches = re.finditer(URL_NO_COMMAS_REGEX, response.decode('utf-8'))
+        # Process each response and extract links.
+        for request_input, http_response in zip(remote_url_requests_inputs, responses, strict=True):
+            response_body = await http_response.read()
+            matches = re.finditer(URL_NO_COMMAS_REGEX, response_body.decode('utf-8'))
 
             created_requests.extend(
-                [
-                    Request.from_url(
-                        match.group(0),
-                        method=request_input.method,
-                        payload=request_input.payload.encode('utf-8'),
-                        headers=request_input.headers,
-                        user_data=request_input.user_data,
-                    )
-                    for match in matches
-                ]
-            )
-
-        remote_url_requests = []
-        for remote_url_requests_input in remote_url_requests_inputs:
-            get_response_task = asyncio.create_task(
-                http_client.send_request(
-                    method='GET',
-                    url=remote_url_requests_input.requests_from_url,
-                )
-            )
-
-            get_response_task.add_done_callback(
-                lambda task, inp=remote_url_requests_input: asyncio.create_task(
-                    create_requests_from_response(inp, task)
+                Request.from_url(
+                    match.group(0),
+                    method=request_input.method,
+                    payload=request_input.payload.encode('utf-8'),
+                    headers=request_input.headers,
+                    user_data=request_input.user_data,
                 )
+                for match in matches
             )
-            remote_url_requests.append(get_response_task)
 
-        await asyncio.gather(*remote_url_requests)
         return created_requests

From 807edef38a9c156faf9db692ee67e7dd25623f75 Mon Sep 17 00:00:00 2001
From: Vlada Dusek
Date: Tue, 17 Feb 2026 12:45:45 +0100
Subject: [PATCH 2/2] fix: Parallelize response processing and clean up
 ApifyRequestList

Process all remote URL fetches and their response bodies in parallel using a
single `asyncio.gather` instead of sequential processing. Refactor `open` to
use `@classmethod`, inline `_create_request_list` into it, and extract
`_process_remote_url` as a static helper.

Co-Authored-By: Claude Opus 4.6
---
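Reviewer note (below the fold, so `git am` drops it): the shape of this change
as a runnable, stdlib-only sketch, with a hypothetical stand-in for the
fetch-and-extract step. One coroutine per input performs both the HTTP request
and the body read, a single gather awaits them all (preserving input order),
and `chain.from_iterable` flattens the per-URL lists:

    import asyncio
    from itertools import chain

    async def process_one(url: str) -> list[str]:
        # Stand-in for: send GET request, read body, extract links.
        await asyncio.sleep(0.1)
        return [f'{url}/a', f'{url}/b']

    async def process_all(urls: list[str]) -> list[str]:
        # One coroutine per URL; gather runs them concurrently.
        results = await asyncio.gather(*(process_one(u) for u in urls))
        # Flatten the per-URL result lists into a single list.
        return list(chain.from_iterable(results))

    # Both URLs are processed concurrently: ~0.1 s total, not ~0.2 s.
    print(asyncio.run(process_all(['https://a.example', 'https://b.example'])))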
""" - @staticmethod + @classmethod async def open( + cls, + *, name: str | None = None, request_list_sources_input: list[dict[str, Any]] | None = None, http_client: HttpClient | None = None, @@ -72,12 +75,7 @@ async def open( ``` """ request_list_sources_input = request_list_sources_input or [] - return await ApifyRequestList._create_request_list(name, request_list_sources_input, http_client) - @staticmethod - async def _create_request_list( - name: str | None, request_list_sources_input: list[dict[str, Any]], http_client: HttpClient | None - ) -> ApifyRequestList: if not http_client: http_client = ImpitHttpClient() @@ -86,15 +84,30 @@ async def _create_request_list( simple_url_inputs = [url_input for url_input in url_inputs if isinstance(url_input, _SimpleUrlInput)] remote_url_inputs = [url_input for url_input in url_inputs if isinstance(url_input, _RequestsFromUrlInput)] - simple_url_requests = ApifyRequestList._create_requests_from_input(simple_url_inputs) - remote_url_requests = await ApifyRequestList._fetch_requests_from_url( - remote_url_inputs, http_client=http_client - ) + simple_url_requests = cls._create_requests_from_input(simple_url_inputs) + remote_url_requests = await cls._fetch_requests_from_url(remote_url_inputs, http_client) return ApifyRequestList(name=name, requests=simple_url_requests + remote_url_requests) + @classmethod + async def _fetch_requests_from_url( + cls, + remote_url_requests_inputs: list[_RequestsFromUrlInput], + http_client: HttpClient, + ) -> list[Request]: + """Create list of requests from url. + + Send GET requests to urls defined in each requests_from_url of remote_url_requests_inputs. Extract links from + each response body using URL_NO_COMMAS_REGEX regex. Create list of Requests from collected links and additional + inputs stored in other attributes of each remote_url_requests_inputs. + """ + tasks = [cls._process_remote_url(request_input, http_client) for request_input in remote_url_requests_inputs] + results = await asyncio.gather(*tasks) + return list(chain.from_iterable(results)) + @staticmethod def _create_requests_from_input(simple_url_inputs: list[_SimpleUrlInput]) -> list[Request]: + """Create `Request` objects from simple URL inputs.""" return [ Request.from_url( method=request_input.method, @@ -107,40 +120,19 @@ def _create_requests_from_input(simple_url_inputs: list[_SimpleUrlInput]) -> lis ] @staticmethod - async def _fetch_requests_from_url( - remote_url_requests_inputs: list[_RequestsFromUrlInput], - http_client: HttpClient, - ) -> list[Request]: - """Create list of requests from url. + async def _process_remote_url(request_input: _RequestsFromUrlInput, http_client: HttpClient) -> list[Request]: + """Fetch a remote URL and extract links from the response body.""" + http_response = await http_client.send_request(method='GET', url=request_input.requests_from_url) + response_body = await http_response.read() + matches = re.finditer(URL_NO_COMMAS_REGEX, response_body.decode('utf-8')) - Send GET requests to urls defined in each requests_from_url of remote_url_requests_inputs. Extract links from - each response body using URL_NO_COMMAS_REGEX regex. Create list of Requests from collected links and additional - inputs stored in other attributes of each remote_url_requests_inputs. - """ - created_requests: list[Request] = [] - - # Fetch all remote URLs in parallel. 
- responses = await asyncio.gather( - *[ - http_client.send_request(method='GET', url=remote_url_input.requests_from_url) - for remote_url_input in remote_url_requests_inputs - ] - ) - - # Process each response and extract links. - for request_input, http_response in zip(remote_url_requests_inputs, responses, strict=True): - response_body = await http_response.read() - matches = re.finditer(URL_NO_COMMAS_REGEX, response_body.decode('utf-8')) - - created_requests.extend( - Request.from_url( - match.group(0), - method=request_input.method, - payload=request_input.payload.encode('utf-8'), - headers=request_input.headers, - user_data=request_input.user_data, - ) - for match in matches + return [ + Request.from_url( + url=match.group(0), + method=request_input.method, + payload=request_input.payload.encode('utf-8'), + headers=request_input.headers, + user_data=request_input.user_data, ) - - return created_requests + for match in matches + ]
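
Note: with the `*` added to the signature, `open` now takes keyword-only
arguments, so any positional call sites would need updating. A hypothetical
call under the new signature (the source dict shape shown here is an
assumption for illustration, not taken from this diff):

    # All arguments must now be passed by keyword.
    request_list = await ApifyRequestList.open(
        name='my-list',
        request_list_sources_input=[{'url': 'https://example.com'}],
    )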