diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..5e6f021 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,13 @@ +language: php +sudo: false +php: + - '5.6' + - '7.0' + - nightly +install: + - composer install +cache: + directories: + - vendor +script: + - php examples/simple.php diff --git a/EProcess/Adapter/BaseAdapter.php b/EProcess/Adapter/BaseAdapter.php index 8bcefe9..4bd5427 100644 --- a/EProcess/Adapter/BaseAdapter.php +++ b/EProcess/Adapter/BaseAdapter.php @@ -2,6 +2,7 @@ namespace EProcess\Adapter; +use EMessenger\Transport\UnixTransport; use React\EventLoop\LoopInterface; abstract class BaseAdapter @@ -15,27 +16,34 @@ public function __construct(LoopInterface $loop) $this->node = uniqid('thread_'); } - protected function createUnixSocket() + public function getUnixSocketFile() { - $unixFile = sprintf('tmp/%s.sock', $this->node); - $unix = sprintf('unix://%s', $unixFile); - - $cleanup = function() use ($unixFile) { - $this->loop->stop(); - @unlink($unixFile); - }; - - register_shutdown_function($cleanup); - pcntl_signal(SIGINT, $cleanup); + return sprintf('%s/%s.sock', EPROCESS_SOCKET_DIR, $this->node); + } - return $unix; + public function getUnixSocketAddress() + { + return sprintf('unix://%s', $this->getUnixSocketFile()); } - protected function getNode() + protected function createUnixTransport() { - + if (!defined('EPROCESS_SOCKET_DIR')) { + throw new \RuntimeException('EPROCESS_SOCKET_DIR is not defined.'); + } + + if (!defined('EPROCESS_AUTOLOAD')) { + throw new \RuntimeException('EPROCESS_AUTOLOAD is not defined.'); + } + + if (!is_writable(EPROCESS_SOCKET_DIR)) { + throw new \RuntimeException(sprintf('Cannot write to "%s".', EPROCESS_SOCKET_DIR)); + } + + return new UnixTransport($this->loop, $this->getUnixSocketAddress()); } abstract public function create($class, array $data = []); + abstract public function kill(); } diff --git a/EProcess/Adapter/ChildProcess.php b/EProcess/Adapter/ChildProcess.php index c8a24e4..12f961e 100644 --- a/EProcess/Adapter/ChildProcess.php +++ b/EProcess/Adapter/ChildProcess.php @@ -2,15 +2,15 @@ namespace EProcess\Adapter; -use EProcess\Behaviour\UniversalSerializer; -use EProcess\MessengerFactory; - +use UniversalSerializer\UniversalSerializerTrait; +use EMessenger\MessengerFactory; +use EProcess\Stream\FullDrainStream; use React\ChildProcess\Process; use Symfony\Component\Process\PhpExecutableFinder; class ChildProcess extends BaseAdapter { - use UniversalSerializer; + use UniversalSerializerTrait; private $script = <<loop(\$loop); \$application->data(\$application->unserialize(base64_decode('%s'))); -\$messenger->emit('initialized', true); +\$messenger->send('initialized', true); try { \$application->run(); @@ -56,31 +56,36 @@ public function create($class, array $data = []) throw new \RuntimeException('Unable to find the PHP executable.'); } - $unix = $this->createUnixSocket(); - $messenger = MessengerFactory::server($unix, $this->loop); - - $file = sprintf(__DIR__ . '/../../tmp/%s.php', $this->node); + $transport = $this->createUnixTransport(); + $messenger = MessengerFactory::server($transport); - file_put_contents($file, sprintf( + $script = sprintf( $this->script, EPROCESS_AUTOLOAD, - $unix, + $this->getUnixSocketAddress(), $class, base64_encode($this->serialize($data)) - )); + ); + + $this->process = new Process($php); + $this->process->start($this->loop, 0.1); + + $this->process->stdin = new FullDrainStream( + $this->process->stdin->stream, + $this->loop + ); - $this->process = new Process(sprintf('exec %s %s', $php, realpath($file))); - $this->process->start($this->loop); + $this->process->stdin->write($script); - $this->loop->addTimer(3, function() use ($file) { - unlink($file); + $this->process->stdin->on('full-drain', function () { + $this->process->stdin->close(); }); - $this->process->stdout->on('data', function($data) { + $this->process->stdout->on('data', function ($data) { echo $data; }); - $this->process->stderr->on('data', function($data) { + $this->process->stderr->on('data', function ($data) { echo $data; }); diff --git a/EProcess/Adapter/PThreads.php b/EProcess/Adapter/PThreads.php index 6fa53e3..b934a5a 100644 --- a/EProcess/Adapter/PThreads.php +++ b/EProcess/Adapter/PThreads.php @@ -2,7 +2,7 @@ namespace EProcess\Adapter; -use EProcess\MessengerFactory; +use EMessenger\MessengerFactory; class PThreads extends BaseAdapter { @@ -10,10 +10,10 @@ class PThreads extends BaseAdapter public function create($class, array $data = []) { - $unix = $this->createUnixSocket(); - $messenger = MessengerFactory::server($unix, $this->loop); + $transport = $this->createUnixTransport(); + $messenger = MessengerFactory::server($transport); - $this->process = new Thread($unix, $class, $data); + $this->process = new Thread($transport, $class, $data); $this->process->start(PTHREADS_INHERIT_NONE); return $messenger; diff --git a/EProcess/Adapter/SymfonyProcess.php b/EProcess/Adapter/SymfonyProcess.php index ddc005e..d8f0578 100644 --- a/EProcess/Adapter/SymfonyProcess.php +++ b/EProcess/Adapter/SymfonyProcess.php @@ -2,13 +2,13 @@ namespace EProcess\Adapter; -use EProcess\Behaviour\UniversalSerializer; use Symfony\Component\Process\PhpProcess; -use EProcess\MessengerFactory; +use EMessenger\MessengerFactory; +use UniversalSerializer\UniversalSerializerTrait; class SymfonyProcess extends BaseAdapter { - use UniversalSerializer; + use UniversalSerializerTrait; private $script = <<loop(\$loop); \$application->data(\$application->unserialize(base64_decode('%s'))); -\$messenger->emit('initialized', true); +\$messenger->send('initialized', true); try { \$application->run(); @@ -44,15 +44,19 @@ class SymfonyProcess extends BaseAdapter } PHP; - private $loop; private $process; public function create($class, array $data = []) { - $unix = $this->createUnixSocket(); - $messenger = MessengerFactory::server($unix, $this->loop); - - $script = sprintf($this->script, EPROCESS_AUTOLOAD, $unix, $class, base64_encode($this->serialize($data))); + $transport = $this->createUnixTransport(); + $messenger = MessengerFactory::server($transport); + + $script = sprintf( + $this->script, + EPROCESS_AUTOLOAD, + $transport, $class, + base64_encode($this->serialize($data)) + ); $this->process = new PhpProcess($script); $this->process->start(); diff --git a/EProcess/Adapter/Thread.php b/EProcess/Adapter/Thread.php index 29a42c7..e38a88b 100644 --- a/EProcess/Adapter/Thread.php +++ b/EProcess/Adapter/Thread.php @@ -32,7 +32,7 @@ public function run() $application->loop($loop); $application->data($this->data); - $messenger->emit('initialized', true); + $messenger->send('initialized', true); try { $application->run(); diff --git a/EProcess/Application/Application.php b/EProcess/Application/Application.php index 8d0af36..961c190 100644 --- a/EProcess/Application/Application.php +++ b/EProcess/Application/Application.php @@ -2,24 +2,48 @@ namespace EProcess\Application; -use EProcess\Behaviour\UniversalSerializer; +use EMessenger\Message; +use EMessenger\Messenger; use EProcess\Behaviour\Workable; +use EProcess\Worker; use Evenement\EventEmitterTrait; +use MKraemer\ReactPCNTL\PCNTL; use React\EventLoop\LoopInterface; -use EProcess\Messenger; -use EProcess\Message; +use UniversalSerializer\UniversalSerializerTrait; abstract class Application { - use EventEmitterTrait { - EventEmitterTrait::emit as emitterEmit; - } - use UniversalSerializer; + use EventEmitterTrait; + use UniversalSerializerTrait; use Workable; private $loop; private $messenger; private $data; + private $pcntl; + private $workers = []; + + public function addWorker(Worker $worker) + { + $this->workers[] = $worker; + } + + public function cleanWorkers() + { + foreach ($this->workers as $worker) { + $worker->send('shutdown'); + unlink($worker->adapter()->getUnixSocketFile()); + } + } + + public function pcntl(PCNTL $pcntl = null) + { + if ($pcntl) { + $this->pcntl = $pcntl; + } + + return $this->pcntl; + } public function loop(LoopInterface $loop = null) { @@ -33,8 +57,8 @@ public function loop(LoopInterface $loop = null) public function messenger(Messenger $messenger = null) { if ($messenger) { - $messenger->on('message', function(Message $message) { - $this->emitterEmit($message->getEvent(), [$message->getContent()]); + $messenger->on('message', function (Message $message) { + $this->emit($message->getEvent(), [$message->getContent()]); }); $this->messenger = $messenger; @@ -52,9 +76,9 @@ public function data(array $data = null) return $this->data; } - public function emit($event, $data) + public function send($event, $data = '') { - $this->messenger->emit($event, $data); + $this->messenger->send($event, $data); } abstract public function run(); diff --git a/EProcess/Application/ApplicationFactory.php b/EProcess/Application/ApplicationFactory.php index a2d61e9..806c539 100644 --- a/EProcess/Application/ApplicationFactory.php +++ b/EProcess/Application/ApplicationFactory.php @@ -2,6 +2,8 @@ namespace EProcess\Application; +use EProcess\Worker; +use MKraemer\ReactPCNTL\PCNTL; use React\EventLoop\Factory; class ApplicationFactory @@ -29,6 +31,22 @@ public static function create($fqcn) $application = new $fqcn(); $application->loop($loop); + $shutdown = function() use ($application) { + $application->loop()->stop(); + $application->cleanWorkers(); + }; + + $pcntl = new PCNTL($loop); + $pcntl->on(SIGINT, $shutdown); + + $application->on('shutdown', $shutdown); + + $application->pcntl($pcntl); + + $application->on('worker.created', function(Worker $worker) use ($application) { + $application->addWorker($worker); + }); + return $application; } } diff --git a/EProcess/Behaviour/UniversalSerializer.php b/EProcess/Behaviour/UniversalSerializer.php deleted file mode 100644 index 14d4d2f..0000000 --- a/EProcess/Behaviour/UniversalSerializer.php +++ /dev/null @@ -1,90 +0,0 @@ -toArray(); - } elseif (!is_array($data)) { - $data = [$data]; - } - - foreach ($data as $key => $piece) { - switch (gettype($piece)) { - case 'object': - $pack[$key] = serialize([ - 'class' => get_class($piece), - 'data' => $this->findSerializer()->serialize($piece, $this->serializationFormat) - ]); - break; - - case 'array': - $pack[$key] = $this->serialize($piece); - break; - - default: - $pack[$key] = serialize($piece); - break; - } - } - - return serialize($pack); - } - - public function unserialize($data) - { - $unpack = []; - $data = is_array($data) ? $data : unserialize($data); - - foreach ($data as $key => $piece) { - $piece = unserialize($piece); - - switch (gettype($piece)) { - case 'array': - if (isset($piece['class'])) { - $unpack[$key] = $this->findSerializer()->deserialize($piece['data'], $piece['class'], $this->serializationFormat); - - if (is_a($piece['class'], ArrayCollection::class, true)) { - $unpack[$key] = new ArrayCollection($unpack[$key]); - } - } else { - $unpack[$key] = $this->unserialize($piece); - } - - break; - - default: - $unpack[$key] = $piece; - break; - } - } - - return 1 === count($unpack) ? current($unpack) : $unpack; - } - - public function findSerializer() - { - if ($this->sharedSerializer) { - return $this->sharedSerializer; - } elseif (isset($this->serializer)) { - return $this->serializer; - } elseif (method_exists($this, 'serializer')) { - return $this->serializer(); - } else { - $this->sharedSerializer = SerializerBuilder::create()->build(); - - return $this->sharedSerializer; - } - } -} diff --git a/EProcess/Behaviour/Workable.php b/EProcess/Behaviour/Workable.php index 2b10f9a..a81c0f2 100644 --- a/EProcess/Behaviour/Workable.php +++ b/EProcess/Behaviour/Workable.php @@ -8,7 +8,16 @@ trait Workable { public function createWorker($fqcn, array $data = []) { - return new Worker($this->loop(), $fqcn, extension_loaded('pthreads') ? 'pthreads' : 'child_process', $data); + $worker = new Worker( + $this->loop(), + $fqcn, + extension_loaded('pthreads') ? 'pthreads' : 'child_process', + $data + ); + + $this->emit('worker.created', [$worker]); + + return $worker; } abstract public function loop(); diff --git a/EProcess/Message.php b/EProcess/Message.php deleted file mode 100644 index ca2d837..0000000 --- a/EProcess/Message.php +++ /dev/null @@ -1,46 +0,0 @@ -event = $event; - $this->content = $content; - } - - public function getContent() - { - return $this->content; - } - - public function getEvent() - { - return $this->event; - } - - public function serialize() - { - return json_encode([ - 'event' => $this->event, - 'content' => base64_encode($this->content) - ]); - } - - public function unserialize($data) - { - $data = json_decode($data, true); - - $this->event = $data['event']; - $this->content = base64_decode($data['content']); - } - - public function __toString() - { - return $this->serialize(); - } -} diff --git a/EProcess/Messenger.php b/EProcess/Messenger.php deleted file mode 100644 index 956d376..0000000 --- a/EProcess/Messenger.php +++ /dev/null @@ -1,49 +0,0 @@ -connection = $connection; - - $this->connection->on('message', function($message) { - $data = json_decode($message, true); - $message = new Message($data['event'], $this->unserialize(base64_decode($data['content']))); - - $this->emitterEmit('message', [$message]); - $this->emitterEmit($message->getEvent(), [$message->getContent()]); - }); - - $this->connection->on('close', array($this, 'close')); - - if ($this->connection instanceof ServerInterface) { - $this->connection->listen(); - } else { - $this->connection->connect(); - } - } - - public function emit($event, $data) - { - $this->connection->send((string) new Message($event, $this->serialize($data))); - } - - public function close() - { - $this->emitterEmit('close'); - } -} diff --git a/EProcess/MessengerFactory.php b/EProcess/MessengerFactory.php deleted file mode 100644 index 56e7dce..0000000 --- a/EProcess/MessengerFactory.php +++ /dev/null @@ -1,20 +0,0 @@ -buffer->on('full-drain', function () { + $this->emit('full-drain', array($this)); + }); + } +} \ No newline at end of file diff --git a/EProcess/Worker.php b/EProcess/Worker.php index 0b6f70a..66ebaf7 100644 --- a/EProcess/Worker.php +++ b/EProcess/Worker.php @@ -9,12 +9,11 @@ use Evenement\EventEmitterTrait; use React\EventLoop\LoopInterface; use React\EventLoop\Timer\Timer; +use EMessenger\Message; class Worker { - use EventEmitterTrait { - EventEmitterTrait::emit as emitterEmit; - } + use EventEmitterTrait; private $loop; private $adapter; @@ -32,16 +31,12 @@ public function __construct(LoopInterface $loop, $class, $adapter = null, array $this->messenger = $this->adapter->create($class, $data); $this->messenger->on('message', function(Message $message) { - $this->emitterEmit($message->getEvent(), [$message->getContent()]); + $this->emit($message->getEvent(), [$message->getContent()]); }); $this->messenger()->on('initialized', function() { $this->initialized = true; }); - - register_shutdown_function(function() { - $this->kill(); - }); } public function kill() @@ -54,14 +49,19 @@ public function messenger() return $this->messenger; } - public function emit($event, $data) + public function adapter() + { + return $this->adapter; + } + + public function send($event, $data = []) { if ($this->initialized) { - $this->messenger->emit($event, $data); + $this->messenger->send($event, $data); } else { - $this->loop->addPeriodicTimer(0.1, function(Timer $timer) use ($event, $data) { + $this->loop->addPeriodicTimer(0.001, function(Timer $timer) use ($event, $data) { if ($this->initialized) { - $this->messenger->emit($event, $data); + $this->messenger->send($event, $data); $timer->cancel(); } }); diff --git a/LICENSE b/LICENSE index d6c5ff1..c44ea27 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2014 Evgeniy Guseletov +Copyright (c) 2014-2016 Yevhenii Huselietov Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 2ae0cdb..97c7367 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,14 @@ EProcess ======== +[![Build Status](https://travis-ci.org/cursedcoder/eprocess.svg?branch=master)](https://travis-ci.org/cursedcoder/eprocess) + The idea is to have multiple non-blocking contexts with a transparent inter-process communication out of the box. This lib is just a PoC, use at your own risk. Check out examples in `examples/` dir. -No tests – no problems. - Features ======== * 3 adapters: child process (react), pthreads, symfony process (not tested) @@ -28,6 +28,10 @@ Install&try Example explains features ========================= +Be aware this snippet below is only for explanatory reasons and will not work out (or at least yet). + +For real examples see `exampes/simple.php` and related. + ```php use EProcess\Application\Application; use EProcess\Application\ApplicationFactory; @@ -44,7 +48,7 @@ class Main extends Application // like that one in c++ public function run() { $worker = $this->createWorker(MyWorker::class); // create external non-blocking thread of MyWorker class - $worker->emit('any_event', 'Hello my worker!'); + $worker->send('any_event', 'Hello my worker!'); $worker->on('hello_master', function() { // Receive back-call from child }); @@ -58,10 +62,12 @@ class MyWorker extends Application $this->on('any_event', function($data) { echo 'Got any_event event from my master: ' . $data; // data == Hello my worker // Still we can send any event back to master - $this->emit('hello_master'); - $this->emit('send-any-data', new Data()); // you can send any object, array or scalar + $this->send('hello_master'); + $this->send('send-any-data', new Data()); // you can send any object, array or scalar // object should have jms serializer metadata to be serialized }); + + $this->getSubscribedEvents(); } } @@ -69,10 +75,3 @@ ApplicationFactory::launch(Main::class); ``` You need to have proper autoloading established in order to use this example. - -ToDo -======= - -* Cleanup unix sockets on worker down. -* Refactor adapters, remove hacks, pass php code directly with stdin instead of file. -* Can IPC be done without unix sockets or using only single socket? diff --git a/composer.json b/composer.json index 37b7c1b..7b3d352 100644 --- a/composer.json +++ b/composer.json @@ -12,16 +12,19 @@ ], "homepage": "https://github.com/cursedcoder/eprocess", "require": { - "php": ">=5.5", + "php": ">=5.6", "react/react": "~0.4@dev", "jms/serializer": "^1.1", "doctrine/collections": "~1.3", "concerto/comms": "~0.8", - "symfony/process": "^3.0" + "symfony/process": "^3.0", + "mkraemer/react-pcntl": "^2.0", + "cursedcoder/emessenger": "^0.1.0" }, "license": "MIT", "require-dev": { - "divi/pthreads-stub": "dev-master" + "divi/pthreads-stub": "dev-master", + "phpunit/phpunit": "^5.3" }, "autoload": { "psr-4": { @@ -30,7 +33,8 @@ }, "autoload-dev": { "psr-4": { - "Examples\\": "examples/" + "Examples\\": "examples/", + "Tests\\": "tests/" } } } diff --git a/examples/Simple/Bank.php b/examples/Simple/Bank.php index 41f0a99..72185da 100644 --- a/examples/Simple/Bank.php +++ b/examples/Simple/Bank.php @@ -10,7 +10,7 @@ class Bank extends Application public function run() { $this->loop()->addPeriodicTimer(1.5, function() { - $this->emit('transaction', new Transaction( + $this->send('transaction', new Transaction( ['usd', 'eur'][rand(0, 1)], rand(10, 250) )); diff --git a/examples/Simple/Crawler.php b/examples/Simple/Crawler.php index 6a57549..5e71e7b 100644 --- a/examples/Simple/Crawler.php +++ b/examples/Simple/Crawler.php @@ -15,7 +15,7 @@ public function crawl($url) { $crawl = function() use ($url) { $data = file_get_contents($url); - $this->emit('result', $data); + $this->send('result', $data); }; $this->loop()->addPeriodicTimer(5, $crawl); diff --git a/examples/Simple/Main.php b/examples/Simple/Main.php index 25e8f7c..7d3179b 100644 --- a/examples/Simple/Main.php +++ b/examples/Simple/Main.php @@ -14,7 +14,7 @@ class Main extends Application public function run() { $this->crawler = $this->createWorker(Crawler::class); - $this->crawler->emit('crawl', 'http://google.com/'); + $this->crawler->send('crawl', 'http://google.com/'); $this->crawler->on('result', function($data) { echo '[Crawler] Got new result: ' . strlen($data) . ' chars' . PHP_EOL; }); @@ -32,5 +32,9 @@ public function run() $transaction->getCurrency() ); }); + + $this->loop()->addTimer(5, function() { + $this->loop()->stop(); + }); } } diff --git a/examples/Simple/SomeThing.php b/examples/Simple/SomeThing.php index a731338..80517d9 100644 --- a/examples/Simple/SomeThing.php +++ b/examples/Simple/SomeThing.php @@ -9,7 +9,7 @@ class SomeThing extends Application public function run() { $this->loop()->addPeriodicTimer(2.5, function() { - $this->emit('status', 'I am here too!'); + $this->send('status', 'I am here too!'); }); } } diff --git a/examples/autoload.php b/examples/autoload.php index f8d059a..6c7a415 100644 --- a/examples/autoload.php +++ b/examples/autoload.php @@ -1,8 +1,7 @@ + + + + + + + + + + + tests + + + diff --git a/todo.md b/todo.md new file mode 100644 index 0000000..a053460 --- /dev/null +++ b/todo.md @@ -0,0 +1,50 @@ +```php +use EProcess\Application\Application; +use EProcess\Application\ApplicationFactory; + +class Data +{ + // jms serializer metadata + private $id; + // setters getters etc. +} + +class Main extends Application // like that one in c++ +{ + public function run() + { + $worker = $this->createWorker(MyWorker::class); // create external non-blocking thread of MyWorker class + $worker->send('any_event', 'Hello my worker!'); + $worker->subscribe('hello_master', function() { + // Receive back-call from child + }); + + // tracker + $worker->getTracker()->getMemoryUsage(); + + // logger + $worker->getLogger()->getLogs(); + + // remote worker + $remoteWorker = $this->connectWorker(); + } +} + +class MyWorker extends Application +{ + public function run() + { + $this->subscribe('any_event', function($data) { + echo 'Got any_event event from my master: ' . $data; // data == Hello my worker + // Still we can send any event back to master + $this->send('hello_master'); + $this->send('send-any-data', new Data()); // you can send any object, array or scalar + // object should have jms serializer metadata to be serialized + }); + + $this->getSubscribedEvents(); + } +} + +ApplicationFactory::launch(Main::class); +``` \ No newline at end of file