From e4fa24e4412013b79296268dab38f1ef1b4e110c Mon Sep 17 00:00:00 2001 From: Evgeniy Guseletov Date: Sun, 3 Apr 2016 12:52:03 +0800 Subject: [PATCH 01/23] Remove artifact --- EProcess/Adapter/BaseAdapter.php | 5 ----- 1 file changed, 5 deletions(-) diff --git a/EProcess/Adapter/BaseAdapter.php b/EProcess/Adapter/BaseAdapter.php index 8bcefe9..3764de1 100644 --- a/EProcess/Adapter/BaseAdapter.php +++ b/EProcess/Adapter/BaseAdapter.php @@ -31,11 +31,6 @@ protected function createUnixSocket() return $unix; } - protected function getNode() - { - - } - abstract public function create($class, array $data = []); abstract public function kill(); } From 8a90735f8eb48bf50d1358f2481b01f82aa3b16d Mon Sep 17 00:00:00 2001 From: Evgeniy Guseletov Date: Sun, 3 Apr 2016 13:15:22 +0800 Subject: [PATCH 02/23] Add tests for universal serializer trait --- EProcess/Behaviour/UniversalSerializer.php | 8 ++- composer.json | 6 +- phpunit.xml.dist | 20 +++++++ .../Behaviour/UniversalSerializerTest.php | 57 +++++++++++++++++++ 4 files changed, 88 insertions(+), 3 deletions(-) create mode 100644 phpunit.xml.dist create mode 100644 tests/EProcess/Behaviour/UniversalSerializerTest.php diff --git a/EProcess/Behaviour/UniversalSerializer.php b/EProcess/Behaviour/UniversalSerializer.php index 14d4d2f..8a2dcb7 100644 --- a/EProcess/Behaviour/UniversalSerializer.php +++ b/EProcess/Behaviour/UniversalSerializer.php @@ -13,6 +13,7 @@ trait UniversalSerializer public function serialize($data) { $pack = []; + $type = gettype($data); if ($data instanceof ArrayCollection) { $data = $data->toArray(); @@ -20,6 +21,8 @@ public function serialize($data) $data = [$data]; } + $data['type'] = $type; + foreach ($data as $key => $piece) { switch (gettype($piece)) { case 'object': @@ -47,6 +50,9 @@ public function unserialize($data) $unpack = []; $data = is_array($data) ? $data : unserialize($data); + $type = unserialize($data['type']); + unset($data['type']); + foreach ($data as $key => $piece) { $piece = unserialize($piece); @@ -70,7 +76,7 @@ public function unserialize($data) } } - return 1 === count($unpack) ? current($unpack) : $unpack; + return $type !== 'array' && 1 === count($unpack) ? current($unpack) : $unpack; } public function findSerializer() diff --git a/composer.json b/composer.json index 37b7c1b..69c0530 100644 --- a/composer.json +++ b/composer.json @@ -21,7 +21,8 @@ }, "license": "MIT", "require-dev": { - "divi/pthreads-stub": "dev-master" + "divi/pthreads-stub": "dev-master", + "phpunit/phpunit": "^5.3" }, "autoload": { "psr-4": { @@ -30,7 +31,8 @@ }, "autoload-dev": { "psr-4": { - "Examples\\": "examples/" + "Examples\\": "examples/", + "Tests\\": "tests/" } } } diff --git a/phpunit.xml.dist b/phpunit.xml.dist new file mode 100644 index 0000000..b038323 --- /dev/null +++ b/phpunit.xml.dist @@ -0,0 +1,20 @@ + + + + + + + + + + + + tests + + + diff --git a/tests/EProcess/Behaviour/UniversalSerializerTest.php b/tests/EProcess/Behaviour/UniversalSerializerTest.php new file mode 100644 index 0000000..5f98926 --- /dev/null +++ b/tests/EProcess/Behaviour/UniversalSerializerTest.php @@ -0,0 +1,57 @@ +assertData(['abcde' => 'dbce']); + } + + /** + * @test + */ + public function should_serialize_scalar() + { + $this->assertData('asdasd'); + } + + /** + * @test + */ + public function should_serialize_integer() + { + $this->assertData(5123123); + } + + /** + * @test + */ + public function should_serialize_object() + { + $this->assertData(new Transaction('EUR', 1235)); + } + + private function assertData($data) + { + $serializer = new SomeSerializer(); + + $serialized = $serializer->serialize($data); + $unserialized = $serializer->unserialize($serialized); + + $this->assertEquals($data, $unserialized); + } +} + +class SomeSerializer +{ + use UniversalSerializer; +} + From b0318affb0f8a81083057739bee44cbcad08a1a9 Mon Sep 17 00:00:00 2001 From: Evgeniy Guseletov Date: Sun, 3 Apr 2016 13:17:15 +0800 Subject: [PATCH 03/23] Add complex test --- tests/EProcess/Behaviour/UniversalSerializerTest.php | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/EProcess/Behaviour/UniversalSerializerTest.php b/tests/EProcess/Behaviour/UniversalSerializerTest.php index 5f98926..e95dda0 100644 --- a/tests/EProcess/Behaviour/UniversalSerializerTest.php +++ b/tests/EProcess/Behaviour/UniversalSerializerTest.php @@ -39,6 +39,18 @@ public function should_serialize_object() $this->assertData(new Transaction('EUR', 1235)); } + /** + * @test + */ + public function should_serialize_all() + { + $this->assertData([ + 'abcde', // scalar + [1 => 'ok'], // array, + [2 => ['ok', new Transaction('USD', 555)]] // object + ]); + } + private function assertData($data) { $serializer = new SomeSerializer(); From 248ea881740fe02c53f2b33323f841cdd3e64ecf Mon Sep 17 00:00:00 2001 From: Evgeniy Guseletov Date: Sun, 3 Apr 2016 13:18:49 +0800 Subject: [PATCH 04/23] Add travis --- .travis.yml | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..6a58344 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,7 @@ +language: php +php: + - '5.5' + - '5.6' + - '7.0' + - hhvm + - nightly From cb1c627eb8360b12d6f0c0b5868ec95e5fdc2b53 Mon Sep 17 00:00:00 2001 From: Evgeniy Guseletov Date: Sun, 3 Apr 2016 13:23:23 +0800 Subject: [PATCH 05/23] Update travis --- .travis.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.travis.yml b/.travis.yml index 6a58344..d1b2d02 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,13 @@ language: php +sudo: false php: - '5.5' - '5.6' - '7.0' - hhvm - nightly +install: + - composer install +cache: + directories: + - vendor From 4839bcbfaf1ef041b3c430cd198fff492324e1ef Mon Sep 17 00:00:00 2001 From: Evgeniy Guseletov Date: Sun, 3 Apr 2016 13:24:53 +0800 Subject: [PATCH 06/23] Bump php to 5.6 --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 69c0530..ab1b713 100644 --- a/composer.json +++ b/composer.json @@ -12,7 +12,7 @@ ], "homepage": "https://github.com/cursedcoder/eprocess", "require": { - "php": ">=5.5", + "php": ">=5.6", "react/react": "~0.4@dev", "jms/serializer": "^1.1", "doctrine/collections": "~1.3", From 498dfd5607c27af2a425e38d5fbdad0da4053840 Mon Sep 17 00:00:00 2001 From: Evgeniy Guseletov Date: Sun, 3 Apr 2016 13:25:07 +0800 Subject: [PATCH 07/23] Remove build for 5.5 --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index d1b2d02..7a06f41 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,6 @@ language: php sudo: false php: - - '5.5' - '5.6' - '7.0' - hhvm From 6c22914baa4795522fd33788154829f6f378f5f8 Mon Sep 17 00:00:00 2001 From: Evgeniy Guseletov Date: Sun, 3 Apr 2016 13:27:23 +0800 Subject: [PATCH 08/23] Add travis label --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 2ae0cdb..013f6fb 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ EProcess ======== +[![Build Status](https://travis-ci.org/cursedcoder/eprocess.svg?branch=master)](https://travis-ci.org/cursedcoder/eprocess) + The idea is to have multiple non-blocking contexts with a transparent inter-process communication out of the box. This lib is just a PoC, use at your own risk. From e9eaba95d47f12d6d83cf8c27ab41a41264c5c4b Mon Sep 17 00:00:00 2001 From: Yevhenii Huselietov Date: Sun, 3 Apr 2016 13:28:11 +0800 Subject: [PATCH 09/23] Update ToDo --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index 013f6fb..e5f8b08 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,5 @@ You need to have proper autoloading established in order to use this example. ToDo ======= -* Cleanup unix sockets on worker down. * Refactor adapters, remove hacks, pass php code directly with stdin instead of file. * Can IPC be done without unix sockets or using only single socket? From 49ea9cb4a2cb27a028978b4b5cd4c369c511b503 Mon Sep 17 00:00:00 2001 From: vagrant Date: Mon, 4 Apr 2016 12:00:37 +0000 Subject: [PATCH 10/23] added custom definition for sockets folder --- EProcess/Adapter/BaseAdapter.php | 11 ++++++++++- examples/simple.php | 2 ++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/EProcess/Adapter/BaseAdapter.php b/EProcess/Adapter/BaseAdapter.php index 3764de1..3251d68 100644 --- a/EProcess/Adapter/BaseAdapter.php +++ b/EProcess/Adapter/BaseAdapter.php @@ -2,6 +2,7 @@ namespace EProcess\Adapter; +use Ratchet\Wamp\Exception; use React\EventLoop\LoopInterface; abstract class BaseAdapter @@ -17,7 +18,15 @@ public function __construct(LoopInterface $loop) protected function createUnixSocket() { - $unixFile = sprintf('tmp/%s.sock', $this->node); + if(!defined('SOCKET_PATH')) + throw new \Exception("SOCKET_PATH is not defined."); + + if(!is_writable(SOCKET_PATH)){ + if(!mkdir(SOCKET_PATH)) + throw new \Exception("Cannot create folder at SOCKET_PATH."); + } + + $unixFile = sprintf('%s/%s.sock', SOCKET_PATH, $this->node); $unix = sprintf('unix://%s', $unixFile); $cleanup = function() use ($unixFile) { diff --git a/examples/simple.php b/examples/simple.php index cc0ba32..3c90106 100644 --- a/examples/simple.php +++ b/examples/simple.php @@ -1,5 +1,7 @@ Date: Mon, 4 Apr 2016 12:06:29 +0000 Subject: [PATCH 11/23] added custom definition for sockets folder --- EProcess/Adapter/BaseAdapter.php | 11 ++++++++++- examples/simple.php | 2 ++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/EProcess/Adapter/BaseAdapter.php b/EProcess/Adapter/BaseAdapter.php index 3764de1..3251d68 100644 --- a/EProcess/Adapter/BaseAdapter.php +++ b/EProcess/Adapter/BaseAdapter.php @@ -2,6 +2,7 @@ namespace EProcess\Adapter; +use Ratchet\Wamp\Exception; use React\EventLoop\LoopInterface; abstract class BaseAdapter @@ -17,7 +18,15 @@ public function __construct(LoopInterface $loop) protected function createUnixSocket() { - $unixFile = sprintf('tmp/%s.sock', $this->node); + if(!defined('SOCKET_PATH')) + throw new \Exception("SOCKET_PATH is not defined."); + + if(!is_writable(SOCKET_PATH)){ + if(!mkdir(SOCKET_PATH)) + throw new \Exception("Cannot create folder at SOCKET_PATH."); + } + + $unixFile = sprintf('%s/%s.sock', SOCKET_PATH, $this->node); $unix = sprintf('unix://%s', $unixFile); $cleanup = function() use ($unixFile) { diff --git a/examples/simple.php b/examples/simple.php index cc0ba32..3c90106 100644 --- a/examples/simple.php +++ b/examples/simple.php @@ -1,5 +1,7 @@ Date: Mon, 4 Apr 2016 12:14:16 +0000 Subject: [PATCH 12/23] small fix --- EProcess/Adapter/BaseAdapter.php | 1 - 1 file changed, 1 deletion(-) diff --git a/EProcess/Adapter/BaseAdapter.php b/EProcess/Adapter/BaseAdapter.php index 3251d68..ebabfe7 100644 --- a/EProcess/Adapter/BaseAdapter.php +++ b/EProcess/Adapter/BaseAdapter.php @@ -2,7 +2,6 @@ namespace EProcess\Adapter; -use Ratchet\Wamp\Exception; use React\EventLoop\LoopInterface; abstract class BaseAdapter From fe5855e67a19b458da333631519c25e337f7e95a Mon Sep 17 00:00:00 2001 From: 421p Date: Mon, 4 Apr 2016 13:00:04 +0000 Subject: [PATCH 13/23] added check for EPROCESS_AUTOLOAD, fixed formatting --- EProcess/Adapter/BaseAdapter.php | 21 +++++++++++++-------- examples/autoload.php | 1 + examples/simple.php | 2 -- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/EProcess/Adapter/BaseAdapter.php b/EProcess/Adapter/BaseAdapter.php index ebabfe7..58afe2c 100644 --- a/EProcess/Adapter/BaseAdapter.php +++ b/EProcess/Adapter/BaseAdapter.php @@ -17,18 +17,22 @@ public function __construct(LoopInterface $loop) protected function createUnixSocket() { - if(!defined('SOCKET_PATH')) - throw new \Exception("SOCKET_PATH is not defined."); - - if(!is_writable(SOCKET_PATH)){ - if(!mkdir(SOCKET_PATH)) - throw new \Exception("Cannot create folder at SOCKET_PATH."); + if (!defined('EPROCESS_SOCKET_DIR')) { + throw new \Exception("EPROCESS_SOCKET_DIR is not defined."); } - $unixFile = sprintf('%s/%s.sock', SOCKET_PATH, $this->node); + if (!defined('EPROCESS_AUTOLOAD')) { + throw new \Exception('EPROCESS_AUTOLOAD is not defined.'); + } + + if (!is_writable(EPROCESS_SOCKET_DIR)) { + throw new \Exception(sprintf("Cannot write to %s.", EPROCESS_SOCKET_DIR)); + } + + $unixFile = sprintf('%s/%s.sock', EPROCESS_SOCKET_DIR, $this->node); $unix = sprintf('unix://%s', $unixFile); - $cleanup = function() use ($unixFile) { + $cleanup = function () use ($unixFile) { $this->loop->stop(); @unlink($unixFile); }; @@ -40,5 +44,6 @@ protected function createUnixSocket() } abstract public function create($class, array $data = []); + abstract public function kill(); } diff --git a/examples/autoload.php b/examples/autoload.php index f8d059a..f754fff 100644 --- a/examples/autoload.php +++ b/examples/autoload.php @@ -3,6 +3,7 @@ declare(ticks = 1); define('EPROCESS_AUTOLOAD', __FILE__); +define('EPROCESS_SOCKET_DIR', '/tmp/eprocess'); $loader = require __DIR__ . '/../vendor/autoload.php'; diff --git a/examples/simple.php b/examples/simple.php index 3c90106..cc0ba32 100644 --- a/examples/simple.php +++ b/examples/simple.php @@ -1,7 +1,5 @@ Date: Mon, 4 Apr 2016 21:51:48 +0800 Subject: [PATCH 14/23] Use relative path --- examples/autoload.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/autoload.php b/examples/autoload.php index f754fff..9ff4e6f 100644 --- a/examples/autoload.php +++ b/examples/autoload.php @@ -3,7 +3,7 @@ declare(ticks = 1); define('EPROCESS_AUTOLOAD', __FILE__); -define('EPROCESS_SOCKET_DIR', '/tmp/eprocess'); +define('EPROCESS_SOCKET_DIR', __DIR__ . '/../tmp/'); $loader = require __DIR__ . '/../vendor/autoload.php'; From e02736c3d90e85078fb3dda476396ddcd4a4c3d9 Mon Sep 17 00:00:00 2001 From: Yevhenii Huselietov Date: Mon, 4 Apr 2016 21:54:04 +0800 Subject: [PATCH 15/23] Use Runtime exceptions --- EProcess/Adapter/BaseAdapter.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/EProcess/Adapter/BaseAdapter.php b/EProcess/Adapter/BaseAdapter.php index 58afe2c..b5934df 100644 --- a/EProcess/Adapter/BaseAdapter.php +++ b/EProcess/Adapter/BaseAdapter.php @@ -18,15 +18,15 @@ public function __construct(LoopInterface $loop) protected function createUnixSocket() { if (!defined('EPROCESS_SOCKET_DIR')) { - throw new \Exception("EPROCESS_SOCKET_DIR is not defined."); + throw new \RuntimeException('EPROCESS_SOCKET_DIR is not defined.'); } if (!defined('EPROCESS_AUTOLOAD')) { - throw new \Exception('EPROCESS_AUTOLOAD is not defined.'); + throw new \RuntimeException('EPROCESS_AUTOLOAD is not defined.'); } if (!is_writable(EPROCESS_SOCKET_DIR)) { - throw new \Exception(sprintf("Cannot write to %s.", EPROCESS_SOCKET_DIR)); + throw new \RuntimeException(sprintf('Cannot write to "%s".', EPROCESS_SOCKET_DIR)); } $unixFile = sprintf('%s/%s.sock', EPROCESS_SOCKET_DIR, $this->node); From 4baa105a626975183919b7c4ce441a2b3172b966 Mon Sep 17 00:00:00 2001 From: Yevhenii Huselietov Date: Thu, 7 Apr 2016 01:36:02 +0800 Subject: [PATCH 16/23] Use react pcntl extension to get rid of cleanup --- EProcess/Adapter/BaseAdapter.php | 17 +++++------ EProcess/Adapter/ChildProcess.php | 2 +- EProcess/Application/Application.php | 34 ++++++++++++++++++--- EProcess/Application/ApplicationFactory.php | 18 +++++++++++ EProcess/Behaviour/Workable.php | 11 ++++++- EProcess/Messenger.php | 2 +- EProcess/Worker.php | 11 ++++--- composer.json | 3 +- examples/autoload.php | 2 -- 9 files changed, 75 insertions(+), 25 deletions(-) diff --git a/EProcess/Adapter/BaseAdapter.php b/EProcess/Adapter/BaseAdapter.php index b5934df..51c7021 100644 --- a/EProcess/Adapter/BaseAdapter.php +++ b/EProcess/Adapter/BaseAdapter.php @@ -2,6 +2,7 @@ namespace EProcess\Adapter; +use EProcess\Terminator; use React\EventLoop\LoopInterface; abstract class BaseAdapter @@ -15,6 +16,11 @@ public function __construct(LoopInterface $loop) $this->node = uniqid('thread_'); } + public function getUnixSocketFile() + { + return sprintf('%s/%s.sock', EPROCESS_SOCKET_DIR, $this->node); + } + protected function createUnixSocket() { if (!defined('EPROCESS_SOCKET_DIR')) { @@ -29,16 +35,7 @@ protected function createUnixSocket() throw new \RuntimeException(sprintf('Cannot write to "%s".', EPROCESS_SOCKET_DIR)); } - $unixFile = sprintf('%s/%s.sock', EPROCESS_SOCKET_DIR, $this->node); - $unix = sprintf('unix://%s', $unixFile); - - $cleanup = function () use ($unixFile) { - $this->loop->stop(); - @unlink($unixFile); - }; - - register_shutdown_function($cleanup); - pcntl_signal(SIGINT, $cleanup); + $unix = sprintf('unix://%s', $this->getUnixSocketFile()); return $unix; } diff --git a/EProcess/Adapter/ChildProcess.php b/EProcess/Adapter/ChildProcess.php index c8a24e4..e7cb9b9 100644 --- a/EProcess/Adapter/ChildProcess.php +++ b/EProcess/Adapter/ChildProcess.php @@ -72,7 +72,7 @@ public function create($class, array $data = []) $this->process = new Process(sprintf('exec %s %s', $php, realpath($file))); $this->process->start($this->loop); - $this->loop->addTimer(3, function() use ($file) { + $this->loop->addTimer(1, function() use ($file) { unlink($file); }); diff --git a/EProcess/Application/Application.php b/EProcess/Application/Application.php index 8d0af36..dafe91d 100644 --- a/EProcess/Application/Application.php +++ b/EProcess/Application/Application.php @@ -4,10 +4,12 @@ use EProcess\Behaviour\UniversalSerializer; use EProcess\Behaviour\Workable; +use EProcess\Message; +use EProcess\Messenger; +use EProcess\Worker; use Evenement\EventEmitterTrait; +use MKraemer\ReactPCNTL\PCNTL; use React\EventLoop\LoopInterface; -use EProcess\Messenger; -use EProcess\Message; abstract class Application { @@ -20,6 +22,30 @@ abstract class Application private $loop; private $messenger; private $data; + private $pcntl; + private $workers = []; + + public function addWorker(Worker $worker) + { + $this->workers[] = $worker; + } + + public function cleanWorkers() + { + foreach ($this->workers as $worker) { + $worker->emit('shutdown'); + unlink($worker->adapter()->getUnixSocketFile()); + } + } + + public function pcntl(PCNTL $pcntl = null) + { + if ($pcntl) { + $this->pcntl = $pcntl; + } + + return $this->pcntl; + } public function loop(LoopInterface $loop = null) { @@ -33,7 +59,7 @@ public function loop(LoopInterface $loop = null) public function messenger(Messenger $messenger = null) { if ($messenger) { - $messenger->on('message', function(Message $message) { + $messenger->on('message', function (Message $message) { $this->emitterEmit($message->getEvent(), [$message->getContent()]); }); @@ -52,7 +78,7 @@ public function data(array $data = null) return $this->data; } - public function emit($event, $data) + public function emit($event, $data = '') { $this->messenger->emit($event, $data); } diff --git a/EProcess/Application/ApplicationFactory.php b/EProcess/Application/ApplicationFactory.php index a2d61e9..806c539 100644 --- a/EProcess/Application/ApplicationFactory.php +++ b/EProcess/Application/ApplicationFactory.php @@ -2,6 +2,8 @@ namespace EProcess\Application; +use EProcess\Worker; +use MKraemer\ReactPCNTL\PCNTL; use React\EventLoop\Factory; class ApplicationFactory @@ -29,6 +31,22 @@ public static function create($fqcn) $application = new $fqcn(); $application->loop($loop); + $shutdown = function() use ($application) { + $application->loop()->stop(); + $application->cleanWorkers(); + }; + + $pcntl = new PCNTL($loop); + $pcntl->on(SIGINT, $shutdown); + + $application->on('shutdown', $shutdown); + + $application->pcntl($pcntl); + + $application->on('worker.created', function(Worker $worker) use ($application) { + $application->addWorker($worker); + }); + return $application; } } diff --git a/EProcess/Behaviour/Workable.php b/EProcess/Behaviour/Workable.php index 2b10f9a..653b2a0 100644 --- a/EProcess/Behaviour/Workable.php +++ b/EProcess/Behaviour/Workable.php @@ -8,7 +8,16 @@ trait Workable { public function createWorker($fqcn, array $data = []) { - return new Worker($this->loop(), $fqcn, extension_loaded('pthreads') ? 'pthreads' : 'child_process', $data); + $worker = new Worker( + $this->loop(), + $fqcn, + extension_loaded('pthreads') ? 'pthreads' : 'child_process', + $data + ); + + $this->emitterEmit('worker.created', [$worker]); + + return $worker; } abstract public function loop(); diff --git a/EProcess/Messenger.php b/EProcess/Messenger.php index 956d376..a5f84d7 100644 --- a/EProcess/Messenger.php +++ b/EProcess/Messenger.php @@ -37,7 +37,7 @@ public function __construct($connection) } } - public function emit($event, $data) + public function emit($event, $data = []) { $this->connection->send((string) new Message($event, $this->serialize($data))); } diff --git a/EProcess/Worker.php b/EProcess/Worker.php index 0b6f70a..c2b7314 100644 --- a/EProcess/Worker.php +++ b/EProcess/Worker.php @@ -38,10 +38,6 @@ public function __construct(LoopInterface $loop, $class, $adapter = null, array $this->messenger()->on('initialized', function() { $this->initialized = true; }); - - register_shutdown_function(function() { - $this->kill(); - }); } public function kill() @@ -54,7 +50,12 @@ public function messenger() return $this->messenger; } - public function emit($event, $data) + public function adapter() + { + return $this->adapter; + } + + public function emit($event, $data = []) { if ($this->initialized) { $this->messenger->emit($event, $data); diff --git a/composer.json b/composer.json index ab1b713..1791768 100644 --- a/composer.json +++ b/composer.json @@ -17,7 +17,8 @@ "jms/serializer": "^1.1", "doctrine/collections": "~1.3", "concerto/comms": "~0.8", - "symfony/process": "^3.0" + "symfony/process": "^3.0", + "mkraemer/react-pcntl": "^2.0" }, "license": "MIT", "require-dev": { diff --git a/examples/autoload.php b/examples/autoload.php index 9ff4e6f..c97101d 100644 --- a/examples/autoload.php +++ b/examples/autoload.php @@ -1,7 +1,5 @@ Date: Thu, 7 Apr 2016 14:20:30 +0800 Subject: [PATCH 17/23] Well, there is some tests now... --- README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/README.md b/README.md index e5f8b08..5dfb029 100644 --- a/README.md +++ b/README.md @@ -9,8 +9,6 @@ This lib is just a PoC, use at your own risk. Check out examples in `examples/` dir. -No tests – no problems. - Features ======== * 3 adapters: child process (react), pthreads, symfony process (not tested) From fc1d86d3fe016ddbc82adb66acde3af497b5540a Mon Sep 17 00:00:00 2001 From: Yevhenii Huselietov Date: Thu, 7 Apr 2016 14:26:35 +0800 Subject: [PATCH 18/23] Note about example --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 5dfb029..8663339 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,10 @@ Install&try Example explains features ========================= +Be aware this snippet below is only for explanatory reasons and will not work out (or at least yet). + +For real examples see `exampes/simple.php` and related. + ```php use EProcess\Application\Application; use EProcess\Application\ApplicationFactory; From ecdd633a636601843c73aea963a5581e2d0c3a35 Mon Sep 17 00:00:00 2001 From: Yevhenii Huselietov Date: Sun, 10 Apr 2016 03:24:09 +0800 Subject: [PATCH 19/23] Remove hacks, pass script directly to stdin --- EProcess/Adapter/BaseAdapter.php | 1 - EProcess/Adapter/ChildProcess.php | 17 +++++----- EProcess/Adapter/SymfonyProcess.php | 1 - EProcess/Stream/FullDrainSteam.php | 51 +++++++++++++++++++++++++++++ README.md | 6 ---- 5 files changed, 60 insertions(+), 16 deletions(-) create mode 100644 EProcess/Stream/FullDrainSteam.php diff --git a/EProcess/Adapter/BaseAdapter.php b/EProcess/Adapter/BaseAdapter.php index 51c7021..f76dd7d 100644 --- a/EProcess/Adapter/BaseAdapter.php +++ b/EProcess/Adapter/BaseAdapter.php @@ -2,7 +2,6 @@ namespace EProcess\Adapter; -use EProcess\Terminator; use React\EventLoop\LoopInterface; abstract class BaseAdapter diff --git a/EProcess/Adapter/ChildProcess.php b/EProcess/Adapter/ChildProcess.php index e7cb9b9..d31b831 100644 --- a/EProcess/Adapter/ChildProcess.php +++ b/EProcess/Adapter/ChildProcess.php @@ -59,21 +59,22 @@ public function create($class, array $data = []) $unix = $this->createUnixSocket(); $messenger = MessengerFactory::server($unix, $this->loop); - $file = sprintf(__DIR__ . '/../../tmp/%s.php', $this->node); - - file_put_contents($file, sprintf( + $script = sprintf( $this->script, EPROCESS_AUTOLOAD, $unix, $class, base64_encode($this->serialize($data)) - )); + ); - $this->process = new Process(sprintf('exec %s %s', $php, realpath($file))); - $this->process->start($this->loop); + $this->process = new Process($php); + $this->process->start($this->loop, 0.1); + + $this->process->stdin->resume(); + $this->process->stdin->write($script); - $this->loop->addTimer(1, function() use ($file) { - unlink($file); + $this->process->stdin->on('full-drain', function() { + $this->process->stdin->close(); }); $this->process->stdout->on('data', function($data) { diff --git a/EProcess/Adapter/SymfonyProcess.php b/EProcess/Adapter/SymfonyProcess.php index ddc005e..ec8a256 100644 --- a/EProcess/Adapter/SymfonyProcess.php +++ b/EProcess/Adapter/SymfonyProcess.php @@ -44,7 +44,6 @@ class SymfonyProcess extends BaseAdapter } PHP; - private $loop; private $process; public function create($class, array $data = []) diff --git a/EProcess/Stream/FullDrainSteam.php b/EProcess/Stream/FullDrainSteam.php new file mode 100644 index 0000000..e4e87ba --- /dev/null +++ b/EProcess/Stream/FullDrainSteam.php @@ -0,0 +1,51 @@ +stream = $stream; + if (!is_resource($this->stream) || get_resource_type($this->stream) !== "stream") { + throw new \InvalidArgumentException('First parameter must be a valid stream resource'); + } + + stream_set_blocking($this->stream, 0); + + // Use unbuffered read operations on the underlying stream resource. + // Reading chunks from the stream may otherwise leave unread bytes in + // PHP's stream buffers which some event loop implementations do not + // trigger events on (edge triggered). + // This does not affect the default event loop implementation (level + // triggered), so we can ignore platforms not supporting this (HHVM). + if (function_exists('stream_set_read_buffer')) { + stream_set_read_buffer($this->stream, 0); + } + + $this->loop = $loop; + $this->buffer = new Buffer($this->stream, $this->loop); + + $that = $this; + + $this->buffer->on('error', function ($error) use ($that) { + $that->emit('error', array($error, $that)); + $that->close(); + }); + + $this->buffer->on('drain', function () use ($that) { + $that->emit('drain', array($that)); + }); + + $this->buffer->on('full-drain', function () use ($that) { + $that->emit('full-drain', array($that)); + }); + + $this->resume(); + } +} \ No newline at end of file diff --git a/README.md b/README.md index e5f8b08..578f79b 100644 --- a/README.md +++ b/README.md @@ -71,9 +71,3 @@ ApplicationFactory::launch(Main::class); ``` You need to have proper autoloading established in order to use this example. - -ToDo -======= - -* Refactor adapters, remove hacks, pass php code directly with stdin instead of file. -* Can IPC be done without unix sockets or using only single socket? From 7f3b81fc9bb06476ea4b10c7d5ecc8208fbece79 Mon Sep 17 00:00:00 2001 From: Yevhenii Huselietov Date: Sun, 10 Apr 2016 03:51:04 +0800 Subject: [PATCH 20/23] Fix typo, missing hook-in --- EProcess/Adapter/ChildProcess.php | 10 ++++-- EProcess/Stream/FullDrainSteam.php | 51 ----------------------------- EProcess/Stream/FullDrainStream.php | 18 ++++++++++ 3 files changed, 25 insertions(+), 54 deletions(-) delete mode 100644 EProcess/Stream/FullDrainSteam.php create mode 100644 EProcess/Stream/FullDrainStream.php diff --git a/EProcess/Adapter/ChildProcess.php b/EProcess/Adapter/ChildProcess.php index d31b831..8081f56 100644 --- a/EProcess/Adapter/ChildProcess.php +++ b/EProcess/Adapter/ChildProcess.php @@ -4,7 +4,7 @@ use EProcess\Behaviour\UniversalSerializer; use EProcess\MessengerFactory; - +use EProcess\Stream\FullDrainStream; use React\ChildProcess\Process; use Symfony\Component\Process\PhpExecutableFinder; @@ -69,8 +69,12 @@ public function create($class, array $data = []) $this->process = new Process($php); $this->process->start($this->loop, 0.1); - - $this->process->stdin->resume(); + + $this->process->stdin = new FullDrainStream( + $this->process->stdin->stream, + $this->loop + ); + $this->process->stdin->write($script); $this->process->stdin->on('full-drain', function() { diff --git a/EProcess/Stream/FullDrainSteam.php b/EProcess/Stream/FullDrainSteam.php deleted file mode 100644 index e4e87ba..0000000 --- a/EProcess/Stream/FullDrainSteam.php +++ /dev/null @@ -1,51 +0,0 @@ -stream = $stream; - if (!is_resource($this->stream) || get_resource_type($this->stream) !== "stream") { - throw new \InvalidArgumentException('First parameter must be a valid stream resource'); - } - - stream_set_blocking($this->stream, 0); - - // Use unbuffered read operations on the underlying stream resource. - // Reading chunks from the stream may otherwise leave unread bytes in - // PHP's stream buffers which some event loop implementations do not - // trigger events on (edge triggered). - // This does not affect the default event loop implementation (level - // triggered), so we can ignore platforms not supporting this (HHVM). - if (function_exists('stream_set_read_buffer')) { - stream_set_read_buffer($this->stream, 0); - } - - $this->loop = $loop; - $this->buffer = new Buffer($this->stream, $this->loop); - - $that = $this; - - $this->buffer->on('error', function ($error) use ($that) { - $that->emit('error', array($error, $that)); - $that->close(); - }); - - $this->buffer->on('drain', function () use ($that) { - $that->emit('drain', array($that)); - }); - - $this->buffer->on('full-drain', function () use ($that) { - $that->emit('full-drain', array($that)); - }); - - $this->resume(); - } -} \ No newline at end of file diff --git a/EProcess/Stream/FullDrainStream.php b/EProcess/Stream/FullDrainStream.php new file mode 100644 index 0000000..ea11340 --- /dev/null +++ b/EProcess/Stream/FullDrainStream.php @@ -0,0 +1,18 @@ +buffer->on('full-drain', function () { + $this->emit('full-drain', array($this)); + }); + } +} \ No newline at end of file From 14024f05e81158ab23c7d13a8815b9bc7863e4fc Mon Sep 17 00:00:00 2001 From: Evgeniy Guseletov Date: Wed, 13 Apr 2016 14:21:41 +0800 Subject: [PATCH 21/23] Package split: EMessenger --- .travis.yml | 2 + EProcess/Adapter/BaseAdapter.php | 13 ++- EProcess/Adapter/ChildProcess.php | 26 ++--- EProcess/Adapter/PThreads.php | 8 +- EProcess/Adapter/SymfonyProcess.php | 21 ++-- EProcess/Adapter/Thread.php | 2 +- EProcess/Application/Application.php | 20 ++-- EProcess/Behaviour/UniversalSerializer.php | 96 ------------------- EProcess/Behaviour/Workable.php | 2 +- EProcess/Message.php | 46 --------- EProcess/Messenger.php | 49 ---------- EProcess/MessengerFactory.php | 20 ---- EProcess/Worker.php | 15 ++- LICENSE | 2 +- README.md | 8 +- composer.json | 3 +- examples/Simple/Bank.php | 2 +- examples/Simple/Crawler.php | 2 +- examples/Simple/Main.php | 6 +- examples/Simple/SomeThing.php | 2 +- examples/autoload.php | 2 +- .../Behaviour/UniversalSerializerTest.php | 69 ------------- todo.md | 50 ++++++++++ 23 files changed, 127 insertions(+), 339 deletions(-) delete mode 100644 EProcess/Behaviour/UniversalSerializer.php delete mode 100644 EProcess/Message.php delete mode 100644 EProcess/Messenger.php delete mode 100644 EProcess/MessengerFactory.php delete mode 100644 tests/EProcess/Behaviour/UniversalSerializerTest.php create mode 100644 todo.md diff --git a/.travis.yml b/.travis.yml index 7a06f41..0b69349 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,3 +10,5 @@ install: cache: directories: - vendor +script: + - php examples/simple.php diff --git a/EProcess/Adapter/BaseAdapter.php b/EProcess/Adapter/BaseAdapter.php index f76dd7d..b8a1496 100644 --- a/EProcess/Adapter/BaseAdapter.php +++ b/EProcess/Adapter/BaseAdapter.php @@ -2,6 +2,7 @@ namespace EProcess\Adapter; +use EMessenger\Transport\UnixTransport; use React\EventLoop\LoopInterface; abstract class BaseAdapter @@ -20,7 +21,12 @@ public function getUnixSocketFile() return sprintf('%s/%s.sock', EPROCESS_SOCKET_DIR, $this->node); } - protected function createUnixSocket() + public function getUnixSocketAddress() + { + return sprintf('unix://%s', $this->getUnixSocketFile()); + } + + protected function createUnixTransport() { if (!defined('EPROCESS_SOCKET_DIR')) { throw new \RuntimeException('EPROCESS_SOCKET_DIR is not defined.'); @@ -34,9 +40,10 @@ protected function createUnixSocket() throw new \RuntimeException(sprintf('Cannot write to "%s".', EPROCESS_SOCKET_DIR)); } - $unix = sprintf('unix://%s', $this->getUnixSocketFile()); + echo $this->getUnixSocketFile(); + echo PHP_EOL; - return $unix; + return new UnixTransport($this->loop, $this->getUnixSocketAddress()); } abstract public function create($class, array $data = []); diff --git a/EProcess/Adapter/ChildProcess.php b/EProcess/Adapter/ChildProcess.php index 8081f56..12f961e 100644 --- a/EProcess/Adapter/ChildProcess.php +++ b/EProcess/Adapter/ChildProcess.php @@ -2,15 +2,15 @@ namespace EProcess\Adapter; -use EProcess\Behaviour\UniversalSerializer; -use EProcess\MessengerFactory; +use UniversalSerializer\UniversalSerializerTrait; +use EMessenger\MessengerFactory; use EProcess\Stream\FullDrainStream; use React\ChildProcess\Process; use Symfony\Component\Process\PhpExecutableFinder; class ChildProcess extends BaseAdapter { - use UniversalSerializer; + use UniversalSerializerTrait; private $script = <<loop(\$loop); \$application->data(\$application->unserialize(base64_decode('%s'))); -\$messenger->emit('initialized', true); +\$messenger->send('initialized', true); try { \$application->run(); @@ -56,13 +56,13 @@ public function create($class, array $data = []) throw new \RuntimeException('Unable to find the PHP executable.'); } - $unix = $this->createUnixSocket(); - $messenger = MessengerFactory::server($unix, $this->loop); + $transport = $this->createUnixTransport(); + $messenger = MessengerFactory::server($transport); $script = sprintf( $this->script, EPROCESS_AUTOLOAD, - $unix, + $this->getUnixSocketAddress(), $class, base64_encode($this->serialize($data)) ); @@ -77,15 +77,15 @@ public function create($class, array $data = []) $this->process->stdin->write($script); - $this->process->stdin->on('full-drain', function() { + $this->process->stdin->on('full-drain', function () { $this->process->stdin->close(); }); - $this->process->stdout->on('data', function($data) { + $this->process->stdout->on('data', function ($data) { echo $data; }); - $this->process->stderr->on('data', function($data) { + $this->process->stderr->on('data', function ($data) { echo $data; }); diff --git a/EProcess/Adapter/PThreads.php b/EProcess/Adapter/PThreads.php index 6fa53e3..b934a5a 100644 --- a/EProcess/Adapter/PThreads.php +++ b/EProcess/Adapter/PThreads.php @@ -2,7 +2,7 @@ namespace EProcess\Adapter; -use EProcess\MessengerFactory; +use EMessenger\MessengerFactory; class PThreads extends BaseAdapter { @@ -10,10 +10,10 @@ class PThreads extends BaseAdapter public function create($class, array $data = []) { - $unix = $this->createUnixSocket(); - $messenger = MessengerFactory::server($unix, $this->loop); + $transport = $this->createUnixTransport(); + $messenger = MessengerFactory::server($transport); - $this->process = new Thread($unix, $class, $data); + $this->process = new Thread($transport, $class, $data); $this->process->start(PTHREADS_INHERIT_NONE); return $messenger; diff --git a/EProcess/Adapter/SymfonyProcess.php b/EProcess/Adapter/SymfonyProcess.php index ec8a256..d8f0578 100644 --- a/EProcess/Adapter/SymfonyProcess.php +++ b/EProcess/Adapter/SymfonyProcess.php @@ -2,13 +2,13 @@ namespace EProcess\Adapter; -use EProcess\Behaviour\UniversalSerializer; use Symfony\Component\Process\PhpProcess; -use EProcess\MessengerFactory; +use EMessenger\MessengerFactory; +use UniversalSerializer\UniversalSerializerTrait; class SymfonyProcess extends BaseAdapter { - use UniversalSerializer; + use UniversalSerializerTrait; private $script = <<loop(\$loop); \$application->data(\$application->unserialize(base64_decode('%s'))); -\$messenger->emit('initialized', true); +\$messenger->send('initialized', true); try { \$application->run(); @@ -48,10 +48,15 @@ class SymfonyProcess extends BaseAdapter public function create($class, array $data = []) { - $unix = $this->createUnixSocket(); - $messenger = MessengerFactory::server($unix, $this->loop); - - $script = sprintf($this->script, EPROCESS_AUTOLOAD, $unix, $class, base64_encode($this->serialize($data))); + $transport = $this->createUnixTransport(); + $messenger = MessengerFactory::server($transport); + + $script = sprintf( + $this->script, + EPROCESS_AUTOLOAD, + $transport, $class, + base64_encode($this->serialize($data)) + ); $this->process = new PhpProcess($script); $this->process->start(); diff --git a/EProcess/Adapter/Thread.php b/EProcess/Adapter/Thread.php index 29a42c7..e38a88b 100644 --- a/EProcess/Adapter/Thread.php +++ b/EProcess/Adapter/Thread.php @@ -32,7 +32,7 @@ public function run() $application->loop($loop); $application->data($this->data); - $messenger->emit('initialized', true); + $messenger->send('initialized', true); try { $application->run(); diff --git a/EProcess/Application/Application.php b/EProcess/Application/Application.php index dafe91d..961c190 100644 --- a/EProcess/Application/Application.php +++ b/EProcess/Application/Application.php @@ -2,21 +2,19 @@ namespace EProcess\Application; -use EProcess\Behaviour\UniversalSerializer; +use EMessenger\Message; +use EMessenger\Messenger; use EProcess\Behaviour\Workable; -use EProcess\Message; -use EProcess\Messenger; use EProcess\Worker; use Evenement\EventEmitterTrait; use MKraemer\ReactPCNTL\PCNTL; use React\EventLoop\LoopInterface; +use UniversalSerializer\UniversalSerializerTrait; abstract class Application { - use EventEmitterTrait { - EventEmitterTrait::emit as emitterEmit; - } - use UniversalSerializer; + use EventEmitterTrait; + use UniversalSerializerTrait; use Workable; private $loop; @@ -33,7 +31,7 @@ public function addWorker(Worker $worker) public function cleanWorkers() { foreach ($this->workers as $worker) { - $worker->emit('shutdown'); + $worker->send('shutdown'); unlink($worker->adapter()->getUnixSocketFile()); } } @@ -60,7 +58,7 @@ public function messenger(Messenger $messenger = null) { if ($messenger) { $messenger->on('message', function (Message $message) { - $this->emitterEmit($message->getEvent(), [$message->getContent()]); + $this->emit($message->getEvent(), [$message->getContent()]); }); $this->messenger = $messenger; @@ -78,9 +76,9 @@ public function data(array $data = null) return $this->data; } - public function emit($event, $data = '') + public function send($event, $data = '') { - $this->messenger->emit($event, $data); + $this->messenger->send($event, $data); } abstract public function run(); diff --git a/EProcess/Behaviour/UniversalSerializer.php b/EProcess/Behaviour/UniversalSerializer.php deleted file mode 100644 index 8a2dcb7..0000000 --- a/EProcess/Behaviour/UniversalSerializer.php +++ /dev/null @@ -1,96 +0,0 @@ -toArray(); - } elseif (!is_array($data)) { - $data = [$data]; - } - - $data['type'] = $type; - - foreach ($data as $key => $piece) { - switch (gettype($piece)) { - case 'object': - $pack[$key] = serialize([ - 'class' => get_class($piece), - 'data' => $this->findSerializer()->serialize($piece, $this->serializationFormat) - ]); - break; - - case 'array': - $pack[$key] = $this->serialize($piece); - break; - - default: - $pack[$key] = serialize($piece); - break; - } - } - - return serialize($pack); - } - - public function unserialize($data) - { - $unpack = []; - $data = is_array($data) ? $data : unserialize($data); - - $type = unserialize($data['type']); - unset($data['type']); - - foreach ($data as $key => $piece) { - $piece = unserialize($piece); - - switch (gettype($piece)) { - case 'array': - if (isset($piece['class'])) { - $unpack[$key] = $this->findSerializer()->deserialize($piece['data'], $piece['class'], $this->serializationFormat); - - if (is_a($piece['class'], ArrayCollection::class, true)) { - $unpack[$key] = new ArrayCollection($unpack[$key]); - } - } else { - $unpack[$key] = $this->unserialize($piece); - } - - break; - - default: - $unpack[$key] = $piece; - break; - } - } - - return $type !== 'array' && 1 === count($unpack) ? current($unpack) : $unpack; - } - - public function findSerializer() - { - if ($this->sharedSerializer) { - return $this->sharedSerializer; - } elseif (isset($this->serializer)) { - return $this->serializer; - } elseif (method_exists($this, 'serializer')) { - return $this->serializer(); - } else { - $this->sharedSerializer = SerializerBuilder::create()->build(); - - return $this->sharedSerializer; - } - } -} diff --git a/EProcess/Behaviour/Workable.php b/EProcess/Behaviour/Workable.php index 653b2a0..a81c0f2 100644 --- a/EProcess/Behaviour/Workable.php +++ b/EProcess/Behaviour/Workable.php @@ -15,7 +15,7 @@ public function createWorker($fqcn, array $data = []) $data ); - $this->emitterEmit('worker.created', [$worker]); + $this->emit('worker.created', [$worker]); return $worker; } diff --git a/EProcess/Message.php b/EProcess/Message.php deleted file mode 100644 index ca2d837..0000000 --- a/EProcess/Message.php +++ /dev/null @@ -1,46 +0,0 @@ -event = $event; - $this->content = $content; - } - - public function getContent() - { - return $this->content; - } - - public function getEvent() - { - return $this->event; - } - - public function serialize() - { - return json_encode([ - 'event' => $this->event, - 'content' => base64_encode($this->content) - ]); - } - - public function unserialize($data) - { - $data = json_decode($data, true); - - $this->event = $data['event']; - $this->content = base64_decode($data['content']); - } - - public function __toString() - { - return $this->serialize(); - } -} diff --git a/EProcess/Messenger.php b/EProcess/Messenger.php deleted file mode 100644 index a5f84d7..0000000 --- a/EProcess/Messenger.php +++ /dev/null @@ -1,49 +0,0 @@ -connection = $connection; - - $this->connection->on('message', function($message) { - $data = json_decode($message, true); - $message = new Message($data['event'], $this->unserialize(base64_decode($data['content']))); - - $this->emitterEmit('message', [$message]); - $this->emitterEmit($message->getEvent(), [$message->getContent()]); - }); - - $this->connection->on('close', array($this, 'close')); - - if ($this->connection instanceof ServerInterface) { - $this->connection->listen(); - } else { - $this->connection->connect(); - } - } - - public function emit($event, $data = []) - { - $this->connection->send((string) new Message($event, $this->serialize($data))); - } - - public function close() - { - $this->emitterEmit('close'); - } -} diff --git a/EProcess/MessengerFactory.php b/EProcess/MessengerFactory.php deleted file mode 100644 index 56e7dce..0000000 --- a/EProcess/MessengerFactory.php +++ /dev/null @@ -1,20 +0,0 @@ -messenger = $this->adapter->create($class, $data); $this->messenger->on('message', function(Message $message) { - $this->emitterEmit($message->getEvent(), [$message->getContent()]); + $this->emit($message->getEvent(), [$message->getContent()]); }); $this->messenger()->on('initialized', function() { @@ -55,14 +54,14 @@ public function adapter() return $this->adapter; } - public function emit($event, $data = []) + public function send($event, $data = []) { if ($this->initialized) { - $this->messenger->emit($event, $data); + $this->messenger->send($event, $data); } else { - $this->loop->addPeriodicTimer(0.1, function(Timer $timer) use ($event, $data) { + $this->loop->addPeriodicTimer(0.001, function(Timer $timer) use ($event, $data) { if ($this->initialized) { - $this->messenger->emit($event, $data); + $this->messenger->send($event, $data); $timer->cancel(); } }); diff --git a/LICENSE b/LICENSE index d6c5ff1..c44ea27 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2014 Evgeniy Guseletov +Copyright (c) 2014-2016 Yevhenii Huselietov Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index a1e98d0..97c7367 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ class Main extends Application // like that one in c++ public function run() { $worker = $this->createWorker(MyWorker::class); // create external non-blocking thread of MyWorker class - $worker->emit('any_event', 'Hello my worker!'); + $worker->send('any_event', 'Hello my worker!'); $worker->on('hello_master', function() { // Receive back-call from child }); @@ -62,10 +62,12 @@ class MyWorker extends Application $this->on('any_event', function($data) { echo 'Got any_event event from my master: ' . $data; // data == Hello my worker // Still we can send any event back to master - $this->emit('hello_master'); - $this->emit('send-any-data', new Data()); // you can send any object, array or scalar + $this->send('hello_master'); + $this->send('send-any-data', new Data()); // you can send any object, array or scalar // object should have jms serializer metadata to be serialized }); + + $this->getSubscribedEvents(); } } diff --git a/composer.json b/composer.json index 1791768..7b3d352 100644 --- a/composer.json +++ b/composer.json @@ -18,7 +18,8 @@ "doctrine/collections": "~1.3", "concerto/comms": "~0.8", "symfony/process": "^3.0", - "mkraemer/react-pcntl": "^2.0" + "mkraemer/react-pcntl": "^2.0", + "cursedcoder/emessenger": "^0.1.0" }, "license": "MIT", "require-dev": { diff --git a/examples/Simple/Bank.php b/examples/Simple/Bank.php index 41f0a99..72185da 100644 --- a/examples/Simple/Bank.php +++ b/examples/Simple/Bank.php @@ -10,7 +10,7 @@ class Bank extends Application public function run() { $this->loop()->addPeriodicTimer(1.5, function() { - $this->emit('transaction', new Transaction( + $this->send('transaction', new Transaction( ['usd', 'eur'][rand(0, 1)], rand(10, 250) )); diff --git a/examples/Simple/Crawler.php b/examples/Simple/Crawler.php index 6a57549..5e71e7b 100644 --- a/examples/Simple/Crawler.php +++ b/examples/Simple/Crawler.php @@ -15,7 +15,7 @@ public function crawl($url) { $crawl = function() use ($url) { $data = file_get_contents($url); - $this->emit('result', $data); + $this->send('result', $data); }; $this->loop()->addPeriodicTimer(5, $crawl); diff --git a/examples/Simple/Main.php b/examples/Simple/Main.php index 25e8f7c..7d3179b 100644 --- a/examples/Simple/Main.php +++ b/examples/Simple/Main.php @@ -14,7 +14,7 @@ class Main extends Application public function run() { $this->crawler = $this->createWorker(Crawler::class); - $this->crawler->emit('crawl', 'http://google.com/'); + $this->crawler->send('crawl', 'http://google.com/'); $this->crawler->on('result', function($data) { echo '[Crawler] Got new result: ' . strlen($data) . ' chars' . PHP_EOL; }); @@ -32,5 +32,9 @@ public function run() $transaction->getCurrency() ); }); + + $this->loop()->addTimer(5, function() { + $this->loop()->stop(); + }); } } diff --git a/examples/Simple/SomeThing.php b/examples/Simple/SomeThing.php index a731338..80517d9 100644 --- a/examples/Simple/SomeThing.php +++ b/examples/Simple/SomeThing.php @@ -9,7 +9,7 @@ class SomeThing extends Application public function run() { $this->loop()->addPeriodicTimer(2.5, function() { - $this->emit('status', 'I am here too!'); + $this->send('status', 'I am here too!'); }); } } diff --git a/examples/autoload.php b/examples/autoload.php index c97101d..6c7a415 100644 --- a/examples/autoload.php +++ b/examples/autoload.php @@ -1,7 +1,7 @@ assertData(['abcde' => 'dbce']); - } - - /** - * @test - */ - public function should_serialize_scalar() - { - $this->assertData('asdasd'); - } - - /** - * @test - */ - public function should_serialize_integer() - { - $this->assertData(5123123); - } - - /** - * @test - */ - public function should_serialize_object() - { - $this->assertData(new Transaction('EUR', 1235)); - } - - /** - * @test - */ - public function should_serialize_all() - { - $this->assertData([ - 'abcde', // scalar - [1 => 'ok'], // array, - [2 => ['ok', new Transaction('USD', 555)]] // object - ]); - } - - private function assertData($data) - { - $serializer = new SomeSerializer(); - - $serialized = $serializer->serialize($data); - $unserialized = $serializer->unserialize($serialized); - - $this->assertEquals($data, $unserialized); - } -} - -class SomeSerializer -{ - use UniversalSerializer; -} - diff --git a/todo.md b/todo.md new file mode 100644 index 0000000..a053460 --- /dev/null +++ b/todo.md @@ -0,0 +1,50 @@ +```php +use EProcess\Application\Application; +use EProcess\Application\ApplicationFactory; + +class Data +{ + // jms serializer metadata + private $id; + // setters getters etc. +} + +class Main extends Application // like that one in c++ +{ + public function run() + { + $worker = $this->createWorker(MyWorker::class); // create external non-blocking thread of MyWorker class + $worker->send('any_event', 'Hello my worker!'); + $worker->subscribe('hello_master', function() { + // Receive back-call from child + }); + + // tracker + $worker->getTracker()->getMemoryUsage(); + + // logger + $worker->getLogger()->getLogs(); + + // remote worker + $remoteWorker = $this->connectWorker(); + } +} + +class MyWorker extends Application +{ + public function run() + { + $this->subscribe('any_event', function($data) { + echo 'Got any_event event from my master: ' . $data; // data == Hello my worker + // Still we can send any event back to master + $this->send('hello_master'); + $this->send('send-any-data', new Data()); // you can send any object, array or scalar + // object should have jms serializer metadata to be serialized + }); + + $this->getSubscribedEvents(); + } +} + +ApplicationFactory::launch(Main::class); +``` \ No newline at end of file From cf3096708d60778f2f1388ae13af220217ebdf7c Mon Sep 17 00:00:00 2001 From: Evgeniy Guseletov Date: Wed, 13 Apr 2016 14:24:44 +0800 Subject: [PATCH 22/23] Remove artifact --- EProcess/Adapter/BaseAdapter.php | 3 --- 1 file changed, 3 deletions(-) diff --git a/EProcess/Adapter/BaseAdapter.php b/EProcess/Adapter/BaseAdapter.php index b8a1496..4bd5427 100644 --- a/EProcess/Adapter/BaseAdapter.php +++ b/EProcess/Adapter/BaseAdapter.php @@ -40,9 +40,6 @@ protected function createUnixTransport() throw new \RuntimeException(sprintf('Cannot write to "%s".', EPROCESS_SOCKET_DIR)); } - echo $this->getUnixSocketFile(); - echo PHP_EOL; - return new UnixTransport($this->loop, $this->getUnixSocketAddress()); } From aa3ff7f43594e9d0f8d50208a8d6e5295689c703 Mon Sep 17 00:00:00 2001 From: Evgeniy Guseletov Date: Wed, 13 Apr 2016 14:25:58 +0800 Subject: [PATCH 23/23] Remove hhvm temporary --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 0b69349..5e6f021 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,6 @@ sudo: false php: - '5.6' - '7.0' - - hhvm - nightly install: - composer install