From ac374c2ea13324f981594d719c4d0b326193ad79 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Thu, 29 May 2025 15:28:40 -0400
Subject: [PATCH 01/19] set context for sqs -> lambda

---
 datadog_lambda/wrapper.py | 39 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 39 insertions(+)

diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index 86bbf04d..7a4b29f6 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -79,6 +79,7 @@
 DD_REQUESTS_SERVICE_NAME = "DD_REQUESTS_SERVICE_NAME"
 DD_SERVICE = "DD_SERVICE"
 DD_ENV = "DD_ENV"
+DD_DATA_STREAMS_ENABLED = "DD_DATA_STREAMS_ENABLED"


 def get_env_as_int(env_key, default_value: int) -> int:
@@ -190,6 +191,9 @@ def __init__(self, func):
         self.min_cold_start_trace_duration = get_env_as_int(
             DD_MIN_COLD_START_DURATION, 3
         )
+        self.data_streams_enabled = (
+            os.environ.get(DD_DATA_STREAMS_ENABLED, "false").lower() == "true"
+        )
         self.local_testing_mode = os.environ.get(
             DD_LOCAL_TEST, "false"
         ).lower() in ("true", "1")
@@ -287,6 +291,41 @@ def _inject_authorizer_span_headers(self, request_id):
         self.response["context"]["_datadog"] = datadog_data

     def _before(self, event, context):
+
+        from ddtrace.internal.datastreams.processor import (
+            DataStreamsProcessor as processor,
+            DsmPathwayCodec,
+        )
+        from ddtrace.internal.datastreams.botocore import (
+            get_datastreams_context,
+            calculate_sqs_payload_size,
+        )
+
+        def _dsm_set_sqs_context(record):
+            try:
+                queue_arn = record.get("eventSourceARN", "")
+
+                contextjson = get_datastreams_context(record)
+                payload_size = calculate_sqs_payload_size(record)
+
+                ctx = DsmPathwayCodec.decode(contextjson, processor())
+                ctx.set_checkpoint(
+                    ["direction:in", "queue:arn:" + queue_arn, "type:sqs"],
+                    payload_size=payload_size,
+                )
+
+            except Exception as e:
+                logger.error(format_err_with_traceback(e))
+
+        if self.data_streams_enabled:
+            if isinstance(event, dict) and "Records" in event and event["Records"]:
+                sqs_records = [
+                    r for r in event["Records"] if r.get("eventSource") == "aws:sqs"
+                ]
+                if sqs_records:
+                    for record in sqs_records:
+                        _dsm_set_sqs_context(record)
+
         try:
             self.response = None
             set_cold_start(init_timestamp_ns)

From db0c56c8f8375296748837ef7ffd5c9252f6e3d5 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Fri, 30 May 2025 09:50:02 -0400
Subject: [PATCH 02/19] needed queue name for consistent hash

---
 datadog_lambda/wrapper.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index 7a4b29f6..3a86c56f 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -304,13 +304,14 @@ def _before(self, event, context):
         def _dsm_set_sqs_context(record):
             try:
                 queue_arn = record.get("eventSourceARN", "")
+                queue_name = queue_arn.split(":")[-1]

                 contextjson = get_datastreams_context(record)
                 payload_size = calculate_sqs_payload_size(record)

                 ctx = DsmPathwayCodec.decode(contextjson, processor())
                 ctx.set_checkpoint(
-                    ["direction:in", "queue:arn:" + queue_arn, "type:sqs"],
+                    ["direction:in", "topic:" + queue_name, "type:sqs"],
                     payload_size=payload_size,
                 )

From c12e2dc9612bafe3b23b0c228e9ece151c9ddef4 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Fri, 30 May 2025 13:58:56 -0400
Subject: [PATCH 03/19] add context prop test

---
 tests/test_wrapper.py | 165 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 165 insertions(+)

diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py
index f46b365e..79367fa6 100644
--- a/tests/test_wrapper.py
+++ b/tests/test_wrapper.py
@@ -563,6 +563,171 @@ def return_type_test(event, context):
         self.assertEqual(result, test_result)
         self.assertFalse(MockPrintExc.called)

+    @patch.dict(os.environ, {"DD_DATA_STREAMS_ENABLED": "true"})
+    def test_datadog_lambda_wrapper_dsm_sqs_context_pathway_verification(self):
+        with patch(
+            "ddtrace.internal.datastreams.processor.get_connection"
+        ) as mock_get_connection:
+
+            mock_conn = unittest.mock.MagicMock()
+            mock_response = unittest.mock.MagicMock()
+            mock_response.status = 200
+            mock_conn.getresponse.return_value = mock_response
+            mock_get_connection.return_value = mock_conn
+
+            def updated_get_datastreams_context(message):
+                """
+                Updated version that handles the correct message formats
+                """
+                import base64
+                import json
+
+                context_json = None
+                message_body = message
+                try:
+                    body = message.get("Body")
+                    if body:
+                        message_body = json.loads(body)
+                except (ValueError, TypeError):
+                    pass
+
+                message_attributes = message_body.get(
+                    "MessageAttributes"
+                ) or message_body.get("messageAttributes")
+                if not message_attributes:
+                    return None
+
+                if "_datadog" not in message_attributes:
+                    return None
+
+                datadog_attr = message_attributes["_datadog"]
+
+                if message_body.get("Type") == "Notification":
+                    if datadog_attr.get("Type") == "Binary":
+                        context_json = json.loads(
+                            base64.b64decode(datadog_attr["Value"]).decode()
+                        )
+                elif "StringValue" in datadog_attr:
+                    context_json = json.loads(datadog_attr["StringValue"])
+                elif "stringValue" in datadog_attr:
+                    context_json = json.loads(datadog_attr["stringValue"])
+                elif "BinaryValue" in datadog_attr:
+                    context_json = json.loads(datadog_attr["BinaryValue"].decode())
+                else:
+                    print(f"DEBUG: Unhandled datadog_attr format: {datadog_attr}")
+
+                return context_json
+
+            with patch(
+                "ddtrace.internal.datastreams.botocore.get_datastreams_context",
+                updated_get_datastreams_context,
+            ):
+
+                # Step 1: Create a message with some context in the message attributes
+
+                from ddtrace.internal.datastreams.processor import DataStreamsProcessor
+
+                processor_instance = DataStreamsProcessor()
+
+                with patch(
+                    "ddtrace.internal.datastreams.processor.DataStreamsProcessor",
+                    return_value=processor_instance,
+                ):
+
+                    parent_ctx = processor_instance.new_pathway()
+
+                    parent_ctx.set_checkpoint(
+                        ["direction:out", "topic:upstream-topic", "type:sqs"],
+                        now_sec=1640995200.0,
+                        payload_size=512,
+                    )
+                    parent_hash = parent_ctx.hash
+                    encoded_parent_context = parent_ctx.encode_b64()
+
+                    sqs_event = {
+                        "Records": [
+                            {
+                                "eventSource": "aws:sqs",
+                                "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test",
+                                "Body": "test message body",
+                                "messageAttributes": {
+                                    "_datadog": {
+                                        "stringValue": json.dumps(
+                                            {
+                                                "dd-pathway-ctx-base64": encoded_parent_context
+                                            }
+                                        )
+                                    }
+                                },
+                            }
+                        ]
+                    }
+
+                    # Step 2: Call the handler
+                    @wrapper.datadog_lambda_wrapper
+                    def lambda_handler(event, context):
+                        return {"statusCode": 200, "body": "processed"}
+
+                    result = lambda_handler(sqs_event, get_mock_context())
+                    self.assertEqual(result["statusCode"], 200)
+
+                    # New context set after handler call
+                    current_ctx = processor_instance._current_context.value
+                    self.assertIsNotNone(
+                        current_ctx,
+                        "Data streams context should be set after processing SQS message",
+                    )
+
+                    # Step 3: Check that hash in this context is the child of the hash you passed
+                    # Step 4: Check that the right checkpoint was produced during call to handler
+
+                    found_sqs_checkpoint = False
+                    for bucket_time, bucket in processor_instance._buckets.items():
+                        for aggr_key, stats in bucket.pathway_stats.items():
+                            edge_tags_str, hash_value, parent_hash_recorded = aggr_key
+                            edge_tags = edge_tags_str.split(",")
+
+                            if (
+                                "direction:in" in edge_tags
+                                and "topic:test" in edge_tags
+                                and "type:sqs" in edge_tags
+                            ):
+                                found_sqs_checkpoint = True
+
+                                # EXPLICIT PARENT-CHILD HASH RELATIONSHIP TEST
+                                self.assertEqual(
+                                    parent_hash_recorded,
+                                    parent_hash,
+                                    f"Parent hash must be preserved: "
+                                    f"expected {parent_hash}, got {parent_hash_recorded}",
+                                )
+                                self.assertEqual(
+                                    hash_value,
+                                    current_ctx.hash,
+                                    f"Child hash must match current context: "
+                                    f"expected {current_ctx.hash}, got {hash_value}",
+                                )
+                                self.assertNotEqual(
+                                    hash_value,
+                                    parent_hash_recorded,
+                                    f"Child hash ({hash_value}) must be different from "
+                                    f"parent hash ({parent_hash_recorded}) - proves parent-child",
+                                )
+                                self.assertGreaterEqual(
+                                    stats.payload_size.count,
+                                    1,
+                                    "Should have one payload size measurement",
+                                )
+
+                                break
+
+                    self.assertTrue(
+                        found_sqs_checkpoint,
+                        "Should have found SQS consumption checkpoint in processor stats",
+                    )
+
+                    processor_instance.shutdown(timeout=0.1)
+

 class TestLambdaDecoratorSettings(unittest.TestCase):
     def test_some_envs_should_depend_on_dd_tracing_enabled(self):

