async def call_with_exp_backoff(fn: Callable[[], Awaitable[T]], *, max_retries: int = 3) -> T | None:
    """Call an async callable with exponential backoff retries until it returns a truthy value.

    In shared request queue mode, there is a propagation delay before newly added, reclaimed, or handled requests
    become visible in the API (see https://github.com/apify/apify-sdk-python/issues/808). This helper retries with
    exponential backoff to handle that delay in integration tests.

    Args:
        fn: Zero-argument async callable; it is invoked anew on every attempt.
        max_retries: Maximum number of calls to `fn`. With a value below 1, `fn` is never called.

    Returns:
        The first truthy result of `fn`, or the last (falsy) result once all attempts are used up. `None` if `fn`
        was never called.
    """
    result = None

    for attempt in range(max_retries):
        result = await fn()

        if result:
            return result

        # Nothing follows the last attempt, so do not announce a retry or wait for one.
        if attempt == max_retries - 1:
            break

        delay = 2**attempt
        Actor.log.info(f'{fn} returned {result!r}, retrying in {delay}s (attempt {attempt + 1}/{max_retries})')
        await asyncio.sleep(delay)

    return result
apify.storage_clients._apify._models import ApifyRequestQueueMetadata -async def fetch_next_request_with_exp_backoff(rq: RequestQueue, max_retries: int = 5) -> Request | None: - """Fetch the next request with exponential backoff retries. - - In shared request queue mode, there is a propagation delay before newly added or reclaimed requests become visible - (see https://github.com/apify/apify-sdk-python/issues/808). This helper retries with exponential backoff to handle - that delay in integration tests. - """ - for attempt in range(max_retries): - result = await rq.fetch_next_request() - if result is not None: - return result - delay = 2**attempt - Actor.log.info(f'fetch_next_request returned None, retrying in {delay}s (attempt {attempt + 1}/{max_retries})') - await asyncio.sleep(delay) - return None - - -async def test_add_and_fetch_requests(request_queue_apify: RequestQueue) -> None: +async def test_add_and_fetch_requests( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test basic functionality of adding and fetching requests.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') desired_request_count = 100 Actor.log.info('Opening request queue...') @@ -70,12 +57,19 @@ async def test_add_and_fetch_requests(request_queue_apify: RequestQueue) -> None f'desired_request_count={desired_request_count}', ) Actor.log.info('Waiting for queue to be finished...') - is_finished = await rq.is_finished() + if rq_access_mode == 'shared': + is_finished = await call_with_exp_backoff(rq.is_finished) + else: + is_finished = await rq.is_finished() assert is_finished is True, f'is_finished={is_finished}' -async def test_add_requests_in_batches(request_queue_apify: RequestQueue) -> None: +async def test_add_requests_in_batches( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test adding multiple requests in a single batch operation.""" + rq_access_mode = 
request.node.callspec.params.get('request_queue_apify') desired_request_count = 100 rq = request_queue_apify @@ -101,12 +95,19 @@ async def test_add_requests_in_batches(request_queue_apify: RequestQueue) -> Non f'handled_request_count={handled_request_count}', f'desired_request_count={desired_request_count}', ) - is_finished = await rq.is_finished() + if rq_access_mode == 'shared': + is_finished = await call_with_exp_backoff(rq.is_finished) + else: + is_finished = await rq.is_finished() assert is_finished is True, f'is_finished={is_finished}' -async def test_add_non_unique_requests_in_batch(request_queue_apify: RequestQueue) -> None: +async def test_add_non_unique_requests_in_batch( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test adding requests with duplicate unique keys in batch.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') desired_request_count = 100 rq = request_queue_apify @@ -137,7 +138,10 @@ async def test_add_non_unique_requests_in_batch(request_queue_apify: RequestQueu f'handled_request_count={handled_request_count}', f'expected_count={expected_count}', ) - is_finished = await rq.is_finished() + if rq_access_mode == 'shared': + is_finished = await call_with_exp_backoff(rq.is_finished) + else: + is_finished = await rq.is_finished() Actor.log.info(f'Processed {handled_request_count}/{expected_count} requests, finished: {is_finished}') assert is_finished is True, f'is_finished={is_finished}' @@ -255,7 +259,7 @@ async def test_request_reclaim_functionality( # In shared mode, there is a propagation delay before the reclaimed request becomes visible # (see https://github.com/apify/apify-sdk-python/issues/808). 
if rq_access_mode == 'shared': - request2 = await fetch_next_request_with_exp_backoff(rq) + request2 = await call_with_exp_backoff(rq.fetch_next_request) else: request2 = await rq.fetch_next_request() @@ -266,7 +270,10 @@ async def test_request_reclaim_functionality( # Mark as handled this time await rq.mark_request_as_handled(request2) - is_finished = await rq.is_finished() + if rq_access_mode == 'shared': + is_finished = await call_with_exp_backoff(rq.is_finished) + else: + is_finished = await rq.is_finished() assert is_finished is True @@ -300,7 +307,7 @@ async def test_request_reclaim_with_forefront( # In shared mode, there is a propagation delay before the reclaimed request becomes visible # (see https://github.com/apify/apify-sdk-python/issues/808). if rq_access_mode == 'shared': - next_request = await fetch_next_request_with_exp_backoff(rq) + next_request = await call_with_exp_backoff(rq.fetch_next_request) else: next_request = await rq.fetch_next_request() @@ -427,8 +434,12 @@ async def test_metadata_tracking(request_queue_apify: RequestQueue) -> None: assert final_handled == 3, f'final_handled={final_handled}' -async def test_batch_operations_performance(request_queue_apify: RequestQueue) -> None: +async def test_batch_operations_performance( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test batch operations vs individual operations.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -459,12 +470,20 @@ async def test_batch_operations_performance(request_queue_apify: RequestQueue) - Actor.log.info(f'Processing completed. 
Total processed: {processed_count}') assert processed_count == 50, f'processed_count={processed_count}' - is_finished = await rq.is_finished() + if rq_access_mode == 'shared': + is_finished = await call_with_exp_backoff(rq.is_finished) + else: + is_finished = await rq.is_finished() + assert is_finished is True, f'is_finished={is_finished}' -async def test_state_consistency(request_queue_apify: RequestQueue) -> None: +async def test_state_consistency( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test queue state consistency during concurrent operations.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -482,14 +501,14 @@ async def test_state_consistency(request_queue_apify: RequestQueue) -> None: reclaimed_requests = [] for i in range(5): - request = await rq.fetch_next_request() - if request: + next_request = await rq.fetch_next_request() + if next_request: if i % 2 == 0: # Process even indices - await rq.mark_request_as_handled(request) - processed_requests.append(request) + await rq.mark_request_as_handled(next_request) + processed_requests.append(next_request) else: # Reclaim odd indices - await rq.reclaim_request(request) - reclaimed_requests.append(request) + await rq.reclaim_request(next_request) + reclaimed_requests.append(next_request) Actor.log.info(f'Processed {len(processed_requests)} requests, reclaimed {len(reclaimed_requests)}') @@ -514,7 +533,10 @@ async def test_state_consistency(request_queue_apify: RequestQueue) -> None: await rq.mark_request_as_handled(next_request) Actor.log.info(f'Processed {remaining_count} remaining requests') - is_finished = await rq.is_finished() + if rq_access_mode == 'shared': + is_finished = await call_with_exp_backoff(rq.is_finished) + else: + is_finished = await rq.is_finished() assert is_finished is True, f'is_finished={is_finished}' @@ -549,8 +571,12 @@ async def 
test_empty_rq_behavior(request_queue_apify: RequestQueue) -> None: assert metadata.pending_request_count == 0, f'metadata.pending_request_count={metadata.pending_request_count}' -async def test_large_batch_operations(request_queue_apify: RequestQueue) -> None: +async def test_large_batch_operations( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test handling large batches of requests.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -571,18 +597,21 @@ async def test_large_batch_operations(request_queue_apify: RequestQueue) -> None processed_count = 0 while not await rq.is_empty(): - request = await rq.fetch_next_request() + next_request = await rq.fetch_next_request() # The RQ is_empty should ensure we don't get None - assert request is not None, f'request={request}' + assert next_request is not None, f'next_request={next_request}' - await rq.mark_request_as_handled(request) + await rq.mark_request_as_handled(next_request) processed_count += 1 Actor.log.info(f'Processing completed. 
Total processed: {processed_count}') assert processed_count == 500, f'processed_count={processed_count}' - is_finished = await rq.is_finished() + if rq_access_mode == 'shared': + is_finished = await call_with_exp_backoff(rq.is_finished) + else: + is_finished = await rq.is_finished() assert is_finished is True, f'is_finished={is_finished}' @@ -993,18 +1022,22 @@ async def test_request_queue_has_stats(request_queue_apify: RequestQueue) -> Non assert apify_metadata.stats.write_count == add_request_count -async def test_rq_long_url(request_queue_apify: RequestQueue) -> None: +async def test_rq_long_url( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test handling of requests with long URLs and extended unique keys.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify - request = Request.from_url( + long_url_request = Request.from_url( 'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1', use_extended_unique_key=True, always_enqueue=True, ) - request_id = unique_key_to_request_id(request.unique_key) + request_id = unique_key_to_request_id(long_url_request.unique_key) - processed_request = await rq.add_request(request) + processed_request = await rq.add_request(long_url_request) assert processed_request.id == request_id request_obtained = await rq.fetch_next_request() @@ -1012,7 +1045,10 @@ async def test_rq_long_url(request_queue_apify: RequestQueue) -> None: await rq.mark_request_as_handled(request_obtained) - is_finished = await rq.is_finished() + if rq_access_mode == 'shared': + is_finished = await call_with_exp_backoff(rq.is_finished) + else: + is_finished = await rq.is_finished() assert is_finished @@ -1061,18 +1097,24 @@ async def test_force_cloud( async def test_request_queue_is_finished( request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, ) -> None: + 
rq_access_mode = request.node.callspec.params.get('request_queue_apify') + await request_queue_apify.add_request(Request.from_url('http://example.com')) assert not await request_queue_apify.is_finished() - request = await request_queue_apify.fetch_next_request() - assert request is not None + fetched = await request_queue_apify.fetch_next_request() + assert fetched is not None assert not await request_queue_apify.is_finished(), ( 'RequestQueue should not be finished unless the request is marked as handled.' ) - await request_queue_apify.mark_request_as_handled(request) - assert await request_queue_apify.is_finished() + await request_queue_apify.mark_request_as_handled(fetched) + if rq_access_mode == 'shared': + assert await call_with_exp_backoff(request_queue_apify.is_finished) + else: + assert await request_queue_apify.is_finished() async def test_request_queue_deduplication_unprocessed_requests(