Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions tests/integration/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,16 @@ async def test_request_unique_key_behavior(request_queue_apify: RequestQueue) ->
)


async def _fetch_next_request_with_retry(rq: RequestQueue, *, max_retries: int = 10, delay: float = 1.0) -> Any:
"""Fetch the next request with retries to handle propagation delays in shared mode."""
for _ in range(max_retries):
request = await rq.fetch_next_request()
if request is not None:
return request
await asyncio.sleep(delay)
return None


async def test_request_reclaim_functionality(request_queue_apify: RequestQueue) -> None:
"""Test request reclaiming for failed processing."""

Expand All @@ -218,8 +228,8 @@ async def test_request_reclaim_functionality(request_queue_apify: RequestQueue)
await rq.add_request('https://example.com/test')
Actor.log.info('Added test request')

# Fetch and reclaim the request
request = await rq.fetch_next_request()
# Fetch and reclaim the request (retry to handle propagation delay in shared mode)
request = await _fetch_next_request_with_retry(rq)
Comment on lines +231 to +232
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the integration test shows intended usage, I think we're not actually fixing anything. We're just sweeping an issue under the rug.

In other words, this is not a flaky test, this shows the code under test being unreliable.

Copy link
Contributor Author

@vdusek vdusek Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but how can we handle this in the case of a shared client? 🙂 If I understand it correctly, a single client adds requests directly to the local queue_head deque, which makes them immediately available. However, a shared client can't do that, right? If retries aren't an option, I'd suggest skipping this test for the shared client for now, opening an issue, and keeping it skipped until we resolve it somehow.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess either skip it or add a well-aimed if mode == "shared": sleep(). That's probably more upfront about what we're dealing with.

assert request is not None, f'request={request}'
Actor.log.info(f'Fetched request: {request.url}')

Expand All @@ -231,8 +241,8 @@ async def test_request_reclaim_functionality(request_queue_apify: RequestQueue)
)
Actor.log.info('Request reclaimed successfully')

# Should be able to fetch the same request again
request2 = await rq.fetch_next_request()
# Should be able to fetch the same request again (retry to handle propagation delay in shared mode)
request2 = await _fetch_next_request_with_retry(rq)
assert request2 is not None, f'request2={request2}'
assert request2.url == request.url, (
f'request2.url={request2.url}',
Expand Down