System Design of Distributed Cache
Consider a web application backed by a data store, which may be a database or another web service. The client makes a call to the web application, which in turn makes a call to the data store, and the result is returned to the client.
There may be several issues with this setup:
1. Calls to the data store may take a long time to execute or may consume a lot of system resources. It would be good to store at least some results of these calls in memory, so that they can be returned to the client much faster.
2. If the data store is down or experiences performance degradation and calls to it start to fail, our web application can still process requests as usual, at least for some period of time.
Storing data in memory helps address both issues.
When a client request comes in, we first check the cache and try to retrieve the data from memory (the cache-aside strategy); only if the data is unavailable or stale do we make a call to the data store.
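To make the read path concrete, here is a minimal sketch of this pattern (often called cache-aside or look-aside); `cache` and `datastore` here are any dict-like objects, standing in for a real cache and a real backing store:

```python
def get_with_cache(key, cache, datastore):
    """Cache-aside read: try memory first, fall back to the data store."""
    value = cache.get(key)
    if value is not None:
        return value              # cache hit: no slow call needed
    value = datastore.get(key)    # cache miss: make the slow call
    if value is not None:
        cache[key] = value        # populate the cache for next time
    return value
```

A real implementation would also attach a TTL to each entry so that stale data eventually expires.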
Why do we call it a distributed cache?
Because the amount of data is too large to fit in the memory of a single machine, so we need to split the data and store it across several machines.
Requirements:
a) We need to implement two main operations:
1. put
2. get
put stores an object in the cache under some unique key, and get retrieves the object from the cache based on the key.
b) We want to design a scalable, highly available, and fast cache.
High scalability ensures the cache can handle an increasing number of put and get requests, as well as an increasing amount of data we may need to store.
High availability ensures that data in the cache is not lost during hardware failures and that the cache remains accessible during network partitions. This minimizes the number of cache misses and, as a result, the number of calls to the data store.
High performance is probably the number one requirement for the cache. The whole point of a cache is to be fast, since it is called on every request.
To design a distributed system, think about the following four requirements:
1. Scalability,
2. Availability,
3. Performance,
4. Durability ( if data persistence is important)
Let’s start with the local cache first and later we will see how distributed cache design
evolves from the solution for the local cache.
So, we start with a single server and need to implement a basic in-memory data store with limited capacity. A local cache implementation is an algorithmic problem: we need to come up with data structures and an algorithm for storing and retrieving data.
What data structure comes to mind when you hear the word cache? Most likely the same one that comes to my mind: a hash table. We add key-value pairs to the hash table and retrieve them in constant time. Simple, right?
But only until we reach the maximum hash table size and cannot add any more elements. We then need to evict some old data from the hash table before new data can be added.
What data should be evicted?
It turns out there are several dozen different approaches, so-called eviction (or replacement) policies. One of the easiest to implement is the least recently used (LRU) policy, where we discard the least recently used items first. But hash tables do not track which entry has been used recently.
Meaning that we need one more data structure to keep track of what was used when. Some
kind of a queue, but with the constant time for add, update and delete operations. And I
already hear you shouting: doubly linked list and I have no other choice than to agree with
you.
When the GET operation is called, we first check if the item is in the cache (hash table). If the item is not in the cache, we return null immediately. If the item is found, we move it to the head of the list and return it to the caller. But why do we need to move the item to the head of the list?
To preserve the order of use. The item at the head of the list is the most recently used, and the item at the tail is the least recently used. So, when the cache is full and we need to free space for a new item, we remove the item at the tail of the list.
When the PUT operation is called, we also check if the item is in the cache. If it is found, we update its value and move it to the head of the list. If it is not in the cache, we need to check whether the cache is full.
If the cache has capacity (is not full), we simply add the item to the hash table and to the head of the list.
If the cache is full, we need to free some space first: we take the item at the tail and remove it from both the hash table and the list. Now we have space for the new element, and we add it to both the hash table and the list.
LRU cache flow
Now, let's run a simple simulation that will help to further understand, and then code, the algorithm.
The cache is initially empty, so we just keep adding new items to it, always at the head of the list. Once the cache is full and a new item, say E, arrives, we evict the item at the tail and then add item E to both the hash table and the list.
Easy, right?
Then let's code the least recently used cache. First, we define a class whose instances will be stored in the cache. It stores the key, the value, and references to the previous and next nodes in the linked list.
In the cache class we define:
1. A reference to the hash table (the HashMap class in Java)
2. The cache size
3. References to the head and tail of the doubly linked list
4. A constructor that accepts a single argument: the cache size
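Putting it together, here is a minimal sketch of the LRU cache in Python (the article describes it in Java terms, but the structure is the same: a hash table plus a doubly linked list, here with sentinel head and tail nodes to simplify edge cases):

```python
class Node:
    """A cache entry: key, value, and links to its list neighbours."""
    def __init__(self, key, value):
        self.key, self.value = key, value
        self.prev = self.next = None

class LRUCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self.map = {}  # key -> Node, for O(1) lookup
        # Sentinel head/tail nodes avoid null checks at the list ends.
        self.head, self.tail = Node(None, None), Node(None, None)
        self.head.next, self.tail.prev = self.tail, self.head

    def _remove(self, node):
        node.prev.next, node.next.prev = node.next, node.prev

    def _add_to_head(self, node):
        node.next = self.head.next
        node.prev = self.head
        self.head.next.prev = node
        self.head.next = node

    def get(self, key):
        node = self.map.get(key)
        if node is None:
            return None               # cache miss
        self._remove(node)
        self._add_to_head(node)       # mark as most recently used
        return node.value

    def put(self, key, value):
        node = self.map.get(key)
        if node is not None:          # key exists: update and promote
            node.value = value
            self._remove(node)
            self._add_to_head(node)
            return
        if len(self.map) >= self.capacity:
            lru = self.tail.prev      # least recently used item
            self._remove(lru)
            del self.map[lru.key]
        node = Node(key, value)
        self.map[key] = node
        self._add_to_head(node)
```

Both operations run in O(1): the hash table gives constant-time lookup, and the doubly linked list gives constant-time promotion and eviction.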
How do we make it distributed?
Distributed cache cluster: We can start with a really straightforward idea: move the least recently used cache we just implemented to its own host. The benefit is that we can now make each host store only a chunk of the data, called a shard. Because the data is split across several hosts, we can store much more data in memory. Service hosts know about all shards, and they forward put and get requests to the appropriate shard.
Co-located cache: The same idea, with a slightly different realization, is to use the service hosts themselves for the cache. We run the cache as a separate process on each service host; the data is still split into shards, and, as in the first option, when the service needs to make a call to the cache, it picks the shard that stores the data and makes the call.
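The shard-selection step can be sketched as simple mod-N hashing (host names here are hypothetical; production systems often use consistent hashing instead, so that adding or removing a host remaps far fewer keys):

```python
import hashlib

# Hypothetical cache hosts, one per shard.
SHARDS = ["cache-host-1", "cache-host-2", "cache-host-3"]

def pick_shard(key, shards=SHARDS):
    """Map a cache key to a shard using a stable hash (mod-N sharding)."""
    digest = hashlib.md5(key.encode()).hexdigest()
    return shards[int(digest, 16) % len(shards)]
```

Using a stable hash like MD5 (rather than Python's built-in `hash`, which is randomized per process) ensures every service host routes the same key to the same shard.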
Let's compare both approaches:
A dedicated cluster helps isolate cache resources from service resources. The cache and the service no longer share memory and CPU and can scale independently. A dedicated cluster can be used by multiple services, and it also gives us flexibility in choosing hardware.
With a co-located cache, the biggest benefit is that we do not need a separate cluster. This saves on hardware cost and is usually less operationally intensive than a separate cluster. And with co-location, the service and the cache scale out together: we just add more hosts to the service cluster when needed.
Table of Contents:
• Introduction
o Requirements
o Capacity estimates
▪ Storage
▪ Bandwidth
▪ Database design
▪ APIs
o Components
▪ Database
▪ Server
▪ Network protocol
▪ Geo-hashing
• Conclusion
Note: Amazon and the Amazon logo are trademarks of Amazon.com, Inc. or its affiliates.
Introduction
The topic of this article at OpenGenus is system design, namely the system design of a grocery
system such as Amazon Fresh or BigBasket or Flipkart Grocery or JioMart or DMart.
The Amazon Fresh application is an online grocery store in which users can view products, add them to a cart, order, and have them delivered just about anywhere. They can either show up to take the delivery (attended delivery) or provide a 2-hour delivery window for the order to be placed in a safe space and picked up later, without their presence being required (unattended delivery).
Before diving right into the intricacies of such a task, let's talk a bit about system design.
Now, if we were to think about most applications that we develop, we clearly wouldn't need to
design a system. If the application is scaled relatively low, in other words, has few requests and
doesn't get much traffic, designing a system for it isn't necessary at all.
However, as the application grows, things that can go wrong start affecting a larger number of users
and begin causing major financial losses.
Those simple problems that you paid no attention to in your small-scale application can have effects hardly imaginable when applied to larger applications. Designing a system up front makes the application:
• more reliable
• more cost-effective
• more scalable
So, when exactly should we start designing a system? What is the threshold that needs to be reached
in order for the application to require such a solution?
While there is no exact limit that we can reference, we can confidently say that such a solution is
recommended when building a large scale distributed system.
Large scale means that the system deals with a lot of data, it is being used by a lot of people, has a
lot of requests, needs to be extremely performant, is updated frequently etc.
Distributed means that it runs on multiple servers that need to be abstracted and hidden away from
the client (that is, regardless of how many components go into building the system, the client only
sees an intuitive, performant application).
Let's get back to our goal. Designing a grocery store such as Amazon Fresh.
One can probably imagine that Amazon Fresh can easily fit into the category of a large scale
distributed system. Let's do a bit of math, though. We're going to need to get used to it anyway.
According to a report on the internet, Amazon gets about 197 million users per month. If we were to
divide that by 8, the number of store subsidiaries Amazon has, we would get roughly 24 million users
per month. It's definitely extensively scaled.
Now that we've established that Amazon Fresh is system design material, let's go ahead and design a
system for such a product.
Here are the main concepts we need to focus on in order to successfully create a system design:
• Requirements
• Capacity estimates
• High-level design
• Components
Requirements
The first notion we will turn our attention to is the requirements of the app. Simply put, what
functionalities will this product have?
When establishing this point, we need to outline the core functionalities, those without which the
application couldn't function properly. There are going to be secondary cases we won't cater for
while doing this.
Here is a list of the core functionalities for a grocery system such as Amazon Fresh:
• View products by category
• Search for products
• View a product page
• Comment on a product
• Upload a product
• Add to cart
• Order (including delivery assignment and tracking)
Capacity estimates
Here we will talk about two concepts, depending on the way data is accessed.
• Storage
• Bandwidth
Getting started
Since there are no public reports on the average number of products a person views or the number of products that get added daily, we're just going to assume averages based on the number of monthly active users.
Explanation:
We presumed that the number of monthly active users is roughly 24 million, which would mean
800,000 daily active users if we used the average number of days a month can have (30).
Of those 800k users, let's assume that the average user who opens the platform views about 25
products. That would mean that 20 million products are viewed every day.
Of the 800,000 daily users, let's assume only 5,000 upload products on any given day. Let's say that
each of them uploads an average of 3 products daily. This gives us 15,000 products added daily.
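The arithmetic above can be reproduced in a few lines; all inputs are the assumptions stated in the text:

```python
# Assumed inputs from the article
monthly_active_users = 24_000_000
views_per_user = 25           # products viewed per daily active user
daily_uploaders = 5_000       # users who upload products on a given day
uploads_per_uploader = 3      # products uploaded by each of them

daily_active_users = monthly_active_users // 30          # 800,000
daily_views = daily_active_users * views_per_user        # 20,000,000 products viewed/day
daily_uploads = daily_uploaders * uploads_per_uploader   # 15,000 products added/day
```

These two numbers, 20 million reads and 15 thousand writes per day, drive the storage and bandwidth estimates that follow.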
Storage
Great! Now we can go ahead and talk about storage capacity. This refers to the number of items that
are being uploaded or the "writes" of the application.
Explanation:
We already know from before that roughly 15,000 products get added every day. Assuming
uploading a product costs us about 5MB, that would mean the capacity of the storage should be
about 75GB per day.
Result:
75GB per day
Bandwidth
This refers to the number of items that are being accessed or the "reads" of the application.
Explanation:
We already know from before that roughly 20 million products get viewed every day, so assuming
viewing a product costs us 5MB, that means the capacity of the bandwidth should be roughly 100TB
per day.
Result:
100TB per day
Let's have a bit more fun by trying to estimate exactly how many servers a product such as Amazon
Fresh would need.
When talking about the servers, we are going to be talking in terms of CPU cores, RAM memory
needed and hard drive space(storage) needed.
We will assume each server is higher-end, having 4 CPU cores, 8GB RAM and 120GB SSD storage.
• CPU cores
Explanation:
We know from previous calculations that the website would get about 800,000 daily active users.
Each CPU core can typically handle about 250 users, so that means we would need at least 3200 CPU
cores in order for the website to function. If we translate it to servers with the properties we set
before, that would amount to about 800 servers.
• RAM memory
100TB memory per day / 0.12TB per server = 834 servers
Explanation:
We know from previous calculations that about 100TB of memory is required daily, so we can obtain the necessary number of servers by dividing the total memory needed by the memory each server has. This way, we need at least 834 servers for the website to function properly.
• SSD (Storage)
75GB per day / 120GB per server =~ 1
Explanation:
Since we worked out that about 75GB of data is getting stored every day, we can divide that number
by the capacity of a server. Here is where we realize that the application really doesn't save that
much information, and is, therefore, read-driven. 1 server would be sufficient if we were only
relating to storage.
So, after all those computations, we can work out how many servers we really need. The results we got were 800, 834, and 1. To be sensible, we should choose the largest one and then some.
A good number here would be about 900 servers, to account for any failures that may occur.
Result:
900 servers
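The server estimates can likewise be checked with a short calculation. Note that the bandwidth figure is divided by 0.12TB per server, following the article's own arithmetic:

```python
import math

# CPU cores
daily_active_users = 800_000
users_per_core = 250
cores_per_server = 4
cpu_servers = daily_active_users // users_per_core // cores_per_server  # 800

# Memory (per the article, divided by 0.12TB = 120GB per server)
daily_bandwidth_tb = 100
tb_per_server = 0.12
memory_servers = math.ceil(daily_bandwidth_tb / tb_per_server)          # 834

# SSD storage
daily_storage_gb = 75
ssd_per_server_gb = 120
storage_servers = math.ceil(daily_storage_gb / ssd_per_server_gb)       # 1

servers_needed = max(cpu_servers, memory_servers, storage_servers)      # 834
```

Rounding the maximum up to about 900 leaves headroom for failures, as the article suggests.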
High-level design
• Database design
• Server APIs
Database design
Most important of all, we shall need a User class, with a user id by which the user can be identified, plus other relevant information stored to make the ordering process straightforward (such as address, email address, name, etc.).
Then, a Product class will be required, storing a unique id, the user id of the person that posted it, and other information that can be displayed on the product page to help customers understand what they're buying (title, description, images, price, etc.).
Another concept we will need to consider is the shopping cart. We are probably going to need a ShoppingCart class, which stores a unique id and the user's id. Then, we're also going to need a ShoppingCartItem, which has its own id, its cart's id, the product's id, and a quantity.
Moreover, we will probably need a table dedicated specifically to stores. A Store class should contain a unique id for the shop, a geo-hash (more on this later, but, in short, it encodes the store's location), and any other optional properties we might feel are necessary (such as a zip code).
One last class we will need is the Driver class. Its main purpose is to track the location of the drivers. It should include the following properties: a unique id for the driver, their geo-hash, and any other relevant information such as first and last name.
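For illustration, the entities above can be captured as Python dataclasses. The field names are assumptions based on the description, not a definitive schema:

```python
from dataclasses import dataclass

@dataclass
class User:
    user_id: int
    name: str
    email: str
    address: str

@dataclass
class Product:
    product_id: int
    user_id: int      # the user who posted the product
    title: str
    description: str
    price: float

@dataclass
class ShoppingCart:
    cart_id: int
    user_id: int

@dataclass
class ShoppingCartItem:
    item_id: int
    cart_id: int
    product_id: int
    quantity: int

@dataclass
class Store:
    store_id: int
    geo_hash: str     # encodes the store's location
    zip_code: str

@dataclass
class Driver:
    driver_id: int
    geo_hash: str     # updated as the driver moves
    first_name: str
    last_name: str
```

In a relational database these would become tables, with the id fields as primary keys and the cross-references (user_id, cart_id, product_id) as foreign keys.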
APIs
Another important part without which our application wouldn't be able to function are the APIs
where information is stored and from where it can be retrieved.
Let's look at our requirements and devise server APIs for those that need it.
• Viewing all the products by category => Here, we'll need an API that retrieves the products
based on the category they are in. Let's assume we also want to only load a few at a time, so
the application doesn't lag(if there are a lot of products and we request all at the same time,
it has a high risk of doing so). A getProductByCategory(category, limit) API could be
reasonable.
• Search for products => Although it still has to return a number of products that match a
criterion, it is not based on category, so we shall need a new API for it.
getProductByTitle(query, limit) could be good. Of course, we would construct the function
so that titles that match the query parameter would pop up, not only exact replicas of the
query.
• View product page => While a product page is standard, the information presented in it is
specific to the product and, therefore, has to be obtained from a specific API.
getProduct(product_id) is the go-to here.
• Comment on product => This is the first API with a method other than GET. Here, we are not
asking for anything, but rather we are adding something to our database. This method is
called POST. postComment(user_id, product_id, body, time) should be sufficient. We are
specifying the user who posted the comment, the product on which the comment was
posted, the actual text of the comment and the time at which it was posted.
• Upload a product => This is another API with a POST method. Let's call it
postProduct(user_id, info). The user_id is the ID of the user who posted it, and the info is an
object that has all the relevant information of the product, since it would be tedious to have
to write all properties down every time we call the API.
• Add to cart => This API also uses a POST method. We could call it addToCart(user_id,
product_id, quantity). This would create a new shoppingCartItem and append it to the user's
existent shoppingCart.
Here, we'll talk about how to achieve the requirements that don't necessarily need a built API, but rather are better off using external services.
• Check stock of product => In order to be able to add a product to cart, we first have to check
whether that product is available in stores near our location. What this should do is take the
current location, decide which facility is nearest, access the inventory database, and check
the availability of the product in said establishment. This is usually achieved with an
inventory management system, rather than making calls directly to the database. If we
receive a positive response, we can go ahead and allow adding it to cart. Otherwise, we
should disable the 'add to cart' function and indicate that the product is not available in an
intuitive way.
• Order => This is the most dangerous task we have to deal with here, because it involves real-world implications, namely money. Instead of risking anything, we could use already established products that deal with this in an efficient way, called payment processors. Such tools ensure payments on our application are done safely. Some of the most famous payment processors are Stripe, PayPal and Square.
When adding products to cart or ordering said products, we also have to consider the inventory. So, for example, if one user adds a few products to cart, the purchased quantity of those products has to be marked unavailable for a standard amount of time displayed on the website. When the order is placed, the product has to be removed altogether from the database. This is done through the inventory management system.
• Assign the delivery => Here, we are going to focus on how drivers are assigned to the
delivery of goods.
Keep in mind we are working on making an efficient app. Thus, in order for someone to be
assigned to deliver the order, we will need to find the quickest and most performant way to
do so. In this case, the most efficient way to assign a delivery is to find the closest driver.
We already know the location of the store that's being ordered from (check the geo-hashing
algorithm explained at the additional resources for a bit more information on this). What we
need now is to know the location of the drivers. Sure, we could request their latitude and
longitude once and then hash it as we did for the stores, but it's likely not going to work,
since, unlike stores, drivers move around.
The most efficient solution here would be to keep track of the driver's area in a driver
table, which we will need to add to our database solution.
The way this would work is by using a driver update service. Each driver would have to
run a software on their phone, that would request their location(latitude and longitude)
periodically, such as every 10 seconds. The driver update service should be called every time
and convert or call another service that converts the latitude and longitude to geo-hash.
Then, based on their geo-hash, the drivers could be assigned orders.
• Track the delivery => In order to track the delivery, we need to establish a connection
between the client and the driver. This would likely be done by using a load balancer, so that
they can be connected to the same server, and then proceeding to send the latitude and
longitude of the driver to the client. There could be a debate over the protocol being used to
send the data, but something like HTTP could be just fine.
Components
Now let's look at the actual components in play in more depth.
We may want to add other components, such as load balancers, cache and others in order to
improve the performance of our app.
Keep in mind that this is very broad. A lot more could be done for optimization. For example, a large platform like Amazon would probably have many more servers and multiple load balancers.
My goal here is to emphasize that the client, server and database are clearly not the only instances in
this system. A large scale application would probably require a CDN and caching for improving their
performance and saving money, considering that reading information from memory is about 100
times faster than from the database. Also, caching would not only be done on the database, but
rather also on additional layers, such as the DNS, the CDN, and the application server.
Moreover, we added the load balancer to prevent single-point failure. Imagine what could happen if
the whole product would depend on one single server. Any difficulty to that server and the app is
down! This is why scaling horizontally is amazing.
Let's discuss one final point, that is, the technologies and algorithms that can be used in order to
create all that we talked about today.
Database
As for the database, a relational (SQL) database would be recommended. It is the best choice because it keeps related records consistent more easily, which is very useful when dealing with sensitive actions such as payments.
• MySQL
• Oracle
• PostgreSQL
Server
There are a lot of technologies that can be considered for this task. Here, the most important
criterion would probably be the programming language. Some technologies are easier to use than
others, so researching this is also important.
For this application, I would probably go with Django, since it is easy to use, authentication can be
done easily, and it is designed precisely for large scale application development. Django uses Python.
If other teams feel more experienced in Javascript, for example, Node.js or Express.js are also great,
top choices.
Network protocol
When choosing a network protocol, we are going to want to focus on security most of all. We can choose HTTPS if we'd prefer stateless request/response communication, or a persistent TLS/SSL connection if we'd rather have a stateful one. This, of course, also depends on the request being made, and every detail should be taken into account, since, in a large application, any delay that may seem minor can compound.
Let's also decide on which load balancer routing algorithm is best.
The best routing method for such a website would definitely be the IP hashing algorithm. Described simply, this hashes the user's IP the first time they access the website and then directs that user to the same server every time they access the application.
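The core of IP-hash routing fits in a few lines. This is a minimal sketch with hypothetical server names; real load balancers also handle server failures by rehashing or falling back to another node:

```python
import hashlib

# Hypothetical application servers behind the load balancer.
SERVERS = ["app-1", "app-2", "app-3"]

def route(client_ip, servers=SERVERS):
    """Always send the same client IP to the same server (session stickiness)."""
    h = int(hashlib.sha256(client_ip.encode()).hexdigest(), 16)
    return servers[h % len(servers)]
```

Because the mapping depends only on the IP and the server list, a returning client lands on the same server, which preserves any server-local session state.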
Geo-hashing
One additional point that is worth making before we end this article is geo-hashing.
This algorithm enables us to do something we spoke about at several earlier points, but didn't go too deep into. All throughout the article, we discussed being able to locate the nearest store to the client, and then the nearest driver when assigning the delivery. But how exactly can we tell what each store's location is?
One great way would be using a geosharded database for the stores. What this means is that we would take each area as a 'box', give it a hash, and then keep dividing the area by a constant number (for example, 3) into smaller 'boxes'. Each of these smaller boxes starts with the same hash as its parent 'box', with one unique character appended at the end. As the subdivision goes deeper and deeper, it accounts for each location on the map.
Then, when we get the latitude and longitude of a user or driver, we will know which store they are closest to.
The hashing API should be taken from external sources, especially since applications that have their
focus on geolocation have worked tremendously to improve theirs. One such great example would
be Uber's.
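To make the idea concrete, here is a minimal encoder for the standard geohash scheme (interleaved binary subdivision of longitude and latitude, base32-encoded). This is an illustrative sketch of the public algorithm, not Uber's or Amazon's implementation:

```python
BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"  # standard geohash alphabet

def geohash_encode(lat, lon, precision=6):
    """Encode a (lat, lon) pair into a geohash string of `precision` chars."""
    lat_range = [-90.0, 90.0]
    lon_range = [-180.0, 180.0]
    bits = []
    use_lon = True  # geohash alternates bits, starting with longitude
    while len(bits) < precision * 5:
        rng = lon_range if use_lon else lat_range
        val = lon if use_lon else lat
        mid = (rng[0] + rng[1]) / 2
        if val >= mid:          # point is in the upper half: emit 1
            bits.append(1)
            rng[0] = mid
        else:                   # point is in the lower half: emit 0
            bits.append(0)
            rng[1] = mid
        use_lon = not use_lon
    chars = []
    for i in range(0, len(bits), 5):   # pack every 5 bits into one base32 char
        idx = 0
        for b in bits[i:i + 5]:
            idx = (idx << 1) | b
        chars.append(BASE32[idx])
    return "".join(chars)
```

The property the article relies on is visible here: a shorter geohash is a prefix of every longer geohash inside it, so nearby locations share a common prefix and can be sharded together.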
Conclusion
In conclusion of this OpenGenus article, a system such as Amazon Fresh/BigBasket/JioMart is not at all easy to design or build.
Our goal here was to design a framework, a frame of reference of how such a system can be
achieved. Intermediate solutions can also be added, that is, tools by various companies that can
enhance the users' experience or make the developers' work easier.
I hope that this article at OpenGenus leads you to consider what goes into designing and building large-scale distributed systems and provides you with a frame of points to follow when doing so.
System Design of a Distributed Job Scheduler
A distributed job scheduler is a system designed to manage, schedule, and execute tasks (referred
to as "jobs") across multiple computers or nodes in a distributed network.
Distributed job schedulers are used for automating and managing large-scale tasks like batch
processing, report generation, and orchestrating complex workflows across multiple nodes.
In this article, we will walk through the process of designing a scalable distributed job scheduling
service that can handle millions of tasks, and ensure high availability.
1. Requirements Gathering
Before diving into the design, let’s outline the functional and non-functional requirements.
Functional Requirements:
1. Users should be able to submit, update, or cancel jobs.
2. The system should support both one-time and recurring jobs.
3. The system should distribute jobs across multiple worker nodes for execution.
4. The system should provide monitoring of job status (queued, running, completed, failed).
5. The system should prevent the same job from being executed multiple times concurrently.
Non-Functional Requirements:
• Scalability: The system should be able to schedule and execute millions of jobs.
• High Availability: The system should be fault-tolerant with no single point of failure. If a
worker node fails, the system should reschedule the job to other available nodes.
• Consistency: Job results should be consistent, ensuring that jobs are executed once (or with
minimal duplication).
• Job prioritization: The system should support scheduling based on job priority.
At a high level, our distributed job scheduler will consist of the following components:
1. Job Submission Service
The Job Submission Service is the entry point for clients to interact with the system.
It provides an interface for users or services to submit, update, or cancel jobs via APIs.
This layer exposes a RESTful API that accepts job details such as:
• Job name
• Execution time
• Frequency (for recurring jobs)
• Job payload
It saves job metadata (e.g., execution_time, frequency, status = pending) in the Job Store (a
database) and returns a unique Job ID to the client.
2. Job Store
The Job Store is responsible for persisting job information and maintaining the current state of all
jobs and workers in the system.
Job Table
This table stores the metadata of the job, including job id, user id, frequency, payload, execution
time, retry count and status (pending, running, completed, failed).
Job Execution Table
This table tracks the execution attempts for each job, storing information like execution id, start time, end time, worker id, status, and error message.
Job Schedules
The Schedules Table stores scheduling details for each job, including the next_run_time.
• For one-time jobs, the next_run_time is the same as the job’s execution time, and the
last_run_time remains null.
• For recurring jobs, the next_run_time is updated after each execution to reflect the next
scheduled run.
Worker Table
The Worker Node Table stores information about each worker node, including its ip address, status,
last heartbeat, capacity and current load.
3. Scheduling Service
The Scheduling Service is responsible for selecting jobs for execution based on their next_run_time
in the Job Schedules Table.
It periodically queries the table for jobs scheduled to run at the current minute:
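As a sketch of such a poll, using SQLite and a hypothetical job_schedules schema (column names follow the tables described above; timestamps are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE job_schedules (
        job_id INTEGER,
        next_run_time TEXT,
        status TEXT
    )
""")
conn.executemany("INSERT INTO job_schedules VALUES (?, ?, ?)", [
    (1, "2024-01-01 10:00", "pending"),
    (2, "2024-01-01 10:01", "pending"),    # not due yet
    (3, "2024-01-01 10:00", "completed"),  # already ran
])

# Poll for pending jobs whose next_run_time is due at the current minute.
now = "2024-01-01 10:00"
due = conn.execute(
    "SELECT job_id FROM job_schedules "
    "WHERE next_run_time <= ? AND status = 'pending'",
    (now,),
).fetchall()
```

In production this query runs against the real Job Store, and the selected rows are atomically marked (e.g. status = queued) so that two scheduler instances do not pick up the same job.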
Once the due jobs are retrieved, they are pushed to the Distributed Job Queue for worker nodes to
execute.
The Distributed Job Queue (e.g., Kafka, RabbitMQ) acts as a buffer between the Scheduling Service
and the Execution Service, ensuring that jobs are distributed efficiently to available worker nodes.
It holds the jobs and allows the Execution Service to pull them and assign them to worker nodes.
5. Execution Service
The Execution Service is responsible for running the jobs on worker nodes and updating the results
in the Job Store.
Coordinator
The coordinator is responsible for:
• Assigning jobs: Distributes jobs from the queue to the available worker nodes.
• Managing worker nodes: Tracks the status, health, capacity, and workload of active workers.
• Handling worker node failures: Detects when a worker node fails and reassigns its jobs to
other healthy nodes.
• Load balancing: Ensures the workload is evenly distributed across worker nodes based on
available resources and capacity.
Worker Nodes
Worker nodes are responsible for executing jobs and updating the Job Store with the results (e.g.,
completed, failed, output).
• When a worker is assigned a job, it creates a new entry in the Job Execution Table with the
job’s status set to running and begins execution.
• After execution is finished, the worker updates the job’s final status (e.g., completed or
failed) along with any output in both the Jobs and Job Execution Table.
• If a worker fails during execution, the coordinator re-queues the job in the distributed job
queue, allowing another worker to pick it up and complete it.
System Design of Search Autocomplete
Search autocomplete is the feature provided by many platforms such as Amazon, Google and others
when you put your cursor in your search bar and start typing something you're looking for:
• C: Is the matching only supported at the beginning of a search term, or e.g. in the middle?
• I: Only at the beginning.
• C: How many autocomplete suggestions should be returned?
• I: 5.
• C: How many daily active users does the system have?
• I: 10 million DAU.
Summary:
• Fast response time. An article about Facebook's autocomplete suggests that suggestions should be returned within 100ms at most to avoid stuttering.
• Highly available - the system should stay up even if parts of it are unresponsive.
• 10 mil DAU * 10 searches per day = 100 mil searches per day = 100,000,000 / 86,400 ≈ 1,200 searches per second.
• Given 4 words of 5 chars each on average, a search term is ~20 characters, and each character typed sends a request -> 1,200 * 20 = 24,000 QPS. Peak QPS = 2 * 24,000 = 48,000 QPS.
• 20% of daily queries are new -> 100 mil * 0.2 = 20 mil new searches * 20 bytes = 400MB of new data per day.
• Data gathering service - gathers user input queries and aggregates them in real-time.
Query service
Given a frequency table mapping search terms to how often they were searched, this service is responsible for returning the top 5 suggestions based on the frequency column.
Querying the data set is a matter of running a simple SQL query:
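A sketch of that query, using SQLite with an illustrative frequency table (table name and contents are assumptions):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE frequency_table (query TEXT, frequency INTEGER)")
conn.executemany("INSERT INTO frequency_table VALUES (?, ?)", [
    ("twitch", 35), ("twitter", 29), ("twilight", 25),
    ("twin peak", 21), ("twitch prime", 18), ("twitter search", 14),
])

prefix = "tw"
# Top 5 suggestions matching the prefix, most frequent first.
top5 = conn.execute(
    "SELECT query FROM frequency_table "
    "WHERE query LIKE ? ORDER BY frequency DESC LIMIT 5",
    (prefix + "%",),
).fetchall()
```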
This is acceptable for small data sets but becomes impractical for large ones.
In this section, we'll deep dive into several components which will improve the initial high-level
design.
We used relational databases in the high-level design, but to achieve a more optimal solution, we'll need to leverage a more suitable data structure: a trie, a tree in which each node represents a prefix.
• Each node has up to 26 children, representing each of the next possible characters. To save space, we don't store empty links.
we don't store empty links.
• first find the node representing the prefix (time complexity O(p), where p = length of prefix)
• traverse subtree to find all leafs (time complexity O(c), where c = total children)
• sort retrieved children by their frequencies (time complexity O(clogc), where c = total
children)
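The three steps above can be sketched with a dict-based trie (a sketch; the node layout is an assumption):

```python
# Sketch of the trie lookup described above: find the prefix node, collect
# complete terms in its subtree, then sort by frequency. Children are kept
# in a dict, so empty links take no space.
class TrieNode:
    def __init__(self):
        self.children = {}      # char -> TrieNode; empty links not stored
        self.frequency = None   # set only on nodes that end a search term

def insert(root, term, frequency):
    node = root
    for ch in term:
        node = node.children.setdefault(ch, TrieNode())
    node.frequency = frequency

def top_k(root, prefix, k=5):
    # 1) find the node representing the prefix: O(p)
    node = root
    for ch in prefix:
        if ch not in node.children:
            return []
        node = node.children[ch]
    # 2) traverse the subtree collecting complete terms: O(c)
    terms = []
    def dfs(n, path):
        if n.frequency is not None:
            terms.append((path, n.frequency))
        for ch, child in n.children.items():
            dfs(child, path + ch)
    dfs(node, prefix)
    # 3) sort the collected terms by frequency: O(c log c)
    terms.sort(key=lambda t: -t[1])
    return [t[0] for t in terms[:k]]

root = TrieNode()
for term, freq in [("tree", 10), ("true", 35), ("try", 29), ("toy", 14)]:
    insert(root, term, freq)
print(top_k(root, "tr"))
```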
This algorithm works, but there are ways to optimize it as we'll have to traverse the entire trie in the
worst-case scenario.
We can leverage the fact that users rarely use a very long search term to limit max prefix to 50 chars.
This reduces the time complexity from O(p) + O(c) + O(clogc) -> O(1) + O(c) + O(clogc).
We can also cache the top k search terms at every node. This reduces the time complexity to O(1), as
the top k search terms are already cached. The trade-off is that it takes much more space than a
traditional trie.
In the previous design, when a user types in a search term, the data is updated in real-time. This is
not practical at a bigger scale, because updating the trie on every query slows down the query
service, and top suggestions may not change much once the trie is built.
Hence, we'll instead update the trie asynchronously based on analytics data:
The analytics logs contain raw rows of data related to search terms with timestamps:
The aggregators' responsibility is to map the analytics data into a suitable format and also aggregate
it to lesser records.
The cadence at which we aggregate depends on the use-case for our auto-complete functionality. If
we need the data to be relatively fresh & updated real-time (eg twitter search), we can aggregate
once every eg 30m. If, on the other hand, we don't need the data to be updated real-time (eg google
search), we can aggregate once per week.
The trie cache keeps the trie loaded in-memory for fast read. It takes a weekly snapshot of the DB.
The trie DB is the persistent storage. There are two options for this problem:
• Document store (eg MongoDB) - we can periodically build the trie, serialize it and store it in
the DB.
• Key-value store (eg DynamoDB) - we can also store the trie in hashmap format.
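The key-value option can be sketched by flattening the trie into prefix -> top-k entries, the shape a key-value store could hold (a sketch; the serialization format is an assumption):

```python
# Sketch: flatten a trie with cached top-k suggestions into a hashmap.
# Every prefix becomes a key whose value is the precomputed suggestion list,
# which is what a key-value store like DynamoDB would persist.
def flatten(terms, k=5):
    """terms: dict of search term -> frequency."""
    table = {}
    for term in terms:
        for i in range(1, len(term) + 1):
            prefix = term[:i]
            matches = [t for t in terms if t.startswith(prefix)]
            matches.sort(key=lambda t: -terms[t])
            table[prefix] = matches[:k]
    return table

table = flatten({"true": 35, "try": 29, "tree": 10})
print(table["tr"])   # top suggestions served straight from the store
```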
Query service
The query service fetches top suggestions from the Trie Cache or falls back to the Trie DB on cache miss:
• Using AJAX requests on client-side - these prevent the browser from refreshing the page.
• Data sampling - instead of logging all requests, we can log a sample of them to avoid too
many logs.
• Browser caching - since auto-complete suggestions don't change often, we can leverage the
browser cache to avoid extra calls to backend.
Example with Google search caching search results on the browser for 1h:
Trie operations
Create
The trie is created by workers using aggregated data, collected via analytics logs.
Update
• Not updating the trie, but reconstructing it instead. This is acceptable if we don't need real-
time suggestions.
• Updating individual nodes directly - we prefer to avoid this as it's slow. Updating a single node
requires updating all parent nodes as well due to the cached suggestions:
Delete
To avoid showing suggestions including hateful content or any other content we don't want to show,
we can add a filter between the trie cache and the API servers:
At some point, our trie won't be able to fit on a single server. We need to devise a sharding
mechanism.
One option to achieve this is to shard based on the letters of the alphabet - eg a-m goes on one
shard, n-z on the other.
This doesn't work well as data is unevenly distributed due to eg the letter a being much more
frequent than x.
To mitigate this, we can have a dedicated shard mapper, which is responsible for devising a smart
sharding algorithm, which factors in the uneven distribution of search terms:
Step 4 - Wrap up
• What if top search queries differ across countries - we can build different tries per country
and leverage CDNs to improve response time.
• How can we support trending (real-time) search queries? - current design doesn't support
this and improving it to support it is beyond the scope of the book. Some options:
o Data may come as streams which you filter upon and use map-reduce technologies
to process it - Hadoop, Apache Spark, Apache Storm, Apache Kafka, etc.
Payment System
We'll design a payment system in this chapter, which underpins all of modern e-commerce.
• C: What payment options are supported - Credit cards, PayPal, bank cards, etc?
• I: The system should support all these options in real life. For the purposes of the interview,
we can use credit card payments.
• I: Due to compliance reasons, we do not store credit card data directly in our systems. We
rely on third-party payment processors.
• I: The application is global, but we assume only one currency is used for the purposes of the
interview.
Functional requirements
• Pay-in flow - payment system receives money from customers on behalf of merchants
• Pay-out flow - payment system sends money to sellers around the world
Non-functional requirements
Back-of-the-envelope estimation
The system needs to process 1mil transactions per day, which is 10 transactions per second.
This is not a high throughput for any database system, so it's not the focus of this interview.
Pay-in flow
Here's the high-level overview of the pay-in flow:
• Payment service - accepts payment events and coordinates the payment process. It typically
also does a risk check using a third-party provider for AML violations or criminal activity.
• Payment executor - executes a single payment order via the Payment Service Provider (PSP).
Payment events may contain several payment orders.
• Payment service provider (PSP) - moves money from one account to another, eg from buyer's
credit card account to e-commerce site's bank account.
• Card schemes - organizations that process credit card operations, eg Visa, MasterCard, etc.
• user clicks "place order" and a payment event is sent to the payment service
• payment service calls the payment executor for all payment orders, part of that payment
event
• payment executor calls external PSP to process the credit card payment
• After the payment executor processes the payment, the payment service updates the wallet
to record how much money the seller has
"buyer_info": {...},
"checkout_id": "some_id",
"credit_card_info": {...},
Example payment_order:
{
  "seller_account": "SELLER_IBAN",
  "amount": "3.15",
  "currency": "USD",
  "payment_order_id": "globally_unique_payment_id"
}
Caveats:
• The amount field is a string, as double is not appropriate for representing monetary values.
GET /v1/payments/{:id}
This endpoint returns the execution status of a single payment, based on the payment_order_id.
For payments, performance is typically not an important factor. Strong consistency, however, is.
• Proven track-record where the database has been used by other big financial institutions
• buyer_info - string (personal note - prob a foreign key to another table is more appropriate)
• seller_info - string (personal note - same remark as above)
• is_payment_done - boolean
• buyer_account - string
• amount - string
• currency - string
• ledger_updated - boolean
• wallet_updated - boolean
Caveats:
• we don't need the seller_info for the pay-in flow. That's required on pay-out only
• ledger_updated and wallet_updated are updated when the respective service is called to
record the result of a payment
• payment transitions are managed by a background job, which checks updates of in-flight
payments and triggers an alert if a payment is not processed in a reasonable timeframe
The double-entry accounting mechanism is key to any payment system. It tracks money movements
by always applying a money operation to two accounts, where one account's balance increases
(credit) by the same amount the other's decreases (debit):
Account  Debit  Credit
buyer    $1
seller          $1
Sum of all transaction entries is always zero. This mechanism provides end-to-end traceability of all
money movements within the system.
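A minimal sketch of the double-entry rule (account names and the record shape are assumptions):

```python
# Sketch of double-entry recording: every money movement produces two
# entries, a debit and a credit, whose amounts always sum to zero.
ledger = []

def record_transfer(debit_account, credit_account, amount):
    # amounts kept in cents (integers) to avoid floating-point money
    ledger.append({"account": debit_account,  "amount": -amount})  # debit
    ledger.append({"account": credit_account, "amount": +amount})  # credit

record_transfer("buyer", "seller", 100)   # buyer pays seller $1.00

# End-to-end traceability: the sum of all entries is always zero.
print(sum(e["amount"] for e in ledger))
```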
To avoid storing credit card information and having to comply with various heavy regulations, most
companies prefer utilizing a widget provided by PSPs, which stores and handles credit card payments
for them:
Pay-out flow
The components of the pay-out flow are very similar to the pay-in flow.
Main differences:
• money is moved from e-commerce site's bank account to merchant's bank account
• There's a lot of bookkeeping and regulatory requirements to handle with regards to pay-outs
as well
This section focuses on making the system faster, more robust and secure.
PSP Integration
If our system can directly connect to banks or card schemes, payment can be made without a PSP.
These kinds of connections are rare and typically established only at large companies which can
justify the investment.
If we go down the traditional route, a PSP can be integrated in one of two ways:
• Through a hosted payment page to avoid dealing with payment information regulations
• Client calls the payment service with the payment order information
• After receiving payment order information, the payment service sends a payment
registration request to the PSP.
• The PSP receives payment info such as currency, amount, expiration, etc, as well as a UUID
for idempotency purposes. Typically the UUID of the payment order.
• The PSP returns a token back which uniquely identifies the payment registration. The token is
stored in the payment service database.
• Once token is stored, the user is served with a PSP-hosted payment page. It is initialized
using the token as well as a redirect URL for success/failure.
• User fills in payment details on the PSP page, PSP processes payment and returns the
payment status
• User is now redirected back to the redirectURL. Example redirect url - https://your-
company.com/?tokenID=JIOUIQ123NSF&payResult=X324FSa
• Asynchronously, the PSP calls our payment service via a webhook to inform our backend of
the payment result
• Payment service records the payment result based on the webhook received
Reconciliation
The previous section explains the happy path of a payment. Unhappy paths are detected and
reconciled using a background reconciliation process.
Every night, the PSP sends a settlement file which our system uses to compare the external system's
state against our internal system's state.
This process can also be used to detect internal inconsistencies between eg the ledger and the wallet
services.
Mismatches are handled by the finance team. A mismatch is either:
• classifiable, hence a known mismatch which can be adjusted using a standard procedure, or
• unclassifiable, requiring manual investigation
There are cases where a payment can take hours to complete, although it typically takes seconds.
• credit card requires extra protection, eg 3D Secure Authentication, which requires extra
details from card holder to complete
• showing a "pending" status to the user and giving them a page, where they can check-in for
payment updates. We could also send them an email once their payment is complete
There are two types of communication patterns services use to communicate with one another -
synchronous and asynchronous.
Synchronous communication (eg HTTP) works well for small-scale systems, but suffers as scale
increases:
• low performance - request-response cycle is long as more services get involved in the call
chain
• poor failure isolation - if PSPs or any other service fails, user will not receive a response
• hard to scale - not easy to support sudden increase in traffic due to not having a buffer
The latter model - asynchronous communication via message queues - works well for our payment
system, as a payment can trigger multiple side effects, handled by different services.
Every payment system needs to address failed payments. Here are some of the mechanisms we'll use
to achieve that:
• Retry queue - payments which we'll retry are published to a retry queue
• Dead-letter queue - payments which have terminally failed are pushed to a dead-letter
queue, where the failed payment can be debugged and inspected.
Exactly-once delivery
• cancel - client cancels the request. This happens when the error is terminal or retry threshold
is reached
As a rule of thumb, default to an exponential back-off retry strategy. A good practice is for the server
to specify a retry interval using a Retry-After header.
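A retry loop combining exponential back-off with a server-supplied retry hint might look like this sketch (the PSP call and its response shape are assumptions):

```python
import random

def retry_delays(max_attempts=5, base=0.5, cap=30.0):
    """Exponential back-off with jitter: ~0.5s, 1s, 2s, 4s, ... capped."""
    for attempt in range(max_attempts):
        yield min(cap, base * (2 ** attempt)) * random.uniform(0.5, 1.0)

def pay_with_retries(call_psp, max_attempts=5):
    for delay in retry_delays(max_attempts):
        response = call_psp()
        if response["status"] != "error":
            return response
        # Prefer the server's Retry-After hint over our own schedule.
        wait = response.get("retry_after", delay)
        # a real client would time.sleep(wait) here before the next attempt
    return {"status": "failed"}   # exhausted retries -> dead-letter queue

# Simulated PSP that fails twice and then succeeds.
calls = iter([{"status": "error"}, {"status": "error", "retry_after": 2},
              {"status": "success"}])
print(pay_with_retries(lambda: next(calls)))
```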
An issue with retries is that the server can potentially process a payment twice:
• client clicks the "pay button" twice, hence, they are charged twice
• payment is successfully processed by PSP, but not by downstream services (ledger, wallet).
Retry causes the payment to be processed by the PSP again
To address the double payment problem, we need to use an idempotency mechanism - a property
that an operation applied multiple times is processed only once.
From an API perspective, clients can make multiple calls which produce the same result.
Idempotency is managed by a special header in the request (eg idempotency-key), which is typically
a UUID.
Idempotency can be achieved using the database's mechanism of adding unique key constraints:
• a retried request carrying the same idempotency key fails to insert a second row due to the
unique constraint violation
• the server detects that error and instead returns the existing object back to the client
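The unique-key mechanism can be sketched with SQLite (a sketch; table and column names are assumptions):

```python
import sqlite3

# Sketch of idempotency via a unique key constraint: the idempotency key
# (a UUID sent by the client) is the primary key of the payments table.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE payments (idempotency_key TEXT PRIMARY KEY, amount TEXT)")

def process_payment(idempotency_key, amount):
    try:
        db.execute("INSERT INTO payments VALUES (?, ?)", (idempotency_key, amount))
        return {"created": True, "amount": amount}
    except sqlite3.IntegrityError:
        # Retry of an already-processed payment: return the existing object
        # instead of charging twice.
        row = db.execute("SELECT amount FROM payments WHERE idempotency_key = ?",
                         (idempotency_key,)).fetchone()
        return {"created": False, "amount": row[0]}

first = process_payment("uuid-123", "3.15")
retry = process_payment("uuid-123", "3.15")   # double click / retried request
print(first, retry)
```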
Idempotency is also applied at the PSP side using the nonce - the UUID sent during payment
registration. PSPs will take care to not process payments with the same nonce twice.
Consistency
There are several stateful services called throughout a payment's lifecycle - PSP, ledger, wallet,
payment service.
Communication between any two services can fail. We can ensure eventual data consistency
between all services by implementing exactly-once processing and reconciliation.
If we use replication, we'll have to deal with replication lag, which can lead to users observing
inconsistent data between primary and replica databases.
To mitigate that, we can serve all reads and writes from the primary database and only utilize replicas
for redundancy and fail-over. Alternatively, we can ensure replicas are always in-sync by utilizing a
consensus algorithm such as Paxos or Raft. We could also use a consensus-based distributed
database such as YugabyteDB or CockroachDB.
Payment security
• Data loss - replicate data across multiple regions and take data snapshots
• Card theft - use tokens instead of storing real card information in our system
• PCI compliance - a security standard for organizations which handle branded credit cards
• Fraud - address verification, card verification value (CVV), user behavior analysis, etc
Step 4 - Wrap Up
• Debugging tools - we need tools which make it easy to understand why a payment has failed
• Currency exchange - important when designing a payment system for international use
Digital Wallet
Payment platforms usually have a wallet service, where they allow clients to store funds within the
application, which they can withdraw later.
You can also use it to pay for goods & services or transfer money to other users, who use the digital
wallet service. That can be faster and cheaper than doing it via normal payment rails.
• C: How many transactions per second does the system need to support?
• I: 1,000,000 TPS
• I: Sounds good
• I: We can do that via reconciliation, but that only detects discrepancies vs. showing us the
root cause for them. Instead, we want to be able to replay data from the beginning to
reconstruct the history.
• I: Yes
• Reliability is 99.99%
• Support transactions
• Support reproducibility
Back-of-the-envelope estimation
A traditional relational database, provisioned in the cloud can support ~1000 TPS.
In order to reach 1mil TPS, we'd need 1000 database nodes. But if each transfer has two legs, then
we actually need to support 2mil TPS.
One of our design goals would be to increase the TPS a single node can handle so that we can have
less database nodes.
Per-node TPS  Number of nodes
100           20,000
1,000         2,000
10,000        200
Step 2 - Propose High-Level Design and Get Buy-In
API Design
Request parameters - from_account, to_account, amount (string to not lose precision), currency,
transaction_id (idempotency key).
Sample response:
{
  "status": "success",
  "transaction_id": "01589980-2664-11ec-9621-0242ac130002"
}
Our wallet application maintains account balances for every user account.
One good data structure to represent this is a map<user_id, balance>, which can be implemented
using an in-memory Redis store.
Since one Redis node cannot withstand 1mil TPS, we need to partition our Redis cluster into multiple
nodes.
int partitionNumber = 7;
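The partitioning logic hinted at by the fragment above can be sketched in full (a sketch: a stable hash is used instead of Java's hashCode, and the partition count would be read from Zookeeper):

```python
import hashlib

# Sketch of routing a user's balance to one of N Redis partitions.
# A stable hash (not Python's randomized hash()) keeps routing consistent
# across processes.
partition_count = 7   # read from Zookeeper in the real system

def partition_for(user_id):
    digest = hashlib.sha256(user_id.encode()).hexdigest()
    return int(digest, 16) % partition_count

# Every wallet service node computes the same partition for the same user.
print(partition_for("user-42"))
```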
Zookeeper can be used to store the number of partitions and addresses of redis nodes as it's a
highly-available configuration storage.
Finally, a wallet service is a stateless service responsible for carrying out transfer operations. It can
easily scale horizontally:
Although this solution addresses scalability concerns, it doesn't allow us to execute balance transfers
atomically.
Distributed transactions
One approach for handling transactions is to use the two-phase commit protocol on top of standard,
sharded relational databases:
Here's how the two-phase commit (2PC) protocol works:
• Coordinator (wallet service) performs read and write operations on multiple databases as
normal
• When application is ready to commit the transaction, coordinator asks all databases to
prepare it
• If all databases replied with a "yes", then the coordinator asks the databases to commit the
transaction.
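The prepare/commit exchange can be sketched with in-memory stand-ins for the databases (a sketch; method names are assumptions):

```python
# Sketch of a 2PC coordinator over two in-memory "databases".
class Database:
    def __init__(self, balances):
        self.balances = balances
        self.pending = None
    def prepare(self, account, delta):
        # Vote yes only if the operation would keep the balance valid.
        if self.balances[account] + delta < 0:
            return False
        self.pending = (account, delta)
        return True
    def commit(self):
        account, delta = self.pending
        self.balances[account] += delta
        self.pending = None
    def abort(self):
        self.pending = None

def transfer_2pc(db_a, db_c, amount):
    # Phase 1: the coordinator asks all databases to prepare.
    votes = [db_a.prepare("A", -amount), db_c.prepare("C", +amount)]
    # Phase 2: commit only if every database voted yes, otherwise abort all.
    if all(votes):
        db_a.commit(); db_c.commit()
        return True
    db_a.abort(); db_c.abort()
    return False

db_a, db_c = Database({"A": 5}), Database({"C": 0})
print(transfer_2pc(db_a, db_c, 1), db_a.balances, db_c.balances)
```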
TC/C is a variation of the 2PC protocol, which works with compensating transactions:
• Coordinator collects replies from DBs - if yes, DBs are asked to try-confirm. If no, DBs are
asked to try-cancel.
One important difference between TC/C and 2PC is that 2PC performs a single transaction, whereas
in TC/C, there are two independent transactions.
Example: transfer 1$ from account A to account C.
Phase 1 - try:
• A's DB is instructed to reduce A's balance by 1$ (local transaction), whereas C's DB receives a NOP
Phase 2a - confirm:
• A's DB receives a NOP, whereas C's DB is instructed to increase C's balance by 1$ (local
transaction)
Phase 2b - cancel:
• if the try phase failed, A's DB is instructed to add the 1$ back to A's balance (compensating
transaction), whereas C's DB receives a NOP
• 2PC: after the first phase, transactions are not done yet. To commit all - commit the pending
transactions; to cancel all - cancel the pending transactions.
• TC/C: after the first phase, all transactions are completed - committed or canceled. To commit
all - execute new transactions if needed; to cancel all - reverse the already committed
transactions.
• Details and complexity of distributed transactions need to be handled in the business logic
If the coordinator dies mid-flight, it needs to recover its intermediary state. That can be done by
maintaining phase status tables, atomically updated within the database shards:
• status of try phase - not sent, has been sent, response received
This is fine as long as we always recover from this state and that users cannot use the intermediary
state to eg spend it. This is guaranteed by always executing deductions prior to additions.
Note that choice 3 from table above is invalid because we cannot guarantee atomic execution of
transactions across different databases without relying on 2PC.
It is possible that a database receives a cancel operation, before receiving a try. This edge case can be
handled by adding an out of order flag in our phase status table. When we receive a try operation,
we first check if the out of order flag is set and if so, a failure is returned.
• all operations are ordered in a sequence. All operations are independent in their own
databases.
• when an operation fails, the entire process starts to roll back until the beginning with
compensating operations
How do we coordinate the workflow? There are two approaches we can take:
• Choreography - all services involved in a saga subscribe to the related events and do their
part in the saga
• Orchestration - a single coordinator instructs all services to do their jobs in the correct order
The challenge of using choreography is that business logic is split across multiple services, which
communicate asynchronously. The orchestration approach handles complexity well, so it is typically
the preferred approach in a digital wallet system.
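An orchestrated saga with compensating operations might be sketched as follows (a sketch; the step/compensation pairing is an assumption about how the coordinator is wired):

```python
# Sketch of saga orchestration: a coordinator runs each local operation in
# sequence; on failure it rolls back with compensating operations.
def run_saga(steps):
    """steps: list of (operation, compensation) pairs; operations return True/False."""
    done = []
    for operation, compensation in steps:
        if operation():
            done.append(compensation)
        else:
            # Roll back everything executed so far, in reverse order.
            for comp in reversed(done):
                comp()
            return False
    return True

balances = {"A": 1, "C": 0}

def debit_a():
    if balances["A"] < 1:
        return False
    balances["A"] -= 1
    return True

def credit_c_fail():
    return False   # simulate the second leg failing

ok = run_saga([(debit_a, lambda: balances.__setitem__("A", balances["A"] + 1)),
               (credit_c_fail, lambda: None)])
print(ok, balances)   # rolled back: A's debit was compensated
```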
TC/C vs Saga
The main difference is that TC/C is parallelizable, whereas a Saga's operations run in sequence. Our
decision is based on the latency requirement - if we need to achieve low latency, we should go for
the TC/C approach.
Regardless of the approach we take, we still need to support auditing and replaying history to
recover from failed states.
Event sourcing
In real-life, a digital wallet application might be audited and we have to answer certain questions:
• command - an intended action from the real world, eg "transfer 1$ from account A to B".
Commands need a global order, so they're put into a FIFO queue.
o commands, unlike events, can fail and have some randomness due to eg IO or invalid
state.
o event generation can contain randomness such as external IO. This will be revisited
later
• event - a historical fact about something which occurred in the system, eg "transferred 1$ from A
to B".
o unlike commands, events are facts that have happened within our system
• state - what has changed as a result of an event, eg a key-value store mapping accounts to
their balances.
• state machine - drives the event sourcing process. It mainly validates commands and applies
events to update the system state.
o the state machine should be deterministic, hence, it shouldn't read external IO or
rely on randomness.
For our wallet service, the commands are balance transfer requests. We can put them in a FIFO
queue, such as Kafka:
Here's the full picture:
• the command is validated. If valid, two events, one for each account, are generated
• next event is read and applied by updating the balance (state) in the database
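The validate-then-apply loop above can be sketched as (a sketch; command and event shapes are assumptions):

```python
# Sketch of event sourcing: a deterministic state machine validates commands
# and emits events; replaying the immutable event list reproduces the state.
events = []                       # immutable, append-only history
state = {"A": 2, "C": 0}          # account -> balance

def handle_command(cmd):
    """Validate a transfer command; if valid, generate one event per account."""
    if state[cmd["from"]] < cmd["amount"]:
        return False              # commands may fail; events are final facts
    events.append({"account": cmd["from"], "delta": -cmd["amount"]})
    events.append({"account": cmd["to"],   "delta": +cmd["amount"]})
    return True

def apply_event(balances, event):
    balances[event["account"]] += event["delta"]

def replay(initial, event_list):
    """Reproducibility: replaying events rebuilds any historical state."""
    balances = dict(initial)
    for e in event_list:
        apply_event(balances, e)
    return balances

handle_command({"from": "A", "to": "C", "amount": 1})
for e in events:
    apply_event(state, e)
print(state, replay({"A": 2, "C": 0}, events))
```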
The main advantage of using event sourcing is its reproducibility. In this design, all state update
operations are saved as immutable history of all balance changes.
Historical balances can always be reconstructed by replaying events from the beginning. Because the
event list is immutable and the state machine is deterministic, we are guaranteed to succeed in
replaying any of the intermediary states.
All audit-related questions asked in the beginning of the section can be addressed by relying on
event sourcing:
• Do we know the account balance at any given time? - events can be replayed from the start
until the point which we are interested in
• How do we know the historical and current balances are correct? - correctness can be
verified by recalculating all events from the start
• How do we prove the system logic is correct after a code change? - we can run different
versions of the code against the events and verify their results are identical
Answering client queries about their balance can be addressed using the CQRS architecture - there
can be multiple read-only state machines which are responsible for querying the historical state,
based on the immutable events list:
In this section we'll explore some performance optimizations as we're still required to scale to 1mil
TPS.
The first optimization we'll explore is to save commands and events into local disk store instead of an
external store such as Kafka.
This avoids the network latency and also, since we're only doing appends, that operation is generally
fast for HDDs.
The next optimization is to cache recent commands and events in-memory in order to save the time
of loading them back from disk.
At a low level, we can achieve the aforementioned optimizations by leveraging the mmap system call,
which writes data to a local file while caching it in memory via the OS page cache:
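A minimal illustration of the mmap idea (a sketch; the record format is an assumption - the point is that the same bytes are reachable both through memory and on disk):

```python
import mmap, os, tempfile

path = os.path.join(tempfile.mkdtemp(), "events.log")
record = b'{"account":"A","delta":-1}\n'

with open(path, "wb") as f:
    f.write(b"\0" * 4096)          # pre-size: mmap maps a fixed-length region

with open(path, "r+b") as f:
    mem = mmap.mmap(f.fileno(), 4096)
    mem[:len(record)] = record      # the write lands in memory and, via the
    mem.flush()                     # OS page cache, is persisted to disk
    mem.close()

with open(path, "rb") as f:         # the write is now visible on disk
    on_disk = f.read(len(record))
print(on_disk == record)
```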
The next optimization we can do is also store state in the local file system using SQLite - a file-based
local relational database. RocksDB is also another good option.
For our purposes, we'll choose RocksDB because it uses a log-structured merge-tree (LSM), which is
optimized for write operations. Read performance is optimized via caching.
To optimize the reproducibility, we can periodically save snapshots to disk so that we don't have to
reproduce a given state from the very beginning every time. We could store snapshots as large binary
files in distributed file storage, eg HDFS:
Before we address reliability, we should analyze what kind of data needs high reliability in our system:
• state and snapshot can always be regenerated by reproducing them from the events list.
Hence, we only need to guarantee the event list reliability.
• one might think we can always regenerate the events list from the command list, but that is
not true, since commands are non-deterministic.
• conclusion is that we need to ensure high reliability for the events list only
In order to achieve high reliability for events, we need to replicate the list across multiple nodes. We
need to guarantee:
• no data is lost
• the relative order of data within a log file remains the same across replicas
With Raft, there is a leader who is active, and there are followers who are passive. If the leader dies,
one of the followers takes over. As long as more than half of the nodes are up, the system continues
running.
With this approach, all nodes update the state, based on the events list. Raft ensures leader and
followers have the same events list.
So far, we've managed to design a system which has high single-node performance and is reliable.
• In the CQRS architecture, the request/response flow is slow. A client would need to
periodically poll the system to learn when their wallet has been updated
Polling is not real-time, hence, it can take a while for a user to learn about an update in their balance.
Also, it can overload the query services if the polling frequency is too high:
To mitigate the system load, we can introduce a reverse proxy, which sends commands on behalf of
the user and polls for response on their behalf:
This alleviates the system load as we could fetch data for multiple users using a single request, but it
still doesn't solve the real-time receipt requirement.
One final change we could do is make the read-only state machines push responses back to the
reverse proxy once it's available. This can give the user the sense that updates happen real-time:
Finally, to scale the system even further, we can shard the system into multiple raft groups, where we
implement distributed transactions on top of them using an orchestrator either via TC/C or Sagas:
• User A sends a distributed transaction to the Saga coordinator with two operations - A-1 and
C+1.
• Saga coordinator creates a record in the phase status table to trace the status of the
transaction
• Partition 1's raft leader receives the A-1 command, validates it, converts it to an event and
replicates it across other nodes in the raft group
• Event result is synchronized to the read state machine, which pushes a response back to the
coordinator
• Coordinator creates a record indicating that the operation was successful and proceeds with
the next operation - C+1
• Next operation is executed similarly to the first one - partition is determined, command is
sent, executed, read state machine pushes back a response
• Coordinator creates a record indicating operation 2 was also successful and finally informs
the client of the result
Step 4 - Wrap Up
• We started from a solution using an in-memory Redis. The problem with this approach is that
it is not durable storage.
• Next, we introduced event sourcing in order to make all the operations auditable
• We started by storing the data into external storage using external database and queue, but
that's not performant
• We proceeded to store data in local file storage, leveraging the performance of append-only
operations. We also used caching to optimize the read path
• The previous approach, although performant, wasn't durable. Hence, we introduced Raft
consensus with replication to avoid single points of failure
• We also adopted CQRS with a reverse proxy to manage a transaction's lifecycle on behalf of
our users
• Finally, we partitioned our data across multiple raft groups, which are orchestrated using a
distributed transaction mechanism - TC/C or distributed saga
Spotify is the most popular music streaming platform in the world, with over 600 million monthly
active users (MAU) and 200 million paid users.
In this article, we will learn how to design a music streaming service like Spotify that can handle
hundreds of millions of users and billions of music streams every day, while ensuring low latency and
high availability.
1. Requirements Gathering
Before diving into the design, let's outline the functional and non-functional requirements.
• Search: Users can search for songs, artists, albums, and playlists.
1. Scalability: The system should handle 100s of millions of users globally and the ability to
stream millions of songs concurrently.
2. Low Latency: Real-time streaming must have low latency for a seamless user experience.
3. High Availability: The system must be available at all times with minimal downtime.
4. Global Reach: Support users from different geographic regions, potentially leveraging CDNs
to serve audio files faster.
2. Capacity Estimation
User Base: 500 million total users.
Total storage for music = 100 million songs × 5 MB/song = 500 terabytes.
Assuming 2 KB of metadata per song and 10 KB of metadata per user (user details, preferences,
playlists etc.):
• Total metadata storage for songs = 100 million × 2 KB = 200 GB.
• Total storage for 500 million users = 500 million × 10 KB = 5 TB.
Caching plays a significant role in reducing the load on the storage system and ensuring low latency
for popular content.
Assuming Spotify has 100 million songs and the top 20% are cached, the cache size = 100 million ×
20% × 5 MB/song = 100 TB.
The system architecture of Spotify can be broken down into several high-level components:
3.1 Client Application
The client application consists of the mobile, desktop, and web versions of Spotify, which provides a
clean and intuitive UI to interact with the service.
It communicates with backend APIs for search, streaming, playlists, and recommendations and
supports offline listening by caching music on the device (downloaded content).
3.2 Load Balancer
The Load Balancer is the entry point for all client requests.
It distributes incoming client requests evenly across multiple instances of backend services,
preventing overload on any single server.
3.3 API Gateway
The API Gateway receives incoming requests from the load balancer and redirects each request to the
appropriate service.
3.4 Services
• Streaming Service: Handles streaming of music from the storage system to user’s device in
real-time.
• Users Service: Stores and manages user profiles, including personal information, subscription
type, and preferences. Manages user playlists, allowing users to create, modify and share
them.
3.5 Storage
• Databases: Stores user profiles, playlists, songs metadata and search indices.
• Blob Storage: A distributed storage system (e.g., AWS S3) for handling large-scale storage of
audio and ad files.
• Content Delivery Network (CDN): Used to deliver large audio files efficiently to users across
the globe with minimal latency.
• Caches: Caches frequently accessed data such as popular songs and recommendations to
improve performance and reduce the load on the storage and database systems.
The Analytics and Monitoring service tracks user engagement, system performance, and logs system
health.
It generates alerts when issues are detected and logs all system activities for troubleshooting.
4. Database Design
The main entities are: Users, Songs, Artists, Albums, Playlists, Streams, Search Index and
Recommendations.
Given the diverse types of data and high query demands, we use a combination of relational
databases, NoSQL databases, and distributed storage systems.
To store highly structured data like user profiles, playlists, song metadata, artists and albums, we
can use a relational database like PostgreSQL or MySQL.
NoSQL databases provide flexibility and scalability, making them ideal for handling highly dynamic
data such as recommendations, and search indices.
Recommendations Table
Spotify generates recommendations for users based on their listening behavior and this data is
updated frequently.
Example Record:
Search Indices
Search indices are stored in NoSQL databases like Elasticsearch to allow quick, fuzzy search queries
across songs, artists, and albums.
Example Record:
4.3 Distributed Storage System
To store large volumes of audio and ad files, we can use a distributed storage system like AWS S3.
S3 ensures high durability and availability, making it an ideal storage solution for serving large static
files.
• File: https://s3.amazonaws.com/spotify/songs/blinding_lights.mp3
We use a Content Delivery Network (CDN) for distributing large audio files (songs) to users globally
with minimal latency.
By serving music from CDN edge servers, Spotify ensures low-latency music streaming experiences
for users across the world, minimizing buffering times and reducing load on the central storage
system.
Original music files are stored in a distributed storage system (e.g., AWS S3). The CDN pulls from this
origin storage when a song is requested for the first time and caches it for future requests.
Caching frequently accessed data like user preferences, popular songs, or recommendations can
improve performance.
A caching layer like Redis can be used to store this data temporarily.
• Search Queries: Cache popular search queries to avoid hitting the search index repeatedly.
• Popular Songs: Frequently streamed songs can be cached to reduce database queries.
• User Preferences: Store the user's liked songs and playlists in the cache for faster retrieval.
Example Redis lookup: GET user:preferences:12345
Analytics and monitoring data are critical for tracking user engagement, system performance, and
identifying potential issues.
Data is aggregated and processed in a data warehouse or distributed data stores (e.g., Hadoop,
BigQuery).
• User Engagement: Data on streams, skips, and playlist additions are used to generate
insights into user behavior.
• System Monitoring: Logs from various services are used to monitor system health, detect
anomalies, and perform performance tuning.
• Royalty Calculations: Streaming data is used to calculate payments for artists based on song
plays and geographic reach.
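The royalty calculation above amounts to aggregating raw stream events into per-artist payouts. A simplified sketch (the per-stream rate is a made-up number; real rates vary by region and contract, and geographic weighting is omitted):

```python
from collections import Counter

# Hypothetical flat per-stream payout rate (real rates are more complex).
RATE_PER_STREAM = 0.004

def royalties(stream_events: list) -> dict:
    """Aggregate raw stream events into a payout per artist."""
    plays = Counter(e["artist_id"] for e in stream_events)
    return {artist: round(count * RATE_PER_STREAM, 4) for artist, count in plays.items()}

events = [{"artist_id": "a1"}, {"artist_id": "a1"}, {"artist_id": "a2"}]
payouts = royalties(events)  # {"a1": 0.008, "a2": 0.004}
```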
5. API Design
We'll design RESTful APIs that are intuitive, efficient, and scalable.
Let's break down our API design into several key endpoints:
The Search API allows users to search for songs, artists, albums, or playlists. The search results are
ranked based on relevance, popularity, and user preferences.
Endpoints
GET /search
Query Parameters:
• type: The type of resource to search for (song, artist, album, playlist).
Response:
{
  "results": [
    {
      "type": "song",
      "id": "12345"
    },
    {
      "type": "artist",
      "id": "67890"
    }
  ]
}
The Streaming API handles the delivery of music files from the backend or CDN to the user’s device.
Endpoints
GET /stream/{song_id}
Response:
• HTTP 302 Redirect to the CDN URL where the song is hosted:
{
  "url": "https://cdn.spotify.com/song/12345"
}
The Recommendations API provides personalized song suggestions based on the user’s listening
history, preferences, and likes.
Endpoints
GET /recommendations/{user_id}
Query Parameters:
Response:
{
  "recommendations": [
    {
      "song_id": "12345",
      "score": 0.98
    },
    {
      "song_id": "67890",
      "score": 0.95
    }
  ]
}
The Ad Delivery API fetches and serves personalized ads based on user preferences and
demographics.
Endpoints
GET /ads/{user_id}
Response:
{
  "ad_id": "ad12345",
  "ad_url": "https://cdn.spotify.com/ads/ad12345.mp3",
  "duration": 30
}
The Streaming Service is at the heart of Spotify’s architecture, responsible for delivering music
content efficiently, securely, and reliably to millions of users in real time.
The actual delivery of music files is managed by a Content Delivery Network (e.g., Cloudflare, AWS
CloudFront). This ensures that music is served from geographically distributed servers close to the
user, minimizing latency and bandwidth consumption.
Request Workflow:
1. The client requests a song (e.g., GET /stream/{song_id}).
2. The app server authenticates the user and routes the request to the Streaming Service.
3. If the song is not in the CDN, the Streaming Service retrieves the audio file’s location (from
the blob storage) and pushes the file to the nearest CDN edge server. The CDN returns a URL
to the streaming service to stream the audio.
4. The CDN URL is returned to the client, allowing the client to stream the audio.
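The workflow above can be sketched as a small routing function. Here `blob_storage` and `cdn_cache` are stand-ins for the origin store and CDN edge state, and the URL format is taken from the Streaming API example (the "push to edge" step is reduced to recording the song id):

```python
# Sketch of the streaming request workflow; names and data are assumptions.
blob_storage = {"12345": "s3://spotify/songs/blinding_lights.mp3"}
cdn_cache: dict = {}  # song_id -> CDN URL, standing in for CDN edge state

def stream_url(song_id: str) -> str:
    if song_id not in cdn_cache:
        origin = blob_storage[song_id]          # step 3: locate the file in blob storage
        cdn_cache[song_id] = f"https://cdn.spotify.com/song/{song_id}"  # push to edge
    return cdn_cache[song_id]                   # step 4: client streams from the CDN URL

url = stream_url("12345")  # "https://cdn.spotify.com/song/12345"
```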
The recommendation system analyzes the user's listening habits, likes, and playlists. It uses a
combination of collaborative filtering (based on users with similar preferences) and content-based
filtering (based on song metadata).
Collaborative Filtering
Collaborative filtering is one of the most commonly used techniques in recommendation systems.
This method leverages the behavior of users with similar music tastes to generate recommendations.
• User-Based Collaborative Filtering: This technique groups users based on their listening
history. For example, if User A and User B both frequently listen to the same set of artists
and songs, the system may recommend songs that User A has listened to but User B hasn’t.
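The user-based approach above can be sketched in a few lines: find the user with the most overlapping listening history and recommend the songs they have played that the target user has not. This toy version uses Jaccard overlap as the similarity measure, a simplification of what production systems do:

```python
# User-based collaborative filtering sketch; data and similarity choice are illustrative.
def jaccard(a: set, b: set) -> float:
    return len(a & b) / len(a | b) if a | b else 0.0

def recommend(target: str, history: dict) -> set:
    # Score every other user by overlap with the target's listening history.
    others = [(jaccard(history[target], h), u) for u, h in history.items() if u != target]
    _, best = max(others)                     # the most similar user
    return history[best] - history[target]    # their songs the target hasn't heard

history = {
    "A": {"s1", "s2", "s3"},
    "B": {"s1", "s2"},   # similar tastes to A
    "C": {"s9"},
}
print(recommend("B", history))  # {'s3'}: listened to by A but not yet by B
```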
Content-Based Filtering
Content-based filtering focuses on the properties of songs, such as genre, artist, album, tempo, and
instrumentation, to recommend similar songs to users.
• Song Attributes: Spotify collects metadata on each song, including genre, tempo, mood, and
instruments. This data is used to recommend songs with similar attributes to what the user
has already liked or listened to.
• Artist Similarity: If a user listens to multiple songs from a particular artist, the system may
recommend songs from similar artists, based on shared attributes (e.g., genre, style).
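Content-based filtering, by contrast, compares song attributes directly. A minimal sketch that ranks candidates by how many attributes they share with a song the user liked (attribute names and values are illustrative; real systems use richer feature vectors):

```python
# Content-based filtering sketch: count shared attributes with a liked song.
def attribute_overlap(song: dict, liked: dict) -> int:
    return sum(1 for k in ("genre", "tempo", "mood") if song.get(k) == liked.get(k))

liked_song = {"genre": "synthpop", "tempo": "fast", "mood": "upbeat"}
candidates = [
    {"id": "s1", "genre": "synthpop", "tempo": "fast", "mood": "dark"},
    {"id": "s2", "genre": "jazz", "tempo": "slow", "mood": "calm"},
]
ranked = sorted(candidates, key=lambda s: attribute_overlap(s, liked_song), reverse=True)
print(ranked[0]["id"])  # s1: shares genre and tempo with the liked song
```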
The Search Service in Spotify allows users to find songs, artists, albums, playlists, and podcasts from
a vast catalog efficiently.
The architecture of Search Service can be broken down into the following key components:
2. Search Index: A dynamically updated index that contains metadata for all songs, artists,
albums, and playlists. A search engine like Elasticsearch or Apache Solr can be used to build
and manage this index.
3. Ranking Engine: Once the search index returns matching results, the Ranking Engine sorts
the results to ensure that the most relevant results appear at the top.
4. Personalization Layer: Customizes search results based on the user’s listening history,
preferences, and demographic information.
5. Search Autocomplete: Provides users with suggestions as they type their queries, speeding
up the search process.
6. Cache Layer: Caches frequently searched queries to improve performance and reduce the
load on the backend.
7. Search Index Updater: Ensures that the search index stays up to date with new content
being added to Spotify’s catalog.
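The autocomplete component above can be illustrated with a prefix lookup over a sorted list of titles. Real deployments use the search engine's own suggesters; binary search over a sorted index just shows the core idea:

```python
import bisect

# Prefix autocomplete sketch over a sorted title list; titles are illustrative.
titles = sorted(["blinding lights", "blank space", "bohemian rhapsody", "hello"])

def autocomplete(prefix: str, limit: int = 5) -> list:
    i = bisect.bisect_left(titles, prefix)    # first title >= prefix
    out = []
    while i < len(titles) and titles[i].startswith(prefix) and len(out) < limit:
        out.append(titles[i])
        i += 1
    return out

print(autocomplete("bl"))  # ['blank space', 'blinding lights']
```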
7.1 Scalability
• Sharding: To scale the SQL databases and distribute the load evenly, implement sharding for
large tables such as users, playlists, and song metadata.
• Indexes: Add indexes on frequently accessed fields like user_id and playlist_id to improve
query performance.
• Partitioning: NoSQL databases can use partitioning strategies to distribute data across
multiple nodes, ensuring low-latency access even at large scales.
• TTL (Time to Live): Cached data is given a TTL to ensure that stale data is regularly
invalidated.
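A simple way to realize the sharding bullet above is hash-based routing: a key such as user_id is hashed onto one of N shards. A sketch (production systems often prefer consistent hashing so that adding shards moves less data):

```python
import hashlib

# Hash-based shard routing sketch; shard count is an assumption.
NUM_SHARDS = 4

def shard_for(user_id: str) -> int:
    # md5 gives a stable, evenly distributed hash across processes.
    digest = hashlib.md5(user_id.encode()).hexdigest()
    return int(digest, 16) % NUM_SHARDS

shard = shard_for("user:12345")
assert 0 <= shard < NUM_SHARDS  # the same user always routes to the same shard
```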
7.2 Reliability
• Replicated Databases: Replicate user, song and playlists data across multiple data centers to
prevent data loss.
• Cache Replication: Redis can be configured to replicate cached data across multiple
instances for fault tolerance.
• Graceful Failover: If a server fails, traffic is rerouted to another server without service
interruption.
7.3 Security
Spotify handles sensitive data such as user profiles and payment information.
• Rate Limiting: Rate limit users to ensure that excessive API requests from a single client are
throttled to protect the system.
• Data Privacy: Implement strong access controls to ensure user data is not leaked or misused.
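The rate-limiting point above is commonly implemented as a token bucket per client: each client holds up to `capacity` tokens that refill at `rate` per second, and a request is admitted only if a token remains. A minimal single-process sketch (the capacity and rate are illustrative):

```python
import time

# Token-bucket rate limiter sketch; one bucket per client in a real deployment.
class TokenBucket:
    def __init__(self, capacity: int, rate: float):
        self.capacity = capacity
        self.rate = rate                 # tokens refilled per second
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False                     # throttled: no token available

bucket = TokenBucket(capacity=2, rate=1.0)
results = [bucket.allow(), bucket.allow(), bucket.allow()]  # third call is throttled
```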