Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions localstack/services/s3/s3_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
)
from localstack.utils.analytics import event_publisher
from localstack.utils.persistence import PersistingProxyListener
from localstack.utils.aws.aws_responses import requests_response
from localstack.utils.aws.aws_responses import requests_response, create_sqs_system_attributes
from localstack.services.cloudformation.service_models import S3Bucket

CONTENT_SHA256_HEADER = 'x-amz-content-sha256'
Expand Down Expand Up @@ -173,7 +173,7 @@ def get_event_message(event_name, bucket_name, file_name='testfile.txt', etag=''
}


def send_notifications(method, bucket_name, object_path, version_id):
def send_notifications(method, bucket_name, object_path, version_id, headers):
for bucket, notifs in S3_NOTIFICATIONS.items():
if normalize_bucket_name(bucket) == normalize_bucket_name(bucket_name):
action = {'PUT': 'ObjectCreated', 'POST': 'ObjectCreated', 'DELETE': 'ObjectRemoved'}[method]
Expand All @@ -187,10 +187,11 @@ def send_notifications(method, bucket_name, object_path, version_id):
event_name = '%s:%s' % (action, api_method)
for notif in notifs:
send_notification_for_subscriber(notif, bucket_name, object_path,
version_id, api_method, action, event_name)
version_id, api_method, action, event_name, headers)


def send_notification_for_subscriber(notif, bucket_name, object_path, version_id, api_method, action, event_name):
def send_notification_for_subscriber(notif, bucket_name, object_path, version_id, api_method, action, event_name,
headers):
bucket_name = normalize_bucket_name(bucket_name)

if not event_type_matches(notif['Event'], action, api_method) or \
Expand Down Expand Up @@ -222,7 +223,8 @@ def send_notification_for_subscriber(notif, bucket_name, object_path, version_id
sqs_client = aws_stack.connect_to_service('sqs')
try:
queue_url = aws_stack.sqs_queue_url_for_arn(notif['Queue'])
sqs_client.send_message(QueueUrl=queue_url, MessageBody=message)
sqs_client.send_message(QueueUrl=queue_url, MessageBody=message,
MessageSystemAttributes=create_sqs_system_attributes(headers))
except Exception as e:
LOGGER.warning('Unable to send notification for S3 bucket "%s" to SQS queue "%s": %s' %
(bucket_name, notif['Queue'], e))
Expand Down Expand Up @@ -1179,7 +1181,7 @@ def return_response(self, method, path, data, headers, response, request_handler
object_path = parts[1] if parts[1][0] == '/' else '/%s' % parts[1]
version_id = response.headers.get('x-amz-version-id', None)

send_notifications(method, bucket_name, object_path, version_id)
send_notifications(method, bucket_name, object_path, version_id, headers)

# publish event for creation/deletion of buckets:
if method in ('PUT', 'DELETE') and ('/' not in path[1:] or len(path[1:].split('/')[1]) <= 0):
Expand Down
13 changes: 1 addition & 12 deletions localstack/services/sns/sns_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from localstack.services.install import SQS_BACKEND_IMPL
from localstack.utils.analytics import event_publisher
from localstack.utils.aws import aws_stack
from localstack.utils.aws.aws_responses import response_regex_replace
from localstack.utils.aws.aws_responses import response_regex_replace, create_sqs_system_attributes
from localstack.utils.aws.dead_letter_queue import sns_error_to_dead_letter_queue
from localstack.utils.common import parse_request_data, timestamp_millis, short_uid, to_str, to_bytes, start_thread
from localstack.utils.persistence import PersistingProxyListener
Expand Down Expand Up @@ -658,17 +658,6 @@ def create_sqs_message_attributes(subscriber, attributes):
return message_attributes


def create_sqs_system_attributes(headers):

system_attributes = {}
if 'X-Amzn-Trace-Id' in headers:
system_attributes['AWSTraceHeader'] = {
'DataType': 'String',
'StringValue': str(headers['X-Amzn-Trace-Id'])
}
return system_attributes


def get_message_attributes(req_data):
attributes = {}
x = 1
Expand Down
11 changes: 11 additions & 0 deletions localstack/utils/aws/aws_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,17 @@ def make_error(*args, **kwargs):
return flask_error_response_xml(*args, **kwargs)


def create_sqs_system_attributes(headers):

system_attributes = {}
if 'X-Amzn-Trace-Id' in headers:
system_attributes['AWSTraceHeader'] = {
'DataType': 'String',
'StringValue': str(headers['X-Amzn-Trace-Id'])
}
return system_attributes


def extract_tags(req_data):
keys = []
values = []
Expand Down
30 changes: 29 additions & 1 deletion tests/integration/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from localstack.constants import TEST_AWS_ACCESS_KEY_ID, TEST_AWS_SECRET_ACCESS_KEY, S3_VIRTUAL_HOSTNAME
from localstack.utils.aws import aws_stack
from localstack.services.s3 import s3_listener, s3_utils

from localstack.utils.common import (
new_tmp_dir, short_uid, retry, get_service_protocol, to_bytes, safe_requests, to_str, new_tmp_file, rm_rf,
load_file, run)
Expand Down Expand Up @@ -1540,6 +1539,35 @@ def test_encoding_notification_messages(self):
# clean up
self.s3_client.delete_objects(Bucket=bucket_name, Delete={'Objects': [{'Key': key}]})

def add_xray_header(self, request, **kwargs):
request.headers['X-Amzn-Trace-Id'] = \
'Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1'

def test_xray_header_to_sqs(self):
key = 'test-data'
bucket_name = 'bucket-%s' % short_uid()
self.s3_client.meta.events.register('before-send.s3.*', self.add_xray_header)
queue_url = self.sqs_client.create_queue(QueueName='testQueueNew')['QueueUrl']
queue_attributes = self.sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['QueueArn'])

self._create_test_notification_bucket(queue_attributes, bucket_name=bucket_name)

# put an object where the bucket_name is in the path
self.s3_client.put_object(Bucket=bucket_name, Key=key, Body='something')

def get_message(queue_url):
resp = self.sqs_client.receive_message(QueueUrl=queue_url,
AttributeNames=['AWSTraceHeader'],
MessageAttributeNames=['All'])

self.assertEqual(resp['Messages'][0]['Attributes']['AWSTraceHeader'],
'Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1')

retry(get_message, retries=3, sleep=10, queue_url=queue_url)

# clean up
self.s3_client.delete_objects(Bucket=bucket_name, Delete={'Objects': [{'Key': key}]})

def test_s3_batch_delete_objects_using_requests(self):
bucket_name = 'bucket-%s' % short_uid()
object_key_1 = 'key-%s' % short_uid()
Expand Down