Enable X-Ray propagation for S3 to SQS notifications #3686

@ghost

Description

Type of request: This is a ...

[ ] bug report
[x] feature request

Detailed description

As per the AWS documentation (https://docs.aws.amazon.com/xray/latest/devguide/xray-services-s3.html), it is possible to propagate trace context when publishing S3 notifications to SQS: the value of the "X-Amzn-Trace-Id" HTTP header set on the S3 request is copied into the "AWSTraceHeader" system attribute of the resulting SQS message.
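To illustrate, the mapping is verbatim: the string sent as the "X-Amzn-Trace-Id" header should reappear unchanged as the "AWSTraceHeader" attribute. A minimal, self-contained sketch of the header format (the trace ID below is made up for illustration; the helper class and method names are not part of any AWS SDK):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TraceHeaderDemo {
    // Parse an X-Amzn-Trace-Id value such as
    // "Root=1-5759e988-bd862e3fe1be46a994272793;Parent=53995c3f42cd8ad8;Sampled=1"
    // into its semicolon-separated key/value fields.
    public static Map<String, String> parse(String header) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String field : header.split(";")) {
            String[] kv = field.split("=", 2);
            parts.put(kv[0].trim(), kv.length > 1 ? kv[1].trim() : "");
        }
        return parts;
    }

    public static void main(String[] args) {
        // Hypothetical trace id, for illustration only.
        String sent = "Root=1-5759e988-bd862e3fe1be46a994272793;Sampled=1";
        // Per the AWS docs cited above, S3 is expected to copy the value
        // verbatim into the AWSTraceHeader system attribute.
        String received = sent;
        Map<String, String> parts = parse(received);
        System.out.println(parts.get("Root"));
        System.out.println(parts.get("Sampled"));
    }
}
```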

Expected behavior

If the HTTP header "X-Amzn-Trace-Id" is set on the S3 request, the resulting message carries the system attribute "AWSTraceHeader" with the same value.

Actual behavior

The header value is not propagated.

Steps to reproduce

Groovy / Spock test case used (note: the X-Ray header is added by the OpenTelemetry instrumentation):

  def setupSpec() {

    localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:latest"))
      .withServices(LocalStackContainer.Service.S3, LocalStackContainer.Service.SQS)
      .withEnv("DEBUG", "1")
      .withEnv("SQS_PROVIDER", "elasticmq")
    localstack.start()

    sqsClient = AmazonSQSAsyncClient.asyncBuilder()
      .withEndpointConfiguration(localstack.getEndpointConfiguration(LocalStackContainer.Service.SQS))
      .withCredentials(localstack.getDefaultCredentialsProvider())
      .build()

    s3Client = AmazonS3Client.builder()
      .withEndpointConfiguration(localstack.getEndpointConfiguration(LocalStackContainer.Service.S3))
      .withCredentials(localstack.getDefaultCredentialsProvider())
      .build()

    localstack.followOutput(new Slf4jLogConsumer(LoggerFactory.getLogger("test")))
  }

  def cleanupSpec() {
    if (localstack != null) {
      localstack.stop()
    }
  }

  def createQueue(String queueName) {
    return sqsClient.createQueue(queueName).getQueueUrl()
  }

  def getQueueArn(String queueUrl) {
    return sqsClient.getQueueAttributes(
      new GetQueueAttributesRequest(queueUrl)
        .withAttributeNames("QueueArn")).getAttributes()
      .get("QueueArn")
  }

  def setQueuePolicy(String queueUrl, String queueArn) {
    sqsClient.setQueueAttributes(queueUrl, Collections.singletonMap("Policy", policy(queueArn)))
  }

  def createBucket(String bucketName) {
    s3Client.createBucket(bucketName)
  }

  def deleteBucket(String bucketName) {
    ObjectListing objectListing = s3Client.listObjects(bucketName)
    Iterator<S3ObjectSummary> objIter = objectListing.getObjectSummaries().iterator()
    while (objIter.hasNext()) {
      s3Client.deleteObject(bucketName, objIter.next().getKey())
    }
    s3Client.deleteBucket(bucketName)
  }

  def enableS3Notifications(String bucketName, String sqsQueueArn) {
    BucketNotificationConfiguration notificationConfiguration = new BucketNotificationConfiguration()
    notificationConfiguration.addConfiguration("sqsQueueConfig",
      new QueueConfiguration(sqsQueueArn, EnumSet.of(S3Event.ObjectCreatedByPut)))
    s3Client.setBucketNotificationConfiguration(new SetBucketNotificationConfigurationRequest(
      bucketName, notificationConfiguration))
  }

  def "simple S3 upload as producer - SQS consumer services"() {
    setup:
    String queueName = "s3ToSqsTestQueue"
    String bucketName = "s3-sqs-test-bucket"

    String queueUrl = createQueue(queueName)
    String queueArn = getQueueArn(queueUrl)
    setQueuePolicy(queueUrl, queueArn)
    createBucket(bucketName)
    enableS3Notifications(bucketName, queueArn)

    when:
    // test message, auto created by AWS
    sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl))
    s3Client.putObject(bucketName, "testKey", "testData")
    // traced message; request the AWSTraceHeader system attribute explicitly,
    // since SQS only returns system attributes that are asked for by name
    sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl).withAttributeNames("AWSTraceHeader"))
    // cleanup
    deleteBucket(bucketName)
    sqsClient.purgeQueue(new PurgeQueueRequest(queueUrl))
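A side note on verifying the expected behavior on the consumer side: SQS returns message system attributes only when they are requested by name in the receive call, so a check for "AWSTraceHeader" must handle its absence. A small self-contained sketch (the helper class and method names are hypothetical, not AWS SDK API):

```java
import java.util.Map;
import java.util.Optional;

public class TraceAttributeCheck {
    // When propagation works, the received message's system-attributes map
    // carries the trace context under the "AWSTraceHeader" key.
    public static Optional<String> traceHeader(Map<String, String> systemAttributes) {
        return Optional.ofNullable(systemAttributes.get("AWSTraceHeader"));
    }

    public static void main(String[] args) {
        // Simulated attribute maps: one with the propagated header, one without.
        Map<String, String> propagated = Map.of(
            "AWSTraceHeader", "Root=1-5759e988-bd862e3fe1be46a994272793;Sampled=1");
        System.out.println(traceHeader(propagated).orElse("missing"));
        System.out.println(traceHeader(Map.of()).orElse("missing"));
    }
}
```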
