|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import datetime |
| 4 | +import logging |
| 5 | +import threading |
| 6 | +from abc import ABC, abstractmethod |
| 7 | +from collections import defaultdict |
| 8 | +from typing import Dict, List, Optional, Tuple, Union, overload |
| 9 | + |
| 10 | +from localstack import config |
| 11 | +from localstack.runtime import hooks |
| 12 | +from localstack.utils.analytics import get_session_id |
| 13 | +from localstack.utils.analytics.events import Event, EventMetadata |
| 14 | +from localstack.utils.analytics.publisher import AnalyticsClientPublisher |
| 15 | + |
| 16 | +LOG = logging.getLogger(__name__) |
| 17 | + |
| 18 | + |
| 19 | +class MetricRegistry: |
| 20 | + """ |
| 21 | + A Singleton class responsible for managing all registered metrics. |
| 22 | + Provides methods for retrieving and collecting metrics. |
| 23 | + """ |
| 24 | + |
| 25 | + _instance = None |
| 26 | + _mutex: threading.Lock = threading.Lock() |
| 27 | + |
| 28 | + def __new__(cls): |
| 29 | + # avoid locking if the instance already exist |
| 30 | + if cls._instance is None: |
| 31 | + with cls._mutex: |
| 32 | + # Prevents race conditions when multiple threads enter the first check simultaneously |
| 33 | + if cls._instance is None: |
| 34 | + cls._instance = super().__new__(cls) |
| 35 | + return cls._instance |
| 36 | + |
| 37 | + def __init__(self): |
| 38 | + if not hasattr(self, "_registry"): |
| 39 | + self._registry = dict() |
| 40 | + |
| 41 | + @property |
| 42 | + def registry(self) -> Dict[str, "Metric"]: |
| 43 | + return self._registry |
| 44 | + |
| 45 | + def register(self, metric: Metric) -> None: |
| 46 | + """ |
| 47 | + Registers a new metric. |
| 48 | +
|
| 49 | + :param metric: The metric instance to register. |
| 50 | + :type metric: Metric |
| 51 | + :raises TypeError: If the provided metric is not an instance of `Metric`. |
| 52 | + :raises ValueError: If a metric with the same name already exists. |
| 53 | + """ |
| 54 | + if not isinstance(metric, Metric): |
| 55 | + raise TypeError("Only subclasses of `Metric` can be registered.") |
| 56 | + |
| 57 | + if metric.name in self._registry: |
| 58 | + raise ValueError(f"Metric '{metric.name}' already exists.") |
| 59 | + |
| 60 | + self._registry[metric.name] = metric |
| 61 | + |
| 62 | + def collect(self) -> Dict[str, List[Dict[str, Union[str, int]]]]: |
| 63 | + """ |
| 64 | + Collects all registered metrics. |
| 65 | + """ |
| 66 | + return { |
| 67 | + "metrics": [ |
| 68 | + metric |
| 69 | + for metric_instance in self._registry.values() |
| 70 | + for metric in metric_instance.collect() |
| 71 | + ] |
| 72 | + } |
| 73 | + |
| 74 | + |
| 75 | +class Metric(ABC): |
| 76 | + """ |
| 77 | + Base class for all metrics (e.g., Counter, Gauge). |
| 78 | +
|
| 79 | + Each subclass must implement the `collect()` method. |
| 80 | + """ |
| 81 | + |
| 82 | + _name: str |
| 83 | + |
| 84 | + @property |
| 85 | + def name(self) -> str: |
| 86 | + """ |
| 87 | + Retrieves the fully qualified metric name. |
| 88 | + """ |
| 89 | + return self._name |
| 90 | + |
| 91 | + @name.setter |
| 92 | + def name(self, value: str) -> None: |
| 93 | + """ |
| 94 | + Validates and sets the full metric name. |
| 95 | +
|
| 96 | + :raises ValueError: If the name is empty or invalid. |
| 97 | + """ |
| 98 | + if not value or value.strip() == "": |
| 99 | + raise ValueError("Metric must have a valid name.") |
| 100 | + self._name = value |
| 101 | + |
| 102 | + @abstractmethod |
| 103 | + def collect(self) -> List[Dict[str, Union[str, int]]]: |
| 104 | + """ |
| 105 | + Collects and returns metric data. Subclasses must implement this to return collected metric data. |
| 106 | + """ |
| 107 | + pass |
| 108 | + |
| 109 | + |
| 110 | +class _SimpleCounter(Metric): |
| 111 | + """ |
| 112 | + A thread-safe counter for tracking occurrences of an event without labels. |
| 113 | + """ |
| 114 | + |
| 115 | + _mutex: threading.Lock |
| 116 | + _namespace: Optional[str] |
| 117 | + _name: str |
| 118 | + _type: str |
| 119 | + _count: int |
| 120 | + |
| 121 | + @property |
| 122 | + def mutex(self) -> threading.Lock: |
| 123 | + """ |
| 124 | + Provides thread-safe access to the internal lock. |
| 125 | + """ |
| 126 | + return self._mutex |
| 127 | + |
| 128 | + def __init__(self, name: str, namespace: Optional[str] = ""): |
| 129 | + if not name: |
| 130 | + raise ValueError("Name is required and cannot be empty.") |
| 131 | + |
| 132 | + self._mutex = threading.Lock() |
| 133 | + self._name = name.strip() |
| 134 | + self._namespace = namespace.strip() if namespace else "" |
| 135 | + self._type = "counter" |
| 136 | + self._count = 0 |
| 137 | + MetricRegistry().register(self) |
| 138 | + |
| 139 | + def increment(self, value: int = 1) -> None: |
| 140 | + """Increments the counter unless events are disabled.""" |
| 141 | + if config.DISABLE_EVENTS: |
| 142 | + return |
| 143 | + |
| 144 | + if value <= 0: |
| 145 | + raise ValueError("Increment value must be positive.") |
| 146 | + |
| 147 | + with self._mutex: |
| 148 | + self._count += value |
| 149 | + |
| 150 | + def reset(self) -> None: |
| 151 | + """Resets the counter to zero unless events are disabled.""" |
| 152 | + if config.DISABLE_EVENTS: |
| 153 | + return |
| 154 | + |
| 155 | + with self._mutex: |
| 156 | + self._count = 0 |
| 157 | + |
| 158 | + def collect(self) -> List[Dict[str, Union[str, int]]]: |
| 159 | + """Collects the metric unless events are disabled.""" |
| 160 | + if config.DISABLE_EVENTS: |
| 161 | + return list() |
| 162 | + |
| 163 | + with self._mutex: |
| 164 | + if self._count == 0: |
| 165 | + # Return an empty list if the count is 0, as there are no metrics to send to the analytics backend. |
| 166 | + return list() |
| 167 | + return [ |
| 168 | + { |
| 169 | + "namespace": self._namespace, |
| 170 | + "name": self._name, |
| 171 | + "value": self._count, |
| 172 | + "type": self._type, |
| 173 | + } |
| 174 | + ] |
| 175 | + |
| 176 | + |
| 177 | +class _LabeledCounter(Metric): |
| 178 | + """ |
| 179 | + A labeled counter that tracks occurrences of an event across different label combinations. |
| 180 | + """ |
| 181 | + |
| 182 | + _mutex: threading.Lock |
| 183 | + _namespace: Optional[str] |
| 184 | + _name: str |
| 185 | + _type: str |
| 186 | + _unit: str |
| 187 | + _labels: List[str] |
| 188 | + _label_values: tuple[Optional[str], ...] |
| 189 | + _count_by_labels: defaultdict[Tuple[str, ...], int] |
| 190 | + |
| 191 | + def __init__(self, name: str, labels: List[str] = list, namespace: Optional[str] = ""): |
| 192 | + if not name: |
| 193 | + raise ValueError("Name is required and cannot be empty.") |
| 194 | + |
| 195 | + if any(not label for label in labels): |
| 196 | + raise ValueError("Labels must be non-empty strings.") |
| 197 | + |
| 198 | + if len(labels) > 8: |
| 199 | + raise ValueError("A maximum of 8 labels are allowed.") |
| 200 | + |
| 201 | + self._mutex = threading.Lock() |
| 202 | + self._name = name.strip() |
| 203 | + self._namespace = namespace.strip() if namespace else "" |
| 204 | + self._type = "counter" |
| 205 | + self._labels = labels |
| 206 | + self._label_values = tuple() |
| 207 | + self._count_by_labels = defaultdict(int) |
| 208 | + MetricRegistry().register(self) |
| 209 | + |
| 210 | + @property |
| 211 | + def mutex(self) -> threading.Lock: |
| 212 | + """ |
| 213 | + Provides thread-safe access to the internal lock. |
| 214 | + """ |
| 215 | + return self._mutex |
| 216 | + |
| 217 | + @property |
| 218 | + def count_by_labels(self) -> defaultdict[Tuple[str, ...], int]: |
| 219 | + return self._count_by_labels |
| 220 | + |
| 221 | + def labels(self, **kwargs: str) -> _LabeledCounterProxy: |
| 222 | + """ |
| 223 | + Create a scoped counter instance with specific label values. |
| 224 | +
|
| 225 | + This method assigns values to the predefined labels of a labeled counter and returns |
| 226 | + a proxy object (`_LabeledCounterProxy`) that allows tracking metrics for that specific |
| 227 | + combination of label values. |
| 228 | +
|
| 229 | + The proxy ensures that increments and resets are scoped to the given label values, |
| 230 | + enforcing proper metric categorization. |
| 231 | +
|
| 232 | + :raises ValueError: |
| 233 | + - If the number of provided labels does not match the expected count. |
| 234 | + - If any of the provided labels are empty strings. |
| 235 | + """ |
| 236 | + self._label_values = tuple(label_value for label_value in kwargs.values()) |
| 237 | + |
| 238 | + if len(kwargs) != len(self._label_values): |
| 239 | + raise ValueError(f"Expected labels {self._label_values}, got {list(kwargs.values())}") |
| 240 | + |
| 241 | + if any(not label for label in self._label_values): |
| 242 | + raise ValueError("Label values must be non-empty strings.") |
| 243 | + |
| 244 | + return _LabeledCounterProxy(counter=self, label_values=self._label_values) |
| 245 | + |
| 246 | + def _as_list(self) -> List[Dict[str, Union[str, int]]]: |
| 247 | + num_labels = len(self._labels) |
| 248 | + |
| 249 | + static_key_label_value = [f"label_{i + 1}_value" for i in range(num_labels)] |
| 250 | + static_key_label = [f"label_{i + 1}" for i in range(num_labels)] |
| 251 | + |
| 252 | + collected_metrics = [] |
| 253 | + |
| 254 | + for label_values, count in self._count_by_labels.items(): |
| 255 | + if count == 0: |
| 256 | + continue # Skip items with a count of 0, as they should not be sent to the analytics backend. |
| 257 | + |
| 258 | + if len(label_values) != num_labels: |
| 259 | + raise ValueError( |
| 260 | + f"Label count mismatch: expected {num_labels} labels {self._labels}, " |
| 261 | + f"but got {len(label_values)} values {label_values}." |
| 262 | + ) |
| 263 | + |
| 264 | + collected_metrics.append( |
| 265 | + { |
| 266 | + "namespace": self._namespace, |
| 267 | + "name": self._name, |
| 268 | + "value": count, |
| 269 | + "type": self._type, |
| 270 | + **dict(zip(static_key_label_value, label_values)), |
| 271 | + **dict(zip(static_key_label, self._labels)), |
| 272 | + } |
| 273 | + ) |
| 274 | + |
| 275 | + return collected_metrics |
| 276 | + |
| 277 | + def collect(self) -> List[Dict[str, Union[str, int]]]: |
| 278 | + if config.DISABLE_EVENTS: |
| 279 | + return list() |
| 280 | + |
| 281 | + with self._mutex: |
| 282 | + return self._as_list() |
| 283 | + |
| 284 | + |
| 285 | +class _LabeledCounterProxy: |
| 286 | + """A proxy for a labeled counter, enforcing scoped label values.""" |
| 287 | + |
| 288 | + def __init__(self, counter: _LabeledCounter, label_values: Tuple[str, ...]): |
| 289 | + self._counter = counter |
| 290 | + self._label_values = label_values |
| 291 | + |
| 292 | + def increment(self, value: int = 1) -> None: |
| 293 | + """Increments the counter for the assigned labels unless events are disabled.""" |
| 294 | + if config.DISABLE_EVENTS: |
| 295 | + return |
| 296 | + |
| 297 | + if value <= 0: |
| 298 | + raise ValueError("Increment value must be positive.") |
| 299 | + |
| 300 | + with self._counter.mutex: |
| 301 | + self._counter.count_by_labels[self._label_values] += value |
| 302 | + |
| 303 | + def reset(self) -> None: |
| 304 | + """Resets the counter to zero for the assigned labels unless events are disabled.""" |
| 305 | + if config.DISABLE_EVENTS: |
| 306 | + return |
| 307 | + |
| 308 | + with self._counter.mutex: |
| 309 | + self._counter.count_by_labels[self._label_values] = 0 |
| 310 | + |
| 311 | + |
| 312 | +class Counter: |
| 313 | + """ |
| 314 | + A factory class for creating counter instances. |
| 315 | +
|
| 316 | + This class provides a flexible way to create either a simple counter |
| 317 | + (`_SimpleCounter`) or a labeled counter (`_LabeledCounter`) based on |
| 318 | + whether labels are provided. |
| 319 | + """ |
| 320 | + |
| 321 | + @overload |
| 322 | + def __new__(cls, name: str, namespace: Optional[str] = "") -> _SimpleCounter: |
| 323 | + return _SimpleCounter(namespace=namespace, name=name) |
| 324 | + |
| 325 | + @overload |
| 326 | + def __new__( |
| 327 | + cls, name: str, labels: List[str], namespace: Optional[str] = "" |
| 328 | + ) -> _LabeledCounter: |
| 329 | + return _LabeledCounter(namespace=namespace, name=name, labels=labels) |
| 330 | + |
| 331 | + def __new__( |
| 332 | + cls, name: str, namespace: Optional[str] = "", labels: Optional[List[str]] = None |
| 333 | + ) -> Union[_SimpleCounter, _LabeledCounter]: |
| 334 | + if labels: |
| 335 | + return _LabeledCounter(namespace=namespace, name=name, labels=labels) |
| 336 | + return _SimpleCounter(namespace=namespace, name=name) |
| 337 | + |
| 338 | + |
| 339 | +@hooks.on_infra_shutdown() |
| 340 | +def publish_metrics() -> None: |
| 341 | + """ |
| 342 | + Collects all the registered metrics and immediately sends them to the analytics service. |
| 343 | + Skips execution if event tracking is disabled (`config.DISABLE_EVENTS`). |
| 344 | +
|
| 345 | + This function is automatically triggered on infrastructure shutdown. |
| 346 | + """ |
| 347 | + if config.DISABLE_EVENTS: |
| 348 | + return |
| 349 | + |
| 350 | + collected_metrics = MetricRegistry().collect() |
| 351 | + if not collected_metrics["metrics"]: # Skip publishing if no metrics remain after filtering |
| 352 | + return |
| 353 | + |
| 354 | + metadata = EventMetadata( |
| 355 | + session_id=get_session_id(), |
| 356 | + client_time=str(datetime.datetime.now()), |
| 357 | + ) |
| 358 | + |
| 359 | + if collected_metrics: |
| 360 | + publisher = AnalyticsClientPublisher() |
| 361 | + publisher.publish([Event(name="ls_metrics", metadata=metadata, payload=collected_metrics)]) |
0 commit comments