98 changes: 37 additions & 61 deletions src/apify/request_loaders/_apify_request_list.py
@@ -2,7 +2,7 @@

 import asyncio
 import re
-from asyncio import Task
+from itertools import chain
 from typing import Annotated, Any
 
 from pydantic import BaseModel, Field, TypeAdapter
@@ -44,8 +44,10 @@ class ApifyRequestList(RequestList):
     Method open is used to create RequestList from actor's requestListSources input.
     """
 
-    @staticmethod
+    @classmethod
     async def open(
+        cls,
+        *,
         name: str | None = None,
         request_list_sources_input: list[dict[str, Any]] | None = None,
         http_client: HttpClient | None = None,
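
The switch from `@staticmethod` to `@classmethod` (together with the new keyword-only `*`) means helper lookups inside `open` go through `cls`, so a subclass that overrides a helper is picked up without reimplementing `open`. A minimal sketch of that dispatch difference, using hypothetical names not taken from this codebase:

```python
import asyncio


class Loader:
    @classmethod
    async def open(cls) -> 'Loader':
        # `cls` is whatever class the caller invoked `open` on,
        # so an overridden `_build` is dispatched automatically.
        return cls._build()

    @classmethod
    def _build(cls) -> 'Loader':
        return cls()


class CustomLoader(Loader):
    @classmethod
    def _build(cls) -> 'CustomLoader':
        print('custom builder used')
        return cls()


asyncio.run(CustomLoader.open())  # prints 'custom builder used'
```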
@@ -73,12 +75,7 @@ async def open(
         ```
         """
         request_list_sources_input = request_list_sources_input or []
-        return await ApifyRequestList._create_request_list(name, request_list_sources_input, http_client)
 
-    @staticmethod
-    async def _create_request_list(
-        name: str | None, request_list_sources_input: list[dict[str, Any]], http_client: HttpClient | None
-    ) -> ApifyRequestList:
         if not http_client:
             http_client = ImpitHttpClient()
 
@@ -87,15 +84,30 @@ async def _create_request_list(
         simple_url_inputs = [url_input for url_input in url_inputs if isinstance(url_input, _SimpleUrlInput)]
         remote_url_inputs = [url_input for url_input in url_inputs if isinstance(url_input, _RequestsFromUrlInput)]
 
-        simple_url_requests = ApifyRequestList._create_requests_from_input(simple_url_inputs)
-        remote_url_requests = await ApifyRequestList._fetch_requests_from_url(
-            remote_url_inputs, http_client=http_client
-        )
+        simple_url_requests = cls._create_requests_from_input(simple_url_inputs)
+        remote_url_requests = await cls._fetch_requests_from_url(remote_url_inputs, http_client)
 
         return ApifyRequestList(name=name, requests=simple_url_requests + remote_url_requests)
 
+    @classmethod
+    async def _fetch_requests_from_url(
+        cls,
+        remote_url_requests_inputs: list[_RequestsFromUrlInput],
+        http_client: HttpClient,
+    ) -> list[Request]:
+        """Create a list of requests from remote URLs.
+
+        Send a GET request to the URL in each input's `requests_from_url` field, extract links from each
+        response body using the `URL_NO_COMMAS_REGEX` pattern, and create `Request` objects from the collected
+        links and the additional attributes of each input.
+        """
+        tasks = [cls._process_remote_url(request_input, http_client) for request_input in remote_url_requests_inputs]
+        results = await asyncio.gather(*tasks)
+        return list(chain.from_iterable(results))
+
     @staticmethod
     def _create_requests_from_input(simple_url_inputs: list[_SimpleUrlInput]) -> list[Request]:
+        """Create `Request` objects from simple URL inputs."""
         return [
             Request.from_url(
                 method=request_input.method,
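
The new `_fetch_requests_from_url` replaces the old done-callback wiring (removed below) with a plain fan-out/fan-in: one coroutine per input, `asyncio.gather` to run them concurrently, and `chain.from_iterable` to flatten the per-input result lists, which is also why the import at the top changed from `Task` to `chain`. A self-contained sketch of the same pattern, with placeholder names not taken from the SDK:

```python
import asyncio
from itertools import chain


async def process_one(url: str) -> list[str]:
    """Stand-in for _process_remote_url: fetch one URL, return extracted links."""
    await asyncio.sleep(0)  # placeholder for network I/O
    return [f'{url}/a', f'{url}/b']


async def fetch_all(urls: list[str]) -> list[str]:
    # One coroutine per input, run concurrently; gather preserves input order
    # and propagates the first exception instead of losing it in a callback.
    per_url_links = await asyncio.gather(*(process_one(url) for url in urls))
    # Each coroutine returned a list, so flatten the list of lists.
    return list(chain.from_iterable(per_url_links))


print(asyncio.run(fetch_all(['https://x.example', 'https://y.example'])))
```

A likely side benefit: in the removed version, `add_done_callback` spawned a second task per response that was never awaited, so `open` could return before every link had been collected; awaiting `gather` on the processing coroutines themselves closes that gap.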
Expand All @@ -108,55 +120,19 @@ def _create_requests_from_input(simple_url_inputs: list[_SimpleUrlInput]) -> lis
         ]
 
     @staticmethod
-    async def _fetch_requests_from_url(
-        remote_url_requests_inputs: list[_RequestsFromUrlInput],
-        http_client: HttpClient,
-    ) -> list[Request]:
-        """Create list of requests from url.
-
-        Send GET requests to urls defined in each requests_from_url of remote_url_requests_inputs. Run extracting
-        callback on each response body and use URL_NO_COMMAS_REGEX regex to find all links. Create list of Requests from
-        collected links and additional inputs stored in other attributes of each remote_url_requests_inputs.
-        """
-        created_requests: list[Request] = []
-
-        async def create_requests_from_response(request_input: _RequestsFromUrlInput, task: Task) -> None:
-            """Extract links from response body and use them to create `Request` objects.
-
-            Use the regular expression to find all matching links in the response body, then create `Request`
-            objects from these links and the provided input attributes.
-            """
-            response = await (task.result()).read()
-            matches = re.finditer(URL_NO_COMMAS_REGEX, response.decode('utf-8'))
-
-            created_requests.extend(
-                [
-                    Request.from_url(
-                        match.group(0),
-                        method=request_input.method,
-                        payload=request_input.payload.encode('utf-8'),
-                        headers=request_input.headers,
-                        user_data=request_input.user_data,
-                    )
-                    for match in matches
-                ]
-            )
-
-        remote_url_requests = []
-        for remote_url_requests_input in remote_url_requests_inputs:
-            get_response_task = asyncio.create_task(
-                http_client.send_request(
-                    method='GET',
-                    url=remote_url_requests_input.requests_from_url,
-                )
-            )
-
-            get_response_task.add_done_callback(
-                lambda task, inp=remote_url_requests_input: asyncio.create_task(
-                    create_requests_from_response(inp, task)
-                )
-            )
-            remote_url_requests.append(get_response_task)
-
-        await asyncio.gather(*remote_url_requests)
-        return created_requests
+    async def _process_remote_url(request_input: _RequestsFromUrlInput, http_client: HttpClient) -> list[Request]:
+        """Fetch a remote URL and extract links from the response body."""
+        http_response = await http_client.send_request(method='GET', url=request_input.requests_from_url)
+        response_body = await http_response.read()
+        matches = re.finditer(URL_NO_COMMAS_REGEX, response_body.decode('utf-8'))
+
+        return [
+            Request.from_url(
+                url=match.group(0),
+                method=request_input.method,
+                payload=request_input.payload.encode('utf-8'),
+                headers=request_input.headers,
+                user_data=request_input.user_data,
+            )
+            for match in matches
+        ]
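
`_process_remote_url` is now the per-input unit of work: one GET, one regex pass, one comprehension. The definition of `URL_NO_COMMAS_REGEX` is not part of this diff, so the sketch below uses a simplified stand-in pattern purely to illustrate the `finditer` extraction step:

```python
import re

# Simplified stand-in; the real URL_NO_COMMAS_REGEX is imported elsewhere
# in the module and is not shown in this diff.
URL_PATTERN = re.compile(r'https?://[^\s,]+')

response_body = b'https://example.com/one\nhttps://example.com/two, and some text'
links = [match.group(0) for match in URL_PATTERN.finditer(response_body.decode('utf-8'))]
print(links)  # ['https://example.com/one', 'https://example.com/two']
```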