-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
Added two new components: AMQP and Worker #23315
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
$this->logger->info('Move a message...'); | ||
$this->broker->move($message, $to); | ||
$this->broker->ack($message); | ||
$this->logger->debug('...message moved.'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use the same level? Especially since there are ellipsis that should correspond to each other?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To not flood to the log output. And it's clearly at debug level.
Instead of putting this logic in the FrameworkBundle is it possible to keep it in it's own bundle? |
{ | ||
$workers = $this->getContainer()->getParameter('worker.workers'); | ||
|
||
foreach ($workers as $name => $_) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
foreach (array_keys($this->getContainer()->getParameter('worker.workers')) as $name) {
is 2 less variables in the namespace, one of which is quite weird
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not really readable + it traverses twice the array.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
even better as foreach ($this->workers as $worker)
:)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess you are referring to the fact this command should be a service and so we could inject only what we need?
->setDescription('Run a worker') | ||
->setDefinition(array( | ||
new InputArgument('worker', InputArgument::REQUIRED, 'The worker'), | ||
new InputOption('name', null, InputOption::VALUE_REQUIRED, 'A name, useful for stats/monitoring. Default to worker name.'), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- "Defaults"
- consider linebreaks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider linebreaks?
Do we do that in symfony? Does it breaks the output?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I'm right, the usual Symfony practice is to not break this, whatever its length.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @javiereguiluz
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok then
->register($id, Worker\MessageFetcher\BufferedMessageFetcher::class) | ||
->addArgument(new Reference($fetchers[$fetcher['wrap']])) | ||
->addArgument(array( | ||
'max_buffurisation_time' => $fetcher['max_buffurisation_time'], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/buffur/buffer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
max_buffurisation_time
=> max_buffering_time
no ?
|
||
/** | ||
* @param int $time Time to wait in the queue in seconds | ||
* @param int $max The maximum number of time to retry (0 means indefinitely) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"number of times" ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The maximum number of attempts (0 means no limit)
{ | ||
$exchange = Exchange::createFromUri(getenv('RABBITMQ_URL').'/?type=fanout'); | ||
|
||
$this->fail('An exception should have been thrown.'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this useful?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
of course.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why? If no exception is thrown, won't phpunit complain about it because of the annotation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you are right, sorry.
{ | ||
private function __construct() | ||
{ | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add a comment to explain why? It's certainly going to raise some eyebrows...
private $threshold; | ||
|
||
/** | ||
* @param int $threshold in byte. Default to 100Mb |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- "bytes"
- "Defaults"
$this->stop('Force shut down of the worker because a StopException has been thrown.', $e); | ||
|
||
return; | ||
} catch (\Exception $e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this normal? If yes, maybe explain it in a comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you referring to the fact we do nothing in the catch block?
Indeed it can be hard to understand. Initially we had 2 catch block (AMQPException and Exception) with 2 different log messages. So it's a relic of our previous lib.
Now it could be simplified indeed. Except It guess I have to support FatalError too. ;)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now I catch also throwable error.
if (isset($fetchers[$name])) { | ||
throw new \InvalidArgumentException("A fetcher named \"$name\" already exist."); | ||
} | ||
if (isset($fetcher['connection'])) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't you use a ternary operator instead?
$reference = new Reference(isset($fetcher['connection']) ? 'amqp.broker.'.$fetcher['connection'] : 'amqp.broker');
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes I could. But I don't like it as it's not really readable.
What would be the benefit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Readability ^^
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, no !
ternary operator are usually less readable. Many language don't even support them (golang, ...)
|
||
foreach ($config['fetchers']['amqps'] as $name => $fetcher) { | ||
if (isset($fetchers[$name])) { | ||
throw new \InvalidArgumentException("A fetcher named \"$name\" already exist."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use sprintf()
instead of variable interpolation in error messages.
Minor typo: already exist.
-> already exists.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use
sprintf()
instead of variable interpolation in error messages.
Variable interpolation is optimized in PHP7, maybe starting using this kind of practice is nice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my "personal" / "professional" projects I started using the following syntax "hello $name"
.
But in symfony, indeed we don't use it. So I'm going to change it.
Yes, it's possible. But (for now), it's not the logic adopted by symfony. |
public static function getSubscribedEvents() | ||
{ | ||
return array( | ||
LoopEvents::SLEEP => 'clearDoctrine', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be limitMemoryUsage
|
||
use Symfony\Component\Amqp\RetryStrategy\RetryStrategyInterface; | ||
|
||
class NonRetryableException extends \RuntimeException implements ExceptionInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is used in Broker::retry
|
||
namespace Symfony\Component\Amqp\RetryStrategy; | ||
|
||
interface RetryStrategyInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not used?
I mean, ExponentialRetryStrategy and ConstantRetryStrategy implement it, but it seam to be not used elsewhere
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's used in NonRetryableException::__construct()
and I added a check in queue::__construct()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nevermind, GitHub didn't display the content of this class... That's why I missed a lot of things. Now it displays well and answers my comments.
$this->offset = $offset; | ||
} | ||
|
||
public function isRetryable(\AMQPEnvelope $msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's about passing an Exception or a Context about the need to retry a message to let the strategy choose a better response.
Any reason to put this inside Symfony ? I don't see any reason not to have it ... it just look weird for me as this kind of project can have its own lifecycle without respecting symfony version. (like https://github.com/avalanche123/Imagine ... do you plan to integrate an image manipulation library inside symfony ? ) |
@fabpot ok then ;) |
9d43404
to
3211bfb
Compare
The |
// Warning: cli_set_process_title(): cli_set_process_title had an error: Not initialized correctly | ||
@cli_set_process_title($processName); | ||
|
||
pcntl_signal(SIGTERM, function () use ($loop) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried this in one of my projects using php-amqp and for me it did not work reliably that's why I'm using https://github.com/rstgroup/php-signal-handler
But maybe you have some other experiences?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't use the ->consumer()
method that is blocking.
but with ->get()
there are no issues ;)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok I see 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if PCNTL is not enabled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a check + a dependency in the worker/composer.json
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So a default Symfony installation will need PCNTL extension too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@makasim What do you mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did not understand what you want especially the part "for a kind of extension"
@chalasr It's not easy as it depends on the DIC. We could add an intermediary layer though |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have not marked everything but:
- Missing a lot of documentation on class/method level
- The majority (if not all) of the classes should be final
protected function initialize(InputInterface $input, OutputInterface $output) | ||
{ | ||
$this->broker = $this->getContainer()->get('amqp.broker'); | ||
$this->logger = $this->getContainer()->get('logger'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any specific reason why not to make this a service? It's already trying to mimic one here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you mean by this
? The command?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, command as a service seems to be the pattern here, except that it locates the services on initialization by itself
$this->logger->info('Move a message...'); | ||
$this->broker->move($message, $to); | ||
$this->broker->ack($message); | ||
$this->logger->debug('...message moved.'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I see those messages in my console (or where ever it mails), they are useless. Can you add useful message and context to the message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you want? I can not dump the message as it can be HUGE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could at least add a few context information like the $to and $from
|
||
protected function execute(InputInterface $input, OutputInterface $output) | ||
{ | ||
$workers = $this->getContainer()->getParameter('worker.workers'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as the last command, is there any specific reason to not make it a service? It clearly has dependencies. Especially parameters are not something I'd rely on to be an array.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Injecting an array isn't reliable too, because array can be of any structure. Still, I also like to have commands as a service. Do we need to check somewhere if worker.workers
has the correct structure?
// Warning: cli_set_process_title(): cli_set_process_title had an error: Not initialized correctly | ||
@cli_set_process_title($processName); | ||
|
||
pcntl_signal(SIGTERM, function () use ($loop) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if PCNTL is not enabled?
)); | ||
} | ||
|
||
return $this->getContainer()->get($workers[$workerName]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be solved by using the lazy injections instead?
*/ | ||
interface ConfigurableLoopInterface | ||
{ | ||
public function setName($name); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing docs
/** | ||
* @author Grégoire Pineau <lyrixx@lyrixx.info> | ||
*/ | ||
class StopException extends \RuntimeException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no dedicated component interface?
public function run() | ||
{ | ||
if (null !== $this->logger) { | ||
$this->logger->notice('Worker {worker} started.', array( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not require a null logger instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't do that in symfony.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Requiring a null logger makes it harder to initialize the class by hand. But I usually do the following in the constructor to avoid if (null !== $this->logger)
statements everywhere:
$this->logger = $logger ?? new NullLogger();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It has already been rejected in the past in the symfony core: #14682
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and here #15594, even if I often favor readability over such optims (is the performance cost of noop method calls vs if statements really considerable?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initializing it by hand is a bad argument and performance impact is seriously minimal. In fact, the chances of calling a method on null should not exist. You have the dependency, you have the class, you can write new NullLogger()
and forget about it for the rest of your life.
I think @dunglas points out a valid solution, but imo the logger argument can simply become mandatory and require injection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my projects, I always use $this->logger = $logger ?: new NullLogger();
But In symfony we don't do that.
I don't think this PR is the right place to discuss about that, isn't?
/** | ||
* @author Grégoire Pineau <lyrixx@lyrixx.info> | ||
*/ | ||
interface LoopInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing docs
"require": { | ||
"php": ">=5.5.9", | ||
"symfony/event-dispatcher": "^2.3|^3.0|^4.0", | ||
"psr/log": "~1.0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing dependency on pcntl
@lyrixx They do not really need to know about the DIC. As pointed out by @iltar, I would make them container unaware and register them as services in the framework, making their scope explicit throughout their constructor. I think using this component and the commands that come with outside of the fullstack makes a lot of sense. |
@iltar :
Yes I agree. I let the boring stuff for the end ;) But please don't add |
Make commands lazy
@lyrixx Don't forget to rebase to get rid of the merge commit. |
Could you define "the same". I mean, Are you talking about worker too ? and about AMQP, do you expose the same features: retry, delay, lazy creation etc. And also the integration with the framework bundle ? If yes, does it mean this PR should be closed ? |
I don't see the retry /delay feature in your interfaces, nor the lazy loading feature. So for now, I would wait a bit before integrating amqp interop. |
Ah yes. I would like to finish this PR fist. Then It will be easier to manage as this PR is already big (and so the discussion ;) ) |
Hello! Any way we can get this in 3.4? :) |
What about SQS, Beanstalkd? Only AMQP is supported? IMHO, it would be nice to make such a component as a layer on top of "queues" not only amqp. |
], | ||
"require": { | ||
"php": ">=5.5.9", | ||
"ext-pcntl": "*", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this makes the component incompatible with Windows (and so any project depending on it), as pcntl does not exist for Windows. This should be an optional dependency
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for this PR. I added some few comments.
I really like the idea of not making "one component to rule them all". But do we need a AMPQ library? Cant we use any of the 5-6(?) ones that already exists?
Also, I do not see any producer.
/** | ||
* @author Grégoire Pineau <lyrixx@lyrixx.info> | ||
*/ | ||
class ConsumerEvents |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be final
interface MessageFetcherInterface | ||
{ | ||
/** | ||
* @return string|bool The message or false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fetchMessages
should always return a MessageCollection
. The message collection might we empty, but that is okey.
However, do not change the signature for the RouterInterface
*/ | ||
interface RouterInterface | ||
{ | ||
public function fetchMessages(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We must add some doc block here
/** | ||
* @author Grégoire Pineau <lyrixx@lyrixx.info> | ||
*/ | ||
interface ConsumerInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that you have not provided an implementation of ConsumerInterface
. How come?
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Amqp\Test; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be in "Tests"?
IMO if one needs full AMPQ support is should use one of external libraries. |
This PR was squashed before being merged into the 4.1-dev branch (closes #24411). Discussion ---------- [Messenger] Add a new Messenger component | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | #24326 | License | MIT | Doc PR | symfony/symfony-docs#9437 As discussed in #24326. This PR is to help going forward with the discussions of having a Message component. # Resources | What | Where | --- | --- | Documentation | [In the PR](https://github.com/sroze/symfony/blob/add-message-component/src/Symfony/Component/Message/README.md) | Demo | [In `sroze/symfony-demo:message-component-demo`](https://github.com/sroze/symfony-demo/compare/message-component-demo) | [php-enqueue](https://github.com/php-enqueue/enqueue-dev) adapter | 1. Source: [In `sroze/enqueue-bridge`](https://github.com/sroze/enqueue-bridge) _(to be moved as `symfony/enqueue-bridge` I guess)_<br/>2. Demo: [In `sroze/symfony-demo:message-component-demo-with-enqueue`](sroze/symfony-demo@message-component-demo...sroze:message-component-demo-with-enqueue) | [Swarrot](https://github.com/swarrot/swarrot) adapter | **Outdated adapter, waiting for stabilization** 1. Source: [In `sroze/swarrot-bridge`](https://github.com/sroze/swarrot-bridge) _(to be moved as `symfony/swarrot-bridge` I guess)_<br/>2. Demo: [In `sroze/symfony-demo:message-component-demo-with-swarrot`](sroze/symfony-demo@message-component-demo...sroze:message-component-demo-with-swarrot) | [HTTP](https://github.com/sroze/message-http-adapter) adapter | **Outdated adapter, waiting for stabilization** 1. Source: [In `sroze/message-http-adapter`](https://github.com/sroze/message-http-adapter) <br/>2. Demo: [In `sroze/symfony-demo:message-component-demo-with-http-adapter`](sroze/symfony-demo@message-component-demo...sroze:message-component-demo-with-http-adapter) | Web profiler integration | _In the pull-request_ # Important points 1. **Tests are not in the PR as they were written in PhpSpec & Behat.** If we decide to go forward with this approach, I'll translate them to PHPUnit. 2. The aim is not to solve all the message/queuing problems but provide a good, simple and extensible message bus for developers. 3. The communication with the actual AMQP/API brokers is down to the adapters for now. Not sure if we need to ship some by default or not 🤔 I guess that this would replace #23842 & #23315. # Changes from the proposals Based on the comments, a few changes have been made from the proposal. 1. `MessageProducer`s have been renamed to `MessageSender`s 2. `MessageConsumer`s have been renamed to `MessageReceiver`s Commits ------- c9cfda9 [Messenger] Add a new Messenger component
Closing as we've just merged the Messenger component. Thank you. |
Hello.
I'm happy and excited to share with you 2 new components.
note: The PR description (what you are currently reading) is also committed (as
pr.body.md
). I will remove it just before the merge. Like that you could alsoask question about the "documentation". But please, don't over-comment the
"language / English". This part of the job will be done in the doc repository.
AMQP
It is a library created at @sensiolabs few years ago (Mon Mar 18 17:26:01 2013 +0100).
Its goal is to ease the communication with a service that implement AMQP
For example, RabbitMQ implements AMQP.
At that time, Swarrot did not exist yet
and only php-amqplib existed.
We started by using php-amqplib but we faced many issues: memory leak, bad
handling of signal, poor documentation
So we decided to stop using it and to build our own library. Over the years, we
added very nice features, we fixed very weird edge case and we gain real
expertise on AMQP.
Nowadays, it's very common to use AMQP in a web / CLI project.
So four years later, we decided to open-source it and to add it to Symfony to
leverage the Symfony Ecosystem (code quality, release process, documentation,
visibility, community, etc.)
So basically it's an abstraction of the AMQP pecl.
Here is the README.rst we had for this lib. I have updated it to match the
version that will land in Symfony
The old README (but updated)
Symfony AMQP
Fed up of writing the same boiler-plate code over and over again whenever you
need to use your favorite AMQP broker? Have you a hard time remembering how to
publish a message or how to wire exchanges and queues? I had the exact same
feeling. There are many AMQP libraries providing a very good low-level access to
the AMQP protocol, but what about providing a simple API for abstracting the
most common use cases? This library gives you an opinionated way of using any
AMQP brokers and it also provides a nice and consistent API for low-level
interaction with any AMQP brokers.
Dependencies
This library depends on the
amqp
PECL extensions (version 1.4.0-beta2 orlater)::
Using the Conventions
The simplest usage of an AMQP broker is sending a message that is consumed by
another script::
The example above is based on some "conventions" and as such makes the
following assumptions:
A default exchange is used to publish the message (named
symfony.default
);The routing is done via the routing key (
log
in this example);Queues and exchanges are created implicitly when first accessed;
The connection to the broker is done lazily whenever a message must be sent
or received.
Retrying a Message
Retrying processing a message when an error occurs is as easy as defining a
retry strategy on a queue::
Whenever you
$broker->retry()
a message, it is going to be automatically re-enqueued after a
5
seconds wait for a retry.You can also drop the message after a limited number of retries (
2
in thefollowing example)::
Instead of trying every
n
seconds, you can also use a retry mechanism basedon a truncated exponential backoff algorithm::
The message will be re-enqueued after 1 second the first time you call
retry()
,2^1
seconds the second time,2^2
seconds the third time,and
2^n
seconds the nth time. If you want to wait more than 1 second thefirst time, you can pass an offset::
.. note::
.. note::
Don't forget to
ack
ornack
your message if you retry it. Andobviously you should not use the AMQP_Requeue flag.
Configuring a Broker
By default, a broker tries to connect to a local AMQP broker with the default
port, username, and password. If you have a different setting, pass a URI to
the
Broker
constructor::Configuring an Exchange
The default exchange used by the library is of type
direct
. You can alsocreate your own exchange::
You can then binding a queue to this named exchange easily::
The second argument of
createExchange()
takes an array of arguments passedto the exchange. The following keys are used to further configure the exchange:
flags
: Sets the exchange flags;type
: Sets the type of the queue (see\AMQP_EX_TYPE_*
constants)... note::
Configuring a Queue
As demonstrated in some examples, you can create your own queue. As for the
exchange, the second argument of the
createQueue()
method is a list ofqueue arguments; the following keys are used to further configure the queue:
exchange
: The exchange name to bind the queue to (the default exchange isused if not set);
flags
: Sets the exchange flags;bind_arguments
: An array of arguments to pass when binding the queue withan exchange;
retry_strategy
: The retry strategy to use (an instance ofRetryStrategyInterface
)... note::
Implementation details
The retry strategy
..................
The retry strategy is implemented with two customs and privates exchanges:
symfony.dead_letter
andsymfony.retry
.Calling
Broker::retry
will publish the same message in thesymfony.dead_letter
exchange.This exchange will route the message to a queue named like
%exchange%.%time%.wait
. For examplesensiolabs.default.000005.wait
. Thisqueue has a TTL of 5 seconds. It means that if nothing consumes this message, it
will be dropped after 5 seconds. But this queue has also a Dead Letter (DL). It
means that instead of dropping the message, the AMQP server will re-publish
automatically the message to the Exchange configured as DL.
After 5 seconds the message will be re-published to
symfony.retry
Exchange.This exchange is bound with every single queues. Finally, the message will land
in the original queue.
Worker
The second component was extracted from our internal SensioLabsAmqp component.
We extracted it as is decoupled from the AMQP component. Thus it could be used,
for example, to write redis, kafka daemon.
Documentation
Symfony Worker
The worker component help you to write simple but flexible daemon.
Introduction
First you need something that
fetch
some messages. If the message are sentto AMQP, you should use the
AmqpMessageFetcher
::Then you need a Consumer that will
consumer
each AMQP message::Finally plug everything together::
Message Fetcher
AmqpMessageFetcher
: Proxy to interact with an AMQP serverBufferedMessageFetcher
: Wrapper to buffer some message. Useful if you want to call an API in a "bulk" way.InMemoryMessageFetcher
: Useful in test envRouter
The router has the responsibility to fetch a message, then to dispatch it to a
consumer.
DirectRouter
: Use aMessageFetcherInterface
and aConsumerInterface
. Each message fetched is passed to the consumer.RoundRobinRouter
: Wrapper to be able to fetch message from various sources.In Symfony full stack, everything is simpler.
I have forked the standard edition to show how it works.
Current Status: