Skip to content

[HttpFoundation] Streamlining server event streaming #58743

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 7, 2025

Conversation

yceruto
Copy link
Member

@yceruto yceruto commented Nov 4, 2024

Q A
Branch? 7.3
Bug fix? no
New feature? yes
Deprecations? no
Issues -
License MIT

While Mercure is the recommended option for most cases, implementing a custom Server-sent Event (SSE) backend relying on Symfony components is still a good alternative (considering you're building an app with a few users only)

Example using today’s code:

public function __invoke(): StreamedResponse
{
    $response = new StreamedResponse(function () {
        foreach ($this->watchJobsInProgress() as $job) {
            echo "type: jobs\n";
            echo "data: ".$job->toJson()."\n\n";

            StreamedResponse::closeOutputBuffers(0, true);
            flush();

            if (connection_aborted()) {
                break;
            }

            sleep(1);
        }
    });
    $response->headers->set('Content-Type', 'text/event-stream');
    $response->headers->set('Cache-Control', 'no-cache');
    $response->headers->set('Connection', 'keep-alive');
    $response->headers->set('X-Accel-Buffering', 'no');

    return $response;
}

So, this PR simplifies that implementation by adding two new artifacts:

✨ New ServerEvent DTO class

Spec: https://html.spec.whatwg.org/multipage/server-sent-events.html

The ServerEvent class represents individual events streamed from the server to clients. It includes fields like data, type, id, retry, and comment, so it can cover a variety of use cases.

One key feature is the getIterator() method, which automatically concat all these fields into the proper SSE format. This saves us from having to manually format the events, making the process a lot smoother. Plus, it supports iterable data, meaning it can handle multi-line messages or more complex data structures all in one event.

✨ New EventStreamResponse class

  • Simplified Headers Setup

    return new EventStreamResponse(/* ... */);

    This class automatically sets the required headers (Content-Type, Cache-Control, and Connection), ensuring that every response is properly configured for event streaming without additional code.

  • Generator-based Event Streaming with Auto Flush

    return new EventStreamResponse(function (): \Generator {    
        yield new ServerEvent(time(), type: 'ping');
    }); 

    The callback function can now yield events directly, automatically handling serialization and output buffering as each new event occurs.

  • Event Streaming with Sending Control

    return new EventStreamResponse(function (EventStreamResponse $response) {    
        $response->sendEvent(new ServerEvent('...'));
        // do something in between ...
        $response->sendEvent(new ServerEvent('...'));
    }); 

    The callback function receives a new argument—the response instance—allowing you to manually sendEvent().

    This method is especially useful when yield is not possible due to scope limitations. In the example below, the callback handles the event inside an event stream response, listening for Redis messages and directly triggering the sendEvent() method:

    return new EventStreamResponse(function (EventStreamResponse $response) {  
        $redis = new \Redis();  
        $redis->connect('127.0.0.1');  
        $redis->subscribe(['message'], function (/* ... */, string $message) use ($response) {  
            $response->sendEvent(new ServerEvent($message));  
        });  
    }); 
  • Retry Event Fallback

    return new EventStreamResponse(/* ... */, retry: 1000);

    ServerEvent allows setting individual retry intervals, which take precedence over the default retry configured in EventStreamResponse. If not specified at the event level, the global retry setting applies.

This is the final, optimized version after the proposed improvements:

public function __invoke(): EventStreamResponse
{
    return new EventStreamResponse(function () {
        foreach ($this->watchJobsInProgress() as $job) {
            yield new ServerEvent($job->toJson(), type: 'jobs');

            sleep(1);
        }
    });
}

Cheers!

@carsonbot carsonbot added Status: Needs Review DX DX = Developer eXperience (anything that improves the experience of using Symfony) Feature HttpFoundation labels Nov 4, 2024
@carsonbot carsonbot added this to the 7.2 milestone Nov 4, 2024
@carsonbot carsonbot changed the title [HttpFoundation][DX] Streamlining server event streaming [HttpFoundation] Streamlining server event streaming Nov 4, 2024
@yceruto yceruto force-pushed the event_stream_response branch from a48d275 to f27e9c1 Compare November 4, 2024 03:21
@nicolas-grekas
Copy link
Member

From an infrastructure PoV, this doesn't make sense to me: doing it like that, each stream connection consumes way to much memory to make anything useful. This can scale up to maybe hundreds of connections (maybe tenth) and then you hit a dead-end.

@yceruto
Copy link
Member Author

yceruto commented Nov 5, 2024

@nicolas-grekas If we do it with the Symfony stack for sure, I agree with you, but we can still use the HttpFoundation component with ReactPHP and the proper infrastructure, right?

@nicolas-grekas
Copy link
Member

Sure, but is that a real use case? How would such an app look like?

@yceruto
Copy link
Member Author

yceruto commented Nov 7, 2024

I don't have a real use case with ReactPHP, just with Symfony itself and an app within a private network. My infra is very small and I don't have the connections/memory problems in this case, just a few users.

My intention here is to simplify things for these straightforward cases; otherwise I'd recommend using Mercure of course.

@fabpot fabpot modified the milestones: 7.2, 7.3 Nov 20, 2024
@yceruto yceruto force-pushed the event_stream_response branch from f27e9c1 to eb84dfe Compare December 19, 2024 22:45
@yceruto
Copy link
Member Author

yceruto commented Dec 19, 2024

I still think these improvements can significantly enhance the developer experience for small applications with a limited number of connections or where memory consumption is manageable.

Friendly ping @symfony/mergers is this something unconventional?

@faizanakram99
Copy link
Contributor

faizanakram99 commented Dec 20, 2024

Fwiw, I think it's useful too. We also have something similar in house mainly for live feedback, for interoperability with cli (symfony commands), it extends Output class (not ideal I know but fulfills the needs).

So yes, we would benefit from this change.

<?php

declare(strict_types=1);

namespace Zzz\Common;

use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Output\Output;
use Symfony\Contracts\Service\Attribute\Required;

class EventSourceOutput extends Output
{
    #[Required]
    public LoggerInterface $logger;

    private bool $streamStarted = false;

    public function writeln(string|iterable $messages, int $options = self::OUTPUT_RAW): void
    {
        parent::writeln($messages, $options);
    }

    public function write(string|iterable $messages, bool $newline = false, int $options = self::OUTPUT_RAW): void
    {
        parent::write($messages, $newline, $options);
    }

    protected function doWrite(string $message, bool $newline): void
    {
        echo "{$this->stream($message)}\n\n";
    }

    private function stream(string $message): string
    {
        if (!$this->streamStarted) {
            $this->startStream();
        }

        if ((bool) \preg_match('|^<(\w+)>(.+)</(\w+)>$|', $message, $matches)) {
            return 'close' === $matches[1]
                ? "data: {$matches[2]}"
                : $this->prepareStream($matches[1], $matches[2]);
        }

        return $this->prepareStream('info', (string) \preg_replace('/\s+/', ' ', \strip_tags($message)));
    }

    private function prepareStream(string $type, string $text): string
    {
        if (!(bool) \preg_match('!!u', $text)) {
            // Not UTF-8
            $text = '...';
        }

        try {
            return \sprintf('data: %s', \json_encode(['type' => $type, 'text' => $text], \JSON_THROW_ON_ERROR));
        } catch (\JsonException $e) {
            $this->logger->critical($e->getMessage(), ['exception' => $e]);

            return 'data: END_SSE';
        }
    }

    private function startStream(): void
    {
        \ini_set('output_buffering', 'off');
        \ini_set('zlib.output_compression', '0');
        \ob_implicit_flush();

        while (\ob_get_level() > 0) {
            \ob_end_clean();
        }

        if (\function_exists('apache_setenv')) {
            apache_setenv('no-gzip', '1');
            apache_setenv('dont-vary', '1');
        }

        \header('Content-Type: text/event-stream');
        \header('Cache-Control: no-cache');
        \header('Connection: keep-alive');
        \header('X-Accel-Buffering: no');

        echo \str_pad('', 10000);
        echo "\n";

        $this->streamStarted = true;
    }

    public function hasStreamStarted(): bool
    {
        return $this->streamStarted;
    }

    public function endStream(): void
    {
        $this->writeln('<close>END_SSE</close>');
    }
}

Do note that X-Accel-Buffering: no header is needed for ngjnx, otherwise it buffers output

@welcoMattic
Copy link
Member

I like this, as it allows developers of small/private projects with small traffic to leverage SSE without supporting a Mercure instance.

But, I also agree with Nicolas, this can quickly leads to high memory consumption and this should be well documented as a warning.

@dunglas
Copy link
Member

dunglas commented Feb 3, 2025

I'm in favor of this feature too.

Mercure focus is event broadcasting, which cannot be (easily) achieved using this patch. But SSE is also useful for other use cases than broadcasting.

For instance, this feature could be useful for hinting the client about the progress of a synchronous job.

Regarding the memory/performance concerns, mentioning in the documentation that for broadcasting but also for high-traffic sites using Mercure would be more straightforward should be good enough.

@yceruto
Copy link
Member Author

yceruto commented Feb 3, 2025

Happy to read your positive comments! I'll be addressing them soon, thanks!

@yceruto yceruto force-pushed the event_stream_response branch 5 times, most recently from b692c6c to 3abc5b1 Compare February 6, 2025 05:17
@yceruto yceruto force-pushed the event_stream_response branch 2 times, most recently from db27a24 to 2498bd8 Compare February 6, 2025 05:31
@yceruto
Copy link
Member Author

yceruto commented Feb 6, 2025

I’ve addressed all the comments. hopefully, I understood them all correctly.
I also updated the PR description with a more specific use case.

it's ready for review again 🤞

@yceruto yceruto force-pushed the event_stream_response branch from 2498bd8 to 0bfd3d3 Compare February 6, 2025 06:06
@fabpot
Copy link
Member

fabpot commented Feb 7, 2025

Thank you @yceruto.

@fabpot fabpot merged commit b27f2ec into symfony:7.3 Feb 7, 2025
9 of 11 checks passed
@welcoMattic
Copy link
Member

Side note to mention that this feature will ease the integration of Symfony apps as MCP servers, which as 2 transports modes: stdio and SSE. 🤖 cc @lyrixx

@yceruto yceruto deleted the event_stream_response branch February 7, 2025 14:13
@fabpot fabpot mentioned this pull request May 2, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
DX DX = Developer eXperience (anything that improves the experience of using Symfony) Feature HttpFoundation Status: Reviewed
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants