From 0772d87d2244fd750757b8cab615c2444dc65099 Mon Sep 17 00:00:00 2001 From: wpjscc Date: Mon, 7 Oct 2024 09:47:09 +0800 Subject: [PATCH 1/5] v2 --- composer.json | 4 +- examples/example.php | 38 ++++------ src/Pool.php | 172 ++++++++++++++----------------------------- 3 files changed, 75 insertions(+), 139 deletions(-) diff --git a/composer.json b/composer.json index 17f52c1..1ee742d 100644 --- a/composer.json +++ b/composer.json @@ -1,9 +1,9 @@ { "name":"reactphp-x/mysql-pool", "require": { - "react/mysql": "^0.6", "react/async": "^4.1", - "reactphp-x/pool": "^1.0" + "react/mysql": "^0.7@dev", + "reactphp-x/pool": "^2.0@dev" }, "autoload": { "psr-4": { diff --git a/examples/example.php b/examples/example.php index 3d7cec9..333d66c 100644 --- a/examples/example.php +++ b/examples/example.php @@ -3,24 +3,18 @@ require_once __DIR__ . '/../vendor/autoload.php'; use ReactphpX\MySQL\Pool; -use React\MySQL\QueryResult; +use React\MySQL\MysqlResult; use React\EventLoop\Loop; -$pool = new Pool(getenv('MYSQL_URI') ?: 'username:password@host/databasename?timeout=5', [ - 'min_connections' => 2, // 10 connection - 'max_connections' => 10, // 10 connection - 'max_wait_queue' => 100, // how many sql in queue - 'wait_timeout' => 5, // wait time include response time - 'keep_alive' => 60 -]); +$pool = new Pool(getenv('MYSQL_URI') ?: 'username:password@host/databasename?timeout=5'); -query($pool); -// queryStream($pool); +// query($pool); +queryStream($pool); function query($pool) { - for ($i = 0; $i < 90; $i++) { - $pool->query('select * from blog')->then(function (QueryResult $command) use ($i) { + for ($i = 0; $i < 120; $i++) { + $pool->query('select * from blog')->then(function (MysqlResult $command) use ($i) { echo "query:$i\n"; if (isset($command->resultRows)) { // this is a response to a SELECT etc. with some rows (0+) @@ -43,7 +37,7 @@ function query($pool) function queryStream($pool) { - for ($i = 0; $i < 90; $i++) { + for ($i = 0; $i < 120; $i++) { (function ($pool, $i) { $stream = $pool->queryStream('select * from blog'); @@ -51,8 +45,8 @@ function queryStream($pool) // echo "queryStream:$i\n"; // print_r($data); }); - $stream->on('error', function ($err) { - echo 'Error: ' . $err->getMessage() . PHP_EOL; + $stream->on('error', function ($err) use ($i) { + echo 'Error: ' .$i.' ' .$err->getMessage() . PHP_EOL; }); $stream->on('end', function () use ($i) { echo 'Completed.' . $i . PHP_EOL; @@ -62,16 +56,16 @@ function queryStream($pool) } \React\EventLoop\Loop::addTimer(10, function () use ($pool) { - queryStream($pool); + // queryStream($pool); }); -Loop::addPeriodicTimer(2, function () use ($pool) { - // query($pool); - // queryStream($pool); - echo 'pool_count:' . $pool->getPoolCount() . PHP_EOL; - echo 'idleConnectionCount:' . $pool->idleConnectionCount() . PHP_EOL; -}); +// Loop::addPeriodicTimer(2, function () use ($pool) { +// // query($pool); +// // queryStream($pool); +// echo 'pool_count:' . $pool->getPoolCount() . PHP_EOL; +// echo 'idleConnectionCount:' . $pool->idleConnectionCount() . PHP_EOL; +// }); $pool->transaction(function ($connection) { // throw new Exception("Error Processing Request", 1); diff --git a/src/Pool.php b/src/Pool.php index 2ed6d17..b389ce0 100644 --- a/src/Pool.php +++ b/src/Pool.php @@ -2,143 +2,85 @@ namespace ReactphpX\MySQL; -use React\MySQL\Factory; -use React\EventLoop\LoopInterface; -use React\Socket\ConnectorInterface; -use React\Promise\Deferred; -use React\MySQL\ConnectionInterface; -use React\MySQL\QueryResult; -use React\MySQL\Exception; + +use React\Mysql\MysqlClient; use React\EventLoop\Loop; use ReactphpX\Pool\AbstractConnectionPool; -use function React\Promise\resolve; -class Pool extends AbstractConnectionPool implements ConnectionInterface,TranactionInterface +final class Pool extends AbstractConnectionPool implements TranactionInterface { - use \Evenement\EventEmitterTrait; - - private $factory; - private $uri; - - public function __construct( - $uri, - $config = [], - Factory $factory = null, - LoopInterface $loop = null, - ConnectorInterface $connector = null - ) { - $this->uri = $uri; - $this->factory = $factory ?: new Factory($loop, $connector);; - parent::__construct($config, $loop); + protected function createConnection() + { + $connection = new MysqlClient($this->uri); + $connection->on('close', function () use ($connection) { + if ($this->pool->contains($connection)) { + $this->pool->detach($connection); + } + $this->currentConnections--; + }); + $this->currentConnections++; + return $connection; } - public function query($sql, array $params = []) + public function query(string $sql, array $params = []) { - $deferred = new Deferred(); - - $this->getConnection()->then(function (ConnectionInterface $connection) use ($sql, $params, $deferred) { - $connection->query($sql, $params)->then(function (QueryResult $command) use ($deferred, $connection) { + return $this->getConnection()->then(function ($connection) use ($sql, $params) { + return $connection->query($sql, $params)->then(function ($result) use ($connection) { $this->releaseConnection($connection); - try { - $deferred->resolve($command); - } catch (\Throwable $th) { - //todo handle $th - } - }, function ($e) use ($deferred, $connection) { - $deferred->reject($e); - - $this->_ping($connection); + return $result; + }, function ($error) use ($connection) { + $this->releaseConnection($connection); + throw $error; }); - }, function ($e) use ($deferred) { - $deferred->reject($e); }); - - return $deferred->promise(); } - - public function queryStream($sql, $params = []) + + public function queryStream(string $sql, array $params = []) { $error = null; - - $stream = \React\Promise\Stream\unwrapReadable( - $this->getConnection()->then(function (ConnectionInterface $connection) use ($sql, $params) { - $stream = $connection->queryStream($sql, $params); - $stream->on('end', function () use ($connection) { - $this->releaseConnection($connection); - }); - $stream->on('error', function ($err) use ($connection) { - $this->_ping($connection); - }); - return $stream; - }, function ($e) use (&$error) { - $error = $e; - throw $e; - }) - ); + $p = $this->getConnection()->then(function ($connection) use ($sql, $params) { + $stream = $connection->queryStream($sql, $params); + $stream->on('close', function () use ($connection) { + $this->releaseConnection($connection); + }); + return $stream; + })->then(null, function ($e) use (&$error) { + $error = $e; + }); if ($error) { - Loop::addTimer(0.0001, function () use ($stream, $error) { - $stream->emit('error', [$error]); + $stream = new \React\Stream\ThroughStream(); + Loop::futureTick(function () use ($stream, $error) { + $stream->emit('error', [$error, $stream]); + $stream->close(); }); + return $stream; } - return $stream; + return \React\Promise\Stream\unwrapReadable($p); } - - public function ping() - { - throw new \Exception("not support"); - } - - public function quit() - { - $this->close(); - return resolve(true); - } - - public function transaction(callable $callable) { - $that = $this; - $deferred = new Deferred(); - - $this->getConnection()->then(function (ConnectionInterface $connection) use ($callable, $deferred, $that) { - $connection->query('BEGIN') - ->then(function () use ($callable, $connection) { - try { - return \React\Async\async(function () use ($callable, $connection) { - return $callable($connection); - })(); - } catch (\Throwable $th) { - throw $th; - } - }) - ->then(function ($result) use ($connection, $deferred, $that) { - $connection->query('COMMIT')->then(function () use ($result, $deferred, $connection, $that) { - $that->releaseConnection($connection); - $deferred->resolve($result); - }, function ($error) use ($deferred, $connection, $that) { - $that->_ping($connection); - $deferred->reject($error); - }); - }, function ($error) use ($connection, $deferred, $that) { - $connection->query('ROLLBACK')->then(function () use ($error, $deferred, $connection, $that) { - $that->releaseConnection($connection); - $deferred->reject($error); - }, function () use ($deferred, $connection, $that, $error) { - $that->_ping($connection); - $deferred->reject($error); - }); + return $this->getConnection()->then(function ($connection) use ($callable) { + return $connection->query('BEGIN')->then(function () use ($connection, $callable) { + try { + return \React\Async\async(function () use ($callable, $connection) { + return $callable($connection); + })(); + } catch (\Throwable $th) { + throw $th; + } + })->then(function ($result) use ($connection) { + return $connection->query('COMMIT')->then(function () use ($connection, $result) { + $this->releaseConnection($connection); + return $result; }); - }, function ($error) use ($deferred) { - $deferred->reject($error); + }, function ($error) use ($connection) { + return $connection->query('ROLLBACK')->then(function () use ($connection, $error) { + $this->releaseConnection($connection); + throw $error; + }); + }); }); - - return $deferred->promise(); - } - - protected function createConnection() - { - return $this->factory->createLazyConnection($this->uri); } } From ab43ea7ab62f82c8d01b8c1f4349ac1ec66a62d4 Mon Sep 17 00:00:00 2001 From: wpjscc Date: Mon, 7 Oct 2024 10:45:07 +0800 Subject: [PATCH 2/5] readme --- README.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index bddbfb9..d31ab62 100644 --- a/README.md +++ b/README.md @@ -29,13 +29,13 @@ use ReactphpX\MySQL\Pool; use React\MySQL\QueryResult; use React\EventLoop\Loop; -$pool = new Pool('username:password@host/databasename?timeout=5', [ - 'min_connections' => 2, // min 2 connection - 'max_connections' => 10, // max 10 connection - 'max_wait_queue' => 110, // how many sql in queue - 'wait_timeout' => 5,// wait time include response time - 'keep_alive' => 60, // -]); +$pool = new Pool( + uri: 'username:password@host/databasename?timeout=5', + minConnections: 2, + maxConnections: 10, + waitQueue: 100, + waitTimeout: 0, +); qeury($pool); queryStream($pool); From a0c53902edba498abfc5dd2118d1200be96d9da0 Mon Sep 17 00:00:00 2001 From: wpjscc Date: Thu, 10 Oct 2024 08:31:48 +0800 Subject: [PATCH 3/5] 0.6 --- composer.json | 2 +- src/Pool.php | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/composer.json b/composer.json index 1ee742d..4f43b9c 100644 --- a/composer.json +++ b/composer.json @@ -2,7 +2,7 @@ "name":"reactphp-x/mysql-pool", "require": { "react/async": "^4.1", - "react/mysql": "^0.7@dev", + "react/mysql": "^0.6", "reactphp-x/pool": "^2.0@dev" }, "autoload": { diff --git a/src/Pool.php b/src/Pool.php index b389ce0..b8d163a 100644 --- a/src/Pool.php +++ b/src/Pool.php @@ -11,7 +11,8 @@ final class Pool extends AbstractConnectionPool implements TranactionInterface { protected function createConnection() { - $connection = new MysqlClient($this->uri); + // $connection = new MysqlClient($this->uri); + $connection = (new \React\MySQL\Factory())->createLazyConnection($this->uri); $connection->on('close', function () use ($connection) { if ($this->pool->contains($connection)) { $this->pool->detach($connection); From 8ca7f7de0de69b0a82c28ef69a870f736fed9af3 Mon Sep 17 00:00:00 2001 From: wpjscc Date: Sat, 12 Oct 2024 14:44:17 +0800 Subject: [PATCH 4/5] update --- composer.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/composer.json b/composer.json index 4f43b9c..94dd827 100644 --- a/composer.json +++ b/composer.json @@ -2,8 +2,8 @@ "name":"reactphp-x/mysql-pool", "require": { "react/async": "^4.1", - "react/mysql": "^0.6", - "reactphp-x/pool": "^2.0@dev" + "reactphp-x/pool": "^2.0@dev", + "react/mysql": "^0.7@dev" }, "autoload": { "psr-4": { From 91ad606753be8589dbf96db3ae347687d9b713d7 Mon Sep 17 00:00:00 2001 From: wpjscc Date: Sat, 12 Oct 2024 14:44:41 +0800 Subject: [PATCH 5/5] 0.7 --- src/Pool.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Pool.php b/src/Pool.php index b8d163a..2bd1530 100644 --- a/src/Pool.php +++ b/src/Pool.php @@ -11,8 +11,8 @@ final class Pool extends AbstractConnectionPool implements TranactionInterface { protected function createConnection() { - // $connection = new MysqlClient($this->uri); - $connection = (new \React\MySQL\Factory())->createLazyConnection($this->uri); + $connection = new MysqlClient($this->uri); + // $connection = (new \React\MySQL\Factory())->createLazyConnection($this->uri); $connection->on('close', function () use ($connection) { if ($this->pool->contains($connection)) { $this->pool->detach($connection);