diff --git a/README.md b/README.md index 58af6a7..1903169 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ Generate CloudWatch Metrics embedded within structured log events. The embedded - Easily generate custom metrics from Lambda functions without requiring custom batching code, making blocking network requests or relying on 3rd party software. - Other compute environments (EC2, On-prem, ECS, EKS, and other container environments) are supported by installing the [CloudWatch Agent](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Generation_CloudWatch_Agent.html). + - Examples can be found in [examples/README.md](examples/README.md) - **Linking metrics to high cardinality context** @@ -35,11 +36,13 @@ To get a metric logger, you can decorate your function with a `metric_scope`: ```py from aws_embedded_metrics import metric_scope +from aws_embedded_metrics.storage_resolution import StorageResolution @metric_scope def my_handler(metrics): metrics.put_dimensions({"Foo": "Bar"}) - metrics.put_metric("ProcessingLatency", 100, "Milliseconds") + metrics.put_metric("ProcessingLatency", 100, "Milliseconds", StorageResolution.STANDARD) + metrics.put_metric("Memory.HeapUsed", 1600424.0, "Bytes", StorageResolution.HIGH) metrics.set_property("AccountId", "123456789012") metrics.set_property("RequestId", "422b1569-16f6-4a03") metrics.set_property("DeviceId", "61270781-c6ac-46f1") @@ -53,21 +56,29 @@ def my_handler(metrics): The `MetricsLogger` is the interface you will use to publish embedded metrics. -- **put_metric**(key: str, value: float, unit: str = "None") -> MetricsLogger +- **put_metric**(key: str, value: float, unit: str = "None", storage_resolution: int = 60) -> MetricsLogger -Adds a new metric to the current logger context. Multiple metrics using the same key will be appended to an array of values. The Embedded Metric Format supports a maximum of 100 values per key. 
If more metric values are added than are supported by the format, the logger will be flushed to allow for new metric values to be captured. +Adds a new metric to the current logger context. Multiple metrics using the same key will be appended to an array of values. Multiple metrics cannot have same key and different storage resolution. The Embedded Metric Format supports a maximum of 100 values per key. If more metric values are added than are supported by the format, the logger will be flushed to allow for new metric values to be captured. Requirements: - Name Length 1-255 characters - Name must be ASCII characters only - Values must be in the range of 8.515920e-109 to 1.174271e+108. In addition, special values (for example, NaN, +Infinity, -Infinity) are not supported. -- Units must meet CW Metrics unit requirements, if not it will default to None. See [MetricDatum](https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html) for valid values. +- Metrics must meet CloudWatch Metrics requirements, otherwise a `InvalidMetricError` will be thrown. See [MetricDatum](https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html) for valid values. + +- ##### Storage Resolution +An OPTIONAL value representing the storage resolution for the corresponding metric. Setting this to `High` specifies this metric as a high-resolution metric, so that CloudWatch stores the metric with sub-minute resolution down to one second. Setting this to `Standard` specifies this metric as a standard-resolution metric, which CloudWatch stores at 1-minute resolution. If a value is not provided, then a default value of `Standard` is assumed. 
See [Cloud Watch High-Resolution metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/publishingMetrics.html#high-resolution-metrics) Examples: ```py +# Standard Resolution example put_metric("Latency", 200, "Milliseconds") +put_metric("Latency", 201, "Milliseconds", StorageResolution.STANDARD) + +# High Resolution example +put_metric("Memory.HeapUsed", 1600424.0, "Bytes", StorageResolution.HIGH) ``` - **set_property**(key: str, value: Any) -> MetricsLogger @@ -102,6 +113,7 @@ Requirements: - Length 1-255 characters - ASCII characters only +- Dimensions must meet CloudWatch Dimensions requirements, otherwise a `InvalidDimensionError` or `DimensionSetExceededError` will be thrown. See [Dimensions](https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_Dimension.html) for valid values. Examples: @@ -110,9 +122,9 @@ put_dimensions({ "Operation": "Aggregator" }) put_dimensions({ "Operation": "Aggregator", "DeviceType": "Actuator" }) ``` -- **set_dimensions**(\*dimensions: Dict[str, str]) -> MetricsLogger +- **set_dimensions**(\*dimensions: Dict[str, str], use_default: bool = False) -> MetricsLogger -Explicitly override all dimensions. This will remove the default dimensions. +Explicitly override all dimensions. By default, this will disable the default dimensions, but can be configured using the *keyword-only* parameter `use_default`. **WARNING**: Every distinct value will result in a new CloudWatch Metric. If the cardinality of a particular value is expected to be high, you should consider @@ -122,6 +134,7 @@ Requirements: - Length 1-255 characters - ASCII characters only +- Dimensions must meet CloudWatch Dimensions requirements, otherwise a `InvalidDimensionError` or `DimensionSetExceededError` will be thrown. See [Dimensions](https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_Dimension.html) for valid values. 
Examples: @@ -132,6 +145,23 @@ set_dimensions( ) ``` +```py +set_dimensions( + { "Operation": "Aggregator" }, + use_default=True # default dimensions would be enabled +) +``` + +- **reset_dimensions**(use_default: bool) -> MetricsLogger + +Explicitly clear all custom dimensions. The behavior of whether default dimensions should be used can be configured with the `use_default` parameter. + +Examples: + +```py +reset_dimensions(False) # this will clear all custom dimensions as well as disable default dimensions +``` + - **set_namespace**(value: str) -> MetricsLogger Sets the CloudWatch [namespace](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_concepts.html#Namespace) that extracted metrics should be published to. If not set, a default value of aws-embedded-metrics will be used. @@ -140,6 +170,7 @@ Requirements: - Name Length 1-255 characters - Name must be ASCII characters only +- Namespace must meet CloudWatch Namespace requirements, otherwise a `InvalidNamespaceError` will be thrown. See [Namespaces](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_concepts.html#Namespace) for valid values. Examples: @@ -147,9 +178,40 @@ Examples: set_namespace("MyApplication") ``` +- **set_timestamp**(timestamp: datetime) -> MetricsLogger + +Sets the timestamp of the metrics. If not set, current time of the client will be used. + +Timestamp must meet CloudWatch requirements, otherwise a InvalidTimestampError will be thrown. See [Timestamps](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_concepts.html#about_timestamp) for valid values. + +Examples: + +```py + set_timestamp(datetime.datetime.now()) +``` + + + - **flush**() -Flushes the current MetricsContext to the configured sink and resets all properties, dimensions and metric values. The namespace and default dimensions will be preserved across flushes. 
+Flushes the current MetricsContext to the configured sink and resets all properties and metric values. The namespace and default dimensions will be preserved across flushes. +Custom dimensions are **not** preserved by default, but this behavior can be changed by setting `logger.flush_preserve_dimensions = True`, so that custom dimensions would be preserved after each flushing thereafter. + +Example: + +```py +logger.flush() # only default dimensions will be preserved after each flush() +``` + +```py +logger.flush_preserve_dimensions = True +logger.flush() # custom dimensions and default dimensions will be preserved after each flush() +``` + +```py +logger.reset_dimensions(False) +logger.flush() # default dimensions are disabled; no dimensions will be preserved after each flush() +``` ### Configuration diff --git a/aws_embedded_metrics/config/configuration.py b/aws_embedded_metrics/config/configuration.py index 8628e26..5a04ba9 100644 --- a/aws_embedded_metrics/config/configuration.py +++ b/aws_embedded_metrics/config/configuration.py @@ -11,6 +11,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from typing import Optional + class Configuration: def __init__( @@ -24,6 +26,7 @@ def __init__( ec2_metadata_endpoint: str = None, namespace: str = None, disable_metric_extraction: bool = False, + environment: Optional[str] = None, ): self.debug_logging_enabled = debug_logging_enabled self.service_name = service_name @@ -34,3 +37,4 @@ def __init__( self.ec2_metadata_endpoint = ec2_metadata_endpoint self.namespace = namespace self.disable_metric_extraction = disable_metric_extraction + self.environment = environment diff --git a/aws_embedded_metrics/config/environment_configuration_provider.py b/aws_embedded_metrics/config/environment_configuration_provider.py index f9102ae..219105a 100644 --- a/aws_embedded_metrics/config/environment_configuration_provider.py +++ b/aws_embedded_metrics/config/environment_configuration_provider.py @@ -25,6 +25,7 @@ EC2_METADATA_ENDPOINT = "EC2_METADATA_ENDPOINT" NAMESPACE = "NAMESPACE" DISABLE_METRIC_EXTRACTION = "DISABLE_METRIC_EXTRACTION" +ENVIRONMENT_OVERRIDE = "ENVIRONMENT" class EnvironmentConfigurationProvider: @@ -43,6 +44,7 @@ def get_configuration(self) -> Configuration: self.__get_env_var(EC2_METADATA_ENDPOINT), self.__get_env_var(NAMESPACE), self.__get_bool_env_var(DISABLE_METRIC_EXTRACTION), + self.__get_env_var(ENVIRONMENT_OVERRIDE), ) @staticmethod diff --git a/aws_embedded_metrics/constants.py b/aws_embedded_metrics/constants.py index 756e9cd..45f1b6a 100644 --- a/aws_embedded_metrics/constants.py +++ b/aws_embedded_metrics/constants.py @@ -12,5 +12,14 @@ # limitations under the License. 
DEFAULT_NAMESPACE = "aws-embedded-metrics" -MAX_DIMENSIONS = 9 MAX_METRICS_PER_EVENT = 100 +MAX_DATAPOINTS_PER_METRIC = 100 +MAX_DIMENSION_SET_SIZE = 30 +MAX_DIMENSION_NAME_LENGTH = 250 +MAX_DIMENSION_VALUE_LENGTH = 1024 +MAX_METRIC_NAME_LENGTH = 1024 +MAX_NAMESPACE_LENGTH = 256 +VALID_NAMESPACE_REGEX = '^[a-zA-Z0-9._#:/-]+$' +TIMESTAMP = "Timestamp" +MAX_TIMESTAMP_PAST_AGE = 14 * 24 * 60 * 60 * 1000 # 14 days +MAX_TIMESTAMP_FUTURE_AGE = 2 * 60 * 60 * 1000 # 2 hours diff --git a/aws_embedded_metrics/environment/default_environment.py b/aws_embedded_metrics/environment/default_environment.py index 8f9e3f4..c4a16af 100644 --- a/aws_embedded_metrics/environment/default_environment.py +++ b/aws_embedded_metrics/environment/default_environment.py @@ -14,6 +14,7 @@ from aws_embedded_metrics.config import get_config from aws_embedded_metrics.environment import Environment from aws_embedded_metrics.logger.metrics_context import MetricsContext +from aws_embedded_metrics.sinks import Sink from aws_embedded_metrics.sinks.agent_sink import AgentSink from typing import Optional @@ -22,7 +23,7 @@ class DefaultEnvironment(Environment): def __init__(self) -> None: - self.sink: Optional[AgentSink] = None + self.sink: Optional[Sink] = None async def probe(self) -> bool: return True @@ -39,7 +40,7 @@ def get_log_group_name(self) -> str: def configure_context(self, context: MetricsContext) -> None: pass - def get_sink(self) -> AgentSink: + def get_sink(self) -> Sink: if self.sink is None: self.sink = AgentSink(self.get_log_group_name(), Config.log_stream_name) return self.sink diff --git a/aws_embedded_metrics/environment/ec2_environment.py b/aws_embedded_metrics/environment/ec2_environment.py index 77ffeb1..b158772 100644 --- a/aws_embedded_metrics/environment/ec2_environment.py +++ b/aws_embedded_metrics/environment/ec2_environment.py @@ -24,21 +24,34 @@ log = logging.getLogger(__name__) Config = get_config() +# Documentation for configuring instance metadata can be found here: +# 
https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html +TOKEN_ENDPOINT = "http://169.254.169.254/latest/api/token" +TOKEN_REQUEST_HEADER_KEY = "X-aws-ec2-metadata-token-ttl-seconds" +TOKEN_REQUEST_HEADER_VALUE = "21600" DEFAULT_EC2_METADATA_ENDPOINT = ( "http://169.254.169.254/latest/dynamic/instance-identity/document" ) +METADATA_REQUEST_HEADER_KEY = "X-aws-ec2-metadata-token" -async def fetch( # type: ignore - session: aiohttp.ClientSession, url: str +async def fetchJSON( + session: aiohttp.ClientSession, method: str, url: str, headers: Dict[str, str], ) -> Dict[str, Any]: - async with session.get(url, timeout=2) as response: + async with session.request(method, url, timeout=2, headers=headers) as response: # content_type=None prevents validation of the HTTP Content-Type header # The EC2 metadata endpoint uses text/plain instead of application/json # https://github.com/aio-libs/aiohttp/blob/7f777333a4ec0043ddf2e8d67146a626089773d9/aiohttp/web_request.py#L582-L585 return cast(Dict[str, Any], await response.json(content_type=None)) +async def fetchString( + session: aiohttp.ClientSession, method: str, url: str, headers: Dict[str, str] +) -> str: + async with session.request(method, url, timeout=2, headers=headers) as response: + return await response.text() + + class EC2Environment(Environment): def __init__(self) -> None: self.sink: Optional[AgentSink] = None @@ -48,10 +61,22 @@ async def probe(self) -> bool: metadata_endpoint = ( Config.ec2_metadata_endpoint or DEFAULT_EC2_METADATA_ENDPOINT ) + token_header = {TOKEN_REQUEST_HEADER_KEY: TOKEN_REQUEST_HEADER_VALUE} + log.info("Fetching token for EC2 metadata request from: %s", TOKEN_ENDPOINT) + try: + token = await fetchString(session, "PUT", TOKEN_ENDPOINT, token_header) + log.debug("Received token for request to EC2 metadata endpoint.") + except Exception: + log.info( + "Failed to fetch token for EC2 metadata request from %s", TOKEN_ENDPOINT + ) + return False + 
log.info("Fetching EC2 metadata from: %s", metadata_endpoint) try: - response_json = await fetch(session, metadata_endpoint) - log.debug("Received response from EC2 metdata endpoint.") + metadata_request_header = {METADATA_REQUEST_HEADER_KEY: token} + response_json = await fetchJSON(session, "GET", metadata_endpoint, metadata_request_header) + log.debug("Received response from EC2 metadata endpoint.") self.metadata = response_json return True except Exception: diff --git a/aws_embedded_metrics/environment/environment_detector.py b/aws_embedded_metrics/environment/environment_detector.py index a616d0f..9b3d320 100644 --- a/aws_embedded_metrics/environment/environment_detector.py +++ b/aws_embedded_metrics/environment/environment_detector.py @@ -12,15 +12,22 @@ # limitations under the License. import logging +from aws_embedded_metrics import config from aws_embedded_metrics.environment import Environment from aws_embedded_metrics.environment.default_environment import DefaultEnvironment from aws_embedded_metrics.environment.lambda_environment import LambdaEnvironment +from aws_embedded_metrics.environment.local_environment import LocalEnvironment from aws_embedded_metrics.environment.ec2_environment import EC2Environment from typing import Optional log = logging.getLogger(__name__) -environments = [LambdaEnvironment(), EC2Environment()] +lambda_environment = LambdaEnvironment() +ec2_environment = EC2Environment() +default_environment = DefaultEnvironment() +local_environment = LocalEnvironment() +environments = [lambda_environment, ec2_environment] +Config = config.get_config() class EnvironmentCache: @@ -32,6 +39,21 @@ async def resolve_environment() -> Environment: log.debug("Environment resolved from cache.") return EnvironmentCache.environment + if Config.environment: + lower_configured_enviroment = Config.environment.lower() + if lower_configured_enviroment == "lambda": + EnvironmentCache.environment = lambda_environment + elif lower_configured_enviroment == 
"ec2": + EnvironmentCache.environment = ec2_environment + elif lower_configured_enviroment == "default": + EnvironmentCache.environment = default_environment + elif lower_configured_enviroment == "local": + EnvironmentCache.environment = local_environment + else: + log.info("Failed to understand environment override: %s", Config.environment) + if EnvironmentCache.environment is not None: + return EnvironmentCache.environment + for env_under_test in environments: is_environment = False try: @@ -49,5 +71,5 @@ async def resolve_environment() -> Environment: return env_under_test log.info("No environment was detected. Using default.") - EnvironmentCache.environment = DefaultEnvironment() + EnvironmentCache.environment = default_environment return EnvironmentCache.environment diff --git a/aws_embedded_metrics/environment/lambda_environment.py b/aws_embedded_metrics/environment/lambda_environment.py index 8e52078..14635cd 100644 --- a/aws_embedded_metrics/environment/lambda_environment.py +++ b/aws_embedded_metrics/environment/lambda_environment.py @@ -14,7 +14,7 @@ from aws_embedded_metrics.environment import Environment from aws_embedded_metrics.logger.metrics_context import MetricsContext from aws_embedded_metrics.sinks import Sink -from aws_embedded_metrics.sinks.lambda_sink import LambdaSink +from aws_embedded_metrics.sinks.stdout_sink import StdoutSink import os @@ -24,7 +24,7 @@ def get_env(key: str) -> str: return "" -sink = LambdaSink() +sink = StdoutSink() class LambdaEnvironment(Environment): diff --git a/aws_embedded_metrics/environment/local_environment.py b/aws_embedded_metrics/environment/local_environment.py new file mode 100644 index 0000000..a6a5e55 --- /dev/null +++ b/aws_embedded_metrics/environment/local_environment.py @@ -0,0 +1,46 @@ +# Copyright 2019 Amazon.com, Inc. or its affiliates. +# Licensed under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from aws_embedded_metrics.config import get_config +from aws_embedded_metrics.environment import Environment +from aws_embedded_metrics.logger.metrics_context import MetricsContext +from aws_embedded_metrics.sinks import Sink +from aws_embedded_metrics.sinks.stdout_sink import StdoutSink +from typing import Optional + +Config = get_config() + + +class LocalEnvironment(Environment): + def __init__(self) -> None: + self.sink: Optional[Sink] = None + + async def probe(self) -> bool: + return False + + def get_name(self) -> str: + return Config.service_name or "Unknown" + + def get_type(self) -> str: + return Config.service_type or "Unknown" + + def get_log_group_name(self) -> str: + return Config.log_group_name or f"{self.get_name()}-metrics" + + def configure_context(self, context: MetricsContext) -> None: + pass + + def get_sink(self) -> Sink: + if self.sink is None: + self.sink = StdoutSink() + return self.sink diff --git a/aws_embedded_metrics/exceptions.py b/aws_embedded_metrics/exceptions.py new file mode 100644 index 0000000..22cfe94 --- /dev/null +++ b/aws_embedded_metrics/exceptions.py @@ -0,0 +1,41 @@ +# Copyright 2022 Amazon.com, Inc. or its affiliates. +# Licensed under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +class DimensionSetExceededError(Exception): + def __init__(self, message: str) -> None: + # Call the base class constructor with the parameters it needs + super().__init__(message) + + +class InvalidDimensionError(Exception): + def __init__(self, message: str) -> None: + # Call the base class constructor with the parameters it needs + super().__init__(message) + + +class InvalidMetricError(Exception): + def __init__(self, message: str) -> None: + # Call the base class constructor with the parameters it needs + super().__init__(message) + + +class InvalidNamespaceError(Exception): + def __init__(self, message: str) -> None: + # Call the base class constructor with the parameters it needs + super().__init__(message) + + +class InvalidTimestampError(Exception): + def __init__(self, message: str) -> None: + # Call the base class constructor with the parameters it needs + super().__init__(message) diff --git a/aws_embedded_metrics/logger/metric.py b/aws_embedded_metrics/logger/metric.py index 7ef419f..ee02259 100644 --- a/aws_embedded_metrics/logger/metric.py +++ b/aws_embedded_metrics/logger/metric.py @@ -10,12 +10,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+from aws_embedded_metrics.storage_resolution import StorageResolution class Metric(object): - def __init__(self, value: float, unit: str = None): + def __init__(self, value: float, unit: str = None, storage_resolution: StorageResolution = StorageResolution.STANDARD): self.values = [value] self.unit = unit or "None" + self.storage_resolution = storage_resolution or StorageResolution.STANDARD def add_value(self, value: float) -> None: self.values.append(value) diff --git a/aws_embedded_metrics/logger/metrics_context.py b/aws_embedded_metrics/logger/metrics_context.py index 11a84ca..ac11ea9 100644 --- a/aws_embedded_metrics/logger/metrics_context.py +++ b/aws_embedded_metrics/logger/metrics_context.py @@ -12,10 +12,13 @@ # limitations under the License. -from aws_embedded_metrics import constants, utils +from datetime import datetime +from aws_embedded_metrics import constants, utils, validator from aws_embedded_metrics.config import get_config from aws_embedded_metrics.logger.metric import Metric -from typing import List, Dict, Any +from aws_embedded_metrics.validator import validate_dimension_set, validate_metric +from aws_embedded_metrics.storage_resolution import StorageResolution +from typing import List, Dict, Any, Set class MetricsContext(object): @@ -37,9 +40,10 @@ def __init__( self.default_dimensions: Dict[str, str] = default_dimensions or {} self.metrics: Dict[str, Metric] = {} self.should_use_default_dimensions = True - self.meta: Dict[str, Any] = {"Timestamp": utils.now()} + self.meta: Dict[str, Any] = {constants.TIMESTAMP: utils.now()} + self.metric_name_and_resolution_map: Dict[str, StorageResolution] = {} - def put_metric(self, key: str, value: float, unit: str = None) -> None: + def put_metric(self, key: str, value: float, unit: str = None, storage_resolution: StorageResolution = StorageResolution.STANDARD) -> None: """ Adds a metric measurement to the context. 
Multiple calls using the same key will be stored as an @@ -48,27 +52,37 @@ def put_metric(self, key: str, value: float, unit: str = None) -> None: context.put_metric("Latency", 100, "Milliseconds") ``` """ + validate_metric(key, value, unit, storage_resolution, self.metric_name_and_resolution_map) metric = self.metrics.get(key) if metric: # TODO: we should log a warning if the unit has been changed metric.add_value(value) else: - self.metrics[key] = Metric(value, unit) + self.metrics[key] = Metric(value, unit, storage_resolution) + self.metric_name_and_resolution_map[key] = storage_resolution - def put_dimensions(self, dimensions: Dict[str, str]) -> None: + def put_dimensions(self, dimension_set: Dict[str, str]) -> None: """ Adds dimensions to the context. ``` context.put_dimensions({ "k1": "v1", "k2": "v2" }) ``` """ - if dimensions is None: + if dimension_set is None: # TODO add ability to define failure strategy return - self.dimensions.append(dimensions) + validate_dimension_set(dimension_set) - def set_dimensions(self, dimensionSets: List[Dict[str, str]]) -> None: + # Duplicate dimension sets are removed before being added to the end of the collection. + # This ensures only latest dimension value is used as a target member on the root EMF node. + # This operation is O(n^2), but acceptable given sets are capped at 30 dimensions + incoming_keys: Set = set(dimension_set.keys()) + self.dimensions = list(filter(lambda dim: (set(dim.keys()) != incoming_keys), self.dimensions)) + + self.dimensions.append(dimension_set) + + def set_dimensions(self, dimension_sets: List[Dict[str, str]], use_default: bool = False) -> None: """ Overwrite all dimensions. 
``` @@ -77,8 +91,12 @@ def set_dimensions(self, dimensionSets: List[Dict[str, str]]) -> None: { "k1": "v1", "k2": "v2" }]) ``` """ - self.should_use_default_dimensions = False - self.dimensions = dimensionSets + self.should_use_default_dimensions = use_default + + for dimension_set in dimension_sets: + validate_dimension_set(dimension_set) + + self.dimensions = dimension_sets def set_default_dimensions(self, default_dimensions: Dict) -> None: """ @@ -91,6 +109,16 @@ def set_default_dimensions(self, default_dimensions: Dict) -> None: """ self.default_dimensions = default_dimensions + def reset_dimensions(self, use_default: bool) -> None: + """ + Clear all custom dimensions on this MetricsLogger instance. Whether default dimensions should + be used can be configured by the input parameter. + :param use_default: indicates whether default dimensions should be used + """ + new_dimensions: List[Dict] = [] + self.dimensions = new_dimensions + self.should_use_default_dimensions = use_default + def set_property(self, key: str, value: Any) -> None: self.properties[key] = value @@ -119,14 +147,15 @@ def get_dimensions(self) -> List[Dict]: def __has_default_dimensions(self) -> bool: return self.default_dimensions is not None and len(self.default_dimensions) > 0 - def create_copy_with_context(self) -> "MetricsContext": + def create_copy_with_context(self, preserve_dimensions: bool = False) -> "MetricsContext": """ Creates a deep copy of the context excluding metrics. + Custom dimensions are NOT preserved by default unless preserve_dimensions parameter is set. """ new_properties: Dict = {} new_properties.update(self.properties) - # dimensions added with put_dimension will not be copied. + # custom dimensions will not be copied. # the reason for this is so that you can flush the same scope multiple # times without stacking new dimensions. 
Example: # @@ -136,7 +165,7 @@ def create_copy_with_context(self) -> "MetricsContext": # # my_func() # my_func() - new_dimensions: List[Dict] = [] + new_dimensions: List[Dict] = [] if not preserve_dimensions else self.dimensions new_default_dimensions: Dict = {} new_default_dimensions.update(self.default_dimensions) @@ -148,3 +177,21 @@ def create_copy_with_context(self) -> "MetricsContext": @staticmethod def empty() -> "MetricsContext": return MetricsContext() + + def set_timestamp(self, timestamp: datetime) -> None: + """ + Set the timestamp of metrics emitted in this context. If not set, the timestamp will default to the time the context is constructed. + + Timestamp must meet CloudWatch requirements, otherwise a InvalidTimestampError will be thrown. + See [Timestamps](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_concepts.html#about_timestamp) + for valid values. + + Parameters: + timestamp (datetime): The timestamp value to be set. + + Raises: + InvalidTimestampError: If the provided timestamp is invalid. + + """ + validator.validate_timestamp(timestamp) + self.meta[constants.TIMESTAMP] = utils.convert_to_milliseconds(timestamp) diff --git a/aws_embedded_metrics/logger/metrics_logger.py b/aws_embedded_metrics/logger/metrics_logger.py index c1da08a..ebbe469 100644 --- a/aws_embedded_metrics/logger/metrics_logger.py +++ b/aws_embedded_metrics/logger/metrics_logger.py @@ -11,9 +11,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+from datetime import datetime from aws_embedded_metrics.environment import Environment from aws_embedded_metrics.logger.metrics_context import MetricsContext +from aws_embedded_metrics.validator import validate_namespace from aws_embedded_metrics.config import get_config +from aws_embedded_metrics.storage_resolution import StorageResolution from typing import Any, Awaitable, Callable, Dict, Tuple import sys import traceback @@ -29,6 +32,7 @@ def __init__( ): self.resolve_environment = resolve_environment self.context: MetricsContext = context or MetricsContext.empty() + self.flush_preserve_dimensions: bool = False async def flush(self) -> None: # resolve the environment and get the sink @@ -37,14 +41,14 @@ async def flush(self) -> None: # first time in a non-lambda environment environment = await self.resolve_environment() - self.__configureContextForEnvironment(environment) + self.__configure_context_for_environment(environment) sink = environment.get_sink() # accept and reset the context sink.accept(self.context) - self.context = self.context.create_copy_with_context() + self.context = self.context.create_copy_with_context(self.flush_preserve_dimensions) - def __configureContextForEnvironment(self, env: Environment) -> None: + def __configure_context_for_environment(self, env: Environment) -> None: default_dimensions = { # LogGroup name will entirely depend on the environment since there # are some cases where the LogGroup cannot be configured (e.g. 
Lambda) @@ -63,16 +67,23 @@ def put_dimensions(self, dimensions: Dict[str, str]) -> "MetricsLogger": self.context.put_dimensions(dimensions) return self - def set_dimensions(self, *dimensions: Dict[str, str]) -> "MetricsLogger": - self.context.set_dimensions(list(dimensions)) + def set_dimensions(self, *dimensions: Dict[str, str], use_default: bool = False) -> "MetricsLogger": + self.context.set_dimensions(list(dimensions), use_default) + return self + + def reset_dimensions(self, use_default: bool) -> "MetricsLogger": + self.context.reset_dimensions(use_default) return self def set_namespace(self, namespace: str) -> "MetricsLogger": + validate_namespace(namespace) self.context.namespace = namespace return self - def put_metric(self, key: str, value: float, unit: str = "None") -> "MetricsLogger": - self.context.put_metric(key, value, unit) + def put_metric( + self, key: str, value: float, unit: str = "None", storage_resolution: StorageResolution = StorageResolution.STANDARD + ) -> "MetricsLogger": + self.context.put_metric(key, value, unit, storage_resolution) return self def add_stack_trace(self, key: str, details: Any = None, exc_info: Tuple = None) -> "MetricsLogger": @@ -104,6 +115,10 @@ def add_stack_trace(self, key: str, details: Any = None, exc_info: Tuple = None) self.set_property(key, trace_value) return self + def set_timestamp(self, timestamp: datetime) -> "MetricsLogger": + self.context.set_timestamp(timestamp) + return self + def new(self) -> "MetricsLogger": return MetricsLogger( self.resolve_environment, self.context.create_copy_with_context() diff --git a/aws_embedded_metrics/metric_scope/__init__.py b/aws_embedded_metrics/metric_scope/__init__.py index 47044bc..b185f38 100644 --- a/aws_embedded_metrics/metric_scope/__init__.py +++ b/aws_embedded_metrics/metric_scope/__init__.py @@ -18,35 +18,70 @@ def metric_scope(fn): # type: ignore + if inspect.isasyncgenfunction(fn): + @wraps(fn) + async def async_gen_wrapper(*args, **kwargs): # type: ignore + 
logger = create_metrics_logger() + if "metrics" in inspect.signature(fn).parameters: + kwargs["metrics"] = logger + + try: + fn_gen = fn(*args, **kwargs) + while True: + result = await fn_gen.__anext__() + await logger.flush() + yield result + except Exception as ex: + await logger.flush() + if not isinstance(ex, StopIteration): + raise + + return async_gen_wrapper + + elif inspect.isgeneratorfunction(fn): + @wraps(fn) + def gen_wrapper(*args, **kwargs): # type: ignore + logger = create_metrics_logger() + if "metrics" in inspect.signature(fn).parameters: + kwargs["metrics"] = logger + + try: + fn_gen = fn(*args, **kwargs) + while True: + result = next(fn_gen) + asyncio.run(logger.flush()) + yield result + except Exception as ex: + asyncio.run(logger.flush()) + if not isinstance(ex, StopIteration): + raise - if asyncio.iscoroutinefunction(fn): + return gen_wrapper + elif asyncio.iscoroutinefunction(fn): @wraps(fn) - async def wrapper(*args, **kwargs): # type: ignore + async def async_wrapper(*args, **kwargs): # type: ignore logger = create_metrics_logger() if "metrics" in inspect.signature(fn).parameters: kwargs["metrics"] = logger + try: return await fn(*args, **kwargs) - except Exception as e: - raise e finally: await logger.flush() - return wrapper - else: + return async_wrapper + else: @wraps(fn) def wrapper(*args, **kwargs): # type: ignore logger = create_metrics_logger() if "metrics" in inspect.signature(fn).parameters: kwargs["metrics"] = logger + try: return fn(*args, **kwargs) - except Exception as e: - raise e finally: - loop = asyncio.get_event_loop() - loop.run_until_complete(logger.flush()) + asyncio.run(logger.flush()) return wrapper diff --git a/aws_embedded_metrics/py.typed b/aws_embedded_metrics/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/aws_embedded_metrics/serializers/log_serializer.py b/aws_embedded_metrics/serializers/log_serializer.py index fcc496a..36bbb12 100644 --- a/aws_embedded_metrics/serializers/log_serializer.py 
+++ b/aws_embedded_metrics/serializers/log_serializer.py @@ -14,7 +14,11 @@ from aws_embedded_metrics.config import get_config from aws_embedded_metrics.logger.metrics_context import MetricsContext from aws_embedded_metrics.serializers import Serializer -from aws_embedded_metrics.constants import MAX_DIMENSIONS, MAX_METRICS_PER_EVENT +from aws_embedded_metrics.constants import ( + MAX_DIMENSION_SET_SIZE, MAX_METRICS_PER_EVENT, MAX_DATAPOINTS_PER_METRIC +) +from aws_embedded_metrics.exceptions import DimensionSetExceededError +from aws_embedded_metrics.storage_resolution import StorageResolution import json from typing import Any, Dict, List @@ -29,7 +33,11 @@ def serialize(context: MetricsContext) -> List[str]: for dimension_set in context.get_dimensions(): keys = list(dimension_set.keys()) - dimension_keys.append(keys[0:MAX_DIMENSIONS]) + if len(keys) > MAX_DIMENSION_SET_SIZE: + err_msg = (f"Maximum number of dimensions per dimension set allowed are {MAX_DIMENSION_SET_SIZE}. " + f"Account for default dimensions if not using set_dimensions.") + raise DimensionSetExceededError(err_msg) + dimension_keys.append(keys) dimensions_properties = {**dimensions_properties, **dimension_set} def create_body() -> Dict[str, Any]: @@ -50,29 +58,57 @@ def create_body() -> Dict[str, Any]: } return body - current_body: Dict[str, Any] = create_body() + current_body: Dict[str, Any] = {} event_batches: List[str] = [] num_metrics_in_current_body = 0 - for metric_name, metric in context.metrics.items(): + # Track if any given metric has data remaining to be serialized + remaining_data = True - if len(metric.values) == 1: - current_body[metric_name] = metric.values[0] - else: - current_body[metric_name] = metric.values + # Track batch number to know where to slice metric data + i = 0 + complete_metrics = set() + while remaining_data: + remaining_data = False + current_body = create_body() - if not config.disable_metric_extraction: - 
current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"].append({"Name": metric_name, "Unit": metric.unit}) + for metric_name, metric in context.metrics.items(): + # ensure we don't add duplicates of metrics we already completed + if metric_name in complete_metrics: + continue - num_metrics_in_current_body += 1 + if len(metric.values) == 1: + current_body[metric_name] = metric.values[0] + complete_metrics.add(metric_name) + else: + # Slice metric data as each batch cannot contain more than + # MAX_DATAPOINTS_PER_METRIC entries for a given metric + start_index = i * MAX_DATAPOINTS_PER_METRIC + end_index = (i + 1) * MAX_DATAPOINTS_PER_METRIC + current_body[metric_name] = metric.values[start_index:end_index] - should_serialize: bool = num_metrics_in_current_body == MAX_METRICS_PER_EVENT - if should_serialize: - event_batches.append(json.dumps(current_body)) - current_body = create_body() - num_metrics_in_current_body = 0 + # Make sure to consume remaining values if we sliced before the end + # of the metric value list + if len(metric.values) > end_index: + remaining_data = True + else: + complete_metrics.add(metric_name) + + metric_body = {"Name": metric_name, "Unit": metric.unit} + if metric.storage_resolution == StorageResolution.HIGH: + metric_body["StorageResolution"] = metric.storage_resolution.value # type: ignore + if not config.disable_metric_extraction: + current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"].append(metric_body) + num_metrics_in_current_body += 1 - if not event_batches or num_metrics_in_current_body > 0: - event_batches.append(json.dumps(current_body)) + if (num_metrics_in_current_body == MAX_METRICS_PER_EVENT): + event_batches.append(json.dumps(current_body)) + current_body = create_body() + num_metrics_in_current_body = 0 + + # iter over missing datapoints + i += 1 + if not event_batches or num_metrics_in_current_body > 0: + event_batches.append(json.dumps(current_body)) return event_batches diff --git 
a/aws_embedded_metrics/sinks/lambda_sink.py b/aws_embedded_metrics/sinks/stdout_sink.py similarity index 89% rename from aws_embedded_metrics/sinks/lambda_sink.py rename to aws_embedded_metrics/sinks/stdout_sink.py index 54e390b..4004938 100644 --- a/aws_embedded_metrics/sinks/lambda_sink.py +++ b/aws_embedded_metrics/sinks/stdout_sink.py @@ -17,14 +17,15 @@ from aws_embedded_metrics.serializers.log_serializer import LogSerializer -class LambdaSink(Sink): +class StdoutSink(Sink): def __init__(self, serializer: Serializer = LogSerializer()): self.serializer = serializer def accept(self, context: MetricsContext) -> None: for serialized_content in self.serializer.serialize(context): - print(serialized_content) + if serialized_content: + print(serialized_content) @staticmethod def name() -> str: - return "LambdaSink" + return "StdoutSink" diff --git a/aws_embedded_metrics/storage_resolution.py b/aws_embedded_metrics/storage_resolution.py new file mode 100644 index 0000000..73b999f --- /dev/null +++ b/aws_embedded_metrics/storage_resolution.py @@ -0,0 +1,16 @@ +from enum import Enum, EnumMeta + + +class StorageResolutionMeta(EnumMeta): + def __contains__(self, item: object) -> bool: + try: + self(item) + except (ValueError, TypeError): + return False + else: + return True + + +class StorageResolution(Enum, metaclass=StorageResolutionMeta): + STANDARD = 60 + HIGH = 1 diff --git a/aws_embedded_metrics/unit.py b/aws_embedded_metrics/unit.py index dfd01e9..58f25e2 100644 --- a/aws_embedded_metrics/unit.py +++ b/aws_embedded_metrics/unit.py @@ -1,7 +1,17 @@ -from enum import Enum +from enum import Enum, EnumMeta -class Unit(Enum): +class UnitMeta(EnumMeta): + def __contains__(self, item: object) -> bool: + try: + self(item) + except (ValueError, TypeError): + return False + else: + return True + + +class Unit(Enum, metaclass=UnitMeta): SECONDS = "Seconds" MICROSECONDS = "Microseconds" MILLISECONDS = "Milliseconds" @@ -28,3 +38,4 @@ class Unit(Enum): GIGABITS_PER_SECOND = 
"Gigabits/Second" TERABITS_PER_SECOND = "Terabits/Second" COUNT_PER_SECOND = "Count/Second" + NONE = "None" diff --git a/aws_embedded_metrics/utils.py b/aws_embedded_metrics/utils.py index b73d10f..8753cd6 100644 --- a/aws_embedded_metrics/utils.py +++ b/aws_embedded_metrics/utils.py @@ -12,4 +12,12 @@ # limitations under the License. import time +from datetime import datetime def now() -> int: return int(round(time.time() * 1000)) + + +def convert_to_milliseconds(dt: datetime) -> int: + if dt == datetime.min: + return 0 + + return int(round(dt.timestamp() * 1000)) diff --git a/aws_embedded_metrics/validator.py b/aws_embedded_metrics/validator.py new file mode 100644 index 0000000..d6ac3fe --- /dev/null +++ b/aws_embedded_metrics/validator.py @@ -0,0 +1,147 @@ +# Copyright 2019 Amazon.com, Inc. or its affiliates. +# Licensed under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import math +import re +from typing import Dict, Optional +from aws_embedded_metrics.unit import Unit +from aws_embedded_metrics.storage_resolution import StorageResolution +from aws_embedded_metrics.exceptions import DimensionSetExceededError, InvalidDimensionError, InvalidMetricError, InvalidNamespaceError +from aws_embedded_metrics.exceptions import InvalidTimestampError +from datetime import datetime +from aws_embedded_metrics import constants, utils + + +def validate_dimension_set(dimension_set: Dict[str, str]) -> None: + """ + Validates a dimension set + + Parameters: + dimension_set (Dict[str, str]): The dimension set to validate + + Raises: + DimensionSetExceededError: If the dimension set is too large + InvalidDimensionError: If a dimension is invalid + """ + if len(dimension_set) > constants.MAX_DIMENSION_SET_SIZE: + raise DimensionSetExceededError( + f"Maximum number of dimensions per dimension set allowed are {constants.MAX_DIMENSION_SET_SIZE}") + + for name, value in dimension_set.items(): + if not name or len(name.strip()) == 0: + raise InvalidDimensionError("Dimension name must include at least one non-whitespace character") + + if not value or len(value.strip()) == 0: + raise InvalidDimensionError("Dimension value must include at least one non-whitespace character") + + if len(name) > constants.MAX_DIMENSION_NAME_LENGTH: + raise InvalidDimensionError(f"Dimension name cannot be longer than {constants.MAX_DIMENSION_NAME_LENGTH} characters") + + if len(value) > constants.MAX_DIMENSION_VALUE_LENGTH: + raise InvalidDimensionError(f"Dimension value cannot be longer than {constants.MAX_DIMENSION_VALUE_LENGTH} characters") + + if not name.isascii(): + raise InvalidDimensionError(f"Dimension name contains invalid characters: {name}") + + if not value.isascii(): + raise InvalidDimensionError(f"Dimension value contains invalid characters: {value}") + + if name.startswith(":"): + raise InvalidDimensionError("Dimension name cannot start with ':'") + + +def 
validate_metric(name: str, + value: float, + unit: Optional[str], + storage_resolution: StorageResolution, + metric_name_and_resolution_map: dict) -> None: + """ + Validates a metric + + Parameters: + name (str): The name of the metric + value (float): The value of the metric + unit (Optional[str]): The unit of the metric + storage_resolution (StorageResolution): The storage resolution of the metric + metric_name_and_resolution_map (dict): The map used for validating metric + + Raises: + InvalidMetricError: If the metric is invalid + """ + if not name or len(name.strip()) == 0: + raise InvalidMetricError("Metric name must include at least one non-whitespace character") + + if len(name) > constants.MAX_DIMENSION_NAME_LENGTH: + raise InvalidMetricError(f"Metric name cannot be longer than {constants.MAX_DIMENSION_NAME_LENGTH} characters") + + if not math.isfinite(value): + raise InvalidMetricError("Metric value must be finite") + + if unit is not None and unit not in Unit: + raise InvalidMetricError(f"Metric unit is not valid: {unit}") + + if storage_resolution is None or storage_resolution not in StorageResolution: + raise InvalidMetricError(f"Metric storage resolution is not valid: {storage_resolution}") + + if name in metric_name_and_resolution_map and metric_name_and_resolution_map.get(name) is not storage_resolution: + raise InvalidMetricError( + f"Resolution for metrics {name} is already set. 
A single log event cannot have a metric with two different resolutions.") + + +def validate_namespace(namespace: str) -> None: + """ + Validates a namespace + + Parameters: + namespace (str): The namespace to validate + + Raises: + InvalidNamespaceError: If the namespace is invalid + """ + if not namespace or len(namespace.strip()) == 0: + raise InvalidNamespaceError("Namespace must include at least one non-whitespace character") + + if len(namespace) > constants.MAX_NAMESPACE_LENGTH: + raise InvalidNamespaceError(f"Namespace cannot be longer than {constants.MAX_NAMESPACE_LENGTH} characters") + + if not re.match(constants.VALID_NAMESPACE_REGEX, namespace): + raise InvalidNamespaceError(f"Namespace contains invalid characters: {namespace}") + + +def validate_timestamp(timestamp: datetime) -> None: + """ + Validates a given timestamp based on CloudWatch Timestamp guidelines. + + Timestamp must meet CloudWatch requirements, otherwise an InvalidTimestampError will be thrown. + See [Timestamps](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_concepts.html#about_timestamp) + for valid values. + + Parameters: + timestamp (datetime): Datetime object representing the timestamp to validate. + + Raises: + InvalidTimestampError: If the timestamp is either None, too old, or too far in the future. 
+ """ + if not timestamp: + raise InvalidTimestampError("Timestamp must be a valid datetime object") + + given_time_in_milliseconds = utils.convert_to_milliseconds(timestamp) + current_time_in_milliseconds = utils.now() + + if given_time_in_milliseconds < (current_time_in_milliseconds - constants.MAX_TIMESTAMP_PAST_AGE): + raise InvalidTimestampError( + f"Timestamp {str(timestamp)} must not be older than {int(constants.MAX_TIMESTAMP_PAST_AGE/(24 * 60 * 60 * 1000))} days") + + if given_time_in_milliseconds > (current_time_in_milliseconds + constants.MAX_TIMESTAMP_FUTURE_AGE): + raise InvalidTimestampError( + f"Timestamp {str(timestamp)} must not be newer than {int(constants.MAX_TIMESTAMP_FUTURE_AGE/(60 * 60 * 1000))} hours") diff --git a/bin/deploy-canary.sh b/bin/deploy-canary.sh index 7aa9037..a9ba7da 100755 --- a/bin/deploy-canary.sh +++ b/bin/deploy-canary.sh @@ -8,22 +8,22 @@ LIB_PATH=$rootdir CANARY_PATH=$LIB_PATH/tests/canary/agent ACCOUNT_ID=863722843142 REGION=us-west-2 -IMAGE_NAME=emf-python-canary -ECS_CLUSTER_NAME=emf-canary -ECS_TASK_FAMILY=emf-python-canary -ECS_SERVICE_NAME=emf-python-canary -ECR_REMOTE=$ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com/$IMAGE_NAME +EMF_LANGUAGE=python +IMAGE_NAME=emf-$EMF_LANGUAGE-canary +ECS_CLUSTER_NAME=emf-canary-cluster +ECS_TASK_FAMILY=emf-canary-$EMF_LANGUAGE-tasks +ECS_SERVICE_NAME=emf-canary-$EMF_LANGUAGE-service +ECR_ENDPOINT=$ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com +ECR_REMOTE=$ECR_ENDPOINT/$IMAGE_NAME pushd $CANARY_PATH echo 'INSTALLING LOCAL PROJECT' python3 -m venv venv source venv/bin/activate pip3 install $rootdir -pip3 install psutil -pip3 install getversion echo 'BUILDING THE EXAMPLE DOCKER IMAGE' -`aws ecr get-login --no-include-email --region $REGION` +aws ecr get-login-password --region $REGION | docker login --username AWS --password-stdin $ECR_ENDPOINT docker build . 
-t $IMAGE_NAME:latest check_exit @@ -40,13 +40,13 @@ aws ecs update-service \ --service $ECS_SERVICE_NAME \ --force-new-deployment \ --task-definition $(aws ecs register-task-definition \ - --network-mode bridge \ - --requires-compatibilities EC2 \ - --task-role arn:aws:iam::$ACCOUNT_ID:role/ecsTaskExecutionRole \ - --execution-role-arn "arn:aws:iam::$ACCOUNT_ID:role/ecsTaskExecutionRole" \ + --network-mode awsvpc \ + --requires-compatibilities FARGATE \ + --task-role arn:aws:iam::$ACCOUNT_ID:role/ECSCanaryTaskExecutionRole \ + --execution-role-arn "arn:aws:iam::$ACCOUNT_ID:role/ECSCanaryTaskExecutionRole" \ --region $REGION \ - --memory 256 \ - --cpu '0.5 vcpu' \ + --memory 512 \ + --cpu '0.25 vcpu' \ --family $ECS_TASK_FAMILY \ --container-definitions "$(cat container-definitions.json)" \ | jq --raw-output '.taskDefinition.taskDefinitionArn' | awk -F '/' '{ print $2 }') diff --git a/bin/run-integ-tests.sh b/bin/run-integ-tests.sh index f8d5d98..51fb3de 100755 --- a/bin/run-integ-tests.sh +++ b/bin/run-integ-tests.sh @@ -18,6 +18,27 @@ status_code=0 # Configure and start the agent ################################### +# Check if IAM user credentials exist +if [ -z "$AWS_ACCESS_KEY_ID" ] || [ -z "$AWS_SECRET_ACCESS_KEY" ]; then + echo "No IAM user credentials found, assuming we are running on CodeBuild pipeline, falling back to IAM role..." + + # Store the AWS STS assume-role output and extract credentials + CREDS=$(aws sts assume-role \ + --role-arn $Code_Build_Execution_Role_ARN \ + --role-session-name "session-$(uuidgen)" \ + --query 'Credentials.[AccessKeyId,SecretAccessKey,SessionToken]' \ + --output text \ + --duration-seconds 3600) + + # Parse the output into separate variables + read AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY AWS_SESSION_TOKEN <<< $CREDS + + # Export the variables + export AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY AWS_SESSION_TOKEN +else + echo "Using provided IAM user credentials..." 
+fi + $rootdir/bin/start-agent.sh ################################### diff --git a/bin/start-agent.sh b/bin/start-agent.sh index 0fb09a8..e4b0c7d 100755 --- a/bin/start-agent.sh +++ b/bin/start-agent.sh @@ -22,6 +22,7 @@ cd $rootdir/tests/integ/agent echo "[AmazonCloudWatchAgent] aws_access_key_id = $AWS_ACCESS_KEY_ID aws_secret_access_key = $AWS_SECRET_ACCESS_KEY +aws_session_token = $AWS_SESSION_TOKEN " > ./.aws/credentials echo "[profile AmazonCloudWatchAgent] diff --git a/buildspecs/buildspec.canary.yml b/buildspecs/buildspec.canary.yml deleted file mode 100644 index 7450fbc..0000000 --- a/buildspecs/buildspec.canary.yml +++ /dev/null @@ -1,22 +0,0 @@ -version: 0.2 - -env: - variables: - AWS_REGION: us-west-2 - parameter-store: - AWS_ACCESS_KEY_ID: AccessKey - AWS_SECRET_ACCESS_KEY: SecretKey -phases: - install: - runtime-versions: - python: 3.7 - commands: - # start docker - # https://docs.aws.amazon.com/codebuild/latest/userguide/sample-docker-custom-image.html#sample-docker-custom-image-files - - nohup /usr/local/bin/dockerd --host=unix:///var/run/docker.sock --host=tcp://127.0.0.1:2375 --storage-driver=overlay2& - - timeout 15 sh -c "until docker info; do echo .; sleep 1; done" - - pip install tox - build: - commands: - - tox - - ./bin/deploy-canary.sh diff --git a/buildspecs/buildspec.release.prod.yml b/buildspecs/buildspec.release.prod.yml deleted file mode 100644 index 6290973..0000000 --- a/buildspecs/buildspec.release.prod.yml +++ /dev/null @@ -1,19 +0,0 @@ -version: 0.2 - -env: - variables: - AWS_REGION: us-west-2 - parameter-store: - TWINE_USERNAME: PyPIUsername - TWINE_PASSWORD: PyPIPassword -phases: - install: - runtime-versions: - python: 3.7 - commands: - - pip install tox - - pip install --user --upgrade twine - build: - commands: - - tox - - ./bin/publish-pypi.sh diff --git a/buildspecs/buildspec.release.test.yml b/buildspecs/buildspec.release.test.yml deleted file mode 100644 index 556e838..0000000 --- a/buildspecs/buildspec.release.test.yml 
+++ /dev/null @@ -1,20 +0,0 @@ -version: 0.2 - -env: - variables: - AWS_REGION: us-west-2 - TWINE_REPOSITORY: https://test.pypi.org/legacy/ - parameter-store: - TWINE_USERNAME: PyPIUsername - TWINE_PASSWORD: PyPIPassword -phases: - install: - runtime-versions: - python: 3.7 - commands: - - pip install tox - - pip install --user --upgrade twine - build: - commands: - - tox - - ./bin/publish-pypi.sh diff --git a/buildspecs/buildspec.yml b/buildspecs/buildspec.yml deleted file mode 100644 index 3f396f9..0000000 --- a/buildspecs/buildspec.yml +++ /dev/null @@ -1,22 +0,0 @@ -version: 0.2 - -env: - variables: - AWS_REGION: us-west-2 - parameter-store: - AWS_ACCESS_KEY_ID: AccessKey - AWS_SECRET_ACCESS_KEY: SecretKey -phases: - install: - runtime-versions: - python: 3.7 - commands: - # start docker - # https://docs.aws.amazon.com/codebuild/latest/userguide/sample-docker-custom-image.html#sample-docker-custom-image-files - - nohup /usr/local/bin/dockerd --host=unix:///var/run/docker.sock --host=tcp://127.0.0.1:2375 --storage-driver=overlay2& - - timeout 15 sh -c "until docker info; do echo .; sleep 1; done" - - pip install tox - build: - commands: - - tox - - ./bin/run-integ-tests.sh diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..4527755 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,51 @@ +# Examples + +## Docker + +With Docker images, using the `awslogs` log driver will send your container logs to CloudWatch Logs. All you have to do is write to STDOUT and your EMF logs will be processed. + +[Official Docker documentation for `awslogs` driver](https://docs.docker.com/config/containers/logging/awslogs/) + +## ECS and Fargate + +With ECS and Fargate, you can use the `awsfirelens` (recommended) or `awslogs` log driver to have your logs sent to CloudWatch Logs on your behalf. After configuring the options for your preferred log driver, you may write your EMF logs to STDOUT and they will be processed. 
+ +[`awsfirelens` documentation](https://github.com/aws/amazon-cloudwatch-logs-for-fluent-bit) + +[ECS documentation on `awslogs` log driver](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/using_awslogs.html) + +## Fluent Bit and Fluentd + +Fluent Bit can be used to collect logs and push them to CloudWatch Logs. After configuring the Amazon CloudWatch Logs output plugin, you may write your EMF logs to STDOUT and they will be processed. + +[Getting Started with Fluent Bit](https://docs.fluentbit.io/manual/installation/getting-started-with-fluent-bit) + +[Amazon CloudWatch output plugin for Fluent Bit](https://docs.fluentbit.io/manual/pipeline/outputs/cloudwatch) + +### Example Metrics + +```json +{ + "_aws": { + "Timestamp": 1583902595342, + "CloudWatchMetrics": [ + { + "Dimensions": [[ "ServiceName", "ServiceType" ]], + "Metrics": [{ "Name": "ProcessingTime", "Unit": "Milliseconds" }], + "Namespace": "aws-embedded-metrics" + } + ] + }, + "ServiceName": "example", + "ServiceType": "AWS::ECS::Container", + "Method": "GET", + "Url": "/test", + "containerId": "702e4bcf1345", + "createdAt": "2020-03-11T04:54:24.981207801Z", + "startedAt": "2020-03-11T04:54:25.594413051Z", + "image": ".dkr.ecr..amazonaws.com/emf-examples:latest", + "cluster": "emf-example", + "taskArn": "arn:aws:ecs:::task/2fe946f6-8a2e-41a4-8fec-c4983bad8f74", + "ProcessingTime": 5 +} +``` diff --git a/examples/ec2/app.py b/examples/ec2/app.py index 13e9b6c..00a587d 100644 --- a/examples/ec2/app.py +++ b/examples/ec2/app.py @@ -1,4 +1,5 @@ from aws_embedded_metrics import metric_scope +from aws_embedded_metrics.storage_resolution import StorageResolution import logging @@ -10,6 +11,7 @@ def my_handler(metrics): metrics.put_dimensions({"Foo": "Bar"}) metrics.put_metric("ProcessingLatency", 100, "Milliseconds") + metrics.put_metric("CPU Utilization", 87, "Percent", StorageResolution.HIGH) metrics.set_property("AccountId", "123456789012") metrics.set_property("RequestId", 
"422b1569-16f6-4a03-b8f0-fe3fd9b100f8") metrics.set_property("DeviceId", "61270781-c6ac-46f1-baf7-22c808af8162") diff --git a/examples/lambda/function.py b/examples/lambda/function.py index cd63b4d..3c9801f 100644 --- a/examples/lambda/function.py +++ b/examples/lambda/function.py @@ -1,10 +1,12 @@ from aws_embedded_metrics import metric_scope +from aws_embedded_metrics.storage_resolution import StorageResolution @metric_scope def my_handler(event, context, metrics): metrics.put_dimensions({"Foo": "Bar"}) metrics.put_metric("ProcessingLatency", 100, "Milliseconds") + metrics.put_metric("CPU Utilization", 87, "Percent", StorageResolution.HIGH) metrics.set_property("AccountId", "123456789012") metrics.set_property("RequestId", "422b1569-16f6-4a03-b8f0-fe3fd9b100f8") metrics.set_property("DeviceId", "61270781-c6ac-46f1-baf7-22c808af8162") diff --git a/mypy.ini b/mypy.ini index a4b98cc..db30524 100644 --- a/mypy.ini +++ b/mypy.ini @@ -13,4 +13,5 @@ warn_unused_ignores = False warn_return_any = True strict_equality = True - ignore_missing_imports = True \ No newline at end of file + ignore_missing_imports = True + no_implicit_optional = False \ No newline at end of file diff --git a/setup.py b/setup.py index 2de9cae..7e8cc13 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( name="aws-embedded-metrics", - version="1.0.5", + version="3.3.0", author="Amazon Web Services", author_email="jarnance@amazon.com", description="AWS Embedded Metrics Package", @@ -22,7 +22,11 @@ "License :: OSI Approved :: Apache Software License", ], packages=find_packages(exclude=["tests*"]), + package_data={ + "aws_embedded_metrics": ["py.typed"], + }, include_package_data=True, install_requires=["aiohttp"], - test_suite="tests" + test_suite="tests", + python_requires=">=3.6" ) diff --git a/tests/canary/agent/canary.py b/tests/canary/agent/canary.py index 1f73090..7bbf807 100644 --- a/tests/canary/agent/canary.py +++ b/tests/canary/agent/canary.py @@ -2,6 +2,7 @@ import 
aws_embedded_metrics from aws_embedded_metrics import metric_scope from aws_embedded_metrics.config import get_config +from aws_embedded_metrics.storage_resolution import StorageResolution from getversion import get_module_version import os import psutil @@ -26,12 +27,12 @@ async def app(init, last_run_duration, metrics): metrics.set_dimensions({"Runtime": 'Python', "Platform": 'ECS', "Agent": 'CloudWatchAgent', "Version": version}) metrics.put_metric('Invoke', 1, "Count") metrics.put_metric('Duration', last_run_duration, 'Seconds') - metrics.put_metric('Memory.RSS', process.memory_info().rss, 'Bytes') + metrics.put_metric('Memory.RSS', process.memory_info().rss, 'Bytes', StorageResolution.HIGH) async def main(): init = True - duration = None + duration = 0 # wait for agent to start # TODO: this should not be needed if we're using a ring buffer to queue and re-try events await asyncio.sleep(10) diff --git a/tests/canary/agent/container-definitions.json b/tests/canary/agent/container-definitions.json index e66822f..facf927 100644 --- a/tests/canary/agent/container-definitions.json +++ b/tests/canary/agent/container-definitions.json @@ -18,7 +18,7 @@ "environment": [ { "name": "AWS_EMF_AGENT_ENDPOINT", - "value": "tcp://cloudwatch-agent-python:25888" + "value": "tcp://127.0.0.1:25888" }, { "name": "AWS_EMF_ENABLE_DEBUG_LOGGING", @@ -26,12 +26,11 @@ } ], "image": "863722843142.dkr.ecr.us-west-2.amazonaws.com/emf-python-canary", - "links": ["cloudwatch-agent-python"], "name": "emf-python-canary" }, { "name": "cloudwatch-agent-python", - "image": "amazon/cloudwatch-agent:latest", + "image": "public.ecr.aws/cloudwatch-agent/cloudwatch-agent:latest", "logConfiguration": { "logDriver": "awslogs", "options": { @@ -48,4 +47,4 @@ } ] } -] \ No newline at end of file +] diff --git a/tests/canary/agent/start.sh b/tests/canary/agent/start.sh index 4e3b60c..02776f6 100755 --- a/tests/canary/agent/start.sh +++ b/tests/canary/agent/start.sh @@ -1,3 +1,7 @@ #!/bin/bash . 
/app/venv/bin/activate -python ./canary.py \ No newline at end of file + +pip3 install psutil > /dev/null +pip3 install getversion > /dev/null + +python3 ./canary.py \ No newline at end of file diff --git a/tests/config/test_config.py b/tests/config/test_config.py index d4c559d..7c7908d 100644 --- a/tests/config/test_config.py +++ b/tests/config/test_config.py @@ -24,6 +24,7 @@ def test_can_get_config_from_environment(monkeypatch): ec2_metadata_endpoint = fake.word() namespace = fake.word() disable_metric_extraction = True + environment_override = fake.word() monkeypatch.setenv("AWS_EMF_ENABLE_DEBUG_LOGGING", str(debug_enabled)) monkeypatch.setenv("AWS_EMF_SERVICE_NAME", service_name) @@ -34,6 +35,7 @@ def test_can_get_config_from_environment(monkeypatch): monkeypatch.setenv("AWS_EMF_EC2_METADATA_ENDPOINT", ec2_metadata_endpoint) monkeypatch.setenv("AWS_EMF_NAMESPACE", namespace) monkeypatch.setenv("AWS_EMF_DISABLE_METRIC_EXTRACTION", str(disable_metric_extraction)) + monkeypatch.setenv("AWS_EMF_ENVIRONMENT", environment_override) # act result = get_config() @@ -48,6 +50,7 @@ def test_can_get_config_from_environment(monkeypatch): assert result.ec2_metadata_endpoint == ec2_metadata_endpoint assert result.namespace == namespace assert result.disable_metric_extraction == disable_metric_extraction + assert result.environment == environment_override def test_can_override_config(monkeypatch): @@ -61,6 +64,7 @@ def test_can_override_config(monkeypatch): monkeypatch.setenv("AWS_EMF_EC2_METADATA_ENDPOINT", fake.word()) monkeypatch.setenv("AWS_EMF_NAMESPACE", fake.word()) monkeypatch.setenv("AWS_EMF_DISABLE_METRIC_EXTRACTION", str(True)) + monkeypatch.setenv("AWS_EMF_ENVIRONMENT", fake.word()) config = get_config() @@ -73,6 +77,7 @@ def test_can_override_config(monkeypatch): ec2_metadata_endpoint = fake.word() namespace = fake.word() disable_metric_extraction = False + environment = fake.word() # act config.debug_logging_enabled = debug_enabled @@ -84,6 +89,7 @@ def 
test_can_override_config(monkeypatch): config.ec2_metadata_endpoint = ec2_metadata_endpoint config.namespace = namespace config.disable_metric_extraction = disable_metric_extraction + config.environment = environment # assert assert config.debug_logging_enabled == debug_enabled @@ -95,3 +101,4 @@ def test_can_override_config(monkeypatch): assert config.ec2_metadata_endpoint == ec2_metadata_endpoint assert config.namespace == namespace assert config.disable_metric_extraction == disable_metric_extraction + assert config.environment == environment diff --git a/tests/environment/test_ec2_environment.py b/tests/environment/test_ec2_environment.py index ffa97b5..b673241 100644 --- a/tests/environment/test_ec2_environment.py +++ b/tests/environment/test_ec2_environment.py @@ -16,7 +16,7 @@ @pytest.mark.asyncio async def test_probe_returns_true_if_fetch_succeeds(aresponses): # arrange - configure_response(aresponses, "{}") + configure_response(aresponses, fake.pystr(), "{}") env = EC2Environment() # act @@ -55,7 +55,7 @@ def test_get_name_returns_configured_name(): async def test_get_type_returns_ec2_instance(aresponses): # arrange expected = "AWS::EC2::Instance" - configure_response(aresponses, "{}") + configure_response(aresponses, fake.pystr(), "{}") env = EC2Environment() # environment MUST be detected before we can access the metadata @@ -79,6 +79,7 @@ async def test_configure_context_adds_ec2_metadata_props(aresponses): configure_response( aresponses, + fake.pystr(), json.dumps( { "imageId": image_id, @@ -109,12 +110,20 @@ async def test_configure_context_adds_ec2_metadata_props(aresponses): # Test helper methods -def configure_response(aresponses, json): +def configure_response(aresponses, token, json): + aresponses.add( + "169.254.169.254", + "/latest/api/token", + "put", + aresponses.Response(text=token, headers={"X-aws-ec2-metadata-token-ttl-seconds": "21600"}), + ) aresponses.add( "169.254.169.254", "/latest/dynamic/instance-identity/document", "get", # the 
ec2-metdata endpoint does not actually set the correct # content-type header, it will instead use text/plain - aresponses.Response(text=json, content_type="text/plain"), + aresponses.Response(text=json, + content_type="text/plain", + headers={"X-aws-ec2-metadata-token": token}) ) diff --git a/tests/environment/test_environment_detector.py b/tests/environment/test_environment_detector.py index ea058c8..1a2d117 100644 --- a/tests/environment/test_environment_detector.py +++ b/tests/environment/test_environment_detector.py @@ -3,7 +3,7 @@ import pytest from importlib import reload -from aws_embedded_metrics.config import get_config +from aws_embedded_metrics import config from aws_embedded_metrics.environment.lambda_environment import LambdaEnvironment from aws_embedded_metrics.environment.default_environment import DefaultEnvironment @@ -11,7 +11,6 @@ from aws_embedded_metrics.environment import environment_detector fake = Faker() -Config = get_config() @pytest.fixture @@ -26,7 +25,7 @@ def before(): @pytest.mark.asyncio -async def test_resolve_environment_returns_LambdaEnvironment(before): +async def test_resolve_environment_returns_lambda_environment(before): # arrange os.environ["AWS_LAMBDA_FUNCTION_NAME"] = fake.word() @@ -60,3 +59,31 @@ async def test_resolve_environment_returns_default_envionment(before): # assert assert isinstance(result, DefaultEnvironment) + + +@pytest.mark.asyncio +async def test_resolve_environment_returns_override_ec2(before, monkeypatch): + # arrange + monkeypatch.setenv("AWS_EMF_ENVIRONMENT", "ec2") + reload(config) + reload(environment_detector) + + # act + result = await environment_detector.resolve_environment() + + # assert + assert isinstance(result, ec2_environment.EC2Environment) + + +@pytest.mark.asyncio +async def test_resolve_environment_returns_override_lambda(before, monkeypatch): + # arrange + monkeypatch.setenv("AWS_EMF_ENVIRONMENT", "lambda") + reload(config) + reload(environment_detector) + + # act + result = await 
environment_detector.resolve_environment() + + # assert + assert isinstance(result, LambdaEnvironment) diff --git a/tests/environment/test_lambda_environment.py b/tests/environment/test_lambda_environment.py index e9e76bd..9fbbf18 100644 --- a/tests/environment/test_lambda_environment.py +++ b/tests/environment/test_lambda_environment.py @@ -1,6 +1,6 @@ import os from aws_embedded_metrics.environment.lambda_environment import LambdaEnvironment -from aws_embedded_metrics.sinks.lambda_sink import LambdaSink +from aws_embedded_metrics.sinks.stdout_sink import StdoutSink import pytest from faker import Faker @@ -57,7 +57,7 @@ def test_get_log_group_name_returns_function_name(): assert result == expected_name -def test_create_sink_creates_LambdaSink(): +def test_create_sink_creates_lambda_sink(): # arrange env = LambdaEnvironment() @@ -65,4 +65,4 @@ def test_create_sink_creates_LambdaSink(): result = env.get_sink() # assert - assert isinstance(result, LambdaSink) + assert isinstance(result, StdoutSink) diff --git a/tests/environment/test_local_environment.py b/tests/environment/test_local_environment.py new file mode 100644 index 0000000..fbf063f --- /dev/null +++ b/tests/environment/test_local_environment.py @@ -0,0 +1,108 @@ +from aws_embedded_metrics import config +from aws_embedded_metrics.environment.local_environment import LocalEnvironment +from aws_embedded_metrics.sinks.stdout_sink import StdoutSink +import pytest +from faker import Faker + +Config = config.get_config() +fake = Faker() + + +@pytest.mark.asyncio +async def test_probe_always_returns_false(): + # arrange + env = LocalEnvironment() + + # act + result = await env.probe() + + # assert + assert result is False + + +def test_get_name_returns_unknown_if_not_specified(): + # arrange + env = LocalEnvironment() + Config.service_name = None + + # act + result = env.get_name() + + # assert + assert result == "Unknown" + + +def test_get_type_returns_unknown_if_not_specified(): + # arrange + env = 
LocalEnvironment() + Config.service_type = None + + # act + result = env.get_type() + + # assert + assert result == "Unknown" + + +def test_get_name_returns_name_if_configured(): + # arrange + expected_name = fake.word() + env = LocalEnvironment() + Config.service_name = expected_name + + # act + result = env.get_name() + + # assert + assert result == expected_name + + +def test__get_type__returns_type_if_configured(): + # arrange + expected_type = fake.word() + env = LocalEnvironment() + Config.service_type = expected_type + + # act + result = env.get_type() + + # assert + assert result == expected_type + + +def test__get_log_group_name__returns_log_group_if_configured(): + # arrange + expected = fake.word() + env = LocalEnvironment() + Config.log_group_name = expected + + # act + result = env.get_log_group_name() + + # assert + assert result == expected + + +def test__get_log_group_name__returns_service_name_metrics_if_not_configured(): + # arrange + expected = fake.word() + env = LocalEnvironment() + Config.service_name = expected + Config.log_group_name = None + + # act + result = env.get_log_group_name() + + # assert + assert result == f"{expected}-metrics" + + +def test__get_sink__returns_stdout_sink(): + # arrange + env = LocalEnvironment() + + # act + result = env.get_sink() + + # assert + assert isinstance(result, StdoutSink) diff --git a/tests/integ/agent/Dockerfile b/tests/integ/agent/Dockerfile index d8078d9..eb443fe 100644 --- a/tests/integ/agent/Dockerfile +++ b/tests/integ/agent/Dockerfile @@ -1,4 +1,4 @@ -FROM debian:latest +FROM debian:bullseye RUN apt-get update && \ apt-get install -y ca-certificates curl && \ diff --git a/tests/integ/agent/test_end_to_end.py b/tests/integ/agent/test_end_to_end.py index 9bb363e..8b45d39 100644 --- a/tests/integ/agent/test_end_to_end.py +++ b/tests/integ/agent/test_end_to_end.py @@ -42,6 +42,7 @@ async def do_work(metrics): metrics.put_dimensions({"Operation": "Agent"}) metrics.put_metric(metric_name, 100, 
"Milliseconds") metrics.set_property("RequestId", "422b1569-16f6-4a03-b8f0-fe3fd9b100f8") + metrics.set_timestamp(datetime.utcnow()) # act await do_work() diff --git a/tests/logger/test_metrics_context.py b/tests/logger/test_metrics_context.py index fde3d82..a0ab133 100644 --- a/tests/logger/test_metrics_context.py +++ b/tests/logger/test_metrics_context.py @@ -1,8 +1,17 @@ +from faker import Faker +from importlib import reload +from datetime import datetime, timedelta +import pytest +import math +import random +from aws_embedded_metrics import constants, utils +from aws_embedded_metrics.unit import Unit +from aws_embedded_metrics.storage_resolution import StorageResolution from aws_embedded_metrics import config from aws_embedded_metrics.logger.metrics_context import MetricsContext -from aws_embedded_metrics.constants import DEFAULT_NAMESPACE -from importlib import reload -from faker import Faker +from aws_embedded_metrics.constants import DEFAULT_NAMESPACE, MAX_TIMESTAMP_FUTURE_AGE, MAX_TIMESTAMP_PAST_AGE +from aws_embedded_metrics.exceptions import DimensionSetExceededError, InvalidDimensionError, InvalidMetricError +from aws_embedded_metrics.exceptions import InvalidTimestampError fake = Faker() @@ -42,14 +51,115 @@ def test_put_dimension_adds_to_dimensions(): # arrange context = MetricsContext() - dimension_key = fake.word() - dimension_value = fake.word() + dimensions_to_add = 30 + dimension_set = generate_dimension_set(dimensions_to_add) # act - context.put_dimensions({dimension_key: dimension_value}) + context.put_dimensions(dimension_set) + + # assert + assert context.dimensions == [dimension_set] + + +def test_put_dimensions_accept_multiple_unique_dimensions(): + # arrange + context = MetricsContext() + dimension1 = {fake.word(): fake.word()} + dimension2 = {fake.word(): fake.word()} + + # act + context.put_dimensions(dimension1) + context.put_dimensions(dimension2) + + # assert + assert len(context.get_dimensions()) == 2 + assert 
context.get_dimensions()[0] == dimension1 + assert context.get_dimensions()[1] == dimension2 + + +def test_put_dimensions_prevent_duplicate_dimensions(): + # arrange + context = MetricsContext() + pair1 = [fake.word(), fake.word()] + pair2 = [fake.word(), fake.word()] + + dimension1 = {pair1[0]: pair1[1]} + dimension2 = {pair2[0]: pair2[1]} + dimension3 = {pair1[0]: pair1[1], pair2[0]: pair2[1]} + + # act + context.put_dimensions(dimension1) + context.put_dimensions(dimension2) + context.put_dimensions(dimension1) + context.put_dimensions(dimension3) + context.put_dimensions(dimension2) + context.put_dimensions(dimension3) + + # assert + assert len(context.get_dimensions()) == 3 + assert context.get_dimensions()[0] == dimension1 + assert context.get_dimensions()[1] == dimension2 + assert context.get_dimensions()[2] == dimension3 + + +def test_put_dimensions_use_most_recent_dimension_value(): + # arrange + context = MetricsContext() + key1 = fake.word() + key2 = fake.word() + val1 = fake.word() + val2 = fake.word() + + dimension1 = {key1: val1} + dimension2 = {key2: val2} + dimension3 = {key1: val2} + dimension4 = {key2: val1} + dimension5 = {key1: val1, key2: val2} + dimension6 = {key1: val2, key2: val1} + + # act + context.put_dimensions(dimension1) + context.put_dimensions(dimension2) + context.put_dimensions(dimension5) + context.put_dimensions(dimension3) + context.put_dimensions(dimension4) + context.put_dimensions(dimension6) + + # assert + assert len(context.get_dimensions()) == 3 + assert context.get_dimensions()[0] == dimension3 + assert context.get_dimensions()[1] == dimension4 + assert context.get_dimensions()[2] == dimension6 + + +def test_put_dimensions_with_set_dimensions(): + # arrange + context = MetricsContext() + key1 = fake.word() + key2 = fake.word() + val1 = fake.word() + val2 = fake.word() + + dimension1 = {key1: val1} + dimension2 = {key2: val2} + dimension3 = {key1: val2} + dimension4 = {key2: val1} + dimension5 = {key1: val1, key2: val2} + 
dimension6 = {key1: val2, key2: val1} + + # act + context.put_dimensions(dimension1) + context.put_dimensions(dimension2) + context.set_dimensions([dimension3]) + context.put_dimensions(dimension4) + context.put_dimensions(dimension5) + context.put_dimensions(dimension6) # assert - assert context.dimensions == [{dimension_key: dimension_value}] + assert len(context.get_dimensions()) == 3 + assert context.get_dimensions()[0] == dimension3 + assert context.get_dimensions()[1] == dimension4 + assert context.get_dimensions()[2] == dimension6 def test_get_dimensions_returns_only_custom_dimensions_if_no_default_dimensions_not_set(): @@ -125,23 +235,50 @@ def test_get_dimensions_returns_merged_custom_and_default_dimensions(): assert [expected_dimensions] == actual_dimensions +@pytest.mark.parametrize( + "name, value", + [ + (None, "value"), + ("", "value"), + (" ", "value"), + ("a" * (constants.MAX_DIMENSION_NAME_LENGTH + 1), "value"), + ("ḓɨɱɛɳʂɨøɳ", "value"), + (":dim", "value"), + ("dim", ""), + ("dim", " "), + ("dim", "a" * (constants.MAX_DIMENSION_VALUE_LENGTH + 1)), + ("dim", "ṽɑɭʊɛ"), + ] +) +def test_add_invalid_dimensions_raises_exception(name, value): + context = MetricsContext() + + with pytest.raises(InvalidDimensionError): + context.put_dimensions({name: value}) + + with pytest.raises(InvalidDimensionError): + context.set_dimensions([{name: value}]) + + def test_put_metric_adds_metrics(): # arrange context = MetricsContext() metric_key = fake.word() metric_value = fake.random.random() - metric_unit = fake.word() + metric_unit = random.choice(list(Unit)).value + metric_storage_resolution = random.choice(list(StorageResolution)).value # act - context.put_metric(metric_key, metric_value, metric_unit) + context.put_metric(metric_key, metric_value, metric_unit, metric_storage_resolution) # assert metric = context.metrics[metric_key] assert metric.unit == metric_unit assert metric.values == [metric_value] + assert metric.storage_resolution == 
metric_storage_resolution -def test_put_metric_uses_None_unit_if_not_provided(): +def test_put_metric_uses_none_unit_if_not_provided(): # arrange context = MetricsContext() metric_key = fake.word() @@ -155,6 +292,45 @@ def test_put_metric_uses_None_unit_if_not_provided(): assert metric.unit == "None" +def test_put_metric_uses_standard_storage_resolution_if_not_provided(): + # arrange + context = MetricsContext() + metric_key = fake.word() + metric_value = fake.random.random() + + # act + context.put_metric(metric_key, metric_value) + + # assert + metric = context.metrics[metric_key] + assert metric.storage_resolution == StorageResolution.STANDARD + + +@pytest.mark.parametrize( + "name, value, unit, storage_resolution", + [ + ("", 1, "None", StorageResolution.STANDARD), + (" ", 1, "Seconds", StorageResolution.STANDARD), + ("a" * (constants.MAX_METRIC_NAME_LENGTH + 1), 1, "None", StorageResolution.STANDARD), + ("metric", float("inf"), "Count", StorageResolution.STANDARD), + ("metric", float("-inf"), "Count", StorageResolution.STANDARD), + ("metric", float("nan"), "Count", StorageResolution.STANDARD), + ("metric", math.inf, "Seconds", StorageResolution.STANDARD), + ("metric", -math.inf, "Seconds", StorageResolution.STANDARD), + ("metric", math.nan, "Seconds", StorageResolution.STANDARD), + ("metric", 1, "Kilometers/Fahrenheit", StorageResolution.STANDARD), + ("metric", 1, "Seconds", 2), + ("metric", 1, "Seconds", 0), + ("metric", 1, "Seconds", None) + ] +) +def test_put_invalid_metric_raises_exception(name, value, unit, storage_resolution): + context = MetricsContext() + + with pytest.raises(InvalidMetricError): + context.put_metric(name, value, unit, storage_resolution) + + def test_create_copy_with_context_creates_new_instance(): # arrange context = MetricsContext() @@ -237,10 +413,10 @@ def test_create_copy_with_context_does_not_copy_metrics(): def test_set_dimensions_overwrites_all_dimensions(): # arrange context = MetricsContext() - 
context.set_default_dimensions({fake.word(): fake.word}) - context.put_dimensions({fake.word(): fake.word}) + context.set_default_dimensions({fake.word(): fake.word()}) + context.put_dimensions({fake.word(): fake.word()}) - expected_dimensions = {fake.word(): fake.word} + expected_dimensions = [{fake.word(): fake.word()}] # act context.set_dimensions(expected_dimensions) @@ -264,3 +440,70 @@ def test_create_copy_with_context_does_not_repeat_dimensions(): # assert assert len(new_context.get_dimensions()) == 1 + + +def test_cannot_set_more_than_30_dimensions(): + context = MetricsContext() + dimensions_to_add = 32 + dimension_set = generate_dimension_set(dimensions_to_add) + + with pytest.raises(DimensionSetExceededError): + context.set_dimensions([dimension_set]) + + +def test_cannot_put_more_than_30_dimensions(): + context = MetricsContext() + dimensions_to_add = 32 + dimension_set = generate_dimension_set(dimensions_to_add) + + with pytest.raises(DimensionSetExceededError): + context.put_dimensions(dimension_set) + + +@pytest.mark.parametrize( + "timestamp", + [ + datetime.now(), + datetime.now() - timedelta(milliseconds=MAX_TIMESTAMP_PAST_AGE - 5000), + datetime.now() + timedelta(milliseconds=MAX_TIMESTAMP_FUTURE_AGE - 5000) + ] +) +def test_set_valid_timestamp_verify_timestamp(timestamp: datetime): + context = MetricsContext() + + context.set_timestamp(timestamp) + + assert context.meta[constants.TIMESTAMP] == utils.convert_to_milliseconds(timestamp) + + +@pytest.mark.parametrize( + "timestamp", + [ + None, + datetime.min, + datetime(1970, 1, 1, 0, 0, 0), + datetime.max, + datetime(9999, 12, 31, 23, 59, 59, 999999), + datetime(1, 1, 1, 0, 0, 0, 0, None), + datetime(1, 1, 1), + datetime(1, 1, 1, 0, 0), + datetime.now() - timedelta(milliseconds=MAX_TIMESTAMP_PAST_AGE + 1), + datetime.now() + timedelta(milliseconds=MAX_TIMESTAMP_FUTURE_AGE + 5000) + ] +) +def test_set_invalid_timestamp_raises_exception(timestamp: datetime): + context = MetricsContext() + + with 
pytest.raises(InvalidTimestampError): + context.set_timestamp(timestamp) + + +# Test utility method + + +def generate_dimension_set(dimensions_to_add): + dimension_set = {} + for i in range(0, dimensions_to_add): + dimension_set[f"{i}"] = fake.word() + + return dimension_set diff --git a/tests/logger/test_metrics_logger.py b/tests/logger/test_metrics_logger.py index d3fe900..08bd971 100644 --- a/tests/logger/test_metrics_logger.py +++ b/tests/logger/test_metrics_logger.py @@ -1,7 +1,11 @@ -from aws_embedded_metrics import config +from datetime import datetime +from aws_embedded_metrics import config, utils from aws_embedded_metrics.logger import metrics_logger from aws_embedded_metrics.sinks import Sink from aws_embedded_metrics.environment import Environment +from aws_embedded_metrics.exceptions import InvalidNamespaceError, InvalidMetricError +from aws_embedded_metrics.storage_resolution import StorageResolution +import aws_embedded_metrics.constants as constants import pytest from faker import Faker from asyncio import Future @@ -51,6 +55,49 @@ async def test_can_put_metric(mocker): assert context.metrics[expected_key].unit == "None" +@pytest.mark.asyncio +async def test_can_put_metric_with_different_storage_resolution_different_flush(mocker): + # arrange + expected_key = fake.word() + expected_value = fake.random.randrange(100) + + logger, sink, env = get_logger_and_sink(mocker) + + # act + logger.put_metric(expected_key, expected_value, None, StorageResolution.HIGH) + await logger.flush() + + # assert + context = sink.accept.call_args[0][0] + assert context.metrics[expected_key].values == [expected_value] + assert context.metrics[expected_key].unit == "None" + assert context.metrics[expected_key].storage_resolution == StorageResolution.HIGH + + expected_key = fake.word() + expected_value = fake.random.randrange(100) + logger.put_metric(expected_key, expected_value, None) + await logger.flush() + context = sink.accept.call_args[0][0] + assert 
context.metrics[expected_key].values == [expected_value] + assert context.metrics[expected_key].unit == "None" + assert context.metrics[expected_key].storage_resolution == StorageResolution.STANDARD + + +@pytest.mark.asyncio +async def test_cannot_put_metric_with_different_storage_resolution_same_flush(mocker): + # arrange + expected_key = fake.word() + expected_value = fake.random.randrange(100) + + logger, sink, env = get_logger_and_sink(mocker) + + # act + logger.put_metric(expected_key, expected_value, None, StorageResolution.HIGH) + with pytest.raises(InvalidMetricError): + logger.put_metric(expected_key, expected_value, None, StorageResolution.STANDARD) + await logger.flush() + + @pytest.mark.asyncio async def test_can_add_stack_trace(mocker): # arrange @@ -191,6 +238,50 @@ async def test_put_dimension(mocker): assert dimensions[0][expected_key] == expected_value +@pytest.mark.asyncio +async def test_reset_dimension_with_default_dimension(mocker): + # arrange + pair1 = ["key1", "val1"] + pair2 = ["key2", "val2"] + + logger, sink, env = get_logger_and_sink(mocker) + + # act + logger.put_dimensions({pair1[0]: pair1[1]}) + logger.reset_dimensions(True) + logger.put_dimensions({pair2[0]: pair2[1]}) + await logger.flush() + + # assert + context = get_flushed_context(sink) + dimensions = context.get_dimensions() + assert len(dimensions) == 1 + assert len(dimensions[0]) == 4 + assert dimensions[0][pair2[0]] == pair2[1] + + +@pytest.mark.asyncio +async def test_reset_dimension_without_default_dimension(mocker): + # arrange + pair1 = ["key1", "val1"] + pair2 = ["key2", "val2"] + + logger, sink, env = get_logger_and_sink(mocker) + + # act + logger.put_dimensions({pair1[0]: pair1[1]}) + logger.reset_dimensions(False) + logger.put_dimensions({pair2[0]: pair2[1]}) + await logger.flush() + + # assert + context = get_flushed_context(sink) + dimensions = context.get_dimensions() + assert len(dimensions) == 1 + assert len(dimensions[0]) == 1 + assert dimensions[0][pair2[0]] 
== pair2[1] + + @pytest.mark.asyncio async def test_logger_configures_default_dimensions_on_flush(before, mocker): # arrange @@ -267,6 +358,32 @@ async def test_set_dimensions_overrides_all_dimensions(mocker): assert dimensions[expected_key] == expected_value +@pytest.mark.asyncio +async def test_configure_set_dimensions_to_preserve_default_dimensions(mocker): + # arrange + logger, sink, env = get_logger_and_sink(mocker) + + # setup the typical default dimensions + env.get_log_group_name.return_value = fake.word() + env.get_name.return_value = fake.word() + env.get_type.return_value = fake.word() + + expected_key = fake.word() + expected_value = fake.word() + + # act + logger.set_dimensions({expected_key: expected_value}, use_default=True) + await logger.flush() + + # assert + context = get_flushed_context(sink) + dimension_sets = context.get_dimensions() + assert len(dimension_sets) == 1 + dimensions = dimension_sets[0] + assert len(dimensions) == 4 + assert dimensions[expected_key] == expected_value + + @pytest.mark.asyncio async def test_can_set_namespace(mocker): # arrange @@ -283,6 +400,14 @@ async def test_can_set_namespace(mocker): assert context.namespace == expected_value +@pytest.mark.parametrize("namespace", [None, "", " ", "a" * (constants.MAX_NAMESPACE_LENGTH + 1), "ŋàɱȅƨƥȁƈȅ", "namespace "]) +def test_set_invalid_namespace_throws_exception(namespace, mocker): + logger, sink, env = get_logger_and_sink(mocker) + + with pytest.raises(InvalidNamespaceError): + logger.set_namespace(namespace) + + @pytest.mark.asyncio async def test_context_is_preserved_across_flushes(mocker): # arrange @@ -316,6 +441,74 @@ async def test_context_is_preserved_across_flushes(mocker): assert context.metrics[metric_key].values == [1] +@pytest.mark.asyncio +async def test_flush_dont_preserve_dimensions_by_default(mocker): + # arrange + dimension_key = "Dim" + dimension_value = "Value" + + logger, sink, env = get_logger_and_sink(mocker) + + logger.set_dimensions({dimension_key: 
dimension_value}) + + # act + await logger.flush() + + context = sink.accept.call_args[0][0] + dimensions = context.get_dimensions() + assert len(dimensions) == 1 + assert dimensions[0][dimension_key] == dimension_value + + await logger.flush() + + context = sink.accept.call_args[0][0] + dimensions = context.get_dimensions() + assert len(dimensions) == 1 + assert dimension_key not in dimensions[0] + + +@pytest.mark.asyncio +async def test_configure_flush_to_preserve_dimensions(mocker): + # arrange + dimension_key = "Dim" + dimension_value = "Value" + + logger, sink, env = get_logger_and_sink(mocker) + + logger.set_dimensions({dimension_key: dimension_value}) + logger.flush_preserve_dimensions = True + + # act + await logger.flush() + + context = sink.accept.call_args[0][0] + dimensions = context.get_dimensions() + assert len(dimensions) == 1 + assert dimensions[0][dimension_key] == dimension_value + + await logger.flush() + + context = sink.accept.call_args[0][0] + dimensions = context.get_dimensions() + assert len(dimensions) == 1 + assert dimensions[0][dimension_key] == dimension_value + + +@pytest.mark.asyncio +async def test_can_set_timestamp(mocker): + # arrange + expected_value = datetime.now() + + logger, sink, env = get_logger_and_sink(mocker) + + # act + logger.set_timestamp(expected_value) + await logger.flush() + + # assert + context = get_flushed_context(sink) + assert context.meta[constants.TIMESTAMP] == utils.convert_to_milliseconds(expected_value) + # Test helper methods diff --git a/tests/metric_scope/test_metric_scope.py b/tests/metric_scope/test_metric_scope.py index 20bf131..9ebd1f1 100644 --- a/tests/metric_scope/test_metric_scope.py +++ b/tests/metric_scope/test_metric_scope.py @@ -168,6 +168,43 @@ def my_handler(metrics): actual_timestamp_second = int(round(logger.context.meta["Timestamp"] / 1000)) assert expected_timestamp_second == actual_timestamp_second + +def test_sync_scope_iterates_generator(mock_logger): + expected_results = [1, 2] + + 
@metric_scope + def my_handler(): + yield from expected_results + raise Exception("test exception") + + actual_results = [] + with pytest.raises(Exception, match="test exception"): + for result in my_handler(): + actual_results.append(result) + + assert actual_results == expected_results + assert InvocationTracker.invocations == 3 + + +@pytest.mark.asyncio +async def test_async_scope_iterates_async_generator(mock_logger): + expected_results = [1, 2] + + @metric_scope + async def my_handler(): + for item in expected_results: + yield item + await asyncio.sleep(1) + raise Exception("test exception") + + actual_results = [] + with pytest.raises(Exception, match="test exception"): + async for result in my_handler(): + actual_results.append(result) + + assert actual_results == expected_results + assert InvocationTracker.invocations == 3 + # Test helpers diff --git a/tests/serializer/test_log_serializer.py b/tests/serializer/test_log_serializer.py index 32795f7..ff5e77a 100644 --- a/tests/serializer/test_log_serializer.py +++ b/tests/serializer/test_log_serializer.py @@ -1,8 +1,12 @@ from aws_embedded_metrics.config import get_config +from aws_embedded_metrics.exceptions import DimensionSetExceededError from aws_embedded_metrics.logger.metrics_context import MetricsContext from aws_embedded_metrics.serializers.log_serializer import LogSerializer +from aws_embedded_metrics.storage_resolution import StorageResolution +from collections import Counter from faker import Faker import json +import pytest fake = Faker() @@ -29,29 +33,58 @@ def test_serialize_dimensions(): assert_json_equality(result_json, expected) -def test_cannot_serialize_more_than_9_dimensions(): +def test_serialize_properties(): # arrange + expected_key = fake.word() + expected_value = fake.word() + + expected = {**get_empty_payload()} + expected[expected_key] = expected_value + + context = get_context() + context.set_property(expected_key, expected_value) + + # act + result_json = 
serializer.serialize(context)[0] + + # assert + assert_json_equality(result_json, expected) + + +def test_default_and_custom_dimensions_combined_limit_exceeded(): + # While serializing default dimensions are added to the custom dimension set, + # and the combined size of the dimension set should not be more than 30 dimensions = {} - dimension_pointers = [] - allowed_dimensions = 9 - dimensions_to_add = 15 + default_dimension_key = fake.word() + default_dimension_value = fake.word() + custom_dimensions_to_add = 30 - for i in range(0, dimensions_to_add): - print(i) - expected_key = f"{i}" - expected_value = fake.word() - dimensions[expected_key] = expected_value - dimension_pointers.append(expected_key) + for i in range(0, custom_dimensions_to_add): + dimensions[f"{i}"] = fake.word() - expected_dimensions_pointers = dimension_pointers[0:allowed_dimensions] + context = get_context() + context.set_default_dimensions({default_dimension_key: default_dimension_value}) + context.put_dimensions(dimensions) - expected = {**get_empty_payload(), **dimensions} - expected["_aws"]["CloudWatchMetrics"][0]["Dimensions"].append( - expected_dimensions_pointers + with pytest.raises(DimensionSetExceededError): + serializer.serialize(context) + + +def test_serialize_metrics(): + # arrange + expected_key = fake.word() + expected_value = fake.random.randrange(0, 100) + + expected_metric_definition = {"Name": expected_key, "Unit": "None"} + + expected = {**get_empty_payload()} + expected[expected_key] = expected_value + expected["_aws"]["CloudWatchMetrics"][0]["Metrics"].append( + expected_metric_definition ) context = get_context() - context.put_dimensions(dimensions) + context.put_metric(expected_key, expected_value) # act result_json = serializer.serialize(context)[0] @@ -60,16 +93,21 @@ def test_cannot_serialize_more_than_9_dimensions(): assert_json_equality(result_json, expected) -def test_serialize_properties(): +def test_serialize_metrics_with_standard_storage_resolution(): # 
arrange expected_key = fake.word() - expected_value = fake.word() + expected_value = fake.random.randrange(0, 100) + + expected_metric_definition = {"Name": expected_key, "Unit": "None"} expected = {**get_empty_payload()} expected[expected_key] = expected_value + expected["_aws"]["CloudWatchMetrics"][0]["Metrics"].append( + expected_metric_definition + ) context = get_context() - context.set_property(expected_key, expected_value) + context.put_metric(expected_key, expected_value, "None", StorageResolution.STANDARD) # act result_json = serializer.serialize(context)[0] @@ -78,12 +116,12 @@ def test_serialize_properties(): assert_json_equality(result_json, expected) -def test_serialize_metrics(): +def test_serialize_metrics_with_high_storage_resolution(): # arrange expected_key = fake.word() expected_value = fake.random.randrange(0, 100) - expected_metric_definition = {"Name": expected_key, "Unit": "None"} + expected_metric_definition = {"Name": expected_key, "Unit": "None", "StorageResolution": 1} expected = {**get_empty_payload()} expected[expected_key] = expected_value @@ -92,7 +130,7 @@ def test_serialize_metrics(): ) context = get_context() - context.put_metric(expected_key, expected_value) + context.put_metric(expected_key, expected_value, "None", StorageResolution.HIGH) # act result_json = serializer.serialize(context)[0] @@ -103,7 +141,7 @@ def test_serialize_metrics(): def test_serialize_more_than_100_metrics(): # arrange - expected_value = fake.word() + expected_value = fake.random.randrange(0, 100) expected_batches = 3 metrics = 295 @@ -130,6 +168,113 @@ def test_serialize_more_than_100_metrics(): metric_index += 1 +def test_serialize_more_than_100_datapoints(): + expected_batches = 3 + datapoints = 295 + metrics = 3 + + context = get_context() + for index in range(metrics): + expected_key = f"Metric-{index}" + for i in range(datapoints): + context.put_metric(expected_key, i) + + # add one metric with more datapoints + expected_extra_batches = 2 + 
extra_datapoints = 433 + for i in range(extra_datapoints): + context.put_metric(f"Metric-{metrics}", i) + + # act + results = serializer.serialize(context) + + # assert + assert len(results) == expected_batches + expected_extra_batches + + for batch_index in range(expected_batches): + result_json = results[batch_index] + result_obj = json.loads(result_json) + for index in range(metrics): + metric_name = f"Metric-{index}" + expected_datapoint_count = datapoints % 100 if (batch_index == expected_batches - 1) else 100 + assert len(result_obj[metric_name]) == expected_datapoint_count + + # extra metric with more datapoints + for batch_index in range(expected_batches): + result_json = results[batch_index] + result_obj = json.loads(result_json) + metric_name = f"Metric-{metrics}" + expected_datapoint_count = extra_datapoints % 100 if (batch_index == expected_batches + expected_extra_batches - 1) else 100 + assert len(result_obj[metric_name]) == expected_datapoint_count + + +def test_serialize_with_more_than_100_metrics_and_datapoints(): + expected_batches = 11 + datapoints = 295 + metrics = 295 + + expected_results = {} + metric_results = {} + context = get_context() + for index in range(metrics): + expected_key = f"Metric-{index}" + expected_results[expected_key] = [] + metric_results[expected_key] = [] + + for i in range(datapoints): + context.put_metric(expected_key, i) + expected_results[expected_key].append(i) + + # act + results = serializer.serialize(context) + + # assert + assert len(results) == expected_batches + + datapoints_count = Counter() + for batch in results: + result = json.loads(batch) + datapoints_count.update({ + metric: len(result[metric]) + for metric in result if metric != "_aws" + }) + for metric in result: + if metric != "_aws": + metric_results[metric] += result[metric] + + for count in datapoints_count.values(): + assert count == datapoints + assert len(datapoints_count) == metrics + assert metric_results == expected_results + + +def 
test_serialize_no_duplication_bug(): + """ + A bug existed where metrics with lots of values have to be broken up + but single value metrics got duplicated across each section. + This test verifies the fix to ensure no duplication. + """ + context = get_context() + single_expected_result = 1 + single_found_result = 0 + + # create a metric with a single value + single_key = "Metric-single" + context.put_metric(single_key, single_expected_result) + # add a lot of another metric so the log batches must be broken up + for i in range(1000): + context.put_metric("Metric-many", 0) + + results = serializer.serialize(context) + + # count up all values for the single metric to ensure no duplicates + for batch in results: + for metric_key, value in json.loads(batch).items(): + if metric_key == single_key: + single_found_result += value + assert single_expected_result == single_found_result + + def test_serialize_with_multiple_metrics(): # arrange metrics = 2 @@ -138,7 +283,7 @@ def test_serialize_with_multiple_metrics(): for index in range(metrics): expected_key = f"Metric-{index}" - expected_value = fake.word() + expected_value = fake.random.randrange(0, 100) context.put_metric(expected_key, expected_value) expected_metric_definition = {"Name": expected_key, "Unit": "None"} @@ -158,7 +303,7 @@ def test_serialize_with_multiple_metrics(): def test_serialize_metrics_with_multiple_datapoints(): # arrange expected_key = fake.word() - expected_values = [fake.word(), fake.word()] + expected_values = [fake.random.randrange(0, 100), fake.random.randrange(0, 100)] expected_metric_definition = {"Name": expected_key, "Unit": "None"} expected = {**get_empty_payload()} expected[expected_key] = expected_values @@ -219,8 +364,8 @@ def get_empty_payload(): } -def assert_json_equality(actualJSON, expectedObj): - actualObj = json.loads(actualJSON) - print("Expected: ", expectedObj) - print("Actual: ", actualObj) - assert actualObj == expectedObj +def assert_json_equality(actual_json, 
expected_obj): + actual_obj = json.loads(actual_json) + print("Expected: ", expected_obj) + print("Actual: ", actual_obj) + assert actual_obj == expected_obj diff --git a/tests/sinks/test_lambda_sink.py b/tests/sinks/test_stdout_sink.py similarity index 78% rename from tests/sinks/test_lambda_sink.py rename to tests/sinks/test_stdout_sink.py index 79aeb84..b0be62a 100644 --- a/tests/sinks/test_lambda_sink.py +++ b/tests/sinks/test_stdout_sink.py @@ -1,7 +1,7 @@ from importlib import reload from aws_embedded_metrics import config -from aws_embedded_metrics.sinks.lambda_sink import LambdaSink +from aws_embedded_metrics.sinks.stdout_sink import StdoutSink from aws_embedded_metrics.logger.metrics_context import MetricsContext from faker import Faker from unittest.mock import patch @@ -14,9 +14,10 @@ def test_accept_writes_to_stdout(capfd): # arrange reload(config) - sink = LambdaSink() + sink = StdoutSink() context = MetricsContext.empty() context.meta["Timestamp"] = 1 + context.put_metric("Dummy", 1) # act sink.accept(context) @@ -25,7 +26,8 @@ def test_accept_writes_to_stdout(capfd): out, err = capfd.readouterr() assert ( out - == '{"_aws": {"Timestamp": 1, "CloudWatchMetrics": [{"Dimensions": [], "Metrics": [], "Namespace": "aws-embedded-metrics"}]}}\n' + == '{"_aws": {"Timestamp": 1, "CloudWatchMetrics": [{"Dimensions": [], "Metrics": [{"Name": "Dummy", "Unit": "None"}], ' + '"Namespace": "aws-embedded-metrics"}]}, "Dummy": 1}\n' ) @@ -34,7 +36,7 @@ def test_accept_writes_multiple_messages_to_stdout(mock_serializer, capfd): # arrange expected_messages = [fake.word() for _ in range(10)] mock_serializer.serialize.return_value = expected_messages - sink = LambdaSink(serializer=mock_serializer) + sink = StdoutSink(serializer=mock_serializer) context = MetricsContext.empty() context.meta["Timestamp"] = 1 diff --git a/tox.ini b/tox.ini index 1a33dfa..a8a60fc 100644 --- a/tox.ini +++ b/tox.ini @@ -9,7 +9,7 @@ deps = Faker aiohttp aresponses -commands = pytest 
--ignore=tests/integ +commands = pytest --ignore=tests/integ {posargs} [testenv:integ] deps = @@ -23,6 +23,7 @@ passenv = AWS_REGION AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY + AWS_SESSION_TOKEN [testenv:flake8] basepython = python3.7 @@ -38,4 +39,4 @@ commands = [flake8] max-line-length = 150 -exclude = .git,__pycache__,docs/source/conf.py,old,build,dist,**/venv \ No newline at end of file +exclude = .git,__pycache__,docs/source/conf.py,old,build,dist,**/venv