-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
Added AMQP component #27140
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added AMQP component #27140
Conversation
lyrixx
commented
May 3, 2018
Q | A |
---|---|
Branch? | master |
Bug fix? | no |
New feature? | yes |
BC breaks? | no |
Deprecations? | no |
Tests pass? | yes |
Fixed tickets | |
License | MIT |
Doc PR |
de49253
to
2bc374d
Compare
💃 ! Can you, in the PR description, show me how to use it with Messenger so I give it a try? 😉 |
Right now, it's not wired with Messenger yet. Here a doc draftSymfony AMQPFed up of writing the same boiler-plate code over and over again whenever you DependenciesThis library depends on the
Using the ConventionsThe simplest usage of an AMQP broker is sending a message that is consumed by
The example above is based on some "conventions" and as such makes the
Retrying a MessageRetrying processing a message when an error occurs is as easy as defining a
Whenever you You can also drop the message after a limited number of retries (
Instead of trying every
The message will be re-enqueued after 1 second the first time you call
.. note::
.. note::
Configuring a BrokerBy default, a broker tries to connect to a local AMQP broker with the default
Configuring an ExchangeThe default exchange used by the library is of type
You can then binding a queue to this named exchange easily::
The second argument of
.. note::
Configuring a QueueAs demonstrated in some examples, you can create your own queue. As for the
.. note::
Implementation detailsThe retry strategy The retry strategy is implemented with two custom and private exchanges: Calling This exchange will route the message to a queue named like After 5 seconds the message will be re-published to |
8a5a74d
to
15e05b1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is a first round of random comments :)
private $logger; | ||
|
||
/** | ||
* @param ContainerInterface $container A PSR11 container from which to load the Broker service |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can't the docblock be removed? feels like so :)
the broker should be injected directly instead of the container
protected function configure() | ||
{ | ||
$this | ||
->setName('amqp:move') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to make the command lazy, you should define protected static $defaultName = 'amqp:move';
instead
.travis.yml
Outdated
@@ -199,7 +200,6 @@ install: | |||
- if [[ ! $skip ]]; then $COMPOSER_UP; fi | |||
- if [[ ! $skip ]]; then ./phpunit install; fi | |||
- php -i | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be kept, ths makes it a separate section in travis output
@@ -20,6 +20,7 @@ | |||
use Symfony\Bundle\FrameworkBundle\Routing\AnnotatedRouteControllerLoader; | |||
use Symfony\Bundle\FullStack; | |||
use Symfony\Component\Cache\Adapter\AbstractAdapter; | |||
use Symfony\Component\Amqp\Broker; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
alpha order
->addArgument($connection['dsn']) | ||
->addArgument($connection['queues']) | ||
->addArgument($connection['exchanges']) | ||
->setPublic(false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
already the default
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://github.com/symfony/symfony/blob/master/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php => there are 14 times ->setPublic(false)
. Looks like there is an "easy first contribution" here
/** | ||
* This class should not be instantiated. | ||
*/ | ||
private function __construct() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need for this, the class is internal already
* creation. | ||
* | ||
* The following arguments are "special": | ||
* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extra line
* Create a new Exchange. | ||
* | ||
* Special arguments: | ||
* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extra line (there might be more)
"php": "^7.1.3", | ||
"ext-amqp": ">=1.5", | ||
"psr/log": "~1.0", | ||
"symfony/event-dispatcher": "^2.3|^3.0|^4.0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be ^3.4|^4.0
as in other components (we bump these at each major versions)
"minimum-stability": "dev", | ||
"extra": { | ||
"branch-alias": { | ||
"dev-master": "4.1-dev" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
4.2
->ifTrue(function ($v) { | ||
return !is_array($v); | ||
}) | ||
->thenInvalid('Arguments should be an array (got %s).') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should
-> must
? Same for all the other occurrences of "should".
|
||
public function publish($message, $routingKey = null, $flags = null, array $attributes = null) | ||
{ | ||
if (null === $flags) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can remove this if()
and move this logic to this other line:
return parent::publish($message, $routingKey, $flags ?? \AMQP_MANDATORY, $attributes);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No because if someone pass AMQP_NOPARAM
(===0), we don't want to change the value.
And I can not set $flag = AMQP_NOPARAM
as default value, because I need to keep the same signature as the parent :/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, with ??
it works. I will update the code. 👍
* | ||
* @author Grégoire Pineau <lyrixx@lyrixx.info> | ||
*/ | ||
class MessageExporter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not a heavy user of queues, so I can be totally wrong ... but this command looks strange to me. Why not allowing to export all queue messages as JSON files in some dir. If the user wants compression too, they can use any command tool. Compressing messages (and picking the tgz format specifically) looks a bit "too much" for Symfony core.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AMQP Messages could be something else than JSON, It could be an image for example
So exporting that in a json file is too opinionated.
But indeed, the compression could be an option, but it comes for almost free ;)
But you are right about "too much" for Symfony core.
It was from our internal sensiolabs/amqp package. but it's really useful to debug ;)
src/Symfony/Component/Amqp/Queue.php
Outdated
$routingKeys = array($name); | ||
} | ||
|
||
if (isset($arguments['flags'])) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can refactor this into PHP 7 code?
$this->setFlags($arguments['flags'] ?? \AMQP_DURABLE);
unset($arguments['flags']);
Same for the other occurrences.
*/ | ||
public function __construct(int $time, int $max = 0) | ||
{ | ||
$time = (int) $time; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not needed because of the int
typehint in the argument.
$time = (int) $time; | ||
|
||
if ($time < 1) { | ||
throw new InvalidArgumentException('"time" should be at least 1.'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like an arbitrary decision. I expected 0
to be allowed and mean "no wait".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you don't want to wait, there is no need to use a retry strategy. A simple nack + requeue is enough (I guess)
89c8d34
to
49fcf13
Compare
@lyrixx We have a month to finish this one. I would really like to merge it for 4.2 as the Messenger component will have to become stable in terms of API. Do you think you will have time to work on this? |
@fabpot Hello, Except some tiny comments / conflict I have to address, I think this PR is ready for review. |
@lyrixx I actually think that we need to have the bridge straight away. It will helps us to see if the two fit well and if we need to change any of the two as soon as possible :) |
->end() | ||
->validate() | ||
->ifTrue(function ($config) { | ||
return 'constant' === $config['retry_strategy'] && !array_key_exists('max', $config['retry_strategy_options']); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of these, can't we explicitly set the retry strategy options in retry_strategy.constant
and retry_strategy.exponential
? The only thing to validate after is that we only have one retry strategy.
; | ||
} | ||
|
||
private function fixXmlArguments($v) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's exactly fixed by this method? Can't you simply have an XSD that allows any kind of data like the rest of the "free options"? 🤔
} | ||
|
||
if (!$match) { | ||
throw new \InvalidArgumentException(sprintf('The "framework.amqp.default_connection" option "%s" does not exist.', $defaultConnectionName)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"The default connection configured ("%s") does not exist." ?
|
||
$container | ||
->setAlias('amqp.broker', "amqp.broker.$defaultConnectionName") | ||
->setPublic(true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What should I alias ? the Broker::class to the default connection ?
Yep 👍
<defaults public="false" /> | ||
|
||
<service id="amqp.command.move" class="Symfony\Component\Amqp\Command\AmqpMoveCommand"> | ||
<argument type="service" id="amqp.broker" /> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should use a connection locator here I guess, in case there are multiple connections defined.
|
||
parent::setName($name); | ||
|
||
if (Broker::DEAD_LETTER_EXCHANGE === $name) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Outch. That's super hardcoding: can you move this logic (i.e. name to type mapping) to wherever this is created?
parent::setFlags($arguments['flags'] ?? \AMQP_DURABLE); | ||
unset($arguments['flags']); | ||
|
||
parent::declareExchange(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why would we do so at construct time?
public function publish($message, $routingKey = null, $flags = null, array $attributes = null) | ||
{ | ||
$attributes = array_merge(array( | ||
'delivery_mode' => 2, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use the AMQP_DURABLE
constant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, why is it forced here?
{ | ||
const DEFAULT_EXCHANGE = 'symfony.default'; | ||
const DEAD_LETTER_EXCHANGE = 'symfony.dead_letter'; | ||
const RETRY_EXCHANGE = 'symfony.retry'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same than for Broker
: should be configurable.
public function __construct(\AMQPConnection $connection, array $queues = array(), array $exchanges = array()) | ||
{ | ||
$this->connection = $connection; | ||
$this->connection->setReadTimeout(4 * 60 * 60); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a configuration?
I agree with @sroze, we want the integration in the same PR. |
private $queuesBindings = array(); | ||
|
||
/** | ||
* Create a new Broker instance. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Creates (I won't do the same for other similar changes)
* Create a new Broker instance. | ||
* | ||
* Example of $queuesConfiguration | ||
* array( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
code example should be indented (and with a blank line before)
/** | ||
* Creates a new Exchange. | ||
* | ||
* Special arguments: See the Exchange constructor. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would use a regular @param
tag here for that info
/** | ||
* Creates a new Queue. | ||
* | ||
* Special arguments: See the Queue constructor. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
|
||
// WIP | ||
|
||
interface BrokerInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we even need an interface?