-
-
Notifications
You must be signed in to change notification settings - Fork 4.2k
Closed

Description
Type of request: This is a ...
[ ] bug report
[x] feature request
Detailed description
As per the AWS documentation (https://docs.aws.amazon.com/xray/latest/devguide/xray-services-s3.html), it is possible to propagate trace context when S3 notifications are published to SQS. The value of the HTTP header "X-Amzn-Trace-Id" set on the S3 request is converted into the "AWSTraceHeader" system attribute of the SQS message.
Expected behavior
If the HTTP header "X-Amzn-Trace-Id" is set on the S3 request, the resulting SQS message gets the system attribute "AWSTraceHeader".
Actual behavior
The header value is not propagated to the resulting SQS message.
Steps to reproduce
Groovy / Spock test case used (note - X-Ray header is added by the OpenTelemetry instrumentation):
/**
 * Starts the LocalStack container, then builds the SQS and S3 clients against
 * its endpoints and forwards the container output to the SLF4J logger "test".
 */
def setupSpec() {
  // S3 has to be among the enabled services: this spec creates buckets and
  // uploads objects through s3Client. SNS is kept so the service set only grows.
  localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:latest"))
    .withServices(
      LocalStackContainer.Service.SQS,
      LocalStackContainer.Service.SNS,
      LocalStackContainer.Service.S3)
    .withEnv("DEBUG", "1")
    .withEnv("SQS_PROVIDER", "elasticmq")
  localstack.start()

  sqsClient = AmazonSQSAsyncClient.asyncBuilder()
    .withEndpointConfiguration(localstack.getEndpointConfiguration(LocalStackContainer.Service.SQS))
    .withCredentials(localstack.getDefaultCredentialsProvider())
    .build()

  // The S3 client must be pointed at the S3 endpoint, not the SNS one.
  s3Client = AmazonS3Client.builder()
    .withEndpointConfiguration(localstack.getEndpointConfiguration(LocalStackContainer.Service.S3))
    .withCredentials(localstack.getDefaultCredentialsProvider())
    .build()

  localstack.followOutput(new Slf4jLogConsumer(LoggerFactory.getLogger("test")))
}
/**
 * Stops the LocalStack container if one was created.
 */
def cleanupSpec() {
  // Safe navigation: nothing happens when the container reference is null.
  localstack?.stop()
}
/**
 * Creates an SQS queue with the given name.
 *
 * @param queueName name of the queue to create
 * @return the URL of the created queue
 */
def createQueue(String queueName) {
  def created = sqsClient.createQueue(queueName)
  return created.getQueueUrl()
}
/**
 * Reads the "QueueArn" attribute of the queue at the given URL.
 *
 * @param queueUrl URL of the queue to inspect
 * @return the value stored under "QueueArn" in the returned attributes
 */
def getQueueArn(String queueUrl) {
  def request = new GetQueueAttributesRequest(queueUrl).withAttributeNames("QueueArn")
  def attributes = sqsClient.getQueueAttributes(request).getAttributes()
  return attributes.get("QueueArn")
}
/**
 * Sets the "Policy" attribute of the queue to the document produced by
 * policy(queueArn).
 *
 * @param queueUrl URL of the queue whose attributes are updated
 * @param queueArn ARN passed to policy(...) to build the policy document
 */
def setQueuePolicy(String queueUrl, String queueArn) {
  def attributes = Collections.singletonMap("Policy", policy(queueArn))
  sqsClient.setQueueAttributes(queueUrl, attributes)
}
/**
 * Creates an S3 bucket with the given name.
 *
 * @param bucketName name of the bucket to create
 * @return whatever the S3 client's createBucket call returns
 */
def createBucket(String bucketName) {
  return s3Client.createBucket(bucketName)
}
/**
 * Deletes every object in the bucket and then the bucket itself.
 *
 * @param bucketName name of the bucket to empty and delete
 */
def deleteBucket(String bucketName) {
  ObjectListing objectListing = s3Client.listObjects(bucketName)
  while (true) {
    for (S3ObjectSummary summary : objectListing.getObjectSummaries()) {
      s3Client.deleteObject(bucketName, summary.getKey())
    }
    // A listing may be truncated; keep fetching batches until every object
    // has been removed, otherwise the bucket would still hold objects.
    if (!objectListing.isTruncated()) {
      break
    }
    objectListing = s3Client.listNextBatchOfObjects(objectListing)
  }
  s3Client.deleteBucket(bucketName)
}
/**
 * Configures the bucket to send ObjectCreatedByPut event notifications to the
 * given SQS queue, registered under the configuration name "sqsQueueConfig".
 *
 * @param bucketName  bucket whose notification configuration is replaced
 * @param sqsQueueArn ARN of the queue that receives the notifications
 */
def enableS3Notifications(String bucketName, String sqsQueueArn) {
  def queueConfig = new QueueConfiguration(sqsQueueArn, EnumSet.of(S3Event.ObjectCreatedByPut))

  def bucketConfig = new BucketNotificationConfiguration()
  bucketConfig.addConfiguration("sqsQueueConfig", queueConfig)

  def request = new SetBucketNotificationConfigurationRequest(bucketName, bucketConfig)
  s3Client.setBucketNotificationConfiguration(request)
}
def "simple S3 upload as producer - SQS consumer services"() {
setup:
String queueName = "s3ToSqsTestQueue"
String bucketName = "s3-sqs-test-bucket"
String queueUrl = createQueue(queueName)
String queueArn = getQueueArn(queueUrl)
setQueuePolicy(queueUrl, queueArn)
createBucket(bucketName)
enableS3Notifications(bucketName, queueArn)
when:
// test message, auto created by AWS
sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl))
s3Client.putObject(bucketName, "testKey", "testData")
// traced message
sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl))
// cleanup
deleteBucket(bucketName)
sqsClient.purgeQueue(new PurgeQueueRequest(queueUrl))
Metadata
Metadata
Assignees
Labels
No labels