Skip to content

Commit a84bdc8

Browse files
authored
add XRay trace headers in notifications from S3 to SQS (localstack#3933)
1 parent 5de3bf2 commit a84bdc8

File tree

4 files changed

+49
-19
lines changed

4 files changed

+49
-19
lines changed

localstack/services/s3/s3_listener.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
)
3232
from localstack.utils.analytics import event_publisher
3333
from localstack.utils.persistence import PersistingProxyListener
34-
from localstack.utils.aws.aws_responses import requests_response
34+
from localstack.utils.aws.aws_responses import requests_response, create_sqs_system_attributes
3535
from localstack.services.cloudformation.service_models import S3Bucket
3636

3737
CONTENT_SHA256_HEADER = 'x-amz-content-sha256'
@@ -176,7 +176,7 @@ def get_event_message(event_name, bucket_name, file_name='testfile.txt', etag=''
176176
}
177177

178178

179-
def send_notifications(method, bucket_name, object_path, version_id):
179+
def send_notifications(method, bucket_name, object_path, version_id, headers):
180180
for bucket, notifs in S3_NOTIFICATIONS.items():
181181
if normalize_bucket_name(bucket) == normalize_bucket_name(bucket_name):
182182
action = {'PUT': 'ObjectCreated', 'POST': 'ObjectCreated', 'DELETE': 'ObjectRemoved'}[method]
@@ -190,10 +190,11 @@ def send_notifications(method, bucket_name, object_path, version_id):
190190
event_name = '%s:%s' % (action, api_method)
191191
for notif in notifs:
192192
send_notification_for_subscriber(notif, bucket_name, object_path,
193-
version_id, api_method, action, event_name)
193+
version_id, api_method, action, event_name, headers)
194194

195195

196-
def send_notification_for_subscriber(notif, bucket_name, object_path, version_id, api_method, action, event_name):
196+
def send_notification_for_subscriber(notif, bucket_name, object_path, version_id, api_method, action, event_name,
197+
headers):
197198
bucket_name = normalize_bucket_name(bucket_name)
198199

199200
if not event_type_matches(notif['Event'], action, api_method) or \
@@ -225,7 +226,8 @@ def send_notification_for_subscriber(notif, bucket_name, object_path, version_id
225226
sqs_client = aws_stack.connect_to_service('sqs')
226227
try:
227228
queue_url = aws_stack.sqs_queue_url_for_arn(notif['Queue'])
228-
sqs_client.send_message(QueueUrl=queue_url, MessageBody=message)
229+
sqs_client.send_message(QueueUrl=queue_url, MessageBody=message,
230+
MessageSystemAttributes=create_sqs_system_attributes(headers))
229231
except Exception as e:
230232
LOGGER.warning('Unable to send notification for S3 bucket "%s" to SQS queue "%s": %s' %
231233
(bucket_name, notif['Queue'], e))
@@ -1239,7 +1241,7 @@ def return_response(self, method, path, data, headers, response, request_handler
12391241
object_path = parts[1] if parts[1][0] == '/' else '/%s' % parts[1]
12401242
version_id = response.headers.get('x-amz-version-id', None)
12411243

1242-
send_notifications(method, bucket_name, object_path, version_id)
1244+
send_notifications(method, bucket_name, object_path, version_id, headers)
12431245

12441246
# publish event for creation/deletion of buckets:
12451247
if method in ('PUT', 'DELETE') and ('/' not in path[1:] or len(path[1:].split('/')[1]) <= 0):

localstack/services/sns/sns_listener.py

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from localstack.services.install import SQS_BACKEND_IMPL
1717
from localstack.utils.analytics import event_publisher
1818
from localstack.utils.aws import aws_stack
19-
from localstack.utils.aws.aws_responses import response_regex_replace
19+
from localstack.utils.aws.aws_responses import response_regex_replace, create_sqs_system_attributes
2020
from localstack.utils.aws.dead_letter_queue import sns_error_to_dead_letter_queue
2121
from localstack.utils.common import parse_request_data, timestamp_millis, short_uid, to_str, to_bytes, start_thread
2222
from localstack.utils.persistence import PersistingProxyListener
@@ -659,17 +659,6 @@ def create_sqs_message_attributes(subscriber, attributes):
659659
return message_attributes
660660

661661

662-
def create_sqs_system_attributes(headers):
663-
664-
system_attributes = {}
665-
if 'X-Amzn-Trace-Id' in headers:
666-
system_attributes['AWSTraceHeader'] = {
667-
'DataType': 'String',
668-
'StringValue': str(headers['X-Amzn-Trace-Id'])
669-
}
670-
return system_attributes
671-
672-
673662
def get_message_attributes(req_data):
674663
attributes = {}
675664
x = 1

localstack/utils/aws/aws_responses.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,17 @@ def make_error(*args, **kwargs):
170170
return flask_error_response_xml(*args, **kwargs)
171171

172172

173+
def create_sqs_system_attributes(headers):
174+
175+
system_attributes = {}
176+
if 'X-Amzn-Trace-Id' in headers:
177+
system_attributes['AWSTraceHeader'] = {
178+
'DataType': 'String',
179+
'StringValue': str(headers['X-Amzn-Trace-Id'])
180+
}
181+
return system_attributes
182+
183+
173184
def extract_tags(req_data):
174185
keys = []
175186
values = []

tests/integration/test_s3.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
from localstack.constants import TEST_AWS_ACCESS_KEY_ID, TEST_AWS_SECRET_ACCESS_KEY, S3_VIRTUAL_HOSTNAME
2323
from localstack.utils.aws import aws_stack
2424
from localstack.services.s3 import s3_listener, s3_utils
25-
2625
from localstack.utils.common import (
2726
new_tmp_dir, short_uid, retry, get_service_protocol, to_bytes, safe_requests, to_str, new_tmp_file, rm_rf,
2827
load_file, run)
@@ -1558,6 +1557,35 @@ def test_encoding_notification_messages(self):
15581557
# clean up
15591558
self.s3_client.delete_objects(Bucket=bucket_name, Delete={'Objects': [{'Key': key}]})
15601559

1560+
def add_xray_header(self, request, **kwargs):
1561+
request.headers['X-Amzn-Trace-Id'] = \
1562+
'Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1'
1563+
1564+
def test_xray_header_to_sqs(self):
1565+
key = 'test-data'
1566+
bucket_name = 'bucket-%s' % short_uid()
1567+
self.s3_client.meta.events.register('before-send.s3.*', self.add_xray_header)
1568+
queue_url = self.sqs_client.create_queue(QueueName='testQueueNew')['QueueUrl']
1569+
queue_attributes = self.sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['QueueArn'])
1570+
1571+
self._create_test_notification_bucket(queue_attributes, bucket_name=bucket_name)
1572+
1573+
# put an object where the bucket_name is in the path
1574+
self.s3_client.put_object(Bucket=bucket_name, Key=key, Body='something')
1575+
1576+
def get_message(queue_url):
1577+
resp = self.sqs_client.receive_message(QueueUrl=queue_url,
1578+
AttributeNames=['AWSTraceHeader'],
1579+
MessageAttributeNames=['All'])
1580+
1581+
self.assertEqual(resp['Messages'][0]['Attributes']['AWSTraceHeader'],
1582+
'Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1')
1583+
1584+
retry(get_message, retries=3, sleep=10, queue_url=queue_url)
1585+
1586+
# clean up
1587+
self.s3_client.delete_objects(Bucket=bucket_name, Delete={'Objects': [{'Key': key}]})
1588+
15611589
def test_s3_batch_delete_objects_using_requests(self):
15621590
bucket_name = 'bucket-%s' % short_uid()
15631591
object_key_1 = 'key-%s' % short_uid()

0 commit comments

Comments
 (0)