diff --git a/localstack-core/localstack/services/events/provider.py b/localstack-core/localstack/services/events/provider.py index cdb6e3ad32904..1a5b4ab485689 100644 --- a/localstack-core/localstack/services/events/provider.py +++ b/localstack-core/localstack/services/events/provider.py @@ -171,6 +171,7 @@ from localstack.utils.event_matcher import matches_event from localstack.utils.strings import long_uid from localstack.utils.time import TIMESTAMP_FORMAT_TZ, timestamp +from localstack.utils.xray.trace_header import TraceHeader from .analytics import InvocationStatus, rule_invocation @@ -1541,8 +1542,11 @@ def func(*args, **kwargs): } target_unique_id = f"{rule.arn}-{target['Id']}" target_sender = self._target_sender_store[target_unique_id] + new_trace_header = ( + TraceHeader().ensure_root_exists() + ) # scheduled events will always start a new trace try: - target_sender.process_event(event.copy()) + target_sender.process_event(event.copy(), trace_header=new_trace_header) except Exception as e: LOG.info( "Unable to send event notification %s to target %s: %s", @@ -1814,6 +1818,8 @@ def _process_entry( return region, account_id = extract_region_and_account_id(event_bus_name_or_arn, context) + + # TODO check interference with x-ray trace header if encoded_trace_header := get_trace_header_encoded_region_account( entry, context.region, context.account_id, region, account_id ): @@ -1837,14 +1843,16 @@ def _process_entry( ) return - self._proxy_capture_input_event(event_formatted) + trace_header = context.trace_context["aws_trace_header"] + + self._proxy_capture_input_event(event_formatted, trace_header) # Always add the successful EventId entry, even if target processing might fail processed_entries.append({"EventId": event_formatted["id"]}) if configured_rules := list(event_bus.rules.values()): for rule in configured_rules: - self._process_rules(rule, region, account_id, event_formatted) + self._process_rules(rule, region, account_id, event_formatted, trace_header) else: LOG.info( json.dumps( @@ -1855,7 +1863,7 @@ def _process_entry( ) ) - def _proxy_capture_input_event(self, event: FormattedEvent) -> None: + def _proxy_capture_input_event(self, event: FormattedEvent, trace_header: TraceHeader) -> None: # only required for eventstudio to capture input event if no rule is configured pass @@ -1865,6 +1873,7 @@ def _process_rules( region: str, account_id: str, event_formatted: FormattedEvent, + trace_header: TraceHeader, ) -> None: """Process rules for an event. Note that we no longer handle entries here as AWS returns success regardless of target failures.""" event_pattern = rule.event_pattern @@ -1894,7 +1903,7 @@ def _process_rules( target_unique_id = f"{rule.arn}-{target_id}" target_sender = self._target_sender_store[target_unique_id] try: - target_sender.process_event(event_formatted.copy()) + target_sender.process_event(event_formatted.copy(), trace_header) rule_invocation.labels( status=InvocationStatus.success, service=target_sender.service, diff --git a/localstack-core/localstack/services/events/target.py b/localstack-core/localstack/services/events/target.py index b12691f28925e..fe18ce999412c 100644 --- a/localstack-core/localstack/services/events/target.py +++ b/localstack-core/localstack/services/events/target.py @@ -47,6 +47,7 @@ from localstack.utils.json import extract_jsonpath from localstack.utils.strings import to_bytes from localstack.utils.time import now_utc +from localstack.utils.xray.trace_header import TraceHeader LOG = logging.getLogger(__name__) @@ -63,6 +64,7 @@ ) TRANSFORMER_PLACEHOLDER_PATTERN = re.compile(r"<(.*?)>") +TRACE_HEADER_KEY = "X-Amzn-Trace-Id" def transform_event_with_target_input_path( @@ -193,10 +195,10 @@ def client(self): return self._client @abstractmethod - def send_event(self, event: FormattedEvent | TransformedEvent): + def send_event(self, event: FormattedEvent | TransformedEvent, trace_header: TraceHeader): pass - def process_event(self, event: FormattedEvent): + def process_event(self, event: FormattedEvent, trace_header: TraceHeader): """Processes the event and send it to the target.""" if input_ := self.target.get("Input"): event = json.loads(input_) @@ -208,7 +210,7 @@ def process_event(self, event: FormattedEvent): if input_transformer := self.target.get("InputTransformer"): event = self.transform_event_with_target_input_transformer(input_transformer, event) if event: - self.send_event(event) + self.send_event(event, trace_header) else: LOG.info("No event to send to target %s", self.target.get("Id")) @@ -257,6 +259,7 @@ def _initialize_client(self) -> BaseClient: client = client.request_metadata( service_principal=service_principal, source_arn=self.rule_arn ) + self._register_client_hooks(client) return client def _validate_input_transformer(self, input_transformer: InputTransformer): @@ -287,6 +290,24 @@ def _get_predefined_template_replacements(self, event: FormattedEvent) -> dict[s return predefined_template_replacements + def _register_client_hooks(self, client: BaseClient): + """Register client hooks to inject trace header into requests.""" + + def handle_extract_params(params, context, **kwargs): + trace_header = params.pop("TraceHeader", None) + if trace_header is None: + return + context[TRACE_HEADER_KEY] = trace_header.to_header_str() + + def handle_inject_headers(params, context, **kwargs): + if trace_header_str := context.pop(TRACE_HEADER_KEY, None): + params["headers"][TRACE_HEADER_KEY] = trace_header_str + + client.meta.events.register( + f"provide-client-params.{self.service}.*", handle_extract_params + ) + client.meta.events.register(f"before-call.{self.service}.*", handle_inject_headers) + TargetSenderDict = dict[str, TargetSender] # rule_arn-target_id as global unique id @@ -316,7 +337,7 @@ class ApiGatewayTargetSender(TargetSender): ALLOWED_HTTP_METHODS = {"GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"} - def send_event(self, event): + def send_event(self, event, trace_header): # Parse the ARN to extract api_id, stage_name, http_method, and resource path # Example ARN: arn:{partition}:execute-api:{region}:{account_id}:{api_id}/{stage_name}/{method}/{resource_path} arn_parts = parse_arn(self.target["Arn"]) @@ -383,6 +404,9 @@ def send_event(self, event): # Serialize the event, converting datetime objects to strings event_json = json.dumps(event, default=str) + # Add trace header + headers[TRACE_HEADER_KEY] = trace_header.to_header_str() + # Send the HTTP request response = requests.request( method=http_method, url=url, headers=headers, data=event_json, timeout=5 @@ -415,12 +439,12 @@ def _get_predefined_template_replacements(self, event: Dict[str, Any]) -> Dict[s class AppSyncTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): raise NotImplementedError("AppSync target is not yet implemented") class BatchTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): raise NotImplementedError("Batch target is not yet implemented") def _validate_input(self, target: Target): @@ -433,7 +457,7 @@ def _validate_input(self, target: Target): class ECSTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): raise NotImplementedError("ECS target is a pro feature, please use LocalStack Pro") def _validate_input(self, target: Target): @@ -444,7 +468,7 @@ def _validate_input(self, target: Target): class EventsTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): # TODO add validation and tests for eventbridge to eventbridge requires Detail, DetailType, and Source # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/events/client/put_events.html source = self._get_source(event) @@ -464,7 +488,8 @@ def send_event(self, event): event, self.region, self.account_id, self.target_region, self.target_account_id ): entries[0]["TraceHeader"] = encoded_original_id - self.client.put_events(Entries=entries) + + self.client.put_events(Entries=entries, TraceHeader=trace_header) def _get_source(self, event: FormattedEvent | TransformedEvent) -> str: if isinstance(event, dict) and (source := event.get("source")): @@ -486,7 +511,7 @@ def _get_resources(self, event: FormattedEvent | TransformedEvent) -> list[str]: class EventsApiDestinationTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): """Send an event to an EventBridge API destination See https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-api-destinations.html""" target_arn = self.target["Arn"] @@ -520,6 +545,9 @@ def send_event(self, event): if http_parameters := self.target.get("HttpParameters"): endpoint = add_target_http_parameters(http_parameters, endpoint, headers, event) + # add trace header + headers[TRACE_HEADER_KEY] = trace_header.to_header_str() + result = requests.request( method=method, url=endpoint, data=json.dumps(event or {}), headers=headers ) @@ -532,8 +560,9 @@ def send_event(self, event): class FirehoseTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): delivery_stream_name = firehose_name(self.target["Arn"]) + self.client.put_record( DeliveryStreamName=delivery_stream_name, Record={"Data": to_bytes(to_json_str(event))}, @@ -541,7 +570,7 @@ def send_event(self, event): class KinesisTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): partition_key_path = collections.get_safe( self.target, "$.KinesisParameters.PartitionKeyPath", @@ -549,6 +578,7 @@ def send_event(self, event): ) stream_name = self.target["Arn"].split("/")[-1] partition_key = collections.get_safe(event, partition_key_path, event["id"]) + self.client.put_record( StreamName=stream_name, Data=to_bytes(to_json_str(event)), @@ -565,18 +595,20 @@ def _validate_input(self, target: Target): class LambdaTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): self.client.invoke( FunctionName=self.target["Arn"], Payload=to_bytes(to_json_str(event)), InvocationType="Event", + TraceHeader=trace_header, ) class LogsTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): log_group_name = self.target["Arn"].split(":")[6] log_stream_name = str(uuid.uuid4()) # Unique log stream name + self.client.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name) self.client.put_log_events( logGroupName=log_group_name, @@ -591,7 +623,7 @@ def send_event(self, event): class RedshiftTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): raise NotImplementedError("Redshift target is not yet implemented") def _validate_input(self, target: Target): @@ -602,20 +634,21 @@ def _validate_input(self, target: Target): class SagemakerTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): raise NotImplementedError("Sagemaker target is not yet implemented") class SnsTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): self.client.publish(TopicArn=self.target["Arn"], Message=to_json_str(event)) class SqsTargetSender(TargetSender): - def send_event(self, event): + def send_event(self, event, trace_header): queue_url = sqs_queue_url_for_arn(self.target["Arn"]) msg_group_id = self.target.get("SqsParameters", {}).get("MessageGroupId", None) kwargs = {"MessageGroupId": msg_group_id} if msg_group_id else {} + self.client.send_message( QueueUrl=queue_url, MessageBody=to_json_str(event), @@ -626,8 +659,9 @@ def send_event(self, event): class StatesTargetSender(TargetSender): """Step Functions Target Sender""" - def send_event(self, event): + def send_event(self, event, trace_header): self.service = "stepfunctions" + self.client.start_execution( stateMachineArn=self.target["Arn"], name=event["id"], input=to_json_str(event) ) @@ -642,7 +676,7 @@ def _validate_input(self, target: Target): class SystemsManagerSender(TargetSender): """EC2 Run Command Target Sender""" - def send_event(self, event): + def send_event(self, event, trace_header): raise NotImplementedError("Systems Manager target is not yet implemented") def _validate_input(self, target: Target): diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.py b/tests/aws/services/events/test_x_ray_trace_propagation.py new file mode 100644 index 0000000000000..a894dc6345b7d --- /dev/null +++ b/tests/aws/services/events/test_x_ray_trace_propagation.py @@ -0,0 +1,434 @@ +import json +import time + +from localstack.aws.api.lambda_ import Runtime +from localstack.testing.pytest import markers +from localstack.utils.strings import short_uid +from localstack.utils.sync import retry +from localstack.utils.testutil import check_expected_lambda_log_events_length +from localstack.utils.xray.trace_header import TraceHeader +from tests.aws.services.lambda_.test_lambda import TEST_LAMBDA_AWS_PROXY_FORMAT + +APIGATEWAY_ASSUME_ROLE_POLICY = { + "Statement": { + "Sid": "", + "Effect": "Allow", + "Principal": {"Service": "apigateway.amazonaws.com"}, + "Action": "sts:AssumeRole", + } +} +import re + +import pytest + +from localstack.testing.aws.util import is_aws_cloud +from tests.aws.services.events.helper_functions import is_old_provider +from tests.aws.services.events.test_events import TEST_EVENT_DETAIL, TEST_EVENT_PATTERN +from tests.aws.services.lambda_.test_lambda import TEST_LAMBDA_XRAY_TRACEID + +# currently only API Gateway v2 and Lambda support X-Ray tracing + + +@markers.aws.validated +@pytest.mark.skipif( + condition=is_old_provider(), + reason="not supported by the old provider", +) +def test_xray_trace_propagation_events_api_gateway( + aws_client, + create_role_with_policy, + create_lambda_function, + create_rest_apigw, + events_create_event_bus, + events_put_rule, + region_name, + cleanups, + account_id, +): + # create lambda + function_name = f"test-function-{short_uid()}" + function_arn = create_lambda_function( + func_name=function_name, + handler_file=TEST_LAMBDA_AWS_PROXY_FORMAT, + handler="lambda_aws_proxy_format.handler", + runtime=Runtime.python3_12, + )["CreateFunctionResponse"]["FunctionArn"] + + # create api gateway with lambda integration + # create rest api + api_id, api_name, root_id = create_rest_apigw( + name=f"test-api-{short_uid()}", + description="Test Integration with EventBridge X-Ray", + ) + + resource_id = aws_client.apigateway.create_resource( + restApiId=api_id, parentId=root_id, pathPart="test" + )["id"] + + aws_client.apigateway.put_method( + restApiId=api_id, + resourceId=resource_id, + httpMethod="POST", + authorizationType="NONE", + ) + + # Lambda AWS_PROXY integration + aws_client.apigateway.put_integration( + restApiId=api_id, + resourceId=resource_id, + httpMethod="POST", + type="AWS_PROXY", + integrationHttpMethod="POST", + uri=f"arn:aws:apigateway:{region_name}:lambda:path/2015-03-31/functions/{function_arn}/invocations", + ) + + # Give permission to API Gateway to invoke Lambda + source_arn = f"arn:aws:execute-api:{region_name}:{account_id}:{api_id}/*/POST/test" + aws_client.lambda_.add_permission( + FunctionName=function_name, + StatementId=f"sid-{short_uid()}", + Action="lambda:InvokeFunction", + Principal="apigateway.amazonaws.com", + SourceArn=source_arn, + ) + + stage_name = "test" + aws_client.apigateway.create_deployment(restApiId=api_id, stageName=stage_name) + + # Create event bus + event_bus_name = f"test-bus-{short_uid()}" + events_create_event_bus(Name=event_bus_name) + + # Create rule + rule_name = f"test-rule-{short_uid()}" + event_pattern = {"source": ["test.source"], "detail-type": ["test.detail.type"]} + events_put_rule( + Name=rule_name, + EventBusName=event_bus_name, + EventPattern=json.dumps(event_pattern), + ) + + # Create an IAM Role for EventBridge to invoke API Gateway + assume_role_policy_document = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": {"Service": "events.amazonaws.com"}, + "Action": "sts:AssumeRole", + } + ], + } + + role_name, role_arn = create_role_with_policy( + effect="Allow", + actions="execute-api:Invoke", + assume_policy_doc=json.dumps(assume_role_policy_document), + resource=source_arn, + attach=False, # Since we're using put_role_policy, not attach_role_policy + ) + + # Allow some time for IAM role propagation (only needed in AWS) + if is_aws_cloud(): + time.sleep(10) + + # Add the API Gateway as a target with the RoleArn + target_id = f"target-{short_uid()}" + api_target_arn = ( + f"arn:aws:execute-api:{region_name}:{account_id}:{api_id}/{stage_name}/POST/test" + ) + put_targets_response = aws_client.events.put_targets( + Rule=rule_name, + EventBusName=event_bus_name, + Targets=[ + { + "Id": target_id, + "Arn": api_target_arn, + "RoleArn": role_arn, + "Input": json.dumps({"message": "Hello from EventBridge"}), + "RetryPolicy": {"MaximumRetryAttempts": 0}, + } + ], + ) + assert put_targets_response["FailedEntryCount"] == 0 + + ###### + # Test + ###### + # Enable X-Ray tracing for the aws_client + trace_id = "1-67f4141f-e1cd7672871da115129f8b19" + parent_id = "d0ee9531727135a0" + xray_trace_header = TraceHeader(root=trace_id, parent=parent_id, sampled=1) + + def add_xray_header(request, **kwargs): + request.headers["X-Amzn-Trace-Id"] = xray_trace_header.to_header_str() + + event_name = "before-send.events.*" + aws_client.events.meta.events.register(event_name, add_xray_header) + + # make sure the hook gets cleaned up after the test + cleanups.append(lambda: aws_client.events.meta.events.unregister(event_name, add_xray_header)) + + event_entry = { + "EventBusName": event_bus_name, + "Source": "test.source", + "DetailType": "test.detail.type", + "Detail": json.dumps({"message": "Hello from EventBridge"}), + } + put_events_response = aws_client.events.put_events(Entries=[event_entry]) + assert put_events_response["FailedEntryCount"] == 0 + + # Verify the Lambda invocation + events = retry( + check_expected_lambda_log_events_length, + retries=10, + sleep=10, + sleep_before=10 if is_aws_cloud() else 1, + function_name=function_name, + expected_length=1, + logs_client=aws_client.logs, + ) + + # TODO how to assert X-Ray trace ID correct propagation from eventbridge to api gateway if no X-Ray trace id is present in the event + + lambda_trace_header = events[0]["headers"].get("X-Amzn-Trace-Id") + assert lambda_trace_header is not None + lambda_trace_id = re.search(r"Root=([^;]+)", lambda_trace_header).group(1) + assert lambda_trace_id == trace_id + + +@markers.aws.validated +@pytest.mark.skipif( + condition=is_old_provider(), + reason="not supported by the old provider", +) +def test_xray_trace_propagation_events_lambda( + create_lambda_function, + events_create_event_bus, + events_put_rule, + cleanups, + aws_client, +): + function_name = f"lambda-func-{short_uid()}" + create_lambda_response = create_lambda_function( + handler_file=TEST_LAMBDA_XRAY_TRACEID, + func_name=function_name, + runtime=Runtime.python3_12, + ) + lambda_function_arn = create_lambda_response["CreateFunctionResponse"]["FunctionArn"] + + bus_name = f"bus-{short_uid()}" + events_create_event_bus(Name=bus_name) + + rule_name = f"rule-{short_uid()}" + rule_arn = events_put_rule( + Name=rule_name, + EventBusName=bus_name, + EventPattern=json.dumps(TEST_EVENT_PATTERN), + )["RuleArn"] + + aws_client.lambda_.add_permission( + FunctionName=function_name, + StatementId=f"{rule_name}-Event", + Action="lambda:InvokeFunction", + Principal="events.amazonaws.com", + SourceArn=rule_arn, + ) + + target_id = f"target-{short_uid()}" + aws_client.events.put_targets( + Rule=rule_name, + EventBusName=bus_name, + Targets=[{"Id": target_id, "Arn": lambda_function_arn}], + ) + + # Enable X-Ray tracing for the aws_client + trace_id = "1-67f4141f-e1cd7672871da115129f8b19" + parent_id = "d0ee9531727135a0" + xray_trace_header = TraceHeader(root=trace_id, parent=parent_id, sampled=1) + + def add_xray_header(request, **kwargs): + request.headers["X-Amzn-Trace-Id"] = xray_trace_header.to_header_str() + + event_name = "before-send.events.*" + aws_client.events.meta.events.register(event_name, add_xray_header) + # make sure the hook gets cleaned up after the test + cleanups.append(lambda: aws_client.events.meta.events.unregister(event_name, add_xray_header)) + + aws_client.events.put_events( + Entries=[ + { + "EventBusName": bus_name, + "Source": TEST_EVENT_PATTERN["source"][0], + "DetailType": TEST_EVENT_PATTERN["detail-type"][0], + "Detail": json.dumps(TEST_EVENT_DETAIL), + } + ] + ) + + # Verify the Lambda invocation + events = retry( + check_expected_lambda_log_events_length, + retries=10, + sleep=10, + function_name=function_name, + expected_length=1, + logs_client=aws_client.logs, + ) + + # TODO how to assert X-Ray trace ID correct propagation from eventbridge to api lambda if no X-Ray trace id is present in the event + + lambda_trace_header = events[0]["trace_id_inside_handler"] + assert lambda_trace_header is not None + lambda_trace_id = re.search(r"Root=([^;]+)", lambda_trace_header).group(1) + assert lambda_trace_id == trace_id + + +@markers.aws.validated +@pytest.mark.parametrize( + "bus_combination", [("default", "custom"), ("custom", "custom"), ("custom", "default")] +) +@pytest.mark.skipif( + condition=is_old_provider(), + reason="not supported by the old provider", +) +def test_xray_trace_propagation_events_events( + bus_combination, + create_lambda_function, + events_create_event_bus, + create_role_event_bus_source_to_bus_target, + region_name, + account_id, + events_put_rule, + cleanups, + aws_client, +): + """ + Event Bridge Bus Source to Event Bridge Bus Target to Lambda for asserting X-Ray trace propagation + """ + # Create event buses + bus_source, bus_target = bus_combination + if bus_source == "default": + bus_name_source = "default" + if bus_source == "custom": + bus_name_source = f"test-event-bus-source-{short_uid()}" + events_create_event_bus(Name=bus_name_source) + if bus_target == "default": + bus_name_target = "default" + bus_arn_target = f"arn:aws:events:{region_name}:{account_id}:event-bus/default" + if bus_target == "custom": + bus_name_target = f"test-event-bus-target-{short_uid()}" + bus_arn_target = events_create_event_bus(Name=bus_name_target)["EventBusArn"] + + # Create permission for event bus source to send events to event bus target + role_arn_bus_source_to_bus_target = create_role_event_bus_source_to_bus_target() + + if is_aws_cloud(): + time.sleep(10) # required for role propagation + + # Permission for event bus target to receive events from event bus source + aws_client.events.put_permission( + StatementId=f"TargetEventBusAccessPermission{short_uid()}", + EventBusName=bus_name_target, + Action="events:PutEvents", + Principal="*", + ) + + # Create rule source event bus to target + rule_name_source_to_target = f"test-rule-source-to-target-{short_uid()}" + events_put_rule( + Name=rule_name_source_to_target, + EventBusName=bus_name_source, + EventPattern=json.dumps(TEST_EVENT_PATTERN), + ) + + # Add target event bus as target + target_id_event_bus_target = f"test-target-source-events-{short_uid()}" + aws_client.events.put_targets( + Rule=rule_name_source_to_target, + EventBusName=bus_name_source, + Targets=[ + { + "Id": target_id_event_bus_target, + "Arn": bus_arn_target, + "RoleArn": role_arn_bus_source_to_bus_target, + } + ], + ) + + # Create Lambda function + function_name = f"lambda-func-{short_uid()}" + create_lambda_response = create_lambda_function( + handler_file=TEST_LAMBDA_XRAY_TRACEID, + func_name=function_name, + runtime=Runtime.python3_12, + ) + lambda_function_arn = create_lambda_response["CreateFunctionResponse"]["FunctionArn"] + + # Connect Event Bus Target to Lambda + rule_name_lambda = f"rule-{short_uid()}" + rule_arn_lambda = events_put_rule( + Name=rule_name_lambda, + EventBusName=bus_name_target, + EventPattern=json.dumps(TEST_EVENT_PATTERN), + )["RuleArn"] + + aws_client.lambda_.add_permission( + FunctionName=function_name, + StatementId=f"{rule_name_lambda}-Event", + Action="lambda:InvokeFunction", + Principal="events.amazonaws.com", + SourceArn=rule_arn_lambda, + ) + + target_id_lambda = f"target-{short_uid()}" + aws_client.events.put_targets( + Rule=rule_name_lambda, + EventBusName=bus_name_target, + Targets=[{"Id": target_id_lambda, "Arn": lambda_function_arn}], + ) + + ###### + # Test + ###### + + # Enable X-Ray tracing for the aws_client + trace_id = "1-67f4141f-e1cd7672871da115129f8b19" + parent_id = "d0ee9531727135a0" + xray_trace_header = TraceHeader(root=trace_id, parent=parent_id, sampled=1) + + def add_xray_header(request, **kwargs): + request.headers["X-Amzn-Trace-Id"] = xray_trace_header.to_header_str() + + event_name = "before-send.events.*" + aws_client.events.meta.events.register(event_name, add_xray_header) + # make sure the hook gets cleaned up after the test + cleanups.append(lambda: aws_client.events.meta.events.unregister(event_name, add_xray_header)) + + aws_client.events.put_events( + Entries=[ + { + "EventBusName": bus_name_source, + "Source": TEST_EVENT_PATTERN["source"][0], + "DetailType": TEST_EVENT_PATTERN["detail-type"][0], + "Detail": json.dumps(TEST_EVENT_DETAIL), + } + ] + ) + + # Verify the Lambda invocation + events = retry( + check_expected_lambda_log_events_length, + retries=10, + sleep=10, + function_name=function_name, + expected_length=1, + logs_client=aws_client.logs, + ) + + # TODO how to assert X-Ray trace ID correct propagation from eventbridge to eventbridge lambda if no X-Ray trace id is present in the event + + lambda_trace_header = events[0]["trace_id_inside_handler"] + assert lambda_trace_header is not None + lambda_trace_id = re.search(r"Root=([^;]+)", lambda_trace_header).group(1) + assert lambda_trace_id == trace_id diff --git a/tests/aws/services/events/test_x_ray_trace_propagation.validation.json b/tests/aws/services/events/test_x_ray_trace_propagation.validation.json new file mode 100644 index 0000000000000..5ce2e5c48fff7 --- /dev/null +++ b/tests/aws/services/events/test_x_ray_trace_propagation.validation.json @@ -0,0 +1,17 @@ +{ + "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_api_gateway": { + "last_validated_date": "2025-04-08T10:51:26+00:00" + }, + "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_events[bus_combination0]": { + "last_validated_date": "2025-04-10T10:13:06+00:00" + }, + "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_events[bus_combination1]": { + "last_validated_date": "2025-04-10T10:13:27+00:00" + }, + "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_events[bus_combination2]": { + "last_validated_date": "2025-04-10T10:14:01+00:00" + }, + "tests/aws/services/events/test_x_ray_trace_propagation.py::test_xray_trace_propagation_events_lambda": { + "last_validated_date": "2025-04-08T10:46:50+00:00" + } +} diff --git a/tests/aws/services/lambda_/test_lambda.py b/tests/aws/services/lambda_/test_lambda.py index 700361d32ebbb..205d2e9c1f113 100644 --- a/tests/aws/services/lambda_/test_lambda.py +++ b/tests/aws/services/lambda_/test_lambda.py @@ -128,6 +128,7 @@ ) TEST_LAMBDA_NOTIFIER = os.path.join(THIS_FOLDER, "functions/lambda_notifier.py") TEST_LAMBDA_CLOUDWATCH_LOGS = os.path.join(THIS_FOLDER, "functions/lambda_cloudwatch_logs.py") +TEST_LAMBDA_XRAY_TRACEID = os.path.join(THIS_FOLDER, "functions/xray_tracing_traceid.py") PYTHON_TEST_RUNTIMES = RUNTIMES_AGGREGATED["python"] NODE_TEST_RUNTIMES = RUNTIMES_AGGREGATED["nodejs"]