Skip to content

Commit 7851b99

Browse files
committed
Simplify server approach; Add inflight events gauge; and address comments
1 parent d294d03 commit 7851b99

File tree

10 files changed

+45
-90
lines changed

10 files changed

+45
-90
lines changed

prometheus/prometheus/expose.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from localstack.extensions.api import http
2+
from prometheus_client.exposition import choose_encoder
3+
4+
5+
def retrieve_metrics(request: http.Request):
6+
"""Expose the Prometheus metrics"""
7+
_generate_latest_metrics, content_type = choose_encoder(request.headers.get("Content-Type", ""))
8+
data = _generate_latest_metrics()
9+
return http.Response(response=data, status=200, mimetype=content_type)

prometheus/prometheus/extension.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,33 +7,28 @@
77
)
88
from localstack.extensions.api import Extension, http
99

10+
from prometheus.expose import retrieve_metrics
1011
from prometheus.handler import RequestMetricsHandler, ResponseMetricsHandler
1112
from prometheus.instruments.patch import apply_poller_tracking_patches
12-
from prometheus.server import PrometheusServer
1313

1414
LOG = logging.getLogger(__name__)
1515

1616

1717
class PrometheusMetricsExtension(Extension):
1818
name = "prometheus"
19-
prometheus_metrics_server: PrometheusServer
20-
21-
def __init__(self, host="localhost", port=None):
22-
self.prometheus_metrics_server = PrometheusServer(port, host)
2319

2420
def on_extension_load(self):
2521
apply_poller_tracking_patches()
2622
LOG.debug("PrometheusMetricsExtension: extension is loaded")
2723

2824
def on_platform_start(self):
2925
LOG.debug("PrometheusMetricsExtension: localstack is starting")
30-
self.prometheus_metrics_server.start()
3126

3227
def on_platform_ready(self):
3328
LOG.debug("PrometheusMetricsExtension: localstack is running")
3429

3530
def update_gateway_routes(self, router: http.Router[http.RouteHandler]):
36-
router.add("/_extension/metrics", self.prometheus_metrics_server.metrics)
31+
router.add("/_extension/metrics", retrieve_metrics)
3732
LOG.debug("Added /metrics endpoint for Prometheus metrics")
3833

3934
def update_request_handlers(self, handlers: CompositeHandler):

prometheus/prometheus/handler.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from localstack.http import Response
77

88
from prometheus.metrics.core import (
9-
LOCALSTACK_INFLIGHT_REQUESTS_GAUGE,
9+
LOCALSTACK_IN_FLIGHT_REQUESTS_GAUGE,
1010
LOCALSTACK_REQUEST_PROCESSING_DURATION_SECONDS,
1111
)
1212

@@ -31,7 +31,7 @@ def __call__(self, chain: HandlerChain, context: TimedRequestContext, response:
3131
return
3232

3333
service, operation = context.service_operation
34-
LOCALSTACK_INFLIGHT_REQUESTS_GAUGE.labels(service=service, operation=operation).inc()
34+
LOCALSTACK_IN_FLIGHT_REQUESTS_GAUGE.labels(service=service, operation=operation).inc()
3535

3636

3737
class ResponseMetricsHandler(Handler):
@@ -45,7 +45,7 @@ def __call__(self, chain: HandlerChain, context: TimedRequestContext, response:
4545
return
4646

4747
service, operation = context.service_operation
48-
LOCALSTACK_INFLIGHT_REQUESTS_GAUGE.labels(service=service, operation=operation).dec()
48+
LOCALSTACK_IN_FLIGHT_REQUESTS_GAUGE.labels(service=service, operation=operation).dec()
4949

5050
# Do not record if response is None
5151
if response is None:

prometheus/prometheus/instruments/poller.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import logging
2-
import time
32

43
from localstack.services.lambda_.event_source_mapping.pollers.poller import (
54
EmptyPollResultsException,
@@ -25,12 +24,8 @@ def tracked_poll_events(fn, self: Poller):
2524
event_target = get_event_target_from_procesor(self.processor)
2625

2726
try:
28-
current_time_epoch = time.perf_counter()
29-
fn(self)
30-
LOCALSTACK_POLL_EVENTS_DURATION_SECONDS.labels(
31-
event_source=event_source,
32-
event_target=event_target,
33-
).observe(time.perf_counter() - current_time_epoch)
27+
with LOCALSTACK_POLL_EVENTS_DURATION_SECONDS.time():
28+
fn(self)
3429
except EmptyPollResultsException:
3530
# set to 0 since it's a batch-miss
3631
LOCALSTACK_POLLED_BATCH_SIZE_EFFICIENCY_RATIO.labels(

prometheus/prometheus/instruments/sender.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
from prometheus.metrics.event_processing import (
77
LOCALSTACK_EVENT_PROCESSING_ERRORS_TOTAL,
88
LOCALSTACK_EVENT_PROPAGATION_DELAY_SECONDS,
9+
LOCALSTACK_IN_FLIGHT_EVENTS_GAUGE,
10+
LOCALSTACK_PROCESS_EVENT_DURATION_SECONDS,
911
LOCALSTACK_PROCESSED_EVENTS_TOTAL,
1012
)
1113

@@ -69,12 +71,14 @@ def tracked_send_events(fn, self: Sender, events: list[dict] | dict):
6971
event_source=event_source, event_target=event_target
7072
).observe(delay)
7173

72-
LOCALSTACK_PROCESSED_EVENTS_TOTAL.labels(
73-
event_source=event_source, event_target=event_target, status="processing"
74-
).inc(total_events)
74+
LOCALSTACK_IN_FLIGHT_EVENTS_GAUGE.labels(
75+
event_source=event_source,
76+
event_target=event_target,
77+
).inc()
7578

7679
try:
77-
result = fn(self, original_events)
80+
with LOCALSTACK_PROCESS_EVENT_DURATION_SECONDS.time():
81+
result = fn(self, original_events)
7882
LOCALSTACK_PROCESSED_EVENTS_TOTAL.labels(
7983
event_source=event_source, event_target=event_target, status="success"
8084
).inc(total_events)
@@ -91,3 +95,8 @@ def tracked_send_events(fn, self: Sender, events: list[dict] | dict):
9195
event_source=event_source, event_target=event_target, status="error"
9296
).inc(total_events)
9397
raise
98+
finally:
99+
LOCALSTACK_IN_FLIGHT_EVENTS_GAUGE.labels(
100+
event_source=event_source,
101+
event_target=event_target,
102+
).dec()

prometheus/prometheus/instruments/util.py

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
EsmEventProcessor,
33
EventProcessor,
44
)
5-
from localstack.services.lambda_.event_source_mapping.pollers.poller import Poller
65

76

87
def get_event_target_from_procesor(processor: EventProcessor) -> str:
@@ -13,33 +12,3 @@ def get_event_target_from_procesor(processor: EventProcessor) -> str:
1312
return processor.event_target()
1413

1514
return "unknown"
16-
17-
18-
# Utility function to extract record batching configuration
19-
def record_batch_configuration(poller: Poller, event_source: dict) -> tuple[int, int]:
20-
batch_size = None
21-
batch_window = None
22-
23-
# Extract configuration based on event source
24-
if event_source == "aws:sqs":
25-
if params := poller.source_parameters.get("PipeSourceSqsQueueParameters"):
26-
batch_size = params.get("BatchSize")
27-
batch_window = params.get("MaximumBatchingWindowInSeconds")
28-
elif event_source == "aws:dynamodb":
29-
if params := poller.source_parameters.get("DynamoDBStreamParameters"):
30-
batch_size = params.get("BatchSize")
31-
batch_window = params.get("MaximumBatchingWindowInSeconds")
32-
elif event_source == "aws:kinesis":
33-
if params := poller.source_parameters.get("KinesisStreamParameters"):
34-
batch_size = params.get("BatchSize")
35-
batch_window = params.get("MaximumBatchingWindowInSeconds")
36-
elif event_source == "aws:kafka" or event_source == "SelfManagedKafka":
37-
if params := poller.source_parameters.get(
38-
"PipeSourceManagedKafkaParameters", {}
39-
) or poller.source_parameters.get("PipeSourceSelfManagedKafkaParameters", {}):
40-
batch_size = params.get("BatchSize")
41-
batch_window = params.get("MaximumBatchingWindowInSeconds")
42-
else:
43-
return ()
44-
45-
return batch_size, batch_window

prometheus/prometheus/metrics/core.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
buckets=[0.005, 0.05, 0.5, 5, 30, 60, 300, 900, 3600],
99
)
1010

11-
LOCALSTACK_INFLIGHT_REQUESTS_GAUGE = Gauge(
11+
LOCALSTACK_IN_FLIGHT_REQUESTS_GAUGE = Gauge(
1212
"localstack_in_flight_requests",
1313
"Total number of currently in-flight requests",
1414
["service", "operation"],

prometheus/prometheus/metrics/event_polling.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
LOCALSTACK_POLL_EVENTS_DURATION_SECONDS = Histogram(
1212
"localstack_poll_events_duration_second",
13-
"Count of poll events .",
13+
"Duration of each poll call in seconds",
1414
["event_source", "event_target"],
1515
buckets=[0.005, 0.05, 0.5, 5, 30, 60, 300, 900, 3600],
1616
)

prometheus/prometheus/metrics/event_processing.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from prometheus_client import Counter, Histogram
1+
from prometheus_client import Counter, Gauge, Histogram
22

33
# Event processing metrics
44
LOCALSTACK_PROCESSED_EVENTS_TOTAL = Counter(
@@ -7,6 +7,19 @@
77
["event_source", "event_target", "status"],
88
)
99

10+
LOCALSTACK_PROCESS_EVENT_DURATION_SECONDS = Histogram(
11+
"localstack_process_event_duration_second",
12+
"Duration to process a polled event from start to completion",
13+
["event_source", "event_target"],
14+
buckets=[0.005, 0.05, 0.5, 5, 30, 60, 300, 900, 3600],
15+
)
16+
17+
LOCALSTACK_IN_FLIGHT_EVENTS_GAUGE = Gauge(
18+
"localstack_in_flight_events_total",
19+
"Total number of event batches currently being processed by the target",
20+
["event_source", "event_target"],
21+
)
22+
1023
# Performance and latency metrics
1124
LOCALSTACK_EVENT_PROPAGATION_DELAY_SECONDS = Histogram(
1225
"localstack_event_propagation_delay_seconds",

prometheus/prometheus/server.py

Lines changed: 0 additions & 35 deletions
This file was deleted.

0 commit comments

Comments
 (0)