Skip to content

Reapply reduce requests necessary for log publishing from lambda to cloudwatch logs #12470

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions localstack-core/localstack/services/lambda_/invocation/logs.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import dataclasses
import logging
import threading
import time
from queue import Queue
from typing import Optional, Union

from localstack.aws.connect import connect_to
from localstack.utils.aws.client_types import ServicePrincipal
from localstack.utils.bootstrap import is_api_enabled
from localstack.utils.cloudwatch.cloudwatch_util import store_cloudwatch_logs
from localstack.utils.threads import FuncThread

LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -50,10 +50,34 @@ def run_log_loop(self, *args, **kwargs) -> None:
log_item = self.log_queue.get()
if log_item is QUEUE_SHUTDOWN:
return
# we need to split by newline - but keep the newlines in the strings
# strips empty lines, as they are not accepted by cloudwatch
logs = [line + "\n" for line in log_item.logs.split("\n") if line]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a test for the empty lines case?

Copy link
Member Author

@dfangl dfangl Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, but CloudWatch does not accept empty lines. The logic matches the one in `store_cloudwatch_logs` though, so we at least do not get worse in terms of parity. We should write tests for this though, when we rework the logging in general.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following this discussion, do we have an intuition where these replacements came from?

log_output = log_output.replace("\\x1b", "\n\\x1b")
log_output = log_output.replace("\x1b", "\n\x1b")

I just wanted to add the reference in case we need to add them reactively because we removed them without knowing where they came from 😄

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, still no idea. Since those are ANSI escape sequences, I still think they stem from the time when we captured the lambda logs directly from docker running inside a full terminal instance.

