Skip to content

Commit 08382d0

Browse files
authored
Migrate legacy usage counters to new analytic counters (#12412)
1 parent 4f5c1ba commit 08382d0

File tree

7 files changed

+159
-28
lines changed

7 files changed

+159
-28
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
from enum import StrEnum
2+
3+
from localstack.utils.analytics.metrics import Counter
4+
5+
NAMESPACE = "lambda"
6+
7+
hotreload_counter = Counter(namespace=NAMESPACE, name="hotreload", labels=["operation"])
8+
9+
function_counter = Counter(
10+
namespace=NAMESPACE,
11+
name="function",
12+
labels=[
13+
"operation",
14+
"status",
15+
"runtime",
16+
"package_type",
17+
# only for operation "invoke"
18+
"invocation_type",
19+
],
20+
)
21+
22+
23+
class FunctionOperation(StrEnum):
24+
invoke = "invoke"
25+
create = "create"
26+
27+
28+
class FunctionStatus(StrEnum):
29+
success = "success"
30+
zero_reserved_concurrency_error = "zero_reserved_concurrency_error"
31+
event_age_exceeded_error = "event_age_exceeded_error"
32+
throttle_error = "throttle_error"
33+
system_error = "system_error"
34+
unhandled_state_error = "unhandled_state_error"
35+
failed_state_error = "failed_state_error"
36+
pending_state_error = "pending_state_error"
37+
invalid_payload_error = "invalid_payload_error"
38+
invocation_error = "invocation_error"
39+
40+
41+
esm_counter = Counter(namespace=NAMESPACE, name="esm", labels=["source", "status"])
42+
43+
44+
class EsmExecutionStatus(StrEnum):
45+
success = "success"
46+
partial_batch_failure_error = "partial_batch_failure_error"
47+
target_invocation_error = "target_invocation_error"
48+
unhandled_error = "unhandled_error"
49+
source_poller_error = "source_poller_error"
50+
# TODO: Add tracking for filter error. Options:
51+
# a) raise filter exception and track it in the esm_worker
52+
# b) somehow add tracking in the individual pollers
53+
filter_error = "filter_error"

localstack-core/localstack/services/lambda_/event_source_mapping/esm_event_processor.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import uuid
44

55
from localstack.aws.api.pipes import LogLevel
6+
from localstack.services.lambda_.analytics import EsmExecutionStatus, esm_counter
67
from localstack.services.lambda_.event_source_mapping.event_processor import (
78
BatchFailureError,
89
EventProcessor,
@@ -15,7 +16,6 @@
1516
Sender,
1617
SenderError,
1718
)
18-
from localstack.services.lambda_.usage import esm_error, esm_invocation
1919

2020
LOG = logging.getLogger(__name__)
2121

@@ -37,7 +37,6 @@ def process_events_batch(self, input_events: list[dict] | dict) -> None:
3737
else:
3838
first_event = {}
3939
event_source = first_event.get("eventSource")
40-
esm_invocation.record(event_source)
4140

4241
execution_id = uuid.uuid4()
4342
# Create a copy of the original input events
@@ -60,12 +59,16 @@ def process_events_batch(self, input_events: list[dict] | dict) -> None:
6059
messageType="ExecutionSucceeded",
6160
logLevel=LogLevel.INFO,
6261
)
62+
esm_counter.labels(source=event_source, status=EsmExecutionStatus.success).increment()
6363
except PartialFailureSenderError as e:
6464
self.logger.log(
6565
messageType="ExecutionFailed",
6666
logLevel=LogLevel.ERROR,
6767
error=e.error,
6868
)
69+
esm_counter.labels(
70+
source=event_source, status=EsmExecutionStatus.partial_batch_failure_error
71+
).increment()
6972
# TODO: check whether partial batch item failures is enabled by default or need to be explicitly enabled
7073
# using --function-response-types "ReportBatchItemFailures"
7174
# https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html
@@ -78,15 +81,20 @@ def process_events_batch(self, input_events: list[dict] | dict) -> None:
7881
logLevel=LogLevel.ERROR,
7982
error=e.error,
8083
)
84+
esm_counter.labels(
85+
source=event_source, status=EsmExecutionStatus.target_invocation_error
86+
).increment()
8187
raise BatchFailureError(error=e.error) from e
8288
except Exception as e:
83-
esm_error.record(event_source)
8489
LOG.error(
8590
"Unhandled exception while processing Lambda event source mapping (ESM) events %s for ESM with execution id %s",
8691
events,
8792
execution_id,
8893
exc_info=LOG.isEnabledFor(logging.DEBUG),
8994
)
95+
esm_counter.labels(
96+
source=event_source, status=EsmExecutionStatus.unhandled_error
97+
).increment()
9098
raise e
9199

