-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[HttpFoundation] Streamlining server event streaming #58743
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
a48d275
to
f27e9c1
Compare
From an infrastructure PoV, this doesn't make sense to me: doing it like that, each stream connection consumes way to much memory to make anything useful. This can scale up to maybe hundreds of connections (maybe tenth) and then you hit a dead-end. |
@nicolas-grekas If we do it with the Symfony stack for sure, I agree with you, but we can still use the HttpFoundation component with ReactPHP and the proper infrastructure, right? |
Sure, but is that a real use case? How would such an app look like? |
I don't have a real use case with ReactPHP, just with Symfony itself and an app within a private network. My infra is very small and I don't have the connections/memory problems in this case, just a few users. My intention here is to simplify things for these straightforward cases; otherwise I'd recommend using Mercure of course. |
f27e9c1
to
eb84dfe
Compare
I still think these improvements can significantly enhance the developer experience for small applications with a limited number of connections or where memory consumption is manageable. Friendly ping @symfony/mergers is this something unconventional? |
Fwiw, I think it's useful too. We also have something similar in house mainly for live feedback, for interoperability with cli (symfony commands), it extends Output class (not ideal I know but fulfills the needs). So yes, we would benefit from this change. <?php
declare(strict_types=1);
namespace Zzz\Common;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Output\Output;
use Symfony\Contracts\Service\Attribute\Required;
class EventSourceOutput extends Output
{
#[Required]
public LoggerInterface $logger;
private bool $streamStarted = false;
public function writeln(string|iterable $messages, int $options = self::OUTPUT_RAW): void
{
parent::writeln($messages, $options);
}
public function write(string|iterable $messages, bool $newline = false, int $options = self::OUTPUT_RAW): void
{
parent::write($messages, $newline, $options);
}
protected function doWrite(string $message, bool $newline): void
{
echo "{$this->stream($message)}\n\n";
}
private function stream(string $message): string
{
if (!$this->streamStarted) {
$this->startStream();
}
if ((bool) \preg_match('|^<(\w+)>(.+)</(\w+)>$|', $message, $matches)) {
return 'close' === $matches[1]
? "data: {$matches[2]}"
: $this->prepareStream($matches[1], $matches[2]);
}
return $this->prepareStream('info', (string) \preg_replace('/\s+/', ' ', \strip_tags($message)));
}
private function prepareStream(string $type, string $text): string
{
if (!(bool) \preg_match('!!u', $text)) {
// Not UTF-8
$text = '...';
}
try {
return \sprintf('data: %s', \json_encode(['type' => $type, 'text' => $text], \JSON_THROW_ON_ERROR));
} catch (\JsonException $e) {
$this->logger->critical($e->getMessage(), ['exception' => $e]);
return 'data: END_SSE';
}
}
private function startStream(): void
{
\ini_set('output_buffering', 'off');
\ini_set('zlib.output_compression', '0');
\ob_implicit_flush();
while (\ob_get_level() > 0) {
\ob_end_clean();
}
if (\function_exists('apache_setenv')) {
apache_setenv('no-gzip', '1');
apache_setenv('dont-vary', '1');
}
\header('Content-Type: text/event-stream');
\header('Cache-Control: no-cache');
\header('Connection: keep-alive');
\header('X-Accel-Buffering: no');
echo \str_pad('', 10000);
echo "\n";
$this->streamStarted = true;
}
public function hasStreamStarted(): bool
{
return $this->streamStarted;
}
public function endStream(): void
{
$this->writeln('<close>END_SSE</close>');
}
} Do note that |
I like this, as it allows developers of small/private projects with small traffic to leverage SSE without supporting a Mercure instance. But, I also agree with Nicolas, this can quickly leads to high memory consumption and this should be well documented as a warning. |
I'm in favor of this feature too. Mercure focus is event broadcasting, which cannot be (easily) achieved using this patch. But SSE is also useful for other use cases than broadcasting. For instance, this feature could be useful for hinting the client about the progress of a synchronous job. Regarding the memory/performance concerns, mentioning in the documentation that for broadcasting but also for high-traffic sites using Mercure would be more straightforward should be good enough. |
Happy to read your positive comments! I'll be addressing them soon, thanks! |
b692c6c
to
3abc5b1
Compare
db27a24
to
2498bd8
Compare
I’ve addressed all the comments. hopefully, I understood them all correctly. it's ready for review again 🤞 |
2498bd8
to
0bfd3d3
Compare
Thank you @yceruto. |
Side note to mention that this feature will ease the integration of Symfony apps as MCP servers, which as 2 transports modes: |
While Mercure is the recommended option for most cases, implementing a custom Server-sent Event (SSE) backend relying on Symfony components is still a good alternative (considering you're building an app with a few users only)
Example using today’s code:
So, this PR simplifies that implementation by adding two new artifacts:
✨ New
ServerEvent
DTO classSpec: https://html.spec.whatwg.org/multipage/server-sent-events.html
The
ServerEvent
class represents individual events streamed from the server to clients. It includes fields likedata
,type
,id
,retry
, andcomment
, so it can cover a variety of use cases.One key feature is the
getIterator()
method, which automatically concat all these fields into the proper SSE format. This saves us from having to manually format the events, making the process a lot smoother. Plus, it supports iterabledata
, meaning it can handle multi-line messages or more complex data structures all in one event.✨ New
EventStreamResponse
classSimplified Headers Setup
This class automatically sets the required headers (
Content-Type
,Cache-Control
, andConnection
), ensuring that every response is properly configured for event streaming without additional code.Generator-based Event Streaming with Auto Flush
The callback function can now
yield
events directly, automatically handling serialization and output buffering as each new event occurs.Event Streaming with Sending Control
The callback function receives a new argument—the response instance—allowing you to manually
sendEvent()
.This method is especially useful when
yield
is not possible due to scope limitations. In the example below, the callback handles the event inside an event stream response, listening for Redis messages and directly triggering thesendEvent()
method:Retry Event Fallback
ServerEvent
allows setting individualretry
intervals, which take precedence over the default retry configured inEventStreamResponse
. If not specified at the event level, the global retry setting applies.This is the final, optimized version after the proposed improvements:
Cheers!