# until we have a better way to have timestamps, log events have the same time for a single invocation
log_events = [
{"timestamp": int(time.time() * 1000), "message": log_line} for log_line in logs
]
try:
store_cloudwatch_logs(
logs_client, log_item.log_group, log_item.log_stream, log_item.logs
)
try:
logs_client.put_log_events(
logGroupName=log_item.log_group,
logStreamName=log_item.log_stream,
logEvents=log_events,
)
except logs_client.exceptions.ResourceNotFoundException:
# create new log group
try:
logs_client.create_log_group(logGroupName=log_item.log_group)
except logs_client.exceptions.ResourceAlreadyExistsException:
pass
logs_client.create_log_stream(
logGroupName=log_item.log_group, logStreamName=log_item.log_stream
)
logs_client.put_log_events(
logGroupName=log_item.log_group,
logStreamName=log_item.log_stream,
logEvents=log_events,
)
except Exception as e:
LOG.warning(
"Error saving logs to group %s in region %s: %s",
Expand Down
10 changes: 10 additions & 0 deletions tests/aws/services/lambda_/functions/lambda_cloudwatch_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""
A simple handler which does a print on the "body" key of the event passed in.
Can be used to log different payloads, to check for the correct format in cloudwatch logs
"""


def handler(event, context):
    """Write the event's "body" entry to stdout and return the event unchanged.

    :param event: invocation payload; must contain a "body" key
    :param context: Lambda context object (not used)
    :return: the very same event object that was passed in
    """
    # Emit exactly the log line that was handed to the lambda
    body = event["body"]
    print(body)
    return event
1 change: 1 addition & 0 deletions tests/aws/services/lambda_/test_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
THIS_FOLDER, "functions/lambda_multiple_handlers.py"
)
TEST_LAMBDA_NOTIFIER = os.path.join(THIS_FOLDER, "functions/lambda_notifier.py")
TEST_LAMBDA_CLOUDWATCH_LOGS = os.path.join(THIS_FOLDER, "functions/lambda_cloudwatch_logs.py")

PYTHON_TEST_RUNTIMES = RUNTIMES_AGGREGATED["python"]
NODE_TEST_RUNTIMES = RUNTIMES_AGGREGATED["nodejs"]
Expand Down
60 changes: 60 additions & 0 deletions tests/aws/services/lambda_/test_lambda_runtimes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json
import os
import shutil
import textwrap
from typing import List

import pytest
Expand All @@ -26,6 +27,7 @@
JAVA_TEST_RUNTIMES,
NODE_TEST_RUNTIMES,
PYTHON_TEST_RUNTIMES,
TEST_LAMBDA_CLOUDWATCH_LOGS,
TEST_LAMBDA_JAVA_MULTIPLE_HANDLERS,
TEST_LAMBDA_JAVA_WITH_LIB,
TEST_LAMBDA_NODEJS_ES6,
Expand Down Expand Up @@ -484,3 +486,61 @@ def test_manual_endpoint_injection(self, multiruntime_lambda, tmp_path, aws_clie
FunctionName=create_function_result["FunctionName"],
)
assert "FunctionError" not in invocation_result


class TestCloudwatchLogs:
    """Tests for how output printed by a Lambda function shows up as CloudWatch log events."""

    @pytest.fixture(autouse=True)
    def snapshot_transformers(self, snapshot):
        """Register the snapshot transformers used by every test in this class."""
        report_logs = snapshot.transform.lambda_report_logs()
        snapshot.add_transformer(report_logs)

        event_ids = snapshot.transform.key_value("eventId", reference_replacement=False)
        snapshot.add_transformer(event_ids)

        runtime_ids = snapshot.transform.regex(r"::runtime:\w+", "::runtime:<runtime-id>")
        snapshot.add_transformer(runtime_ids)

        runtime_versions = snapshot.transform.regex("\\.v\\d{2}", ".v<version>")
        snapshot.add_transformer(runtime_versions)

    @markers.aws.validated
    # skip all snapshots - the logs are too different
    # TODO add INIT_START to make snapshotting of logs possible
    @markers.snapshot.skip_snapshot_verify()
    def test_multi_line_prints(self, aws_client, create_lambda_function, snapshot):
        """Invoke a printing lambda with a multi-line body and check the resulting log events.

        Each newline-separated line of the body is expected to be present as its own
        log message, while a carriage return inside a line must not split it.
        """
        function_name = f"test_lambda_{short_uid()}"
        log_group_name = f"/aws/lambda/{function_name}"
        create_lambda_function(
            func_name=function_name,
            handler_file=TEST_LAMBDA_CLOUDWATCH_LOGS,
            runtime=Runtime.python3_13,
        )

        body = textwrap.dedent("""
            multi
            line
            string
            another\rline
            """)
        invoke_response = aws_client.lambda_.invoke(
            FunctionName=function_name, Payload=json.dumps({"body": body})
        )
        # the request id is only known after the invocation, so its transformer is added late
        request_id = invoke_response["ResponseMetadata"]["RequestId"]
        snapshot.add_transformer(snapshot.transform.regex(request_id, "<request-id>"))

        def fetch_logs():
            # keep polling (via retry) until the REPORT line of the invocation is present
            events = aws_client.logs.filter_log_events(logGroupName=log_group_name)["events"]
            assert any("REPORT" in event["message"] for event in events)
            return events

        log_events = retry(fetch_logs, retries=10, sleep=2)
        snapshot.match("log-events", log_events)

        # some manual assertions until we can actually use the snapshot
        log_messages = [event["message"] for event in log_events]
        for expected_message in ("multi\n", "line\n", "string\n", "another\rline\n"):
            assert expected_message in log_messages
63 changes: 63 additions & 0 deletions tests/aws/services/lambda_/test_lambda_runtimes.snapshot.json
Original file line number Diff line number Diff line change
Expand Up @@ -1208,5 +1208,68 @@
}
}
}
},
"tests/aws/services/lambda_/test_lambda_runtimes.py::TestCloudwatchLogs::test_multi_line_prints": {
"recorded-date": "02-04-2025, 12:35:33",
"recorded-content": {
"log-events": [
{
"logStreamName": "<log-stream-name:1>",
"timestamp": "timestamp",
"message": "INIT_START Runtime Version: python:3.13.v<version>\tRuntime Version ARN: arn:<partition>:lambda:<region>::runtime:<runtime-id>\n",
"ingestionTime": "timestamp",
"eventId": "event-id"
},
{
"logStreamName": "<log-stream-name:1>",
"timestamp": "timestamp",
"message": "START RequestId: <request-id> Version: $LATEST\n",
"ingestionTime": "timestamp",
"eventId": "event-id"
},
{
"logStreamName": "<log-stream-name:1>",
"timestamp": "timestamp",
"message": "multi\n",
"ingestionTime": "timestamp",
"eventId": "event-id"
},
{
"logStreamName": "<log-stream-name:1>",
"timestamp": "timestamp",
"message": "line\n",
"ingestionTime": "timestamp",
"eventId": "event-id"
},
{
"logStreamName": "<log-stream-name:1>",
"timestamp": "timestamp",
"message": "string\n",
"ingestionTime": "timestamp",
"eventId": "event-id"
},
{
"logStreamName": "<log-stream-name:1>",
"timestamp": "timestamp",
"message": "another\rline\n",
"ingestionTime": "timestamp",
"eventId": "event-id"
},
{
"logStreamName": "<log-stream-name:1>",
"timestamp": "timestamp",
"message": "END RequestId: <request-id>\n",
"ingestionTime": "timestamp",
"eventId": "event-id"
},
{
"logStreamName": "<log-stream-name:1>",
"timestamp": "timestamp",
"message": "REPORT RequestId: <request-id>\tDuration: <duration> ms\tBilled Duration: <duration> ms\tMemory Size: 128 MB\tMax Memory Used: <memory> MB\tInit Duration: <duration> ms\t\n",
"ingestionTime": "timestamp",
"eventId": "event-id"
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
{
"tests/aws/services/lambda_/test_lambda_runtimes.py::TestCloudwatchLogs::test_multi_line_prints": {
"last_validated_date": "2025-04-02T12:35:33+00:00"
},
"tests/aws/services/lambda_/test_lambda_runtimes.py::TestGoProvidedRuntimes::test_manual_endpoint_injection[provided.al2023]": {
"last_validated_date": "2024-11-26T09:46:59+00:00"
},
Expand Down
Loading