Skip to content

Commit 99ebc69

Browse files
committed
feature #42335 [Messenger] Add WorkerMetadata to Worker class. (okwinza)
This PR was merged into the 5.4 branch. Discussion ---------- [Messenger] Add `WorkerMetadata` to `Worker` class. | Q | A | ------------- | --- | Branch? | 5.4 | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | Fixes #37736 | License | MIT | Doc PR | - At the moment, there is no clean way to access the values of `transportNames` or recently introduced `queueNames` that the worker was configured with, although such data might be quite useful for logging/monitoring or other tasks. This PR attempts to fix that by adding a new and extensible way to provide additional information about a particular `Worker` object. So far, the following PRs could benefit from this change: - #43133 - #42723 **Use case example:** ---- - As I developer - When a message was consumed from transport with name `async`. - And the worker state is `idle`. - Then I want to reset services. **Before this PR**, the only solution not relying on using Reflection API would look like this: ```php private $servicesResetter; private $receiversName; private $actualReceiverName = null; public function __construct(ServicesResetter $servicesResetter, array $receiversName) { $this->servicesResetter = $servicesResetter; $this->receiversName = $receiversName; } public function saveReceiverName(AbstractWorkerMessageEvent $event): void { $this->actualReceiverName = $event->getReceiverName(); } public function resetServices(WorkerRunningEvent $event): void { if (!$event->isWorkerIdle() && \in_array($this->actualReceiverName, $this->receiversName, true)) { $this->servicesResetter->reset(); } $this->actualReceiverName = null; } public static function getSubscribedEvents(): array { return [ WorkerMessageHandledEvent::class => ['saveReceiverName'], WorkerMessageFailedEvent::class => ['saveReceiverName'], WorkerRunningEvent::class => ['resetServices'], ]; } ``` **With this PR**, one could simply use this to retrieve the transport name. ```php $event->getWorker()->getWorkerMetadata()->getTransportName() === $this->transportName; ``` So the whole solution would look like this: ```php private $servicesResetter; private $receiversName; public function __construct(ServicesResetter $servicesResetter, array $receiversName) { $this->servicesResetter = $servicesResetter; $this->receiversName = $receiversName; } public function resetServices(WorkerRunningEvent $event): void { $actualTransportName = $event->getWorker()->getWorkerMetadata()->getTransportName(); if (!$event->isWorkerIdle() || !in_array($actualTransportName, $this->receiversName, true)) { return; } $this->servicesResetter->reset(); } public static function getSubscribedEvents(): array { return [ WorkerRunningEvent::class => ['resetServices'], ]; } ``` Commits ------- 583f85b [Messenger] Add WorkerMetadata to Worker class
2 parents 31d81e2 + 583f85b commit 99ebc69

File tree

5 files changed

+150
-3
lines changed

5 files changed

+150
-3
lines changed

src/Symfony/Component/Messenger/CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ CHANGELOG
66

77
* Add `StopWorkerExceptionInterface` and its implementation `StopWorkerException` to stop the worker.
88
* Add support for resetting container services after each messenger message.
9+
* Added `WorkerMetadata` class which allows you to access the configuration details of a worker, like `queueNames` and `transportNames` it consumes from.
10+
* New method `getMetadata()` was added to `Worker` class which returns the `WorkerMetadata` object.
911

1012
5.3
1113
---
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\WorkerMetadata;
16+
17+
/**
18+
* @author Oleg Krasavin <okwinza@gmail.com>
19+
*/
20+
class WorkerMetadataTest extends TestCase
21+
{
22+
public function testItReturnsDefaultValuesIfNoneProvided()
23+
{
24+
$metadata = new WorkerMetadata([]);
25+
26+
$this->assertNull($metadata->getQueueNames());
27+
$this->assertSame([], $metadata->getTransportNames());
28+
}
29+
30+
public function testItReturnsProvidedMetadata()
31+
{
32+
$data = [
33+
'queueNames' => ['c', 'b', 'a'],
34+
'transportNames' => ['a', 'b', 'c'],
35+
];
36+
37+
$metadata = new WorkerMetadata($data);
38+
39+
$this->assertSame($data['queueNames'], $metadata->getQueueNames());
40+
$this->assertSame($data['transportNames'], $metadata->getTransportNames());
41+
}
42+
43+
public function testItSetsMetadataViaSetter()
44+
{
45+
$data = [
46+
'queueNames' => ['c', 'b', 'a'],
47+
'transportNames' => ['a', 'b', 'c'],
48+
];
49+
50+
$metadata = new WorkerMetadata([]);
51+
52+
$metadata->set($data);
53+
54+
$this->assertSame($data['queueNames'], $metadata->getQueueNames());
55+
$this->assertSame($data['transportNames'], $metadata->getTransportNames());
56+
}
57+
}

src/Symfony/Component/Messenger/Tests/WorkerTest.php

+30
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,28 @@ public function testWorkerDispatchesEventsOnError()
167167
$worker->run();
168168
}
169169

170+
public function testWorkerContainsMetadata()
171+
{
172+
$envelope = new Envelope(new DummyMessage('Hello'));
173+
$receiver = new DummyQueueReceiver([[$envelope]]);
174+
175+
$bus = $this->createMock(MessageBusInterface::class);
176+
$bus->method('dispatch')->willReturn($envelope);
177+
178+
$dispatcher = new EventDispatcher();
179+
$dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) {
180+
$event->getWorker()->stop();
181+
});
182+
183+
$worker = new Worker(['dummyReceiver' => $receiver], $bus, $dispatcher);
184+
$worker->run(['queues' => ['queue1', 'queue2']]);
185+
186+
$workerMetadata = $worker->getMetadata();
187+
188+
$this->assertSame(['queue1', 'queue2'], $workerMetadata->getQueueNames());
189+
$this->assertSame(['dummyReceiver'], $workerMetadata->getTransportNames());
190+
}
191+
170192
public function testTimeoutIsConfigurable()
171193
{
172194
$apiMessage = new DummyMessage('API');
@@ -359,3 +381,11 @@ public function getAcknowledgedEnvelopes(): array
359381
return $this->acknowledgedEnvelopes;
360382
}
361383
}
384+
385+
class DummyQueueReceiver extends DummyReceiver implements QueueReceiverInterface
386+
{
387+
public function getFromQueues(array $queueNames): iterable
388+
{
389+
return $this->get();
390+
}
391+
}

src/Symfony/Component/Messenger/Worker.php

+14-3
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class Worker
4242
private $eventDispatcher;
4343
private $logger;
4444
private $shouldStop = false;
45+
private $metadata;
4546

4647
/**
4748
* @param ReceiverInterface[] $receivers Where the key is the transport name
@@ -52,6 +53,9 @@ public function __construct(array $receivers, MessageBusInterface $bus, EventDis
5253
$this->bus = $bus;
5354
$this->logger = $logger;
5455
$this->eventDispatcher = class_exists(Event::class) ? LegacyEventDispatcherProxy::decorate($eventDispatcher) : $eventDispatcher;
56+
$this->metadata = new WorkerMetadata([
57+
'transportNames' => array_keys($receivers),
58+
]);
5559
}
5660

5761
/**
@@ -63,12 +67,14 @@ public function __construct(array $receivers, MessageBusInterface $bus, EventDis
6367
*/
6468
public function run(array $options = []): void
6569
{
66-
$this->dispatchEvent(new WorkerStartedEvent($this));
67-
6870
$options = array_merge([
6971
'sleep' => 1000000,
7072
], $options);
71-
$queueNames = $options['queues'] ?? false;
73+
$queueNames = $options['queues'] ?? null;
74+
75+
$this->metadata->set(['queueNames' => $queueNames]);
76+
77+
$this->dispatchEvent(new WorkerStartedEvent($this));
7278

7379
if ($queueNames) {
7480
// if queue names are specified, all receivers must implement the QueueReceiverInterface
@@ -173,6 +179,11 @@ public function stop(): void
173179
$this->shouldStop = true;
174180
}
175181

182+
public function getMetadata(): WorkerMetadata
183+
{
184+
return $this->metadata;
185+
}
186+
176187
private function dispatchEvent(object $event): void
177188
{
178189
if (null === $this->eventDispatcher) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger;
13+
14+
/**
15+
* @author Oleg Krasavin <okwinza@gmail.com>
16+
*/
17+
final class WorkerMetadata
18+
{
19+
private $metadata;
20+
21+
public function __construct(array $metadata)
22+
{
23+
$this->metadata = $metadata;
24+
}
25+
26+
public function set(array $newMetadata): void
27+
{
28+
$this->metadata = array_merge($this->metadata, $newMetadata);
29+
}
30+
31+
/**
32+
* Returns the queue names the worker consumes from, if "--queues" option was used.
33+
* Returns null otherwise.
34+
*/
35+
public function getQueueNames(): ?array
36+
{
37+
return $this->metadata['queueNames'] ?? null;
38+
}
39+
40+
/**
41+
* Returns an array of unique identifiers for transport receivers the worker consumes from.
42+
*/
43+
public function getTransportNames(): array
44+
{
45+
return $this->metadata['transportNames'] ?? [];
46+
}
47+
}

0 commit comments

Comments
 (0)