
Design a Distributed Messaging Queue
(like RabbitMQ or Amazon SQS)

A Deep Dive for System Design Interviews

This is a very popular question. Designing a distributed queue is actually quite
simple once you understand the basics.

Let’s take a look at the requirements of this queue:

Requirements:

1. High throughput
2. As much persistence as possible without compromising high throughput
3. Support for multiple producers and consumers
4. Once an item is consumed, it is deleted - each item is delivered to exactly one consumer

Let’s say we have a single-machine implementation.

We can add and remove items from a created queue.

We can create multiple queues and add and remove items from each one of them by
specifying the queue id or the queue name.
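As a rough illustration, here is a minimal single-machine sketch in Java. All the names (InMemoryQueueManager and so on) are hypothetical, not a real library's API - it simply keeps named queues in memory, each backed by a thread-safe FIFO:

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Minimal single-machine sketch: named queues with enqueue/dequeue.
// All class and method names here are illustrative, not a real API.
public class InMemoryQueueManager {
    private final Map<String, Queue<String>> queues = new ConcurrentHashMap<>();

    public void createQueue(String name) {
        queues.putIfAbsent(name, new ConcurrentLinkedQueue<>());
    }

    public void enqueue(String name, String item) {
        queues.get(name).add(item);
    }

    // poll() removes the item, so each item is handed to exactly one
    // caller (requirement 4). Returns null when the queue is empty.
    public String dequeue(String name) {
        return queues.get(name).poll();
    }

    public static void main(String[] args) {
        InMemoryQueueManager mgr = new InMemoryQueueManager();
        mgr.createQueue("tasks");
        mgr.enqueue("tasks", "task-1");
        System.out.println(mgr.dequeue("tasks")); // prints task-1
    }
}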



Using this as a building block, we can figure out how to do this on multiple
machines.

High Throughput Distributed Queue Scenario

Let’s now say we have 3 machines and one queue to create and manage. We have a
large volume of data, so we want this queue to be sharded. How can we manage this?
We have a lot of producers writing to this queue and a lot of consumers pulling from
the queue. How do we handle the high throughput?

Well, the logical thing to do is to partition the queue into 3 separate queues, one
on each machine. Each producer can write to any partition, and consumers can
consume from any partition.

For huge workloads (think Facebook scale), you can partition this queue over 100
machines and achieve a lot of scale.



There is one problem though - this implementation will not be strictly FIFO for the
entire queue. It will only be FIFO within each partition. For most use cases, this is
ok. For example, if we’re using it as a task queue or as a notifications queue, it is ok if
two tasks are out-of-order within a small time frame.



We should try to keep all partitions evenly occupied. That will be ideal for the
queue’s performance because we don’t want to overload one partition.

With X machines, our throughput should be X times the throughput of a single machine.

How to keep the Distributed Queue evenly balanced?

Keeping the queue balanced means writing and reading evenly from all the machines.
If one machine gets starved of items, then its readers will be starved as well.



In the Queue above, readers of Machines B and C will soon get starved, because
Machine A is getting a higher rate of writes.

The real challenge is writing and reading evenly across all partitions. So we
need some sort of balancing approach, or a coordinator.

Coordinating between the Queue partitions

Let’s see how to coordinate a queue between 3 machines.

Let’s say we are creating a Queue Q1. We create 3 partitions - P1, P2 and P3 - each on
a different machine. We will need some sort of Queue Manager that keeps track of
the partitions.

Let’s say that we have such a program running on one of the three machines:



Try to figure this out yourself before reading our solution.

When a producer needs to write to the Queue, it asks the Queue Manager for the IP
address of a machine it can write to. Let’s say the Queue Manager returns the IP
address of Machine C (where P3 is located). The Producer can now establish a
persistent TCP/IP connection and start writing to P3.
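Here is a sketch of that handshake, under the assumption of a hypothetical client API (none of these names come from RabbitMQ or SQS):

import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Illustrative handshake (all names hypothetical): the producer asks the
// Queue Manager for a partition address, then connects straight to that
// partition machine, keeping the manager off the data path.
public class DirectWriteProducer {

    static class PartitionAddress {
        final String host;
        final int port;
        PartitionAddress(String host, int port) { this.host = host; this.port = port; }
    }

    // Stand-in for an RPC to the Queue Manager; returns a fixed address here.
    static PartitionAddress requestPartition(String queueName) {
        return new PartitionAddress("machine-c.internal", 9000);
    }

    public static void main(String[] args) throws Exception {
        // Step 1: ask the manager which partition to write to (say P3 on Machine C).
        PartitionAddress addr = requestPartition("Q1");

        // Step 2: open a persistent connection to the partition machine and write.
        try (Socket socket = new Socket(addr.host, addr.port);
             OutputStream out = socket.getOutputStream()) {
            out.write("{id:1, data:{...}}\n".getBytes(StandardCharsets.UTF_8));
        }
    }
}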



Now you may ask, why is the producer connecting directly with the Partition
Machine?
Why doesn’t it just pass along the message to the Queue Manager, and the Queue
Manager can send it to a partition, like this:

This would make things simple - just hand the message off to the Queue Manager, and
the Queue Manager can take care of the rest. The problem with this approach is that
the Queue Manager becomes a bottleneck. All of the data has to go through it, and
there's only so much throughput this one machine can handle.



As the number of producers increases, this might slow down the entire queue. You
would then have to add more queue managers to handle more writes. Instead of all
this, the general pattern is to let producers connect directly to the partition
machine. This way the writes are decentralized, which makes the system more
horizontally scalable. This is also the pattern used in most distributed file
systems for writing data - Google File System, for example, uses this exact pattern.

So now we’ve established that the producers will directly connect to the partition
machine and write the data.

So far, we had one producer. Now, let's say one more producer wants to write to the
Queue. The Queue Manager can send this producer to a different machine, so that
the previous partition is not hogged and the other partitions are also written to.

This way, the Queue Manager keeps sending new producers to different partitions,
distributing the throughput across partitions.



As you can see, we also have heartbeat messages flowing between the Queue Manager
and each partition, regularly reporting the partition's health to the Queue
Manager. If a partition goes down or becomes overloaded relative to the other
partitions, the Queue Manager can assign fewer producers to it.
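A minimal sketch of this health tracking, assuming the manager simply pings each partition on a timer (all names hypothetical):

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative health tracking: the Queue Manager pings each partition
// machine on a timer and marks unresponsive ones as unhealthy, so new
// producers are not assigned to them.
public class PartitionHealthTracker {
    private final Map<String, Boolean> healthy = new ConcurrentHashMap<>();

    public void track(List<String> partitionHosts) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Send a heartbeat to every partition every 5 seconds.
        scheduler.scheduleAtFixedRate(() -> {
            for (String host : partitionHosts) {
                healthy.put(host, ping(host));
            }
        }, 0, 5, TimeUnit.SECONDS);
    }

    // The Queue Manager consults this before assigning a producer to a partition.
    public boolean isHealthy(String host) {
        return healthy.getOrDefault(host, false);
    }

    private boolean ping(String host) {
        // Stub: a real implementation would send a heartbeat RPC with a timeout.
        return true;
    }
}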

The above setup has a flaw though - can you spot it? Spend some time thinking
about it.

Here it is: this model of assigning a single partition per producer works well if the
producers are homogeneous and greater in number than the queue partitions. That
way, each partition will get a similar amount of load, and we don't need to do any
load balancing.

For load that doesn't have a lot of bursts, and which comes from many producer
servers, this will work well. But if there is even one "power producer" generating
data at a high rate, one partition will be hogged.

This model breaks down as soon as we have different producers producing different
quantities of data.



As we can see above, the power producer fills up one partition more quickly. When
consumers read the queue randomly, the filled-up partition's items are at a
disadvantage, because they will be read much later - after the items in the other,
slower partitions have been read. This weakens the FIFO properties of the queue.

What can we do to solve this? Well, one solution is for each producer to write to
multiple partitions. Each producer can do a round-robin write to different partitions.



When the producer connects, the Queue Manager can give it the IPs of all three
partitions. The producer can then connect to all three machines and write to them
in a round-robin fashion - once to Machine A, then to Machine B, then to C, and
so on.
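A sketch of that round-robin write path, again with hypothetical names (PartitionConnection stands in for a persistent TCP connection handed out by the Queue Manager):

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative round-robin producer: each write goes to the next
// partition in turn, spreading load evenly even for a "power producer".
public class RoundRobinProducer {
    private final List<PartitionConnection> partitions;
    private final AtomicLong counter = new AtomicLong();

    public RoundRobinProducer(List<PartitionConnection> partitions) {
        this.partitions = partitions;
    }

    public void enqueue(String message) {
        // Pick partitions in order: A, B, C, A, B, C, ...
        int index = (int) (counter.getAndIncrement() % partitions.size());
        partitions.get(index).send(message);
    }
}

interface PartitionConnection {
    void send(String message);
}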



Now, you might ask, if I am writing code on the producer, do I need to write a loop
to pick one partition and write, then pick another partition, etc.? No, that will be
handled by the Queue’s client library.

For example, RabbitMQ has client libraries that the producer machine installs.
Let's say the producer is a web server that needs to write lots of JSON objects
to the queue. The web server will install your Queue's (MyQueue) client library.
This library will have functions to connect to the queue and write to it. For
example:

Connection queueConnect = MyQueueManager.get("My JSON Queue");

queueConnect.enqueue("{id:33435, data: {...}}");

The enqueue function and the MyQueueManager library should handle the round-robin
writes to different partitions. All of this is abstracted away from the end user,
i.e. the producer.

So now we have seen two approaches for writing to the Queue - assign one partition
to a producer, or assign multiple partitions to the producer. In practice, different
situations will make different approaches more suitable.

If we have 1000 producers and only 3 machines, then it might make sense to assign
one partition per producer, because evenly spreading so many producers might result
in good load balancing anyway.

In our library, we can expose a configurable property for this. The developer can
set it according to their situation and customize the load balancing.
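For instance, the client library could expose the strategy as a configuration option. This is purely a hypothetical API, continuing the MyQueue example from above:

// Hypothetical configuration: the developer picks the partition
// assignment strategy that fits their workload.
MyQueueConfig config = new MyQueueConfig()
    .queueName("My JSON Queue")
    .partitionStrategy(PartitionStrategy.ROUND_ROBIN); // or SINGLE_PARTITION

Connection queueConnect = MyQueueManager.get(config);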

How do we make sure the system is fault tolerant?

From the discussion of the non-distributed version, we know that until the data is
flushed to disk, it is susceptible to being lost if the process crashes or the
machine goes down.



This window is usually small - the length of the flush interval, typically under a
second. However, it is still a major cause for concern, because in a messaging
queue we want zero data loss. If a task sent to the queue is lost, it is simply
gone. This is not acceptable in most use cases. Imagine sending a message to a
friend, and that message disappearing into thin air without you ever knowing or
being informed. That is what will happen if our queue loses data.

Now, if a hard drive fails, we have suddenly lost the entire contents of the queue.



This is not desirable at all. If the drive fails, none of the data is recoverable.
Assuming the process also crashes when the drive fails, the entire queue is lost,
including the data in RAM. How do we save ourselves from this horrible fate?

We turn to the time-tested method of replication. That is the only way to safeguard
against total machine failure.

This means that we need to replicate each partition into multiple machines. Here is
what that would look like:



Each partition has a leader machine. All reads and writes to that partition go to
that leader machine. Note that in a Queue, a "read" is also effectively a write,
because the item has to be dequeued. So we cannot take advantage of replication to
increase read throughput as we do in replicated databases, where read replicas
increase our read throughput. This is different in a streaming platform like Kafka,
where a read does not delete the item.

Let's look at how exactly the replication happens. As we can see, there is a leader
machine for each partition. The producer will connect to the leader machine and
send data. The leader will write the data to its partition and to all its
synchronous replicas. Synchronous replicas are those that are updated synchronously
with the leader replica. Let's say we have 1 synchronous replica. This ensures that
the write lands on 2 machines right away.



Before we send the producer an acknowledgement that the write is done, the data is
written on 2 machines. This might take longer than writing to one machine, but it
makes data loss very unlikely.
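Here is a sketch of the leader's write path under these assumptions (hypothetical names; real systems implement this with much more care around failures and timeouts):

import java.util.List;

// Illustrative leader write path: persist locally, replicate to all
// synchronous replicas, and only then acknowledge the producer.
// Replica is a hypothetical stand-in for a replica connection.
public class PartitionLeader {
    private final List<Replica> syncReplicas;
    private final List<Replica> asyncReplicas;

    public PartitionLeader(List<Replica> syncReplicas, List<Replica> asyncReplicas) {
        this.syncReplicas = syncReplicas;
        this.asyncReplicas = asyncReplicas;
    }

    public void write(String item) {
        appendToLocalLog(item);
        // Block until every synchronous replica confirms the write,
        // so we never ack a write that exists on only one machine.
        for (Replica r : syncReplicas) {
            r.replicate(item);
        }
        ackProducer(item);
        // Asynchronous replicas are updated after the ack, off the hot path.
        for (Replica r : asyncReplicas) {
            r.replicateAsync(item);
        }
    }

    private void appendToLocalLog(String item) { /* write to the local partition */ }
    private void ackProducer(String item) { /* send ack over the producer connection */ }
}

interface Replica {
    void replicate(String item);       // synchronous: returns after the replica persists
    void replicateAsync(String item);  // asynchronous: fire-and-forget
}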

In order to lose data, now 2 machines have to go down at the same time, which is
much less likely. To make this even less likely, the administrator can also do things
like ensuring the two machines are connected to different power supplies or different
network routers - so that they don’t have a common cause of failure.

We can also have normal, asynchronous replicas, which are synchronized after the
leader replica returns an ack to the producer. These replicas are better for
throughput, because the write is propagated asynchronously. This is what the
replication looks like after adding the asynchronous replicas.

As we can see, we have a replication factor of 3 here - 2 synchronous and 1
asynchronous replica. You can adjust this as per the data's needs. For example, if
the data is not too critical, like user behavior logs, it's probably ok to lose
small amounts of data once in a while, so you can use 1 synchronous replica and 1
async replica. It's up to the developer to adjust this according to business needs
and costs.
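Continuing the hypothetical MyQueue configuration API from earlier, the replication settings could be exposed the same way:

// Hypothetical replication settings: non-critical logs trade a little
// durability for throughput; critical tasks wait on more synchronous copies.
MyQueueConfig logsConfig = new MyQueueConfig()
    .queueName("user-behavior-logs")
    .syncReplicas(1)
    .asyncReplicas(1);

MyQueueConfig tasksConfig = new MyQueueConfig()
    .queueName("critical-tasks")
    .syncReplicas(2)
    .asyncReplicas(1);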

After adding fault tolerance, this queueing system can scale to many machines. We
can auto-scale the queue and add more partitions as load increases. To add another
partition on a new machine, we simply create the new partition and point producers
to it.

Get more such content: harsh.fyi/subscribe
