Skip to content

Commit 9d43404

Browse files
committed
Added two new components: AMQP and Worker
1 parent 06da874 commit 9d43404

File tree

76 files changed

+5404
-1
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

76 files changed

+5404
-1
lines changed

.travis.yml

+4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ env:
1616
global:
1717
- MIN_PHP=5.5.9
1818
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/versions/5.6/bin/php
19+
- RABBITMQ_URL=amqp://guest:guest@localhost:5672/
1920

2021
matrix:
2122
include:
@@ -41,6 +42,7 @@ services:
4142
- memcached
4243
- mongodb
4344
- redis-server
45+
- rabbitmq
4446

4547
before_install:
4648
- |
@@ -82,6 +84,7 @@ before_install:
8284
echo apc.enable_cli = 1 >> $INI
8385
echo extension = ldap.so >> $INI
8486
echo extension = redis.so >> $INI
87+
echo extension = amqp.so >> $INI
8588
echo extension = memcached.so >> $INI
8689
[[ $PHP = 5.* ]] && echo extension = memcache.so >> $INI
8790
if [[ $PHP = 5.* ]]; then
@@ -159,6 +162,7 @@ install:
159162
160163
- if [[ ! $skip ]]; then $COMPOSER_UP; fi
161164
- if [[ ! $skip ]]; then ./phpunit install; fi
165+
- src/Symfony/Component/Amqp/bin/reset.php force
162166
- |
163167
# phpinfo
164168
if [[ ! $PHP = hhvm* ]]; then php -i; else hhvm --php -r 'print_r($_SERVER);print_r(ini_get_all());'; fi

composer.json

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
"symfony/polyfill-util": "~1.0"
3333
},
3434
"replace": {
35+
"symfony/amqp": "self.version",
3536
"symfony/asset": "self.version",
3637
"symfony/browser-kit": "self.version",
3738
"symfony/cache": "self.version",
@@ -81,6 +82,7 @@
8182
"symfony/web-link": "self.version",
8283
"symfony/web-profiler-bundle": "self.version",
8384
"symfony/web-server-bundle": "self.version",
85+
"symfony/worker": "self.version",
8486
"symfony/workflow": "self.version",
8587
"symfony/yaml": "self.version"
8688
},

phpunit.xml.dist

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
<env name="LDAP_PORT" value="3389" />
2020
<env name="REDIS_HOST" value="localhost" />
2121
<env name="MEMCACHED_HOST" value="localhost" />
22+
<env name="RABBITMQ_URL" value="amqp://guest:guest@localhost:5672/sensiolabs_amqp" />
2223
</php>
2324

2425
<testsuites>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
3+
namespace Symfony\Bundle\FrameworkBundle\Command;
4+
5+
use Symfony\Component\Console\Input\InputArgument;
6+
use Symfony\Component\Console\Input\InputInterface;
7+
use Symfony\Component\Console\Output\OutputInterface;
8+
9+
class WorkerAmqpMoveCommand extends ContainerAwareCommand
10+
{
11+
private $broker;
12+
private $logger;
13+
14+
protected function configure()
15+
{
16+
$this
17+
->setName('worker:amqp:move')
18+
->setDescription('Take all messages from a queue, and send them to the default exchange with a new routing key.')
19+
->setDefinition(array(
20+
new InputArgument('from', InputArgument::REQUIRED, 'The queue.'),
21+
new InputArgument('to', InputArgument::REQUIRED, 'The new routing key.'),
22+
))
23+
;
24+
}
25+
26+
protected function initialize(InputInterface $input, OutputInterface $output)
27+
{
28+
$this->broker = $this->getContainer()->get('amqp.broker');
29+
$this->logger = $this->getContainer()->get('logger');
30+
}
31+
32+
protected function execute(InputInterface $input, OutputInterface $output)
33+
{
34+
$from = $input->getArgument('from');
35+
$to = $input->getArgument('to');
36+
37+
while (false !== $message = $this->broker->get($from)) {
38+
$this->logger->info('Move a message...');
39+
$this->broker->move($message, $to);
40+
$this->broker->ack($message);
41+
$this->logger->debug('...message moved.');
42+
}
43+
}
44+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<?php
2+
3+
namespace Symfony\Bundle\FrameworkBundle\Command;
4+
5+
use Symfony\Component\Console\Input\InputInterface;
6+
use Symfony\Component\Console\Output\OutputInterface;
7+
8+
class WorkerListCommand extends ContainerAwareCommand
9+
{
10+
protected function configure()
11+
{
12+
$this
13+
->setName('worker:list')
14+
->setDescription('List available workers.')
15+
;
16+
}
17+
18+
protected function execute(InputInterface $input, OutputInterface $output)
19+
{
20+
$workers = $this->getContainer()->getParameter('worker.workers');
21+
22+
foreach ($workers as $name => $_) {
23+
$output->writeln($name);
24+
}
25+
}
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?php
2+
3+
namespace Symfony\Bundle\FrameworkBundle\Command;
4+
5+
use Symfony\Component\Amqp\Worker\ConfigurableLoopInterface;
6+
use Symfony\Component\Console\Input\InputArgument;
7+
use Symfony\Component\Console\Input\InputInterface;
8+
use Symfony\Component\Console\Input\InputOption;
9+
use Symfony\Component\Console\Output\OutputInterface;
10+
11+
class WorkerRunCommand extends ContainerAwareCommand
12+
{
13+
protected function configure()
14+
{
15+
$this
16+
->setName('worker:run')
17+
->setDescription('Run a worker')
18+
->setDefinition(array(
19+
new InputArgument('worker', InputArgument::REQUIRED, 'The worker'),
20+
new InputOption('name', null, InputOption::VALUE_REQUIRED, 'A name, useful for stats/monitoring. Defaults to worker name.'),
21+
))
22+
;
23+
}
24+
25+
protected function execute(InputInterface $input, OutputInterface $output)
26+
{
27+
$loop = $this->getLoop($input);
28+
29+
$loopName = $input->getOption('name') ?: $loop->getName();
30+
31+
if ($loop instanceof ConfigurableLoopInterface) {
32+
$loop->setName($loopName);
33+
}
34+
35+
$processName = sprintf('%s_%s', $this->getContainer()->getParameter('worker.cli_title_prefix'), $loopName);
36+
37+
// On OSX, it may raise an error:
38+
// Warning: cli_set_process_title(): cli_set_process_title had an error: Not initialized correctly
39+
@cli_set_process_title($processName);
40+
41+
pcntl_signal(SIGTERM, function () use ($loop) {
42+
$loop->stop('Signaled with SIGTERM.');
43+
});
44+
pcntl_signal(SIGINT, function () use ($loop) {
45+
$loop->stop('Signaled with SIGINT.');
46+
});
47+
48+
$loop->run();
49+
}
50+
51+
private function getLoop(InputInterface $input)
52+
{
53+
$workers = $this->getContainer()->getParameter('worker.workers');
54+
55+
$workerName = $input->getArgument('worker');
56+
57+
if (!array_key_exists($workerName, $workers)) {
58+
throw new \InvalidArgumentException(sprintf(
59+
'The worker "%s" does not exist. Available ones are: "%s".',
60+
$workerName, implode('", "', array_keys($workers))
61+
));
62+
}
63+
64+
return $this->getContainer()->get($workers[$workerName]);
65+
}
66+
}

0 commit comments

Comments
 (0)