Skip to content

Added two new components: AMQP and Worker #23315

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed

Conversation

lyrixx
Copy link
Member

@lyrixx lyrixx commented Jun 28, 2017

Q A
Branch? 3.4
Bug fix? no
New feature? yes
BC breaks? no
Deprecations? no
Tests pass? yes
Fixed tickets -
License MIT
Doc PR -

Hello.

I'm happy and excited to share with you 2 new components.

note: The PR description (what you are currently reading) is also committed (as
pr.body.md). I will remove it just before the merge. Like that you could also
ask question about the "documentation". But please, don't over-comment the
"language / English". This part of the job will be done in the doc repository.

AMQP

It is a library created at @sensiolabs few years ago (Mon Mar 18 17:26:01 2013 +0100).
Its goal is to ease the communication with a service that implement AMQP
For example, RabbitMQ implements AMQP.

At that time, Swarrot did not exist yet
and only php-amqplib existed.

We started by using php-amqplib but we faced many issues: memory leak, bad
handling of signal, poor documentation

So we decided to stop using it and to build our own library. Over the years, we
added very nice features, we fixed very weird edge case and we gain real
expertise on AMQP.

Nowadays, it's very common to use AMQP in a web / CLI project.

So four years later, we decided to open-source it and to add it to Symfony to
leverage the Symfony Ecosystem (code quality, release process, documentation,
visibility, community, etc.)

So basically it's an abstraction of the AMQP pecl.

Here is the README.rst we had for this lib. I have updated it to match the
version that will land in Symfony

The old README (but updated)

Symfony AMQP

Fed up of writing the same boiler-plate code over and over again whenever you
need to use your favorite AMQP broker? Have you a hard time remembering how to
publish a message or how to wire exchanges and queues? I had the exact same
feeling. There are many AMQP libraries providing a very good low-level access to
the AMQP protocol, but what about providing a simple API for abstracting the
most common use cases? This library gives you an opinionated way of using any
AMQP brokers and it also provides a nice and consistent API for low-level
interaction with any AMQP brokers.

Dependencies

This library depends on the amqp PECL extensions (version 1.4.0-beta2 or
later)::

sudo apt-get install php-amqp

Using the Conventions

The simplest usage of an AMQP broker is sending a message that is consumed by
another script::

use Symfony\Component\Amqp\Broker;

// connects to a local AMQP broker by default
$broker = new Broker();

// publish a message on the 'log' queue
$broker->publish('log', 'some message');

// in another script (non-blocking)
// $message is false if no messages are in the queue
$message = $broker->get('log');

// blocking (waits for a message to be available in the queue)
$message = $broker->consume('log');

The example above is based on some "conventions" and as such makes the
following assumptions:

  • A default exchange is used to publish the message (named
    symfony.default);

  • The routing is done via the routing key (log in this example);

  • Queues and exchanges are created implicitly when first accessed;

  • The connection to the broker is done lazily whenever a message must be sent
    or received.

Retrying a Message

Retrying processing a message when an error occurs is as easy as defining a
retry strategy on a queue::

use Symfony\Component\Amqp\RetryStrategy\ConstantRetryStrategy;

// configure the queue explicitly
$broker->createQueue('log', array(
    // retry every 5 seconds
    'retry_strategy' => new ConstantRetryStrategy(5),
));

Whenever you $broker->retry() a message, it is going to be automatically re-
enqueued after a 5 seconds wait for a retry.

You can also drop the message after a limited number of retries (2 in the
following example)::

$broker->createQueue('log', array(
    // retry 2 times
    'retry_strategy' => new ConstantRetryStrategy(5, 2),
));

Instead of trying every n seconds, you can also use a retry mechanism based
on a truncated exponential backoff algorithm::

use Symfony\Component\Amqp\RetryStrategy\ExponentialRetryStrategy;

$broker->createQueue('log', array(
    // retry 5 times
    'retry_strategy' => new ExponentialRetryStrategy(5),
));

The message will be re-enqueued after 1 second the first time you call
retry(), 2^1 seconds the second time, 2^2 seconds the third time,
and 2^n seconds the nth time. If you want to wait more than 1 second the
first time, you can pass an offset::

$broker->createQueue('log', array(
    // starts at 2^3
    'retry_strategy' => new ExponentialRetryStrategy(5, 3),
));

.. note::

The retry strategies are implemented by using the dead-lettering feature of
AMQP. Behind the scene, a special exchange is bound to queues configured
based on the retry strategy you set.

.. note::
Don't forget to ack or nack your message if you retry it. And
obviously you should not use the AMQP_Requeue flag.

Configuring a Broker

By default, a broker tries to connect to a local AMQP broker with the default
port, username, and password. If you have a different setting, pass a URI to
the Broker constructor::

$broker = new Broker('amqp://user:pass@10.1.2.3:345/some-vhost');

Configuring an Exchange

The default exchange used by the library is of type direct. You can also
create your own exchange::

// define a new fanout exchange
$broker->createExchange('sensiolabs.fanout', array('type' => \AMQP_EX_TYPE_FANOUT));

You can then binding a queue to this named exchange easily::

$broker->createQueue('logs', array('exchange' => 'sensiolabs.fanout', 'routing_keys' => null));
$broker->createQueue('logs.again', array('exchange' => 'sensiolabs.fanout', 'routing_keys' => null));

The second argument of createExchange() takes an array of arguments passed
to the exchange. The following keys are used to further configure the exchange:

  • flags: Sets the exchange flags;

  • type: Sets the type of the queue (see \AMQP_EX_TYPE_* constants).

.. note::

Note that ``createExchange()`` automatically declares the exchange.

Configuring a Queue

As demonstrated in some examples, you can create your own queue. As for the
exchange, the second argument of the createQueue() method is a list of
queue arguments; the following keys are used to further configure the queue:

  • exchange: The exchange name to bind the queue to (the default exchange is
    used if not set);

  • flags: Sets the exchange flags;

  • bind_arguments: An array of arguments to pass when binding the queue with
    an exchange;

  • retry_strategy: The retry strategy to use (an instance of
    RetryStrategyInterface).

.. note::

Note that ``createQueue()`` automatically declares and binds the queue.

Implementation details

The retry strategy
..................

The retry strategy is implemented with two customs and privates exchanges:
symfony.dead_letter and symfony.retry.

Calling Broker::retry will publish the same message in the
symfony.dead_letter exchange.

This exchange will route the message to a queue named like
%exchange%.%time%.wait. For example sensiolabs.default.000005.wait. This
queue has a TTL of 5 seconds. It means that if nothing consumes this message, it
will be dropped after 5 seconds. But this queue has also a Dead Letter (DL). It
means that instead of dropping the message, the AMQP server will re-publish
automatically the message to the Exchange configured as DL.

After 5 seconds the message will be re-published to symfony.retry Exchange.
This exchange is bound with every single queues. Finally, the message will land
in the original queue.

Worker

The second component was extracted from our internal SensioLabsAmqp component.
We extracted it as is decoupled from the AMQP component. Thus it could be used,
for example, to write redis, kafka daemon.

Documentation

Symfony Worker

The worker component help you to write simple but flexible daemon.

Introduction

First you need something that fetch some messages. If the message are sent
to AMQP, you should use the AmqpMessageFetcher::

use Symfony\Component\Amqp\Broker;
use Symfony\Component\Worker\MessageFetcher\AmqpMessageFetcher;

$broker = new Broker();
$fetcher = new AmqpMessageFetcher($broker, 'queue_name');

Then you need a Consumer that will consumer each AMQP message::

namespace AppBundle\Consumer;

use Symfony\Component\Amqp\Broker;
use Symfony\Component\Worker\Consumer\ConsumerInterface;
use Symfony\Component\Worker\MessageCollection;

class DumpConsumer implements ConsumerInterface
{
    private $broker;

    public function __construct(Broker $broker)
    {
        $this->broker = $broker;
    }

    public function consume(MessageCollection $messageCollection)
    {
        foreach ($messageCollection as $message) {
            dump($message);

            $this->broker->ack($message);
        }
    }
}

Finally plug everything together::

use AppBundle\Consumer\DumpConsumer;
use Symfony\Component\Amqp\Broker;
use Symfony\Component\Worker\Loop\Loop;
use Symfony\Component\Worker\MessageFetcher\AmqpMessageFetcher;

$broker = new Broker();
$fetcher = new AmqpMessageFetcher($broker, 'queue_name');
$consumer = new DumpConsumer($broker);

$loop = new Loop(new DirectRouter($fetcher, $consumer));

$loop->run();

Message Fetcher

  • AmqpMessageFetcher: Proxy to interact with an AMQP server
  • BufferedMessageFetcher: Wrapper to buffer some message. Useful if you want to call an API in a "bulk" way.
  • InMemoryMessageFetcher: Useful in test env

Router

The router has the responsibility to fetch a message, then to dispatch it to a
consumer.

  • DirectRouter: Use a MessageFetcherInterface and a ConsumerInterface. Each message fetched is passed to the consumer.
  • RoundRobinRouter: Wrapper to be able to fetch message from various sources.

In Symfony full stack, everything is simpler.

I have forked the standard edition to show how it works.


Current Status:

  • Make command container un-aware (don't know how to do that ATM, PR welcome ;) )
  • More PHPDoc
  • Think what class can be final / internal
  • Make travis pass (I will need @nicolas-grekas on this one)
  • Disable AMQP TS on appveyoer as they don't support rabbitmq ATM

$this->logger->info('Move a message...');
$this->broker->move($message, $to);
$this->broker->ack($message);
$this->logger->debug('...message moved.');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use the same level? Especially since there are ellipsis that should correspond to each other?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To not flood to the log output. And it's clearly at debug level.

@sstok
Copy link
Contributor

sstok commented Jun 28, 2017

Instead of putting this logic in the FrameworkBundle is it possible to keep it in it's own bundle?

{
$workers = $this->getContainer()->getParameter('worker.workers');

foreach ($workers as $name => $_) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

foreach (array_keys($this->getContainer()->getParameter('worker.workers')) as $name) { is 2 less variables in the namespace, one of which is quite weird

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not really readable + it traverses twice the array.

Copy link
Member

@chalasr chalasr Jun 28, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

even better as foreach ($this->workers as $worker) :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you are referring to the fact this command should be a service and so we could inject only what we need?

->setDescription('Run a worker')
->setDefinition(array(
new InputArgument('worker', InputArgument::REQUIRED, 'The worker'),
new InputOption('name', null, InputOption::VALUE_REQUIRED, 'A name, useful for stats/monitoring. Default to worker name.'),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • "Defaults"
  • consider linebreaks?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider linebreaks?

Do we do that in symfony? Does it breaks the output?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm right, the usual Symfony practice is to not break this, whatever its length.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @javiereguiluz

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok then

->register($id, Worker\MessageFetcher\BufferedMessageFetcher::class)
->addArgument(new Reference($fetchers[$fetcher['wrap']]))
->addArgument(array(
'max_buffurisation_time' => $fetcher['max_buffurisation_time'],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/buffur/buffer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

max_buffurisation_time => max_buffering_time no ?


/**
* @param int $time Time to wait in the queue in seconds
* @param int $max The maximum number of time to retry (0 means indefinitely)
Copy link
Contributor

@greg0ire greg0ire Jun 28, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"number of times" ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The maximum number of attempts (0 means no limit)

{
$exchange = Exchange::createFromUri(getenv('RABBITMQ_URL').'/?type=fanout');

$this->fail('An exception should have been thrown.');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this useful?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

of course.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? If no exception is thrown, won't phpunit complain about it because of the annotation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right, sorry.

{
private function __construct()
{
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes.

Copy link
Contributor

@greg0ire greg0ire Jun 28, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment to explain why? It's certainly going to raise some eyebrows...

private $threshold;

/**
* @param int $threshold in byte. Default to 100Mb
Copy link
Contributor

@greg0ire greg0ire Jun 28, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • "bytes"
  • "Defaults"

$this->stop('Force shut down of the worker because a StopException has been thrown.', $e);

return;
} catch (\Exception $e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this normal? If yes, maybe explain it in a comment?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you referring to the fact we do nothing in the catch block?

Indeed it can be hard to understand. Initially we had 2 catch block (AMQPException and Exception) with 2 different log messages. So it's a relic of our previous lib.

Now it could be simplified indeed. Except It guess I have to support FatalError too. ;)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I catch also throwable error.

if (isset($fetchers[$name])) {
throw new \InvalidArgumentException("A fetcher named \"$name\" already exist.");
}
if (isset($fetcher['connection'])) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't you use a ternary operator instead?

$reference = new Reference(isset($fetcher['connection']) ? 'amqp.broker.'.$fetcher['connection'] : 'amqp.broker');

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I could. But I don't like it as it's not really readable.
What would be the benefit?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Readability ^^

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, no !
ternary operator are usually less readable. Many language don't even support them (golang, ...)


foreach ($config['fetchers']['amqps'] as $name => $fetcher) {
if (isset($fetchers[$name])) {
throw new \InvalidArgumentException("A fetcher named \"$name\" already exist.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use sprintf() instead of variable interpolation in error messages.
Minor typo: already exist. -> already exists.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use sprintf() instead of variable interpolation in error messages.

Variable interpolation is optimized in PHP7, maybe starting using this kind of practice is nice?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my "personal" / "professional" projects I started using the following syntax "hello $name".
But in symfony, indeed we don't use it. So I'm going to change it.

@lyrixx
Copy link
Member Author

lyrixx commented Jun 28, 2017

Instead of putting this logic in the FrameworkBundle is it possible to keep it in it's own bundle?

Yes, it's possible. But (for now), it's not the logic adopted by symfony.

public static function getSubscribedEvents()
{
return array(
LoopEvents::SLEEP => 'clearDoctrine',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be limitMemoryUsage

@lyrixx lyrixx changed the title Added two new component: AMQP and Worker Added two new components: AMQP and Worker Jun 28, 2017

use Symfony\Component\Amqp\RetryStrategy\RetryStrategyInterface;

class NonRetryableException extends \RuntimeException implements ExceptionInterface
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not used?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used in Broker::retry


namespace Symfony\Component\Amqp\RetryStrategy;

interface RetryStrategyInterface
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not used?

I mean, ExponentialRetryStrategy and ConstantRetryStrategy implement it, but it seam to be not used elsewhere

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used in NonRetryableException::__construct() and I added a check in queue::__construct()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind, GitHub didn't display the content of this class... That's why I missed a lot of things. Now it displays well and answers my comments.

$this->offset = $offset;
}

public function isRetryable(\AMQPEnvelope $msg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's about passing an Exception or a Context about the need to retry a message to let the strategy choose a better response.

@rande
Copy link
Contributor

rande commented Jun 28, 2017

Any reason to put this inside Symfony ? I don't see any reason not to have it ... it just look weird for me as this kind of project can have its own lifecycle without respecting symfony version. (like https://github.com/avalanche123/Imagine ... do you plan to integrate an image manipulation library inside symfony ? )

@fabpot
Copy link
Member

fabpot commented Jun 28, 2017

@rande: There are MANY benefits in including this into Symfony. And yes, we plan to have exactly imagine into Symfony, see #21820

@rande
Copy link
Contributor

rande commented Jun 28, 2017

@fabpot ok then ;)

@lyrixx lyrixx force-pushed the amqp-worker branch 3 times, most recently from 9d43404 to 3211bfb Compare June 28, 2017 15:24
@chalasr
Copy link
Member

chalasr commented Jun 28, 2017

Yes, it's possible. But (for now), it's not the logic adopted by symfony.

The lint:yaml, lint:twig, security:encode-password console commands have been moved to their component/bridges/bundles. I think console commands added here should definitely go into their component.

// Warning: cli_set_process_title(): cli_set_process_title had an error: Not initialized correctly
@cli_set_process_title($processName);

pcntl_signal(SIGTERM, function () use ($loop) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried this in one of my projects using php-amqp and for me it did not work reliably that's why I'm using https://github.com/rstgroup/php-signal-handler

But maybe you have some other experiences?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use the ->consumer() method that is blocking.
but with ->get() there are no issues ;)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I see 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if PCNTL is not enabled?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a check + a dependency in the worker/composer.json

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So a default Symfony installation will need PCNTL extension too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@makasim What do you mean?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not understand what you want especially the part "for a kind of extension"

@lyrixx
Copy link
Member Author

lyrixx commented Jun 28, 2017

@chalasr It's not easy as it depends on the DIC. We could add an intermediary layer though

Copy link
Contributor

@linaori linaori left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have not marked everything but:

  • Missing a lot of documentation on class/method level
  • The majority (if not all) of the classes should be final

protected function initialize(InputInterface $input, OutputInterface $output)
{
$this->broker = $this->getContainer()->get('amqp.broker');
$this->logger = $this->getContainer()->get('logger');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific reason why not to make this a service? It's already trying to mimic one here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you mean by this? The command?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, command as a service seems to be the pattern here, except that it locates the services on initialization by itself

$this->logger->info('Move a message...');
$this->broker->move($message, $to);
$this->broker->ack($message);
$this->logger->debug('...message moved.');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I see those messages in my console (or where ever it mails), they are useless. Can you add useful message and context to the message?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you want? I can not dump the message as it can be HUGE

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could at least add a few context information like the $to and $from


protected function execute(InputInterface $input, OutputInterface $output)
{
$workers = $this->getContainer()->getParameter('worker.workers');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the last command, is there any specific reason to not make it a service? It clearly has dependencies. Especially parameters are not something I'd rely on to be an array.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Injecting an array isn't reliable too, because array can be of any structure. Still, I also like to have commands as a service. Do we need to check somewhere if worker.workers has the correct structure?

// Warning: cli_set_process_title(): cli_set_process_title had an error: Not initialized correctly
@cli_set_process_title($processName);

pcntl_signal(SIGTERM, function () use ($loop) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if PCNTL is not enabled?

));
}

return $this->getContainer()->get($workers[$workerName]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be solved by using the lazy injections instead?

*/
interface ConfigurableLoopInterface
{
public function setName($name);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing docs

/**
* @author Grégoire Pineau <lyrixx@lyrixx.info>
*/
class StopException extends \RuntimeException
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no dedicated component interface?

public function run()
{
if (null !== $this->logger) {
$this->logger->notice('Worker {worker} started.', array(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not require a null logger instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't do that in symfony.

Copy link
Member

@dunglas dunglas Jun 28, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Requiring a null logger makes it harder to initialize the class by hand. But I usually do the following in the constructor to avoid if (null !== $this->logger) statements everywhere:

$this->logger = $logger ?? new NullLogger();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has already been rejected in the past in the symfony core: #14682

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and here #15594, even if I often favor readability over such optims (is the performance cost of noop method calls vs if statements really considerable?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initializing it by hand is a bad argument and performance impact is seriously minimal. In fact, the chances of calling a method on null should not exist. You have the dependency, you have the class, you can write new NullLogger() and forget about it for the rest of your life.

I think @dunglas points out a valid solution, but imo the logger argument can simply become mandatory and require injection.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my projects, I always use $this->logger = $logger ?: new NullLogger(); But In symfony we don't do that.
I don't think this PR is the right place to discuss about that, isn't?

/**
* @author Grégoire Pineau <lyrixx@lyrixx.info>
*/
interface LoopInterface
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing docs

"require": {
"php": ">=5.5.9",
"symfony/event-dispatcher": "^2.3|^3.0|^4.0",
"psr/log": "~1.0"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing dependency on pcntl

@chalasr
Copy link
Member

chalasr commented Jun 28, 2017

@lyrixx They do not really need to know about the DIC. As pointed out by @iltar, I would make them container unaware and register them as services in the framework, making their scope explicit throughout their constructor. I think using this component and the commands that come with outside of the fullstack makes a lot of sense.

@lyrixx
Copy link
Member Author

lyrixx commented Jun 28, 2017

@iltar :

I have not marked everything but:

Missing a lot of documentation on class/method level
The majority (if not all) of the classes should be final

Yes I agree. I let the boring stuff for the end ;)

But please don't add missing docs comment on each methods. It does not bring anything useful for now

@fabpot
Copy link
Member

fabpot commented Jul 17, 2017

@lyrixx Don't forget to rebase to get rid of the merge commit.

@lyrixx
Copy link
Member Author

lyrixx commented Aug 1, 2017

Would you be interested in a PR that does the same

Could you define "the same". I mean, Are you talking about worker too ? and about AMQP, do you expose the same features: retry, delay, lazy creation etc. And also the integration with the framework bundle ?

If yes, does it mean this PR should be closed ?

@lyrixx
Copy link
Member Author

lyrixx commented Aug 1, 2017

it will support all features that are currently implemented (or supported by AMQP itself).

I don't see the retry /delay feature in your interfaces, nor the lazy loading feature.

So for now, I would wait a bit before integrating amqp interop.

@lyrixx
Copy link
Member Author

lyrixx commented Aug 1, 2017

Ah yes. I would like to finish this PR fist. Then It will be easier to manage as this PR is already big (and so the discussion ;) )

@gedimin45
Copy link

Hello!

Any way we can get this in 3.4? :)

@stof stof modified the milestones: 3.4, 4.1 Sep 27, 2017
@trntv
Copy link

trntv commented Oct 2, 2017

What about SQS, Beanstalkd? Only AMQP is supported? IMHO, it would be nice to make such a component as a layer on top of "queues" not only amqp.

@sroze
Copy link
Contributor

sroze commented Oct 2, 2017

@trntv agree with you. Hence the Message component proposal here:
#24326

],
"require": {
"php": ">=5.5.9",
"ext-pcntl": "*",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this makes the component incompatible with Windows (and so any project depending on it), as pcntl does not exist for Windows. This should be an optional dependency

Copy link
Member

@Nyholm Nyholm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this PR. I added some few comments.

I really like the idea of not making "one component to rule them all". But do we need a AMPQ library? Cant we use any of the 5-6(?) ones that already exists?

Also, I do not see any producer.

/**
* @author Grégoire Pineau <lyrixx@lyrixx.info>
*/
class ConsumerEvents
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be final

interface MessageFetcherInterface
{
/**
* @return string|bool The message or false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fetchMessages should always return a MessageCollection. The message collection might we empty, but that is okey.
However, do not change the signature for the RouterInterface

*/
interface RouterInterface
{
public function fetchMessages();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We must add some doc block here

/**
* @author Grégoire Pineau <lyrixx@lyrixx.info>
*/
interface ConsumerInterface
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that you have not provided an implementation of ConsumerInterface. How come?

* file that was distributed with this source code.
*/

namespace Symfony\Component\Amqp\Test;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be in "Tests"?

@mvrhov
Copy link

mvrhov commented Dec 27, 2017

But do we need a AMPQ library? Cant we use any of the 5-6(?) ones that already exists?

IMO if one needs full AMPQ support is should use one of external libraries.
Also the Messaging component #24411 will be able to handle this for ones that need something small/light

fabpot added a commit that referenced this pull request Mar 23, 2018
This PR was squashed before being merged into the 4.1-dev branch (closes #24411).

Discussion
----------

[Messenger] Add a new Messenger component

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #24326
| License       | MIT
| Doc PR        | symfony/symfony-docs#9437

As discussed in #24326. This PR is to help going forward with the discussions of having a Message component.

# Resources

| What | Where
| --- | ---
| Documentation | [In the PR](https://github.com/sroze/symfony/blob/add-message-component/src/Symfony/Component/Message/README.md)
| Demo | [In `sroze/symfony-demo:message-component-demo`](https://github.com/sroze/symfony-demo/compare/message-component-demo)
| [php-enqueue](https://github.com/php-enqueue/enqueue-dev) adapter | 1. Source: [In `sroze/enqueue-bridge`](https://github.com/sroze/enqueue-bridge) _(to be moved as `symfony/enqueue-bridge` I guess)_<br/>2. Demo: [In `sroze/symfony-demo:message-component-demo-with-enqueue`](sroze/symfony-demo@message-component-demo...sroze:message-component-demo-with-enqueue)
| [Swarrot](https://github.com/swarrot/swarrot) adapter | **Outdated adapter, waiting for stabilization** 1. Source: [In `sroze/swarrot-bridge`](https://github.com/sroze/swarrot-bridge) _(to be moved as `symfony/swarrot-bridge` I guess)_<br/>2. Demo: [In `sroze/symfony-demo:message-component-demo-with-swarrot`](sroze/symfony-demo@message-component-demo...sroze:message-component-demo-with-swarrot)
| [HTTP](https://github.com/sroze/message-http-adapter) adapter | **Outdated adapter, waiting for stabilization** 1. Source: [In `sroze/message-http-adapter`](https://github.com/sroze/message-http-adapter) <br/>2. Demo: [In `sroze/symfony-demo:message-component-demo-with-http-adapter`](sroze/symfony-demo@message-component-demo...sroze:message-component-demo-with-http-adapter)
| Web profiler integration | _In the pull-request_

# Important points

1. **Tests are not in the PR as they were written in PhpSpec & Behat.** If we decide to go forward with this approach, I'll translate them to PHPUnit.
2. The aim is not to solve all the message/queuing problems but provide a good, simple and extensible message bus for developers.
3. The communication with the actual AMQP/API brokers is down to the adapters for now. Not sure if we need to ship some by default or not 🤔

I guess that this would replace #23842 & #23315.

# Changes from the proposals

Based on the comments, a few changes have been made from the proposal.

1. `MessageProducer`s have been renamed to `MessageSender`s
2. `MessageConsumer`s have been renamed to `MessageReceiver`s

Commits
-------

c9cfda9 [Messenger] Add a new Messenger component
@fabpot
Copy link
Member

fabpot commented Mar 23, 2018

Closing as we've just merged the Messenger component. Thank you.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.