Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions src/apify_shared/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
from __future__ import annotations

import base64
import contextlib
import hashlib
import hmac
import io
import json
import re
import string
import time
from datetime import datetime, timezone
from enum import Enum
from typing import Any, TypeVar, cast
Expand Down Expand Up @@ -115,3 +120,59 @@ def parse(key: str, value: object) -> object:
return {key: parse(key, value) for (key, value) in data.items()}

return data


CHARSET = string.digits + string.ascii_letters


def encode_base62(num: int) -> str:
"""Encode the given number to base62."""
if num == 0:
return CHARSET[0]

res = ''
while num > 0:
num, remainder = divmod(num, 62)
res = CHARSET[remainder] + res
return res


@ignore_docs
def create_hmac_signature(secret_key: str, message: str) -> str:
"""Generates an HMAC signature and encodes it using Base62. Base62 encoding reduces the signature length.

HMAC signature is truncated to 30 characters to make it shorter.

Args:
secret_key (str): Secret key used for signing signatures
message (str): Message to be signed

Returns:
str: Base62 encoded signature
"""
signature = hmac.new(secret_key.encode('utf-8'), message.encode('utf-8'), hashlib.sha256).hexdigest()[:30]

decimal_signature = int(signature, 16)

return encode_base62(decimal_signature)


def create_storage_content_signature(
resource_id: str, url_signing_secret_key: str, expires_in_millis: int | None = None, version: int = 0
) -> str:
"""Create a secure signature for a resource like a dataset or key-value store.

This signature is used to generate a signed URL for authenticated access, which can be expiring or permanent.
The signature is created using HMAC with the provided secret key and includes
the resource ID, expiration time, and version.

Note: expires_in_millis is optional. If not provided, the signature will not expire.

"""
expires_at = int(time.time() * 1000) + expires_in_millis if expires_in_millis else 0

message_to_sign = f'{version}.{expires_at}.{resource_id}'
hmac = create_hmac_signature(url_signing_secret_key, message_to_sign)

base64url_encoded_payload = base64.urlsafe_b64encode(f'{version}.{expires_at}.{hmac}'.encode())
return base64url_encoded_payload.decode('utf-8')
62 changes: 62 additions & 0 deletions tests/unit/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from __future__ import annotations

import base64
import io
from datetime import datetime, timezone
from enum import Enum

from apify_shared.utils import (
create_hmac_signature,
create_storage_content_signature,
encode_base62,
filter_out_none_values_recursively,
filter_out_none_values_recursively_internal,
ignore_docs,
Expand Down Expand Up @@ -146,3 +150,61 @@ def testing_function(_a: str, _b: str) -> str:
return 'dummy'

assert testing_function is ignore_docs(testing_function)


def test_encode_base62() -> None:
assert encode_base62(0) == '0'
assert encode_base62(10) == 'a'
assert encode_base62(999999999) == '15FTGf'


# This test ensures compatibility with the JavaScript version of the same method.
# https://github.com/apify/apify-shared-js/blob/master/packages/utilities/src/hmac.ts
def test_create_valid_hmac_signature() -> None:
# This test uses the same secret key and message as in JS tests.
secret_key = 'hmac-secret-key'
message = 'hmac-message-to-be-authenticated'
assert create_hmac_signature(secret_key, message) == 'pcVagAsudj8dFqdlg7mG'


def test_create_same_hmac() -> None:
# This test uses the same secret key and message as in JS tests.
secret_key = 'hmac-same-secret-key'
message = 'hmac-same-message-to-be-authenticated'
assert create_hmac_signature(secret_key, message) == 'FYMcmTIm3idXqleF1Sw5'
assert create_hmac_signature(secret_key, message) == 'FYMcmTIm3idXqleF1Sw5'


# This test ensures compatibility with the JavaScript version of the same method.
# https://github.com/apify/apify-shared-js/blob/master/packages/utilities/src/storages.ts
def test_create_storage_content_signature() -> None:
# This test uses the same parameters as in JS tests.
secret_key = 'hmac-secret-key'
message = 'resource-id'

signature = create_storage_content_signature(
resource_id=message,
url_signing_secret_key=secret_key,
)

version, expires_at, hmac = base64.urlsafe_b64decode(signature).decode('utf-8').split('.')

assert signature == 'MC4wLjNUd2ZFRTY1OXVmU05zbVM0N2xS'
assert version == '0'
assert expires_at == '0'
assert hmac == '3TwfEE659ufSNsmS47lR'


def test_create_storage_content_signature_with_expiration() -> None:
secret_key = 'hmac-secret-key'
message = 'resource-id'

signature = create_storage_content_signature(
resource_id=message,
url_signing_secret_key=secret_key,
expires_in_millis=10000,
)

version, expires_at, hmac = base64.urlsafe_b64decode(signature).decode('utf-8').split('.')
assert version == '0'
assert expires_at != '0'
Loading