Skip to content

Added AMQP component #27140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed

Added AMQP component #27140

wants to merge 1 commit into from

Conversation

lyrixx
Copy link
Member

@lyrixx lyrixx commented May 3, 2018

Q A
Branch? master
Bug fix? no
New feature? yes
BC breaks? no
Deprecations? no
Tests pass? yes
Fixed tickets
License MIT
Doc PR

@sroze
Copy link
Contributor

sroze commented May 3, 2018

💃 ! Can you, in the PR description, show me how to use it with Messenger so I give it a try? 😉

@lyrixx
Copy link
Member Author

lyrixx commented May 3, 2018

Can you, in the PR description, show me how to use it with Messenger so I give it a try?

Right now, it's not wired with Messenger yet.
I thinks it's better to merge this as it (after review / fix etc. obviously), then to add the bridge.

Here a doc draft

Symfony AMQP

Fed up of writing the same boiler-plate code over and over again whenever you
need to use your favorite AMQP broker? Have you a hard time remembering how to
publish a message or how to wire exchanges and queues? I had the exact same
feeling. There are many AMQP libraries providing a very good low-level access to
the AMQP protocol, but what about providing a simple API for abstracting the
most common use cases? This library gives you an opinionated way of using any
AMQP brokers and it also provides a nice and consistent API for low-level
interaction with any AMQP brokers.

Dependencies

This library depends on the amqp PECL extensions (version 1.4.0-beta2 or
later)::

sudo apt-get install php-amqp

Using the Conventions

The simplest usage of an AMQP broker is sending a message that is consumed by
another script::

use Symfony\Component\Amqp\Broker;

// connects to a local AMQP broker by default
$broker = new Broker();

// publish a message on the 'log' queue
$broker->publish('log', 'some message');

// in another script (non-blocking)
// $message is false if no messages are in the queue
$message = $broker->get('log');

// blocking (waits for a message to be available in the queue)
$message = $broker->consume('log');

The example above is based on some "conventions" and as such makes the
following assumptions:

  • A default exchange is used to publish the message (named
    symfony.default);

  • The routing is done via the routing key (log in this example);

  • Queues and exchanges are created implicitly when first accessed;

  • The connection to the broker is done lazily whenever a message must be sent
    or received.

Retrying a Message

Retrying processing a message when an error occurs is as easy as defining a
retry strategy on a queue::

use Symfony\Component\Amqp\RetryStrategy\ConstantRetryStrategy;

// configure the queue explicitly
$broker->createQueue('log', array(
    // retry every 5 seconds
    'retry_strategy' => new ConstantRetryStrategy(5),
));

Whenever you $broker->retry() a message, it is going to be automatically re-
enqueued after a 5 seconds wait for a retry.

You can also drop the message after a limited number of retries (2 in the
following example)::

$broker->createQueue('log', array(
    // retry 2 times
    'retry_strategy' => new ConstantRetryStrategy(5, 2),
));

Instead of trying every n seconds, you can also use a retry mechanism based
on a truncated exponential backoff algorithm::

use Symfony\Component\Amqp\RetryStrategy\ExponentialRetryStrategy;

$broker->createQueue('log', array(
    // retry 5 times
    'retry_strategy' => new ExponentialRetryStrategy(5),
));

The message will be re-enqueued after 1 second the first time you call
retry(), 2^1 seconds the second time, 2^2 seconds the third time,
and 2^n seconds the nth time. If you want to wait more than 1 second the
first time, you can pass an offset::

$broker->createQueue('log', array(
    // starts at 2^3
    'retry_strategy' => new ExponentialRetryStrategy(5, 3),
));

.. note::

The retry strategies are implemented by using the dead-lettering feature of
AMQP. Behind the scene, a special exchange is bound to queues configured
based on the retry strategy you set.

.. note::

Don't forget to ``ack`` or ``nack`` your message if you retry it. And
obviously you should not use the AMQP_Requeue flag.

Configuring a Broker

By default, a broker tries to connect to a local AMQP broker with the default
port, username, and password. If you have a different setting, pass a URI to
the Broker constructor::

$broker = new Broker('amqp://user:pass@10.1.2.3:345/some-vhost');

Configuring an Exchange

The default exchange used by the library is of type direct. You can also
create your own exchange::

// define a new fanout exchange
$broker->createExchange('sensiolabs.fanout', array('type' => \AMQP_EX_TYPE_FANOUT));

You can then binding a queue to this named exchange easily::

$broker->createQueue('logs', array('exchange' => 'sensiolabs.fanout', 'routing_keys' => null));
$broker->createQueue('logs.again', array('exchange' => 'sensiolabs.fanout', 'routing_keys' => null));

The second argument of createExchange() takes an array of arguments passed
to the exchange. The following keys are used to further configure the exchange:

  • flags: sets the exchange flags;

  • type: sets the type of the queue (see \AMQP_EX_TYPE_* constants).

.. note::

Note that ``createExchange()`` automatically declares the exchange.

Configuring a Queue

As demonstrated in some examples, you can create your own queue. As for the
exchange, the second argument of the createQueue() method is a list of
queue arguments; the following keys are used to further configure the queue:

  • exchange: The exchange name to bind the queue to (the default exchange is
    used if not set);

  • flags: Sets the exchange flags;

  • bind_arguments: An array of arguments to pass when binding the queue with
    an exchange;

  • retry_strategy: The retry strategy to use (an instance of
    :class:Symfony\\Amqp\\RetryStrategy\\RetryStrategyInterface).

.. note::

Note that ``createQueue()`` automatically declares and binds the queue.

Implementation details

The retry strategy
..................

The retry strategy is implemented with two custom and private exchanges:
symfony.dead_letter and symfony.retry.

Calling Broker::retry will publish the same message in the
symfony.dead_letter exchange.

This exchange will route the message to a queue named like
%exchange%.%time%.wait, for example sensiolabs.default.000005.wait. This
queue has a TTL of 5 seconds. It means that if nothing consumes this message, it
will be dropped after 5 seconds. But this queue has also a Dead Letter (DL). It
means that instead of dropping the message, the AMQP server will re-publish
automatically the message to the Exchange configured as DL.

After 5 seconds the message will be re-published to symfony.retry Exchange.
This exchange is bound with every single queue. Finally, the message will land
in the original queue.

@lyrixx lyrixx requested review from nicolas-grekas, sroze and fabpot May 3, 2018 15:48
@lyrixx lyrixx force-pushed the amqp branch 5 times, most recently from 8a5a74d to 15e05b1 Compare May 3, 2018 16:50
@nicolas-grekas nicolas-grekas added this to the next milestone May 3, 2018
Copy link
Member

@nicolas-grekas nicolas-grekas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a first round of random comments :)

private $logger;

/**
* @param ContainerInterface $container A PSR11 container from which to load the Broker service
Copy link
Member

@nicolas-grekas nicolas-grekas May 3, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't the docblock be removed? feels like so :)
the broker should be injected directly instead of the container

protected function configure()
{
$this
->setName('amqp:move')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to make the command lazy, you should define protected static $defaultName = 'amqp:move'; instead

.travis.yml Outdated
@@ -199,7 +200,6 @@ install:
- if [[ ! $skip ]]; then $COMPOSER_UP; fi
- if [[ ! $skip ]]; then ./phpunit install; fi
- php -i

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be kept, ths makes it a separate section in travis output

@@ -20,6 +20,7 @@
use Symfony\Bundle\FrameworkBundle\Routing\AnnotatedRouteControllerLoader;
use Symfony\Bundle\FullStack;
use Symfony\Component\Cache\Adapter\AbstractAdapter;
use Symfony\Component\Amqp\Broker;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alpha order

->addArgument($connection['dsn'])
->addArgument($connection['queues'])
->addArgument($connection['exchanges'])
->setPublic(false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

already the default

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/symfony/symfony/blob/master/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php => there are 14 times ->setPublic(false). Looks like there is an "easy first contribution" here

/**
* This class should not be instantiated.
*/
private function __construct()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for this, the class is internal already

* creation.
*
* The following arguments are "special":
*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra line

* Create a new Exchange.
*
* Special arguments:
*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra line (there might be more)

"php": "^7.1.3",
"ext-amqp": ">=1.5",
"psr/log": "~1.0",
"symfony/event-dispatcher": "^2.3|^3.0|^4.0"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be ^3.4|^4.0 as in other components (we bump these at each major versions)

"minimum-stability": "dev",
"extra": {
"branch-alias": {
"dev-master": "4.1-dev"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4.2

->ifTrue(function ($v) {
return !is_array($v);
})
->thenInvalid('Arguments should be an array (got %s).')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should -> must ? Same for all the other occurrences of "should".


public function publish($message, $routingKey = null, $flags = null, array $attributes = null)
{
if (null === $flags) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can remove this if() and move this logic to this other line:

return parent::publish($message, $routingKey, $flags ?? \AMQP_MANDATORY, $attributes);

Copy link
Member Author

@lyrixx lyrixx May 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No because if someone pass AMQP_NOPARAM (===0), we don't want to change the value.
And I can not set $flag = AMQP_NOPARAM as default value, because I need to keep the same signature as the parent :/

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, with ?? it works. I will update the code. 👍

*
* @author Grégoire Pineau <lyrixx@lyrixx.info>
*/
class MessageExporter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a heavy user of queues, so I can be totally wrong ... but this command looks strange to me. Why not allowing to export all queue messages as JSON files in some dir. If the user wants compression too, they can use any command tool. Compressing messages (and picking the tgz format specifically) looks a bit "too much" for Symfony core.

Copy link
Member Author

@lyrixx lyrixx May 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AMQP Messages could be something else than JSON, It could be an image for example
So exporting that in a json file is too opinionated.

But indeed, the compression could be an option, but it comes for almost free ;)

But you are right about "too much" for Symfony core. It was from our internal sensiolabs/amqp package. but it's really useful to debug ;)

$routingKeys = array($name);
}

if (isset($arguments['flags'])) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can refactor this into PHP 7 code?

$this->setFlags($arguments['flags'] ?? \AMQP_DURABLE);
unset($arguments['flags']);

Same for the other occurrences.

*/
public function __construct(int $time, int $max = 0)
{
$time = (int) $time;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed because of the int typehint in the argument.

$time = (int) $time;

if ($time < 1) {
throw new InvalidArgumentException('"time" should be at least 1.');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like an arbitrary decision. I expected 0 to be allowed and mean "no wait".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't want to wait, there is no need to use a retry strategy. A simple nack + requeue is enough (I guess)

@lyrixx lyrixx force-pushed the amqp branch 5 times, most recently from 89c8d34 to 49fcf13 Compare May 4, 2018 16:59
@fabpot
Copy link
Member

fabpot commented Aug 23, 2018

@lyrixx We have a month to finish this one. I would really like to merge it for 4.2 as the Messenger component will have to become stable in terms of API. Do you think you will have time to work on this?

@lyrixx
Copy link
Member Author

lyrixx commented Aug 23, 2018

@fabpot Hello, Except some tiny comments / conflict I have to address, I think this PR is ready for review.

@sroze
Copy link
Contributor

sroze commented Aug 24, 2018

Right now, it's not wired with Messenger yet. I thinks it's better to merge this as it (after review / fix etc. obviously), then to add the bridge.

@lyrixx I actually think that we need to have the bridge straight away. It will helps us to see if the two fit well and if we need to change any of the two as soon as possible :)

->end()
->validate()
->ifTrue(function ($config) {
return 'constant' === $config['retry_strategy'] && !array_key_exists('max', $config['retry_strategy_options']);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of these, can't we explicitly set the retry strategy options in retry_strategy.constant and retry_strategy.exponential? The only thing to validate after is that we only have one retry strategy.

;
}

private function fixXmlArguments($v)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's exactly fixed by this method? Can't you simply have an XSD that allows any kind of data like the rest of the "free options"? 🤔

}

if (!$match) {
throw new \InvalidArgumentException(sprintf('The "framework.amqp.default_connection" option "%s" does not exist.', $defaultConnectionName));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"The default connection configured ("%s") does not exist." ?


$container
->setAlias('amqp.broker', "amqp.broker.$defaultConnectionName")
->setPublic(true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What should I alias ? the Broker::class to the default connection ?

Yep 👍

<defaults public="false" />

<service id="amqp.command.move" class="Symfony\Component\Amqp\Command\AmqpMoveCommand">
<argument type="service" id="amqp.broker" />
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should use a connection locator here I guess, in case there are multiple connections defined.


parent::setName($name);

if (Broker::DEAD_LETTER_EXCHANGE === $name) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Outch. That's super hardcoding: can you move this logic (i.e. name to type mapping) to wherever this is created?

parent::setFlags($arguments['flags'] ?? \AMQP_DURABLE);
unset($arguments['flags']);

parent::declareExchange();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we do so at construct time?

public function publish($message, $routingKey = null, $flags = null, array $attributes = null)
{
$attributes = array_merge(array(
'delivery_mode' => 2,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use the AMQP_DURABLE constant.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, why is it forced here?

{
const DEFAULT_EXCHANGE = 'symfony.default';
const DEAD_LETTER_EXCHANGE = 'symfony.dead_letter';
const RETRY_EXCHANGE = 'symfony.retry';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same than for Broker: should be configurable.

public function __construct(\AMQPConnection $connection, array $queues = array(), array $exchanges = array())
{
$this->connection = $connection;
$this->connection->setReadTimeout(4 * 60 * 60);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a configuration?

@fabpot
Copy link
Member

fabpot commented Sep 4, 2018

I agree with @sroze, we want the integration in the same PR.

private $queuesBindings = array();

/**
* Create a new Broker instance.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creates (I won't do the same for other similar changes)

* Create a new Broker instance.
*
* Example of $queuesConfiguration
* array(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code example should be indented (and with a blank line before)

/**
* Creates a new Exchange.
*
* Special arguments: See the Exchange constructor.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use a regular @param tag here for that info

/**
* Creates a new Queue.
*
* Special arguments: See the Queue constructor.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here


// WIP

interface BrokerInterface
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we even need an interface?

@fabpot fabpot closed this Mar 4, 2019
@nicolas-grekas nicolas-grekas modified the milestones: next, 4.3 Apr 30, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants