Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 134 additions & 2 deletions src/apify_client/clients/resource_clients/request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
import json as jsonlib
import logging
import math
import re
from base64 import b64encode
from collections.abc import Iterable
from hashlib import sha256
from queue import Queue
from typing import TYPE_CHECKING, Any, TypedDict

Expand Down Expand Up @@ -46,6 +49,29 @@ class BatchAddRequestsResult(TypedDict):
unprocessedRequests: list[dict]


def unique_key_to_request_id(unique_key: str, *, request_id_length: int = 15) -> str:
    """Generate a deterministic request ID based on a unique key.

    The unique key is UTF-8 encoded and hashed with SHA-256, the digest is base64-encoded, the characters
    '+', '/' and '=' are dropped, and the result is cut to `request_id_length` characters.

    Args:
        unique_key: The unique key to convert into a request ID.
        request_id_length: The length of the request ID. Must not be negative. The stripped base64 text
            of a SHA-256 digest is at most 43 characters long, so larger values yield a shorter ID.

    Returns:
        A URL-safe, truncated request ID based on the unique key.

    Raises:
        ValueError: If `request_id_length` is negative.
    """
    # A negative length would slice from the end of the string and silently return an ID of the wrong size.
    if request_id_length < 0:
        raise ValueError(f'request_id_length must not be negative, got {request_id_length}.')

    # Hash the UTF-8 bytes of the key; the digest is what makes the ID deterministic.
    digest = sha256(unique_key.encode('utf-8')).digest()

    # Base64 output is pure ASCII, so decoding it as ASCII is lossless.
    encoded = b64encode(digest).decode('ascii')

    # Drop the characters that are not URL-safe ('+', '/', and the '=' padding) in a single pass.
    url_safe = encoded.translate(str.maketrans('', '', '+/='))

    return url_safe[:request_id_length]


class RequestQueueClient(ResourceClient):
"""Sub-client for manipulating a single request queue."""

Expand Down Expand Up @@ -194,6 +220,19 @@ def get_request(self, request_id: str) -> dict | None:

return None

def get_request_by_unique_key(self, unique_key: str) -> dict | None:
    """Retrieve a request from the queue, addressed by its unique key.

    The unique key is first converted to a request ID with `unique_key_to_request_id`, and the lookup
    is then delegated to `get_request`.

    https://docs.apify.com/api/v2#/reference/request-queues/request/get-request

    Args:
        unique_key: Unique key of the request to retrieve.

    Returns:
        The retrieved request, or None, if it did not exist.
    """
    request_id = unique_key_to_request_id(unique_key)
    return self.get_request(request_id)

def update_request(self, request: dict, *, forefront: bool | None = None) -> dict:
"""Update a request in the queue.

Expand All @@ -206,7 +245,7 @@ def update_request(self, request: dict, *, forefront: bool | None = None) -> dic
Returns:
The updated request.
"""
request_id = request['id']
request_id = request.get('id', unique_key_to_request_id(request.get('uniqueKey', '')))

request_params = self._params(forefront=forefront, clientKey=self.client_key)

Expand Down Expand Up @@ -239,6 +278,16 @@ def delete_request(self, request_id: str) -> None:
timeout_secs=_SMALL_TIMEOUT,
)

def delete_request_by_unique_key(self, unique_key: str) -> None:
    """Delete a request from the queue, addressed by its unique key.

    The unique key is first converted to a request ID with `unique_key_to_request_id`, and the deletion
    is then delegated to `delete_request`.

    https://docs.apify.com/api/v2#/reference/request-queues/request/delete-request

    Args:
        unique_key: Unique key of the request to delete.
    """
    request_id = unique_key_to_request_id(unique_key)
    return self.delete_request(request_id)

def prolong_request_lock(
self,
request_id: str,
Expand Down Expand Up @@ -266,6 +315,24 @@ def prolong_request_lock(

return parse_date_fields(pluck_data(jsonlib.loads(response.text)))

def prolong_request_lock_by_unique_key(
    self,
    unique_key: str,
    *,
    forefront: bool | None = None,
    lock_secs: int,
) -> dict:
    """Prolong the lock on a request, addressed by its unique key.

    The unique key is first converted to a request ID with `unique_key_to_request_id`, and the call
    is then delegated to `prolong_request_lock`.

    https://docs.apify.com/api/v2#/reference/request-queues/request-lock/prolong-request-lock

    Args:
        unique_key: Unique key of the request to prolong the lock.
        forefront: Whether to put the request in the beginning or the end of the queue after lock expires.
        lock_secs: By how much to prolong the lock, in seconds.

    Returns:
        Whatever `prolong_request_lock` returns for the derived request ID.
    """
    request_id = unique_key_to_request_id(unique_key)
    return self.prolong_request_lock(
        request_id,
        forefront=forefront,
        lock_secs=lock_secs,
    )

def delete_request_lock(self, request_id: str, *, forefront: bool | None = None) -> None:
"""Delete the lock on a request.

Expand All @@ -284,6 +351,17 @@ def delete_request_lock(self, request_id: str, *, forefront: bool | None = None)
timeout_secs=_SMALL_TIMEOUT,
)

def delete_request_lock_by_unique_key(self, unique_key: str, *, forefront: bool | None = None) -> None:
    """Delete the lock on a request, addressed by its unique key.

    The unique key is first converted to a request ID with `unique_key_to_request_id`, and the call
    is then delegated to `delete_request_lock`.

    https://docs.apify.com/api/v2#/reference/request-queues/request-lock/delete-request-lock

    Args:
        unique_key: Unique key of the request to delete the lock.
        forefront: Whether to put the request in the beginning or the end of the queue after the lock is deleted.
    """
    request_id = unique_key_to_request_id(unique_key)
    return self.delete_request_lock(request_id, forefront=forefront)

def batch_add_requests(
self,
requests: list[dict],
Expand Down Expand Up @@ -574,6 +652,19 @@ async def get_request(self, request_id: str) -> dict | None:

return None

async def get_request_by_unique_key(self, unique_key: str) -> dict | None:
    """Retrieve a request from the queue, addressed by its unique key.

    The unique key is first converted to a request ID with `unique_key_to_request_id`, and the lookup
    is then delegated to the awaited `get_request`.

    https://docs.apify.com/api/v2#/reference/request-queues/request/get-request

    Args:
        unique_key: Unique key of the request to retrieve.

    Returns:
        The retrieved request, or None, if it did not exist.
    """
    request_id = unique_key_to_request_id(unique_key)
    return await self.get_request(request_id)

async def update_request(self, request: dict, *, forefront: bool | None = None) -> dict:
"""Update a request in the queue.

Expand All @@ -586,7 +677,7 @@ async def update_request(self, request: dict, *, forefront: bool | None = None)
Returns:
The updated request.
"""
request_id = request['id']
request_id = request.get('id', unique_key_to_request_id(request.get('uniqueKey', '')))

request_params = self._params(forefront=forefront, clientKey=self.client_key)

Expand Down Expand Up @@ -617,6 +708,16 @@ async def delete_request(self, request_id: str) -> None:
timeout_secs=_SMALL_TIMEOUT,
)

async def delete_request_by_unique_key(self, unique_key: str) -> None:
    """Delete a request from the queue, addressed by its unique key.

    The unique key is first converted to a request ID with `unique_key_to_request_id`, and the deletion
    is then delegated to the awaited `delete_request`.

    https://docs.apify.com/api/v2#/reference/request-queues/request/delete-request

    Args:
        unique_key: Unique key of the request to delete.
    """
    request_id = unique_key_to_request_id(unique_key)
    return await self.delete_request(request_id)

async def prolong_request_lock(
self,
request_id: str,
Expand Down Expand Up @@ -644,6 +745,26 @@ async def prolong_request_lock(

return parse_date_fields(pluck_data(jsonlib.loads(response.text)))

async def prolong_request_lock_by_unique_key(
    self,
    unique_key: str,
    *,
    forefront: bool | None = None,
    lock_secs: int,
) -> dict:
    """Prolong the lock on a request, addressed by its unique key.

    The unique key is first converted to a request ID with `unique_key_to_request_id`, and the call
    is then delegated to the awaited `prolong_request_lock`.

    https://docs.apify.com/api/v2#/reference/request-queues/request-lock/prolong-request-lock

    Args:
        unique_key: Unique key of the request to prolong the lock.
        forefront: Whether to put the request in the beginning or the end of the queue after lock expires.
        lock_secs: By how much to prolong the lock, in seconds.

    Returns:
        Whatever `prolong_request_lock` returns for the derived request ID.
    """
    request_id = unique_key_to_request_id(unique_key)
    return await self.prolong_request_lock(request_id, forefront=forefront, lock_secs=lock_secs)

async def delete_request_lock(
self,
request_id: str,
Expand All @@ -667,6 +788,17 @@ async def delete_request_lock(
timeout_secs=_SMALL_TIMEOUT,
)

async def delete_request_lock_by_unique_key(self, unique_key: str, *, forefront: bool | None = None) -> None:
    """Delete the lock on a request, addressed by its unique key.

    The unique key is first converted to a request ID with `unique_key_to_request_id`, and the call
    is then delegated to the awaited `delete_request_lock`.

    https://docs.apify.com/api/v2#/reference/request-queues/request-lock/delete-request-lock

    Args:
        unique_key: Unique key of the request to delete the lock.
        forefront: Whether to put the request in the beginning or the end of the queue after the lock is deleted.
    """
    request_id = unique_key_to_request_id(unique_key)
    return await self.delete_request_lock(request_id, forefront=forefront)

async def _batch_add_requests_worker(
self,
queue: asyncio.Queue[Iterable[dict]],
Expand Down
39 changes: 39 additions & 0 deletions tests/integration/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

from typing import TYPE_CHECKING

import pytest

from integration.integration_test_utils import random_resource_name, random_string

from apify_client.clients.resource_clients.request_queue import unique_key_to_request_id

if TYPE_CHECKING:
from apify_client import ApifyClient, ApifyClientAsync

Expand Down Expand Up @@ -113,3 +117,38 @@ async def test_request_batch_operations(self, apify_client_async: ApifyClientAsy
assert len(requests_in_queue2['items']) == 25 - len(delete_response['processedRequests'])

await queue.delete()


def test_unique_key_to_request_id_length() -> None:
    """Check that the generated request ID has exactly the requested number of characters."""
    expected_length = 15
    generated_id = unique_key_to_request_id('exampleKey123', request_id_length=expected_length)
    assert len(generated_id) == expected_length, 'Request ID should have the correct length.'


def test_unique_key_to_request_id_consistency() -> None:
    """Check that converting the same unique key twice yields the same request ID."""
    first_id, second_id = (unique_key_to_request_id('consistentKey') for _ in range(2))
    assert first_id == second_id, 'The same unique key should generate consistent request IDs.'


@pytest.mark.parametrize(
    ('unique_key', 'expected_request_id'),
    [
        # Each case keeps its test ID next to its data instead of in a parallel `ids` list.
        pytest.param('abc', 'ungWv48BzpBQUDe', id='basic_abc'),
        pytest.param('uniqueKey', 'xiWPs083cree7mH', id='keyword_uniqueKey'),
        pytest.param('', '47DEQpj8HBSaTIm', id='empty_string'),
        pytest.param('测试中文', 'lKPdJkdvw8MXEUp', id='non_ascii_characters'),
        pytest.param('test+/=', 'XZRQjhoG0yjfnYD', id='url_unsafe_characters'),
    ],
)
def test_unique_key_to_request_id_matches_known_values(unique_key: str, expected_request_id: str) -> None:
    """Check the conversion against request IDs pinned for a set of known unique keys."""
    actual_request_id = unique_key_to_request_id(unique_key)
    assert actual_request_id == expected_request_id, f'Unique key "{unique_key}" should produce the expected request ID.'
4 changes: 2 additions & 2 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading