20
20
use Symfony \Component \Console \Input \InputOption ;
21
21
use Symfony \Component \Console \Output \OutputInterface ;
22
22
use Symfony \Component \Console \Style \SymfonyStyle ;
23
+ use Symfony \Component \Messenger \RoutableMessageBus ;
23
24
use Symfony \Component \Messenger \Transport \Receiver \StopWhenMemoryUsageIsExceededReceiver ;
24
25
use Symfony \Component \Messenger \Transport \Receiver \StopWhenMessageCountIsExceededReceiver ;
25
26
use Symfony \Component \Messenger \Transport \Receiver \StopWhenTimeLimitIsReachedReceiver ;
@@ -38,15 +39,13 @@ class ConsumeMessagesCommand extends Command
38
39
private $ receiverLocator ;
39
40
private $ logger ;
40
41
private $ receiverNames ;
41
- private $ busNames ;
42
42
43
- public function __construct (ContainerInterface $ busLocator , ContainerInterface $ receiverLocator , LoggerInterface $ logger = null , array $ receiverNames = [], array $ busNames = [] )
43
+ public function __construct (ContainerInterface $ busLocator , ContainerInterface $ receiverLocator , LoggerInterface $ logger = null , array $ receiverNames = [])
44
44
{
45
45
$ this ->busLocator = $ busLocator ;
46
46
$ this ->receiverLocator = $ receiverLocator ;
47
47
$ this ->logger = $ logger ;
48
48
$ this ->receiverNames = $ receiverNames ;
49
- $ this ->busNames = $ busNames ;
50
49
51
50
parent ::__construct ();
52
51
}
@@ -57,15 +56,14 @@ public function __construct(ContainerInterface $busLocator, ContainerInterface $
57
56
protected function configure (): void
58
57
{
59
58
$ defaultReceiverName = 1 === \count ($ this ->receiverNames ) ? current ($ this ->receiverNames ) : null ;
60
- $ defaultBusName = 1 === \count ($ this ->busNames ) ? current ($ this ->busNames ) : null ;
61
59
62
60
$ this
63
61
->setDefinition ([
64
62
new InputArgument ('receiver ' , $ defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED , 'Name of the receiver ' , $ defaultReceiverName ),
65
63
new InputOption ('limit ' , 'l ' , InputOption::VALUE_REQUIRED , 'Limit the number of received messages ' ),
66
64
new InputOption ('memory-limit ' , 'm ' , InputOption::VALUE_REQUIRED , 'The memory limit the worker can consume ' ),
67
65
new InputOption ('time-limit ' , 't ' , InputOption::VALUE_REQUIRED , 'The time limit in seconds the worker can run ' ),
68
- new InputOption ('bus ' , 'b ' , InputOption::VALUE_REQUIRED , 'Name of the bus to which received messages should be dispatched ' , $ defaultBusName ),
66
+ new InputOption ('bus ' , 'b ' , InputOption::VALUE_REQUIRED , 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically. ' ),
69
67
])
70
68
->setDescription ('Consumes messages ' )
71
69
->setHelp (<<<'EOF'
@@ -84,6 +82,12 @@ protected function configure(): void
84
82
Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached:
85
83
86
84
<info>php %command.full_name% <receiver-name> --time-limit=3600</info>
85
+
86
+ Use the --bus option to specify the message bus to dispatch received messages
87
+ to instead of trying to determine it automatically. This is required if the
88
+ messages didn't originate from Messenger:
89
+
90
+ <info>php %command.full_name% <receiver-name> --bus=event_bus</info>
87
91
EOF
88
92
)
89
93
;
@@ -107,24 +111,6 @@ protected function interact(InputInterface $input, OutputInterface $output)
107
111
}
108
112
}
109
113
}
110
-
111
- $ busName = $ input ->getOption ('bus ' );
112
- if ($ this ->busNames && !$ this ->busLocator ->has ($ busName )) {
113
- if (null === $ busName ) {
114
- $ io ->block ('Missing bus argument. ' , null , 'error ' , ' ' , true );
115
- $ input ->setOption ('bus ' , $ io ->choice ('Select one of the available buses ' , $ this ->busNames ));
116
- } elseif ($ alternatives = $ this ->findAlternatives ($ busName , $ this ->busNames )) {
117
- $ io ->block (sprintf ('Bus "%s" is not defined. ' , $ busName ), null , 'error ' , ' ' , true );
118
-
119
- if (1 === \count ($ alternatives )) {
120
- if ($ io ->confirm (sprintf ('Do you want to dispatch to "%s" instead? ' , $ alternatives [0 ]), true )) {
121
- $ input ->setOption ('bus ' , $ alternatives [0 ]);
122
- }
123
- } else {
124
- $ input ->setOption ('bus ' , $ io ->choice ('Did you mean one of the following buses instead? ' , $ alternatives , $ alternatives [0 ]));
125
- }
126
- }
127
- }
128
114
}
129
115
130
116
/**
@@ -136,12 +122,13 @@ protected function execute(InputInterface $input, OutputInterface $output): void
136
122
throw new RuntimeException (sprintf ('Receiver "%s" does not exist. ' , $ receiverName ));
137
123
}
138
124
139
- if (!$ this ->busLocator ->has ($ busName = $ input ->getOption ('bus ' ))) {
140
- throw new RuntimeException (sprintf ('Bus "%s" does not exist. ' , $ busName ));
141
- }
142
-
143
125
$ receiver = $ this ->receiverLocator ->get ($ receiverName );
144
- $ bus = $ this ->busLocator ->get ($ busName );
126
+
127
+ if (null !== $ input ->getOption ('bus ' )) {
128
+ $ bus = $ this ->busLocator ->get ($ input ->getOption ('bus ' ));
129
+ } else {
130
+ $ bus = new RoutableMessageBus ($ this ->busLocator );
131
+ }
145
132
146
133
$ stopsWhen = [];
147
134
if ($ limit = $ input ->getOption ('limit ' )) {
@@ -160,7 +147,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
160
147
}
161
148
162
149
$ io = new SymfonyStyle ($ input , $ output instanceof ConsoleOutputInterface ? $ output ->getErrorOutput () : $ output );
163
- $ io ->success (sprintf ('Consuming messages from transport "%s" on bus "%s" . ' , $ receiverName, $ busName ));
150
+ $ io ->success (sprintf ('Consuming messages from transport "%s". ' , $ receiverName ));
164
151
165
152
if ($ stopsWhen ) {
166
153
$ last = array_pop ($ stopsWhen );
0 commit comments