From 8f4f2d82e4d7bb5f2c6d3ddc4a84ff533a881559 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Fri, 30 May 2025 14:08:53 -0400
Subject: [PATCH 04/19] comment

---
 tests/test_wrapper.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py
index 79367fa6..f0033c3c 100644
--- a/tests/test_wrapper.py
+++ b/tests/test_wrapper.py
@@ -680,7 +680,7 @@ def lambda_handler(event, context):

                     # Step 3: Check that hash in this context is the child of the hash you passed
                     # Step 4: Check that the right checkpoint was produced during call to handler
-
+                    # The buckets hold the aggregated stats for all checkpoints
                     found_sqs_checkpoint = False
                     for bucket_time, bucket in processor_instance._buckets.items():
                         for aggr_key, stats in bucket.pathway_stats.items():

From e7ebe212662ed7182341c91b17095cdbb4edb619 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Fri, 30 May 2025 14:23:55 -0400
Subject: [PATCH 05/19] only import when DSM is enabled

---
 datadog_lambda/wrapper.py | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index 3a86c56f..37c305b2 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -292,15 +292,6 @@ def _inject_authorizer_span_headers(self, request_id):
         self.response["context"]["_datadog"] = datadog_data

     def _before(self, event, context):

-        from ddtrace.internal.datastreams.processor import (
-            DataStreamsProcessor as processor,
-            DsmPathwayCodec,
-        )
-        from ddtrace.internal.datastreams.botocore import (
-            get_datastreams_context,
-            calculate_sqs_payload_size,
-        )
-
         def _dsm_set_sqs_context(record):
             try:
                 queue_arn = record.get("eventSourceARN", "")
@@ -319,6 +310,15 @@ def _dsm_set_sqs_context(record):
                 logger.error(format_err_with_traceback(e))

         if self.data_streams_enabled:
