21
21
use Symfony \Component \Console \Output \OutputInterface ;
22
22
use Symfony \Component \Console \Style \SymfonyStyle ;
23
23
use Symfony \Component \EventDispatcher \EventDispatcherInterface ;
24
+ use Symfony \Component \Messenger \RoutableMessageBus ;
24
25
use Symfony \Component \Messenger \Transport \Receiver \StopWhenMemoryUsageIsExceededReceiver ;
25
26
use Symfony \Component \Messenger \Transport \Receiver \StopWhenMessageCountIsExceededReceiver ;
26
27
use Symfony \Component \Messenger \Transport \Receiver \StopWhenTimeLimitIsReachedReceiver ;
@@ -39,17 +40,15 @@ class ConsumeMessagesCommand extends Command
39
40
private $ receiverLocator ;
40
41
private $ logger ;
41
42
private $ receiverNames ;
42
- private $ busNames ;
43
43
private $ retryStrategyLocator ;
44
44
private $ eventDispatcher ;
45
45
46
- public function __construct (ContainerInterface $ busLocator , ContainerInterface $ receiverLocator , LoggerInterface $ logger = null , array $ receiverNames = [], array $ busNames = [], ContainerInterface $ retryStrategyLocator = null , EventDispatcherInterface $ eventDispatcher = null )
46
+ public function __construct (ContainerInterface $ busLocator , ContainerInterface $ receiverLocator , LoggerInterface $ logger = null , array $ receiverNames = [], ContainerInterface $ retryStrategyLocator = null , EventDispatcherInterface $ eventDispatcher = null )
47
47
{
48
48
$ this ->busLocator = $ busLocator ;
49
49
$ this ->receiverLocator = $ receiverLocator ;
50
50
$ this ->logger = $ logger ;
51
51
$ this ->receiverNames = $ receiverNames ;
52
- $ this ->busNames = $ busNames ;
53
52
$ this ->retryStrategyLocator = $ retryStrategyLocator ;
54
53
$ this ->eventDispatcher = $ eventDispatcher ;
55
54
@@ -62,15 +61,14 @@ public function __construct(ContainerInterface $busLocator, ContainerInterface $
62
61
protected function configure (): void
63
62
{
64
63
$ defaultReceiverName = 1 === \count ($ this ->receiverNames ) ? current ($ this ->receiverNames ) : null ;
65
- $ defaultBusName = 1 === \count ($ this ->busNames ) ? current ($ this ->busNames ) : null ;
66
64
67
65
$ this
68
66
->setDefinition ([
69
67
new InputArgument ('receiver ' , $ defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED , 'Name of the receiver ' , $ defaultReceiverName ),
70
68
new InputOption ('limit ' , 'l ' , InputOption::VALUE_REQUIRED , 'Limit the number of received messages ' ),
71
69
new InputOption ('memory-limit ' , 'm ' , InputOption::VALUE_REQUIRED , 'The memory limit the worker can consume ' ),
72
70
new InputOption ('time-limit ' , 't ' , InputOption::VALUE_REQUIRED , 'The time limit in seconds the worker can run ' ),
73
- new InputOption ('bus ' , 'b ' , InputOption::VALUE_REQUIRED , 'Name of the bus to which received messages should be dispatched ' , $ defaultBusName ),
71
+ new InputOption ('bus ' , 'b ' , InputOption::VALUE_REQUIRED , 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically. ' ),
74
72
])
75
73
->setDescription ('Consumes messages ' )
76
74
->setHelp (<<<'EOF'
@@ -89,6 +87,12 @@ protected function configure(): void
89
87
Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached:
90
88
91
89
<info>php %command.full_name% <receiver-name> --time-limit=3600</info>
90
+
91
+ Use the --bus option to specify the message bus to dispatch received messages
92
+ to instead of trying to determine it automatically. This is required if the
93
+ messages didn't originate from Messenger:
94
+
95
+ <info>php %command.full_name% <receiver-name> --bus=event_bus</info>
92
96
EOF
93
97
)
94
98
;
@@ -112,24 +116,6 @@ protected function interact(InputInterface $input, OutputInterface $output)
112
116
}
113
117
}
114
118
}
115
-
116
- $ busName = $ input ->getOption ('bus ' );
117
- if ($ this ->busNames && !$ this ->busLocator ->has ($ busName )) {
118
- if (null === $ busName ) {
119
- $ io ->block ('Missing bus argument. ' , null , 'error ' , ' ' , true );
120
- $ input ->setOption ('bus ' , $ io ->choice ('Select one of the available buses ' , $ this ->busNames ));
121
- } elseif ($ alternatives = $ this ->findAlternatives ($ busName , $ this ->busNames )) {
122
- $ io ->block (sprintf ('Bus "%s" is not defined. ' , $ busName ), null , 'error ' , ' ' , true );
123
-
124
- if (1 === \count ($ alternatives )) {
125
- if ($ io ->confirm (sprintf ('Do you want to dispatch to "%s" instead? ' , $ alternatives [0 ]), true )) {
126
- $ input ->setOption ('bus ' , $ alternatives [0 ]);
127
- }
128
- } else {
129
- $ input ->setOption ('bus ' , $ io ->choice ('Did you mean one of the following buses instead? ' , $ alternatives , $ alternatives [0 ]));
130
- }
131
- }
132
- }
133
119
}
134
120
135
121
/**
@@ -147,18 +133,19 @@ protected function execute(InputInterface $input, OutputInterface $output): void
147
133
throw new RuntimeException (sprintf ('Receiver "%s" does not exist. ' , $ receiverName ));
148
134
}
149
135
150
- if (!$ this ->busLocator ->has ($ busName = $ input ->getOption ('bus ' ))) {
151
- throw new RuntimeException (sprintf ('Bus "%s" does not exist. ' , $ busName ));
152
- }
153
-
154
136
if (null !== $ this ->retryStrategyLocator && !$ this ->retryStrategyLocator ->has ($ receiverName )) {
155
137
throw new RuntimeException (sprintf ('Receiver "%s" does not have a configured retry strategy. ' , $ receiverName ));
156
138
}
157
139
158
140
$ receiver = $ this ->receiverLocator ->get ($ receiverName );
159
- $ bus = $ this ->busLocator ->get ($ busName );
160
141
$ retryStrategy = null !== $ this ->retryStrategyLocator ? $ this ->retryStrategyLocator ->get ($ receiverName ) : null ;
161
142
143
+ if (null !== $ input ->getOption ('bus ' )) {
144
+ $ bus = $ this ->busLocator ->get ($ input ->getOption ('bus ' ));
145
+ } else {
146
+ $ bus = new RoutableMessageBus ($ this ->busLocator );
147
+ }
148
+
162
149
$ stopsWhen = [];
163
150
if ($ limit = $ input ->getOption ('limit ' )) {
164
151
$ stopsWhen [] = "processed {$ limit } messages " ;
@@ -176,7 +163,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
176
163
}
177
164
178
165
$ io = new SymfonyStyle ($ input , $ output instanceof ConsoleOutputInterface ? $ output ->getErrorOutput () : $ output );
179
- $ io ->success (sprintf ('Consuming messages from transport "%s" on bus "%s" . ' , $ receiverName, $ busName ));
166
+ $ io ->success (sprintf ('Consuming messages from transport "%s". ' , $ receiverName ));
180
167
181
168
if ($ stopsWhen ) {
182
169
$ last = array_pop ($ stopsWhen );
0 commit comments