Skip to content

The AmqpConnection does not seem concurrent for consumers #160

@artembilan

Description

@artembilan

Describe the bug

From high perspective, I tried to do concurrent consumers on the same queue from the same connection like this:

				for (String queue : this.queues) {
					for (int i = 0; i < this.consumersPerQueue; i++) {
						Consumer consumer =
								this.connection.consumerBuilder()
										.queue(queue)
										.priority(this.priority)
										.initialCredits(this.initialCredits)
										.listeners(this.stateListeners)
										.messageHandler(new ConsumerMessageHandler())
										.build();
						this.queueToConsumers.add(queue, consumer);
					}
				}

But looks like all of them are parked on the same dispatching-rabbitmq-amqp-0-0-0 thread.
The AmqpConnection logic is like this:

        this.dispatchingExecutorService =
            Executors.newSingleThreadExecutor(
                Utils.threadFactory("dispatching-" + this.name() + "-"));

I can understand why it is a single-threaded: to avoid thread shifting on delivery and overflowing from broker sender.
But isn't that supposed to be per consumer, not per shared connection?

Reproduction steps

Create several consumers from the same connection and observe that all the handlers are called sequentially on the same dispatching-rabbitmq-amqp-0-0-0 thread, even if those consumers are for different queues.

Expected behavior

The dispatchingExecutorService is created per-consumer, not in the shared connection.

Additional context

No response

Metadata

Metadata

Assignees

Labels

bugSomething isn't working

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions