|
| 1 | +<?php |
| 2 | + |
| 3 | +namespace Enqueue\Bench; |
| 4 | + |
| 5 | +require_once __DIR__.'/../vendor/autoload.php'; |
| 6 | + |
| 7 | +/** |
| 8 | + * @OutputTimeUnit("seconds", precision=3) |
| 9 | + * @Iterations(5) |
| 10 | + */ |
| 11 | +class AmqpExtBench |
| 12 | +{ |
| 13 | + /** |
| 14 | + * @var \AMQPChannel |
| 15 | + */ |
| 16 | + private $channel; |
| 17 | + |
| 18 | + /** |
| 19 | + * @var string |
| 20 | + */ |
| 21 | + private $queue; |
| 22 | + |
| 23 | + private $bodySize = 10000; |
| 24 | + |
| 25 | + private $body; |
| 26 | + |
| 27 | + private $messagesLimit = 10000; |
| 28 | + |
| 29 | + /** |
| 30 | + * @BeforeMethods({"beforeBenchPublish"}) |
| 31 | + */ |
| 32 | + public function benchPublish() |
| 33 | + { |
| 34 | + /** @var \AMQPExchange $destination */ |
| 35 | + $amqpExchange = new \AMQPExchange($this->channel); |
| 36 | + $amqpExchange->setType(AMQP_EX_TYPE_DIRECT); |
| 37 | + $amqpExchange->setName(''); |
| 38 | + |
| 39 | + for ($i = 0; $i < $this->messagesLimit; ++$i) { |
| 40 | + $amqpExchange->publish($this->body, $this->queue); |
| 41 | + } |
| 42 | + } |
| 43 | + |
| 44 | +// /** |
| 45 | +// * @BeforeMethods({"beforeBenchConsume"}) |
| 46 | +// */ |
| 47 | +// public function benchConsume() |
| 48 | +// { |
| 49 | + //$count = 0; |
| 50 | + //$callback = function($msg) use (&$count, $channel) { |
| 51 | +// $count++; |
| 52 | +// |
| 53 | +// if ($count >= 100000) { |
| 54 | +// $channel->callbacks = []; |
| 55 | +// } |
| 56 | + //}; |
| 57 | +// |
| 58 | + //$startConsumeTime = microtime(true); |
| 59 | + //$startConsumeMemory = memory_get_usage(); |
| 60 | +// |
| 61 | + //echo 'Consuming...'.PHP_EOL; |
| 62 | +// |
| 63 | + //$channel->basic_consume('amqp_lib_bench', 'amqp_lib', false, true, false, false, $callback); |
| 64 | + //while(count($channel->callbacks)) { |
| 65 | +// $channel->wait(); |
| 66 | + //} |
| 67 | +// |
| 68 | + //$endConsumeTime = microtime(true); |
| 69 | + //$endConsumeMemory = memory_get_usage(); |
| 70 | +// |
| 71 | + //$channel->close(); |
| 72 | + //$connection->close(); |
| 73 | +// |
| 74 | + //echo sprintf('Publish took %s seconds, %skb memory', $endPublishTime - $startPublishTime, ($endPublishMemory - $startPublishMemory) / 1000).PHP_EOL; |
| 75 | + //echo sprintf('Consume took %s seconds, %skb memory', $endConsumeTime - $startConsumeTime, ($endConsumeMemory - $startConsumeMemory) / 1000).PHP_EOL; |
| 76 | + |
| 77 | +// $this->context->setQos(0, 3, false); |
| 78 | +// |
| 79 | +// $count = 0; |
| 80 | +// |
| 81 | +// $callback = function(AmqpMessage $message, AmqpConsumer $consumer) use (&$count) { |
| 82 | +// $count++; |
| 83 | +// |
| 84 | +// $consumer->acknowledge($message); |
| 85 | +// |
| 86 | +// if ($count >= $this->messagesLimit) { |
| 87 | +// return false; |
| 88 | +// } |
| 89 | +// |
| 90 | +// return true; |
| 91 | +// }; |
| 92 | +// |
| 93 | +// $consumer = $this->context->createConsumer($this->queue); |
| 94 | +// $consumer->setConsumerTag('enqueue_amqp_lib'); |
| 95 | +// |
| 96 | +// $this->context->subscribe($consumer, $callback); |
| 97 | +// $this->context->consume(); |
| 98 | +// } |
| 99 | + |
| 100 | + public function beforeBenchPublish() |
| 101 | + { |
| 102 | + $bodySize = ((int) getenv('BODY_SIZE')); |
| 103 | + $this->body = str_repeat('a', $bodySize); |
| 104 | + |
| 105 | + $this->queue = 'amqp_bunny_bench'; |
| 106 | + |
| 107 | + $extConfig = []; |
| 108 | + $extConfig['host'] = getenv('RABBITMQ_HOST'); |
| 109 | + $extConfig['port'] = getenv('RABBITMQ_AMQP_PORT'); |
| 110 | + $extConfig['vhost'] = getenv('RABBITMQ_VHOST'); |
| 111 | + $extConfig['login'] = getenv('RABBITMQ_USER'); |
| 112 | + $extConfig['password'] = getenv('RABBITMQ_PASSWORD'); |
| 113 | + |
| 114 | + $connection = new \AMQPConnection($extConfig); |
| 115 | + $connection->pconnect(); |
| 116 | + |
| 117 | + $this->channel = new \AMQPChannel($connection); |
| 118 | + |
| 119 | + $queue = new \AMQPQueue($this->channel); |
| 120 | + $queue->setName($this->queue); |
| 121 | + $queue->declareQueue(); |
| 122 | + $queue->purge(); |
| 123 | + } |
| 124 | + |
| 125 | + public function beforeBenchConsume() |
| 126 | + { |
| 127 | +// $this->channel = $this->createContext(); |
| 128 | +// |
| 129 | +// $this->queue = $this->channel->createQueue('enqueue_amqp_consume_bench'); |
| 130 | +// $this->channel->declareQueue($this->queue); |
| 131 | +// $this->channel->purgeQueue($this->queue); |
| 132 | +// |
| 133 | +// $producer = $this->channel->createProducer(); |
| 134 | +// foreach (range(1, $this->messagesLimit) as $index) { |
| 135 | +// $producer->send($this->queue, $this->channel->createMessage($index)); |
| 136 | +// } |
| 137 | + } |
| 138 | +} |
0 commit comments