Skip to content

Commit b7f8425

Browse files
vittoriopolverinoalexrashed
authored andcommitted
feat(metrics): add structured event tracking
1 parent 8b5cedd commit b7f8425

File tree

4 files changed

+525
-1
lines changed

4 files changed

+525
-1
lines changed
Lines changed: 361 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,361 @@
1+
from __future__ import annotations
2+
3+
import datetime
4+
import logging
5+
import threading
6+
from abc import ABC, abstractmethod
7+
from collections import defaultdict
8+
from typing import Dict, List, Optional, Tuple, Union, overload
9+
10+
from localstack import config
11+
from localstack.runtime import hooks
12+
from localstack.utils.analytics import get_session_id
13+
from localstack.utils.analytics.events import Event, EventMetadata
14+
from localstack.utils.analytics.publisher import AnalyticsClientPublisher
15+
16+
LOG = logging.getLogger(__name__)
17+
18+
19+
class MetricRegistry:
20+
"""
21+
A Singleton class responsible for managing all registered metrics.
22+
Provides methods for retrieving and collecting metrics.
23+
"""
24+
25+
_instance = None
26+
_mutex: threading.Lock = threading.Lock()
27+
28+
def __new__(cls):
29+
# avoid locking if the instance already exist
30+
if cls._instance is None:
31+
with cls._mutex:
32+
# Prevents race conditions when multiple threads enter the first check simultaneously
33+
if cls._instance is None:
34+
cls._instance = super().__new__(cls)
35+
return cls._instance
36+
37+
def __init__(self):
38+
if not hasattr(self, "_registry"):
39+
self._registry = dict()
40+
41+
@property
42+
def registry(self) -> Dict[str, "Metric"]:
43+
return self._registry
44+
45+
def register(self, metric: Metric) -> None:
46+
"""
47+
Registers a new metric.
48+
49+
:param metric: The metric instance to register.
50+
:type metric: Metric
51+
:raises TypeError: If the provided metric is not an instance of `Metric`.
52+
:raises ValueError: If a metric with the same name already exists.
53+
"""
54+
if not isinstance(metric, Metric):
55+
raise TypeError("Only subclasses of `Metric` can be registered.")
56+
57+
if metric.name in self._registry:
58+
raise ValueError(f"Metric '{metric.name}' already exists.")
59+
60+
self._registry[metric.name] = metric
61+
62+
def collect(self) -> Dict[str, List[Dict[str, Union[str, int]]]]:
63+
"""
64+
Collects all registered metrics.
65+
"""
66+
return {
67+
"metrics": [
68+
metric
69+
for metric_instance in self._registry.values()
70+
for metric in metric_instance.collect()
71+
]
72+
}
73+
74+
75+
class Metric(ABC):
76+
"""
77+
Base class for all metrics (e.g., Counter, Gauge).
78+
79+
Each subclass must implement the `collect()` method.
80+
"""
81+
82+
_name: str
83+
84+
@property
85+
def name(self) -> str:
86+
"""
87+
Retrieves the fully qualified metric name.
88+
"""
89+
return self._name
90+
91+
@name.setter
92+
def name(self, value: str) -> None:
93+
"""
94+
Validates and sets the full metric name.
95+
96+
:raises ValueError: If the name is empty or invalid.
97+
"""
98+
if not value or value.strip() == "":
99+
raise ValueError("Metric must have a valid name.")
100+
self._name = value
101+
102+
@abstractmethod
103+
def collect(self) -> List[Dict[str, Union[str, int]]]:
104+
"""
105+
Collects and returns metric data. Subclasses must implement this to return collected metric data.
106+
"""
107+
pass
108+
109+
110+
class _SimpleCounter(Metric):
111+
"""
112+
A thread-safe counter for tracking occurrences of an event without labels.
113+
"""
114+
115+
_mutex: threading.Lock
116+
_namespace: Optional[str]
117+
_name: str
118+
_type: str
119+
_count: int
120+
121+
@property
122+
def mutex(self) -> threading.Lock:
123+
"""
124+
Provides thread-safe access to the internal lock.
125+
"""
126+
return self._mutex
127+
128+
def __init__(self, name: str, namespace: Optional[str] = ""):
129+
if not name:
130+
raise ValueError("Name is required and cannot be empty.")
131+
132+
self._mutex = threading.Lock()
133+
self._name = name.strip()
134+
self._namespace = namespace.strip() if namespace else ""
135+
self._type = "counter"
136+
self._count = 0
137+
MetricRegistry().register(self)
138+
139+
def increment(self, value: int = 1) -> None:
140+
"""Increments the counter unless events are disabled."""
141+
if config.DISABLE_EVENTS:
142+
return
143+
144+
if value <= 0:
145+
raise ValueError("Increment value must be positive.")
146+
147+
with self._mutex:
148+
self._count += value
149+
150+
def reset(self) -> None:
151+
"""Resets the counter to zero unless events are disabled."""
152+
if config.DISABLE_EVENTS:
153+
return
154+
155+
with self._mutex:
156+
self._count = 0
157+
158+
def collect(self) -> List[Dict[str, Union[str, int]]]:
159+
"""Collects the metric unless events are disabled."""
160+
if config.DISABLE_EVENTS:
161+
return list()
162+
163+
with self._mutex:
164+
if self._count == 0:
165+
# Return an empty list if the count is 0, as there are no metrics to send to the analytics backend.
166+
return list()
167+
return [
168+
{
169+
"namespace": self._namespace,
170+
"name": self._name,
171+
"value": self._count,
172+
"type": self._type,
173+
}
174+
]
175+
176+
177+
class _LabeledCounter(Metric):
178+
"""
179+
A labeled counter that tracks occurrences of an event across different label combinations.
180+
"""
181+
182+
_mutex: threading.Lock
183+
_namespace: Optional[str]
184+
_name: str
185+
_type: str
186+
_unit: str
187+
_labels: List[str]
188+
_label_values: tuple[Optional[str], ...]
189+
_count_by_labels: defaultdict[Tuple[str, ...], int]
190+
191+
def __init__(self, name: str, labels: List[str] = list, namespace: Optional[str] = ""):
192+
if not name:
193+
raise ValueError("Name is required and cannot be empty.")
194+
195+
if any(not label for label in labels):
196+
raise ValueError("Labels must be non-empty strings.")
197+
198+
if len(labels) > 8:
199+
raise ValueError("A maximum of 8 labels are allowed.")
200+
201+
self._mutex = threading.Lock()
202+
self._name = name.strip()
203+
self._namespace = namespace.strip() if namespace else ""
204+
self._type = "counter"
205+
self._labels = labels
206+
self._label_values = tuple()
207+
self._count_by_labels = defaultdict(int)
208+
MetricRegistry().register(self)
209+
210+
@property
211+
def mutex(self) -> threading.Lock:
212+
"""
213+
Provides thread-safe access to the internal lock.
214+
"""
215+
return self._mutex
216+
217+
@property
218+
def count_by_labels(self) -> defaultdict[Tuple[str, ...], int]:
219+
return self._count_by_labels
220+
221+
def labels(self, **kwargs: str) -> _LabeledCounterProxy:
222+
"""
223+
Create a scoped counter instance with specific label values.
224+
225+
This method assigns values to the predefined labels of a labeled counter and returns
226+
a proxy object (`_LabeledCounterProxy`) that allows tracking metrics for that specific
227+
combination of label values.
228+
229+
The proxy ensures that increments and resets are scoped to the given label values,
230+
enforcing proper metric categorization.
231+
232+
:raises ValueError:
233+
- If the number of provided labels does not match the expected count.
234+
- If any of the provided labels are empty strings.
235+
"""
236+
self._label_values = tuple(label_value for label_value in kwargs.values())
237+
238+
if len(kwargs) != len(self._label_values):
239+
raise ValueError(f"Expected labels {self._label_values}, got {list(kwargs.values())}")
240+
241+
if any(not label for label in self._label_values):
242+
raise ValueError("Label values must be non-empty strings.")
243+
244+
return _LabeledCounterProxy(counter=self, label_values=self._label_values)
245+
246+
def _as_list(self) -> List[Dict[str, Union[str, int]]]:
247+
num_labels = len(self._labels)
248+
249+
static_key_label_value = [f"label_{i + 1}_value" for i in range(num_labels)]
250+
static_key_label = [f"label_{i + 1}" for i in range(num_labels)]
251+
252+
collected_metrics = []
253+
254+
for label_values, count in self._count_by_labels.items():
255+
if count == 0:
256+
continue # Skip items with a count of 0, as they should not be sent to the analytics backend.
257+
258+
if len(label_values) != num_labels:
259+
raise ValueError(
260+
f"Label count mismatch: expected {num_labels} labels {self._labels}, "
261+
f"but got {len(label_values)} values {label_values}."
262+
)
263+
264+
collected_metrics.append(
265+
{
266+
"namespace": self._namespace,
267+
"name": self._name,
268+
"value": count,
269+
"type": self._type,
270+
**dict(zip(static_key_label_value, label_values)),
271+
**dict(zip(static_key_label, self._labels)),
272+
}
273+
)
274+
275+
return collected_metrics
276+
277+
def collect(self) -> List[Dict[str, Union[str, int]]]:
278+
if config.DISABLE_EVENTS:
279+
return list()
280+
281+
with self._mutex:
282+
return self._as_list()
283+
284+
285+
class _LabeledCounterProxy:
286+
"""A proxy for a labeled counter, enforcing scoped label values."""
287+
288+
def __init__(self, counter: _LabeledCounter, label_values: Tuple[str, ...]):
289+
self._counter = counter
290+
self._label_values = label_values
291+
292+
def increment(self, value: int = 1) -> None:
293+
"""Increments the counter for the assigned labels unless events are disabled."""
294+
if config.DISABLE_EVENTS:
295+
return
296+
297+
if value <= 0:
298+
raise ValueError("Increment value must be positive.")
299+
300+
with self._counter.mutex:
301+
self._counter.count_by_labels[self._label_values] += value
302+
303+
def reset(self) -> None:
304+
"""Resets the counter to zero for the assigned labels unless events are disabled."""
305+
if config.DISABLE_EVENTS:
306+
return
307+
308+
with self._counter.mutex:
309+
self._counter.count_by_labels[self._label_values] = 0
310+
311+
312+
class Counter:
313+
"""
314+
A factory class for creating counter instances.
315+
316+
This class provides a flexible way to create either a simple counter
317+
(`_SimpleCounter`) or a labeled counter (`_LabeledCounter`) based on
318+
whether labels are provided.
319+
"""
320+
321+
@overload
322+
def __new__(cls, name: str, namespace: Optional[str] = "") -> _SimpleCounter:
323+
return _SimpleCounter(namespace=namespace, name=name)
324+
325+
@overload
326+
def __new__(
327+
cls, name: str, labels: List[str], namespace: Optional[str] = ""
328+
) -> _LabeledCounter:
329+
return _LabeledCounter(namespace=namespace, name=name, labels=labels)
330+
331+
def __new__(
332+
cls, name: str, namespace: Optional[str] = "", labels: Optional[List[str]] = None
333+
) -> Union[_SimpleCounter, _LabeledCounter]:
334+
if labels:
335+
return _LabeledCounter(namespace=namespace, name=name, labels=labels)
336+
return _SimpleCounter(namespace=namespace, name=name)
337+
338+
339+
@hooks.on_infra_shutdown()
340+
def publish_metrics() -> None:
341+
"""
342+
Collects all the registered metrics and immediately sends them to the analytics service.
343+
Skips execution if event tracking is disabled (`config.DISABLE_EVENTS`).
344+
345+
This function is automatically triggered on infrastructure shutdown.
346+
"""
347+
if config.DISABLE_EVENTS:
348+
return
349+
350+
collected_metrics = MetricRegistry().collect()
351+
if not collected_metrics["metrics"]: # Skip publishing if no metrics remain after filtering
352+
return
353+
354+
metadata = EventMetadata(
355+
session_id=get_session_id(),
356+
client_time=str(datetime.datetime.now()),
357+
)
358+
359+
if collected_metrics:
360+
publisher = AnalyticsClientPublisher()
361+
publisher.publish([Event(name="ls_metrics", metadata=metadata, payload=collected_metrics)])

localstack-core/localstack/utils/analytics/usage.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
"""
2+
[DEPRECATED] This module is deprecated in favor of `localstack.utils.analytics.metrics`.
3+
"""
4+
15
import datetime
26
import math
37
from collections import defaultdict
@@ -20,6 +24,7 @@
2024

2125
class UsageSetCounter:
2226
"""
27+
[DEPRECATED] Use `localstack.utils.analytics.metrics.Counter` instead.
2328
Use this counter to count occurrences of unique values
2429
2530
Example:
@@ -51,6 +56,7 @@ def aggregate(self) -> dict:
5156

5257
class UsageCounter:
5358
"""
59+
[DEPRECATED] Use `localstack.utils.analytics.metrics.Counter` instead.
5460
Use this counter to count numeric values
5561
5662
Example:

0 commit comments

Comments
 (0)