-
Notifications
You must be signed in to change notification settings - Fork 0
Closed
Description
Describe the bug
From high perspective, I tried to do concurrent consumers on the same queue from the same connection
like this:
for (String queue : this.queues) {
for (int i = 0; i < this.consumersPerQueue; i++) {
Consumer consumer =
this.connection.consumerBuilder()
.queue(queue)
.priority(this.priority)
.initialCredits(this.initialCredits)
.listeners(this.stateListeners)
.messageHandler(new ConsumerMessageHandler())
.build();
this.queueToConsumers.add(queue, consumer);
}
}
But looks like all of them are parked on the same dispatching-rabbitmq-amqp-0-0-0
thread.
The AmqpConnection
logic is like this:
this.dispatchingExecutorService =
Executors.newSingleThreadExecutor(
Utils.threadFactory("dispatching-" + this.name() + "-"));
I can understand why it is a single-threaded: to avoid thread shifting on delivery and overflowing from broker sender.
But isn't that supposed to be per consumer, not per shared connection?
Reproduction steps
Create several consumers from the same connection and observe that all the handlers are called sequentially on the same dispatching-rabbitmq-amqp-0-0-0
thread, even if those consumers are for different queues.
Expected behavior
The dispatchingExecutorService
is created per-consumer, not in the shared connection.
Additional context
No response
Metadata
Metadata
Assignees
Labels
bugSomething isn't workingSomething isn't working