Skip to content

Commit b692c6c

Browse files
committed
Streamlining server event streaming
1 parent 7e9ecaf commit b692c6c

File tree

4 files changed

+380
-0
lines changed

4 files changed

+380
-0
lines changed

src/Symfony/Component/HttpFoundation/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
---
66

77
* Add support for iterable of string in `StreamedResponse`
8+
* Add `EventStreamResponse` and `ServerEvent` classes to streamline server event streaming
89

910
7.2
1011
---
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\HttpFoundation;
13+
14+
/**
15+
* Represents a streaming HTTP response for sending server events
16+
* as part of the Server-Sent Events (SSE) streaming technique.
17+
*
18+
* To broadcast events to multiple users at once, for long-running
19+
* connections and for high-traffic websites, prefer using the Mercure
20+
* Symfony Component, which relies on Software designed for these use
21+
* cases: https://symfony.com/doc/current/mercure.html
22+
*
23+
* @see ServerEvent
24+
*
25+
* @author Yonel Ceruto <open@yceruto.dev>
26+
*
27+
* Example usage:
28+
*
29+
* return new EventStreamResponse(function () {
30+
* yield new ServerEvent(time());
31+
*
32+
* sleep(1);
33+
*
34+
* yield new ServerEvent(time());
35+
* });
36+
*/
37+
class EventStreamResponse extends StreamedResponse
38+
{
39+
/**
40+
* @param int|null $retry The number of milliseconds the client should wait
41+
* before reconnecting in case of network failure
42+
*/
43+
public function __construct(?callable $callback = null, int $status = 200, array $headers = [], private ?int $retry = null)
44+
{
45+
$headers += [
46+
'Connection' => 'keep-alive',
47+
'Content-Type' => 'text/event-stream',
48+
'Cache-Control' => 'private, no-cache, no-store, must-revalidate, max-age=0',
49+
'X-Accel-Buffering' => 'no',
50+
'Pragma' => 'no-cache',
51+
'Expire' => '0',
52+
];
53+
54+
parent::__construct($callback, $status, $headers);
55+
}
56+
57+
public function setCallback(callable $callback): static
58+
{
59+
if ($this->callback) {
60+
return parent::setCallback($callback);
61+
}
62+
63+
$this->callback = function () use ($callback) {
64+
if (is_iterable($events = $callback($this))) {
65+
foreach ($events as $event) {
66+
$this->sendEvent($event);
67+
68+
if (connection_aborted()) {
69+
break;
70+
}
71+
}
72+
}
73+
};
74+
75+
return $this;
76+
}
77+
78+
/**
79+
* Sends a server event to the client.
80+
*
81+
* @return $this
82+
*/
83+
public function sendEvent(ServerEvent $event): static
84+
{
85+
if ($this->retry > 0 && !$event->getRetry()) {
86+
$event->setRetry($this->retry);
87+
}
88+
89+
foreach ($event->stream() as $string) {
90+
echo $string;
91+
92+
if (!\in_array(\PHP_SAPI, ['cli', 'phpdbg', 'embed'], true)) {
93+
static::closeOutputBuffers(0, true);
94+
flush();
95+
}
96+
}
97+
98+
return $this;
99+
}
100+
101+
public function getRetry(): ?int
102+
{
103+
return $this->retry;
104+
}
105+
106+
public function setRetry(int $retry): void
107+
{
108+
$this->retry = $retry;
109+
}
110+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\HttpFoundation;
13+
14+
/**
15+
* An event generated on the server intended for streaming to the client
16+
* as part of the SSE streaming technique.
17+
*
18+
* @author Yonel Ceruto <open@yceruto.dev>
19+
*/
20+
class ServerEvent
21+
{
22+
/**
23+
* @param string|iterable<string> $data The event data field for the message
24+
* @param string|null $type The event type
25+
* @param int|null $retry The number of milliseconds the client should wait
26+
* before reconnecting in case of network failure
27+
* @param string|null $id The event ID to set the EventSource object's last event ID value
28+
* @param string|null $comment The event comment
29+
*/
30+
public function __construct(
31+
private string|iterable $data,
32+
private ?string $type = null,
33+
private ?int $retry = null,
34+
private ?string $id = null,
35+
private ?string $comment = null,
36+
) {
37+
}
38+
39+
public function getData(): iterable|string
40+
{
41+
return $this->data;
42+
}
43+
44+
/**
45+
* @return $this
46+
*/
47+
public function setData(iterable|string $data): static
48+
{
49+
$this->data = $data;
50+
51+
return $this;
52+
}
53+
54+
public function getType(): ?string
55+
{
56+
return $this->type;
57+
}
58+
59+
/**
60+
* @return $this
61+
*/
62+
public function setType(string $type): static
63+
{
64+
$this->type = $type;
65+
66+
return $this;
67+
}
68+
69+
public function getRetry(): ?int
70+
{
71+
return $this->retry;
72+
}
73+
74+
/**
75+
* @return $this
76+
*/
77+
public function setRetry(?int $retry): static
78+
{
79+
$this->retry = $retry;
80+
81+
return $this;
82+
}
83+
84+
public function getId(): ?string
85+
{
86+
return $this->id;
87+
}
88+
89+
/**
90+
* @return $this
91+
*/
92+
public function setId(string $id): static
93+
{
94+
$this->id = $id;
95+
96+
return $this;
97+
}
98+
99+
public function getComment(): ?string
100+
{
101+
return $this->comment;
102+
}
103+
104+
public function setComment(string $comment): static
105+
{
106+
$this->comment = $comment;
107+
108+
return $this;
109+
}
110+
111+
/**
112+
* @return \Traversable<string>
113+
*/
114+
public function stream(): \Traversable
115+
{
116+
static $lastRetry = null;
117+
118+
if ($this->comment) {
119+
yield \sprintf(': %s', $this->comment)."\n";
120+
}
121+
if ($this->id) {
122+
yield \sprintf('id: %s', $this->id)."\n";
123+
}
124+
if ($this->retry > 0 && $this->retry !== $lastRetry) {
125+
yield \sprintf('retry: %s', $lastRetry = $this->retry)."\n";
126+
}
127+
if ($this->type) {
128+
yield \sprintf('event: %s', $this->type)."\n";
129+
}
130+
if ($this->data) {
131+
if (is_iterable($this->data)) {
132+
foreach ($this->data as $data) {
133+
yield \sprintf('data: %s', $data)."\n";
134+
}
135+
} else {
136+
yield \sprintf('data: %s', $this->data)."\n";
137+
}
138+
}
139+
140+
yield "\n";
141+
}
142+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\HttpFoundation\Tests;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\HttpFoundation\EventStreamResponse;
16+
use Symfony\Component\HttpFoundation\ServerEvent;
17+
18+
class EventStreamResponseTest extends TestCase
19+
{
20+
public function testInitializationWithDefaultValues()
21+
{
22+
$response = new EventStreamResponse();
23+
24+
$this->assertSame('text/event-stream', $response->headers->get('content-type'));
25+
$this->assertSame('max-age=0, must-revalidate, no-cache, no-store, private', $response->headers->get('cache-control'));
26+
$this->assertSame('keep-alive', $response->headers->get('connection'));
27+
28+
$this->assertSame(200, $response->getStatusCode());
29+
$this->assertNull($response->getRetry());
30+
}
31+
32+
public function testStreamSingleEvent()
33+
{
34+
$response = new EventStreamResponse(function () {
35+
yield new ServerEvent(
36+
data: 'foo',
37+
type: 'bar',
38+
retry: 100,
39+
id: '1',
40+
comment: 'bla bla',
41+
);
42+
});
43+
44+
$expected = <<<STR
45+
: bla bla
46+
id: 1
47+
retry: 100
48+
event: bar
49+
data: foo
50+
51+
52+
STR;
53+
54+
$this->assertSameResponseContent($expected, $response);
55+
}
56+
57+
public function testStreamEventsAndData()
58+
{
59+
$data = static function (): iterable {
60+
yield 'first line';
61+
yield 'second line';
62+
yield 'third line';
63+
};
64+
65+
$response = new EventStreamResponse(function () use ($data) {
66+
yield new ServerEvent('single line');
67+
yield new ServerEvent(['first line', 'second line']);
68+
yield new ServerEvent($data());
69+
});
70+
71+
$expected = <<<STR
72+
data: single line
73+
74+
data: first line
75+
data: second line
76+
77+
data: first line
78+
data: second line
79+
data: third line
80+
81+
82+
STR;
83+
84+
$this->assertSameResponseContent($expected, $response);
85+
}
86+
87+
public function testStreamEventsWithRetryFallback()
88+
{
89+
$response = new EventStreamResponse(function () {
90+
yield new ServerEvent('foo');
91+
yield new ServerEvent('bar');
92+
yield new ServerEvent('baz', retry: 1000);
93+
}, retry: 1500);
94+
95+
$expected = <<<STR
96+
retry: 1500
97+
data: foo
98+
99+
data: bar
100+
101+
retry: 1000
102+
data: baz
103+
104+
105+
STR;
106+
107+
$this->assertSameResponseContent($expected, $response);
108+
}
109+
110+
public function testStreamEventWithSendMethod()
111+
{
112+
$response = new EventStreamResponse(function (EventStreamResponse $response) {
113+
$response->sendEvent(new ServerEvent('foo'));
114+
});
115+
116+
$this->assertSameResponseContent("data: foo\n\n", $response);
117+
}
118+
119+
private function assertSameResponseContent(string $expected, EventStreamResponse $response): void
120+
{
121+
ob_start();
122+
$response->send();
123+
$actual = ob_get_clean();
124+
125+
$this->assertSame($expected, $actual);
126+
}
127+
}

0 commit comments

Comments
 (0)