|
30 | 30 | class Connection
|
31 | 31 | {
|
32 | 32 | private const AWS_SQS_FIFO_SUFFIX = '.fifo';
|
| 33 | + private const MESSAGE_ATTRIBUTE_NAME = 'X-Symfony-Messenger'; |
33 | 34 |
|
34 | 35 | private const DEFAULT_OPTIONS = [
|
35 | 36 | 'buffer_size' => 9,
|
@@ -200,7 +201,12 @@ private function fetchMessage(): bool
|
200 | 201 |
|
201 | 202 | foreach ($this->currentResponse->getMessages() as $message) {
|
202 | 203 | $headers = [];
|
203 |
| - foreach ($message->getMessageAttributes() as $name => $attribute) { |
| 204 | + $attributes = $message->getMessageAttributes(); |
| 205 | + if (isset($attributes[self::MESSAGE_ATTRIBUTE_NAME]) && 'String' === $attributes[self::MESSAGE_ATTRIBUTE_NAME]->getDataType()) { |
| 206 | + $headers = json_decode($attributes[self::MESSAGE_ATTRIBUTE_NAME]->getStringValue(), true); |
| 207 | + unset($attributes[self::MESSAGE_ATTRIBUTE_NAME]); |
| 208 | + } |
| 209 | + foreach ($attributes as $name => $attribute) { |
204 | 210 | if ('String' !== $attribute->getDataType()) {
|
205 | 211 | continue;
|
206 | 212 | }
|
@@ -284,13 +290,29 @@ public function send(string $body, array $headers, int $delay = 0, ?string $mess
|
284 | 290 | 'MessageAttributes' => [],
|
285 | 291 | ];
|
286 | 292 |
|
| 293 | + $specialHeaders = []; |
287 | 294 | foreach ($headers as $name => $value) {
|
| 295 | + if ('.' === $name[0] || self::MESSAGE_ATTRIBUTE_NAME === $name || \strlen($name) > 256 || '.' === substr($name, -1) || 'AWS.' === substr($name, 0, \strlen('AWS.')) || 'Amazon.' === substr($name, 0, \strlen('Amazon.')) || preg_match('/([^a-zA-Z0-9_\.-]+|\.\.)/', $name)) { |
| 296 | + $specialHeaders[$name] = $value; |
| 297 | + |
| 298 | + continue; |
| 299 | + } |
| 300 | + |
288 | 301 | $parameters['MessageAttributes'][$name] = new MessageAttributeValue([
|
289 | 302 | 'DataType' => 'String',
|
290 | 303 | 'StringValue' => $value,
|
291 | 304 | ]);
|
292 | 305 | }
|
293 | 306 |
|
| 307 | + if (!empty($specialHeaders)) { |
| 308 | + $parameters['MessageAttributes'][self::MESSAGE_ATTRIBUTE_NAME] = new MessageAttributeValue([ |
| 309 | + 'DataType' => 'String', |
| 310 | + 'StringValue' => json_encode($specialHeaders), |
| 311 | + ]); |
| 312 | + } |
| 313 | + |
| 314 | + dd($parameters); |
| 315 | + |
294 | 316 | if (self::isFifoQueue($this->configuration['queue_name'])) {
|
295 | 317 | $parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__;
|
296 | 318 | $parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1(json_encode(['body' => $body, 'headers' => $headers]));
|
|
0 commit comments