[Messenger] Add a way to no ack message automatically #42873

Closed
wants to merge 1 commit into from

Member

@lyrixx lyrixx commented Sep 3, 2021

| Q             | A
| ------------- | ---
| Branch?       | 5.4
| Bug fix?      | no
| New feature?  | yes
| Deprecations? | no
| Tickets       | Fix #36910
| License       | MIT
| Doc PR        |

This PR adds a way to manually control auto-ACK from the handler.
It opens the door to buffering.

Here is the code one must write to buffer messages and process the buffer every 10s. No more code is needed!

<?php

namespace App\MessageHandler;

use App\Message\HelloMessage;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\Handler\ConfigureAutoAckInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;

final class HelloMessageHandler implements MessageHandlerInterface, ConfigureAutoAckInterface, EventSubscriberInterface
{
    private $buffer = [];
    private $lastFlushedAt = null;

    public function __construct(
        private ContainerInterface $receiverLocator,
        private LoggerInterface $logger,
    ) {
        $this->lastFlushedAt = time();
    }

    public function __invoke(HelloMessage $message, Envelope $envelope)
    {
        if ($this->isAutoAckDisabled($envelope)) {
            $this->logger->info('Add message to buffer.');
            $this->buffer[] = $envelope;
            $this->flushIfNeeded();

            return;
        }

        $this->logger->info('process message.');

        // Do regular processing
    }

    public function isAutoAckDisabled(Envelope $envelope): bool
    {
        // This handler could be used by many transports.
        // But only the async one should be manually controlled (buffered)
        return $envelope->last(ReceivedStamp::class)->getTransportName() === 'async';
    }

    public function flushIfNeeded()
    {
        $this->logger->info('Flush buffer if needed.');
        if (time() < $this->lastFlushedAt + 10) {
            return;
        }
        $this->flush();
    }

    public function flush()
    {
        $this->logger->info('Flush buffer.');

        $this->lastFlushedAt = time();

        // Do your custom processing on the buffer

        try {
            foreach ($this->buffer as $envelope) {
                /** @var ReceiverInterface */
                $receiver = $this->receiverLocator->get($envelope->last(ReceivedStamp::class)->getTransportName());
                $receiver->ack($envelope);
            }
        } finally {
            $this->buffer = [];
        }
    }

    public static function getSubscribedEvents()
    {
        return [
            WorkerRunningEvent::class =>  'flushIfNeeded',
            WorkerStoppedEvent::class =>  'flush',
        ];
    }
}
# services.yaml
services:
    App\MessageHandler\HelloMessageHandler:
        arguments:
            $receiverLocator: '@messenger.receiver_locator'

sponsored by arte.tv

@carsonbot

Hey!

I think @tyx has recently worked with this code. Maybe they can help review this?

Cheers!

Carsonbot

Member

@jderusse jderusse left a comment

I like this feature!

Beware: in your example, flushing is not optional on consumer stop.

-WorkerStoppedEvent::class =>  'flushIfNeeded',
+WorkerStoppedEvent::class =>  'forceFlush',

@lyrixx
Member Author

lyrixx commented Sep 22, 2021

Hello @symfony/mergers

The initial issue got 8 👍🏼, this PR got 5 👍🏼, and @jderusse liked this feature.

So without feedback, I'm planning to merge this PR by the end of the month, before the feature freeze.
If you are against it, it's time to raise your voice :)

@okwinza
Contributor

okwinza commented Sep 22, 2021

This is indeed a feature I would like to see in 5.4. 👍

Btw, @lyrixx any chance you could also take a look at my #42335 before the feature freeze is upon us? 😅

@lyrixx
Member Author

lyrixx commented Oct 1, 2021

I have rebased the PR (to fix a conflict in the CHANGELOG.md file).

Member

@nicolas-grekas nicolas-grekas left a comment

This makes handlers aware of the transport layer, which is something that we refused to do as of now, see eg #42005.
By doing so, this approach is also giving the responsibility of buffering to handlers. I'm wondering if handlers are the right place to do that. I don't think so for now.

Instead, what about adding a middleware that does the buffering, and communicates with the worker via a stamp as done here? Buffering would be enabled/disabled by either setting a limit on the number of messages per batch, or by injecting a buffering decision strategy if we want something super flexible.

BTW, how are we sure that messages are flushed eventually, when we don't know when messages can arrive? Should we inject a delayed message that would trigger the flush if nothing happened before it?

About naming, "DelayedAckStamp" made me think: "delayed? when?". Maybe this should be named "DisableAutoAckStamp"?
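
To make the "buffering decision strategy" idea concrete, here is a minimal sketch of what such a strategy could look like. The class name `SizeOrAgeFlushStrategy`, its constructor parameters, and the `shouldFlush()` signature are all hypothetical; nothing here is an existing Symfony API.

```php
<?php

// Hypothetical strategy, not part of Symfony: flush when the buffer is full
// OR when the oldest buffered message has waited too long.
final class SizeOrAgeFlushStrategy
{
    public function __construct(
        private int $maxMessages = 10,
        private int $maxAgeSeconds = 10,
    ) {
    }

    public function shouldFlush(int $bufferedCount, int $oldestBufferedAt, int $now): bool
    {
        if (0 === $bufferedCount) {
            return false; // nothing to flush
        }

        return $bufferedCount >= $this->maxMessages
            || $now - $oldestBufferedAt >= $this->maxAgeSeconds;
    }
}

$strategy = new SizeOrAgeFlushStrategy(maxMessages: 3, maxAgeSeconds: 10);

$emptyBuffer = $strategy->shouldFlush(0, 1000, 2000); // empty buffer
$fullBuffer = $strategy->shouldFlush(3, 1000, 1001);  // size limit reached
$staleBuffer = $strategy->shouldFlush(1, 1000, 1010); // age limit reached
$freshBuffer = $strategy->shouldFlush(1, 1000, 1005); // neither limit reached
```

A strategy like this only answers "flush now?" when something asks it, so the age check would still need a periodic caller (for example a listener on a worker event, as in the PR description) to cover the case where no new message arrives.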

@lyrixx
Member Author

lyrixx commented Oct 6, 2021

> Instead, what about adding a middleware that does the buffering, and communicates with the worker via a stamp as done here? Buffering would be enabled/disabled by either setting a limit on the number of messages per batch, or by injecting a buffering decision strategy if we want something super flexible.

Hmm, indeed it might be possible. But ATM, I fail to see how the middleware could actually call the "code that processes the buffer". Should we introduce a new method (via an interface) on the handler (like it's done on Monolog handlers)?

So basically, you want me to move the ConfigurableAutoAckInterface::isAutoAckDisabled() to a middleware instead of the handler itself? (I'm asking to be sure I fully understand.)

> BTW, how are we sure that messages are flushed eventually, when we don't know when messages can arrive? Should we inject a delayed message that would trigger the flush if nothing happened before it?

As you can see in the PR body, the user also needs to register a listener with a timer to flush after a configured time. But this could live in the middleware.

> About naming, "DelayedAckStamp" made me think: "delayed? when?". Maybe this should be named "DisableAutoAckStamp"?

I agree 👍🏼

But I see a drawback with your idea: a new middleware implies a new bus, because one doesn't want to buffer all messages: https://symfony.com/doc/current/messenger.html#middleware

So publishing a message will be more complex, because one will have to choose the right bus when dispatching the message. IMHO, it's error-prone.

So you want to move from "the handler is aware of the transport layer" to "the publisher is aware of the transport layer". Is it really better?

@nicolas-grekas
Member

> how the middleware could actually call the "code that processes the buffer".

Can't the middleware call the handler many times in a loop, once per buffered message?
The middleware would do that when the Nth message arrives (aka when the buffer is full).

> So basically, you want me to move the ConfigurableAutoAckInterface::isAutoAckDisabled() to a middleware instead of the handler itself?

I think we won't need any new interface if we do this. The stamp might be enough.

> a new middleware implies a new bus because one doesn't want to buffer all messages

We could have a config option that gives the size of the buffer, and if the size is > 1, then we add this middleware? (Or we remove it if it's <= 1? Or we make it a no-op if we can't skip registering it.)

@lyrixx
Member Author

lyrixx commented Oct 6, 2021

> can't the middleware call the handler many times in a loop, once per buffered message?
> the middleware would do that when the Nth message arrives (aka when the buffer is full).

Nope, because you don't know when it's the "last message". You need all messages at once.

> I think we won't need any new interface if we do this. The stamp might be enough.

You don't have access to the stamp in the handler. Adding it is what you don't want.

> we could have a config option that gives the size of the buffer, and if the size is > 1, then we add this middleware? (or we remove it if it's <= 1? or we make it a no-op if we can't skip registering it).

I don't understand. Middleware is set up for a bus, not for a transport or a handler. See the doc to better understand.

Member

@nicolas-grekas nicolas-grekas left a comment

From the PR description:

> `__invoke(HelloMessage $message, Envelope $envelope)`

The envelope is not passed as 2nd arg, so this doesn't work; it would have to be added in this PR :)
This needs to be well thought out, as we rejected doing so in the past.

> `$envelope->last(ReceivedStamp::class)->getTransportName() === 'async';`

Hardcoding the name of the transport in the class should really be avoided.

I'm putting this on hold while looking for alternative approaches.
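
One way to avoid a hardcoded transport name, sketched here only as an illustration, is to inject the list of buffered transports. The `BufferedTransports` class and its `isBuffered()` method are hypothetical names, not part of Symfony or of this PR.

```php
<?php

// Hypothetical helper, not part of Symfony: the set of buffered transports
// is injected (e.g. from service configuration) instead of being hardcoded.
final class BufferedTransports
{
    /** @param list<string> $transportNames */
    public function __construct(private array $transportNames)
    {
    }

    public function isBuffered(string $transportName): bool
    {
        // Strict comparison, so only exact string matches count.
        return \in_array($transportName, $this->transportNames, true);
    }
}

$buffered = new BufferedTransports(['async']);
$asyncIsBuffered = $buffered->isBuffered('async');
$syncIsBuffered = $buffered->isBuffered('sync');
```

This keeps the transport name in configuration, but the handler would still depend on transport-level information, which is the broader concern raised above.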

@nicolas-grekas
Member

Here is another proposal on the topic: #43354

@nicolas-grekas
Member

Closing in favor of #43354, thanks for pushing this @lyrixx!

@lyrixx lyrixx deleted the messenger-batch branch October 28, 2021 07:15
fabpot added a commit that referenced this pull request Oct 30, 2021
…las-grekas)

This PR was merged into the 5.4 branch.

Discussion
----------

[Messenger] allow processing messages in batches

| Q             | A
| ------------- | ---
| Branch?       | 5.4
| Bug fix?      | no
| New feature?  | yes
| Deprecations? | no
| Tickets       | #36910
| License       | MIT
| Doc PR        | -

This replaces #42873 as it proposes an alternative approach to handling messages in batch.

`BatchHandlerInterface` says it all: if a handler implements this interface, then it should expect a new `$ack` optional argument to be provided when `__invoke()` is called. When `$ack` is not provided, `__invoke()` is expected to handle the message synchronously as usual. But when `$ack` is provided, `__invoke()` is expected to buffer the message and its `$ack` function, and to return the number of pending messages in the batch.

Batch handlers are responsible for deciding when they flush their buffers, calling the `$ack` functions while doing so.

Best reviewed [ignoring whitespaces](https://github.com/symfony/symfony/pull/43354/files?w=1).

Here is what a batch handler might look like:

```php
class MyBatchHandler implements BatchHandlerInterface
{
    use BatchHandlerTrait;

    public function __invoke(MyMessage $message, Acknowledger $ack = null)
    {
        return $this->handle($message, $ack);
    }

    private function process(array $jobs): void
    {
        foreach ($jobs as [$job, $ack]) {
            try {
                // [...] compute $result from $job
                $ack->ack($result);
            } catch (\Throwable $e) {
                $ack->nack($e);
            }
        }
    }
}
```

By default, `$jobs` contains the messages to handle, but it can be anything as returned by `BatchHandlerTrait::schedule()` (eg a Symfony HttpClient response derived from the message, a promise, etc.).

The size of the batch is controlled by `BatchHandlerTrait::shouldProcess()` (defaults to 10).

The transport is acknowledged in batch, *after* the bus returned from dispatching (unlike what is done in #42873). This is especially important when considering transactions since we don't want to ack unless the transaction committed successfully.

By default, pending batches are flushed when the worker is idle and when it stops.
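
The buffering and acknowledgement flow described above can be illustrated with plain PHP. This is a simplified sketch under stated assumptions: `SketchAck` and `SketchBatchHandler` are hypothetical stand-ins that do not use `BatchHandlerInterface`, `BatchHandlerTrait`, or the real `Acknowledger`. They only mimic the idea of buffering `[message, ack]` pairs, returning the pending count, and acking or nacking each job on flush.

```php
<?php

// Hypothetical stand-in for an acknowledger: it only records the outcome.
final class SketchAck
{
    public ?string $outcome = null;

    public function ack(): void
    {
        $this->outcome = 'acked';
    }

    public function nack(\Throwable $e): void
    {
        $this->outcome = 'nacked: '.$e->getMessage();
    }
}

// Hypothetical batch handler: buffers [message, ack] pairs and processes
// them together once the buffer reaches $batchSize.
final class SketchBatchHandler
{
    private array $jobs = [];

    public function __construct(private int $batchSize = 10)
    {
    }

    // Returns the number of messages still pending after this call.
    public function __invoke(string $message, SketchAck $ack): int
    {
        $this->jobs[] = [$message, $ack];

        if (\count($this->jobs) >= $this->batchSize) {
            $this->flush();
        }

        return \count($this->jobs);
    }

    // Would also be called when the worker is idle or stops.
    public function flush(): void
    {
        $jobs = $this->jobs;
        $this->jobs = [];

        foreach ($jobs as [$message, $ack]) {
            try {
                if ('' === $message) {
                    throw new \InvalidArgumentException('empty message');
                }
                $ack->ack();
            } catch (\Throwable $e) {
                $ack->nack($e);
            }
        }
    }
}

$handler = new SketchBatchHandler(batchSize: 3);
$acks = [new SketchAck(), new SketchAck(), new SketchAck()];

$pendingAfterFirst = $handler('a', $acks[0]);  // buffered, nothing acked yet
$pendingAfterSecond = $handler('', $acks[1]);  // buffered, will be nacked on flush
$pendingAfterThird = $handler('c', $acks[2]);  // third message triggers the flush
```

Because each job carries its own ack object, one failing message is nacked without preventing the rest of the batch from being acknowledged.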

Commits
-------

81e52b2 [Messenger] allow processing messages in batches