92100
def process_target_stage(self, events: list[dict]) -> None:

localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,14 @@
1010
LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC,
1111
LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC,
1212
)
13+
from localstack.services.lambda_.analytics import EsmExecutionStatus, esm_counter
1314
from localstack.services.lambda_.event_source_mapping.pollers.poller import (
1415
EmptyPollResultsException,
1516
Poller,
1617
)
1718
from localstack.services.lambda_.invocation.models import LambdaStore, lambda_stores
1819
from localstack.services.lambda_.provider_utils import get_function_version_from_arn
20+
from localstack.utils.aws.arns import parse_arn
1921
from localstack.utils.backoff import ExponentialBackoff
2022
from localstack.utils.threads import FuncThread
2123

@@ -181,6 +183,10 @@ def poller_loop(self, *args, **kwargs):
181183
e,
182184
exc_info=LOG.isEnabledFor(logging.DEBUG),
183185
)
186+
event_source = parse_arn(self.esm_config.get("EventSourceArn")).get("service")
187+
esm_counter.labels(
188+
source=event_source, status=EsmExecutionStatus.source_poller_error
189+
).increment()
184190
# Wait some time between retries to avoid running into the problem right again
185191
poll_interval_duration = error_boff.next_backoff()
186192
finally:

localstack-core/localstack/services/lambda_/invocation/event_manager.py

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,12 @@
1111
from botocore.config import Config
1212

1313
from localstack import config
14-
from localstack.aws.api.lambda_ import TooManyRequestsException
14+
from localstack.aws.api.lambda_ import InvocationType, TooManyRequestsException
15+
from localstack.services.lambda_.analytics import (
16+
FunctionOperation,
17+
FunctionStatus,
18+
function_counter,
19+
)
1520
from localstack.services.lambda_.invocation.internal_sqs_queue import get_fake_sqs_client
1621
from localstack.services.lambda_.invocation.lambda_models import (
1722
EventInvokeConfig,
@@ -194,18 +199,30 @@ def handle_message(self, message: dict) -> None:
194199
failure_cause = None
195200
qualifier = self.version_manager.function_version.id.qualifier
196201
event_invoke_config = self.version_manager.function.event_invoke_configs.get(qualifier)
202+
runtime = None
203+
status = None
197204
try:
198205
sqs_invocation = SQSInvocation.decode(message["Body"])
199206
invocation = sqs_invocation.invocation
200207
try:
201208
invocation_result = self.version_manager.invoke(invocation=invocation)
209+
function_config = self.version_manager.function_version.config
210+
function_counter.labels(
211+
operation=FunctionOperation.invoke,
212+
runtime=function_config.runtime or "n/a",
213+
status=FunctionStatus.success,
214+
invocation_type=InvocationType.Event,
215+
package_type=function_config.package_type,
216+
).increment()
202217
except Exception as e:
203218
# Reserved concurrency == 0
204219
if self.version_manager.function.reserved_concurrent_executions == 0:
205220
failure_cause = "ZeroReservedConcurrency"
221+
status = FunctionStatus.zero_reserved_concurrency_error
206222
# Maximum event age expired (lookahead for next retry)
207223
elif not has_enough_time_for_retry(sqs_invocation, event_invoke_config):
208224
failure_cause = "EventAgeExceeded"
225+
status = FunctionStatus.event_age_exceeded_error
209226
if failure_cause:
210227
invocation_result = InvocationResult(
211228
is_error=True, request_id=invocation.request_id, payload=None, logs=None
@@ -216,13 +233,22 @@ def handle_message(self, message: dict) -> None:
216233
self.process_dead_letter_queue(sqs_invocation, invocation_result)
217234
return
218235
# 3) Otherwise, retry without increasing counter
219-
self.process_throttles_and_system_errors(sqs_invocation, e)
236+
status = self.process_throttles_and_system_errors(sqs_invocation, e)
220237
return
221238
finally:
222239
sqs_client = get_sqs_client(self.version_manager.function_version)
223240
sqs_client.delete_message(
224241
QueueUrl=self.event_queue_url, ReceiptHandle=message["ReceiptHandle"]
225242
)
243+
# status MUST be set before returning
244+
package_type = self.version_manager.function_version.config.package_type
245+
function_counter.labels(
246+
operation=FunctionOperation.invoke,
247+
runtime=runtime or "n/a",
248+
status=status,
249+
invocation_type=InvocationType.Event,
250+
package_type=package_type,
251+
).increment()
226252

227253
# Good summary blogpost: https://haithai91.medium.com/aws-lambdas-retry-behaviors-edff90e1cf1b
228254
# Asynchronous invocation handling: https://docs.aws.amazon.com/lambda/latest/dg/invocation-async.html
@@ -278,7 +304,9 @@ def handle_message(self, message: dict) -> None:
278304
"Error handling lambda invoke %s", e, exc_info=LOG.isEnabledFor(logging.DEBUG)
279305
)
280306

281-
def process_throttles_and_system_errors(self, sqs_invocation: SQSInvocation, error: Exception):
307+
def process_throttles_and_system_errors(
308+
self, sqs_invocation: SQSInvocation, error: Exception
309+
) -> str:
282310
# If the function doesn't have enough concurrency available to process all events, additional
283311
# requests are throttled. For throttling errors (429) and system errors (500-series), Lambda returns
284312
# the event to the queue and attempts to run the function again for up to 6 hours. The retry interval
@@ -292,10 +320,12 @@ def process_throttles_and_system_errors(self, sqs_invocation: SQSInvocation, err
292320
# https://repost.aws/knowledge-center/lambda-troubleshoot-invoke-error-502-500
293321
if isinstance(error, TooManyRequestsException): # Throttles 429
294322
LOG.debug("Throttled lambda %s: %s", self.version_manager.function_arn, error)
323+
status = FunctionStatus.throttle_error
295324
else: # System errors 5xx
296325
LOG.debug(
297326
"Service exception in lambda %s: %s", self.version_manager.function_arn, error
298327
)
328+
status = FunctionStatus.system_error
299329
maximum_exception_retry_delay_seconds = 5 * 60
300330
delay_seconds = min(
301331
2**sqs_invocation.exception_retries, maximum_exception_retry_delay_seconds
@@ -307,6 +337,7 @@ def process_throttles_and_system_errors(self, sqs_invocation: SQSInvocation, err
307337
MessageBody=sqs_invocation.encode(),
308338
DelaySeconds=delay_seconds,
309339
)
340+
return status
310341

311342
def process_success_destination(
312343
self,

localstack-core/localstack/services/lambda_/invocation/lambda_service.py

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,12 @@
2525
)
2626
from localstack.aws.connect import connect_to
2727
from localstack.constants import AWS_REGION_US_EAST_1
28-
from localstack.services.lambda_ import usage
28+
from localstack.services.lambda_.analytics import (
29+
FunctionOperation,
30+
FunctionStatus,
31+
function_counter,
32+
hotreload_counter,
33+
)
2934
from localstack.services.lambda_.api_utils import (
3035
lambda_arn,
3136
qualified_lambda_arn,
@@ -272,29 +277,42 @@ def invoke(
272277

273278
# Need the qualified arn to exactly get the target lambda
274279
qualified_arn = qualified_lambda_arn(function_name, version_qualifier, account_id, region)
280+
version = function.versions.get(version_qualifier)
281+
runtime = version.config.runtime or "n/a"
282+
package_type = version.config.package_type
275283
try:
276284
version_manager = self.get_lambda_version_manager(qualified_arn)
277285
event_manager = self.get_lambda_event_manager(qualified_arn)
278-
usage.runtime.record(version_manager.function_version.config.runtime)
279286
except ValueError as e:
280-
version = function.versions.get(version_qualifier)
281287
state = version and version.config.state.state
282288
# TODO: make such developer hints optional or remove after initial v2 transition period
283289
if state == State.Failed:
290+
status = FunctionStatus.failed_state_error
284291
HINT_LOG.error(
285292
f"Failed to create the runtime executor for the function {function_name}. "
286293
"Please ensure that Docker is available in the LocalStack container by adding the volume mount "
287294
'"/var/run/docker.sock:/var/run/docker.sock" to your LocalStack startup. '
288295
"Check out https://docs.localstack.cloud/user-guide/aws/lambda/#docker-not-available"
289296
)
290297
elif state == State.Pending:
298+
status = FunctionStatus.pending_state_error
291299
HINT_LOG.warning(
292300
"Lambda functions are created and updated asynchronously in the new lambda provider like in AWS. "
293301
f"Before invoking {function_name}, please wait until the function transitioned from the state "
294302
"Pending to Active using: "
295303
f'"awslocal lambda wait function-active-v2 --function-name {function_name}" '
296304
"Check out https://docs.localstack.cloud/user-guide/aws/lambda/#function-in-pending-state"
297305
)
306+
else:
307+
status = FunctionStatus.unhandled_state_error
308+
LOG.error("Unexpected state %s for Lambda function %s", state, function_name)
309+
function_counter.labels(
310+
operation=FunctionOperation.invoke,
311+
runtime=runtime,
312+
status=status,
313+
invocation_type=invocation_type,
314+
package_type=package_type,
315+
).increment()
298316
raise ResourceConflictException(
299317
f"The operation cannot be performed at this time. The function is currently in the following state: {state}"
300318
) from e
@@ -306,6 +324,13 @@ def invoke(
306324
try:
307325
to_str(payload)
308326
except Exception as e:
327+
function_counter.labels(
328+
operation=FunctionOperation.invoke,
329+
runtime=runtime,
330+
status=FunctionStatus.invalid_payload_error,
331+
invocation_type=invocation_type,
332+
package_type=package_type,
333+
).increment()
309334
# MAYBE: improve parity of detailed exception message (quite cumbersome)
310335
raise InvalidRequestContentException(
311336
f"Could not parse request body into json: Could not parse payload into json: {e}",
@@ -331,7 +356,7 @@ def invoke(
331356
)
332357
)
333358

334-
return version_manager.invoke(
359+
invocation_result = version_manager.invoke(
335360
invocation=Invocation(
336361
payload=payload,
337362
invoked_arn=invoked_arn,
@@ -342,6 +367,19 @@ def invoke(
342367
trace_context=trace_context,
343368
)
344369
)
370+
status = (
371+
FunctionStatus.invocation_error
372+
if invocation_result.is_error
373+
else FunctionStatus.success
374+
)
375+
function_counter.labels(
376+
operation=FunctionOperation.invoke,
377+
runtime=runtime,
378+
status=status,
379+
invocation_type=invocation_type,
380+
package_type=package_type,
381+
).increment()
382+
return invocation_result
345383

346384
def update_version(self, new_version: FunctionVersion) -> Future[None]:
347385
"""
@@ -601,7 +639,7 @@ def store_s3_bucket_archive(
601639
:return: S3 Code object representing the archive stored in S3
602640
"""
603641
if archive_bucket == config.BUCKET_MARKER_LOCAL:
604-
usage.hotreload.increment()
642+
hotreload_counter.labels(operation="create").increment()
605643
return create_hot_reloading_code(path=archive_key)
606644
s3_client: "S3Client" = connect_to().s3
607645
kwargs = {"VersionId": archive_version} if archive_version else {}

localstack-core/localstack/services/lambda_/provider.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,11 @@
150150
from localstack.services.edge import ROUTER
151151
from localstack.services.lambda_ import api_utils
152152
from localstack.services.lambda_ import hooks as lambda_hooks
153+
from localstack.services.lambda_.analytics import (
154+
FunctionOperation,
155+
FunctionStatus,
156+
function_counter,
157+
)
153158
from localstack.services.lambda_.api_utils import (
154159
ARCHITECTURES,
155160
STATEMENT_ID_REGEX,
@@ -1037,6 +1042,13 @@ def create_function(
10371042
)
10381043
fn.versions["$LATEST"] = version
10391044
state.functions[function_name] = fn
1045+
function_counter.labels(
1046+
operation=FunctionOperation.create,
1047+
runtime=runtime or "n/a",
1048+
status=FunctionStatus.success,
1049+
invocation_type="n/a",
1050+
package_type=package_type,
1051+
)
10401052
self.lambda_service.create_function_version(version)
10411053

10421054
if tags := request.get("Tags"):

localstack-core/localstack/services/lambda_/usage.py

Lines changed: 0 additions & 17 deletions
This file was deleted.

0 commit comments

Comments
 (0)