From 9d9be329cd35cadfafb95473649f7c58f4d36c97 Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Fri, 4 Apr 2025 11:09:42 +0200 Subject: [PATCH 01/24] feat: propagate x-ray trace id --- .../localstack/services/events/provider.py | 10 +++++++-- .../localstack/services/events/target.py | 12 +++++++---- .../localstack/services/events/utils.py | 21 +++++++++++++++++++ 3 files changed, 37 insertions(+), 6 deletions(-) diff --git a/localstack-core/localstack/services/events/provider.py b/localstack-core/localstack/services/events/provider.py index cdb6e3ad32904..e1204091720fe 100644 --- a/localstack-core/localstack/services/events/provider.py +++ b/localstack-core/localstack/services/events/provider.py @@ -164,6 +164,7 @@ get_resource_type, get_trace_header_encoded_region_account, is_archive_arn, + populate_trace_id, recursive_remove_none_values_from_dict, ) from localstack.services.plugins import ServiceLifecycleHook @@ -1814,6 +1815,10 @@ def _process_entry( return region, account_id = extract_region_and_account_id(event_bus_name_or_arn, context) + + trace_id = populate_trace_id(context.request.headers) + + # TODO check interference with x-ray trace header if encoded_trace_header := get_trace_header_encoded_region_account( entry, context.region, context.account_id, region, account_id ): @@ -1844,7 +1849,7 @@ def _process_entry( if configured_rules := list(event_bus.rules.values()): for rule in configured_rules: - self._process_rules(rule, region, account_id, event_formatted) + self._process_rules(rule, region, account_id, event_formatted, trace_id) else: LOG.info( json.dumps( @@ -1865,6 +1870,7 @@ def _process_rules( region: str, account_id: str, event_formatted: FormattedEvent, + trace_id: str, ) -> None: """Process rules for an event. Note that we no longer handle entries here as AWS returns success regardless of target failures.""" event_pattern = rule.event_pattern @@ -1894,7 +1900,7 @@ def _process_rules( target_unique_id = f"{rule.arn}-{target_id}" target_sender = self._target_sender_store[target_unique_id] try: - target_sender.process_event(event_formatted.copy()) + target_sender.process_event(event_formatted.copy(), trace_id) rule_invocation.labels( status=InvocationStatus.success, service=target_sender.service, diff --git a/localstack-core/localstack/services/events/target.py b/localstack-core/localstack/services/events/target.py index b12691f28925e..addb83da82432 100644 --- a/localstack-core/localstack/services/events/target.py +++ b/localstack-core/localstack/services/events/target.py @@ -193,10 +193,10 @@ def client(self): return self._client @abstractmethod - def send_event(self, event: FormattedEvent | TransformedEvent): + def send_event(self, event: FormattedEvent | TransformedEvent, trace_id: str | None = None): pass - def process_event(self, event: FormattedEvent): + def process_event(self, event: FormattedEvent, trace_id: str | None = None): """Processes the event and send it to the target.""" if input_ := self.target.get("Input"): event = json.loads(input_) @@ -208,7 +208,7 @@ def process_event(self, event: FormattedEvent): if input_transformer := self.target.get("InputTransformer"): event = self.transform_event_with_target_input_transformer(input_transformer, event) if event: - self.send_event(event) + self.send_event(event, trace_id) else: LOG.info("No event to send to target %s", self.target.get("Id")) @@ -316,7 +316,7 @@ class ApiGatewayTargetSender(TargetSender): ALLOWED_HTTP_METHODS = {"GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"} - def send_event(self, event): + def send_event(self, event, trace_id): # Parse the ARN to extract api_id, stage_name, http_method, and resource path # Example ARN: arn:{partition}:execute-api:{region}:{account_id}:{api_id}/{stage_name}/{method}/{resource_path} arn_parts = parse_arn(self.target["Arn"]) @@ -383,6 +383,10 @@ def send_event(self, event): # Serialize the event, converting datetime objects to strings event_json = json.dumps(event, default=str) + # Set x-ray trace header + if trace_id: + headers["X-Amzn-Trace-Id"] = trace_id + # Send the HTTP request response = requests.request( method=http_method, url=url, headers=headers, data=event_json, timeout=5 diff --git a/localstack-core/localstack/services/events/utils.py b/localstack-core/localstack/services/events/utils.py index 36258ac668acb..7001702729afa 100644 --- a/localstack-core/localstack/services/events/utils.py +++ b/localstack-core/localstack/services/events/utils.py @@ -5,6 +5,7 @@ from typing import Any, Dict, Optional from botocore.utils import ArnParser +from werkzeug.datastructures import Headers from localstack.aws.api import RequestContext from localstack.aws.api.events import ( @@ -19,6 +20,11 @@ RuleArn, Timestamp, ) +from localstack.services.apigateway.next_gen.execute_api.helpers import ( + generate_trace_id, + generate_trace_parent, + parse_trace_id, +) from localstack.services.events.models import ( FormattedEvent, ResourceType, @@ -293,3 +299,18 @@ def is_nested_in_string(template: str, match: re.Match[str]) -> bool: return False return left_quote != -1 + + +def populate_trace_id(headers: Headers) -> str: + """ + Populates the trace header for the request xray header if present. + If not present, it generates a new trace_id + """ + incoming_trace = parse_trace_id(headers.get("X-Amzn-Trace-Id", "")) + # parse_trace_id always return capitalized keys + + trace = incoming_trace.get("Root", generate_trace_id()) + incoming_parent = incoming_trace.get("Parent") + parent = incoming_parent or generate_trace_parent() + sampled = incoming_trace.get("Sampled", "1" if incoming_parent else "0") + return f"Root={trace};Parent={parent};Sampled={sampled}" From fc45d72f19fe71ac2d696cc5a1fdd26dcb6c1b2c Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Fri, 4 Apr 2025 14:01:22 +0200 Subject: [PATCH 02/24] feat: use internal trace_context and auto create segment --- .../localstack/services/events/provider.py | 10 ++--- .../localstack/services/events/target.py | 39 +++++++++++++--- .../localstack/services/events/utils.py | 45 +++++++++++-------- 3 files changed, 63 insertions(+), 31 deletions(-) diff --git a/localstack-core/localstack/services/events/provider.py b/localstack-core/localstack/services/events/provider.py index e1204091720fe..d618bc69020c2 100644 --- a/localstack-core/localstack/services/events/provider.py +++ b/localstack-core/localstack/services/events/provider.py @@ -157,6 +157,7 @@ ) from localstack.services.events.utils import ( TARGET_ID_PATTERN, + create_segment_from_trace_header, extract_connection_name, extract_event_bus_name, extract_region_and_account_id, @@ -164,7 +165,6 @@ get_resource_type, get_trace_header_encoded_region_account, is_archive_arn, - populate_trace_id, recursive_remove_none_values_from_dict, ) from localstack.services.plugins import ServiceLifecycleHook @@ -1816,7 +1816,8 @@ def _process_entry( region, account_id = extract_region_and_account_id(event_bus_name_or_arn, context) - trace_id = populate_trace_id(context.request.headers) + # Set x-ray segment from trace header + create_segment_from_trace_header(context.trace_context["aws_trace_header"]) # TODO check interference with x-ray trace header if encoded_trace_header := get_trace_header_encoded_region_account( @@ -1849,7 +1850,7 @@ def _process_entry( if configured_rules := list(event_bus.rules.values()): for rule in configured_rules: - self._process_rules(rule, region, account_id, event_formatted, trace_id) + self._process_rules(rule, region, account_id, event_formatted) else: LOG.info( json.dumps( @@ -1870,7 +1871,6 @@ def _process_rules( region: str, account_id: str, event_formatted: FormattedEvent, - trace_id: str, ) -> None: """Process rules for an event. Note that we no longer handle entries here as AWS returns success regardless of target failures.""" event_pattern = rule.event_pattern @@ -1900,7 +1900,7 @@ def _process_rules( target_unique_id = f"{rule.arn}-{target_id}" target_sender = self._target_sender_store[target_unique_id] try: - target_sender.process_event(event_formatted.copy(), trace_id) + target_sender.process_event(event_formatted.copy()) rule_invocation.labels( status=InvocationStatus.success, service=target_sender.service, diff --git a/localstack-core/localstack/services/events/target.py b/localstack-core/localstack/services/events/target.py index addb83da82432..3e8572bfb2c41 100644 --- a/localstack-core/localstack/services/events/target.py +++ b/localstack-core/localstack/services/events/target.py @@ -8,6 +8,7 @@ from urllib.parse import urlencode import requests +from aws_xray_sdk.core import patch from botocore.client import BaseClient from localstack import config @@ -28,6 +29,7 @@ from localstack.services.events.utils import ( event_time_to_time_string, get_trace_header_encoded_region_account, + get_trace_header_str_from_segment, is_nested_in_string, to_json_str, ) @@ -169,6 +171,8 @@ def __init__( self._validate_input(target) self._client: BaseClient | None = None + self._x_ray_segment = None + @property def arn(self): return self.target["Arn"] @@ -193,10 +197,10 @@ def client(self): return self._client @abstractmethod - def send_event(self, event: FormattedEvent | TransformedEvent, trace_id: str | None = None): + def send_event(self, event: FormattedEvent | TransformedEvent): pass - def process_event(self, event: FormattedEvent, trace_id: str | None = None): + def process_event(self, event: FormattedEvent): """Processes the event and send it to the target.""" if input_ := self.target.get("Input"): event = json.loads(input_) @@ -208,7 +212,7 @@ def process_event(self, event: FormattedEvent, trace_id: str | None = None): if input_transformer := self.target.get("InputTransformer"): event = self.transform_event_with_target_input_transformer(input_transformer, event) if event: - self.send_event(event, trace_id) + self.send_event(event) else: LOG.info("No event to send to target %s", self.target.get("Id")) @@ -316,7 +320,7 @@ class ApiGatewayTargetSender(TargetSender): ALLOWED_HTTP_METHODS = {"GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"} - def send_event(self, event, trace_id): + def send_event(self, event): # Parse the ARN to extract api_id, stage_name, http_method, and resource path # Example ARN: arn:{partition}:execute-api:{region}:{account_id}:{api_id}/{stage_name}/{method}/{resource_path} arn_parts = parse_arn(self.target["Arn"]) @@ -383,9 +387,8 @@ def send_event(self, event, trace_id): # Serialize the event, converting datetime objects to strings event_json = json.dumps(event, default=str) - # Set x-ray trace header - if trace_id: - headers["X-Amzn-Trace-Id"] = trace_id + trace_header_str = get_trace_header_str_from_segment() + headers["X-Amzn-Trace-Id"] = trace_header_str # Send the HTTP request response = requests.request( @@ -468,6 +471,10 @@ def send_event(self, event): event, self.region, self.account_id, self.target_region, self.target_account_id ): entries[0]["TraceHeader"] = encoded_original_id + + # Patch the boto3 client to automatically include X-Ray trace headers + patch(self.client) + self.client.put_events(Entries=entries) def _get_source(self, event: FormattedEvent | TransformedEvent) -> str: @@ -524,6 +531,10 @@ def send_event(self, event): if http_parameters := self.target.get("HttpParameters"): endpoint = add_target_http_parameters(http_parameters, endpoint, headers, event) + # add trace header + trace_header_str = get_trace_header_str_from_segment() + headers["X-Amzn-Trace-Id"] = trace_header_str + result = requests.request( method=method, url=endpoint, data=json.dumps(event or {}), headers=headers ) @@ -538,6 +549,8 @@ def send_event(self, event): class FirehoseTargetSender(TargetSender): def send_event(self, event): delivery_stream_name = firehose_name(self.target["Arn"]) + # Patch the boto3 client to automatically include X-Ray trace headers + patch(self.client) self.client.put_record( DeliveryStreamName=delivery_stream_name, Record={"Data": to_bytes(to_json_str(event))}, @@ -553,6 +566,8 @@ def send_event(self, event): ) stream_name = self.target["Arn"].split("/")[-1] partition_key = collections.get_safe(event, partition_key_path, event["id"]) + # Patch the boto3 client to automatically include X-Ray trace headers + patch(self.client) self.client.put_record( StreamName=stream_name, Data=to_bytes(to_json_str(event)), @@ -570,6 +585,8 @@ def _validate_input(self, target: Target): class LambdaTargetSender(TargetSender): def send_event(self, event): + # Patch the boto3 client to automatically include X-Ray trace headers + patch(self.client) self.client.invoke( FunctionName=self.target["Arn"], Payload=to_bytes(to_json_str(event)), @@ -581,6 +598,8 @@ class LogsTargetSender(TargetSender): def send_event(self, event): log_group_name = self.target["Arn"].split(":")[6] log_stream_name = str(uuid.uuid4()) # Unique log stream name + # Patch the boto3 client to automatically include X-Ray trace headers + patch(self.client) self.client.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name) self.client.put_log_events( logGroupName=log_group_name, @@ -612,6 +631,8 @@ def send_event(self, event): class SnsTargetSender(TargetSender): def send_event(self, event): + # Patch the boto3 client to automatically include X-Ray trace headers + patch(self.client) self.client.publish(TopicArn=self.target["Arn"], Message=to_json_str(event)) @@ -620,6 +641,8 @@ def send_event(self, event): queue_url = sqs_queue_url_for_arn(self.target["Arn"]) msg_group_id = self.target.get("SqsParameters", {}).get("MessageGroupId", None) kwargs = {"MessageGroupId": msg_group_id} if msg_group_id else {} + # Patch the boto3 client to automatically include X-Ray trace headers + patch(self.client) self.client.send_message( QueueUrl=queue_url, MessageBody=to_json_str(event), @@ -632,6 +655,8 @@ class StatesTargetSender(TargetSender): def send_event(self, event): self.service = "stepfunctions" + # Patch the boto3 client to automatically include X-Ray trace headers + patch(self.client) self.client.start_execution( stateMachineArn=self.target["Arn"], name=event["id"], input=to_json_str(event) ) diff --git a/localstack-core/localstack/services/events/utils.py b/localstack-core/localstack/services/events/utils.py index 7001702729afa..5be463cd457a5 100644 --- a/localstack-core/localstack/services/events/utils.py +++ b/localstack-core/localstack/services/events/utils.py @@ -4,8 +4,8 @@ from datetime import datetime, timezone from typing import Any, Dict, Optional +from aws_xray_sdk.core import xray_recorder from botocore.utils import ArnParser -from werkzeug.datastructures import Headers from localstack.aws.api import RequestContext from localstack.aws.api.events import ( @@ -20,11 +20,6 @@ RuleArn, Timestamp, ) -from localstack.services.apigateway.next_gen.execute_api.helpers import ( - generate_trace_id, - generate_trace_parent, - parse_trace_id, -) from localstack.services.events.models import ( FormattedEvent, ResourceType, @@ -33,6 +28,7 @@ ) from localstack.utils.aws.arns import ARN_PARTITION_REGEX, parse_arn from localstack.utils.strings import long_uid +from localstack.utils.xray.trace_header import TraceHeader LOG = logging.getLogger(__name__) @@ -301,16 +297,27 @@ def is_nested_in_string(template: str, match: re.Match[str]) -> bool: return left_quote != -1 -def populate_trace_id(headers: Headers) -> str: - """ - Populates the trace header for the request xray header if present. - If not present, it generates a new trace_id - """ - incoming_trace = parse_trace_id(headers.get("X-Amzn-Trace-Id", "")) - # parse_trace_id always return capitalized keys - - trace = incoming_trace.get("Root", generate_trace_id()) - incoming_parent = incoming_trace.get("Parent") - parent = incoming_parent or generate_trace_parent() - sampled = incoming_trace.get("Sampled", "1" if incoming_parent else "0") - return f"Root={trace};Parent={parent};Sampled={sampled}" +def create_segment_from_trace_header(trace_header: TraceHeader) -> None: + segment = xray_recorder.begin_segment(name="events.put_events") + segment.trace_id = trace_header.root + segment.parent_id = trace_header.parent + if trace_header.sampled == "1": + segment.sampled = True + elif trace_header.sampled == "0": + segment.sampled = False + + +def get_trace_header_str_from_segment(): + # Get the current segment + segment = xray_recorder.current_segment() + + if segment: + # Construct the trace header manually + trace_id = segment.trace_id + parent_id = segment.id # Use the segment's own ID as the parent for downstream calls + sampled = "1" if segment.sampled else "0" + + # Format according to X-Ray trace header specification + trace_header_str = f"Root={trace_id};Parent={parent_id};Sampled={sampled}" + + return trace_header_str From 03dff26f82de991bd63b5efd425502cc29719c9f Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Fri, 4 Apr 2025 14:33:11 +0200 Subject: [PATCH 03/24] feat: add x-ray trace id propagation event api gateway --- .../events/test_x_ray_trace_propagation.py | 219 ++++++++++++++++++ 1 file changed, 219 insertions(+) create mode 100644 tests/aws/services/events/test_x_ray_trace_propagation.py diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.py b/tests/aws/services/events/test_x_ray_trace_propagation.py new file mode 100644 index 0000000000000..68cb2007aac36 --- /dev/null +++ b/tests/aws/services/events/test_x_ray_trace_propagation.py @@ -0,0 +1,219 @@ +import json + +from localstack import config +from localstack.aws.api.lambda_ import Runtime +from localstack.testing.pytest import markers +from localstack.utils.strings import short_uid +from localstack.utils.sync import retry +from localstack.utils.testutil import check_expected_lambda_log_events_length + +APIGATEWAY_ASSUME_ROLE_POLICY = { + "Statement": { + "Sid": "", + "Effect": "Allow", + "Principal": {"Service": "apigateway.amazonaws.com"}, + "Action": "sts:AssumeRole", + } +} +import pytest + +from localstack.testing.aws.util import is_aws_cloud +from tests.aws.services.events.helper_functions import is_old_provider +from tests.aws.services.lambda_.test_lambda import ( + TEST_LAMBDA_PYTHON_ECHO, +) + + +@markers.aws.unknown +@pytest.mark.skipif( + condition=is_old_provider() and not is_aws_cloud(), + reason="not supported by the old provider", +) +@markers.snapshot.skip_snapshot_verify( + paths=[ + # TODO: those headers are sent by Events via the SDK, we should at least populate X-Amz-Source-Account + # and X-Amz-Source-Arn + "$..headers.amz-sdk-invocation-id", + "$..headers.amz-sdk-request", + "$..headers.amz-sdk-retry", + "$..headers.X-Amz-Security-Token", + "$..headers.X-Amz-Source-Account", + "$..headers.X-Amz-Source-Arn", + # seems like this one can vary in casing between runs? + "$..headers.x-amz-date", + "$..headers.X-Amz-Date", + # those headers are missing in API Gateway + "$..headers.CloudFront-Forwarded-Proto", + "$..headers.CloudFront-Is-Desktop-Viewer", + "$..headers.CloudFront-Is-Mobile-Viewer", + "$..headers.CloudFront-Is-SmartTV-Viewer", + "$..headers.CloudFront-Is-Tablet-Viewer", + "$..headers.CloudFront-Viewer-ASN", + "$..headers.CloudFront-Viewer-Country", + "$..headers.X-Amz-Cf-Id", + "$..headers.Via", + # sent by `requests` library by default + "$..headers.Accept-Encoding", + "$..headers.Accept", + ] +) +@markers.snapshot.skip_snapshot_verify( + condition=lambda: not config.APIGW_NEXT_GEN_PROVIDER, + paths=[ + # parity issue from previous APIGW implementation + "$..headers.x-localstack-edge", + "$..headers.Connection", + "$..headers.Content-Length", + "$..headers.accept-encoding", + "$..headers.accept", + "$..headers.X-Amzn-Trace-Id", + "$..headers.X-Forwarded-Port", + "$..headers.X-Forwarded-Proto", + "$..pathParameters", + "$..requestContext.authorizer", + "$..requestContext.deploymentId", + "$..requestContext.extendedRequestId", + "$..requestContext.identity", + "$..requestContext.requestId", + "$..stageVariables", + ], +) +def test_xray_trace_propagation_events_api_gateway( + aws_client, + create_role_with_policy, + create_lambda_function, + create_rest_apigw, + events_create_event_bus, + events_put_rule, + events_put_targets, + region_name, + account_id, + snapshot, +): + # create lambda + function_name = f"test-function-{short_uid()}" + function_arn = create_lambda_function( + handler_file=TEST_LAMBDA_PYTHON_ECHO, + func_name=function_name, + runtime=Runtime.python3_12, + )["CreateFunctionResponse"]["FunctionArn"] + + # create api gateway with lambda integration + # create rest api + api_id, api_name, root = create_rest_apigw( + name=f"test-api-{short_uid()}", + description="Integration test API", + ) + + resource_id = aws_client.apigateway.create_resource( + restApiId=api_id, parentId=root, pathPart="{proxy+}" + )["id"] + + aws_client.apigateway.put_method( + restApiId=api_id, + resourceId=resource_id, + httpMethod="ANY", + authorizationType="NONE", + ) + + # create role with policy + _, role_arn = create_role_with_policy( + "Allow", "lambda:InvokeFunction", json.dumps(APIGATEWAY_ASSUME_ROLE_POLICY), "*" + ) + + # Lambda AWS_PROXY integration + aws_client.apigateway.put_integration( + restApiId=api_id, + resourceId=resource_id, + httpMethod="ANY", + type="AWS_PROXY", + integrationHttpMethod="POST", + uri=f"arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/{function_arn}/invocations", + credentials=role_arn, + ) + + stage_name = "test-api-stage-name" + aws_client.apigateway.create_deployment(restApiId=api_id, stageName=stage_name) + + # Create event bus + event_bus_name = f"test-bus-{short_uid()}" + events_create_event_bus(Name=event_bus_name) + + # Create rule + rule_name = f"test-rule-{short_uid()}" + event_pattern = {"source": ["test.source"], "detail-type": ["test.detail.type"]} + events_put_rule( + Name=rule_name, + EventBusName=event_bus_name, + EventPattern=json.dumps(event_pattern), + ) + + # Create an IAM Role for EventBridge to invoke API Gateway + assume_role_policy_document = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": {"Service": "events.amazonaws.com"}, + "Action": "sts:AssumeRole", + } + ], + } + source_arn = f"arn:aws:execute-api:{region_name}:{account_id}:{api_id}/*/POST/test" + role_name, role_arn = create_role_with_policy( + effect="Allow", + actions="execute-api:Invoke", + assume_policy_doc=json.dumps(assume_role_policy_document), + resource=source_arn, + attach=False, # Since we're using put_role_policy, not attach_role_policy + ) + + # Add the API Gateway as a target with the RoleArn + target_id = f"target-{short_uid()}" + api_target_arn = ( + f"arn:aws:execute-api:{region_name}:{account_id}:{api_id}/{stage_name}/POST/test" + ) + put_targets_response = aws_client.events.put_targets( + Rule=rule_name, + EventBusName=event_bus_name, + Targets=[ + { + "Id": target_id, + "Arn": api_target_arn, + "RoleArn": role_arn, + "Input": json.dumps({"message": "Hello from EventBridge"}), + "RetryPolicy": {"MaximumRetryAttempts": 0}, + } + ], + ) + assert put_targets_response["FailedEntryCount"] == 0 + + ###### + # Test + ###### + event_entry = { + "EventBusName": event_bus_name, + "Source": "test.source", + "DetailType": "test.detail.type", + "Detail": json.dumps({"message": "Hello from EventBridge"}), + } + put_events_response = aws_client.events.put_events(Entries=[event_entry]) + snapshot.match("put_events_response", put_events_response) + assert put_events_response["FailedEntryCount"] == 0 + + # Verify the Lambda invocation + events = retry( + check_expected_lambda_log_events_length, + retries=10, + sleep=10, + sleep_before=10 if is_aws_cloud() else 1, + function_name=function_name, + expected_length=1, + logs_client=aws_client.logs, + ) + snapshot.match("lambda_logs", events) + # TODO assert that the X-Ray trace ID is present in the logs + # TODO how to assert X-Ray trace ID correct propagation + + +# def test_xray_trace_propagation_events_lambda(): From 1865747181a599bd4deb70dbbcd04229980712e3 Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Mon, 7 Apr 2025 16:09:03 +0200 Subject: [PATCH 04/24] feat: update test --- .../events/test_x_ray_trace_propagation.py | 44 +++++-- ...test_x_ray_trace_propagation.snapshot.json | 107 ++++++++++++++++++ 2 files changed, 139 insertions(+), 12 deletions(-) create mode 100644 tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.py b/tests/aws/services/events/test_x_ray_trace_propagation.py index 68cb2007aac36..13b3f06947dae 100644 --- a/tests/aws/services/events/test_x_ray_trace_propagation.py +++ b/tests/aws/services/events/test_x_ray_trace_propagation.py @@ -1,6 +1,5 @@ import json -from localstack import config from localstack.aws.api.lambda_ import Runtime from localstack.testing.pytest import markers from localstack.utils.strings import short_uid @@ -15,7 +14,10 @@ "Action": "sts:AssumeRole", } } +import re + import pytest +from aws_xray_sdk.core import patch, xray_recorder from localstack.testing.aws.util import is_aws_cloud from tests.aws.services.events.helper_functions import is_old_provider @@ -55,18 +57,19 @@ # sent by `requests` library by default "$..headers.Accept-Encoding", "$..headers.Accept", - ] -) -@markers.snapshot.skip_snapshot_verify( - condition=lambda: not config.APIGW_NEXT_GEN_PROVIDER, - paths=[ - # parity issue from previous APIGW implementation + "$..headers.Host", + "$..multiValueHeaders.Host", + "$..requestContext.apiId", + "$..requestContext.domainName", + "$..requestContext.domainPrefix", + "$..requestContext.requestTime", + "$..requestContext.requestTimeEpoch", + "$..requestContext.resourceId", "$..headers.x-localstack-edge", "$..headers.Connection", "$..headers.Content-Length", "$..headers.accept-encoding", "$..headers.accept", - "$..headers.X-Amzn-Trace-Id", "$..headers.X-Forwarded-Port", "$..headers.X-Forwarded-Proto", "$..pathParameters", @@ -85,7 +88,6 @@ def test_xray_trace_propagation_events_api_gateway( create_rest_apigw, events_create_event_bus, events_put_rule, - events_put_targets, region_name, account_id, snapshot, @@ -191,13 +193,21 @@ def test_xray_trace_propagation_events_api_gateway( ###### # Test ###### + events_client = aws_client.events + + # Enable X-Ray tracing for the aws_client + segment = xray_recorder.begin_segment(name="put_events") + trace_id = segment.trace_id + libraries = ["botocore"] + patch(libraries) + event_entry = { "EventBusName": event_bus_name, "Source": "test.source", "DetailType": "test.detail.type", "Detail": json.dumps({"message": "Hello from EventBridge"}), } - put_events_response = aws_client.events.put_events(Entries=[event_entry]) + put_events_response = events_client.put_events(Entries=[event_entry]) snapshot.match("put_events_response", put_events_response) assert put_events_response["FailedEntryCount"] == 0 @@ -211,9 +221,19 @@ def test_xray_trace_propagation_events_api_gateway( expected_length=1, logs_client=aws_client.logs, ) + + # TODO how to assert X-Ray trace ID correct propagation from eventbridge to api gateway + + lambda_trace_header = events[0]["headers"].get("X-Amzn-Trace-Id") + assert lambda_trace_header is not None + lambda_trace_id = re.search(r"Root=([^;]+)", lambda_trace_header).group(1) + assert lambda_trace_id == trace_id + + snapshot.add_transformer( + snapshot.transform.regex(lambda_trace_id, "trace_id_root"), + ) + snapshot.match("lambda_logs", events) - # TODO assert that the X-Ray trace ID is present in the logs - # TODO how to assert X-Ray trace ID correct propagation # def test_xray_trace_propagation_events_lambda(): diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json b/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json new file mode 100644 index 0000000000000..e654599c9fe80 --- /dev/null +++ b/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json @@ -0,0 +1,107 @@ +{ + "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_api_gateway": { + "recorded-date": "07-04-2025, 14:08:19", + "recorded-content": { + "put_events_response": { + "Entries": [ + { + "EventId": "" + } + ], + "FailedEntryCount": 0, + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + }, + "lambda_logs": [ + { + "headers": { + "User-Agent": "python-requests/2.32.3", + "Accept-Encoding": "gzip, deflate", + "Accept": "*/*", + "Host": "t72nlybios.execute-api.localhost.localstack.cloud", + "Content-Type": "application/json", + "X-Amzn-Trace-Id": "Root=trace_id_root;Parent=1111111111110000;Sampled=0", + "X-Forwarded-For": "127.0.0.1", + "X-Forwarded-Port": "4566", + "X-Forwarded-Proto": "HTTP" + }, + "multiValueHeaders": { + "User-Agent": [ + "python-requests/2.32.3" + ], + "Accept-Encoding": [ + "gzip, deflate" + ], + "Accept": [ + "*/*" + ], + "Host": [ + "t72nlybios.execute-api.localhost.localstack.cloud" + ], + "Content-Type": [ + "application/json" + ], + "X-Amzn-Trace-Id": [ + "Root=trace_id_root;Parent=1111111111110000;Sampled=0" + ], + "X-Forwarded-For": [ + "127.0.0.1" + ], + "X-Forwarded-Port": [ + "4566" + ], + "X-Forwarded-Proto": [ + "HTTP" + ] + }, + "body": { + "message": "Hello from EventBridge" + }, + "isBase64Encoded": false, + "requestContext": { + "accountId": "111111111111", + "apiId": "t72nlybios", + "deploymentId": "wpv6qsijdu", + "domainName": "t72nlybios.execute-api.localhost.localstack.cloud", + "domainPrefix": "t72nlybios", + "extendedRequestId": "897696d2", + "httpMethod": "POST", + "identity": { + "accountId": null, + "accessKey": null, + "caller": null, + "cognitoAuthenticationProvider": null, + "cognitoAuthenticationType": null, + "cognitoIdentityId": null, + "cognitoIdentityPoolId": null, + "principalOrgId": null, + "sourceIp": "127.0.0.1", + "user": null, + "userAgent": "python-requests/2.32.3", + "userArn": null + }, + "path": "/test-api-stage-name/test/", + "protocol": "HTTP/1.1", + "requestId": "", + "requestTime": "07/Apr/2025:16:08:13 ", + "requestTimeEpoch": 1744034893964, + "stage": "test-api-stage-name", + "resourcePath": "/{proxy+}", + "resourceId": "2oz6s7aidy" + }, + "stageVariables": null, + "queryStringParameters": null, + "multiValueQueryStringParameters": null, + "pathParameters": { + "proxy": "test" + }, + "httpMethod": "POST", + "path": "/test/", + "resource": "/{proxy+}" + } + ] + } + } +} From e4e82504be3d7fc8e441ded44628948cab72eef3 Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Mon, 7 Apr 2025 16:09:19 +0200 Subject: [PATCH 05/24] fix: correctly patch botocore --- .../localstack/services/events/target.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/localstack-core/localstack/services/events/target.py b/localstack-core/localstack/services/events/target.py index 3e8572bfb2c41..c469c74b3f920 100644 --- a/localstack-core/localstack/services/events/target.py +++ b/localstack-core/localstack/services/events/target.py @@ -473,7 +473,7 @@ def send_event(self, event): entries[0]["TraceHeader"] = encoded_original_id # Patch the boto3 client to automatically include X-Ray trace headers - patch(self.client) + patch(["botocore"]) self.client.put_events(Entries=entries) @@ -550,7 +550,7 @@ class FirehoseTargetSender(TargetSender): def send_event(self, event): delivery_stream_name = firehose_name(self.target["Arn"]) # Patch the boto3 client to automatically include X-Ray trace headers - patch(self.client) + patch(["botocore"]) self.client.put_record( DeliveryStreamName=delivery_stream_name, Record={"Data": to_bytes(to_json_str(event))}, @@ -567,7 +567,7 @@ def send_event(self, event): stream_name = self.target["Arn"].split("/")[-1] partition_key = collections.get_safe(event, partition_key_path, event["id"]) # Patch the boto3 client to automatically include X-Ray trace headers - patch(self.client) + patch(["botocore"]) self.client.put_record( StreamName=stream_name, Data=to_bytes(to_json_str(event)), @@ -586,7 +586,7 @@ def _validate_input(self, target: Target): class LambdaTargetSender(TargetSender): def send_event(self, event): # Patch the boto3 client to automatically include X-Ray trace headers - patch(self.client) + patch(["botocore"]) self.client.invoke( FunctionName=self.target["Arn"], Payload=to_bytes(to_json_str(event)), @@ -599,7 +599,7 @@ def send_event(self, event): log_group_name = self.target["Arn"].split(":")[6] log_stream_name = str(uuid.uuid4()) # Unique log stream name # Patch the boto3 client to automatically include X-Ray trace headers - patch(self.client) + patch(["botocore"]) self.client.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name) self.client.put_log_events( logGroupName=log_group_name, @@ -632,7 +632,7 @@ def send_event(self, event): class SnsTargetSender(TargetSender): def send_event(self, event): # Patch the boto3 client to automatically include X-Ray trace headers - patch(self.client) + patch(["botocore"]) self.client.publish(TopicArn=self.target["Arn"], Message=to_json_str(event)) @@ -642,7 +642,7 @@ def send_event(self, event): msg_group_id = self.target.get("SqsParameters", {}).get("MessageGroupId", None) kwargs = {"MessageGroupId": msg_group_id} if msg_group_id else {} # Patch the boto3 client to automatically include X-Ray trace headers - patch(self.client) + patch(["botocore"]) self.client.send_message( QueueUrl=queue_url, MessageBody=to_json_str(event), @@ -656,7 +656,7 @@ class StatesTargetSender(TargetSender): def send_event(self, event): self.service = "stepfunctions" # Patch the boto3 client to automatically include X-Ray trace headers - patch(self.client) + patch(["botocore"]) self.client.start_execution( stateMachineArn=self.target["Arn"], name=event["id"], input=to_json_str(event) ) From 21c07200022c1684c81a98bf8add1df5ac584738 Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Mon, 7 Apr 2025 17:15:57 +0200 Subject: [PATCH 06/24] feat: add test xray events lambda --- .../events/test_x_ray_trace_propagation.py | 89 +++++++++++++++++-- ...test_x_ray_trace_propagation.snapshot.json | 11 +++ 2 files changed, 95 insertions(+), 5 deletions(-) diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.py b/tests/aws/services/events/test_x_ray_trace_propagation.py index 13b3f06947dae..5cb440b5eef49 100644 --- a/tests/aws/services/events/test_x_ray_trace_propagation.py +++ b/tests/aws/services/events/test_x_ray_trace_propagation.py @@ -21,9 +21,10 @@ from localstack.testing.aws.util import is_aws_cloud from tests.aws.services.events.helper_functions import is_old_provider -from tests.aws.services.lambda_.test_lambda import ( - TEST_LAMBDA_PYTHON_ECHO, -) +from tests.aws.services.events.test_events import TEST_EVENT_DETAIL, TEST_EVENT_PATTERN +from tests.aws.services.lambda_.test_lambda import TEST_LAMBDA_PYTHON_ECHO, TEST_LAMBDA_XRAY_TRACEID + +# currently only API Gateway v2 and Lambda support X-Ray tracing @markers.aws.unknown @@ -216,7 +217,6 @@ def test_xray_trace_propagation_events_api_gateway( check_expected_lambda_log_events_length, retries=10, sleep=10, - sleep_before=10 if is_aws_cloud() else 1, function_name=function_name, expected_length=1, logs_client=aws_client.logs, @@ -236,4 +236,83 @@ def test_xray_trace_propagation_events_api_gateway( snapshot.match("lambda_logs", events) -# def test_xray_trace_propagation_events_lambda(): +@markers.aws.unknown +def test_xray_trace_propagation_events_lambda( + create_lambda_function, + events_create_event_bus, + events_put_rule, + aws_client, + snapshot, +): + function_name = f"lambda-func-{short_uid()}" + create_lambda_response = create_lambda_function( + handler_file=TEST_LAMBDA_XRAY_TRACEID, + func_name=function_name, + runtime=Runtime.python3_12, + ) + lambda_function_arn = create_lambda_response["CreateFunctionResponse"]["FunctionArn"] + + bus_name = f"bus-{short_uid()}" + events_create_event_bus(Name=bus_name) + + rule_name = f"rule-{short_uid()}" + rule_arn = events_put_rule( + Name=rule_name, + EventBusName=bus_name, + EventPattern=json.dumps(TEST_EVENT_PATTERN), + )["RuleArn"] + + aws_client.lambda_.add_permission( + FunctionName=function_name, + StatementId=f"{rule_name}-Event", + Action="lambda:InvokeFunction", + Principal="events.amazonaws.com", + SourceArn=rule_arn, + ) + + target_id = f"target-{short_uid()}" + aws_client.events.put_targets( + Rule=rule_name, + EventBusName=bus_name, + Targets=[{"Id": target_id, "Arn": lambda_function_arn}], + ) + + # Enable X-Ray tracing for the aws_client + segment = xray_recorder.begin_segment(name="put_events") + trace_id = segment.trace_id + libraries = ["botocore"] + patch(libraries) + + aws_client.events.put_events( + Entries=[ + { + "EventBusName": bus_name, + "Source": TEST_EVENT_PATTERN["source"][0], + "DetailType": TEST_EVENT_PATTERN["detail-type"][0], + "Detail": json.dumps(TEST_EVENT_DETAIL), + } + ] + ) + + # Verify the Lambda invocation + events = retry( + check_expected_lambda_log_events_length, + retries=10, + sleep=10, + function_name=function_name, + expected_length=1, + logs_client=aws_client.logs, + ) + + # TODO how to assert X-Ray trace ID correct propagation from eventbridge to api gateway + + lambda_trace_header = events[0]["trace_id_inside_handler"] + assert lambda_trace_header is not None + lambda_trace_id = re.search(r"Root=([^;]+)", lambda_trace_header).group(1) + assert lambda_trace_id == trace_id + + snapshot.add_transformer( + snapshot.transform.regex(lambda_trace_id, "trace_id_root"), + ) + + snapshot.match("lambda_logs", events) diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json b/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json index e654599c9fe80..1079fa83a1343 100644 --- a/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json +++ b/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json @@ -103,5 +103,16 @@ } ] } + }, + "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_lambda": { + "recorded-date": "07-04-2025, 15:13:15", + "recorded-content": { + "lambda_logs": [ + { + "trace_id_outside_handler": "None", + "trace_id_inside_handler": "Root=trace_id_root;Parent=1111111111110000;Sampled=1" + } + ] + } } } From b29ba1d7e6fce020fe74ad4cf857aa0e8125f186 Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Mon, 7 Apr 2025 17:16:15 +0200 Subject: [PATCH 07/24] feat: add x_ray lambda to test lambda --- tests/aws/services/lambda_/test_lambda.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/aws/services/lambda_/test_lambda.py b/tests/aws/services/lambda_/test_lambda.py index 700361d32ebbb..205d2e9c1f113 100644 --- a/tests/aws/services/lambda_/test_lambda.py +++ b/tests/aws/services/lambda_/test_lambda.py @@ -128,6 +128,7 @@ ) TEST_LAMBDA_NOTIFIER = os.path.join(THIS_FOLDER, "functions/lambda_notifier.py") TEST_LAMBDA_CLOUDWATCH_LOGS = os.path.join(THIS_FOLDER, "functions/lambda_cloudwatch_logs.py") +TEST_LAMBDA_XRAY_TRACEID = os.path.join(THIS_FOLDER, "functions/xray_tracing_traceid.py") PYTHON_TEST_RUNTIMES = RUNTIMES_AGGREGATED["python"] NODE_TEST_RUNTIMES = RUNTIMES_AGGREGATED["nodejs"] From 707d1083d0553a87c3f3aed1c1eb693cec21b470 Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Mon, 7 Apr 2025 17:16:38 +0200 Subject: [PATCH 08/24] feat: remove xray sdk patch of already patched boto clients --- .../localstack/services/events/target.py | 23 ++++--------------- 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/localstack-core/localstack/services/events/target.py b/localstack-core/localstack/services/events/target.py index c469c74b3f920..e8a3b2d015b6a 100644 --- a/localstack-core/localstack/services/events/target.py +++ b/localstack-core/localstack/services/events/target.py @@ -8,7 +8,6 @@ from urllib.parse import urlencode import requests -from aws_xray_sdk.core import patch from botocore.client import BaseClient from localstack import config @@ -472,9 +471,6 @@ def send_event(self, event): ): entries[0]["TraceHeader"] = encoded_original_id - # Patch the boto3 client to automatically include X-Ray trace headers - patch(["botocore"]) - self.client.put_events(Entries=entries) def _get_source(self, event: FormattedEvent | TransformedEvent) -> str: @@ -549,8 +545,7 @@ def send_event(self, event): class FirehoseTargetSender(TargetSender): def send_event(self, event): delivery_stream_name = firehose_name(self.target["Arn"]) - # Patch the boto3 client to automatically include X-Ray trace headers - patch(["botocore"]) + self.client.put_record( DeliveryStreamName=delivery_stream_name, Record={"Data": to_bytes(to_json_str(event))}, @@ -566,8 +561,7 @@ def send_event(self, event): ) stream_name = self.target["Arn"].split("/")[-1] partition_key = collections.get_safe(event, partition_key_path, event["id"]) - # Patch the boto3 client to automatically include X-Ray trace headers - patch(["botocore"]) + self.client.put_record( StreamName=stream_name, Data=to_bytes(to_json_str(event)), @@ -585,8 +579,6 @@ def _validate_input(self, target: Target): class LambdaTargetSender(TargetSender): def send_event(self, event): - # Patch the boto3 client to automatically include X-Ray trace headers - patch(["botocore"]) self.client.invoke( FunctionName=self.target["Arn"], Payload=to_bytes(to_json_str(event)), @@ -598,8 +590,7 @@ class LogsTargetSender(TargetSender): def send_event(self, event): log_group_name = self.target["Arn"].split(":")[6] log_stream_name = str(uuid.uuid4()) # Unique log stream name - # Patch the boto3 client to automatically include X-Ray trace headers - patch(["botocore"]) + self.client.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name) self.client.put_log_events( logGroupName=log_group_name, @@ -631,8 +622,6 @@ def send_event(self, event): class SnsTargetSender(TargetSender): def send_event(self, event): - # Patch the boto3 client to automatically include X-Ray trace headers - patch(["botocore"]) self.client.publish(TopicArn=self.target["Arn"], Message=to_json_str(event)) @@ -641,8 +630,7 @@ def send_event(self, event): queue_url = sqs_queue_url_for_arn(self.target["Arn"]) msg_group_id = self.target.get("SqsParameters", {}).get("MessageGroupId", None) kwargs = {"MessageGroupId": msg_group_id} if msg_group_id else {} - # Patch the boto3 client to automatically include X-Ray trace headers - patch(["botocore"]) + self.client.send_message( QueueUrl=queue_url, MessageBody=to_json_str(event), @@ -655,8 +643,7 @@ class StatesTargetSender(TargetSender): def send_event(self, event): self.service = "stepfunctions" - # Patch the boto3 client to automatically include X-Ray trace headers - patch(["botocore"]) + self.client.start_execution( stateMachineArn=self.target["Arn"], name=event["id"], input=to_json_str(event) ) From fdbbeabc804ee2e75687d73c2731f00e0b1d3685 Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Mon, 7 Apr 2025 17:28:57 +0200 Subject: [PATCH 09/24] fix: skip for v1 --- tests/aws/services/events/test_x_ray_trace_propagation.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.py b/tests/aws/services/events/test_x_ray_trace_propagation.py index 5cb440b5eef49..fa56fe9bbf7f8 100644 --- a/tests/aws/services/events/test_x_ray_trace_propagation.py +++ b/tests/aws/services/events/test_x_ray_trace_propagation.py @@ -237,6 +237,10 @@ def test_xray_trace_propagation_events_api_gateway( @markers.aws.unknown +@pytest.mark.skipif( + condition=is_old_provider() and not is_aws_cloud(), + reason="not supported by the old provider", +) def test_xray_trace_propagation_events_lambda( create_lambda_function, events_create_event_bus, From 4e5a7aa4afdde46b133f5a5d9a4c642c088585db Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Mon, 7 Apr 2025 18:28:39 +0200 Subject: [PATCH 10/24] feat: template replace parent id --- tests/aws/services/events/test_x_ray_trace_propagation.py | 8 ++++++-- .../events/test_x_ray_trace_propagation.snapshot.json | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.py b/tests/aws/services/events/test_x_ray_trace_propagation.py index fa56fe9bbf7f8..05c849e010613 100644 --- a/tests/aws/services/events/test_x_ray_trace_propagation.py +++ b/tests/aws/services/events/test_x_ray_trace_propagation.py @@ -313,10 +313,14 @@ def test_xray_trace_propagation_events_lambda( lambda_trace_header = events[0]["trace_id_inside_handler"] assert lambda_trace_header is not None lambda_trace_id = re.search(r"Root=([^;]+)", lambda_trace_header).group(1) + lambda_trace_parent = re.search(r"Parent=([^;]+)", lambda_trace_header).group(1) assert lambda_trace_id == trace_id - snapshot.add_transformer( - snapshot.transform.regex(lambda_trace_id, "trace_id_root"), + snapshot.add_transformers_list( + [ + snapshot.transform.regex(lambda_trace_id, "trace_id_root"), + snapshot.transform.regex(lambda_trace_parent, "trace_id_parent"), + ] ) snapshot.match("lambda_logs", events) diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json b/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json index 1079fa83a1343..2d4394c912136 100644 --- a/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json +++ b/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json @@ -105,12 +105,12 @@ } }, "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_lambda": { - "recorded-date": "07-04-2025, 15:13:15", + "recorded-date": "07-04-2025, 16:27:59", "recorded-content": { "lambda_logs": [ { "trace_id_outside_handler": "None", - "trace_id_inside_handler": "Root=trace_id_root;Parent=1111111111110000;Sampled=1" + "trace_id_inside_handler": "Root=trace_id_root;Parent=trace_id_parent;Sampled=1" } ] } From bc1abe9e34676e322c368109aa135a113d4a92e6 Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Mon, 7 Apr 2025 20:31:55 +0200 Subject: [PATCH 11/24] fix: register boto hook instead of patching all clients --- .../events/test_x_ray_trace_propagation.py | 43 +++++++++++++------ ...test_x_ray_trace_propagation.snapshot.json | 30 ++++++------- 2 files changed, 45 insertions(+), 28 deletions(-) diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.py b/tests/aws/services/events/test_x_ray_trace_propagation.py index 05c849e010613..76c3e88fa3760 100644 --- a/tests/aws/services/events/test_x_ray_trace_propagation.py +++ b/tests/aws/services/events/test_x_ray_trace_propagation.py @@ -5,6 +5,7 @@ from localstack.utils.strings import short_uid from localstack.utils.sync import retry from localstack.utils.testutil import check_expected_lambda_log_events_length +from localstack.utils.xray.trace_header import TraceHeader APIGATEWAY_ASSUME_ROLE_POLICY = { "Statement": { @@ -17,7 +18,6 @@ import re import pytest -from aws_xray_sdk.core import patch, xray_recorder from localstack.testing.aws.util import is_aws_cloud from tests.aws.services.events.helper_functions import is_old_provider @@ -90,6 +90,7 @@ def test_xray_trace_propagation_events_api_gateway( events_create_event_bus, events_put_rule, region_name, + cleanups, account_id, snapshot, ): @@ -197,10 +198,18 @@ def test_xray_trace_propagation_events_api_gateway( events_client = aws_client.events # Enable X-Ray tracing for the aws_client - segment = xray_recorder.begin_segment(name="put_events") - trace_id = segment.trace_id - libraries = ["botocore"] - patch(libraries) + trace_id = "1-67f4141f-e1cd7672871da115129f8b19" + parent_id = "d0ee9531727135a0" + xray_trace_header = TraceHeader(root=trace_id, parent=parent_id, sampled=1) + + def add_xray_header(request, **kwargs): + request.headers["X-Amzn-Trace-Id"] = xray_trace_header.to_header_str() + + event_name = "before-send.events.*" + aws_client.events.meta.events.register(event_name, add_xray_header) + + # make sure the hook gets cleaned up after the test + cleanups.append(lambda: aws_client.events.meta.events.unregister(event_name, add_xray_header)) event_entry = { "EventBusName": event_bus_name, @@ -222,15 +231,16 @@ def test_xray_trace_propagation_events_api_gateway( logs_client=aws_client.logs, ) - # TODO how to assert X-Ray trace ID correct propagation from eventbridge to api gateway + # TODO how to assert X-Ray trace ID correct propagation from eventbridge to api gateway if no X-Ray trace id is present in the event lambda_trace_header = events[0]["headers"].get("X-Amzn-Trace-Id") assert lambda_trace_header is not None lambda_trace_id = re.search(r"Root=([^;]+)", lambda_trace_header).group(1) assert lambda_trace_id == trace_id + lambda_trace_parent = re.search(r"Parent=([^;]+)", lambda_trace_header).group(1) snapshot.add_transformer( - snapshot.transform.regex(lambda_trace_id, "trace_id_root"), + snapshot.transform.regex(lambda_trace_parent, "trace_id_parent"), ) snapshot.match("lambda_logs", events) @@ -245,6 +255,7 @@ def test_xray_trace_propagation_events_lambda( create_lambda_function, events_create_event_bus, events_put_rule, + cleanups, aws_client, snapshot, ): @@ -282,10 +293,17 @@ def test_xray_trace_propagation_events_lambda( ) # Enable X-Ray tracing for the aws_client - segment = xray_recorder.begin_segment(name="put_events") - trace_id = segment.trace_id - libraries = ["botocore"] - patch(libraries) + trace_id = "1-67f4141f-e1cd7672871da115129f8b19" + parent_id = "d0ee9531727135a0" + xray_trace_header = TraceHeader(root=trace_id, parent=parent_id, sampled=1) + + def add_xray_header(request, **kwargs): + request.headers["X-Amzn-Trace-Id"] = xray_trace_header.to_header_str() + + event_name = "before-send.events.*" + aws_client.events.meta.events.register(event_name, add_xray_header) + # make sure the hook gets cleaned up after the test + cleanups.append(lambda: aws_client.events.meta.events.unregister(event_name, add_xray_header)) aws_client.events.put_events( Entries=[ @@ -308,7 +326,7 @@ def test_xray_trace_propagation_events_lambda( logs_client=aws_client.logs, ) - # TODO how to assert X-Ray trace ID correct propagation from eventbridge to api gateway + # TODO how to assert X-Ray trace ID correct propagation from eventbridge to api gateway if no X-Ray trace id is present in the event lambda_trace_header = events[0]["trace_id_inside_handler"] assert lambda_trace_header is not None @@ -318,7 +336,6 @@ def test_xray_trace_propagation_events_lambda( snapshot.add_transformers_list( [ - snapshot.transform.regex(lambda_trace_id, "trace_id_root"), snapshot.transform.regex(lambda_trace_parent, "trace_id_parent"), ] ) diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json b/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json index 2d4394c912136..ea250742452c7 100644 --- a/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json +++ b/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json @@ -1,6 +1,6 @@ { "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_api_gateway": { - "recorded-date": "07-04-2025, 14:08:19", + "recorded-date": "07-04-2025, 18:30:52", "recorded-content": { "put_events_response": { "Entries": [ @@ -20,9 +20,9 @@ "User-Agent": "python-requests/2.32.3", "Accept-Encoding": "gzip, deflate", "Accept": "*/*", - "Host": "t72nlybios.execute-api.localhost.localstack.cloud", + "Host": "7mjwvnuh5e.execute-api.localhost.localstack.cloud", "Content-Type": "application/json", - "X-Amzn-Trace-Id": "Root=trace_id_root;Parent=1111111111110000;Sampled=0", + "X-Amzn-Trace-Id": "Root=1-67f4141f-e1cd7672871da115129f8b19;Parent=trace_id_parent;Sampled=1", "X-Forwarded-For": "127.0.0.1", "X-Forwarded-Port": "4566", "X-Forwarded-Proto": "HTTP" @@ -38,13 +38,13 @@ "*/*" ], "Host": [ - "t72nlybios.execute-api.localhost.localstack.cloud" + "7mjwvnuh5e.execute-api.localhost.localstack.cloud" ], "Content-Type": [ "application/json" ], "X-Amzn-Trace-Id": [ - "Root=trace_id_root;Parent=1111111111110000;Sampled=0" + "Root=1-67f4141f-e1cd7672871da115129f8b19;Parent=trace_id_parent;Sampled=1" ], "X-Forwarded-For": [ "127.0.0.1" @@ -62,11 +62,11 @@ "isBase64Encoded": false, "requestContext": { "accountId": "111111111111", - "apiId": "t72nlybios", - "deploymentId": "wpv6qsijdu", - "domainName": "t72nlybios.execute-api.localhost.localstack.cloud", - "domainPrefix": "t72nlybios", - "extendedRequestId": "897696d2", + "apiId": "7mjwvnuh5e", + "deploymentId": "s3rzz4h271", + "domainName": "7mjwvnuh5e.execute-api.localhost.localstack.cloud", + "domainPrefix": "7mjwvnuh5e", + "extendedRequestId": "5c344ece", "httpMethod": "POST", "identity": { "accountId": null, @@ -85,11 +85,11 @@ "path": "/test-api-stage-name/test/", "protocol": "HTTP/1.1", "requestId": "", - "requestTime": "07/Apr/2025:16:08:13 ", - "requestTimeEpoch": 1744034893964, + "requestTime": "07/Apr/2025:20:30:48 ", + "requestTimeEpoch": 1744050648151, "stage": "test-api-stage-name", "resourcePath": "/{proxy+}", - "resourceId": "2oz6s7aidy" + "resourceId": "ovsfuu76bu" }, "stageVariables": null, "queryStringParameters": null, @@ -105,12 +105,12 @@ } }, "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_lambda": { - "recorded-date": "07-04-2025, 16:27:59", + "recorded-date": "07-04-2025, 18:22:40", "recorded-content": { "lambda_logs": [ { "trace_id_outside_handler": "None", - "trace_id_inside_handler": "Root=trace_id_root;Parent=trace_id_parent;Sampled=1" + "trace_id_inside_handler": "Root=1-67f4141f-e1cd7672871da115129f8b19;Parent=trace_id_parent;Sampled=1" } ] } From 786f0249d1b5d196265c27e933f54ea1bad578a0 Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Mon, 7 Apr 2025 20:32:22 +0200 Subject: [PATCH 12/24] fix: instrument lambda boto client call with trace header --- localstack-core/localstack/services/events/target.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/localstack-core/localstack/services/events/target.py b/localstack-core/localstack/services/events/target.py index e8a3b2d015b6a..03590241f76e1 100644 --- a/localstack-core/localstack/services/events/target.py +++ b/localstack-core/localstack/services/events/target.py @@ -579,12 +579,23 @@ def _validate_input(self, target: Target): class LambdaTargetSender(TargetSender): def send_event(self, event): + # instrument boto client to add x-ray trace header + trace_header_str = get_trace_header_str_from_segment() + + def add_xray_header(request, **kwargs): + request.headers["X-Amzn-Trace-Id"] = trace_header_str + + event_name = "before-send.lambda.*" + self.client.meta.events.register(event_name, add_xray_header) + self.client.invoke( FunctionName=self.target["Arn"], Payload=to_bytes(to_json_str(event)), InvocationType="Event", ) + self.client.meta.events.unregister(event_name, add_xray_header) + class LogsTargetSender(TargetSender): def send_event(self, event): From 441408aacd87856c8e103afd489fad58b3f81659 Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Mon, 7 Apr 2025 20:32:35 +0200 Subject: [PATCH 13/24] feat: use to string from TraceHeader --- localstack-core/localstack/services/events/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/localstack-core/localstack/services/events/utils.py b/localstack-core/localstack/services/events/utils.py index 5be463cd457a5..5d6b2614512ab 100644 --- a/localstack-core/localstack/services/events/utils.py +++ b/localstack-core/localstack/services/events/utils.py @@ -318,6 +318,6 @@ def get_trace_header_str_from_segment(): sampled = "1" if segment.sampled else "0" # Format according to X-Ray trace header specification - trace_header_str = f"Root={trace_id};Parent={parent_id};Sampled={sampled}" + xray_trace_header = TraceHeader(root=trace_id, parent=parent_id, sampled=sampled) - return trace_header_str + return xray_trace_header.to_header_str() From 921b10df5fd5f28bb2ee7b30d8f05ee357d9fb23 Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Tue, 8 Apr 2025 12:32:58 +0200 Subject: [PATCH 14/24] feat: validate events lambda xray test --- .../events/test_x_ray_trace_propagation.py | 34 +++++++++++++------ ...test_x_ray_trace_propagation.snapshot.json | 4 +-- ...st_x_ray_trace_propagation.validation.json | 5 +++ 3 files changed, 31 insertions(+), 12 deletions(-) create mode 100644 tests/aws/services/events/test_x_ray_trace_propagation.validation.json diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.py b/tests/aws/services/events/test_x_ray_trace_propagation.py index 76c3e88fa3760..4cbb180d74942 100644 --- a/tests/aws/services/events/test_x_ray_trace_propagation.py +++ b/tests/aws/services/events/test_x_ray_trace_propagation.py @@ -1,4 +1,5 @@ import json +import time from localstack.aws.api.lambda_ import Runtime from localstack.testing.pytest import markers @@ -110,13 +111,13 @@ def test_xray_trace_propagation_events_api_gateway( ) resource_id = aws_client.apigateway.create_resource( - restApiId=api_id, parentId=root, pathPart="{proxy+}" + restApiId=api_id, parentId=root, pathPart="test" )["id"] aws_client.apigateway.put_method( restApiId=api_id, resourceId=resource_id, - httpMethod="ANY", + httpMethod="POST", authorizationType="NONE", ) @@ -129,13 +130,23 @@ def test_xray_trace_propagation_events_api_gateway( aws_client.apigateway.put_integration( restApiId=api_id, resourceId=resource_id, - httpMethod="ANY", + httpMethod="POST", type="AWS_PROXY", integrationHttpMethod="POST", - uri=f"arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/{function_arn}/invocations", + uri=f"arn:aws:apigateway:{region_name}:lambda:path/2015-03-31/functions/{function_arn}/invocations", credentials=role_arn, ) + # Give permission to API Gateway to invoke Lambda + source_arn = f"arn:aws:execute-api:{region_name}:{account_id}:{api_id}/*/POST/test" + aws_client.lambda_.add_permission( + FunctionName=function_name, + StatementId=f"sid-{short_uid()}", + Action="lambda:InvokeFunction", + Principal="apigateway.amazonaws.com", + SourceArn=source_arn, + ) + stage_name = "test-api-stage-name" aws_client.apigateway.create_deployment(restApiId=api_id, stageName=stage_name) @@ -163,7 +174,7 @@ def test_xray_trace_propagation_events_api_gateway( } ], } - source_arn = f"arn:aws:execute-api:{region_name}:{account_id}:{api_id}/*/POST/test" + role_name, role_arn = create_role_with_policy( effect="Allow", actions="execute-api:Invoke", @@ -172,6 +183,10 @@ def test_xray_trace_propagation_events_api_gateway( attach=False, # Since we're using put_role_policy, not attach_role_policy ) + # Allow some time for IAM role propagation (only needed in AWS) + if is_aws_cloud(): + time.sleep(10) + # Add the API Gateway as a target with the RoleArn target_id = f"target-{short_uid()}" api_target_arn = ( @@ -195,8 +210,6 @@ def test_xray_trace_propagation_events_api_gateway( ###### # Test ###### - events_client = aws_client.events - # Enable X-Ray tracing for the aws_client trace_id = "1-67f4141f-e1cd7672871da115129f8b19" parent_id = "d0ee9531727135a0" @@ -217,7 +230,7 @@ def add_xray_header(request, **kwargs): "DetailType": "test.detail.type", "Detail": json.dumps({"message": "Hello from EventBridge"}), } - put_events_response = events_client.put_events(Entries=[event_entry]) + put_events_response = aws_client.events.put_events(Entries=[event_entry]) snapshot.match("put_events_response", put_events_response) assert put_events_response["FailedEntryCount"] == 0 @@ -226,6 +239,7 @@ def add_xray_header(request, **kwargs): check_expected_lambda_log_events_length, retries=10, sleep=10, + sleep_before=10 if is_aws_cloud() else 1, function_name=function_name, expected_length=1, logs_client=aws_client.logs, @@ -246,9 +260,9 @@ def add_xray_header(request, **kwargs): snapshot.match("lambda_logs", events) -@markers.aws.unknown +@markers.aws.validated @pytest.mark.skipif( - condition=is_old_provider() and not is_aws_cloud(), + condition=is_old_provider(), reason="not supported by the old provider", ) def test_xray_trace_propagation_events_lambda( diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json b/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json index ea250742452c7..e721a918afe9a 100644 --- a/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json +++ b/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json @@ -105,12 +105,12 @@ } }, "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_lambda": { - "recorded-date": "07-04-2025, 18:22:40", + "recorded-date": "08-04-2025, 10:31:19", "recorded-content": { "lambda_logs": [ { "trace_id_outside_handler": "None", - "trace_id_inside_handler": "Root=1-67f4141f-e1cd7672871da115129f8b19;Parent=trace_id_parent;Sampled=1" + "trace_id_inside_handler": "Root=1-67f4141f-e1cd7672871da115129f8b19;Parent=trace_id_parent;Sampled=1;Lineage=1:c88b76bb:0" } ] } diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.validation.json b/tests/aws/services/events/test_x_ray_trace_propagation.validation.json new file mode 100644 index 0000000000000..5bab52e5db048 --- /dev/null +++ b/tests/aws/services/events/test_x_ray_trace_propagation.validation.json @@ -0,0 +1,5 @@ +{ + "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_lambda": { + "last_validated_date": "2025-04-08T10:31:19+00:00" + } +} From 4a83ca86b5bbc7c3320fdf3e964870dcba33bcc1 Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Tue, 8 Apr 2025 12:59:35 +0200 Subject: [PATCH 15/24] feat: validate events api gateway snapshot --- .../events/test_x_ray_trace_propagation.py | 91 ++------------ ...test_x_ray_trace_propagation.snapshot.json | 118 ------------------ ...st_x_ray_trace_propagation.validation.json | 5 +- 3 files changed, 13 insertions(+), 201 deletions(-) delete mode 100644 tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.py b/tests/aws/services/events/test_x_ray_trace_propagation.py index 4cbb180d74942..af098d3129cbf 100644 --- a/tests/aws/services/events/test_x_ray_trace_propagation.py +++ b/tests/aws/services/events/test_x_ray_trace_propagation.py @@ -7,6 +7,7 @@ from localstack.utils.sync import retry from localstack.utils.testutil import check_expected_lambda_log_events_length from localstack.utils.xray.trace_header import TraceHeader +from tests.aws.services.lambda_.test_lambda import TEST_LAMBDA_AWS_PROXY_FORMAT APIGATEWAY_ASSUME_ROLE_POLICY = { "Statement": { @@ -23,66 +24,16 @@ from localstack.testing.aws.util import is_aws_cloud from tests.aws.services.events.helper_functions import is_old_provider from tests.aws.services.events.test_events import TEST_EVENT_DETAIL, TEST_EVENT_PATTERN -from tests.aws.services.lambda_.test_lambda import TEST_LAMBDA_PYTHON_ECHO, TEST_LAMBDA_XRAY_TRACEID +from tests.aws.services.lambda_.test_lambda import TEST_LAMBDA_XRAY_TRACEID # currently only API Gateway v2 and Lambda support X-Ray tracing @markers.aws.unknown @pytest.mark.skipif( - condition=is_old_provider() and not is_aws_cloud(), + condition=is_old_provider(), reason="not supported by the old provider", ) -@markers.snapshot.skip_snapshot_verify( - paths=[ - # TODO: those headers are sent by Events via the SDK, we should at least populate X-Amz-Source-Account - # and X-Amz-Source-Arn - "$..headers.amz-sdk-invocation-id", - "$..headers.amz-sdk-request", - "$..headers.amz-sdk-retry", - "$..headers.X-Amz-Security-Token", - "$..headers.X-Amz-Source-Account", - "$..headers.X-Amz-Source-Arn", - # seems like this one can vary in casing between runs? - "$..headers.x-amz-date", - "$..headers.X-Amz-Date", - # those headers are missing in API Gateway - "$..headers.CloudFront-Forwarded-Proto", - "$..headers.CloudFront-Is-Desktop-Viewer", - "$..headers.CloudFront-Is-Mobile-Viewer", - "$..headers.CloudFront-Is-SmartTV-Viewer", - "$..headers.CloudFront-Is-Tablet-Viewer", - "$..headers.CloudFront-Viewer-ASN", - "$..headers.CloudFront-Viewer-Country", - "$..headers.X-Amz-Cf-Id", - "$..headers.Via", - # sent by `requests` library by default - "$..headers.Accept-Encoding", - "$..headers.Accept", - "$..headers.Host", - "$..multiValueHeaders.Host", - "$..requestContext.apiId", - "$..requestContext.domainName", - "$..requestContext.domainPrefix", - "$..requestContext.requestTime", - "$..requestContext.requestTimeEpoch", - "$..requestContext.resourceId", - "$..headers.x-localstack-edge", - "$..headers.Connection", - "$..headers.Content-Length", - "$..headers.accept-encoding", - "$..headers.accept", - "$..headers.X-Forwarded-Port", - "$..headers.X-Forwarded-Proto", - "$..pathParameters", - "$..requestContext.authorizer", - "$..requestContext.deploymentId", - "$..requestContext.extendedRequestId", - "$..requestContext.identity", - "$..requestContext.requestId", - "$..stageVariables", - ], -) def test_xray_trace_propagation_events_api_gateway( aws_client, create_role_with_policy, @@ -93,25 +44,25 @@ def test_xray_trace_propagation_events_api_gateway( region_name, cleanups, account_id, - snapshot, ): # create lambda function_name = f"test-function-{short_uid()}" function_arn = create_lambda_function( - handler_file=TEST_LAMBDA_PYTHON_ECHO, func_name=function_name, + handler_file=TEST_LAMBDA_AWS_PROXY_FORMAT, + handler="lambda_aws_proxy_format.handler", runtime=Runtime.python3_12, )["CreateFunctionResponse"]["FunctionArn"] # create api gateway with lambda integration # create rest api - api_id, api_name, root = create_rest_apigw( + api_id, api_name, root_id = create_rest_apigw( name=f"test-api-{short_uid()}", - description="Integration test API", + description="Test Integration with EventBridge X-Ray", ) resource_id = aws_client.apigateway.create_resource( - restApiId=api_id, parentId=root, pathPart="test" + restApiId=api_id, parentId=root_id, pathPart="test" )["id"] aws_client.apigateway.put_method( @@ -121,11 +72,6 @@ def test_xray_trace_propagation_events_api_gateway( authorizationType="NONE", ) - # create role with policy - _, role_arn = create_role_with_policy( - "Allow", "lambda:InvokeFunction", json.dumps(APIGATEWAY_ASSUME_ROLE_POLICY), "*" - ) - # Lambda AWS_PROXY integration aws_client.apigateway.put_integration( restApiId=api_id, @@ -134,7 +80,6 @@ def test_xray_trace_propagation_events_api_gateway( type="AWS_PROXY", integrationHttpMethod="POST", uri=f"arn:aws:apigateway:{region_name}:lambda:path/2015-03-31/functions/{function_arn}/invocations", - credentials=role_arn, ) # Give permission to API Gateway to invoke Lambda @@ -147,7 +92,7 @@ def test_xray_trace_propagation_events_api_gateway( SourceArn=source_arn, ) - stage_name = "test-api-stage-name" + stage_name = "test" aws_client.apigateway.create_deployment(restApiId=api_id, stageName=stage_name) # Create event bus @@ -231,7 +176,6 @@ def add_xray_header(request, **kwargs): "Detail": json.dumps({"message": "Hello from EventBridge"}), } put_events_response = aws_client.events.put_events(Entries=[event_entry]) - snapshot.match("put_events_response", put_events_response) assert put_events_response["FailedEntryCount"] == 0 # Verify the Lambda invocation @@ -251,13 +195,6 @@ def add_xray_header(request, **kwargs): assert lambda_trace_header is not None lambda_trace_id = re.search(r"Root=([^;]+)", lambda_trace_header).group(1) assert lambda_trace_id == trace_id - lambda_trace_parent = re.search(r"Parent=([^;]+)", lambda_trace_header).group(1) - - snapshot.add_transformer( - snapshot.transform.regex(lambda_trace_parent, "trace_id_parent"), - ) - - snapshot.match("lambda_logs", events) @markers.aws.validated @@ -271,7 +208,6 @@ def test_xray_trace_propagation_events_lambda( events_put_rule, cleanups, aws_client, - snapshot, ): function_name = f"lambda-func-{short_uid()}" create_lambda_response = create_lambda_function( @@ -345,13 +281,4 @@ def add_xray_header(request, **kwargs): lambda_trace_header = events[0]["trace_id_inside_handler"] assert lambda_trace_header is not None lambda_trace_id = re.search(r"Root=([^;]+)", lambda_trace_header).group(1) - lambda_trace_parent = re.search(r"Parent=([^;]+)", lambda_trace_header).group(1) assert lambda_trace_id == trace_id - - snapshot.add_transformers_list( - [ - snapshot.transform.regex(lambda_trace_parent, "trace_id_parent"), - ] - ) - - snapshot.match("lambda_logs", events) diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json b/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json deleted file mode 100644 index e721a918afe9a..0000000000000 --- a/tests/aws/services/events/test_x_ray_trace_propagation.snapshot.json +++ /dev/null @@ -1,118 +0,0 @@ -{ - "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_api_gateway": { - "recorded-date": "07-04-2025, 18:30:52", - "recorded-content": { - "put_events_response": { - "Entries": [ - { - "EventId": "" - } - ], - "FailedEntryCount": 0, - "ResponseMetadata": { - "HTTPHeaders": {}, - "HTTPStatusCode": 200 - } - }, - "lambda_logs": [ - { - "headers": { - "User-Agent": "python-requests/2.32.3", - "Accept-Encoding": "gzip, deflate", - "Accept": "*/*", - "Host": "7mjwvnuh5e.execute-api.localhost.localstack.cloud", - "Content-Type": "application/json", - "X-Amzn-Trace-Id": "Root=1-67f4141f-e1cd7672871da115129f8b19;Parent=trace_id_parent;Sampled=1", - "X-Forwarded-For": "127.0.0.1", - "X-Forwarded-Port": "4566", - "X-Forwarded-Proto": "HTTP" - }, - "multiValueHeaders": { - "User-Agent": [ - "python-requests/2.32.3" - ], - "Accept-Encoding": [ - "gzip, deflate" - ], - "Accept": [ - "*/*" - ], - "Host": [ - "7mjwvnuh5e.execute-api.localhost.localstack.cloud" - ], - "Content-Type": [ - "application/json" - ], - "X-Amzn-Trace-Id": [ - "Root=1-67f4141f-e1cd7672871da115129f8b19;Parent=trace_id_parent;Sampled=1" - ], - "X-Forwarded-For": [ - "127.0.0.1" - ], - "X-Forwarded-Port": [ - "4566" - ], - "X-Forwarded-Proto": [ - "HTTP" - ] - }, - "body": { - "message": "Hello from EventBridge" - }, - "isBase64Encoded": false, - "requestContext": { - "accountId": "111111111111", - "apiId": "7mjwvnuh5e", - "deploymentId": "s3rzz4h271", - "domainName": "7mjwvnuh5e.execute-api.localhost.localstack.cloud", - "domainPrefix": "7mjwvnuh5e", - "extendedRequestId": "5c344ece", - "httpMethod": "POST", - "identity": { - "accountId": null, - "accessKey": null, - "caller": null, - "cognitoAuthenticationProvider": null, - "cognitoAuthenticationType": null, - "cognitoIdentityId": null, - "cognitoIdentityPoolId": null, - "principalOrgId": null, - "sourceIp": "127.0.0.1", - "user": null, - "userAgent": "python-requests/2.32.3", - "userArn": null - }, - "path": "/test-api-stage-name/test/", - "protocol": "HTTP/1.1", - "requestId": "", - "requestTime": "07/Apr/2025:20:30:48 ", - "requestTimeEpoch": 1744050648151, - "stage": "test-api-stage-name", - "resourcePath": "/{proxy+}", - "resourceId": "ovsfuu76bu" - }, - "stageVariables": null, - "queryStringParameters": null, - "multiValueQueryStringParameters": null, - "pathParameters": { - "proxy": "test" - }, - "httpMethod": "POST", - "path": "/test/", - "resource": "/{proxy+}" - } - ] - } - }, - "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_lambda": { - "recorded-date": "08-04-2025, 10:31:19", - "recorded-content": { - "lambda_logs": [ - { - "trace_id_outside_handler": "None", - "trace_id_inside_handler": "Root=1-67f4141f-e1cd7672871da115129f8b19;Parent=trace_id_parent;Sampled=1;Lineage=1:c88b76bb:0" - } - ] - } - } -} diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.validation.json b/tests/aws/services/events/test_x_ray_trace_propagation.validation.json index 5bab52e5db048..85732eaca9eff 100644 --- a/tests/aws/services/events/test_x_ray_trace_propagation.validation.json +++ b/tests/aws/services/events/test_x_ray_trace_propagation.validation.json @@ -1,5 +1,8 @@ { + "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_api_gateway": { + "last_validated_date": "2025-04-08T10:51:26+00:00" + }, "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_lambda": { - "last_validated_date": "2025-04-08T10:31:19+00:00" + "last_validated_date": "2025-04-08T10:46:50+00:00" } } From cfa03365ab552ce0ffd2781422d4393ef03e95c9 Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Wed, 9 Apr 2025 18:33:05 +0200 Subject: [PATCH 16/24] feat: switch to using x-ray trace header variable --- .../localstack/services/events/provider.py | 14 +++--- .../localstack/services/events/target.py | 44 ++++++++++--------- .../localstack/services/events/utils.py | 28 ------------ 3 files changed, 30 insertions(+), 56 deletions(-) diff --git a/localstack-core/localstack/services/events/provider.py b/localstack-core/localstack/services/events/provider.py index d618bc69020c2..bfa52f74a8284 100644 --- a/localstack-core/localstack/services/events/provider.py +++ b/localstack-core/localstack/services/events/provider.py @@ -157,7 +157,6 @@ ) from localstack.services.events.utils import ( TARGET_ID_PATTERN, - create_segment_from_trace_header, extract_connection_name, extract_event_bus_name, extract_region_and_account_id, @@ -172,6 +171,7 @@ from localstack.utils.event_matcher import matches_event from localstack.utils.strings import long_uid from localstack.utils.time import TIMESTAMP_FORMAT_TZ, timestamp +from localstack.utils.xray.trace_header import TraceHeader from .analytics import InvocationStatus, rule_invocation @@ -1816,9 +1816,6 @@ def _process_entry( region, account_id = extract_region_and_account_id(event_bus_name_or_arn, context) - # Set x-ray segment from trace header - create_segment_from_trace_header(context.trace_context["aws_trace_header"]) - # TODO check interference with x-ray trace header if encoded_trace_header := get_trace_header_encoded_region_account( entry, context.region, context.account_id, region, account_id @@ -1843,14 +1840,16 @@ def _process_entry( ) return - self._proxy_capture_input_event(event_formatted) + trace_header = context.trace_context["aws_trace_header"] + + self._proxy_capture_input_event(event_formatted, trace_header) # Always add the successful EventId entry, even if target processing might fail processed_entries.append({"EventId": event_formatted["id"]}) if configured_rules := list(event_bus.rules.values()): for rule in configured_rules: - self._process_rules(rule, region, account_id, event_formatted) + self._process_rules(rule, region, account_id, event_formatted, trace_header) else: LOG.info( json.dumps( @@ -1871,6 +1870,7 @@ def _process_rules( region: str, account_id: str, event_formatted: FormattedEvent, + trace_header: TraceHeader, ) -> None: """Process rules for an event. Note that we no longer handle entries here as AWS returns success regardless of target failures.""" event_pattern = rule.event_pattern @@ -1900,7 +1900,7 @@ def _process_rules( target_unique_id = f"{rule.arn}-{target_id}" target_sender = self._target_sender_store[target_unique_id] try: - target_sender.process_event(event_formatted.copy()) + target_sender.process_event(event_formatted.copy(), trace_header) rule_invocation.labels( status=InvocationStatus.success, service=target_sender.service, diff --git a/localstack-core/localstack/services/events/target.py b/localstack-core/localstack/services/events/target.py index 03590241f76e1..d9a0fa744a9d7 100644 --- a/localstack-core/localstack/services/events/target.py +++ b/localstack-core/localstack/services/events/target.py @@ -48,6 +48,7 @@ from localstack.utils.json import extract_jsonpath from localstack.utils.strings import to_bytes from localstack.utils.time import now_utc +from localstack.utils.xray.trace_header import TraceHeader LOG = logging.getLogger(__name__) @@ -196,10 +197,10 @@ def client(self): return self._client @abstractmethod - def send_event(self, event: FormattedEvent | TransformedEvent): + def send_event(self, event: FormattedEvent | TransformedEvent, trace_header: TraceHeader): pass - def process_event(self, event: FormattedEvent): + def process_event(self, event: FormattedEvent, trace_header: TraceHeader): """Processes the event and send it to the target.""" if input_ := self.target.get("Input"): event = json.loads(input_) @@ -211,7 +212,7 @@ def process_event(self, event: FormattedEvent): if input_transformer := self.target.get("InputTransformer"): event = self.transform_event_with_target_input_transformer(input_transformer, event) if event: - self.send_event(event) + self.send_event(event, trace_header) else: LOG.info("No event to send to target %s", self.target.get("Id")) @@ -319,7 +320,7 @@ class ApiGatewayTargetSender(TargetSender): ALLOWED_HTTP_METHODS = {"GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"} - def send_event(self, event): + def send_event(self, event, trace_header): # Parse the ARN to extract api_id, stage_name, http_method, and resource path # Example ARN: arn:{partition}:execute-api:{region}:{account_id}:{api_id}/{stage_name}/{method}/{resource_path} arn_parts = parse_arn(self.target["Arn"]) @@ -386,8 +387,7 @@ def send_event(self, event): # Serialize the event, converting datetime objects to strings event_json = json.dumps(event, default=str) - trace_header_str = get_trace_header_str_from_segment() - headers["X-Amzn-Trace-Id"] = trace_header_str + headers["X-Amzn-Trace-Id"] = trace_header.to_header_str() # Send the HTTP request response = requests.request( @@ -421,12 +421,12 @@ def _get_predefined_template_replacements(self, event: Dict[str, Any]) -> Dict[s class AppSyncTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): raise NotImplementedError("AppSync target is not yet implemented") class BatchTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): raise NotImplementedError("Batch target is not yet implemented") def _validate_input(self, target: Target): @@ -439,7 +439,7 @@ def _validate_input(self, target: Target): class ECSTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): raise NotImplementedError("ECS target is a pro feature, please use LocalStack Pro") def _validate_input(self, target: Target): @@ -450,7 +450,7 @@ def _validate_input(self, target: Target): class EventsTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): # TODO add validation and tests for eventbridge to eventbridge requires Detail, DetailType, and Source # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/events/client/put_events.html source = self._get_source(event) @@ -471,6 +471,7 @@ def send_event(self, event): ): entries[0]["TraceHeader"] = encoded_original_id + # TODO instrument to send x-ray trace header self.client.put_events(Entries=entries) def _get_source(self, event: FormattedEvent | TransformedEvent) -> str: @@ -493,7 +494,7 @@ def _get_resources(self, event: FormattedEvent | TransformedEvent) -> list[str]: class EventsApiDestinationTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): """Send an event to an EventBridge API destination See https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-api-destinations.html""" target_arn = self.target["Arn"] @@ -543,7 +544,7 @@ def send_event(self, event): class FirehoseTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): delivery_stream_name = firehose_name(self.target["Arn"]) self.client.put_record( @@ -553,7 +554,7 @@ def send_event(self, event): class KinesisTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): partition_key_path = collections.get_safe( self.target, "$.KinesisParameters.PartitionKeyPath", @@ -578,7 +579,7 @@ def _validate_input(self, target: Target): class LambdaTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): # instrument boto client to add x-ray trace header trace_header_str = get_trace_header_str_from_segment() @@ -588,6 +589,7 @@ def add_xray_header(request, **kwargs): event_name = "before-send.lambda.*" self.client.meta.events.register(event_name, add_xray_header) + # TODO instrument to send x-ray trace header self.client.invoke( FunctionName=self.target["Arn"], Payload=to_bytes(to_json_str(event)), @@ -598,7 +600,7 @@ def add_xray_header(request, **kwargs): class LogsTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): log_group_name = self.target["Arn"].split(":")[6] log_stream_name = str(uuid.uuid4()) # Unique log stream name @@ -616,7 +618,7 @@ def send_event(self, event): class RedshiftTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): raise NotImplementedError("Redshift target is not yet implemented") def _validate_input(self, target: Target): @@ -627,17 +629,17 @@ def _validate_input(self, target: Target): class SagemakerTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): raise NotImplementedError("Sagemaker target is not yet implemented") class SnsTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): self.client.publish(TopicArn=self.target["Arn"], Message=to_json_str(event)) class SqsTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): queue_url = sqs_queue_url_for_arn(self.target["Arn"]) msg_group_id = self.target.get("SqsParameters", {}).get("MessageGroupId", None) kwargs = {"MessageGroupId": msg_group_id} if msg_group_id else {} @@ -652,7 +654,7 @@ def send_event(self, event): class StatesTargetSender(TargetSender): """Step Functions Target Sender""" - def send_event(self, event): + def send_event(self, event, trace_header): self.service = "stepfunctions" self.client.start_execution( @@ -669,7 +671,7 @@ def _validate_input(self, target: Target): class SystemsManagerSender(TargetSender): """EC2 Run Command Target Sender""" - def send_event(self, event): + def send_event(self, event, trace_header): raise NotImplementedError("Systems Manager target is not yet implemented") def _validate_input(self, target: Target): diff --git a/localstack-core/localstack/services/events/utils.py b/localstack-core/localstack/services/events/utils.py index 5d6b2614512ab..36258ac668acb 100644 --- a/localstack-core/localstack/services/events/utils.py +++ b/localstack-core/localstack/services/events/utils.py @@ -4,7 +4,6 @@ from datetime import datetime, timezone from typing import Any, Dict, Optional -from aws_xray_sdk.core import xray_recorder from botocore.utils import ArnParser from localstack.aws.api import RequestContext @@ -28,7 +27,6 @@ ) from localstack.utils.aws.arns import ARN_PARTITION_REGEX, parse_arn from localstack.utils.strings import long_uid -from localstack.utils.xray.trace_header import TraceHeader LOG = logging.getLogger(__name__) @@ -295,29 +293,3 @@ def is_nested_in_string(template: str, match: re.Match[str]) -> bool: return False return left_quote != -1 - - -def create_segment_from_trace_header(trace_header: TraceHeader) -> None: - segment = xray_recorder.begin_segment(name="events.put_events") - segment.trace_id = trace_header.root - segment.parent_id = trace_header.parent - if trace_header.sampled == "1": - segment.sampled = True - elif trace_header.sampled == "0": - segment.sampled = False - - -def get_trace_header_str_from_segment(): - # Get the current segment - segment = xray_recorder.current_segment() - - if segment: - # Construct the trace header manually - trace_id = segment.trace_id - parent_id = segment.id # Use the segment's own ID as the parent for downstream calls - sampled = "1" if segment.sampled else "0" - - # Format according to X-Ray trace header specification - xray_trace_header = TraceHeader(root=trace_id, parent=parent_id, sampled=sampled) - - return xray_trace_header.to_header_str() From b3d5224634fd6e1c366ac6aadfb4fdd5f98755c6 Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Wed, 9 Apr 2025 19:12:18 +0200 Subject: [PATCH 17/24] feat: use custom boto hook to inject trace header --- .../localstack/services/events/provider.py | 2 +- .../localstack/services/events/target.py | 26 +++++++------------ 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/localstack-core/localstack/services/events/provider.py b/localstack-core/localstack/services/events/provider.py index bfa52f74a8284..f6bb1be23d021 100644 --- a/localstack-core/localstack/services/events/provider.py +++ b/localstack-core/localstack/services/events/provider.py @@ -1860,7 +1860,7 @@ def _process_entry( ) ) - def _proxy_capture_input_event(self, event: FormattedEvent) -> None: + def _proxy_capture_input_event(self, event: FormattedEvent, trace_header: TraceHeader) -> None: # only required for eventstudio to capture input event if no rule is configured pass diff --git a/localstack-core/localstack/services/events/target.py b/localstack-core/localstack/services/events/target.py index d9a0fa744a9d7..cde26b6921ebb 100644 --- a/localstack-core/localstack/services/events/target.py +++ b/localstack-core/localstack/services/events/target.py @@ -28,7 +28,6 @@ from localstack.services.events.utils import ( event_time_to_time_string, get_trace_header_encoded_region_account, - get_trace_header_str_from_segment, is_nested_in_string, to_json_str, ) @@ -261,6 +260,7 @@ def _initialize_client(self) -> BaseClient: client = client.request_metadata( service_principal=service_principal, source_arn=self.rule_arn ) + self._register_client_hooks() return client def _validate_input_transformer(self, input_transformer: InputTransformer): @@ -291,6 +291,13 @@ def _get_predefined_template_replacements(self, event: FormattedEvent) -> dict[s return predefined_template_replacements + def _register_client_hooks(self): + def handle_inject_headers(params, context, **kwargs): + if trace_header := context.trace_context["aws_trace_header"]: + params["headers"]["X-Amzn-Trace-Id"] = trace_header.to_header_str() + + self._client.register(f"before-call.{self.service}.*", handle_inject_headers) + TargetSenderDict = dict[str, TargetSender] # rule_arn-target_id as global unique id @@ -387,6 +394,7 @@ def send_event(self, event, trace_header): # Serialize the event, converting datetime objects to strings event_json = json.dumps(event, default=str) + # Add trace header headers["X-Amzn-Trace-Id"] = trace_header.to_header_str() # Send the HTTP request @@ -471,7 +479,6 @@ def send_event(self, event, trace_header): ): entries[0]["TraceHeader"] = encoded_original_id - # TODO instrument to send x-ray trace header self.client.put_events(Entries=entries) def _get_source(self, event: FormattedEvent | TransformedEvent) -> str: @@ -529,8 +536,7 @@ def send_event(self, event, trace_header): endpoint = add_target_http_parameters(http_parameters, endpoint, headers, event) # add trace header - trace_header_str = get_trace_header_str_from_segment() - headers["X-Amzn-Trace-Id"] = trace_header_str + headers["X-Amzn-Trace-Id"] = trace_header.to_header_str() result = requests.request( method=method, url=endpoint, data=json.dumps(event or {}), headers=headers @@ -580,24 +586,12 @@ def _validate_input(self, target: Target): class LambdaTargetSender(TargetSender): def send_event(self, event, trace_header): - # instrument boto client to add x-ray trace header - trace_header_str = get_trace_header_str_from_segment() - - def add_xray_header(request, **kwargs): - request.headers["X-Amzn-Trace-Id"] = trace_header_str - - event_name = "before-send.lambda.*" - self.client.meta.events.register(event_name, add_xray_header) - - # TODO instrument to send x-ray trace header self.client.invoke( FunctionName=self.target["Arn"], Payload=to_bytes(to_json_str(event)), InvocationType="Event", ) - self.client.meta.events.unregister(event_name, add_xray_header) - class LogsTargetSender(TargetSender): def send_event(self, event, trace_header): From 2449ff501c329f28bf30a2a63f275b9908500d99 Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Thu, 10 Apr 2025 11:59:43 +0200 Subject: [PATCH 18/24] fix: use custom input parameter and transfer to header during call --- .../localstack/services/events/target.py | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/localstack-core/localstack/services/events/target.py b/localstack-core/localstack/services/events/target.py index cde26b6921ebb..9d9e651f3baa1 100644 --- a/localstack-core/localstack/services/events/target.py +++ b/localstack-core/localstack/services/events/target.py @@ -64,6 +64,7 @@ ) TRANSFORMER_PLACEHOLDER_PATTERN = re.compile(r"<(.*?)>") +TRACE_HEADER_KEY = "X-Amzn-Trace-Id" def transform_event_with_target_input_path( @@ -260,7 +261,7 @@ def _initialize_client(self) -> BaseClient: client = client.request_metadata( service_principal=service_principal, source_arn=self.rule_arn ) - self._register_client_hooks() + self._register_client_hooks(client) return client def _validate_input_transformer(self, input_transformer: InputTransformer): @@ -291,12 +292,23 @@ def _get_predefined_template_replacements(self, event: FormattedEvent) -> dict[s return predefined_template_replacements - def _register_client_hooks(self): + def _register_client_hooks(self, client: BaseClient): + """Register client hooks to inject trace header into requests.""" + + def handle_extract_params(params, context, **kwargs): + trace_header = params.pop("TraceHeader", None) + if trace_header is None: + return + context[TRACE_HEADER_KEY] = trace_header.to_header_str() + def handle_inject_headers(params, context, **kwargs): - if trace_header := context.trace_context["aws_trace_header"]: - params["headers"]["X-Amzn-Trace-Id"] = trace_header.to_header_str() + if trace_header_str := context.pop(TRACE_HEADER_KEY, None): + params["headers"][TRACE_HEADER_KEY] = trace_header_str - self._client.register(f"before-call.{self.service}.*", handle_inject_headers) + client.meta.events.register( + f"provide-client-params.{self.service}.*", handle_extract_params + ) + client.meta.events.register(f"before-call.{self.service}.*", handle_inject_headers) TargetSenderDict = dict[str, TargetSender] # rule_arn-target_id as global unique id @@ -395,7 +407,7 @@ def send_event(self, event, trace_header): event_json = json.dumps(event, default=str) # Add trace header - headers["X-Amzn-Trace-Id"] = trace_header.to_header_str() + headers[TRACE_HEADER_KEY] = trace_header.to_header_str() # Send the HTTP request response = requests.request( @@ -479,7 +491,7 @@ def send_event(self, event, trace_header): ): entries[0]["TraceHeader"] = encoded_original_id - self.client.put_events(Entries=entries) + self.client.put_events(Entries=entries, TraceHeader=trace_header) def _get_source(self, event: FormattedEvent | TransformedEvent) -> str: if isinstance(event, dict) and (source := event.get("source")): @@ -536,7 +548,7 @@ def send_event(self, event, trace_header): endpoint = add_target_http_parameters(http_parameters, endpoint, headers, event) # add trace header - headers["X-Amzn-Trace-Id"] = trace_header.to_header_str() + headers[TRACE_HEADER_KEY] = trace_header.to_header_str() result = requests.request( method=method, url=endpoint, data=json.dumps(event or {}), headers=headers @@ -590,6 +602,7 @@ def send_event(self, event, trace_header): FunctionName=self.target["Arn"], Payload=to_bytes(to_json_str(event)), InvocationType="Event", + TraceHeader=trace_header, ) From 7dfd36d1f182d93896a3144d91a879eb1bfe7d31 Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Thu, 10 Apr 2025 12:14:32 +0200 Subject: [PATCH 19/24] feat: add test xray propagation event bridge to event bridge --- .../events/test_x_ray_trace_propagation.py | 152 +++++++++++++++++- ...st_x_ray_trace_propagation.validation.json | 9 ++ 2 files changed, 160 insertions(+), 1 deletion(-) diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.py b/tests/aws/services/events/test_x_ray_trace_propagation.py index af098d3129cbf..aa8cfd4a58da3 100644 --- a/tests/aws/services/events/test_x_ray_trace_propagation.py +++ b/tests/aws/services/events/test_x_ray_trace_propagation.py @@ -276,7 +276,157 @@ def add_xray_header(request, **kwargs): logs_client=aws_client.logs, ) - # TODO how to assert X-Ray trace ID correct propagation from eventbridge to api gateway if no X-Ray trace id is present in the event + # TODO how to assert X-Ray trace ID correct propagation from eventbridge to api lambda if no X-Ray trace id is present in the event + + lambda_trace_header = events[0]["trace_id_inside_handler"] + assert lambda_trace_header is not None + lambda_trace_id = re.search(r"Root=([^;]+)", lambda_trace_header).group(1) + assert lambda_trace_id == trace_id + + +@markers.aws.validated +@pytest.mark.parametrize( + "bus_combination", [("default", "custom"), ("custom", "custom"), ("custom", "default")] +) +@pytest.mark.skipif( + condition=is_old_provider(), + reason="not supported by the old provider", +) +def test_xray_trace_propagation_events_events( + bus_combination, + create_lambda_function, + events_create_event_bus, + create_role_event_bus_source_to_bus_target, + region_name, + account_id, + events_put_rule, + cleanups, + aws_client, +): + """ + Event Bridge Bus Source to Event Bridge Bus Target to Lambda for asserting X-Ray trace propagation + """ + # Create event buses + bus_source, bus_target = bus_combination + if bus_source == "default": + bus_name_source = "default" + if bus_source == "custom": + bus_name_source = f"test-event-bus-source-{short_uid()}" + events_create_event_bus(Name=bus_name_source) + if bus_target == "default": + bus_name_target = "default" + bus_arn_target = f"arn:aws:events:{region_name}:{account_id}:event-bus/default" + if bus_target == "custom": + bus_name_target = f"test-event-bus-target-{short_uid()}" + bus_arn_target = events_create_event_bus(Name=bus_name_target)["EventBusArn"] + + # Create permission for event bus source to send events to event bus target + role_arn_bus_source_to_bus_target = create_role_event_bus_source_to_bus_target() + + if is_aws_cloud(): + time.sleep(10) # required for role propagation + + # Permission for event bus target to receive events from event bus source + aws_client.events.put_permission( + StatementId=f"TargetEventBusAccessPermission{short_uid()}", + EventBusName=bus_name_target, + Action="events:PutEvents", + Principal="*", + ) + + # Create rule source event bus to target + rule_name_source_to_target = f"test-rule-source-to-target-{short_uid()}" + events_put_rule( + Name=rule_name_source_to_target, + EventBusName=bus_name_source, + EventPattern=json.dumps(TEST_EVENT_PATTERN), + ) + + # Add target event bus as target + target_id_event_bus_target = f"test-target-source-events-{short_uid()}" + aws_client.events.put_targets( + Rule=rule_name_source_to_target, + EventBusName=bus_name_source, + Targets=[ + { + "Id": target_id_event_bus_target, + "Arn": bus_arn_target, + "RoleArn": role_arn_bus_source_to_bus_target, + } + ], + ) + + # Create Lambda function + function_name = f"lambda-func-{short_uid()}" + create_lambda_response = create_lambda_function( + handler_file=TEST_LAMBDA_XRAY_TRACEID, + func_name=function_name, + runtime=Runtime.python3_12, + ) + lambda_function_arn = create_lambda_response["CreateFunctionResponse"]["FunctionArn"] + + # Connect Event Bus Target to Lambda + rule_name_lambda = f"rule-{short_uid()}" + rule_arn_lambda = events_put_rule( + Name=rule_name_lambda, + EventBusName=bus_name_target, + EventPattern=json.dumps(TEST_EVENT_PATTERN), + )["RuleArn"] + + aws_client.lambda_.add_permission( + FunctionName=function_name, + StatementId=f"{rule_name_lambda}-Event", + Action="lambda:InvokeFunction", + Principal="events.amazonaws.com", + SourceArn=rule_arn_lambda, + ) + + target_id_lambda = f"target-{short_uid()}" + aws_client.events.put_targets( + Rule=rule_name_lambda, + EventBusName=bus_name_target, + Targets=[{"Id": target_id_lambda, "Arn": lambda_function_arn}], + ) + + ###### + # Test + ###### + + # Enable X-Ray tracing for the aws_client + trace_id = "1-67f4141f-e1cd7672871da115129f8b19" + parent_id = "d0ee9531727135a0" + xray_trace_header = TraceHeader(root=trace_id, parent=parent_id, sampled=1) + + def add_xray_header(request, **kwargs): + request.headers["X-Amzn-Trace-Id"] = xray_trace_header.to_header_str() + + event_name = "before-send.events.*" + aws_client.events.meta.events.register(event_name, add_xray_header) + # make sure the hook gets cleaned up after the test + cleanups.append(lambda: aws_client.events.meta.events.unregister(event_name, add_xray_header)) + + aws_client.events.put_events( + Entries=[ + { + "EventBusName": bus_name_source, + "Source": TEST_EVENT_PATTERN["source"][0], + "DetailType": TEST_EVENT_PATTERN["detail-type"][0], + "Detail": json.dumps(TEST_EVENT_DETAIL), + } + ] + ) + + # Verify the Lambda invocation + events = retry( + check_expected_lambda_log_events_length, + retries=10, + sleep=10, + function_name=function_name, + expected_length=1, + logs_client=aws_client.logs, + ) + + # TODO how to assert X-Ray trace ID correct propagation from eventbridge to eventbridge lambda if no X-Ray trace id is present in the event lambda_trace_header = events[0]["trace_id_inside_handler"] assert lambda_trace_header is not None diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.validation.json b/tests/aws/services/events/test_x_ray_trace_propagation.validation.json index 85732eaca9eff..5ce2e5c48fff7 100644 --- a/tests/aws/services/events/test_x_ray_trace_propagation.validation.json +++ b/tests/aws/services/events/test_x_ray_trace_propagation.validation.json @@ -2,6 +2,15 @@ "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_api_gateway": { "last_validated_date": "2025-04-08T10:51:26+00:00" }, + "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_events[bus_combination0]": { + "last_validated_date": "2025-04-10T10:13:06+00:00" + }, + "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_events[bus_combination1]": { + "last_validated_date": "2025-04-10T10:13:27+00:00" + }, + "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_events[bus_combination2]": { + "last_validated_date": "2025-04-10T10:14:01+00:00" + }, "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_lambda": { "last_validated_date": "2025-04-08T10:46:50+00:00" } From 00d6da4b4d69bf0b55cecb8e0e995663b5e7ccb4 Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Fri, 11 Apr 2025 12:02:52 +0200 Subject: [PATCH 20/24] feat: skip flake schedule rate test --- tests/aws/services/events/test_events_schedule.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/aws/services/events/test_events_schedule.py b/tests/aws/services/events/test_events_schedule.py index 9bdda3cbf8147..eeb581313bb56 100644 --- a/tests/aws/services/events/test_events_schedule.py +++ b/tests/aws/services/events/test_events_schedule.py @@ -144,6 +144,7 @@ def tests_schedule_rate_target_sqs( assert expected_time_delta - tolerance <= time_delta <= expected_time_delta + tolerance @markers.aws.validated + @pytest.mark.skip(reason="flakey scheduler execution time of 60 seconds") def tests_schedule_rate_custom_input_target_sqs( self, sqs_as_events_target, events_put_rule, aws_client, snapshot ): From 1a9b0723d15f3d4bbaa3a4edb8b2ca7a262bb16c Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Fri, 11 Apr 2025 15:48:20 +0200 Subject: [PATCH 21/24] refactor: remove unnecessary parameter --- localstack-core/localstack/services/events/target.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/localstack-core/localstack/services/events/target.py b/localstack-core/localstack/services/events/target.py index 9d9e651f3baa1..fe18ce999412c 100644 --- a/localstack-core/localstack/services/events/target.py +++ b/localstack-core/localstack/services/events/target.py @@ -171,8 +171,6 @@ def __init__( self._validate_input(target) self._client: BaseClient | None = None - self._x_ray_segment = None - @property def arn(self): return self.target["Arn"] From 6bf8cc14fd1f4e15252405b380dbe63815ea6259 Mon Sep 17 00:00:00 2001 From: Max Date: Fri, 11 Apr 2025 15:57:16 +0200 Subject: [PATCH 22/24] Update tests/aws/services/events/test_x_ray_trace_propagation.py Co-authored-by: Ben Simon Hartung <42031100+bentsku@users.noreply.github.com> --- tests/aws/services/events/test_x_ray_trace_propagation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.py b/tests/aws/services/events/test_x_ray_trace_propagation.py index aa8cfd4a58da3..a894dc6345b7d 100644 --- a/tests/aws/services/events/test_x_ray_trace_propagation.py +++ b/tests/aws/services/events/test_x_ray_trace_propagation.py @@ -29,7 +29,7 @@ # currently only API Gateway v2 and Lambda support X-Ray tracing -@markers.aws.unknown +@markers.aws.validated @pytest.mark.skipif( condition=is_old_provider(), reason="not supported by the old provider", From 915c7d667e5bc9b3f84ecaeef48316510421addf Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Mon, 14 Apr 2025 13:59:30 +0200 Subject: [PATCH 23/24] feat: create new trace for scheduled event --- localstack-core/localstack/services/events/provider.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/localstack-core/localstack/services/events/provider.py b/localstack-core/localstack/services/events/provider.py index f6bb1be23d021..1a5b4ab485689 100644 --- a/localstack-core/localstack/services/events/provider.py +++ b/localstack-core/localstack/services/events/provider.py @@ -1542,8 +1542,11 @@ def func(*args, **kwargs): } target_unique_id = f"{rule.arn}-{target['Id']}" target_sender = self._target_sender_store[target_unique_id] + new_trace_header = ( + TraceHeader().ensure_root_exists() + ) # scheduled events will always start a new trace try: - target_sender.process_event(event.copy()) + target_sender.process_event(event.copy(), trace_header=new_trace_header) except Exception as e: LOG.info( "Unable to send event notification %s to target %s: %s", From 0d4c4e065de805cb355cb3cc2f4c943eefc55a7b Mon Sep 17 00:00:00 2001 From: maxhoheiser Date: Mon, 14 Apr 2025 13:59:49 +0200 Subject: [PATCH 24/24] feat: unskip schedule rate test --- tests/aws/services/events/test_events_schedule.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/aws/services/events/test_events_schedule.py b/tests/aws/services/events/test_events_schedule.py index eeb581313bb56..9bdda3cbf8147 100644 --- a/tests/aws/services/events/test_events_schedule.py +++ b/tests/aws/services/events/test_events_schedule.py @@ -144,7 +144,6 @@ def tests_schedule_rate_target_sqs( assert expected_time_delta - tolerance <= time_delta <= expected_time_delta + tolerance @markers.aws.validated - @pytest.mark.skip(reason="flakey scheduler execution time of 60 seconds") def tests_schedule_rate_custom_input_target_sqs( self, sqs_as_events_target, events_put_rule, aws_client, snapshot ):