System Design of Distributed Cache

Caching is essential for web applications to improve response times and maintain functionality during data store outages. A distributed cache allows for scalability and high availability by partitioning data across multiple machines, using strategies like least recently used (LRU) for data management. Implementing consistent hashing helps efficiently manage data distribution and minimize cache misses when adding or removing cache hosts.


Why do we need cache ?

Consider a web application backed by a data store. This data store may be a database or another web service. A client makes a call to the web application, which in turn calls the data store, and the result is returned to the client.
There may be several issues with this setup:
1. Calls to the data store may take a long time to execute or may consume a lot of system resources. It would be good to store at least some results of these calls in memory, so that they can be returned to the client much faster.
2. If the data store goes down or its performance degrades and calls to it start to fail, our web application can still serve requests from memory, at least for some period of time.
So, storing data in memory helps address both issues.
When a client request comes in, we first check the cache and try to retrieve the data from memory (the cache-aside strategy), and only if the data is unavailable or stale do we make a call to the data store.
Why do we call it a distributed cache?
Because the amount of data is too large to fit in the memory of a single machine, so we need to split the data and store it across several machines.
Requirements :
a) We need to implement two main operations:
1. Put, and
2. Get
Put stores an object in the cache under some unique key, and get retrieves the object from the cache based on that key.
b) We want to design a scalable, highly available and fast cache.
High scalability ensures the cache can handle an increasing number of put and get requests, as well as an increasing amount of data we may need to store.
High availability ensures that data in the cache is not lost during hardware failures and that the cache remains accessible during network partitions. This minimizes the number of cache misses and, as a result, the number of calls to the data store.
High performance is probably the number one requirement for the cache. The whole point of a cache is to be fast, as it is called on every request.
To design a distributed system, think about the following 4 requirements
1. Scalability,
2. Availability,
3. Performance,
4. Durability ( if data persistence is important)
Let’s start with the local cache first and later we will see how distributed cache design
evolves from the solution for the local cache.
So, we start with a single server and need to implement a basic in-memory data store with limited capacity. The local cache implementation is an algorithmic problem: we need to come up with data structures and an algorithm for storing and retrieving data.
What data structure comes to mind when you hear about a cache? Most likely the same one that comes to my mind: a hash table. We add key-value pairs to the hash table and retrieve them in constant time. Simple, right?
But only until the moment when we reach maximum hash table size and cannot add any
more elements. We need to evict some old data from the hash table before new data can be
added.
What data should be evicted?
It turns out there are several dozen different approaches, so-called eviction or replacement policies. One of the easiest to implement is the least recently used (LRU) policy, where we discard the least recently used items first. But hash tables do not track which entry was used recently.
That means we need one more data structure to keep track of what was used when: some kind of a queue, but with constant time for add, update and delete operations. And I can already hear you shouting "doubly linked list", and I have no choice but to agree with you.
When the GET operation is called, we first check if the item is in the cache (the hash table). If the item is not in the cache, we return null immediately. If the item is found, we move it to the head of the list and return it to the caller. But why do we need to move the item to the list head?
To preserve the order of use. The item at the head of the list is the most recently used, and the item at the tail is the least recently used. So, when the cache is full and we need to free space for a new item, we remove the item at the tail of the list.
When the PUT operation is called, we also check if the item is in the cache. If found, we update its value and move it to the head of the list. If the item is not in the cache, we need to check whether the cache is full.
If the cache has capacity (is not full), we simply add the item to the hash table and to the list (at the head position).
If the cache is full, we need to free some space first: we take the item at the tail and remove it from both the hash table and the list. Now we have space for the new element, and we add it to both the hash table and the list.
LRU cache Flow
Now, let’s run a simple simulation that will help to further understand and then code the
algorithm.
Cache is initially empty, so we just keep adding new items to it. We always add new items to
the head of the list.

4 items have been added and the cache is now full.


Now, a GET operation is called for item B, and we need to move it to the head. Right after that, item A is accessed (also a GET call), and we move it to the head as well.
Next, a PUT operation is called for a new item E.
Because the cache is full, we first delete the item at the tail of the list.

And then add item E to both the hash table and the list.
Easy, right?
Then let’s code the least recently used cache. First, let’s define a class, which instances will
be stored in the cache. Class stores key, value and references to the previous and next
nodes in the linked list.
In the cache class we define :
1. References to the hash table (HashMap class in Java),
2. Cache size
3. References to the head and tail of the doubly liked list.
4. Constructor accepts a single argument — cache size.
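The code listing itself is not included in this copy of the article. As a stand-in, here is a minimal sketch of the structure just described (a hash table plus a doubly linked list), written here in Python rather than the Java the text refers to:

```python
class Node:
    """Cache entry: key, value, and links to neighbors in the doubly linked list."""
    def __init__(self, key, value):
        self.key = key
        self.value = value
        self.prev = None
        self.next = None


class LRUCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self.table = {}  # key -> Node, for O(1) lookup
        # Sentinel head/tail nodes simplify list manipulation.
        self.head = Node(None, None)  # most recently used side
        self.tail = Node(None, None)  # least recently used side
        self.head.next = self.tail
        self.tail.prev = self.head

    def _remove(self, node):
        node.prev.next = node.next
        node.next.prev = node.prev

    def _add_to_head(self, node):
        node.next = self.head.next
        node.prev = self.head
        self.head.next.prev = node
        self.head.next = node

    def get(self, key):
        if key not in self.table:
            return None
        node = self.table[key]
        self._remove(node)       # move to head: it is now most recently used
        self._add_to_head(node)
        return node.value

    def put(self, key, value):
        if key in self.table:
            node = self.table[key]
            node.value = value
            self._remove(node)
            self._add_to_head(node)
            return
        if len(self.table) == self.capacity:
            lru = self.tail.prev  # evict the least recently used item
            self._remove(lru)
            del self.table[lru.key]
        node = Node(key, value)
        self.table[key] = node
        self._add_to_head(node)
```

Replaying the simulation above (add A, B, C, D; get B; get A; put E) would evict item C, the least recently used entry at that moment.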
How to make it distributed ?
Distributed cache cluster: We can start with a really straightforward idea: move the least recently used cache we just implemented to its own host. The benefit is that each host can now store only a chunk of the data, called a shard. Because the data is split across several hosts, we can store much more data in memory. Service hosts know about all shards, and they forward put and get requests to the appropriate shard.
Co-located cache: The same idea, with a slightly different realization, is to use the service hosts themselves for the cache. We run the cache as a separate process on each service host. Data is again split into shards and, similar to the first option, when the service needs to make a call to the cache, it picks the shard that stores the data and makes the call.
Let’s compare both approach :
Dedicated cluster helps to isolate cache resources from the service resources. Both the
cache and the service do not share memory and CPU anymore and can scale on their own.
Dedicated cluster can be used by multiple services. Dedicated cluster also gives us flexibility
in choosing hardware.
Co-located cache, the biggest benefit is that we do not need a separate cluster. This helps to
save on hardware cost and is usually less operationally intensive than a separate and with
co-location, both the service and the cache scale out at the same time. We just add more
hosts to the service cluster when needed.

Dedicated VS Co-located Cache


Ok, we have implemented a least recently used cache, made it runnable as a separate process, and told cache clients to call the cache process over a TCP or UDP connection.
But how do cache clients decide which cache shard to call?
Let's discuss a naive approach first.
1. A MOD function: Based on the item key and some hash function, we compute a hash. We divide this hash by the number of available cache hosts and take the remainder, which we treat as an index into the array of cache hosts.
For example, if we have 3 cache hosts and the hash is equal to 8, then 8 % 3 = 2, so the cache host with index 2 will be selected by the service both for storing the item in the cache and for retrieving it.
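The MOD-based selection can be sketched in a few lines of Python (the host names here are made up for illustration):

```python
import hashlib

def pick_host_mod(key, hosts):
    # A stable hash; Python's built-in hash() is randomized between runs.
    h = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return hosts[h % len(hosts)]

hosts = ["cache-0", "cache-1", "cache-2"]
print(pick_host_mod("user:42", hosts))  # always the same host for this key

# Adding one more host remaps most keys to a different host:
keys = [f"user:{i}" for i in range(1000)]
moved = sum(1 for k in keys
            if pick_host_mod(k, hosts) != pick_host_mod(k, hosts + ["cache-3"]))
print(f"{moved / len(keys):.0%} of keys now map to a different host")
```

Going from N to N+1 hosts, roughly N/(N+1) of the keys change owners under MOD hashing, which is exactly the cache-miss problem discussed next.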
But what happens when we add a new cache host (or some host dies due to hardware
failures)? The MOD function will start to produce completely different results. Service hosts
will start choosing completely different cache hosts than they did previously, resulting in a
high percentage of cache misses. A much better option is to use consistent hashing.
2. Consistent hashing is based on mapping each object to a point on a circle. We pick an arbitrary point on this circle and assign the value 0 to it, then move clockwise along the circle assigning increasing values. We then take the list of cache hosts and calculate a hash for each host based on a host identifier, for example its IP address or name. The hash value tells us where on the consistent hashing circle that host lives. The reason we do all this is that we want to assign each cache host the range of hashes it owns: specifically, each host owns all the cache items that live between it and its nearest clockwise neighbor. (It could equally be the counter-clockwise neighbor; the direction does not matter much.)
So, for a particular item, when we need to look up which cache host stores it, we calculate its hash and move backwards along the circle to identify the host. In this example, host 4 stores the item. And what happens when we add a new host to the cache cluster?
Same as before, we calculate a hash for the new host, and this new host becomes responsible for its own range of keys on the circle, while its counter-clockwise neighbor (host 4 in this case) becomes responsible for a smaller range. In other words, host 6 takes responsibility for a subset of what was formerly owned by host 4, and nothing changes for all the other hosts. This is exactly what we wanted: to minimize the number of keys we need to re-hash.
Consistent hashing is much better than MOD hashing, as a significantly smaller fraction of keys is re-hashed when a host is added to or removed from the cache cluster.
The component responsible for cache host selection is the cache client. It is a small, lightweight library integrated with the service code. The cache client knows about all cache servers, and all clients should have the same list; otherwise, different clients will have their own view of the consistent hashing circle, and the same key may be routed to different cache hosts.
The client stores the list of cache hosts in sorted order (for fast host lookup), so binary search can be used to find the cache server that owns a key. If a cache host is unavailable, the client proceeds as though it were a cache miss.
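The client-side lookup just described (a sorted list of host positions plus binary search) can be sketched like this; the hash function and host names are illustrative:

```python
import bisect
import hashlib

def stable_hash(s):
    # A stable hash function; Python's built-in hash() varies between runs.
    return int(hashlib.md5(s.encode()).hexdigest(), 16)


class ConsistentHashRing:
    def __init__(self, hosts):
        # Sorted (position, host) pairs: each host's position on the circle.
        self.ring = sorted((stable_hash(h), h) for h in hosts)
        self.positions = [p for p, _ in self.ring]

    def get_host(self, key):
        # Binary search for the first host clockwise of the key's position,
        # wrapping around to the start of the circle.
        i = bisect.bisect(self.positions, stable_hash(key)) % len(self.ring)
        return self.ring[i][1]


ring3 = ConsistentHashRing(["cache-0", "cache-1", "cache-2"])
ring4 = ConsistentHashRing(["cache-0", "cache-1", "cache-2", "cache-3"])
keys = [f"user:{i}" for i in range(1000)]
moved = [k for k in keys if ring3.get_host(k) != ring4.get_host(k)]
print(len(moved), "of", len(keys), "keys moved")
```

Adding a host only reassigns the keys that fall into the new host's range; every other key keeps its previous owner, unlike with MOD hashing.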
The list of cache hosts is the most important piece of knowledge for the clients, and we need to understand how this list is created, maintained and shared among all the clients.
Let’s discuss several options.
In the first option, we store the list of cache hosts in a file and deploy this file to service hosts using some continuous deployment pipeline. We can use configuration management tools such as Chef and Puppet to deploy the file to every service host. This is the simplest option, but not very flexible: every time the list changes, we need to make a change and deploy it to every service host.
What if we keep the file, but simplify the deployment process?
Specifically, we may put the file in shared storage and make service hosts poll for it periodically. This is exactly what the second option is about: all service hosts try to retrieve the file from some common location, for example the S3 storage service.
To implement this option, we may introduce a daemon process that runs on each service host and polls data from the storage every minute or every few minutes. The drawback of this approach is that we still need to maintain the file manually: make changes and deploy them to the shared storage every time a cache host dies or a new host is added.
It would be great if we could somehow monitor cache server health, so that if something bad happens to a cache server, all service hosts are notified and stop sending requests to the unavailable server, and if a new cache server is added, all service hosts are notified and start sending requests to it.
To implement this approach, we need a new component, a configuration service, whose purpose is to discover cache hosts and monitor their health. Each cache server registers itself with the configuration service and sends heartbeats to it periodically. As long as heartbeats arrive, the server stays registered in the system. If heartbeats stop coming, the configuration service unregisters the cache server that is no longer alive or accessible, and every cache client gets the list of registered cache servers from the configuration service.
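A toy version of this registration-and-heartbeat flow might look like the following; the timeout value and host names are invented for illustration, and a production configuration service would be a replicated system such as ZooKeeper:

```python
import time

class ConfigurationService:
    def __init__(self, heartbeat_timeout=10.0):
        self.timeout = heartbeat_timeout
        self.last_seen = {}  # host -> timestamp of its last heartbeat

    def heartbeat(self, host):
        # Cache servers call this periodically; the first call registers the host.
        self.last_seen[host] = time.monotonic()

    def registered_hosts(self):
        # Cache clients poll this to get the current list of live servers.
        now = time.monotonic()
        return sorted(h for h, t in self.last_seen.items()
                      if now - t < self.timeout)


svc = ConfigurationService(heartbeat_timeout=10.0)
svc.heartbeat("cache-0")
svc.heartbeat("cache-1")
print(svc.registered_hosts())  # ['cache-0', 'cache-1']
# If cache-1 stops sending heartbeats, it drops out after the timeout expires.
```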
Let’s quickly summarize what we have discussed so far.
Functional Requirements :
1. To store more data in memory we partition data into shards and put each shard on its own
server.
2. Every cache client knows about all cache shards and cache clients use consistent
hashing algorithm to pick a shard for storing and retrieving a particular cache key.
Non-functional requirements :
Fast, highly scalable and available distributed cache.
Have we built a highly performant cache?
Yes. The least recently used cache implementation uses constant-time operations. The cache client picks a cache server in O(log n) time, which is very fast, and the connection between the cache client and the cache server is over TCP or UDP, also fast. So, performance is there.
Have we built a scalable cache?
Yes. We can easily create more shards and store more data in memory. Although those of you who have sharded data in real systems know that a common problem is that some shards become hot, meaning they process many more requests than their peers. This results in a bottleneck, and adding more cache servers may not be very effective: with consistent hashing in place, a new cache server will split some shard into two smaller shards, but we do not want to split just any shard, we need to split one very specific shard.
And high availability is not there at all. If some shard dies or becomes unavailable due to a network partition, all cache data for that shard is lost, and all requests to it will result in cache misses until the keys are re-hashed.
Can you think of a mechanism that will help us to improve availability and better deal with a
hot shard problem?
Data replication. There are many different techniques for data replication. We can distinguish
two categories of data replication protocols.
The first category includes probabilistic protocols such as gossip, epidemic broadcast trees and bimodal multicast. These protocols tend to favor eventual consistency.
The second category includes consensus protocols such as two- or three-phase commit, Paxos, Raft and chain replication. These protocols tend to favor strong consistency.
Let’s keep things simple and use leader follower (also known as master-slave) replication.
For each shard we will designate a master cache server and several read replicas. Replicas
(or followers) try to be an exact copy of the master. Every time the connection between
master and replica breaks, replica attempts to automatically reconnect to the master and
replicas live in different data centers, so that cache data is still available when one data
center is down.
All put calls go through the master node, while get calls are handled by both master node
and all the replicas and because calls to a cache shard are now spread across several
nodes, it is much easier to deal with hot shards. We may scale out by adding more read
replicas and while talking about leaders, we need to mention how these leaders are elected.
There are two options:
1. We can rely on a separate component, let’s call it a Configuration service.
2. If we want to avoid a separate component, we can implement leader election in the cache
cluster.
The configuration service is responsible for monitoring both leaders and followers and for failover: if some leader is not working as expected, the configuration service can promote a follower to leader. The configuration service is a distributed service by its nature. ZooKeeper is a good candidate for a configuration service; Redis also implemented Redis Sentinel for this purpose.
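Coming back to the read and write paths: the routing described above (all puts through the leader, gets spread across the leader and its replicas) could be sketched as follows. The node names are illustrative, and the actual replication between nodes is omitted:

```python
import random

class ShardNodes:
    """A cache client's view of one shard: a leader plus read replicas."""
    def __init__(self, leader, replicas):
        self.leader = leader
        self.replicas = replicas

    def node_for_put(self):
        # All writes go through the leader of the shard.
        return self.leader

    def node_for_get(self):
        # Reads are spread across the leader and replicas,
        # which is what softens the hot-shard problem.
        return random.choice([self.leader] + self.replicas)


shard = ShardNodes(leader="cache-1a", replicas=["cache-1b", "cache-1c"])
print(shard.node_for_put())  # cache-1a
print(shard.node_for_get())  # any of the three nodes
```

Scaling out a hot shard then amounts to adding more hosts to its replica list.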
Ok, by introducing data replication we are able to better deal with hot shard problem and
also increased availability.
Increased, but we did not actually achieve true high availability. Why?
Because there are still points of failure. We replicate data asynchronously to get better performance: we do not want to wait until the leader server replicates data to all the followers. So if the leader server receives some data and fails before this data is replicated to any of the followers, the data is lost.
And this is actually an acceptable behavior in many real-life use cases, when we deal with
cache.
The first priority of the cache is to be fast, and if it loses data in some rare scenarios, it
should not be a big deal. This is just a cache miss and we should design our service in a
way that such failures are expected.
Let’s see what other topics may pop up during an interview.
The distributed cache we built favors performance and availability over consistency. There are several things that lead to inconsistency.
1. We replicate data asynchronously to get better performance. So, a get call processed by the master node may return a different result than a get call for the same key processed by a read replica.
2. Another potential source of inconsistency is when clients have different lists of cache servers. Cache servers may go down and come back up, and it is possible that a client writes a value that no other client can read.
Can we fix these issues ?
Yes. Introduce synchronous replication and make sure all clients share a single view of the cache server list. But this will increase latency and the overall complexity of the system. I highly encourage you to discuss these tradeoffs with your interviewer when you get a chance.
Least recently used algorithm evicts data from cache when cache is full. But if cache is not
full, some items may sit there for a long time and such items may become stale.
To address this issue, we may introduce some metadata for a cache entry and include time-
to-live attribute.
There are two common approaches to how expired items are cleaned up from cache.
1. We can expire an item passively, when some client tries to access it and the item is found to be expired.
2. We can expire items actively, by creating a maintenance thread that runs at regular intervals and removes expired items.
As there may be billions of items in the cache, we cannot simply iterate over all of them. Usually, some probabilistic algorithm is used, where several random items are tested on every run.
Services that use a distributed (or remote) cache often use a local cache as well. If data is not found in the local cache, a call to the distributed cache is initiated.
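Both expiration approaches above can be sketched together: each entry carries an expiration timestamp, get checks it passively on access, and a periodic task actively tests a small random sample of keys. The sample size and the dictionary-based store are illustrative:

```python
import random
import time

class TTLCache:
    def __init__(self):
        self.store = {}  # key -> (value, expires_at)

    def put(self, key, value, ttl_seconds):
        self.store[key] = (value, time.monotonic() + ttl_seconds)

    def get(self, key):
        entry = self.store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() >= expires_at:  # passive expiration on access
            del self.store[key]
            return None
        return value

    def expire_sample(self, sample_size=20):
        # Active expiration: test a few random keys instead of scanning all.
        keys = random.sample(list(self.store), min(sample_size, len(self.store)))
        now = time.monotonic()
        for k in keys:
            if now >= self.store[k][1]:
                del self.store[k]
```

A maintenance thread would call `expire_sample` at regular intervals; items missed by the sample are still caught passively on their next access.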
To make the life of service teams easier, so that they do not need to deal with both caches, we can implement support for the local cache inside the cache client. When a cache client instance is created, we also construct a local cache. This way we hide all the complexity behind a single component, the cache client.
We can use the previously introduced LRU cache implementation as the local cache, or use well-known third-party implementations, for example the Guava cache.
Caches are optimized for maximum performance, as well as simplicity, and not for security.
Caches are usually accessed by trusted clients inside trusted environments, and we should not expose cache servers directly to the internet unless it is absolutely required. For these reasons, we should use a firewall to restrict access to cache server ports and ensure only approved clients can access the cache.
Clients may also encrypt data before storing it in the cache and decrypt it on the way out, but we should expect performance implications.
The consistent hashing algorithm is great: simple and effective. But it has two major flaws, the so-called domino effect and the fact that cache servers do not split the circle evenly.
Let's clarify this.
1. The domino effect may appear when a cache server dies and all of its load is transferred to the next server. This transfer might overload the next server, which would then fail, causing a chain reaction of failures.
2. For the second problem, remember how we placed cache servers on the circle: some servers may reside close to each other and some may be far apart, causing uneven distribution of keys among the cache servers.
To deal with these problems, several modifications of the consistent hashing algorithm have been introduced. One simple idea is to add each server to the circle multiple times. You can also read about the Jump Hash algorithm (a paper published by Google in 2014) or proportional hashing (the algorithm used by the Yahoo! Video Platform).
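The "add each server multiple times" idea, often called virtual nodes, can be sketched by extending the ring construction: each host is hashed at many positions (host:0, host:1, ...), which evens out the key distribution. The virtual-node count and host names below are illustrative:

```python
import bisect
import hashlib
from collections import Counter

def stable_hash(s):
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

def build_ring(hosts, vnodes=100):
    # Place each host on the circle `vnodes` times ("virtual nodes").
    ring = sorted((stable_hash(f"{host}:{i}"), host)
                  for host in hosts for i in range(vnodes))
    positions = [p for p, _ in ring]
    return ring, positions

def get_host(ring, positions, key):
    i = bisect.bisect(positions, stable_hash(key)) % len(positions)
    return ring[i][1]

hosts = ["cache-0", "cache-1", "cache-2"]
ring, positions = build_ring(hosts, vnodes=100)
counts = Counter(get_host(ring, positions, f"user:{i}") for i in range(9000))
print(counts)  # each host receives roughly a third of the 9000 keys
```

More virtual nodes per host give a smoother split of the circle; when a host dies, its load is also spread across many neighbors instead of landing on a single next server, which softens the domino effect.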
We started with a single host and implemented the least recently used cache. Because a local cache has limited capacity and does not scale, we decided to run our LRU cache as a standalone process, either on its own host or on the service host, and we made each process responsible for its own part of the data set.
We introduced a consistent hashing ring, a logical structure that helps assign owners to ranges of cache keys, and we introduced a cache client that is responsible for routing requests for each key to the specific shard that stores data for that key. We could have stopped right there; these simple ideas have proved to be very effective in practice.
We introduced master-slave data replication, and for monitoring leaders and read replicas and providing failover support, we brought in a configuration service, which is also used by cache clients for discovering cache servers.
Thanks to Mikhail Smarshchok.

System design of a Grocery System (Amazon Fresh / BigBasket/ JioMart)

Table of Contents:

• Introduction

• System design of Amazon Fresh

o Requirements

o Capacity estimates

▪ Storage

▪ Bandwidth

▪ Number of servers needed


o High-level design

▪ Database design

▪ APIs

▪ Achieving other requirements

o Components

o Recommended technologies and algorithms

▪ Database

▪ Server

▪ Network protocol

▪ Load balancer routing method

▪ Geo-hashing

• Conclusion

Note: Amazon and the Amazon logo are trademarks of Amazon.com, Inc. or its affiliates.

Introduction

The topic of this article at OpenGenus is system design, namely the system design of a grocery
system such as Amazon Fresh or BigBasket or Flipkart Grocery or JioMart or DMart.

The Amazon Fresh application is an online grocery store, in which users can view products, add them to a cart, order, and have them delivered almost anywhere. They can either show up to take the delivery (attended delivery) or provide a 2-hour delivery window for the order to be placed in a safe space to be picked up later, without their presence being required (unattended delivery).

Before diving right into the intricacies of such a task, let's talk a bit about system design.

Why would we design a system?

Now, if we think about most applications that we develop, we clearly wouldn't need to design a system. If the application is scaled relatively low, in other words has few requests and doesn't get much traffic, designing a system for it isn't necessary at all.

However, as the application grows, things that can go wrong start affecting a larger number of users and begin causing major financial losses.
The simple problems that you paid no attention to in your small-scale application can have hardly imaginable effects in larger applications.

Here are some benefits of system design:

• reliability

• cost effectiveness

• greater performance and lower latency

• scalability
So, when exactly should we start designing a system? What is the threshold that needs to be reached
in order for the application to require such a solution?

While there is no exact limit that we can reference, we can confidently say that such a solution is
recommended when building a large scale distributed system.

What is a large scale distributed system?

Let's break this down.

Large scale means that the system deals with a lot of data, it is being used by a lot of people, has a
lot of requests, needs to be extremely performant, is updated frequently etc.

Distributed means that it runs on multiple servers that need to be abstracted and hidden away from
the client (that is, regardless of how many components go into building the system, the client only
sees an intuitive, performant application).

Let's get back to our goal. Designing a grocery store such as Amazon Fresh.

One can probably imagine that Amazon Fresh can easily fit into the category of a large scale
distributed system. Let's do a bit of math, though. We're going to need to get used to it anyway.

According to a report on the internet, Amazon gets about 197 million users per month. If we were to
divide that by 8, the number of store subsidiaries Amazon has, we would get roughly 24 million users
per month. It's definitely extensively scaled.

System design of Amazon Fresh / BigBasket/ JioMart

Now that we've established that Amazon Fresh is system design material, let's go ahead and design a
system for such a product.

Here are the main concepts we need to focus on in order to successfully create a system design:

• Requirements

• Capacity estimates

• High-level design

• Components

Requirements

The first notion we will turn our attention to is the requirements of the app. Simply put, what
functionalities will this product have?

When establishing this point, we need to outline the core functionalities, those without which the
application couldn't function properly. There are going to be secondary cases we won't cater for
while doing this.

Here is a list of the core functionalities for a grocery system such as Amazon Fresh:

• View all the products, by category

• Search for products

• View product page


• Check stock of product

• Add a product to cart

• Comment on product

• Upload a product

• Order

• Assign the delivery

• Track the delivery

Capacity estimates

Here we will talk about two concepts, depending on the way data is accessed.

• Storage

• Bandwidth

Getting started

Since there are no reports regarding the average number of products a person views or the number of products that get added daily, we're just going to assume averages based on the number of monthly active users.

24,000,000 MAU / 30 = 800,000 DAU

800,000 * 25 = 20,000,000 products viewed per day

5,000 * 3 = 15,000 products added per day

Explanation:

We presumed that the number of monthly active users is roughly 24 million, which would mean
800,000 daily active users if we used the average number of days a month can have (30).
Of those 800k users, let's assume that the average user who opens the platform views about 25
products. That would mean that 20 million products are viewed every day.
Of the 800,000 daily users, let's assume only 5,000 upload products on any given day. Let's say that
each of them uploads an average of 3 products daily. This gives us 15,000 products added daily.

Storage

Great! Now we can go ahead and talk about storage capacity. This refers to the number of items that
are being uploaded or the "writes" of the application.

15,000 products added per day * 5MB = ~75GB per day

Explanation:
We already know from before that roughly 15,000 products get added every day. Assuming
uploading a product costs us about 5MB, that would mean the capacity of the storage should be
about 75GB per day.

Result:
75GB per day
Bandwidth

This refers to the number of items that are being accessed or the "reads" of the application.

20,000,000 products viewed per day * 5MB = 100TB per day

Explanation:
We already know from before that roughly 20 million products get viewed every day, so assuming
viewing a product costs us 5MB, that means the capacity of the bandwidth should be roughly 100TB
per day.

Result:
100TB per day

Number of servers needed

Let's have a bit more fun by trying to estimate exactly how many servers a product such as Amazon
Fresh would need.

When talking about the servers, we are going to think in terms of CPU cores, RAM and hard drive (storage) space needed.

We will assume each server is higher-end, having 4 CPU cores, 8GB RAM and 120GB SSD storage.

• CPU cores

800,000 DAU / 250 users per core = 3200 cores

3200 cores / 4 cores per server = 800 servers

Explanation:
We know from previous calculations that the website would get about 800,000 daily active users.
Each CPU core can typically handle about 250 users, so that means we would need at least 3200 CPU
cores in order for the website to function. If we translate it to servers with the properties we set
before, that would amount to about 800 servers.

• RAM memory
100TB memory per day / 0.12TB per server = 834 servers

Explanation:
We know from previous calculations that about 100TB of memory is required daily, so we can obtain the necessary number of servers by dividing the total memory by the memory each server has. This way, we need at least 834 servers for the website to function properly.

• SSD (Storage)
75GB per day / 120GB per server =~ 1

Explanation:
Since we worked out that about 75GB of data gets stored every day, we can divide that number by the capacity of a server. Here is where we realize that the application really doesn't save that much information and is, therefore, read-driven. 1 server would be sufficient if we were only considering storage.
So, after all those computations, we can work out how many servers we really need. The results we got were 800, 834, and 1. To be sensible, we should choose the largest one and then some.

A great number here would be about 900 servers, to account for any failures that may occur.

Result:
900 servers
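The estimates in this section can be reproduced in a few lines; all the inputs are the assumed figures from the article (24M MAU, 25 views per user, 5,000 uploaders at 3 products each, 5MB per product, and the server spec above):

```python
import math

MAU = 24_000_000
DAU = MAU // 30                                    # 800,000 daily active users

products_viewed = DAU * 25                         # 20,000,000 views per day
products_added = 5_000 * 3                         # 15,000 uploads per day

PRODUCT_MB = 5
storage_gb_per_day = products_added * PRODUCT_MB / 1_000         # 75 GB/day
bandwidth_tb_per_day = products_viewed * PRODUCT_MB / 1_000_000  # 100 TB/day

cpu_servers = DAU // 250 // 4                      # 250 users/core, 4 cores/server
ram_servers = math.ceil(bandwidth_tb_per_day / 0.12)  # 0.12 TB per server, as above
ssd_servers = math.ceil(storage_gb_per_day / 120)     # 120 GB SSD per server

print(cpu_servers, ram_servers, ssd_servers)       # 800 834 1
```

Taking the largest result (834) plus headroom for failures gives the roughly 900 servers quoted above.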

High-level design

Within this category, we will deal with two concepts:

• Database design

• Server APIs

Database design

First of all, let's design the database.

Most important of all, we shall need a User class, with an user id by which it can be identified and
other relevant information that can be stored in order to make the process of ordering
straightforward(such as address, email address, name etc).

Then, a Product class will be required, storing a unique ID, the user ID of the person that posted it,
and other information that can be displayed on the product page to help customers understand what
it is they're buying (title, description, images, price, etc.).

One functionality we mentioned in the requirements was commenting on a product. To be able to do
that, we are going to need a Comment class, which will store a comment ID, by which it will be
identified, a product ID, identifying the product that was commented on, and a
user ID, which will point to the user who composed the comment. Moreover, we need to
store the text of the comment and the time at which it was added.

Another concept we will need to consider is the shopping cart. We will probably need a
ShoppingCart class, which will store a unique ID and the user's ID. Then, we're also going to need a
ShoppingCartItem, which will have its own ID, its cart's ID, the product's ID and a quantity.

Moreover, we will probably need a table dedicated specifically to stores. A Store class should
contain a unique ID for the shop, a geo-hash (more on this later, but, in short, it encodes the store's
location) and any other optional properties we might feel are necessary (such as zip code).

One last class we will need is the Driver class. Its main purpose will be to track the location
of the drivers. This class should include the following properties: a unique ID for the driver, their
geo-hash, and any other relevant information such as first and last name.
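As a rough sketch, the classes above could map to SQL tables like this. The column names and types are illustrative assumptions, not a definitive schema:

```python
import sqlite3

# Illustrative schema for the entities described above.
schema = """
CREATE TABLE User    (user_id INTEGER PRIMARY KEY, name TEXT, email TEXT, address TEXT);
CREATE TABLE Product (product_id INTEGER PRIMARY KEY, user_id INTEGER REFERENCES User,
                      title TEXT, description TEXT, price REAL);
CREATE TABLE Comment (comment_id INTEGER PRIMARY KEY, product_id INTEGER REFERENCES Product,
                      user_id INTEGER REFERENCES User, body TEXT, created_at TEXT);
CREATE TABLE ShoppingCart     (cart_id INTEGER PRIMARY KEY, user_id INTEGER REFERENCES User);
CREATE TABLE ShoppingCartItem (item_id INTEGER PRIMARY KEY,
                               cart_id INTEGER REFERENCES ShoppingCart,
                               product_id INTEGER REFERENCES Product, quantity INTEGER);
CREATE TABLE Store   (store_id INTEGER PRIMARY KEY, geo_hash TEXT, zip_code TEXT);
CREATE TABLE Driver  (driver_id INTEGER PRIMARY KEY, geo_hash TEXT,
                      first_name TEXT, last_name TEXT);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(schema)
tables = {row[0] for row in conn.execute(
    "SELECT name FROM sqlite_master WHERE type='table'")}
```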

Here is a diagram of what we have discussed so far:

APIs

Another important part without which our application wouldn't be able to function are the APIs
where information is stored and from where it can be retrieved.
Let's look at our requirements and devise server APIs for those that need it.

• Viewing all the products by category => Here, we'll need an API that retrieves the products
based on the category they are in. Let's assume we also want to load only a few at a time, so
the application doesn't lag (if there are a lot of products and we request them all at the same
time, it risks doing so). A getProductByCategory(category, limit) API could be
reasonable.

• Search for products => Although it still has to return a number of products that match a
criterion, it is not based on category, so we shall need a new API for it.
getProductByTitle(query, limit) could work well. Of course, we would construct the function
so that titles matching the query parameter show up, not only exact matches of the
query.

• View product page => While a product page is standard, the information presented in it is
specific to the product and, therefore, has to be obtained from a specific API.
getProduct(product_id) is the go-to here.

• Comment on product => This is the first API with a method other than GET. Here, we are not
asking for anything, but rather we are adding something to our database. This method is
called POST. postComment(user_id, product_id, body, time) should be sufficient. We are
specifying the user who posted the comment, the product on which the comment was
posted, the actual text of the comment and the time at which it was posted.

• Upload a product => This is another API with a POST method. Let's call it
postProduct(user_id, info). The user_id is the ID of the user who posted it, and the info is an
object that has all the relevant information of the product, since it would be tedious to have
to write all properties down every time we call the API.

• Add to cart => This API also uses a POST method. We could call it addToCart(user_id,
product_id, quantity). This would create a new shoppingCartItem and append it to the user's
existing shoppingCart.
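A minimal in-memory sketch of these endpoints might look like the following. The dict-backed storage and ID generation are placeholders standing in for a real database layer; the function names mirror the APIs above:

```python
import itertools
import time

# In-memory stand-ins for database tables.
products, comments, carts = [], [], {}
_next_id = itertools.count(1)

def get_products_by_category(category, limit):
    """GET: return at most `limit` products in a category."""
    return [p for p in products if p["category"] == category][:limit]

def get_products_by_title(query, limit):
    """GET: substring match on the title, not only exact replicas of the query."""
    return [p for p in products if query.lower() in p["title"].lower()][:limit]

def post_product(user_id, info):
    """POST: `info` bundles title, category, price, etc. in one object."""
    product = {"product_id": next(_next_id), "user_id": user_id, **info}
    products.append(product)
    return product["product_id"]

def post_comment(user_id, product_id, body):
    """POST: record who commented, on what, the text and the time."""
    comments.append({"user_id": user_id, "product_id": product_id,
                     "body": body, "time": time.time()})

def add_to_cart(user_id, product_id, quantity):
    """POST: create a ShoppingCartItem and append it to the user's cart."""
    carts.setdefault(user_id, []).append(
        {"product_id": product_id, "quantity": quantity})

pid = post_product(7, {"title": "Organic Apples", "category": "fruit", "price": 3.99})
```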

Take a look at the summary of the APIs of this product:

Achieving other requirements

Here, we'll talk about how to achieve the requirements that don't necessarily need a built API, but
are better served by external services.

• Check stock of product => In order to be able to add a product to cart, we first have to check
whether that product is available in stores near our location. What this should do is take the
current location, decide which facility is nearest, access the inventory database, and check
the availability of the product in said establishment. This is usually achieved with an
inventory management system, rather than making calls directly to the database. If we
receive a positive response, we can go ahead and allow adding it to cart. Otherwise, we
should disable the 'add to cart' function and indicate that the product is not available in an
intuitive way.
• Order => This is the most sensitive task we have to deal with here, because it involves real-
world implications, namely money. Instead of risking anything, we could use already
established products that deal with this efficiently, called payment processors. Such
tools ensure payments on our application are done safely. Some of the most famous
payment processors are Stripe, PayPal and Square.
When adding products to cart or ordering said products, we also have to consider the
inventory. So, for example, if one user adds a few products to cart, the purchased quantity of
those products has to be marked unavailable for a standard amount of time displayed on the
website. When the order is placed, that quantity has to be deducted from the inventory.
This is done through the inventory management system.

• Assign the delivery => Here, we are going to focus on how drivers are assigned to the
delivery of goods.
Keep in mind we are working on making an efficient app. Thus, in order for someone to be
assigned to deliver the order, we will need to find the quickest and most performant way to
do so. In this case, the most efficient way to assign a delivery is to find the closest driver.
We already know the location of the store that's being ordered from (check the geo-hashing
algorithm explained at the additional resources for a bit more information on this). What we
need now is to know the location of the drivers. Sure, we could request their latitude and
longitude once and then hash it as we did for the stores, but it's likely not going to work,
since, unlike stores, drivers move around.
The most efficient solution here would be to keep track of the driver's area in a driver
table, which we will need to add to our database solution.
The way this would work is by using a driver update service. Each driver would run
software on their phone that reports their location (latitude and longitude)
periodically, such as every 10 seconds. The driver update service is called each time
and converts (or calls another service that converts) the latitude and longitude to a geo-hash.
Then, based on their geo-hash, the drivers can be assigned orders.

• Track the delivery => In order to track the delivery, we need to establish a connection
between the client and the driver. This would likely be done by using a load balancer, so that
they can be connected to the same server, and then sending the latitude and
longitude of the driver to the client. There could be a debate over the protocol used to
send the data, but something like periodic requests over HTTP could be just fine.
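The driver-assignment step above can be sketched by comparing geo-hash prefixes: the longer the shared prefix between the store's hash and a driver's hash, the smaller the "box" they share, so the closer they likely are. The sample hashes below are made up for illustration:

```python
def shared_prefix_len(a, b):
    """Number of leading characters two geo-hashes have in common."""
    n = 0
    for x, y in zip(a, b):
        if x != y:
            break
        n += 1
    return n

def closest_driver(store_hash, drivers):
    """drivers: driver_id -> geo-hash, refreshed every ~10 seconds by the
    driver update service. Pick the driver sharing the longest prefix."""
    return max(drivers, key=lambda d: shared_prefix_len(store_hash, drivers[d]))

drivers = {"d1": "9q8yyk", "d2": "9q8yyz", "d3": "9q9abc"}
```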

Components

Now let's look at the actual components into play in more depth.

We may want to add other components, such as load balancers, cache and others in order to
improve the performance of our app.

Here is an extremely abstracted solution:

Keep in mind that this is very broad. There could be a lot more done for optimization. For example, a
large platform like Amazon would probably have many more servers and many load balancers.

My goal here is to emphasize that the client, server and database are clearly not the only components in
this system. A large scale application would probably require a CDN and caching to improve
performance and save money, considering that reading information from memory is about 100
times faster than reading it from the database. Also, caching would not only be done in front of the
database, but also at additional layers, such as the DNS, the CDN, and the application server.

Moreover, we added the load balancer to prevent a single point of failure. Imagine what could happen if
the whole product depended on one single server. Any problem with that server and the app is
down! This is why scaling horizontally is so valuable.

Recommended technologies and algorithms

Let's discuss one final point, that is, the technologies and algorithms that can be used in order to
create all that we talked about today.

Database

In the matter of the database, a relational (SQL) database would be recommended. It is the best
choice because its transactions keep related records consistent, which is very useful when dealing
with sensitive actions, such as payment.

Some examples of databases that could be used:

• MySQL

• Oracle

• PostgreSQL

Server

There are a lot of technologies that can be considered for this task. Here, the most important
criterion would probably be the programming language. Some technologies are easier to use than
others, so researching this is also important.

For this application, I would probably go with Django, since it is easy to use, authentication can be
done easily, and it is designed precisely for large scale application development. Django uses Python.

If other teams feel more experienced in Javascript, for example, Node.js or Express.js are also great,
top choices.

Network protocol

When choosing a network protocol, we are going to want to focus on security most of all. HTTPS
(HTTP over TLS) is the natural choice for stateless request/response traffic, while a persistent
TLS-secured connection (such as WebSockets) suits stateful communication. This, of course, also
depends on the request being made, and every detail should be taken into account, since, in a
large application, any delay that may seem minor can compound.

Load balancer routing method

Let's also decide on which load balancer routing algorithm is the best.

The best routing method for such a website would definitely be the IP hashing algorithm. Described
simply, this hashes the user's IP the first time they access the website, and then redirects that user
to the same server every time they access the application.
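A minimal sketch of IP-hash routing follows. It assumes a fixed server list; real load balancers handle server additions and removals more carefully, e.g. with consistent hashing:

```python
import hashlib

def route(ip, servers):
    """Map a client IP to a server deterministically: the same IP always
    lands on the same server while the server list is unchanged."""
    digest = hashlib.md5(ip.encode()).hexdigest()
    return servers[int(digest, 16) % len(servers)]

servers = ["server-0", "server-1", "server-2"]
```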

Geo-hashing

One additional point that is worth making before we end this article is geo-hashing.
This algorithm enables us to do something we mentioned at several earlier points, but didn't go
too deep into. All throughout the article, we discussed being able to locate the nearest store to the
client, and then the nearest driver when assigning the delivery. But how exactly can we tell what each
store's location is?

One great way would be using a geosharded database for the stores. What this means is that we
would take each area as a 'box', give it a hash, and then keep dividing the area by a constant
factor (for example, 3) into smaller 'boxes'. Each of these smaller boxes starts with the same
hash as that of the parent 'box', with one unique character appended at the end. As the subdivision
goes deeper and deeper, it accounts for each location on the map.

Then, when we get the latitude and longitude of a user or driver, we will know which store they are
closest to.
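A toy version of this boxing scheme might look like the following. It only illustrates the prefix property described above (nearby points share a hash prefix) and is not a production geo-hash:

```python
def geo_box_hash(lat, lon, depth=6, splits=3):
    """Split the world into a splits x splits grid at each level and append
    one character (0-8 for splits=3) per level for the cell containing
    the point. Nearby points end up with a common hash prefix."""
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    out = []
    for _ in range(depth):
        lat_step = (lat_hi - lat_lo) / splits
        lon_step = (lon_hi - lon_lo) / splits
        row = min(int((lat - lat_lo) / lat_step), splits - 1)
        col = min(int((lon - lon_lo) / lon_step), splits - 1)
        out.append(str(row * splits + col))
        # Zoom into the chosen child box for the next level.
        lat_lo = lat_lo + row * lat_step
        lat_hi = lat_lo + lat_step
        lon_lo = lon_lo + col * lon_step
        lon_hi = lon_lo + lon_step
    return "".join(out)
```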

The hashing API should be taken from external sources, especially since applications focused on
geolocation have worked tremendously to improve theirs. One great example is Uber's.

Conclusion

In conclusion of this OpenGenus article, a system such as Amazon Fresh/BigBasket/JioMart is not at
all easy to design or build.

Our goal here was to design a framework, a frame of reference of how such a system can be
achieved. Intermediate solutions can also be added, that is, tools by various companies that can
enhance the users' experience or make the developers' work easier.

I hope that this article at OpenGenus leads you to consider what goes into designing and building
large scale distributed systems and provides you with a set of points to follow when doing so.

Design a Distributed Job Scheduler - System Design Interview

A distributed job scheduler is a system designed to manage, schedule, and execute tasks (referred
to as "jobs") across multiple computers or nodes in a distributed network.
Distributed job schedulers are used for automating and managing large-scale tasks like batch
processing, report generation, and orchestrating complex workflows across multiple nodes.

In this article, we will walk through the process of designing a scalable distributed job scheduling
service that can handle millions of tasks, and ensure high availability.

1. Requirements Gathering

Before diving into the design, let’s outline the functional and non-functional requirements.

Functional Requirements:

1. Users can submit one-time or periodic jobs for execution.

2. Users can cancel the submitted jobs.

3. The system should distribute jobs across multiple worker nodes for execution.

4. The system should provide monitoring of job status (queued, running, completed, failed).

5. The system should prevent the same job from being executed multiple times concurrently.

Non-Functional Requirements:

• Scalability: The system should be able to schedule and execute millions of jobs.

• High Availability: The system should be fault-tolerant with no single point of failure. If a
worker node fails, the system should reschedule the job to other available nodes.

• Latency: Jobs should be scheduled and executed with minimal delay.

• Consistency: Job results should be consistent, ensuring that jobs are executed once (or with
minimal duplication).

Additional Requirements (Out of Scope):

1. Job prioritization: The system should support scheduling based on job priority.

2. Job dependencies: The system should handle jobs with dependencies.

2. High Level Design

At a high level, our distributed job scheduler will consist of the following components:

1. Job Submission Service

The Job Submission Service is the entry point for clients to interact with the system.

It provides an interface for users or services to submit, update, or cancel jobs via APIs.

This layer exposes a RESTful API that accepts job details such as:

• Job name

• Frequency (One-time, Daily)

• Execution time

• Job payload (task details)

It saves job metadata (e.g., execution_time, frequency, status = pending) in the Job Store (a
database) and returns a unique Job ID to the client.

2. Job Store

The Job Store is responsible for persisting job information and maintaining the current state of all
jobs and workers in the system.

The Job Store contains following database tables:

Job Table

This table stores the metadata of the job, including job id, user id, frequency, payload, execution
time, retry count and status (pending, running, completed, failed).

Job Execution Table

Jobs can be executed multiple times in case of failures.

This table tracks the execution attempts for each job, storing information like execution id, start time,
end time, worker id, status and error message.

If a job fails and is retried, each attempt will be logged here.


Job Schedules

The Schedules Table stores scheduling details for each job, including the next_run_time.

• For one-time jobs, the next_run_time is the same as the job’s execution time, and the
last_run_time remains null.

• For recurring jobs, the next_run_time is updated after each execution to reflect the next
scheduled run.


Worker Table
The Worker Table stores information about each worker node, including its IP address, status,
last heartbeat, capacity and current load.


3. Scheduling Service

The Scheduling Service is responsible for selecting jobs for execution based on their next_run_time
in the Job Schedules Table.

It periodically queries the table for jobs scheduled to run at the current minute:

SELECT * FROM JobSchedulesTable WHERE next_run_time = 1726110000;

Once the due jobs are retrieved, they are pushed to the Distributed Job Queue for worker nodes to
execute.

Simultaneously, the status in Job Table is updated to SCHEDULED.
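The scheduler's polling step can be sketched like this, using an in-memory SQLite database and a plain list standing in for the distributed queue (the table contents are made-up sample data):

```python
import sqlite3

# Minimal Job Store with one due job and one future job.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE JobSchedulesTable (job_id INTEGER, next_run_time INTEGER);
CREATE TABLE JobTable (job_id INTEGER, status TEXT);
INSERT INTO JobSchedulesTable VALUES (1, 1726110000), (2, 1726110060);
INSERT INTO JobTable VALUES (1, 'PENDING'), (2, 'PENDING');
""")

queue = []  # stands in for the distributed job queue (Kafka/RabbitMQ)

def run_scheduler_tick(now):
    """Fetch jobs due at `now`, push them to the queue, mark them SCHEDULED."""
    due = conn.execute(
        "SELECT job_id FROM JobSchedulesTable WHERE next_run_time = ?",
        (now,)).fetchall()
    for (job_id,) in due:
        queue.append(job_id)
        conn.execute("UPDATE JobTable SET status = 'SCHEDULED' WHERE job_id = ?",
                     (job_id,))
    conn.commit()

run_scheduler_tick(1726110000)
```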

4. Distributed Job Queue

The Distributed Job Queue (e.g., Kafka, RabbitMQ) acts as a buffer between the Scheduling Service
and the Execution Service, ensuring that jobs are distributed efficiently to available worker nodes.

It holds the jobs and allows the execution service to pull jobs and assign it to worker nodes.

5. Execution Service

The Execution Service is responsible for running the jobs on worker nodes and updating the results
in the Job Store.

It consists of a coordinator and a pool of worker nodes.

Coordinator

A coordinator (or orchestrator) node takes responsibility for:

• Assigning jobs: Distributes jobs from the queue to the available worker nodes.

• Managing worker nodes: Tracks the status, health, capacity, and workload of active workers.

• Handling worker node failures: Detects when a worker node fails and reassigns its jobs to
other healthy nodes.

• Load balancing: Ensures the workload is evenly distributed across worker nodes based on
available resources and capacity.

Worker Nodes
Worker nodes are responsible for executing jobs and updating the Job Store with the results (e.g.,
completed, failed, output).

• When a worker is assigned a job, it creates a new entry in the Job Execution Table with the
job’s status set to running and begins execution.

• After execution is finished, the worker updates the job’s final status (e.g., completed or
failed) along with any output in both the Jobs and Job Execution Table.

• If a worker fails during execution, the coordinator re-queues the job in the distributed job
queue, allowing another worker to pick it up and complete it.
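The coordinator's failure handling can be sketched with a heartbeat check. The 30-second timeout and the in-memory structures are assumptions for illustration, not a prescribed implementation:

```python
HEARTBEAT_TIMEOUT = 30  # seconds; an assumed threshold

# worker_id -> last heartbeat timestamp and in-flight job IDs
workers = {"w1": {"last_heartbeat": 100, "running": [42]},
           "w2": {"last_heartbeat": 128, "running": [43]}}
queue = []  # stands in for the distributed job queue

def reap_dead_workers(now):
    """Declare workers dead after missing heartbeats too long, and re-queue
    their in-flight jobs so another worker can pick them up."""
    for worker_id, info in list(workers.items()):
        if now - info["last_heartbeat"] > HEARTBEAT_TIMEOUT:
            queue.extend(info["running"])
            del workers[worker_id]

reap_dead_workers(now=131)
```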

Design A Search Autocomplete System

Search autocomplete is the feature provided by many platforms such as Amazon, Google and others
when you put your cursor in your search bar and start typing something you're looking for:

Step 1 - Understand the problem and establish design scope

• C: Is the matching only supported at the beginning of a search term, or also, eg, in the middle?
• I: Only at the beginning

• C: How many autocompletion suggestions should the system return?

• I: 5

• C: Which suggestions should the system choose?

• I: Determined by popularity based on historical query frequency

• C: Does system support spell check?

• I: Spell check or auto-correct is not supported.

• C: Are search queries in English?

• I: Yes, if time allows, we can discuss multi-language support

• C: Is capitalization and special characters supported?

• I: We assume all queries use lowercase characters

• C: How many users use the product?

• I: 10mil DAU

Summary:

• Fast response time. An article about Facebook's autocomplete reveals that suggestions should
be returned within 100ms at most to avoid stuttering

• Relevant - autocomplete suggestions should be relevant to search term

• Sorted - suggestions should be sorted by popularity

• Scalable - system can handle high traffic volumes

• Highly available - system should be up even if parts of the system are unresponsive

Back of the envelope estimation

• Assume we have 10mil DAU

• On average, person performs 10 searches per day

• 10mil * 10 = 100mil searches per day = 100,000,000 / 86,400 ≈ 1,200 searches per second.

• given 4 words of 5 chars per search on average -> 1,200 * 20 = 24,000 QPS. Peak QPS = 48,000
QPS.

• 20% of daily queries are new -> 100mil * 0.2 = 20mil new searches * 20 bytes = 400MB new
data per day.
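The estimates above, reproduced as plain arithmetic:

```python
dau = 10_000_000
searches_per_day = dau * 10                      # 100 million searches per day
searches_per_sec = searches_per_day // 86_400    # ~1,157, rounded to ~1,200 in the text

chars_per_query = 4 * 5                          # 4 words of 5 chars each
qps = 1_200 * chars_per_query                    # one request per keystroke -> 24,000
peak_qps = qps * 2                               # assumed peak = 2x average

new_data_per_day_mb = (searches_per_day * 0.2 * 20) / 1_000_000  # 20 bytes per new query
```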

Step 2 - Propose high-level design and get buy-in

At a high-level, the system has two components:

• Data gathering service - gathers user input queries and aggregates them in real-time.

• Query service - given search query, return topmost 5 suggestions.


Data gathering service

This service is responsible for maintaining a frequency table:

Query service

Given a frequency table like the one above, this service is responsible for returning the top 5
suggestions based on the frequency column:
Querying the data set is a matter of running the following SQL query:

This is acceptable for small data sets but becomes impractical for large ones.

Step 3 - Design deep dive

In this section, we'll deep dive into several components which will improve the initial high-level
design.

Trie data structure

We use relational databases in the high-level design, but to achieve a more optimal solution, we'll
need to leverage a suitable data structure.

We can use tries for fast string prefix retrieval.

• It is a tree-like data structure

• The root represents the empty string

• Each node has 26 children, representing each of the next possible characters. To save space,
we don't store empty links.

• Each node represents a single word or prefix


• For this problem, apart from storing the strings, we'll need to store the frequency against
each leaf

To implement the algorithm, we need to:

• first find the node representing the prefix (time complexity O(p), where p = length of prefix)

• traverse subtree to find all leaves (time complexity O(c), where c = total children)
• sort retrieved children by their frequencies (time complexity O(clogc), where c = total
children)

This algorithm works, but there are ways to optimize it as we'll have to traverse the entire trie in the
worst-case scenario.

Limit the max length of prefix

We can leverage the fact that users rarely use a very long search term to limit max prefix to 50 chars.

This reduces the time complexity from O(p) + O(c) + O(clogc) -> O(1) + O(c) + O(clogc).

Cache top search queries at each node


To avoid traversing the whole trie, we can cache the top k most frequently accessed words in each
node:

This reduces the time complexity to O(1) as top K search terms are already cached. The trade-off is
that it takes much more space than a traditional trie.
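A sketch of such a trie with per-node cached top-k suggestions follows; the terms and frequencies are made up, and a production version would update caches more lazily:

```python
class TrieNode:
    def __init__(self):
        self.children = {}   # only non-empty links are stored
        self.top = []        # cached [(term, freq)] for this prefix

class AutocompleteTrie:
    def __init__(self, k=5):
        self.root = TrieNode()
        self.k = k

    def insert(self, term, freq):
        """Walk the term's path, refreshing the cached top-k at every node."""
        node = self.root
        for ch in [None] + list(term):     # None = update the root as well
            if ch is not None:
                node = node.children.setdefault(ch, TrieNode())
            node.top = sorted(
                [(t, f) for t, f in node.top if t != term] + [(term, freq)],
                key=lambda x: -x[1])[:self.k]

    def suggest(self, prefix):
        """O(p) walk to the prefix node, then an O(1) read of its cache."""
        node = self.root
        for ch in prefix:
            if ch not in node.children:
                return []
            node = node.children[ch]
        return [t for t, _ in node.top]

trie = AutocompleteTrie(k=5)
for term, freq in [("tree", 10), ("true", 35), ("try", 29),
                   ("toy", 14), ("wish", 25), ("win", 50)]:
    trie.insert(term, freq)
```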

Data gathering service

In the previous design, when the user types in a search term, data is updated in real-time. This is not
practical at a bigger scale due to:

• billions of queries per day

• Top suggestions may not change much once trie is built

Hence, we'll instead update the trie asynchronously based on analytics data:
The analytics logs contain raw rows of data related to search terms with timestamps:

The aggregators' responsibility is to map the analytics data into a suitable format and also aggregate
it to lesser records.

The cadence at which we aggregate depends on the use-case for our auto-complete functionality. If
we need the data to be relatively fresh & updated real-time (eg twitter search), we can aggregate
once every eg 30m. If, on the other hand, we don't need the data to be updated real-time (eg google
search), we can aggregate once per week.

Example weekly aggregated data:


The workers are responsible for building the trie data structure, based on aggregated data, and
storing it in DB.

The trie cache keeps the trie loaded in-memory for fast read. It takes a weekly snapshot of the DB.

The trie DB is the persistent storage. There are two options for this problem:

• Document store (eg MongoDB) - we can periodically build the trie, serialize it and store it in
the DB.

• Key-value store (eg DynamoDB) - we can also store the trie in hashmap format.

Query service
The query service fetches top suggestions from Trie Cache or fallbacks to Trie DB on cache miss:

Some additional optimizations for the Query service:

• Using AJAX requests on client-side - these prevent the browser from refreshing the page.

• Data sampling - instead of logging all requests, we can log a sample of them to avoid too
many logs.

• Browser caching - since auto-complete suggestions don't change often, we can leverage the
browser cache to avoid extra calls to backend.
Example with Google search caching search results on the browser for 1h:

Trie operations

Let's briefly describe common trie operations.

Create

The trie is created by workers using aggregated data, collected via analytics logs.

Update

There are two options to handling updates:

• Not updating the trie, but reconstructing it instead. This is acceptable if we don't need real-
time suggestions.

• Updating individual nodes directly - we prefer to avoid this as it's slow. Updating a single node
requires updating all parent nodes as well due to the cached suggestions:

Delete
To avoid showing suggestions including hateful content or any other content we don't want to show,
we can add a filter between the trie cache and the API servers:

The database is asynchronously updated to remove hateful content.

Scale the storage

At some point, our trie won't be able to fit on a single server. We need to devise a sharding
mechanism.

One option to achieve this is to shard based on the letters of the alphabet - eg a-m goes on one
shard, n-z on the other.

This doesn't work well as data is unevenly distributed due to eg the letter a being much more
frequent than x.

To mitigate this, we can have a dedicated shard mapper, which is responsible for devising a smart
sharding algorithm, which factors in the uneven distribution of search terms:
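One possible shard-mapper sketch: group first letters into shards so that each shard carries roughly equal total query volume. The letter weights below are made-up illustrative numbers, not real frequency data:

```python
def build_shard_map(letter_weights, num_shards):
    """Greedily assign letters (in order) to shards, moving to the next shard
    once the current one reaches its fair share of the total volume."""
    target = sum(letter_weights.values()) / num_shards
    shard_map, current, load = {}, 0, 0.0
    for letter, weight in sorted(letter_weights.items()):
        if load >= target and current < num_shards - 1:
            current, load = current + 1, 0.0
        shard_map[letter] = current
        load += weight
    return shard_map

# 'a' is far more frequent than the rest, so it gets a shard of its own.
weights = {"a": 9, "b": 2, "c": 3, "x": 1, "y": 1, "z": 1}
shard_of = build_shard_map(weights, 2)
```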

Step 4 - Wrap up

Other talking points:


• How to support multi-language - we store unicode characters in trie nodes, instead of ASCII.

• What if top search queries differ across countries - we can build different tries per country
and leverage CDNs to improve response time.

• How can we support trending (real-time) search queries? - current design doesn't support
this and improving it to support it is beyond the scope of the book. Some options:

o Reduce working data set via sharding

o Change ranking model to assign more weight to recent search queries

o Data may come as streams which you filter upon and use map-reduce technologies
to process it - Hadoop, Apache Spark, Apache Storm, Apache Kafka, etc.

Payment System

We'll design a payment system in this chapter, which underpins all of modern e-commerce.

A payment system is used to settle financial transactions, transferring monetary value.

Step 1 - Understand the Problem and Establish Design Scope

• C: What kind of payment system are we building?

• I: A payment backend for an e-commerce system, similar to Amazon.com. It handles
everything related to money movement.

• C: What payment options are supported - Credit cards, PayPal, bank cards, etc?

• I: The system should support all these options in real life. For the purposes of the interview,
we can use credit card payments.

• C: Do we handle credit card processing ourselves?

• I: No, we use a third-party provider like Stripe, Braintree, Square, etc.

• C: Do we store credit card data in our system?

• I: Due to compliance reasons, we do not store credit card data directly in our systems. We
rely on third-party payment processors.

• C: Is the application global? Do we need to support different currencies and international
payments?

• I: The application is global, but we assume only one currency is used for the purposes of the
interview.

• C: How many payment transactions per day do we support?

• I: 1mil transactions per day.

• C: Do we need to support the payout flow to eg payout to payers each month?

• I: Yes, we need to support that


• C: Is there anything else I should pay attention to?

• I: We need to support reconciliations to fix any inconsistencies in communicating with
internal and external systems.

Functional requirements

• Pay-in flow - payment system receives money from customers on behalf of merchants

• Pay-out flow - payment system sends money to sellers around the world

Non-functional requirements

• Reliability and fault-tolerance. Failed payments need to be carefully handled

• A reconciliation between internal and external systems needs to be setup.

Back-of-the-envelope estimation

The system needs to process 1mil transactions per day, which is 10 transactions per second.

This is not a high throughput for any database system, so it's not the focus of this interview.

Step 2 - Propose High-Level Design and Get Buy-In

At a high-level, we have three actors, participating in money movement:

Pay-in flow
Here's the high-level overview of the pay-in flow:

• Payment service - accepts payment events and coordinates the payment process. It typically
also does a risk check using a third-party provider for AML violations or criminal activity.

• Payment executor - executes a single payment order via the Payment Service Provider (PSP).
Payment events may contain several payment orders.

• Payment service provider (PSP) - moves money from one account to another, eg from buyer's
credit card account to e-commerce site's bank account.

• Card schemes - organizations that process credit card operations, eg Visa, MasterCard, etc.

• Ledger - keeps financial record of all payment transactions.

• Wallet - keeps the account balance for all merchants.

Here's an example pay-in flow:

• user clicks "place order" and a payment event is sent to the payment service

• payment service stores the event in its database

• payment service calls the payment executor for all payment orders, part of that payment
event

• payment executor stores the payment order in its database

• payment executor calls external PSP to process the credit card payment

• After the payment executor processes the payment, the payment service updates the wallet
to record how much money the seller has

• wallet service stores updated balance information in its database

• payment service calls the ledger to record all money movements

APIs for payment service


POST /v1/payments

{
  "buyer_info": {...},
  "checkout_id": "some_id",
  "credit_card_info": {...},
  "payment_orders": [{...}, {...}, {...}]
}

Example payment_order:

{
  "seller_account": "SELLER_IBAN",
  "amount": "3.15",
  "currency": "USD",
  "payment_order_id": "globally_unique_payment_id"
}

Caveats:

• The payment_order_id is forwarded to the PSP to deduplicate payments, ie it is the
idempotency key.

• The amount field is a string, as double is not appropriate for representing monetary values.

GET /v1/payments/{:id}

This endpoint returns the execution status of a single payment, based on the payment_order_id.
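The idempotency behaviour of the POST endpoint can be sketched as follows. The in-memory dict stands in for the payment service database, and the handler shape is an illustration rather than the service's actual code:

```python
import uuid

orders = {}  # payment_order_id -> order record (stands in for the DB)

def post_payment(payment_order):
    """payment_order_id is the idempotency key: retrying the same request
    returns the existing record instead of creating a duplicate charge."""
    key = payment_order["payment_order_id"]
    if key in orders:
        return orders[key]
    record = {**payment_order, "status": "NOT_STARTED"}
    orders[key] = record
    # ... forward to the payment executor / PSP here ...
    return record

order = {"seller_account": "SELLER_IBAN", "amount": "3.15",
         "currency": "USD", "payment_order_id": str(uuid.uuid4())}
first = post_payment(order)
second = post_payment(order)   # a retry is a no-op
```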

Payment service data model

We need to maintain two tables - payment_events and payment_orders.

For payments, performance is typically not an important factor. Strong consistency, however, is.

Other considerations for choosing the database:

• A strong market of DBAs to hire to administer the database

• Proven track-record where the database has been used by other big financial institutions

• Richness of supporting tools

• Traditional SQL over NoSQL/NewSQL for its ACID guarantees

Here's what the payment_events table contains:

• checkout_id - string, primary key

• buyer_info - string (personal note - prob a foreign key to another table is more appropriate)
• seller_info - string (personal note - same remark as above)

• credit_card_info - depends on card provider

• is_payment_done - boolean

Here's what the payment_orders table contains:

• payment_order_id - string, primary key

• buyer_account - string

• amount - string

• currency - string

• checkout_id - string, foreign key

• payment_order_status - enum (NOT_STARTED, EXECUTING, SUCCESS, FAILED)

• ledger_updated - boolean

• wallet_updated - boolean

Caveats:

• there are many payment orders, linked to a given payment event

• we don't need the seller_info for the pay-in flow. That's required on pay-out only

• ledger_updated and wallet_updated are updated when the respective service is called to
record the result of a payment

• payment transitions are managed by a background job, which checks updates of in-flight
payments and triggers an alert if a payment is not processed in a reasonable timeframe

Double-entry ledger system

The double-entry accounting mechanism is key to any payment system. It is a mechanism of tracking
money movements by always applying money operations to two accounts, where one's account
balance increases (credit) and the other decreases (debit):

Account   Debit   Credit
buyer     $1
seller            $1

Sum of all transaction entries is always zero. This mechanism provides end-to-end traceability of all
money movements within the system.
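The zero-sum invariant can be sketched with a minimal Python model (the account names and the integer-cents convention are illustrative, not a production ledger schema):

```python
from dataclasses import dataclass

@dataclass
class LedgerEntry:
    account: str
    amount_cents: int   # positive = credit, negative = debit; integers avoid float drift

def record_transfer(ledger: list, debit_account: str,
                    credit_account: str, amount_cents: int) -> None:
    """Every money movement is recorded as two balancing entries."""
    if amount_cents <= 0:
        raise ValueError("amount must be positive")
    ledger.append(LedgerEntry(debit_account, -amount_cents))
    ledger.append(LedgerEntry(credit_account, +amount_cents))

ledger: list = []
record_transfer(ledger, "buyer", "seller", 100)   # buyer pays seller $1
# Invariant: the sum of all transaction entries is always zero.
assert sum(e.amount_cents for e in ledger) == 0
```

Because entries are only ever appended in balancing pairs, the invariant holds across the whole history, which is what gives the end-to-end traceability.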

Hosted payment page

To avoid storing credit card information and having to comply with various heavy regulations, most
companies prefer utilizing a widget, provided by PSPs, which store and handle credit card payments
for you:

Pay-out flow

The components of the pay-out flow are very similar to the pay-in flow.

Main differences:

• money is moved from e-commerce site's bank account to merchant's bank account

• we can utilize a third-party account payable provider such as Tipalti

• There's a lot of bookkeeping and regulatory requirements to handle with regards to pay-outs
as well

Step 3 - Design Deep Dive

This section focuses on making the system faster, more robust and secure.
PSP Integration

If our system could directly connect to banks or card schemes, payments could be made without a PSP. Such connections are rare and typically built only at large companies which can justify the investment.

If we go down the traditional route, a PSP can be integrated in one of two ways:

• Through API, if our payment system can collect payment information

• Through a hosted payment page to avoid dealing with payment information regulations

Here's how the hosted payment page workflow works:

• User clicks "checkout" button in the browser

• Client calls the payment service with the payment order information

• After receiving payment order information, the payment service sends a payment
registration request to the PSP.

• The PSP receives payment info such as currency, amount, expiration, etc, as well as a UUID
for idempotency purposes. Typically the UUID of the payment order.

• The PSP returns a token back which uniquely identifies the payment registration. The token is
stored in the payment service database.

• Once token is stored, the user is served with a PSP-hosted payment page. It is initialized
using the token as well as a redirect URL for success/failure.

• User fills in payment details on the PSP page, PSP processes payment and returns the
payment status

• User is now redirected back to the redirect URL. Example redirect URL - https://your-company.com/?tokenID=JIOUIQ123NSF&payResult=X324FSa
• Asynchronously, the PSP calls our payment service via a webhook to inform our backend of
the payment result

• Payment service records the payment result based on the webhook received

Reconciliation

The previous section explains the happy path of a payment. Unhappy paths are detected and
reconciled using a background reconciliation process.

Every night, the PSP sends a settlement file which our system uses to compare the external system's
state against our internal system's state.

This process can also be used to detect internal inconsistencies between eg the ledger and the wallet
services.

Mismatches are handled by the finance team and fall into one of three categories:

• classifiable and automatable - a known mismatch which can be adjusted using a standard procedure

• classifiable, but can't be automated - manually adjusted by the finance team

• unclassifiable - manually investigated and adjusted by the finance team
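A minimal sketch of the comparison step, assuming both the settlement file and our records are keyed by payment order id with amounts in cents (the bucket names are illustrative):

```python
def reconcile(psp_settlement: dict, internal: dict) -> dict:
    """Compare the PSP settlement file against internal records and
    bucket the mismatches for the finance team to act on."""
    report = {"missing_internally": [], "missing_at_psp": [], "amount_mismatch": []}
    for order_id, amount in psp_settlement.items():
        if order_id not in internal:
            report["missing_internally"].append(order_id)
        elif internal[order_id] != amount:
            report["amount_mismatch"].append(order_id)
    for order_id in internal:
        if order_id not in psp_settlement:
            report["missing_at_psp"].append(order_id)
    return report

report = reconcile({"o1": 100, "o2": 250}, {"o1": 100, "o3": 300})
assert report == {"missing_internally": ["o2"],
                  "missing_at_psp": ["o3"],
                  "amount_mismatch": []}
```

Each bucket then feeds the standard, semi-manual, or fully manual adjustment procedures described above.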

Handling payment processing delays

There are cases, where a payment can take hours to complete, although it typically takes seconds.

This can happen due to:

• a payment being flagged as high-risk and someone has to manually review it

• credit card requires extra protection, eg 3D Secure Authentication, which requires extra
details from card holder to complete

These situations are handled by:


• waiting for the PSP to send us a webhook when a payment is complete or polling its API if
the PSP doesn't provide webhooks

• showing a "pending" status to the user and giving them a page where they can check for payment updates. We could also send them an email once their payment is complete

Communication among internal services

There are two types of communication patterns services use to communicate with one another -
synchronous and asynchronous.

Synchronous communication (ie HTTP) works well for small-scale systems, but suffers as scale
increases:

• low performance - request-response cycle is long as more services get involved in the call
chain

• poor failure isolation - if PSPs or any other service fails, user will not receive a response

• tight coupling - sender needs to know the receiver

• hard to scale - not easy to support sudden increase in traffic due to not having a buffer

Asynchronous communication can be divided into two categories.


Single receiver - receivers pull from a shared queue and each message is processed by only one of them:

Multiple receivers - multiple receivers subscribe to the same topic, but messages are forwarded to all of them:

The latter model works well for our payment system, as a payment can trigger multiple side effects handled by different services.

In a nutshell, synchronous communication is simpler but doesn't allow services to be autonomous.


Async communication trades simplicity and consistency for scalability and resilience.

Handling failed payments

Every payment system needs to address failed payments. Here are some of the mechanisms we'll use
to achieve that:

• Tracking payment state - whenever a payment fails, we can determine whether to retry/refund based on the payment state.

• Retry queue - payments which we'll retry are published to a retry queue

• Dead-letter queue - payments which have terminally failed are pushed to a dead-letter
queue, where the failed payment can be debugged and inspected.

Exactly-once delivery

We need to ensure a payment gets processed exactly-once to avoid double-charging a customer.


An operation is executed exactly-once if it is executed at-least-once and at-most-once at the same
time.

To achieve the at-least-once guarantee, we'll use a retry mechanism:

Here are some common strategies on deciding the retry intervals:

• immediate retry - client immediately sends another request after failure

• fixed intervals - wait a fixed amount of time before retrying a payment

• incremental intervals - incrementally increase retry interval between each retry

• exponential back-off - double retry interval between subsequent retries

• cancel - client cancels the request. This happens when the error is terminal or retry threshold
is reached

As a rule of thumb, default to an exponential back-off retry strategy. A good practice is for the server
to specify a retry interval using a Retry-After header.
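A sketch of exponential back-off with full jitter (the base interval, cap, and retry count are arbitrary example values):

```python
import random

def backoff_intervals(base_ms: int = 100, cap_ms: int = 60_000,
                      retries: int = 5) -> list:
    """Exponential back-off with full jitter: the window doubles on each
    retry (up to a cap), and the actual delay is random within the window."""
    delays = []
    for attempt in range(retries):
        window = min(cap_ms, base_ms * 2 ** attempt)
        delays.append(random.randint(0, window))
    return delays

# Windows grow 100, 200, 400, 800, 1600 ms; actual delays are jittered.
delays = backoff_intervals()
assert len(delays) == 5
assert all(0 <= d <= 100 * 2 ** i for i, d in enumerate(delays))
```

If the server supplies a Retry-After header, that value should take precedence over the locally computed delay.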

An issue with retries is that the server can potentially process a payment twice:

• client clicks the "pay button" twice, hence, they are charged twice

• payment is successfully processed by PSP, but not by downstream services (ledger, wallet).
Retry causes the payment to be processed by the PSP again
To address the double payment problem, we need to use an idempotency mechanism - a property
ensuring that an operation applied multiple times has the same effect as being applied once.

From an API perspective, clients can make multiple calls which produce the same result.
Idempotency is managed by a special header in the request (eg idempotency-key), which is typically
a UUID.

Idempotency can be achieved using the database's mechanism of adding unique key constraints:

• server attempts to insert a new row in the database

• the insertion fails due to a unique key constraint violation

• server detects that error and instead returns the existing object back to the client
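These three steps can be sketched using SQLite's unique-constraint behavior (the table layout is illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE payments (idempotency_key TEXT PRIMARY KEY, amount TEXT)")

def create_payment(idempotency_key: str, amount: str):
    """Insert the payment; on a unique key violation, return the row that
    was already stored instead of processing the payment a second time."""
    try:
        with conn:
            conn.execute("INSERT INTO payments VALUES (?, ?)",
                         (idempotency_key, amount))
    except sqlite3.IntegrityError:
        pass   # duplicate idempotency key: the payment already exists
    return conn.execute("SELECT * FROM payments WHERE idempotency_key = ?",
                        (idempotency_key,)).fetchone()

first = create_payment("uuid-123", "10.00")
second = create_payment("uuid-123", "10.00")   # client retry with the same key
assert first == second == ("uuid-123", "10.00")
assert conn.execute("SELECT COUNT(*) FROM payments").fetchone()[0] == 1
```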

Idempotency is also applied at the PSP side, using the nonce, which was previously discussed. PSPs
will take care to not process payments with the same nonce twice.

Consistency

There are several stateful services called throughout a payment's lifecycle - PSP, ledger, wallet,
payment service.

Communication between any two services can fail. We can ensure eventual data consistency
between all services by implementing exactly-once processing and reconciliation.

If we use replication, we'll have to deal with replication lag, which can lead to users observing
inconsistent data between primary and replica databases.

To mitigate that, we can serve all reads and writes from the primary database and only utilize replicas
for redundancy and fail-over. Alternatively, we can ensure replicas are always in-sync by utilizing a
consensus algorithm such as Paxos or Raft. We could also use a consensus-based distributed
database such as YugabyteDB or CockroachDB.

Payment security

Here are some mechanisms we can use to ensure payment security:

• Request/response eavesdropping - we can use HTTPS to secure all communication

• Data tampering - enforce encryption and integrity monitoring

• Man-in-the-middle attacks - use SSL with certificate pinning

• Data loss - replicate data across multiple regions and take data snapshots

• DDoS attack - implement rate limiting and firewall

• Card theft - use tokens instead of storing real card information in our system

• PCI compliance - a security standard for organizations which handle branded credit cards

• Fraud - address verification, card verification value (CVV), user behavior analysis, etc

Step 4 - Wrap Up

Other talking points:

• Monitoring and alerting

• Debugging tools - we need tools which make it easy to understand why a payment has failed

• Currency exchange - important when designing a payment system for international use

• Geography - different regions might have different payment methods

• Cash payment - very common in places like India and Brazil

• Google/Apple Pay integration

Digital Wallet

Payment platforms usually have a wallet service, where they allow clients to store funds within the
application, which they can withdraw later.

You can also use it to pay for goods & services or transfer money to other users, who use the digital
wallet service. That can be faster and cheaper than doing it via normal payment rails.

Step 1 - Understand the Problem and Establish Design Scope


• C: Should we only focus on transfers between digital wallets? Should we support any other
operations?

• I: Let's focus on transfers between digital wallets for now.

• C: How many transactions per second does the system need to support?

• I: Let's assume 1mil TPS

• C: A digital wallet has strict correctness requirements. Can we assume transactional guarantees are sufficient?

• I: Sounds good

• C: Do we need to prove correctness?

• I: We can do that via reconciliation, but that only detects discrepancies vs. showing us the
root cause for them. Instead, we want to be able to replay data from the beginning to
reconstruct the history.

• C: Can we assume availability requirement is 99.99%?

• I: Yes

• C: Do we need to take foreign exchange into consideration?

• I: No, it's out of scope

Here's what we have to support in summary:

• Support balance transfers between two accounts

• Support 1mil TPS

• Reliability is 99.99%

• Support transactions

• Support reproducibility

Back-of-the-envelope estimation

A traditional relational database, provisioned in the cloud can support ~1000 TPS.

In order to reach 1mil TPS, we'd need 1000 database nodes. But if each transfer has two legs, then
we actually need to support 2mil TPS.

One of our design goals would be to increase the TPS a single node can handle so that we can have
fewer database nodes.

Per-node TPS   Node number
100            20,000
1,000          2,000
10,000         200
Step 2 - Propose High-Level Design and Get Buy-In

API Design

We only need to support one endpoint for this interview:

POST /v1/wallet/balance_transfer - transfers balance from one wallet to another

Request parameters - from_account, to_account, amount (string to not lose precision), currency,
transaction_id (idempotency key).

Sample response:

"status": "success"

"transaction_id": "01589980-2664-11ec-9621-0242ac130002"

In-memory sharding solution

Our wallet application maintains account balances for every user account.

One good data structure to represent this is a map<user_id, balance>, which can be implemented
using an in-memory Redis store.

Since one redis node cannot withstand 1mil TPS, we need to partition our redis cluster into multiple
nodes.

Example partitioning algorithm:

String accountID = "A";
int partitionNumber = 7;
// Math.abs guards against a negative hashCode value
int myPartition = Math.abs(accountID.hashCode()) % partitionNumber;

ZooKeeper can be used to store the number of partitions and the addresses of Redis nodes, as it's a
highly-available configuration store.
Finally, a wallet service is a stateless service responsible for carrying out transfer operations. It can
easily scale horizontally:

Although this solution addresses scalability concerns, it doesn't allow us to execute balance transfers
atomically.

Distributed transactions

One approach for handling transactions is to use the two-phase commit protocol on top of standard,
sharded relational databases:
Here's how the two-phase commit (2PC) protocol works:

• Coordinator (wallet service) performs read and write operations on multiple databases as
normal

• When application is ready to commit the transaction, coordinator asks all databases to
prepare it

• If all databases replied with a "yes", then the coordinator asks the databases to commit the
transaction.

• Otherwise, all databases are asked to abort the transaction
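The steps above can be modeled with a toy coordinator, where in-memory objects stand in for the database shards:

```python
class Participant:
    """Toy stand-in for a database shard taking part in 2PC."""
    def __init__(self, can_commit: bool = True):
        self.can_commit = can_commit
        self.state = "init"

    def prepare(self) -> bool:
        # Phase 1: the shard votes on whether it can commit.
        self.state = "prepared" if self.can_commit else "aborted"
        return self.can_commit

    def commit(self) -> None:
        self.state = "committed"

    def abort(self) -> None:
        self.state = "aborted"

def two_phase_commit(participants: list) -> bool:
    votes = [p.prepare() for p in participants]   # phase 1: prepare everywhere
    if all(votes):                                # phase 2: commit everywhere...
        for p in participants:
            p.commit()
        return True
    for p in participants:                        # ...or abort everywhere
        p.abort()
    return False

assert two_phase_commit([Participant(), Participant()]) is True
assert two_phase_commit([Participant(), Participant(can_commit=False)]) is False
```

The sketch also hints at the lock-contention problem: between prepare and commit, every participant sits in the "prepared" state holding its locks.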

Downsides to the 2PC approach:

• Not performant due to lock contention

• The coordinator is a single point of failure

Distributed transaction using Try-Confirm/Cancel (TC/C)

TC/C is a variation of the 2PC protocol, which works with compensating transactions:

• Coordinator asks all databases to reserve resources for the transaction

• Coordinator collects replies from DBs - if yes, DBs are asked to try-confirm. If no, DBs are
asked to try-cancel.

One important difference between TC/C and 2PC is that 2PC performs a single transaction, whereas
in TC/C, there are two independent transactions.

Here's how TC/C works in phases:

Phase   Operation   Account A             Account C
1       Try         Balance change: -$1   Do nothing
2       Confirm     Do nothing            Balance change: +$1
2       Cancel      Balance change: +$1   Do nothing


Phase 1 - try:

• coordinator starts a local transaction in A's DB to reduce A's balance by $1

• C's DB is given a NOP instruction, which does nothing

Phase 2a - confirm:

• if both DBs replied with "yes", confirm phase starts.

• A's DB receives a NOP, whereas C's DB is instructed to increase C's balance by $1 (local transaction)

Phase 2b - cancel:

• If any of the operations in phase 1 fails, the cancel phase starts.


• A's DB is instructed to increase A's balance by $1, C's DB receives a NOP
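The phases can be sketched as a single in-process model (a real implementation runs each leg as a local transaction in a separate database shard; the NOP legs are implicit here):

```python
def tcc_transfer(balances: dict, src: str, dst: str, amount: int) -> bool:
    """Try-Confirm/Cancel sketch: try debits the source, confirm credits
    the destination, cancel compensates an already-completed try."""
    # Phase 1 (try): always deduct before adding, so the in-flight
    # intermediary state can never be spent.
    if balances.get(src, 0) < amount:
        return False                 # try failed; nothing to compensate
    balances[src] -= amount
    try:
        # Phase 2a (confirm): credit the destination account.
        balances[dst] = balances.get(dst, 0) + amount
        return True
    except Exception:
        # Phase 2b (cancel): compensating transaction restores the source.
        balances[src] += amount
        return False

balances = {"A": 5, "C": 0}
assert tcc_transfer(balances, "A", "C", 1) is True
assert balances == {"A": 4, "C": 1}
assert tcc_transfer(balances, "A", "C", 10) is False   # insufficient funds
```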

Here's a comparison between 2PC and TC/C:

        First Phase                       Second Phase: success      Second Phase: fail
2PC     Transactions are not done yet     Commit all transactions    Cancel all transactions
TC/C    All transactions are completed    Execute new transactions   Reverse the already
        (committed or canceled)           if needed                  committed transactions

TC/C is also referred to as a distributed transaction by compensation. The high-level operation is handled in the business logic.

Other properties of TC/C:

• database agnostic, as long as database supports transactions

• Details and complexity of distributed transactions need to be handled in the business logic

TC/C Failure modes

If the coordinator dies mid-flight, it needs to recover its intermediary state. That can be done by
maintaining phase status tables, atomically updated within the database shards:

What does that table contain:

• ID and content of distributed transaction

• status of try phase - not sent, has been sent, response received

• second phase name - confirm or cancel

• status of second phase

• out-of-order flag (explained later)


One caveat when using TC/C is that there is a brief moment where the account states are
inconsistent with each other while a distributed transaction is in-flight:

This is fine as long as we always recover from this state and that users cannot use the intermediary
state to eg spend it. This is guaranteed by always executing deductions prior to additions.

Try phase choices    Account A   Account C
Choice 1             -$1         NOP
Choice 2 (invalid)   NOP         +$1
Choice 3 (invalid)   -$1         +$1

Note that choice 3 from table above is invalid because we cannot guarantee atomic execution of
transactions across different databases without relying on 2PC.

One edge-case to address is out of order execution:

It is possible that a database receives a cancel operation, before receiving a try. This edge case can be
handled by adding an out of order flag in our phase status table. When we receive a try operation,
we first check if the out of order flag is set and if so, a failure is returned.
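A sketch of the out-of-order flag check, with the phase status table modeled as an in-memory dict:

```python
# Phase status keyed by transaction id; a real system keeps this in a table
# updated atomically within each database shard.
status: dict = {}

def on_cancel(tx_id: str) -> None:
    entry = status.setdefault(tx_id, {"tried": False, "out_of_order": False})
    if not entry["tried"]:
        entry["out_of_order"] = True   # cancel overtook the try: remember it

def on_try(tx_id: str) -> bool:
    entry = status.setdefault(tx_id, {"tried": False, "out_of_order": False})
    if entry["out_of_order"]:
        return False                   # late try refused; tx already canceled
    entry["tried"] = True
    return True

on_cancel("tx1")                # cancel arrives before its try
assert on_try("tx1") is False   # the late try is rejected
assert on_try("tx2") is True    # normal ordering is unaffected
```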

Distributed transaction using Saga


Another popular approach is using Sagas - a standard for implementing distributed transactions with
microservice architectures.

Here's how it works:

• all operations are ordered in a sequence. All operations are independent in their own
databases.

• operations are executed from first to last

• when an operation fails, the entire process starts to roll back until the beginning with
compensating operations
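The rollback behavior can be sketched by modeling each saga step as an (action, compensation) pair:

```python
def run_saga(operations) -> bool:
    """Orchestrated saga sketch: when a step fails, the already-completed
    steps are compensated in reverse order, back toward the beginning."""
    done = []
    for action, compensation in operations:
        try:
            action()
        except Exception:
            for comp in reversed(done):   # roll back completed steps
                comp()
            return False
        done.append(compensation)
    return True

log = []

def credit_c_fails():
    raise RuntimeError("credit C failed")

ops = [
    (lambda: log.append("debit A"), lambda: log.append("undo debit A")),
    (credit_c_fails, lambda: log.append("undo credit C")),
]
assert run_saga(ops) is False
assert log == ["debit A", "undo debit A"]   # the failed step is never compensated
```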

How do we coordinate the workflow? There are two approaches we can take:

• Choreography - all services involved in a saga subscribe to the related events and do their
part in the saga

• Orchestration - a single coordinator instructs all services to do their jobs in the correct order

The challenge of using choreography is that business logic is split across multiple services, which
communicate asynchronously. The orchestration approach handles complexity well, so it is typically
the preferred approach in a digital wallet system.

Here's a comparison between TC/C and Saga:

                                        TC/C              Saga
Compensating action                     In cancel phase   In rollback phase
Central coordination                    Yes               Yes (orchestration mode)
Operation execution order               Any               Linear
Parallel execution possibility          Yes               No (linear execution)
Can see partial inconsistent state      Yes               Yes
Application or database logic           Application       Application

The main difference is that TC/C is parallelizable, so our decision is based on the latency requirement
- if we need to achieve low latency, we should go for the TC/C approach.

Regardless of the approach we take, we still need to support auditing and replaying history to
recover from failed states.

Event sourcing

In real-life, a digital wallet application might be audited and we have to answer certain questions:

• Do we know the account balance at any given time?

• How do we know the historical and current balances are correct?

• How do we prove the system logic is correct after a code change?

Event sourcing is a technique which helps us answer these questions.

It consists of four concepts:

• command - an intended action from the real world, eg transfer $1 from account A to B. Commands
need a global order, due to which they're put into a FIFO queue.

o commands, unlike events, can fail and have some randomness due to eg IO or invalid
state.

o commands can produce zero or more events

o event generation can contain randomness such as external IO. This will be revisited
later

• event - historical facts about what occurred in the system, eg "transferred $1 from A
to B".

o unlike commands, events are facts that have happened within our system

o similar to commands, they need to be ordered, hence, they're enqueued in a FIFO queue

• state - what has changed as a result of an event. Eg a key-value store between account and
their balances.

• state machine - drives the event sourcing process. It mainly validates commands and applies
events to update the system state.
o the state machine should be deterministic, hence, it shouldn't read external IO or
rely on randomness.
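The four concepts can be tied together in a small sketch: a command that fails validation produces zero events, and replaying the immutable event list reproduces the state exactly (account names and amounts are illustrative):

```python
from collections import deque

balances = {"A": 2, "B": 0}   # state: account -> balance
events = []                   # immutable, append-only event history

def apply_event(event) -> None:
    kind, account, amount = event
    balances[account] += amount if kind == "credited" else -amount

def handle_command(cmd) -> None:
    """Deterministic state machine: validate, emit events, apply them."""
    _, src, dst, amount = cmd
    if balances.get(src, 0) < amount:
        return   # commands may fail and then produce zero events
    new_events = [("debited", src, amount), ("credited", dst, amount)]
    events.extend(new_events)
    for e in new_events:
        apply_event(e)

commands = deque([("transfer", "A", "B", 1), ("transfer", "A", "B", 5)])  # FIFO
while commands:
    handle_command(commands.popleft())

assert balances == {"A": 1, "B": 1}   # the second command failed validation

# Reproducibility: replaying the event list from scratch yields the same state.
replayed = {"A": 2, "B": 0}
for kind, account, amount in events:
    replayed[account] += amount if kind == "credited" else -amount
assert replayed == balances
```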

Here's a dynamic view of event sourcing:

For our wallet service, the commands are balance transfer requests. We can put them in a FIFO
queue, such as Kafka:
Here's the full picture:

• state machine reads commands from the command queue

• balance state is read from the database

• the command is validated. If valid, two events, one for each account, are generated

• next event is read and applied by updating the balance (state) in the database

The main advantage of using event sourcing is its reproducibility. In this design, all state update
operations are saved as immutable history of all balance changes.

Historical balances can always be reconstructed by replaying events from the beginning. Because the
event list is immutable and the state machine is deterministic, we are guaranteed to succeed in
replaying any of the intermediary states.

All audit-related questions asked in the beginning of the section can be addressed by relying on
event sourcing:

• Do we know the account balance at any given time? - events can be replayed from the start
until the point which we are interested in

• How do we know the historical and current balances are correct? - correctness can be
verified by recalculating all events from the start

• How do we prove the system logic is correct after a code change? - we can run different
versions of the code against the events and verify their results are identical

Answering client queries about their balance can be addressed using the CQRS architecture - there
can be multiple read-only state machines which are responsible for querying the historical state,
based on the immutable events list:

Step 3 - Design Deep Dive

In this section we'll explore some performance optimizations as we're still required to scale to 1mil
TPS.

High-performance event sourcing

The first optimization we'll explore is to save commands and events into local disk store instead of an
external store such as Kafka.

This avoids the network latency and also, since we're only doing appends, that operation is generally
fast for HDDs.

The next optimization is to cache recent commands and events in-memory in order to save the time
of loading them back from disk.
At a low level, we can achieve the aforementioned optimizations by leveraging the mmap system call,
which maps a file on local disk into memory, so data is persisted while reads are served from the page cache:
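A minimal illustration using Python's mmap module (the file name and record format are made up): events are appended to a local log file, and reads go through a memory mapping backed by the page cache.

```python
import mmap
import os
import tempfile

# Append-only event log on local disk; appends are sequential writes,
# which are fast even on spinning disks.
path = os.path.join(tempfile.mkdtemp(), "events.log")
with open(path, "ab") as log:
    log.write(b'{"event": "debited", "account": "A", "amount": 1}\n')
    log.write(b'{"event": "credited", "account": "C", "amount": 1}\n')

# mmap exposes the file through the OS page cache, so recently written
# entries are read from memory rather than from disk.
with open(path, "rb") as log:
    with mmap.mmap(log.fileno(), 0, access=mmap.ACCESS_READ) as view:
        lines = view[:].splitlines()

assert len(lines) == 2
assert b'"debited"' in lines[0]
```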

The next optimization we can do is also store state in the local file system using SQLite - a file-based
local relational database. RocksDB is also another good option.
For our purposes, we'll choose RocksDB because it uses a log-structured merge-tree (LSM), which is
optimized for write operations. Read performance is optimized via caching.

To optimize the reproducibility, we can periodically save snapshots to disk so that we don't have to
reproduce a given state from the very beginning every time. We could store snapshots as large binary
files in distributed file storage, eg HDFS:

Reliable high-performance event sourcing


All the optimizations done so far are great, but they make our service stateful. We need to introduce
some form of replication for reliability purposes.

Before we do that, we should analyze what kind of data needs high reliability in our system:

• state and snapshot can always be regenerated by reproducing them from the events list.
Hence, we only need to guarantee the event list reliability.

• one might think we can always regenerate the events list from the command list, but that is
not true, since commands are non-deterministic.

• conclusion is that we need to ensure high reliability for the events list only

In order to achieve high reliability for events, we need to replicate the list across multiple nodes. We
need to guarantee:

• that there is no data loss

• the relative order of data within a log file remains the same across replicas

To achieve this, we can employ a consensus algorithm, such as Raft.

With Raft, there is a leader who is active and there are followers who are passive. If the leader dies,
one of the followers takes over. As long as more than half of the nodes are up, the system continues
running.

With this approach, all nodes update the state, based on the events list. Raft ensures leader and
followers have the same events list.

Distributed event sourcing

So far, we've managed to design a system which has high single-node performance and is reliable.

Some limitations we have to tackle:


• The capacity of a single raft group is limited. At some point, we need to shard the data and
implement distributed transactions

• In the CQRS architecture, the request/response flow is slow. A client would need to
periodically poll the system to learn when their wallet has been updated

Polling is not real-time, hence, it can take a while for a user to learn about an update in their balance.
Also, it can overload the query services if the polling frequency is too high:
To mitigate the system load, we can introduce a reverse proxy, which sends commands on behalf of
the user and polls for response on their behalf:

This alleviates the system load as we could fetch data for multiple users using a single request, but it
still doesn't solve the real-time receipt requirement.

One final change we could do is make the read-only state machines push responses back to the
reverse proxy once it's available. This can give the user the sense that updates happen real-time:
Finally, to scale the system even further, we can shard the system into multiple raft groups, where we
implement distributed transactions on top of them using an orchestrator either via TC/C or Sagas:

Here's an example lifecycle of a balance transfer request in our final system:

• User A sends a distributed transaction to the Saga coordinator with two operations - A-1 and
C+1.

• Saga coordinator creates a record in the phase status table to trace the status of the
transaction

• Coordinator determines which partitions it needs to send commands to.

• Partition 1's raft leader receives the A-1 command, validates it, converts it to an event and
replicates it across other nodes in the raft group
• Event result is synchronized to the read state machine, which pushes a response back to the
coordinator

• Coordinator creates a record indicating that the operation was successful and proceeds with
the next operation - C+1

• Next operation is executed similarly to the first one - partition is determined, command is
sent, executed, read state machine pushes back a response

• Coordinator creates a record indicating operation 2 was also successful and finally informs
the client of the result

Step 4 - Wrap Up

Here's the evolution of our design:

• We started from a solution using an in-memory Redis. The problem with this approach is that
it is not durable storage.

• We moved on to using relational databases, on top of which we execute distributed


transactions using 2PC, TC/C or distributed saga.

• Next, we introduced event sourcing in order to make all the operations auditable

• We started by storing the data in external storage using an external database and queue, but
that's not performant

• We proceeded to store data in local file storage, leveraging the performance of append-only
operations. We also used caching to optimize the read path

• The previous approach, although performant, wasn't durable. Hence, we introduced Raft
consensus with replication to avoid single points of failure

• We also adopted CQRS with a reverse proxy to manage a transaction's lifecycle on behalf of
our users

• Finally, we partitioned our data across multiple raft groups, which are orchestrated using a
distributed transaction mechanism - TC/C or distributed saga

Design Spotify - System Design Interview

Spotify is the most popular music streaming platform in the world, with over 600 million monthly
active users (MAU) and 200 million paid users.
In this article, we will learn how to design a music streaming service like Spotify that can handle
hundreds of millions of users and billions of music streams every day, while ensuring low latency and
high availability.

1. Requirements Gathering

Before diving into the design, let's outline the functional and non-functional requirements.

1.1 Functional Requirements:

• Search: Users can search for songs, artists, albums, and playlists.

• Music Streaming: Users can stream songs in real time.

• Playlists: Users can create, share, and modify playlists.

• Music Recommendations: Users receive song recommendations based on their listening history and preferences.
• Ad-Supported Model: Free-tier users will encounter ads after a few songs.

1.2 Non-Functional Requirements:

1. Scalability: The system should handle 100s of millions of users globally and the ability to
stream millions of songs concurrently.

2. Low Latency: Real-time streaming must have low latency for a seamless user experience.

3. High Availability: The system must be available at all times with minimal downtime.

4. Global Reach: Support users from different geographic regions, potentially leveraging CDNs
to serve audio files faster.

2. Capacity Estimation

Let’s assume the following traffic characteristics:

User Base:

• Total active users: 500 million

• Daily active users: 100 million

• Average streams per user per day: 10

• Average song size: 5 MB

• Average song duration: 4 minutes

• Song catalog size: 100 million songs

2.1 Network Bandwidth Estimation

• Daily song streams = 100M users × 10 songs = 1 billion streams/day.

• Data transfer per day = 1 billion × 5 MB = 5 petabytes/day.

• Data transfer per second = 5 petabytes / 86400 = 58 gigabytes/second

2.2 Storage Estimation

Total storage for music = 100 million songs × 5 MB/song = 500 terabytes.

Assuming 2 KB of metadata per song and 10 KB of metadata per user (user details, preferences,
playlists, etc.):

• Total song metadata storage = 100 million songs × 2 KB = 200 GB.

• Total storage for 500 million users = 500 million × (10 KB) = 5 TB.

2.3 Caching Estimation

Caching plays a significant role in reducing the load on the storage system and ensuring low latency
for popular content.

Frequently played song metadata can be cached in memory.


Let's assume the top 20% of songs contribute 80% of the requests.

Assuming Spotify has 100 million songs and the top 20% are cached.

• Cache size = 20 million songs × 2 KB/song = 40 GB.
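The estimates in sections 2.1-2.3 can be sanity-checked with a short script:

```python
# Back-of-the-envelope numbers from sections 2.1-2.3 as a checkable script.
DAU = 100_000_000                  # daily active users
STREAMS_PER_USER = 10
SONG_SIZE_MB = 5
CATALOG_SIZE = 100_000_000         # songs
SONG_META_KB = 2

daily_streams = DAU * STREAMS_PER_USER                     # streams/day
daily_transfer_pb = daily_streams * SONG_SIZE_MB / 1e9     # MB -> PB
bandwidth_gb_s = daily_transfer_pb * 1e6 / 86_400          # PB/day -> GB/s
music_storage_tb = CATALOG_SIZE * SONG_SIZE_MB / 1e6       # MB -> TB
cache_gb = CATALOG_SIZE * 0.2 * SONG_META_KB / 1e6         # top 20% of metadata

assert daily_streams == 1_000_000_000
assert daily_transfer_pb == 5
assert round(bandwidth_gb_s) == 58
assert music_storage_tb == 500
assert cache_gb == 40
```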

3. High Level Design

The system architecture of Spotify can be broken down into several high-level components:
3.1 Client Application

The client application consists of the mobile, desktop, and web versions of Spotify, which provide a
clean and intuitive UI to interact with the service.

It communicates with backend APIs for search, streaming, playlists, and recommendations and
supports offline listening by caching music on the device (downloaded content).

3.2 Load Balancers

The Load Balancer is the entry point for all client requests.

It distributes incoming client requests evenly across multiple instances of backend services,
preventing overload on any single server.

3.3 App Servers

App servers receive incoming requests from the load balancer and route each request to the appropriate service.

3.4 Services

• Streaming Service: Handles streaming of music from the storage system to user’s device in
real-time.

• Search Service: Handles searching of songs, artists, albums and playlists.

• Recommendations Service: Provides personalized music recommendations based on user behavior, such as listening history, likes, and playlist creation.

• Ad Service: Handles the delivery of advertisements for free-tier users.

• Users Service: Stores and manages user profiles, including personal information, subscription
type, and preferences. Manages user playlist, allowing users to create, modify and share
them.

3.5 Storage

• Databases: Stores user profiles, playlists, song metadata and search indices.

• Blob Storage: A distributed storage system (e.g., AWS S3) for handling large-scale storage of
audio and ad files.

• Content Delivery Network (CDN): Used to deliver large audio files efficiently to users across
the globe with minimal latency.

• Caches: Caches frequently accessed data such as popular songs and recommendations to
improve performance and reduce the load on the storage and database systems.

3.6 Analytics Service

The Analytics and Monitoring service tracks user engagement, system performance, and logs system
health.

It generates alerts when issues are detected and logs all system activities for troubleshooting.

4. Database Design

Here are the key entities we need to store in our database:

Users, Songs, Artists, Albums, Playlists, Streams, Search Index and Recommendations.

Given the diverse types of data and high query demands, we use a combination of relational
databases, NoSQL databases, and distributed storage systems.

4.1 Relational Databases for Structured Data

To store highly structured data like user profiles, playlists, song metadata, artists, and albums, we
can use a relational database like PostgreSQL or MySQL. A few notable columns in these tables:

• subscription_type: Plan type (Free, Premium, Family, etc.).

• file_location: URL of the song file in storage (e.g., AWS S3).

• duration: Length of the song in seconds.

4.2 NoSQL Databases for Unstructured Data


To store unstructured and semi-structured data, we can use NoSQL databases like MongoDB,
Cassandra, or DynamoDB.

NoSQL databases provide flexibility and scalability, making them ideal for handling highly dynamic
data such as recommendations, and search indices.

Recommendations Table

Spotify generates recommendations for users based on their listening behavior and this data is
updated frequently.

Example Record:

Search Indices

Search indices are stored in NoSQL databases like Elasticsearch to allow quick, fuzzy search queries
across songs, artists, and albums.

These indices are continuously updated as new content is added.

Example Record:

4.3 Distributed Storage System

To store large volumes of audio and ad files, we can use a distributed storage system like AWS S3.

S3 ensures high durability and availability, making it an ideal storage solution for serving large static
files.

Example Storage Object:

• File: https://s3.amazonaws.com/spotify/songs/blinding_lights.mp3

• Metadata: File size: 4 MB, Bitrate: 128 kbps, Format: MP3

4.4 Content Delivery Network (CDN)

We use a Content Delivery Network (CDN) for distributing large audio files (songs) to users globally
with minimal latency.

By serving music from CDN edge servers, Spotify ensures low-latency music streaming experiences
for users across the world, minimizing buffering times and reducing load on the central storage
system.

Original music files are stored in a distributed storage system (e.g., AWS S3). The CDN pulls from this
origin storage when a song is requested for the first time and caches it for future requests.
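This pull-through behavior can be sketched as a simple read-through cache (the origin fetch and the edge store below are stand-in functions, not a real CDN API):

```python
# Minimal pull-through cache: serve from the edge if present, otherwise
# fetch from origin storage and cache the result for future requests.
edge_cache = {}   # stand-in for a CDN edge server's local store

def fetch_from_origin(song_id):
    # Stand-in for reading the audio file out of blob storage (e.g., S3).
    return f"audio-bytes-for-{song_id}"

def serve_song(song_id):
    if song_id not in edge_cache:            # cache miss: first request
        edge_cache[song_id] = fetch_from_origin(song_id)
    return edge_cache[song_id]               # cache hit on later requests

serve_song("12345")    # miss: pulled from origin and cached at the edge
serve_song("12345")    # hit: served directly from the edge cache
```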

4.5 Caching Layer

Caching frequently accessed data like user preferences, popular songs, or recommendations can
improve performance.

A caching layer like Redis can be used to store this data temporarily.

Examples of Cached Data:

• Search Queries: Cache popular search queries to avoid hitting the search index repeatedly.

• Popular Songs: Frequently streamed songs can be cached to reduce database queries.
• User Preferences: Store the user's liked songs and playlists in the cache for faster retrieval.

Example - SET/GET queries for User Preferences in cache:

SET user:preferences:12345 "{liked_songs: [1, 2, 3], playlists: [10, 11, 12]}"

GET user:preferences:12345
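The same lookup pattern can be sketched in process with a plain dictionary standing in for Redis (the key scheme mirrors the SET/GET example above; a real deployment would use a Redis client and an expiry):

```python
import json

# In-process stand-in for the Redis cache shown above.
cache = {}

def set_preferences(user_id, prefs):
    # Store JSON under the same key scheme as the Redis example.
    cache[f"user:preferences:{user_id}"] = json.dumps(prefs)

def get_preferences(user_id):
    raw = cache.get(f"user:preferences:{user_id}")
    return json.loads(raw) if raw is not None else None

set_preferences(12345, {"liked_songs": [1, 2, 3], "playlists": [10, 11, 12]})
print(get_preferences(12345))   # {'liked_songs': [1, 2, 3], 'playlists': [10, 11, 12]}
```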

4.6 Analytics and Monitoring Data (Data Warehousing)

Analytics and monitoring data are critical for tracking user engagement, system performance, and
identifying potential issues.

Data is aggregated and processed in a data warehouse or distributed data stores (e.g., Hadoop,
BigQuery).

Key Use Cases for Analytics:

• User Engagement: Data on streams, skips, and playlist additions are used to generate
insights into user behavior.

• System Monitoring: Logs from various services are used to monitor system health, detect
anomalies, and perform performance tuning.

• Royalty Calculations: Streaming data is used to calculate payments for artists based on song
plays and geographic reach.

Stream Log Example:

5. API Design

We'll design RESTful APIs that are intuitive, efficient, and scalable.
Let's break down our API design into several key endpoints:

5.1 Search API

The Search API allows users to search for songs, artists, albums, or playlists. The search results are
ranked based on relevance, popularity, and user preferences.

Endpoints

GET /search

Query Parameters:

• query: The search term (e.g., "Blinding Lights").

• type: The type of resource to search for (song, artist, album, playlist).

• limit: Maximum number of results to return (default: 20).

• offset: For pagination (default: 0).

Response:

{
  "results": [
    {
      "type": "song",
      "id": "12345",
      "title": "Blinding Lights",
      "artist": "The Weeknd",
      "album": "After Hours"
    },
    {
      "type": "artist",
      "id": "67890",
      "name": "The Weeknd"
    }
  ]
}

5.2 Music Streaming API

The Streaming API handles the delivery of music files from the backend or CDN to the user’s device.

Endpoints
GET /stream/{song_id}

Response:

• HTTP 302 Redirect to the CDN URL where the song is hosted:

{
  "url": "https://cdn.spotify.com/song/12345"
}

5.3 Recommendations API

The Recommendations API provides personalized song suggestions based on the user’s listening
history, preferences, and likes.

Endpoints

GET /recommendations/{user_id}

Query Parameters:

• limit: Number of recommendations to return (default: 10).

Response:

{
  "recommendations": [
    {
      "song_id": "12345",
      "title": "Blinding Lights",
      "artist": "The Weeknd",
      "score": 0.98
    },
    {
      "song_id": "67890",
      "title": "Can't Feel My Face",
      "artist": "The Weeknd",
      "score": 0.95
    }
  ]
}

5.4 Ad Delivery API


For free-tier users, Spotify injects advertisements into their listening experience.

The Ad Delivery API fetches and serves personalized ads based on user preferences and
demographics.

Endpoints

GET /ads/{user_id}

Fetch ads for a user to be played during music streaming.

Response:

{
  "ad_id": "ad12345",
  "ad_url": "https://cdn.spotify.com/ads/ad12345.mp3",
  "duration": 30
}

6. Diving Deep into Key Components

6.1 Music Streaming Service

The Streaming Service is at the heart of Spotify’s architecture, responsible for delivering music
content efficiently, securely, and reliably to millions of users in real time.

The actual delivery of music files is managed by a Content Delivery Network (e.g., Cloudflare or AWS
CloudFront). This ensures that music is served from geographically distributed servers close to the
user, minimizing latency and bandwidth consumption.

Request Workflow:

1. Client sends a streaming request (e.g., /stream/{song_id}).

2. The App server authenticates the user and routes the request to the Streaming Service.

3. If the song is not in the CDN, the Streaming Service retrieves the audio file’s location (from
the blob storage) and pushes the file to the nearest CDN edge server. The CDN returns a URL
to the streaming service to stream the audio.

4. The CDN URL is returned to the client, allowing the client to stream the audio.
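The workflow above can be sketched as a single resolver function (the push-to-CDN step and the URL scheme are stand-ins, not Spotify's actual internals):

```python
# Sketch of the streaming request flow: resolve the song's CDN URL,
# pushing the file to the edge first if it is not cached there yet.
cdn_cache = {}   # songs already available on a CDN edge server

def push_to_cdn(song_id):
    # Stand-in for copying the file from blob storage to the edge.
    cdn_cache[song_id] = f"https://cdn.spotify.com/song/{song_id}"
    return cdn_cache[song_id]

def stream(song_id):
    url = cdn_cache.get(song_id) or push_to_cdn(song_id)
    return 302, url   # HTTP redirect to the CDN URL

status, url = stream("12345")
print(status, url)   # 302 https://cdn.spotify.com/song/12345
```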

6.2 Recommendation Service

The recommendation system analyzes the user's listening habits, likes, and playlists. It uses a
combination of collaborative filtering (based on users with similar preferences) and content-based
filtering (based on song metadata).

Collaborative Filtering

Collaborative filtering is one of the most commonly used techniques in recommendation systems.
This method leverages the behavior of users with similar music tastes to generate recommendations.
• User-Based Collaborative Filtering: This technique groups users based on their listening
history. For example, if User A and User B both frequently listen to the same set of artists
and songs, the system may recommend songs that User A has listened to but User B hasn’t.

• Item-Based Collaborative Filtering: In this technique, songs are recommended based on


their similarity to songs the user has previously liked. If many users who like Song X also like
Song Y, the system recommends Song Y to users who have listened to Song X.
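Item-based filtering as described can be sketched by counting how often pairs of songs co-occur in users' histories (the listening data below is hypothetical):

```python
from collections import Counter
from itertools import combinations

# Item-based collaborative filtering sketch: songs that co-occur in many
# users' listening histories are treated as similar.
histories = [
    {"song_x", "song_y", "song_z"},
    {"song_x", "song_y"},
    {"song_x", "song_z"},
    {"song_x", "song_y"},
]

co_occurrence = Counter()
for history in histories:
    for a, b in combinations(sorted(history), 2):
        co_occurrence[(a, b)] += 1   # pair seen together in one history

def similar_to(song, top_n=2):
    # Rank every other song by how often it co-occurs with `song`.
    scores = Counter()
    for (a, b), count in co_occurrence.items():
        if a == song:
            scores[b] += count
        elif b == song:
            scores[a] += count
    return [s for s, _ in scores.most_common(top_n)]

print(similar_to("song_x"))   # ['song_y', 'song_z']
```

Real systems compute these similarities offline over billions of plays; the ranking logic is the same.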

Content-Based Filtering

Content-based filtering focuses on the properties of songs, such as genre, artist, album, tempo, and
instrumentation, to recommend similar songs to users.

• Song Attributes: Spotify collects metadata on each song, including genre, tempo, mood, and
instruments. This data is used to recommend songs with similar attributes to what the user
has already liked or listened to.

• Artist Similarity: If a user listens to multiple songs from a particular artist, the system may
recommend songs from similar artists, based on shared attributes (e.g., genre, style).
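Content-based filtering can be sketched by representing each song as a feature vector and ranking candidates by cosine similarity (the songs and attribute values here are hypothetical):

```python
import math

# Content-based filtering sketch: each song is a vector of assumed
# attributes (tempo, energy, valence); recommend the nearest neighbors.
songs = {
    "Blinding Lights":    [0.90, 0.80, 0.60],
    "Can't Feel My Face": [0.85, 0.75, 0.65],
    "Slow Ballad":        [0.20, 0.10, 0.40],
}

def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm

def recommend(liked, top_n=1):
    # Score every other song against the liked song's feature vector.
    scores = {s: cosine(songs[liked], v) for s, v in songs.items() if s != liked}
    return sorted(scores, key=scores.get, reverse=True)[:top_n]

print(recommend("Blinding Lights"))   # ["Can't Feel My Face"]
```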

6.3 Search Service

The Search Service in Spotify allows users to find songs, artists, albums, playlists, and podcasts from
a vast catalog efficiently.

The architecture of Search Service can be broken down into the following key components:

1. Query Parser: Interprets and normalizes the user’s search query.

2. Search Index: A dynamically updated index that contains metadata for all songs, artists,
albums, and playlists. A search engine like Elasticsearch or Apache Solr can be used to build
and manage this index.

3. Ranking Engine: Once the search index returns matching results, the Ranking Engine sorts
the results to ensure that the most relevant results appear at the top.

4. Personalization Layer: Customizes search results based on the user’s listening history,
preferences, and demographic information.

5. Search Autocomplete: Provides users with suggestions as they type their queries, speeding
up the search process.

6. Cache Layer: Caches frequently searched queries to improve performance and reduce the
load on the backend.

7. Search Index Updater: Ensures that the search index stays up to date with new content
being added to Spotify’s catalog.
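The autocomplete component (step 5 above) can be sketched with a sorted title list and binary search over prefixes; a production system would use the search engine's suggester instead, and the catalog below is hypothetical:

```python
import bisect

# Autocomplete sketch: keep titles sorted and binary-search for the
# range of entries sharing the typed prefix.
titles = sorted(["after hours", "blinding lights", "blame", "save your tears"])

def autocomplete(prefix, limit=5):
    lo = bisect.bisect_left(titles, prefix)
    hi = bisect.bisect_right(titles, prefix + "\uffff")  # upper bound of the prefix range
    return titles[lo:hi][:limit]

print(autocomplete("bl"))   # ['blame', 'blinding lights']
```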

7. Addressing Key Issues and Bottlenecks

7.1 Scalability

• Sharding: To scale the SQL databases and distribute the load evenly, implement sharding for
large tables like users, playlists, and song metadata.
• Indexes: Add indexes on frequently accessed fields like user_id and playlist_id to improve
query performance.

• Partitioning: NoSQL databases can use partitioning strategies to distribute data across
multiple nodes, ensuring low-latency access even at large scales.

• TTL (Time to Live): Cached data is given a TTL to ensure that stale data is regularly
invalidated.
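A minimal sharding scheme routes each user_id to a shard with a stable hash (a sketch; the shard count is an assumption):

```python
import hashlib

# Sharding sketch: route each user_id to one of N database shards with a
# stable hash, so the same user always lands on the same shard.
NUM_SHARDS = 4

def shard_for(user_id):
    digest = hashlib.md5(str(user_id).encode()).hexdigest()
    return int(digest, 16) % NUM_SHARDS

# The same id always maps to the same shard:
print(shard_for(12345) == shard_for(12345))   # True
```

Note that plain modulo sharding remaps most keys when NUM_SHARDS changes; consistent hashing is the usual remedy when shards are added or removed frequently.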

7.2 Reliability

To ensure high availability, Spotify should implement fault-tolerant systems:

• Replicated Databases: Replicate user, song, and playlist data across multiple data centers to
prevent data loss.

• Cache Replication: Redis can be configured to replicate cached data across multiple
instances for fault tolerance.

• Auto-scaling: Automatically scale the number of servers based on traffic load.

• Graceful Failover: If a server fails, traffic is rerouted to another server without service
interruption.

• Monitoring and Alerting: Implement comprehensive monitoring and alerting systems.

7.3 Security

Spotify handles sensitive data such as user profiles and payment information.

• Authentication: Use OAuth 2.0 for secure user authentication.

• Encryption: Encrypt all sensitive data in transit and at rest.

• Rate Limiting: Rate limit users to ensure that excessive API requests from a single client are
throttled to protect the system.

• Data Privacy: Implement strong access controls to ensure user data is not leaked or misused.
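The rate-limiting bullet above is commonly implemented as a token bucket; a minimal sketch (rate and capacity values are illustrative):

```python
import time

# Token-bucket rate limiter sketch: each client's bucket refills at a
# fixed rate; a request is allowed only while a token is available.
class TokenBucket:
    def __init__(self, rate, capacity):
        self.rate = rate              # tokens added per second
        self.capacity = capacity      # maximum burst size
        self.tokens = capacity
        self.last = time.monotonic()

    def allow(self):
        now = time.monotonic()
        # Refill proportionally to the time elapsed, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(rate=1, capacity=2)
print([bucket.allow() for _ in range(3)])   # [True, True, False]
```

In production the bucket state would live in a shared store such as Redis so that all API servers enforce the same limit per client.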
