Skip to content

Commit f7f1921

Browse files
committed
Merge branch 'refs/heads/ext-event-loop'
Conflicts: .travis.yml
2 parents 9d92756 + b72dbf4 commit f7f1921

File tree

5 files changed

+387
-0
lines changed

5 files changed

+387
-0
lines changed

scripts/travis-init.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ if [ "$TRAVIS_PHP_VERSION" != "hhvm" ] && [ "\$(php --re libevent | grep 'does n
77
cd libevent-0.0.5 && phpize && ./configure && make && sudo make install && cd ../;
88
echo "extension=libevent.so" >> `php --ini | grep "Loaded Configuration" | sed -e "s|.*:\s*||"`;
99
fi
10+
echo "yes" | pecl install event
1011
composer self-update
1112
composer install --dev --prefer-source

src/React/EventLoop/ExtEventLoop.php

Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
<?php
2+
3+
namespace React\EventLoop;
4+
5+
use Event;
6+
use EventBase;
7+
use React\EventLoop\Tick\NextTickQueue;
8+
use React\EventLoop\Timer\Timer;
9+
use React\EventLoop\Timer\TimerInterface;
10+
use SplObjectStorage;
11+
12+
/**
13+
* An ext-event based event-loop.
14+
*/
15+
class ExtEventLoop implements LoopInterface
16+
{
17+
private $eventBase;
18+
private $nextTickQueue;
19+
private $timerCallback;
20+
private $timerEvents;
21+
private $streamCallback;
22+
private $streamEvents = [];
23+
private $streamFlags = [];
24+
private $readListeners = [];
25+
private $writeListeners = [];
26+
private $running;
27+
28+
public function __construct()
29+
{
30+
$this->eventBase = new EventBase();
31+
$this->nextTickQueue = new NextTickQueue($this);
32+
$this->timerEvents = new SplObjectStorage();
33+
34+
$this->createTimerCallback();
35+
$this->createStreamCallback();
36+
}
37+
38+
/**
39+
* {@inheritdoc}
40+
*/
41+
public function addReadStream($stream, callable $listener)
42+
{
43+
$key = (int) $stream;
44+
45+
if (!isset($this->readListeners[$key])) {
46+
$this->readListeners[$key] = $listener;
47+
$this->subscribeStreamEvent($stream, Event::READ);
48+
}
49+
}
50+
51+
/**
52+
* {@inheritdoc}
53+
*/
54+
public function addWriteStream($stream, callable $listener)
55+
{
56+
$key = (int) $stream;
57+
58+
if (!isset($this->writeListeners[$key])) {
59+
$this->writeListeners[$key] = $listener;
60+
$this->subscribeStreamEvent($stream, Event::WRITE);
61+
}
62+
}
63+
64+
/**
65+
* {@inheritdoc}
66+
*/
67+
public function removeReadStream($stream)
68+
{
69+
$key = (int) $stream;
70+
71+
if (isset($this->readListeners[$key])) {
72+
unset($this->readListeners[$key]);
73+
$this->unsubscribeStreamEvent($stream, Event::READ);
74+
}
75+
}
76+
77+
/**
78+
* {@inheritdoc}
79+
*/
80+
public function removeWriteStream($stream)
81+
{
82+
$key = (int) $stream;
83+
84+
if (isset($this->writeListeners[$key])) {
85+
unset($this->writeListeners[$key]);
86+
$this->unsubscribeStreamEvent($stream, Event::WRITE);
87+
}
88+
}
89+
90+
/**
91+
* {@inheritdoc}
92+
*/
93+
public function removeStream($stream)
94+
{
95+
$key = (int) $stream;
96+
97+
if (isset($this->streamEvents[$key])) {
98+
$this->streamEvents[$key]->free();
99+
100+
unset(
101+
$this->streamFlags[$key],
102+
$this->streamEvents[$key],
103+
$this->readListeners[$key],
104+
$this->writeListeners[$key]
105+
);
106+
}
107+
}
108+
109+
/**
110+
* {@inheritdoc}
111+
*/
112+
public function addTimer($interval, callable $callback)
113+
{
114+
$timer = new Timer($this, $interval, $callback, false);
115+
116+
$this->scheduleTimer($timer);
117+
118+
return $timer;
119+
}
120+
121+
/**
122+
* {@inheritdoc}
123+
*/
124+
public function addPeriodicTimer($interval, callable $callback)
125+
{
126+
$timer = new Timer($this, $interval, $callback, true);
127+
128+
$this->scheduleTimer($timer);
129+
130+
return $timer;
131+
}
132+
133+
/**
134+
* {@inheritdoc}
135+
*/
136+
public function cancelTimer(TimerInterface $timer)
137+
{
138+
if ($this->isTimerActive($timer)) {
139+
$this->timerEvents[$timer]->free();
140+
$this->timerEvents->detach($timer);
141+
}
142+
}
143+
144+
/**
145+
* {@inheritdoc}
146+
*/
147+
public function isTimerActive(TimerInterface $timer)
148+
{
149+
return $this->timerEvents->contains($timer);
150+
}
151+
152+
/**
153+
* {@inheritdoc}
154+
*/
155+
public function nextTick(callable $listener)
156+
{
157+
$this->nextTickQueue->add($listener);
158+
}
159+
160+
/**
161+
* {@inheritdoc}
162+
*/
163+
public function tick()
164+
{
165+
$this->nextTickQueue->tick();
166+
167+
// @-suppression: https://github.com/reactphp/react/pull/234#discussion-diff-7759616R226
168+
@$this->eventBase->loop(EventBase::LOOP_ONCE | EventBase::LOOP_NONBLOCK);
169+
}
170+
171+
/**
172+
* {@inheritdoc}
173+
*/
174+
public function run()
175+
{
176+
$this->running = true;
177+
178+
while ($this->running) {
179+
$this->nextTickQueue->tick();
180+
181+
if (!$this->streamEvents && !$this->timerEvents->count()) {
182+
break;
183+
}
184+
185+
// @-suppression: https://github.com/reactphp/react/pull/234#discussion-diff-7759616R226
186+
@$this->eventBase->loop(EventBase::LOOP_ONCE);
187+
}
188+
}
189+
190+
/**
191+
* {@inheritdoc}
192+
*/
193+
public function stop()
194+
{
195+
$this->running = false;
196+
}
197+
198+
/**
199+
* Schedule a timer for execution.
200+
*
201+
* @param TimerInterface $timer
202+
*/
203+
private function scheduleTimer(TimerInterface $timer)
204+
{
205+
$flags = Event::TIMEOUT;
206+
207+
if ($timer->isPeriodic()) {
208+
$flags |= Event::PERSIST;
209+
}
210+
211+
$event = new Event($this->eventBase, -1, $flags, $this->timerCallback, $timer);
212+
$this->timerEvents[$timer] = $event;
213+
214+
$event->add($timer->getInterval());
215+
}
216+
217+
/**
218+
* Create a new ext-event Event object, or update the existing one.
219+
*
220+
* @param stream $stream
221+
* @param integer $flag Event::READ or Event::WRITE
222+
*/
223+
private function subscribeStreamEvent($stream, $flag)
224+
{
225+
$key = (int) $stream;
226+
227+
if (isset($this->streamEvents[$key])) {
228+
$event = $this->streamEvents[$key];
229+
$flags = ($this->streamFlags[$key] |= $flag);
230+
231+
$event->del();
232+
$event->set($this->eventBase, $stream, Event::PERSIST | $flags, $this->streamCallback);
233+
} else {
234+
$event = new Event($this->eventBase, $stream, Event::PERSIST | $flag, $this->streamCallback);
235+
236+
$this->streamEvents[$key] = $event;
237+
$this->streamFlags[$key] = $flag;
238+
}
239+
240+
$event->add();
241+
}
242+
243+
/**
244+
* Update the ext-event Event object for this stream to stop listening to
245+
* the given event type, or remove it entirely if it's no longer needed.
246+
*
247+
* @param stream $stream
248+
* @param integer $flag Event::READ or Event::WRITE
249+
*/
250+
private function unsubscribeStreamEvent($stream, $flag)
251+
{
252+
$key = (int) $stream;
253+
254+
$flags = $this->streamFlags[$key] &= ~$flag;
255+
256+
if (0 === $flags) {
257+
$this->removeStream($stream);
258+
259+
return;
260+
}
261+
262+
$event = $this->streamEvents[$key];
263+
264+
$event->del();
265+
$event->set($this->eventBase, $stream, Event::PERSIST | $flags, $this->streamCallback);
266+
$event->add();
267+
}
268+
269+
/**
270+
* Create a callback used as the target of timer events.
271+
*
272+
* A reference is kept to the callback for the lifetime of the loop
273+
* to prevent "Cannot destroy active lambda function" fatal error from
274+
* the event extension.
275+
*/
276+
private function createTimerCallback()
277+
{
278+
$this->timerCallback = function ($_, $_, $timer) {
279+
call_user_func($timer->getCallback(), $timer);
280+
281+
if (!$timer->isPeriodic() && $this->isTimerActive($timer)) {
282+
$this->cancelTimer($timer);
283+
}
284+
};
285+
}
286+
287+
/**
288+
* Create a callback used as the target of stream events.
289+
*
290+
* A reference is kept to the callback for the lifetime of the loop
291+
* to prevent "Cannot destroy active lambda function" fatal error from
292+
* the event extension.
293+
*/
294+
private function createStreamCallback()
295+
{
296+
$this->streamCallback = function ($stream, $flags) {
297+
$key = (int) $stream;
298+
299+
if (Event::READ === (Event::READ & $flags) && isset($this->readListeners[$key])) {
300+
call_user_func($this->readListeners[$key], $stream, $this);
301+
}
302+
303+
if (Event::WRITE === (Event::WRITE & $flags) && isset($this->writeListeners[$key])) {
304+
call_user_func($this->writeListeners[$key], $stream, $this);
305+
}
306+
};
307+
}
308+
}

src/React/EventLoop/LibEventLoop.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
namespace React\EventLoop;
44

5+
use Event;
6+
use EventBase;
57
use React\EventLoop\Tick\NextTickQueue;
68
use React\EventLoop\Timer\Timer;
79
use React\EventLoop\Timer\TimerInterface;
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
<?php
2+
3+
namespace React\Tests\EventLoop;
4+
5+
use React\EventLoop\ExtEventLoop;
6+
7+
class ExtEventLoopTest extends AbstractLoopTest
8+
{
9+
public function createLoop()
10+
{
11+
if ('Linux' === PHP_OS && !extension_loaded('posix')) {
12+
$this->markTestSkipped('libevent tests skipped on linux due to linux epoll issues.');
13+
}
14+
15+
if (!extension_loaded('event')) {
16+
$this->markTestSkipped('ext-event tests skipped because ext-event is not installed.');
17+
}
18+
19+
return new ExtEventLoop();
20+
}
21+
22+
public function createStream()
23+
{
24+
// Use a FIFO on linux to get around lack of support for disk-based file
25+
// descriptors when using the EPOLL back-end.
26+
if ('Linux' === PHP_OS) {
27+
$this->fifoPath = tempnam(sys_get_temp_dir(), 'react-');
28+
29+
unlink($this->fifoPath);
30+
31+
posix_mkfifo($this->fifoPath, 0600);
32+
33+
$stream = fopen($this->fifoPath, 'r+');
34+
35+
// ext-event (as of 1.8.1) does not yet support in-memory temporary
36+
// streams. Setting maxmemory:0 and performing a write forces PHP to
37+
// back this temporary stream with a real file.
38+
//
39+
// This problem is mentioned at https://bugs.php.net/bug.php?id=64652&edit=3
40+
// but remains unresolved (despite that issue being closed).
41+
} else {
42+
$stream = fopen('php://temp/maxmemory:0', 'r+');
43+
44+
fwrite($stream, 'x');
45+
ftruncate($stream, 0);
46+
}
47+
48+
return $stream;
49+
}
50+
51+
public function writeToStream($stream, $content)
52+
{
53+
if ('Linux' !== PHP_OS) {
54+
return parent::writeToStream($stream, $content);
55+
}
56+
57+
fwrite($stream, $content);
58+
}
59+
}

0 commit comments

Comments
 (0)