diff --git a/src/apify_shared/utils.py b/src/apify_shared/utils.py index 11e697e..a136f66 100644 --- a/src/apify_shared/utils.py +++ b/src/apify_shared/utils.py @@ -1,9 +1,14 @@ from __future__ import annotations +import base64 import contextlib +import hashlib +import hmac import io import json import re +import string +import time from datetime import datetime, timezone from enum import Enum from typing import Any, TypeVar, cast @@ -115,3 +120,59 @@ def parse(key: str, value: object) -> object: return {key: parse(key, value) for (key, value) in data.items()} return data + + +CHARSET = string.digits + string.ascii_letters + + +def encode_base62(num: int) -> str: + """Encode the given number to base62.""" + if num == 0: + return CHARSET[0] + + res = '' + while num > 0: + num, remainder = divmod(num, 62) + res = CHARSET[remainder] + res + return res + + +@ignore_docs +def create_hmac_signature(secret_key: str, message: str) -> str: + """Generates an HMAC signature and encodes it using Base62. Base62 encoding reduces the signature length. + + HMAC signature is truncated to 30 characters to make it shorter. + + Args: + secret_key (str): Secret key used for signing signatures + message (str): Message to be signed + + Returns: + str: Base62 encoded signature + """ + signature = hmac.new(secret_key.encode('utf-8'), message.encode('utf-8'), hashlib.sha256).hexdigest()[:30] + + decimal_signature = int(signature, 16) + + return encode_base62(decimal_signature) + + +def create_storage_content_signature( + resource_id: str, url_signing_secret_key: str, expires_in_millis: int | None = None, version: int = 0 +) -> str: + """Create a secure signature for a resource like a dataset or key-value store. + + This signature is used to generate a signed URL for authenticated access, which can be expiring or permanent. + The signature is created using HMAC with the provided secret key and includes + the resource ID, expiration time, and version. + + Note: expires_in_millis is optional. If not provided, the signature will not expire. + + """ + expires_at = int(time.time() * 1000) + expires_in_millis if expires_in_millis else 0 + + message_to_sign = f'{version}.{expires_at}.{resource_id}' + hmac = create_hmac_signature(url_signing_secret_key, message_to_sign) + + base64url_encoded_payload = base64.urlsafe_b64encode(f'{version}.{expires_at}.{hmac}'.encode()) + return base64url_encoded_payload.decode('utf-8') diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 5cf93ff..2e61eaf 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -1,10 +1,14 @@ from __future__ import annotations +import base64 import io from datetime import datetime, timezone from enum import Enum from apify_shared.utils import ( + create_hmac_signature, + create_storage_content_signature, + encode_base62, filter_out_none_values_recursively, filter_out_none_values_recursively_internal, ignore_docs, @@ -146,3 +150,61 @@ def testing_function(_a: str, _b: str) -> str: return 'dummy' assert testing_function is ignore_docs(testing_function) + + +def test_encode_base62() -> None: + assert encode_base62(0) == '0' + assert encode_base62(10) == 'a' + assert encode_base62(999999999) == '15FTGf' + + +# This test ensures compatibility with the JavaScript version of the same method. +# https://github.com/apify/apify-shared-js/blob/master/packages/utilities/src/hmac.ts +def test_create_valid_hmac_signature() -> None: + # This test uses the same secret key and message as in JS tests. + secret_key = 'hmac-secret-key' + message = 'hmac-message-to-be-authenticated' + assert create_hmac_signature(secret_key, message) == 'pcVagAsudj8dFqdlg7mG' + + +def test_create_same_hmac() -> None: + # This test uses the same secret key and message as in JS tests. + secret_key = 'hmac-same-secret-key' + message = 'hmac-same-message-to-be-authenticated' + assert create_hmac_signature(secret_key, message) == 'FYMcmTIm3idXqleF1Sw5' + assert create_hmac_signature(secret_key, message) == 'FYMcmTIm3idXqleF1Sw5' + + +# This test ensures compatibility with the JavaScript version of the same method. +# https://github.com/apify/apify-shared-js/blob/master/packages/utilities/src/storages.ts +def test_create_storage_content_signature() -> None: + # This test uses the same parameters as in JS tests. + secret_key = 'hmac-secret-key' + message = 'resource-id' + + signature = create_storage_content_signature( + resource_id=message, + url_signing_secret_key=secret_key, + ) + + version, expires_at, hmac = base64.urlsafe_b64decode(signature).decode('utf-8').split('.') + + assert signature == 'MC4wLjNUd2ZFRTY1OXVmU05zbVM0N2xS' + assert version == '0' + assert expires_at == '0' + assert hmac == '3TwfEE659ufSNsmS47lR' + + +def test_create_storage_content_signature_with_expiration() -> None: + secret_key = 'hmac-secret-key' + message = 'resource-id' + + signature = create_storage_content_signature( + resource_id=message, + url_signing_secret_key=secret_key, + expires_in_millis=10000, + ) + + version, expires_at, hmac = base64.urlsafe_b64decode(signature).decode('utf-8').split('.') + assert version == '0' + assert expires_at != '0'