Skip to content

[Console][Messenger] Asynchronously notify transports which messages are still being processed #53508

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 7, 2024

Conversation

HypeMC
Copy link
Contributor

@HypeMC HypeMC commented Jan 11, 2024

Q A
Branch? 7.2
Bug fix? no
New feature? yes
Deprecations? no
Issues -
License MIT

Certain transports, such as Beanstalkd, have a timeout that determines how long a certain message is considered as "in progress". If the message is not acknowledged or rejected within that time period, it is returned to the "ready" queue.

This could cause potential issues when it takes a handler longer to finish processing the message. In those instances, the message being processed could be returned to the ready queue and taken by another instance of the receiver. This would result in the message being processed multiple times, or even worse, in an infinite loop.

Usually, transports that have a timeout also have the option to be notified that the message is still being processed, so the timeout can be postponed.

The idea of this PR is to utilize the SignalableCommandInterface and pcntl_alarm() to notify transports which messages are still being processed. Since the SignalRegistry has asynchronous signals enabled by default, the whole process would happen asynchronously. I've named this feature "keepalive" for lack of a better name.

Currently, I've added this option only to the Beanstalkd transport since that's the one I'm familiar with and use, but as far as I was able to gather, at least one other transport supports this feature. Amazon SQS has a visibility timeout which can be increased. This visibility timeout seems to be the same thing as Beanstalkd's TTR. (Disclaimer: I've never used Amazon SQS, so if I got something wrong, please let me know.)

I've split the PR into two commits so it would (hopefully) be easier to review:

  1. I've extracted the first commit into a separate PR: [Console] Add ability to schedule alarm signals and a ConsoleAlarmEvent #53533
  2. The second commit adds the keepalive feature to the Messenger component.

@HypeMC HypeMC requested a review from chalasr as a code owner January 11, 2024 11:44
@carsonbot carsonbot added this to the 7.1 milestone Jan 11, 2024
@carsonbot carsonbot changed the title [Messenger][Console] Asynchronously notify transports which messages are still being processed [Console][Messenger] Asynchronously notify transports which messages are still being processed Jan 11, 2024
@HypeMC HypeMC force-pushed the add-keepalive branch 5 times, most recently from 317d963 to 7f91734 Compare January 13, 2024 09:32
@HypeMC HypeMC force-pushed the add-keepalive branch 3 times, most recently from a7fd8df to 7a8460c Compare January 13, 2024 19:36
@valtzu
Copy link
Contributor

valtzu commented Jan 24, 2024

I find this interesting since I've faced the issue you described with SQS.

So to add this for SQS, we'd implement KeepaliveReceiverInterface on AmazonSqsTransport and call ChangeMessageVisibility with visibility_timeout in AmazonSqsTransport::keepalive(). Since this keep-alive mechanism is only used when --keepalive is provided to messenger:consume, it would not be a BC break even though visibility_timeout would have a slightly different meaning then. It would also mean in the configuration visibility_timeout should always be bigger than --keepalive – probably something to address in the documentation.

@valtzu
Copy link
Contributor

valtzu commented Feb 25, 2024

Based on my previous comment, I have SQS implementation here valtzu@59a77be

@HypeMC
Copy link
Contributor Author

HypeMC commented Feb 25, 2024

@valtzu That's great, thank you for giving this feature a try and implementing it for SQS. Hopefully, it will prove useful if this PR gets merged.

@xabbuh xabbuh removed this from the 7.1 milestone May 15, 2024
@HypeMC HypeMC force-pushed the add-keepalive branch 4 times, most recently from 8e6d3f2 to 29ba5b4 Compare September 17, 2024 07:39
@HypeMC HypeMC force-pushed the add-keepalive branch 4 times, most recently from bd7e536 to 77f9da9 Compare October 1, 2024 10:37
fabpot added a commit that referenced this pull request Oct 6, 2024
…`ConsoleAlarmEvent` (HypeMC)

This PR was merged into the 7.2 branch.

Discussion
----------

[Console] Add ability to schedule alarm signals and a `ConsoleAlarmEvent`

| Q             | A
| ------------- | ---
| Branch?       | 7.2
| Bug fix?      | no
| New feature?  | yes
| Deprecations? | no
| Issues        | Fix #47920
| License       | MIT

Part of #53508

This PR introduces the ability to schedule alarm signals and a `console.alarm` event. A command using this feature will automatically dispatch an alarm at the specified interval:

```php
#[AsCommand('app:alarm')]
class AlarmCommand extends Command implements SignalableCommandInterface
{
    private bool $run = true;
    private int $count = 0;
    private OutputInterface $output;

    protected function initialize(InputInterface $input, OutputInterface $output): void
    {
        $this->getApplication()->setAlarmInterval(10);
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $this->output = $output;

        while ($this->run) {
            $this->count++;
            $output->writeln('execute '.$this->count);
            sleep(1);
        }

        $output->writeln('Done');

        return Command::SUCCESS;
    }

    public function getSubscribedSignals(): array
    {
        return [\SIGINT, \SIGALRM];
    }

    public function handleSignal(int $signal, false|int $previousExitCode = 0): int|false
    {
        if (\SIGINT === $signal) {
            $this->run = false;
            $this->output->writeln('Bye');
        } elseif (\SIGALRM === $signal) {
            $this->count = 0;
            $this->output->writeln('handleAlarm');
        }

        return false;
    }
}
```

The `console.alarm` event is dispatched on every `SIGALRM` signal:

```php
#[AsEventListener(event: ConsoleAlarmEvent::class)]
final class ConsoleAlarmListener
{
    public function __invoke(ConsoleAlarmEvent $event): void
    {
        $event->getOutput()->writeln('ConsoleAlarmListener');
    }
}
```

Commits
-------

97e4391 [Console] Add ability to schedule alarm signals and a `console.alarm` event
@fabpot
Copy link
Member

fabpot commented Oct 6, 2024

@HypeMC Can you rebase now that #53533 has been merged? Thank you.

@HypeMC
Copy link
Contributor Author

HypeMC commented Oct 6, 2024

@fabpot Done

@chalasr
Copy link
Member

chalasr commented Oct 7, 2024

Thank you @HypeMC.

@chalasr chalasr merged commit f18789f into symfony:7.2 Oct 7, 2024
9 of 10 checks passed
@HypeMC HypeMC deleted the add-keepalive branch October 7, 2024 18:28
@fabpot fabpot mentioned this pull request Oct 27, 2024
fabpot added a commit that referenced this pull request Nov 9, 2024
… that are still being processed (valtzu)

This PR was merged into the 7.2 branch.

Discussion
----------

[Messenger] Extend SQS visibility timeout for messages that are still being processed

| Q             | A
| ------------- | ---
| Branch?       | 7.2
| Bug fix?      | no
| New feature?  | yes
| Deprecations? | no
| License       | MIT

Now that #53508 is merged, let's add keepalive implementation for SQS too.

Commits
-------

fa795c3 [Messenger] Extend SQS visibility timeout for messages that are still being processed
javiereguiluz added a commit to symfony/symfony-docs that referenced this pull request Jan 28, 2025
…eanstalkd) (OskarStark)

This PR was squashed before being merged into the 7.2 branch.

Discussion
----------

[Messenger] Describe `--keepalive` option (AmazonSQS & Beanstalkd)

Follows
* symfony/symfony#53508

Fixes #20305 #20381

cc `@HypeMC` `@chalasr`

Commits
-------

8ce8f9b [Messenger] Describe `--keepalive` option (AmazonSQS & Beanstalkd)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants