Skip to content

Commit 392178d

Browse files
committed
bench for amqp
1 parent f8ac4eb commit 392178d

19 files changed

+7949
-1
lines changed

bench/AmqpBunnyBench.php

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
<?php
2+
3+
namespace Enqueue\Bench;
4+
5+
use Bunny\Channel;
6+
use Bunny\Client;
7+
8+
require_once __DIR__.'/../vendor/autoload.php';
9+
10+
/**
11+
* @OutputTimeUnit("seconds", precision=3)
12+
* @Iterations(5)
13+
*/
14+
class AmqpBunnyBench
15+
{
16+
/**
17+
* @var Channel
18+
*/
19+
private $channel;
20+
21+
/**
22+
* @var string
23+
*/
24+
private $queue;
25+
26+
private $bodySize = 10000;
27+
28+
private $body;
29+
30+
private $messagesLimit = 10000;
31+
32+
/**
33+
* @BeforeMethods({"beforeBenchPublish"})
34+
*/
35+
public function benchPublish()
36+
{
37+
for ($i = 0; $i < $this->messagesLimit; ++$i) {
38+
$this->channel->publish($this->body, [], '', $this->queue);
39+
}
40+
}
41+
42+
// /**
43+
// * @BeforeMethods({"beforeBenchConsume"})
44+
// */
45+
// public function benchConsume()
46+
// {
47+
//$count = 0;
48+
//$callback = function($msg) use (&$count, $channel) {
49+
// $count++;
50+
//
51+
// if ($count >= 100000) {
52+
// $channel->callbacks = [];
53+
// }
54+
//};
55+
//
56+
//$startConsumeTime = microtime(true);
57+
//$startConsumeMemory = memory_get_usage();
58+
//
59+
//echo 'Consuming...'.PHP_EOL;
60+
//
61+
//$channel->basic_consume('amqp_lib_bench', 'amqp_lib', false, true, false, false, $callback);
62+
//while(count($channel->callbacks)) {
63+
// $channel->wait();
64+
//}
65+
//
66+
//$endConsumeTime = microtime(true);
67+
//$endConsumeMemory = memory_get_usage();
68+
//
69+
//$channel->close();
70+
//$connection->close();
71+
//
72+
//echo sprintf('Publish took %s seconds, %skb memory', $endPublishTime - $startPublishTime, ($endPublishMemory - $startPublishMemory) / 1000).PHP_EOL;
73+
//echo sprintf('Consume took %s seconds, %skb memory', $endConsumeTime - $startConsumeTime, ($endConsumeMemory - $startConsumeMemory) / 1000).PHP_EOL;
74+
75+
// $this->context->setQos(0, 3, false);
76+
//
77+
// $count = 0;
78+
//
79+
// $callback = function(AmqpMessage $message, AmqpConsumer $consumer) use (&$count) {
80+
// $count++;
81+
//
82+
// $consumer->acknowledge($message);
83+
//
84+
// if ($count >= $this->messagesLimit) {
85+
// return false;
86+
// }
87+
//
88+
// return true;
89+
// };
90+
//
91+
// $consumer = $this->context->createConsumer($this->queue);
92+
// $consumer->setConsumerTag('enqueue_amqp_lib');
93+
//
94+
// $this->context->subscribe($consumer, $callback);
95+
// $this->context->consume();
96+
// }
97+
98+
public function beforeBenchPublish()
99+
{
100+
$bodySize = ((int) getenv('BODY_SIZE'));
101+
$this->body = str_repeat('a', $bodySize);
102+
103+
$this->queue = 'amqp_bunny_bench';
104+
105+
$connection = [
106+
'host' => getenv('RABBITMQ_HOST'),
107+
'vhost' => getenv('RABBITMQ_VHOST'),
108+
'user' => getenv('RABBITMQ_USER'),
109+
'password' => getenv('RABBITMQ_PASSWORD'),
110+
];
111+
112+
$bunny = new Client($connection);
113+
$bunny->connect();
114+
115+
$this->channel = $bunny->channel();
116+
117+
$this->channel->queueDeclare($this->queue, false, false, false, false);
118+
$this->channel->queuePurge($this->queue);
119+
}
120+
121+
public function beforeBenchConsume()
122+
{
123+
// $this->channel = $this->createContext();
124+
//
125+
// $this->queue = $this->channel->createQueue('enqueue_amqp_consume_bench');
126+
// $this->channel->declareQueue($this->queue);
127+
// $this->channel->purgeQueue($this->queue);
128+
//
129+
// $producer = $this->channel->createProducer();
130+
// foreach (range(1, $this->messagesLimit) as $index) {
131+
// $producer->send($this->queue, $this->channel->createMessage($index));
132+
// }
133+
}
134+
}

bench/AmqpExtBench.php

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
<?php
2+
3+
namespace Enqueue\Bench;
4+
5+
require_once __DIR__.'/../vendor/autoload.php';
6+
7+
/**
8+
* @OutputTimeUnit("seconds", precision=3)
9+
* @Iterations(5)
10+
*/
11+
class AmqpExtBench
12+
{
13+
/**
14+
* @var \AMQPChannel
15+
*/
16+
private $channel;
17+
18+
/**
19+
* @var string
20+
*/
21+
private $queue;
22+
23+
private $bodySize = 10000;
24+
25+
private $body;
26+
27+
private $messagesLimit = 10000;
28+
29+
/**
30+
* @BeforeMethods({"beforeBenchPublish"})
31+
*/
32+
public function benchPublish()
33+
{
34+
/** @var \AMQPExchange $destination */
35+
$amqpExchange = new \AMQPExchange($this->channel);
36+
$amqpExchange->setType(AMQP_EX_TYPE_DIRECT);
37+
$amqpExchange->setName('');
38+
39+
for ($i = 0; $i < $this->messagesLimit; ++$i) {
40+
$amqpExchange->publish($this->body, $this->queue);
41+
}
42+
}
43+
44+
// /**
45+
// * @BeforeMethods({"beforeBenchConsume"})
46+
// */
47+
// public function benchConsume()
48+
// {
49+
//$count = 0;
50+
//$callback = function($msg) use (&$count, $channel) {
51+
// $count++;
52+
//
53+
// if ($count >= 100000) {
54+
// $channel->callbacks = [];
55+
// }
56+
//};
57+
//
58+
//$startConsumeTime = microtime(true);
59+
//$startConsumeMemory = memory_get_usage();
60+
//
61+
//echo 'Consuming...'.PHP_EOL;
62+
//
63+
//$channel->basic_consume('amqp_lib_bench', 'amqp_lib', false, true, false, false, $callback);
64+
//while(count($channel->callbacks)) {
65+
// $channel->wait();
66+
//}
67+
//
68+
//$endConsumeTime = microtime(true);
69+
//$endConsumeMemory = memory_get_usage();
70+
//
71+
//$channel->close();
72+
//$connection->close();
73+
//
74+
//echo sprintf('Publish took %s seconds, %skb memory', $endPublishTime - $startPublishTime, ($endPublishMemory - $startPublishMemory) / 1000).PHP_EOL;
75+
//echo sprintf('Consume took %s seconds, %skb memory', $endConsumeTime - $startConsumeTime, ($endConsumeMemory - $startConsumeMemory) / 1000).PHP_EOL;
76+
77+
// $this->context->setQos(0, 3, false);
78+
//
79+
// $count = 0;
80+
//
81+
// $callback = function(AmqpMessage $message, AmqpConsumer $consumer) use (&$count) {
82+
// $count++;
83+
//
84+
// $consumer->acknowledge($message);
85+
//
86+
// if ($count >= $this->messagesLimit) {
87+
// return false;
88+
// }
89+
//
90+
// return true;
91+
// };
92+
//
93+
// $consumer = $this->context->createConsumer($this->queue);
94+
// $consumer->setConsumerTag('enqueue_amqp_lib');
95+
//
96+
// $this->context->subscribe($consumer, $callback);
97+
// $this->context->consume();
98+
// }
99+
100+
public function beforeBenchPublish()
101+
{
102+
$bodySize = ((int) getenv('BODY_SIZE'));
103+
$this->body = str_repeat('a', $bodySize);
104+
105+
$this->queue = 'amqp_bunny_bench';
106+
107+
$extConfig = [];
108+
$extConfig['host'] = getenv('RABBITMQ_HOST');
109+
$extConfig['port'] = getenv('RABBITMQ_AMQP_PORT');
110+
$extConfig['vhost'] = getenv('RABBITMQ_VHOST');
111+
$extConfig['login'] = getenv('RABBITMQ_USER');
112+
$extConfig['password'] = getenv('RABBITMQ_PASSWORD');
113+
114+
$connection = new \AMQPConnection($extConfig);
115+
$connection->pconnect();
116+
117+
$this->channel = new \AMQPChannel($connection);
118+
119+
$queue = new \AMQPQueue($this->channel);
120+
$queue->setName($this->queue);
121+
$queue->declareQueue();
122+
$queue->purge();
123+
}
124+
125+
public function beforeBenchConsume()
126+
{
127+
// $this->channel = $this->createContext();
128+
//
129+
// $this->queue = $this->channel->createQueue('enqueue_amqp_consume_bench');
130+
// $this->channel->declareQueue($this->queue);
131+
// $this->channel->purgeQueue($this->queue);
132+
//
133+
// $producer = $this->channel->createProducer();
134+
// foreach (range(1, $this->messagesLimit) as $index) {
135+
// $producer->send($this->queue, $this->channel->createMessage($index));
136+
// }
137+
}
138+
}

0 commit comments

Comments
 (0)