Skip to content

Commit 8a3af7b

Browse files
crynoboneStyleCIBottaylorotwell
authored
[12.x] Add supports for SQS Fair Queue (#56763)
* [12.x] Add supports for SQS Fair Queue Signed-off-by: Mior Muhammad Zaki <crynobone@gmail.com> * Apply fixes from StyleCI * wip Signed-off-by: Mior Muhammad Zaki <crynobone@gmail.com> * wip Signed-off-by: Mior Muhammad Zaki <crynobone@gmail.com> * wip Signed-off-by: Mior Muhammad Zaki <crynobone@gmail.com> * wip Signed-off-by: Mior Muhammad Zaki <crynobone@gmail.com> * wip Signed-off-by: Mior Muhammad Zaki <crynobone@gmail.com> * wip Signed-off-by: Mior Muhammad Zaki <crynobone@gmail.com> * wip Signed-off-by: Mior Muhammad Zaki <crynobone@gmail.com> * wip Signed-off-by: Mior Muhammad Zaki <crynobone@gmail.com> * Check for `deduplicateId()` before using `uniqueId()` Signed-off-by: Mior Muhammad Zaki <crynobone@gmail.com> * Skip relying on ShouldBeUnique Signed-off-by: Mior Muhammad Zaki <crynobone@gmail.com> * wip Signed-off-by: Mior Muhammad Zaki <crynobone@gmail.com> * wip Signed-off-by: Mior Muhammad Zaki <crynobone@gmail.com> * formatting * Apply fixes from StyleCI * cast to string --------- Signed-off-by: Mior Muhammad Zaki <crynobone@gmail.com> Co-authored-by: StyleCI Bot <bot@styleci.io> Co-authored-by: Taylor Otwell <taylor@laravel.com>
1 parent 2b79368 commit 8a3af7b

File tree

3 files changed

+71
-7
lines changed

3 files changed

+71
-7
lines changed

src/Illuminate/Bus/Queueable.php

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@ trait Queueable
2727
*/
2828
public $queue;
2929

30+
/**
31+
* The job "group" the job should be sent to.
32+
*
33+
* @var string|null
34+
*/
35+
public $group;
36+
3037
/**
3138
* The number of seconds before the job should be made available.
3239
*
@@ -102,6 +109,21 @@ public function onQueue($queue)
102109
return $this;
103110
}
104111

112+
/**
113+
* Set the desired job "group".
114+
*
115+
* This feature is only supported by some queues, such as Amazon SQS.
116+
*
117+
* @param \UnitEnum|string $group
118+
* @return $this
119+
*/
120+
public function onGroup($group)
121+
{
122+
$this->group = enum_value($group);
123+
124+
return $this;
125+
}
126+
105127
/**
106128
* Set the desired connection for the chain.
107129
*

src/Illuminate/Foundation/Bus/PendingDispatch.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,21 @@ public function onQueue($queue)
6363
return $this;
6464
}
6565

66+
/**
67+
* Set the desired job "group".
68+
*
69+
* This feature is only supported by some queues, such as Amazon SQS.
70+
*
71+
* @param \UnitEnum|string $group
72+
* @return $this
73+
*/
74+
public function onGroup($group)
75+
{
76+
$this->job->onGroup($group);
77+
78+
return $this;
79+
}
80+
6681
/**
6782
* Set the desired connection for the chain.
6883
*

src/Illuminate/Queue/SqsQueue.php

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,8 @@ public function push($job, $data = '', $queue = null)
162162
$this->createPayload($job, $queue ?: $this->default, $data),
163163
$queue,
164164
null,
165-
function ($payload, $queue) {
166-
return $this->pushRaw($payload, $queue);
165+
function ($payload, $queue) use ($job) {
166+
return $this->pushRaw($payload, $queue, $this->getQueueableOptions($job, $queue));
167167
}
168168
);
169169
}
@@ -199,16 +199,43 @@ public function later($delay, $job, $data = '', $queue = null)
199199
$this->createPayload($job, $queue ?: $this->default, $data, $delay),
200200
$queue,
201201
$delay,
202-
function ($payload, $queue, $delay) {
203-
return $this->sqs->sendMessage([
204-
'QueueUrl' => $this->getQueue($queue),
205-
'MessageBody' => $payload,
202+
function ($payload, $queue, $delay) use ($job) {
203+
return $this->pushRaw($payload, $queue, [
206204
'DelaySeconds' => $this->secondsUntil($delay),
207-
])->get('MessageId');
205+
...$this->getQueueableOptions($job, $queue),
206+
]);
208207
}
209208
);
210209
}
211210

211+
/**
212+
* Get the queueable options from the job.
213+
*
214+
* @param mixed $job
215+
* @param string|null $queue
216+
* @return array{MessageGroupId?: string, MessageDeduplicationId?: string}
217+
*/
218+
protected function getQueueableOptions($job, $queue): array
219+
{
220+
if (! is_object($job) || ! str_ends_with((string) $queue, '.fifo')) {
221+
return [];
222+
}
223+
224+
$transformToString = fn ($value) => strval($value);
225+
226+
$messageGroupId = transform($job->group ?? null, $transformToString);
227+
228+
$messageDeduplicationId = match (true) {
229+
method_exists($job, 'deduplicationId') => transform($job->deduplicationId(), $transformToString),
230+
default => (string) Str::orderedUuid(),
231+
};
232+
233+
return array_filter([
234+
'MessageGroupId' => $messageGroupId,
235+
'MessageDeduplicationId' => $messageDeduplicationId,
236+
]);
237+
}
238+
212239
/**
213240
* Push an array of jobs onto the queue.
214241
*

0 commit comments

Comments
 (0)