+            from ddtrace.internal.datastreams.processor import (
+                DataStreamsProcessor as processor,
+                DsmPathwayCodec,
+            )
+            from ddtrace.internal.datastreams.botocore import (
+                get_datastreams_context,
+                calculate_sqs_payload_size,
+            )
+
             if isinstance(event, dict) and "Records" in event and event["Records"]:
                 sqs_records = [
                     r for r in event["Records"] if r.get("eventSource") == "aws:sqs"

From 67efeca92e937b4ba97fd66139af3febddeee66a Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Fri, 30 May 2025 14:25:53 -0400
Subject: [PATCH 06/19] fix lint

---
 datadog_lambda/wrapper.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index 37c305b2..76965b6b 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -291,7 +291,6 @@ def _inject_authorizer_span_headers(self, request_id):
         self.response["context"]["_datadog"] = datadog_data

     def _before(self, event, context):
-
         def _dsm_set_sqs_context(record):
             try:
                 queue_arn = record.get("eventSourceARN", "")

From 2c94f074fe6d3696a6f65ab95363b1911dfc00ab Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Fri, 30 May 2025 16:35:26 -0400
Subject: [PATCH 07/19] fix

---
 datadog_lambda/wrapper.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index 76965b6b..e567a2a3 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -294,14 +294,13 @@ def _before(self, event, context):
         def _dsm_set_sqs_context(record):
             try:
                 queue_arn = record.get("eventSourceARN", "")
-                queue_name = queue_arn.split(":")[-1]

                 contextjson = get_datastreams_context(record)
                 payload_size = calculate_sqs_payload_size(record)

                 ctx = DsmPathwayCodec.decode(contextjson, processor())
                 ctx.set_checkpoint(
-                    ["direction:in", "topic:" + queue_name, "type:sqs"],
+                    ["direction:in", "topic:" + queue_arn, "type:sqs"],
                     payload_size=payload_size,
                 )

From 701ed35ee51408623d1a7e136992288b31617776 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Fri, 30 May 2025 16:42:15 -0400
Subject: [PATCH 08/19] fix

---
 tests/test_wrapper.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py
index f0033c3c..3a0dce61 100644
--- a/tests/test_wrapper.py
+++ b/tests/test_wrapper.py
@@ -689,7 +689,8 @@ def lambda_handler(event, context):

                             if (
                                 "direction:in" in edge_tags
-                                and "topic:test" in edge_tags
+                                and "topic:arn:aws:sqs:us-east-1:123456789012:test"
+                                in edge_tags
                                 and "type:sqs" in edge_tags
                             ):
                                 found_sqs_checkpoint = True

From dd2648217f423137ffbbc108fe8f76c2ed4e4e4b Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Tue, 3 Jun 2025 12:38:20 -0400
Subject: [PATCH 09/19] refactorings

---
 datadog_lambda/dsm.py     | 37 +++++++++++++++++++++++++++++++++++++
 datadog_lambda/wrapper.py | 36 +++---------------------------------
 tests/test_wrapper.py     | 32 ++++++++++++++++++++++++++++++++
 3 files changed, 72 insertions(+), 33 deletions(-)
 create mode 100644 datadog_lambda/dsm.py

diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py
new file mode 100644
index 00000000..d3c7ec54
--- /dev/null
+++ b/datadog_lambda/dsm.py
@@ -0,0 +1,37 @@
+from datadog_lambda import logger
+from datadog_lambda.trigger import EventTypes
+
+
+def set_dsm_context(event, event_source):
+
+    if event_source.equals(EventTypes.SQS):
+        _dsm_set_sqs_context(event)
+
+
+def _dsm_set_sqs_context(event):
+    from datadog_lambda.wrapper import format_err_with_traceback
+
+    from ddtrace.internal.datastreams.processor import (
+        DataStreamsProcessor as processor,
+        DsmPathwayCodec,
+    )
+    from ddtrace.internal.datastreams.botocore import (
+        get_datastreams_context,
+        calculate_sqs_payload_size,
+    )
+
+    records = event.get("Records", [])
+    for record in records:
+        try:
+            queue_arn = record.get("eventSourceARN", "")
+
+            contextjson = get_datastreams_context(record)
+            payload_size = calculate_sqs_payload_size(record)
+
+            ctx = DsmPathwayCodec.decode(contextjson, processor())
+            ctx.set_checkpoint(
+                ["direction:in", "topic:" + queue_arn, "type:sqs"],
+                payload_size=payload_size,
+            )
+        except Exception as e:
+            logger.error(format_err_with_traceback(e))

diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index e567a2a3..91d13a6c 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -9,6 +9,7 @@
 from importlib import import_module
 from time import time_ns

+from datadog_lambda.dsm import set_dsm_context
 from datadog_lambda.extension import should_use_extension, flush_extension
 from datadog_lambda.cold_start import (
     set_cold_start,
@@ -291,39 +292,6 @@ def _inject_authorizer_span_headers(self, request_id):
         self.response["context"]["_datadog"] = datadog_data

     def _before(self, event, context):
-        def _dsm_set_sqs_context(record):
-            try:
-                queue_arn = record.get("eventSourceARN", "")
-
-                contextjson = get_datastreams_context(record)
-                payload_size = calculate_sqs_payload_size(record)
-
-                ctx = DsmPathwayCodec.decode(contextjson, processor())
-                ctx.set_checkpoint(
-                    ["direction:in", "topic:" + queue_arn, "type:sqs"],
-                    payload_size=payload_size,
-                )
-
-            except Exception as e:
-                logger.error(format_err_with_traceback(e))
-
-        if self.data_streams_enabled:
-            from ddtrace.internal.datastreams.processor import (
-                DataStreamsProcessor as processor,
-                DsmPathwayCodec,
-            )
-            from ddtrace.internal.datastreams.botocore import (
-                get_datastreams_context,
-                calculate_sqs_payload_size,
-            )
-
-            if isinstance(event, dict) and "Records" in event and event["Records"]:
-                sqs_records = [
-                    r for r in event["Records"] if r.get("eventSource") == "aws:sqs"
-                ]
-                if sqs_records:
-                    for record in sqs_records:
-                        _dsm_set_sqs_context(record)

         try:
             self.response = None
@@ -360,6 +328,8 @@ def _dsm_set_sqs_context(record):
             self.inferred_span = create_inferred_span(
                 event, context, event_source, self.decode_authorizer_context
             )
+            if self.data_streams_enabled:
+                set_dsm_context(event, event_source)
             self.span = create_function_execution_span(
                 context=context,
                 function_name=self.function_name,

diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py
index 3a0dce61..5369a567 100644
--- a/tests/test_wrapper.py
+++ b/tests/test_wrapper.py
@@ -565,6 +565,11 @@ def return_type_test(event, context):

     @patch.dict(os.environ, {"DD_DATA_STREAMS_ENABLED": "true"})
     def test_datadog_lambda_wrapper_dsm_sqs_context_pathway_verification(self):
+        from datadog_lambda.trigger import _EventSource, EventTypes
+
+        sqs_event_source = _EventSource(EventTypes.SQS)
+        self.mock_extract_dd_trace_context.return_value = ({}, None, sqs_event_source)
+
         with patch(
             "ddtrace.internal.datastreams.processor.get_connection"
         ) as mock_get_connection:
@@ -729,6 +734,33 @@ def lambda_handler(event, context):

         processor_instance.shutdown(timeout=0.1)

+    @patch.dict(os.environ, {"DD_DATA_STREAMS_ENABLED": "true"})
+    @patch("datadog_lambda.wrapper.set_dsm_context")
+    def test_set_dsm_context_called_when_enabled(self, mock_set_dsm_context):
+        @wrapper.datadog_lambda_wrapper
+        def lambda_handler(event, context):
+            return {"statusCode": 200, "body": "processed"}
+
+        lambda_event = {}
+        lambda_handler(lambda_event, get_mock_context())
+
+        mock_set_dsm_context.assert_called_once()
+
+    @patch("datadog_lambda.wrapper.set_dsm_context")
+    def test_set_dsm_context_not_called_when_disabled(self, mock_set_dsm_context):
+        # Ensure DD_DATA_STREAMS_ENABLED is not in environment
+        if "DD_DATA_STREAMS_ENABLED" in os.environ:
+            del os.environ["DD_DATA_STREAMS_ENABLED"]
+
+        @wrapper.datadog_lambda_wrapper
+        def lambda_handler(event, context):
+            return {"statusCode": 200, "body": "processed"}
+
+        lambda_event = {}
+        lambda_handler(lambda_event, get_mock_context())
+
+        mock_set_dsm_context.assert_not_called()
+

 class TestLambdaDecoratorSettings(unittest.TestCase):
     def test_some_envs_should_depend_on_dd_tracing_enabled(self):

From 116b7d9ad4a1b32cceccbfc60e33134e394279bf Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Tue, 3 Jun 2025 12:41:07 -0400
Subject: [PATCH 10/19] fix

---
 datadog_lambda/wrapper.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index 91d13a6c..0e23b721 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -292,7 +292,6 @@ def _inject_authorizer_span_headers(self, request_id):
         self.response["context"]["_datadog"] = datadog_data

     def _before(self, event, context):
-
         try:
             self.response = None
             set_cold_start(init_timestamp_ns)

From 6c8ec7ff166e61d0b1916f8dadec3805267fd7ec Mon Sep 17 00:00:00 2001
From: michael-zhao459
Date: Tue, 3 Jun 2025 13:05:10 -0400
Subject: [PATCH 11/19] Update datadog_lambda/dsm.py

Co-authored-by: Rey Abolofia
---
 datadog_lambda/dsm.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py
index d3c7ec54..c3c23af0 100644
--- a/datadog_lambda/dsm.py
+++ b/datadog_lambda/dsm.py
@@ -30,7 +30,7 @@ def _dsm_set_sqs_context(event):

             ctx = DsmPathwayCodec.decode(contextjson, processor())
             ctx.set_checkpoint(
-                ["direction:in", "topic:" + queue_arn, "type:sqs"],
+                ["direction:in", f"topic:{queue_arn}, "type:sqs"],
                 payload_size=payload_size,
             )
         except Exception as e:

From 9dbf2de9ffb7d1d4bab6eb81074f01eb81c3f866 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Tue, 3 Jun 2025 13:12:01 -0400
Subject: [PATCH 12/19] fix

---
 datadog_lambda/dsm.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py
index c3c23af0..e8e90ea9 100644
--- a/datadog_lambda/dsm.py
+++ b/datadog_lambda/dsm.py
@@ -30,7 +30,7 @@ def _dsm_set_sqs_context(event):

             ctx = DsmPathwayCodec.decode(contextjson, processor())
             ctx.set_checkpoint(
-                ["direction:in", f"topic:{queue_arn}, "type:sqs"],
+                ["direction:in", f"topic:{queue_arn}", "type:sqs"],
                 payload_size=payload_size,
             )
         except Exception as e:

From 779e6a54ec65ba68e996ac4150ec53c3585988e0 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Tue, 3 Jun 2025 13:45:40 -0400
Subject: [PATCH 13/19] fixes

---
 datadog_lambda/dsm.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py
index e8e90ea9..8dca6f19 100644
--- a/datadog_lambda/dsm.py
+++ b/datadog_lambda/dsm.py
@@ -12,7 +12,7 @@ def _dsm_set_sqs_context(event):
     from datadog_lambda.wrapper import format_err_with_traceback

     from ddtrace.internal.datastreams.processor import (
-        DataStreamsProcessor as processor,
+        DataStreamsProcessor,
         DsmPathwayCodec,
     )
     from ddtrace.internal.datastreams.botocore import (
@@ -20,7 +20,11 @@ def _dsm_set_sqs_context(event):
         calculate_sqs_payload_size,
     )

-    records = event.get("Records", [])
+    records = event.get("Records")
+    if records is None:
+        return
+    processor = DataStreamsProcessor()
+
     for record in records:
         try:
             queue_arn = record.get("eventSourceARN", "")

             contextjson = get_datastreams_context(record)
             payload_size = calculate_sqs_payload_size(record)

-            ctx = DsmPathwayCodec.decode(contextjson, processor())
+            ctx = DsmPathwayCodec.decode(contextjson, processor)
             ctx.set_checkpoint(
                 ["direction:in", f"topic:{queue_arn}", "type:sqs"],
                 payload_size=payload_size,
From e6a8b4e8ccf45d50231fec236a966fdef967e4fe Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Tue, 3 Jun 2025 14:25:14 -0400
Subject: [PATCH 14/19] fixes

---
 datadog_lambda/dsm.py |   9 +-
 tests/test_wrapper.py | 207 +++++++++++++++++++++---------------------
 2 files changed, 105 insertions(+), 111 deletions(-)

diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py
index 8dca6f19..427f5e47 100644
--- a/datadog_lambda/dsm.py
+++ b/datadog_lambda/dsm.py
@@ -10,11 +10,8 @@ def set_dsm_context(event, event_source):

 def _dsm_set_sqs_context(event):
     from datadog_lambda.wrapper import format_err_with_traceback
-
-    from ddtrace.internal.datastreams.processor import (
-        DataStreamsProcessor,
-        DsmPathwayCodec,
-    )
+    from ddtrace.internal.datastreams import data_streams_processor
+    from ddtrace.internal.datastreams.processor import DsmPathwayCodec
     from ddtrace.internal.datastreams.botocore import (
         get_datastreams_context,
         calculate_sqs_payload_size,
@@ -23,7 +20,7 @@ def _dsm_set_sqs_context(event):
     records = event.get("Records")
     if records is None:
         return
-    processor = DataStreamsProcessor()
+    processor = data_streams_processor()

     for record in records:
         try:

diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py
index 5369a567..0e0fefb2 100644
--- a/tests/test_wrapper.py
+++ b/tests/test_wrapper.py
@@ -623,116 +623,113 @@ def updated_get_datastreams_context(message):

                 return context_json

+            # Step 1: Create a message with some context in the message attributes
+            from ddtrace.internal.datastreams.processor import DataStreamsProcessor
+
+            processor_instance = DataStreamsProcessor()
+
             with patch(
                 "ddtrace.internal.datastreams.botocore.get_datastreams_context",
                 updated_get_datastreams_context,
+            ), patch(
+                "ddtrace.internal.datastreams.data_streams_processor",
+                return_value=processor_instance,
             ):
-
-                # Step 1: Create a message with some context in the message attributes
-
-                from ddtrace.internal.datastreams.processor import DataStreamsProcessor
-
-                processor_instance = DataStreamsProcessor()
-
-                with patch(
-                    "ddtrace.internal.datastreams.processor.DataStreamsProcessor",
-                    return_value=processor_instance,
-                ):
-
-                    parent_ctx = processor_instance.new_pathway()
-
-                    parent_ctx.set_checkpoint(
-                        ["direction:out", "topic:upstream-topic", "type:sqs"],
-                        now_sec=1640995200.0,
-                        payload_size=512,
-                    )
-                    parent_hash = parent_ctx.hash
-                    encoded_parent_context = parent_ctx.encode_b64()
-
-                    sqs_event = {
-                        "Records": [
-                            {
-                                "eventSource": "aws:sqs",
-                                "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test",
-                                "Body": "test message body",
-                                "messageAttributes": {
-                                    "_datadog": {
-                                        "stringValue": json.dumps(
-                                            {
-                                                "dd-pathway-ctx-base64": encoded_parent_context
-                                            }
-                                        )
-                                    }
-                                },
-                            }
-                        ]
-                    }
-
-                    # Step 2: Call the handler
-                    @wrapper.datadog_lambda_wrapper
-                    def lambda_handler(event, context):
-                        return {"statusCode": 200, "body": "processed"}
-
-                    result = lambda_handler(sqs_event, get_mock_context())
-                    self.assertEqual(result["statusCode"], 200)
-
-                    # New context set after handler call
-                    current_ctx = processor_instance._current_context.value
-                    self.assertIsNotNone(
-                        current_ctx,
-                        "Data streams context should be set after processing SQS message",
-                    )
-
-                    # Step 3: Check that hash in this context is the child of the hash you passed
-                    # Step 4: Check that the right checkpoint was produced during call to handler
-                    # The buckets hold the aggregated stats for all checkpoints
-                    found_sqs_checkpoint = False
-                    for bucket_time, bucket in processor_instance._buckets.items():
-                        for aggr_key, stats in bucket.pathway_stats.items():
-                            edge_tags_str, hash_value, parent_hash_recorded = aggr_key
-                            edge_tags = edge_tags_str.split(",")
-
-                            if (
-                                "direction:in" in edge_tags
-                                and "topic:arn:aws:sqs:us-east-1:123456789012:test"
-                                in edge_tags
-                                and "type:sqs" in edge_tags
-                            ):
-                                found_sqs_checkpoint = True
-
-                                # EXPLICIT PARENT-CHILD HASH RELATIONSHIP TEST
-                                self.assertEqual(
-                                    parent_hash_recorded,
-                                    parent_hash,
-                                    f"Parent hash must be preserved: "
-                                    f"expected {parent_hash}, got {parent_hash_recorded}",
-                                )
-                                self.assertEqual(
-                                    hash_value,
-                                    current_ctx.hash,
-                                    f"Child hash must match current context: "
-                                    f"expected {current_ctx.hash}, got {hash_value}",
-                                )
-                                self.assertNotEqual(
-                                    hash_value,
-                                    parent_hash_recorded,
-                                    f"Child hash ({hash_value}) must be different from "
-                                    f"parent hash ({parent_hash_recorded}) - proves parent-child",
-                                )
-                                self.assertGreaterEqual(
-                                    stats.payload_size.count,
-                                    1,
-                                    "Should have one payload size measurement",
-                                )
-
-                                break
-
-                    self.assertTrue(
-                        found_sqs_checkpoint,
-                        "Should have found SQS consumption checkpoint in processor stats",
-                    )
-
-                    processor_instance.shutdown(timeout=0.1)
+                parent_ctx = processor_instance.new_pathway()
+
+                parent_ctx.set_checkpoint(
+                    ["direction:out", "topic:upstream-topic", "type:sqs"],
+                    now_sec=1640995200.0,
+                    payload_size=512,
+                )
+                parent_hash = parent_ctx.hash
+                encoded_parent_context = parent_ctx.encode_b64()
+
+                sqs_event = {
+                    "Records": [
+                        {
+                            "eventSource": "aws:sqs",
+                            "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test",
+                            "Body": "test message body",
+                            "messageAttributes": {
+                                "_datadog": {
+                                    "stringValue": json.dumps(
+                                        {
+                                            "dd-pathway-ctx-base64": encoded_parent_context
+                                        }
+                                    )
+                                }
+                            },
+                        }
+                    ]
+                }
+
+                # Step 2: Call the handler
+                @wrapper.datadog_lambda_wrapper
+                def lambda_handler(event, context):
+                    return {"statusCode": 200, "body": "processed"}
+
+                result = lambda_handler(sqs_event, get_mock_context())
+                self.assertEqual(result["statusCode"], 200)
+
+                # New context set after handler call
+                current_ctx = processor_instance._current_context.value
+                self.assertIsNotNone(
+                    current_ctx,
+                    "Data streams context should be set after processing SQS message",
+                )
+
+                # Step 3: Check that hash in this context is the child of the hash you passed
+                # Step 4: Check that the right checkpoint was produced during call to handler
+                # The buckets hold the aggregated stats for all checkpoints
+                found_sqs_checkpoint = False
+                for bucket_time, bucket in processor_instance._buckets.items():
+                    for aggr_key, stats in bucket.pathway_stats.items():
+                        edge_tags_str, hash_value, parent_hash_recorded = aggr_key
+                        edge_tags = edge_tags_str.split(",")
+
+                        if (
+                            "direction:in" in edge_tags
+                            and "topic:arn:aws:sqs:us-east-1:123456789012:test"
+                            in edge_tags
+                            and "type:sqs" in edge_tags
+                        ):
+                            found_sqs_checkpoint = True
+
+                            # EXPLICIT PARENT-CHILD HASH RELATIONSHIP TEST
+                            self.assertEqual(
+                                parent_hash_recorded,
+                                parent_hash,
+                                f"Parent hash must be preserved: "
+                                f"expected {parent_hash}, got {parent_hash_recorded}",
+                            )
+                            self.assertEqual(
+                                hash_value,
+                                current_ctx.hash,
+                                f"Child hash must match current context: "
+                                f"expected {current_ctx.hash}, got {hash_value}",
+                            )
+                            self.assertNotEqual(
+                                hash_value,
+                                parent_hash_recorded,
+                                f"Child hash ({hash_value}) must be different from "
+                                f"parent hash ({parent_hash_recorded}) - proves parent-child",
+                            )
+                            self.assertGreaterEqual(
+                                stats.payload_size.count,
+                                1,
+                                "Should have one payload size measurement",
+                            )
+
+                            break
+
+                self.assertTrue(
+                    found_sqs_checkpoint,
+                    "Should have found SQS consumption checkpoint in processor stats",
+                )
+
+                processor_instance.shutdown(timeout=0.1)

From 7507357179582bcd61fde9d46116cb5b7a318511 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Tue, 3 Jun 2025 15:31:18 -0400
Subject: [PATCH 15/19] test fixes

---
 tests/test_dsm.py     | 113 +++++++++++++++++++++
 tests/test_wrapper.py | 257 +++++++++++-------------------------------
 2 files changed, 176 insertions(+), 194 deletions(-)
 create mode 100644 tests/test_dsm.py

diff --git a/tests/test_dsm.py b/tests/test_dsm.py
new file mode 100644
index 00000000..8cebb3c0
--- /dev/null
+++ b/tests/test_dsm.py
@@ -0,0 +1,113 @@
+import unittest
+from unittest.mock import patch, MagicMock
+
+from datadog_lambda.dsm import set_dsm_context, _dsm_set_sqs_context
+from datadog_lambda.trigger import EventTypes
+
+
+class TestDsmContext(unittest.TestCase):
+    def test_non_sqs_event_source_does_nothing(self):
+        """Test that non-SQS event sources don't trigger DSM context setting"""
+        event = {"Records": [{"body": "test"}]}
+
+        mock_event_source = MagicMock()
+        mock_event_source.equals.return_value = False  # Not SQS
+
+        with patch("datadog_lambda.dsm._dsm_set_sqs_context") as mock_sqs_context:
+            set_dsm_context(event, mock_event_source)
+
+        mock_event_source.equals.assert_called_once_with(EventTypes.SQS)
+        mock_sqs_context.assert_not_called()
+
+    def test_event_with_no_records_does_nothing(self):
+        """Test that events where Records is None don't trigger DSM processing"""
+        events_with_no_records = [
+            {},
+            {"Records": None},
+            {"someOtherField": "value"},
+        ]
+
+        for event in events_with_no_records:
+            with patch(
+                "ddtrace.internal.datastreams.data_streams_processor"
+            ) as mock_processor:
+                _dsm_set_sqs_context(event)
+
+                mock_processor.assert_not_called()
+
+    def test_sqs_event_triggers_dsm_sqs_context(self):
+        """Test that SQS event sources trigger the SQS-specific DSM context function"""
+        sqs_event = {
+            "Records": [
+                {
+                    "eventSource": "aws:sqs",
+                    "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
+                    "body": "Hello from SQS!",
+                }
+            ]
+        }
+
+        mock_event_source = MagicMock()
+        mock_event_source.equals.return_value = True
+
+        with patch("datadog_lambda.dsm._dsm_set_sqs_context") as mock_sqs_context:
+            set_dsm_context(sqs_event, mock_event_source)
+
+        mock_sqs_context.assert_called_once_with(sqs_event)
+
+    def test_multiple_records_process_each_record(self):
+        """Test that each record in an SQS event gets processed individually"""
+        multi_record_event = {
+            "Records": [
+                {
+                    "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue1",
+                    "body": "Message 1",
+                },
+                {
+                    "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue2",
+                    "body": "Message 2",
+                },
+                {
+                    "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue3",
+                    "body": "Message 3",
+                },
+            ]
+        }
+
+        mock_processor = MagicMock()
+        mock_context = MagicMock()
+
+        with patch(
+            "ddtrace.internal.datastreams.data_streams_processor",
+            return_value=mock_processor,
+        ):
+            with patch(
+                "ddtrace.internal.datastreams.botocore.get_datastreams_context",
+                return_value={},
+            ):
+                with patch(
+                    "ddtrace.internal.datastreams.botocore.calculate_sqs_payload_size",
+                    return_value=100,
+                ):
+                    with patch(
+                        "ddtrace.internal.datastreams.processor.DsmPathwayCodec.decode",
+                        return_value=mock_context,
+                    ):
+                        _dsm_set_sqs_context(multi_record_event)
+
+                        assert mock_context.set_checkpoint.call_count == 3
+
+                        calls = mock_context.set_checkpoint.call_args_list
+                        expected_arns = [
+                            "arn:aws:sqs:us-east-1:123456789012:queue1",
+                            "arn:aws:sqs:us-east-1:123456789012:queue2",
+                            "arn:aws:sqs:us-east-1:123456789012:queue3",
+                        ]
+
+                        for i, call in enumerate(calls):
+                            args, kwargs = call
+                            tags = args[0]
+                            self.assertIn("direction:in", tags)
+                            self.assertIn(f"topic:{expected_arns[i]}", tags)
+                            self.assertIn("type:sqs", tags)
+                            self.assertEqual(kwargs["payload_size"], 100)

diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py
index 0e0fefb2..29a6dfb6 100644
--- a/tests/test_wrapper.py
+++ b/tests/test_wrapper.py
@@ -563,200 +563,69 @@ def return_type_test(event, context):
         self.assertEqual(result, test_result)
         self.assertFalse(MockPrintExc.called)

-    @patch.dict(os.environ, {"DD_DATA_STREAMS_ENABLED": "true"})
-    def test_datadog_lambda_wrapper_dsm_sqs_context_pathway_verification(self):
-        from datadog_lambda.trigger import _EventSource, EventTypes
-
-        sqs_event_source = _EventSource(EventTypes.SQS)
-        self.mock_extract_dd_trace_context.return_value = ({}, None, sqs_event_source)
-
-        with patch(
-            "ddtrace.internal.datastreams.processor.get_connection"
-        ) as mock_get_connection:
-
-            mock_conn = unittest.mock.MagicMock()
-            mock_response = unittest.mock.MagicMock()
-            mock_response.status = 200
-            mock_conn.getresponse.return_value = mock_response
-            mock_get_connection.return_value = mock_conn
-
-            def updated_get_datastreams_context(message):
-                """
-                Updated version that handles the correct message formats
-                """
-                import base64
-                import json
-
-                context_json = None
-                message_body = message
-                try:
-                    body = message.get("Body")
-                    if body:
-                        message_body = json.loads(body)
-                except (ValueError, TypeError):
-                    pass
-
-                message_attributes = message_body.get(
-                    "MessageAttributes"
-                ) or message_body.get("messageAttributes")
-                if not message_attributes:
-                    return None
-
-                if "_datadog" not in message_attributes:
-                    return None
-
-                datadog_attr = message_attributes["_datadog"]
-
-                if message_body.get("Type") == "Notification":
-                    if datadog_attr.get("Type") == "Binary":
-                        context_json = json.loads(
-                            base64.b64decode(datadog_attr["Value"]).decode()
-                        )
-                elif "StringValue" in datadog_attr:
-                    context_json = json.loads(datadog_attr["StringValue"])
-                elif "stringValue" in datadog_attr:
-                    context_json = json.loads(datadog_attr["stringValue"])
-                elif "BinaryValue" in datadog_attr:
-                    context_json = json.loads(datadog_attr["BinaryValue"].decode())
-                else:
-                    print(f"DEBUG: Unhandled datadog_attr format: {datadog_attr}")
-
-                return context_json
-
-            # Step 1: Create a message with some context in the message attributes
-            from ddtrace.internal.datastreams.processor import DataStreamsProcessor
-
-            processor_instance = DataStreamsProcessor()
-
-            with patch(
-                "ddtrace.internal.datastreams.botocore.get_datastreams_context",
-                updated_get_datastreams_context,
-            ), patch(
-                "ddtrace.internal.datastreams.data_streams_processor",
-                return_value=processor_instance,
-            ):
-
-                parent_ctx = processor_instance.new_pathway()
-
-                parent_ctx.set_checkpoint(
-                    ["direction:out", "topic:upstream-topic", "type:sqs"],
-                    now_sec=1640995200.0,
-                    payload_size=512,
-                )
-                parent_hash = parent_ctx.hash
-                encoded_parent_context = parent_ctx.encode_b64()
-
-                sqs_event = {
-                    "Records": [
-                        {
-                            "eventSource": "aws:sqs",
-                            "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test",
-                            "Body": "test message body",
-                            "messageAttributes": {
-                                "_datadog": {
-                                    "stringValue": json.dumps(
-                                        {
-                                            "dd-pathway-ctx-base64": encoded_parent_context
-                                        }
-                                    )
-                                }
-                            },
-                        }
-                    ]
-                }
-
-                # Step 2: Call the handler
-                @wrapper.datadog_lambda_wrapper
-                def lambda_handler(event, context):
-                    return {"statusCode": 200, "body": "processed"}
-
-                result = lambda_handler(sqs_event, get_mock_context())
-                self.assertEqual(result["statusCode"], 200)
-
-                # New context set after handler call
-                current_ctx = processor_instance._current_context.value
-                self.assertIsNotNone(
-                    current_ctx,
-                    "Data streams context should be set after processing SQS message",
-                )
-
-                # Step 3: Check that hash in this context is the child of the hash you passed
-                # Step 4: Check that the right checkpoint was produced during call to handler
-                # The buckets hold the aggregated stats for all checkpoints
-                found_sqs_checkpoint = False
-                for bucket_time, bucket in processor_instance._buckets.items():
-                    for aggr_key, stats in bucket.pathway_stats.items():
-                        edge_tags_str, hash_value, parent_hash_recorded = aggr_key
-                        edge_tags = edge_tags_str.split(",")
-
-                        if (
-                            "direction:in" in edge_tags
-                            and "topic:arn:aws:sqs:us-east-1:123456789012:test"
-                            in edge_tags
-                            and "type:sqs" in edge_tags
-                        ):
-                            found_sqs_checkpoint = True
-
-                            # EXPLICIT PARENT-CHILD HASH RELATIONSHIP TEST
-                            self.assertEqual(
-                                parent_hash_recorded,
-                                parent_hash,
-                                f"Parent hash must be preserved: "
-                                f"expected {parent_hash}, got {parent_hash_recorded}",
-                            )
-                            self.assertEqual(
-                                hash_value,
-                                current_ctx.hash,
-                                f"Child hash must match current context: "
-                                f"expected {current_ctx.hash}, got {hash_value}",
-                            )
-                            self.assertNotEqual(
-                                hash_value,
-                                parent_hash_recorded,
-                                f"Child hash ({hash_value}) must be different from "
-                                f"parent hash ({parent_hash_recorded}) - proves parent-child",
-                            )
-                            self.assertGreaterEqual(
-                                stats.payload_size.count,
-                                1,
-                                "Should have one payload size measurement",
-                            )
-
-                            break
-
-                self.assertTrue(
-                    found_sqs_checkpoint,
-                    "Should have found SQS consumption checkpoint in processor stats",
-                )
-
-                processor_instance.shutdown(timeout=0.1)
-
-    @patch.dict(os.environ, {"DD_DATA_STREAMS_ENABLED": "true"})
-    @patch("datadog_lambda.wrapper.set_dsm_context")
-    def test_set_dsm_context_called_when_enabled(self, mock_set_dsm_context):
-        @wrapper.datadog_lambda_wrapper
-        def lambda_handler(event, context):
-            return {"statusCode": 200, "body": "processed"}
-
-        lambda_event = {}
-        lambda_handler(lambda_event, get_mock_context())
-
-        mock_set_dsm_context.assert_called_once()
-
-    @patch("datadog_lambda.wrapper.set_dsm_context")
-    def test_set_dsm_context_not_called_when_disabled(self, mock_set_dsm_context):
-        # Ensure DD_DATA_STREAMS_ENABLED is not in environment
-        if "DD_DATA_STREAMS_ENABLED" in os.environ:
-            del os.environ["DD_DATA_STREAMS_ENABLED"]
-
-        @wrapper.datadog_lambda_wrapper
-        def lambda_handler(event, context):
-            return {"statusCode": 200, "body": "processed"}
-
-        lambda_event = {}
-        lambda_handler(lambda_event, get_mock_context())
-
-        mock_set_dsm_context.assert_not_called()
+    def test_set_dsm_context_called_when_DSM_and_tracing_enabled(self):
+        env_vars = {"DD_DATA_STREAMS_ENABLED": "true"}
+        with patch.dict(os.environ, env_vars):
+            with patch("datadog_lambda.wrapper.dd_tracing_enabled", True):
+                with patch(
+                    "datadog_lambda.wrapper.set_dsm_context"
+                ) as set_dsm_context_patch:
+
+                    @wrapper.datadog_lambda_wrapper
+                    def lambda_handler(event, context):
+                        return "ok"
+
+                    result = lambda_handler({}, get_mock_context())
+                    assert result == "ok"
+                    assert set_dsm_context_patch.called_once()
+
+    def test_set_dsm_context_not_called_when_only_DSM_enabled(self):
+        env_vars = {"DD_DATA_STREAMS_ENABLED": "true"}
+        with patch.dict(os.environ, env_vars):
+            with patch("datadog_lambda.wrapper.dd_tracing_enabled", False):
+                with patch(
+                    "datadog_lambda.wrapper.set_dsm_context"
+                ) as set_dsm_context_patch:
+
+                    @wrapper.datadog_lambda_wrapper
+                    def lambda_handler(event, context):
+                        return "ok"
+
+                    result = lambda_handler({}, get_mock_context())
+                    assert result == "ok"
+                    assert set_dsm_context_patch.call_count == 0
+
+    def test_set_dsm_context_not_called_when_only_tracing_enabled(self):
+        env_vars = {"DD_DATA_STREAMS_ENABLED": "false"}
+        with patch.dict(os.environ, env_vars):
+            with patch("datadog_lambda.wrapper.dd_tracing_enabled", True):
+                with patch(
+                    "datadog_lambda.wrapper.set_dsm_context"
+                ) as set_dsm_context_patch:
+
+                    @wrapper.datadog_lambda_wrapper
+                    def lambda_handler(event, context):
+                        return "ok"
+
+                    result = lambda_handler({}, get_mock_context())
+                    assert result == "ok"
+                    assert set_dsm_context_patch.call_count == 0
+
+    def test_set_dsm_context_not_called_when_tracing_and_DSM_disabled(self):
+        env_vars = {"DD_DATA_STREAMS_ENABLED": "false"}
+        with patch.dict(os.environ, env_vars):
+            with patch("datadog_lambda.wrapper.dd_tracing_enabled", True):
+                with patch(
+                    "datadog_lambda.wrapper.set_dsm_context"
+                ) as set_dsm_context_patch:
+
+                    @wrapper.datadog_lambda_wrapper
+                    def lambda_handler(event, context):
+                        return "ok"
+
+                    result = lambda_handler({}, get_mock_context())
+                    assert result == "ok"
+                    assert set_dsm_context_patch.call_count == 0


 class TestLambdaDecoratorSettings(unittest.TestCase):
     def test_some_envs_should_depend_on_dd_tracing_enabled(self):

From bbf219b9779fb5537987a120150e89ea9f539c74 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Tue, 3 Jun 2025 15:37:07 -0400
Subject: [PATCH 16/19] fix

---
 tests/test_wrapper.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py
index 29a6dfb6..f5c93388 100644
--- a/tests/test_wrapper.py
+++ b/tests/test_wrapper.py
@@ -577,7 +577,7 @@ def lambda_handler(event, context):

                     result = lambda_handler({}, get_mock_context())
                     assert result == "ok"
-                    assert set_dsm_context_patch.called_once()
+                    assert set_dsm_context_patch.call_count == 1

     def test_set_dsm_context_not_called_when_only_DSM_enabled(self):
         env_vars = {"DD_DATA_STREAMS_ENABLED": "true"}

From 254b86486a371fa29c835c414ce1bf731ba36781 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Tue, 3 Jun 2025 16:07:46 -0400
Subject: [PATCH 17/19] fix

---
 tests/test_wrapper.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py
index f5c93388..d6e71cc3 100644
--- a/tests/test_wrapper.py
+++ b/tests/test_wrapper.py
@@ -614,7 +614,7 @@ def lambda_handler(event, context):
     def test_set_dsm_context_not_called_when_tracing_and_DSM_disabled(self):
         env_vars = {"DD_DATA_STREAMS_ENABLED": "false"}
         with patch.dict(os.environ, env_vars):
-            with patch("datadog_lambda.wrapper.dd_tracing_enabled", True):
+            with patch("datadog_lambda.wrapper.dd_tracing_enabled", False):
                 with patch(
                     "datadog_lambda.wrapper.set_dsm_context"
                 ) as set_dsm_context_patch:

From dda744ee48f6b805e1f42ae1de803ef7ce2b7822 Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Wed, 4 Jun 2025 08:04:06 -0400
Subject: [PATCH 18/19] unit test fixes

---
 tests/test_dsm.py     | 99 ++++++++++++++++++++++---------------
 tests/test_wrapper.py | 96 ++++++++++++++++++++---------------------
 2 files changed, 97 insertions(+), 98 deletions(-)

diff --git a/tests/test_dsm.py b/tests/test_dsm.py
index 8cebb3c0..1fa28e97 100644
--- a/tests/test_dsm.py
+++ b/tests/test_dsm.py
@@ -6,6 +6,31 @@


 class TestDsmContext(unittest.TestCase):
+    def setUp(self):
+        patcher = patch("datadog_lambda.dsm._dsm_set_sqs_context")
+        self.mock_dsm_set_sqs_context = patcher.start()
+        self.addCleanup(patcher.stop)
+
+        patcher = patch("ddtrace.internal.datastreams.data_streams_processor")
+        self.mock_data_streams_processor = patcher.start()
+        self.addCleanup(patcher.stop)
+
+        patcher = patch("ddtrace.internal.datastreams.botocore.get_datastreams_context")
+        self.mock_get_datastreams_context = patcher.start()
+        self.mock_get_datastreams_context.return_value = {}
+        self.addCleanup(patcher.stop)
+
+        patcher = patch(
+            "ddtrace.internal.datastreams.botocore.calculate_sqs_payload_size"
+        )
+        self.mock_calculate_sqs_payload_size = patcher.start()
+        self.mock_calculate_sqs_payload_size.return_value = 100
+        self.addCleanup(patcher.stop)
+
+        patcher = patch("ddtrace.internal.datastreams.processor.DsmPathwayCodec.decode")
+        self.mock_dsm_pathway_codec_decode = patcher.start()
+        self.addCleanup(patcher.stop)
+
     def test_non_sqs_event_source_does_nothing(self):
         """Test that non-SQS event sources don't trigger DSM context setting"""
         event = {"Records": [{"body": "test"}]}
@@ -13,11 +38,10 @@ def test_non_sqs_event_source_does_nothing(self):

         mock_event_source = MagicMock()
         mock_event_source.equals.return_value = False  # Not SQS

-        with patch("datadog_lambda.dsm._dsm_set_sqs_context") as mock_sqs_context:
-            set_dsm_context(event, mock_event_source)
+        set_dsm_context(event, mock_event_source)

-        mock_event_source.equals.assert_called_once_with(EventTypes.SQS)
-        mock_sqs_context.assert_not_called()
+        mock_event_source.equals.assert_called_once_with(EventTypes.SQS)
+        self.mock_dsm_set_sqs_context.assert_not_called()

     def test_event_with_no_records_does_nothing(self):
         """Test that events where Records is None don't trigger DSM processing"""
         events_with_no_records = [
             {},
             {"Records": None},
             {"someOtherField": "value"},
         ]

         for event in events_with_no_records:
-            with patch(
-                "ddtrace.internal.datastreams.data_streams_processor"
-            ) as mock_processor:
-                _dsm_set_sqs_context(event)
-
-                mock_processor.assert_not_called()
+            _dsm_set_sqs_context(event)
+            self.mock_data_streams_processor.assert_not_called()

     def test_sqs_event_triggers_dsm_sqs_context(self):
         """Test that SQS event sources trigger the SQS-specific DSM context function"""
         sqs_event = {
             "Records": [
                 {
                     "eventSource": "aws:sqs",
                     "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
                     "body": "Hello from SQS!",
                 }
             ]
         }

         mock_event_source = MagicMock()
         mock_event_source.equals.return_value = True

-        with patch("datadog_lambda.dsm._dsm_set_sqs_context") as mock_sqs_context:
-            set_dsm_context(sqs_event, mock_event_source)
+        set_dsm_context(sqs_event, mock_event_source)

-        mock_sqs_context.assert_called_once_with(sqs_event)
+        self.mock_dsm_set_sqs_context.assert_called_once_with(sqs_event)

     def test_multiple_records_process_each_record(self):
         """Test that each record in an SQS event gets processed individually"""
         multi_record_event = {
             "Records": [
                 {
                     "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue1",
                     "body": "Message 1",
                 },
                 {
                     "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue2",
                     "body": "Message 2",
                 },
                 {
                     "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue3",
                     "body": "Message 3",
                 },
             ]
         }

-        mock_processor = MagicMock()
         mock_context = MagicMock()
+        self.mock_dsm_pathway_codec_decode.return_value = mock_context
+
+        _dsm_set_sqs_context(multi_record_event)
+
+        self.assertEqual(mock_context.set_checkpoint.call_count, 3)
+
+        calls = mock_context.set_checkpoint.call_args_list
+        expected_arns = [
+            "arn:aws:sqs:us-east-1:123456789012:queue1",
+            "arn:aws:sqs:us-east-1:123456789012:queue2",
+            "arn:aws:sqs:us-east-1:123456789012:queue3",
+        ]

-        with patch(
-            "ddtrace.internal.datastreams.data_streams_processor",
-            return_value=mock_processor,
-        ):
-            with patch(
"ddtrace.internal.datastreams.botocore.get_datastreams_context", - return_value={}, - ): - with patch( - "ddtrace.internal.datastreams.botocore.calculate_sqs_payload_size", - return_value=100, - ): - with patch( - "ddtrace.internal.datastreams.processor.DsmPathwayCodec.decode", - return_value=mock_context, - ): - _dsm_set_sqs_context(multi_record_event) - - assert mock_context.set_checkpoint.call_count == 3 - - calls = mock_context.set_checkpoint.call_args_list - expected_arns = [ - "arn:aws:sqs:us-east-1:123456789012:queue1", - "arn:aws:sqs:us-east-1:123456789012:queue2", - "arn:aws:sqs:us-east-1:123456789012:queue3", - ] - - for i, call in enumerate(calls): - args, kwargs = call - tags = args[0] - self.assertIn("direction:in", tags) - self.assertIn(f"topic:{expected_arns[i]}", tags) - self.assertIn("type:sqs", tags) - self.assertEqual(kwargs["payload_size"], 100) + for i, call in enumerate(calls): + args, kwargs = call + tags = args[0] + self.assertIn("direction:in", tags) + self.assertIn(f"topic:{expected_arns[i]}", tags) + self.assertIn("type:sqs", tags) + self.assertEqual(kwargs["payload_size"], 100) diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py index d6e71cc3..f482fa3d 100644 --- a/tests/test_wrapper.py +++ b/tests/test_wrapper.py @@ -76,6 +76,10 @@ def setUp(self): self.mock_dd_lambda_layer_tag = patcher.start() self.addCleanup(patcher.stop) + patcher = patch("datadog_lambda.wrapper.set_dsm_context") + self.mock_set_dsm_context = patcher.start() + self.addCleanup(patcher.stop) + def test_datadog_lambda_wrapper(self): wrapper.dd_tracing_enabled = False @@ -564,68 +568,60 @@ def return_type_test(event, context): self.assertFalse(MockPrintExc.called) def test_set_dsm_context_called_when_DSM_and_tracing_enabled(self): - env_vars = {"DD_DATA_STREAMS_ENABLED": "true"} - with patch.dict(os.environ, env_vars): - with patch("datadog_lambda.wrapper.dd_tracing_enabled", True): - with patch( - "datadog_lambda.wrapper.set_dsm_context" - ) as set_dsm_context_patch: + os.environ["DD_DATA_STREAMS_ENABLED"] = "true" + wrapper.dd_tracing_enabled = True + + @wrapper.datadog_lambda_wrapper + def lambda_handler(event, context): + return "ok" - @wrapper.datadog_lambda_wrapper - def lambda_handler(event, context): - return "ok" + result = lambda_handler({}, get_mock_context()) + self.assertEqual(result, "ok") + self.mock_set_dsm_context.assert_called_once() - result = lambda_handler({}, get_mock_context()) - assert result == "ok" - assert set_dsm_context_patch.call_count == 1 + del os.environ["DD_DATA_STREAMS_ENABLED"] def test_set_dsm_context_not_called_when_only_DSM_enabled(self): - env_vars = {"DD_DATA_STREAMS_ENABLED": "true"} - with patch.dict(os.environ, env_vars): - with patch("datadog_lambda.wrapper.dd_tracing_enabled", False): - with patch( - "datadog_lambda.wrapper.set_dsm_context" - ) as set_dsm_context_patch: + os.environ["DD_DATA_STREAMS_ENABLED"] = "true" + wrapper.dd_tracing_enabled = False + + @wrapper.datadog_lambda_wrapper + def lambda_handler(event, context): + return "ok" - @wrapper.datadog_lambda_wrapper - def lambda_handler(event, context): - return "ok" + result = lambda_handler({}, get_mock_context()) + self.assertEqual(result, "ok") + self.mock_set_dsm_context.assert_not_called() - result = lambda_handler({}, get_mock_context()) - assert result == "ok" - assert set_dsm_context_patch.call_count == 0 + del os.environ["DD_DATA_STREAMS_ENABLED"] def test_set_dsm_context_not_called_when_only_tracing_enabled(self): - env_vars = {"DD_DATA_STREAMS_ENABLED": "false"} - with 
-        with patch.dict(os.environ, env_vars):
-            with patch("datadog_lambda.wrapper.dd_tracing_enabled", True):
-                with patch(
-                    "datadog_lambda.wrapper.set_dsm_context"
-                ) as set_dsm_context_patch:
+        os.environ["DD_DATA_STREAMS_ENABLED"] = "false"
+        wrapper.dd_tracing_enabled = True

-                    @wrapper.datadog_lambda_wrapper
-                    def lambda_handler(event, context):
-                        return "ok"
+        @wrapper.datadog_lambda_wrapper
+        def lambda_handler(event, context):
+            return "ok"

-                    result = lambda_handler({}, get_mock_context())
-                    assert result == "ok"
-                    assert set_dsm_context_patch.call_count == 0
+        result = lambda_handler({}, get_mock_context())
+        self.assertEqual(result, "ok")
+        self.mock_set_dsm_context.assert_not_called()
+
+        del os.environ["DD_DATA_STREAMS_ENABLED"]

     def test_set_dsm_context_not_called_when_tracing_and_DSM_disabled(self):
-        env_vars = {"DD_DATA_STREAMS_ENABLED": "false"}
-        with patch.dict(os.environ, env_vars):
-            with patch("datadog_lambda.wrapper.dd_tracing_enabled", False):
-                with patch(
-                    "datadog_lambda.wrapper.set_dsm_context"
-                ) as set_dsm_context_patch:
+        os.environ["DD_DATA_STREAMS_ENABLED"] = "false"
+        wrapper.dd_tracing_enabled = False
+
+        @wrapper.datadog_lambda_wrapper
+        def lambda_handler(event, context):
+            return "ok"

-                    @wrapper.datadog_lambda_wrapper
-                    def lambda_handler(event, context):
-                        return "ok"
+        result = lambda_handler({}, get_mock_context())
+        self.assertEqual(result, "ok")
+        self.mock_set_dsm_context.assert_not_called()

-                    result = lambda_handler({}, get_mock_context())
-                    assert result == "ok"
-                    assert set_dsm_context_patch.call_count == 0
+        del os.environ["DD_DATA_STREAMS_ENABLED"]


 class TestLambdaDecoratorSettings(unittest.TestCase):

From 6f65160bb8be7b25db38a92334c5c06934d4da7c Mon Sep 17 00:00:00 2001
From: Michael Zhao
Date: Thu, 5 Jun 2025 13:09:33 -0400
Subject: [PATCH 19/19] fixes

---
 tests/test_dsm.py | 26 +++++++++++---------------
 1 file changed, 11 insertions(+), 15 deletions(-)

diff --git a/tests/test_dsm.py b/tests/test_dsm.py
index 1fa28e97..544212d8 100644
--- a/tests/test_dsm.py
+++ b/tests/test_dsm.py
@@ -2,10 +2,10 @@
 from unittest.mock import patch, MagicMock

 from datadog_lambda.dsm import set_dsm_context, _dsm_set_sqs_context
-from datadog_lambda.trigger import EventTypes
+from datadog_lambda.trigger import EventTypes, _EventSource


-class TestDsmContext(unittest.TestCase):
+class TestDsmSQSContext(unittest.TestCase):
     def setUp(self):
         patcher = patch("datadog_lambda.dsm._dsm_set_sqs_context")
@@ -33,17 +33,15 @@ def setUp(self):

     def test_non_sqs_event_source_does_nothing(self):
         """Test that non-SQS event sources don't trigger DSM context setting"""
-        event = {"Records": [{"body": "test"}]}
+        event = {}
+        # Use Unknown Event Source
+        event_source = _EventSource(EventTypes.UNKNOWN)
+        set_dsm_context(event, event_source)

-        mock_event_source = MagicMock()
-        mock_event_source.equals.return_value = False  # Not SQS
-
-        set_dsm_context(event, mock_event_source)
-
-        mock_event_source.equals.assert_called_once_with(EventTypes.SQS)
+        # DSM context should not be set for non-SQS events
         self.mock_dsm_set_sqs_context.assert_not_called()

-    def test_event_with_no_records_does_nothing(self):
+    def test_sqs_event_with_no_records_does_nothing(self):
         """Test that events where Records is None don't trigger DSM processing"""
         events_with_no_records = [
             {},
             {"Records": None},
             {"someOtherField": "value"},
         ]

         for event in events_with_no_records:
             _dsm_set_sqs_context(event)
             self.mock_data_streams_processor.assert_not_called()

     def test_sqs_event_triggers_dsm_sqs_context(self):
         """Test that SQS event sources trigger the SQS-specific DSM context function"""
         sqs_event = {
             "Records": [
                 {
                     "eventSource": "aws:sqs",
                     "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
                     "body": "Hello from SQS!",
                 }
             ]
         }

-        mock_event_source = MagicMock()
-        mock_event_source.equals.return_value = True
-
-        set_dsm_context(sqs_event, mock_event_source)
+        event_source = _EventSource(EventTypes.SQS)
+        set_dsm_context(sqs_event, event_source)

         self.mock_dsm_set_sqs_context.assert_called_once_with(sqs_event)

-    def test_multiple_records_process_each_record(self):
+    def test_sqs_multiple_records_process_each_record(self):
         """Test that each record in an SQS event gets processed individually"""
         multi_record_event = {
             "Records": [