-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Console][Messenger] Asynchronously notify transports which messages are still being processed #53508
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
317d963
to
7f91734
Compare
a7fd8df
to
7a8460c
Compare
I find this interesting since I've faced the issue you described with SQS. So to add this for SQS, we'd implement |
02f893f
to
674be99
Compare
Based on my previous comment, I have SQS implementation here valtzu@59a77be |
@valtzu That's great, thank you for giving this feature a try and implementing it for SQS. Hopefully, it will prove useful if this PR gets merged. |
src/Symfony/Component/Console/Command/AlarmableCommandInterface.php
Outdated
Show resolved
Hide resolved
8e6d3f2
to
29ba5b4
Compare
29ba5b4
to
89bf0e5
Compare
bd7e536
to
77f9da9
Compare
…`ConsoleAlarmEvent` (HypeMC) This PR was merged into the 7.2 branch. Discussion ---------- [Console] Add ability to schedule alarm signals and a `ConsoleAlarmEvent` | Q | A | ------------- | --- | Branch? | 7.2 | Bug fix? | no | New feature? | yes | Deprecations? | no | Issues | Fix #47920 | License | MIT Part of #53508 This PR introduces the ability to schedule alarm signals and a `console.alarm` event. A command using this feature will automatically dispatch an alarm at the specified interval: ```php #[AsCommand('app:alarm')] class AlarmCommand extends Command implements SignalableCommandInterface { private bool $run = true; private int $count = 0; private OutputInterface $output; protected function initialize(InputInterface $input, OutputInterface $output): void { $this->getApplication()->setAlarmInterval(10); } protected function execute(InputInterface $input, OutputInterface $output): int { $this->output = $output; while ($this->run) { $this->count++; $output->writeln('execute '.$this->count); sleep(1); } $output->writeln('Done'); return Command::SUCCESS; } public function getSubscribedSignals(): array { return [\SIGINT, \SIGALRM]; } public function handleSignal(int $signal, false|int $previousExitCode = 0): int|false { if (\SIGINT === $signal) { $this->run = false; $this->output->writeln('Bye'); } elseif (\SIGALRM === $signal) { $this->count = 0; $this->output->writeln('handleAlarm'); } return false; } } ``` The `console.alarm` event is dispatched on every `SIGALRM` signal: ```php #[AsEventListener(event: ConsoleAlarmEvent::class)] final class ConsoleAlarmListener { public function __invoke(ConsoleAlarmEvent $event): void { $event->getOutput()->writeln('ConsoleAlarmListener'); } } ``` Commits ------- 97e4391 [Console] Add ability to schedule alarm signals and a `console.alarm` event
…d, using `pcntl_alarm()`
@fabpot Done |
Thank you @HypeMC. |
… that are still being processed (valtzu) This PR was merged into the 7.2 branch. Discussion ---------- [Messenger] Extend SQS visibility timeout for messages that are still being processed | Q | A | ------------- | --- | Branch? | 7.2 | Bug fix? | no | New feature? | yes | Deprecations? | no | License | MIT Now that #53508 is merged, let's add keepalive implementation for SQS too. Commits ------- fa795c3 [Messenger] Extend SQS visibility timeout for messages that are still being processed
…eanstalkd) (OskarStark) This PR was squashed before being merged into the 7.2 branch. Discussion ---------- [Messenger] Describe `--keepalive` option (AmazonSQS & Beanstalkd) Follows * symfony/symfony#53508 Fixes #20305 #20381 cc `@HypeMC` `@chalasr` Commits ------- 8ce8f9b [Messenger] Describe `--keepalive` option (AmazonSQS & Beanstalkd)
Certain transports, such as Beanstalkd, have a timeout that determines how long a certain message is considered as "in progress". If the message is not acknowledged or rejected within that time period, it is returned to the "ready" queue.
This could cause potential issues when it takes a handler longer to finish processing the message. In those instances, the message being processed could be returned to the ready queue and taken by another instance of the receiver. This would result in the message being processed multiple times, or even worse, in an infinite loop.
Usually, transports that have a timeout also have the option to be notified that the message is still being processed, so the timeout can be postponed.
The idea of this PR is to utilize the
SignalableCommandInterface
andpcntl_alarm()
to notify transports which messages are still being processed. Since theSignalRegistry
has asynchronous signals enabled by default, the whole process would happen asynchronously. I've named this feature "keepalive" for lack of a better name.Currently, I've added this option only to the Beanstalkd transport since that's the one I'm familiar with and use, but as far as I was able to gather, at least one other transport supports this feature. Amazon SQS has a visibility timeout which can be increased. This visibility timeout seems to be the same thing as Beanstalkd's TTR. (Disclaimer: I've never used Amazon SQS, so if I got something wrong, please let me know.)
I've split the PR into two commits so it would (hopefully) be easier to review:
ConsoleAlarmEvent
#53533