Packt Rxjs Cookbook For Reactive Programming
Reactive Programming
Nikola Mitrović
RxJS Cookbook for Reactive Programming
Copyright © 2025 Packt Publishing
All rights reserved. No part of this book may be reproduced, stored in a retrieval system, or transmitted in
any form or by any means, without the prior written permission of the publisher, except in the case of brief
quotations embedded in critical articles or reviews.
The author acknowledges the use of cutting-edge AI, such as ChatGPT to enhance language and clarity,
and Gemini to create the illustrations within the book, all with the aim of ensuring a smooth reading
experience. It’s important to note that the content itself is crafted by the author and edited by a professional
publishing team.
Every effort has been made in the preparation of this book to ensure the accuracy of the information
presented. However, the information contained in this book is sold without warranty, either express or
implied. Neither the author, nor Packt Publishing or its dealers and distributors, will be held liable for any
damages caused or alleged to have been caused directly or indirectly by this book.
Packt Publishing has endeavored to provide trademark information about all of the companies and products
mentioned in this book by the appropriate use of capitals. However, Packt Publishing cannot guarantee
the accuracy of this information.
ISBN 978-1-78862-405-3
www.packtpub.com
To my mom for her tireless dedication and sacrifices that paved the way for everything I have today.
Thank you for believing in me and supporting my dreams, even when it meant
putting my needs before your own. There are no words to describe how grateful
I am for your love and everything you have done for me. You are the real MVP.
- Nikola Mitrović
Foreword
I met Nikola in 2023 during my visit to Belgrade for a conference, where we both shared the stage
for the first time. Nikola was talking about browser web APIs and gave an amazing talk. He is
a well-known person in the community for sharing knowledge on web technologies and their
ecosystem. Nikola is the best person to write this book as he has great knowledge of working on
web APIs, Angular, and NestJS.
This is a great guide if you want to get comfortable with reactive concepts using RxJS. The first two
chapters cover the real-time usage of various RxJS operators, and then we learn about animation,
testing, and how you can improve performance using RxJS.
I really like how the state management topic was covered. NgRx is a widely adopted solution for
state management, but it’s important to understand how we can do it using RxJS, and the book
covers that part really well.
Toward the end, the book covers real-time communication using RxJS, where RxJS really shines.
It also demonstrates how to effectively use RxJS with NestJS by building microservices.
Santosh Yadav
Google Developer Expert for Angular | GitHub Star | Nx Champion | Developer Advocate
Contributors
For the last couple of years, Nikola has been in leadership roles, inspiring a culture of technical
excellence, continuous learning, collaboration, and psychological safety within the organization.
He strongly believes in leading with empathy, honesty, and passion, empowering teams with
trust and autonomy.
Prior to this, Nikola was a partner, technical architect, and development lead at Vega IT, where
he was recognized as the company’s MVP for three consecutive years (2022-2024).
About the reviewers
Aleksandar Makarić is a passionate, self-taught developer with a strong focus on performance
optimization and accessibility. Proactive and detail-oriented, he excels in UI/UX, SEO, and refining
team processes. Aleksandar believes in the power of knowledge-sharing and values soft skills,
which foster team cohesion and productivity. More than just a coder, he thrives as a problem
solver, always aiming to create meaningful impact through his work. And when it comes to RxJS,
he relishes the challenge of picking the perfect operator—why settle for throttleTime when a
well-timed auditTime could be the secret to optimal throttling in high-frequency streams?
I extend my heartfelt gratitude to my family and colleagues for their unwavering support during my transition
to software engineering. Your encouragement has been invaluable. Special thanks to the author of this book,
whose work has inspired and guided my growth in both my career and my life. His insights have profoundly
shaped my approach to programming – thank you for being a mentor and role model.
Sunil Raj Thota is a seasoned software engineer with extensive experience in web development
and AI applications. Currently working in the Amazon QuickSight team, Sunil has previously
contributed to significant projects at Yahoo Inc., enhancing user engagement and satisfaction
through innovative features at Yahoo and AOL Mail. He has also worked at Northeastern University
as a research assistant and at MOURI Tech as a senior software engineer, optimizing multiple
websites and leading successful project deployments. Sunil co-founded ISF Technologies, where
he has championed user-centric design and agile methodologies. He has also contributed to the
books AI Strategies for Web Development and The Art of Micro Frontends. His academic background
includes a master’s in analytics from Northeastern University and a bachelor’s in electronics and
communications engineering from Andhra University.
Learn more on Discord
To join the Discord community for this book – where you can share feedback, ask questions to
the author, and learn about new releases – follow the QR code below:
https://packt.link/RxJSCookbook
Table of Contents
Preface
Index
However, there is a belief in the community that RxJS has a steep learning curve, despite its
amazing capabilities. One reason why RxJS has this reputation is that most tutorials, books, and
online materials focus heavily on RxJS operators. These resources explain how each operator works
in depth, with accompanying visuals. While these resources are really helpful and essential when
starting to learn RxJS, developers may still face challenges such as debugging and optimizing
RxJS streams, testing RxJS streams effectively, managing reactive state, fully adopting a reactive
paradigm, and identifying the correct RxJS operator for real-world scenarios.
RxJS Cookbook for Reactive Programming takes a slightly different approach. This is a full stack
book that emphasizes building modern web applications using RxJS techniques, patterns, and
operators that naturally fit specific scenarios. Each chapter is composed of practical recipes
that offer solutions to a wide range of challenges, spanning from handling side effects and error
resiliency patterns in client apps to creating real-time chat applications and event-driven backend
microservices.
As you progress through the book, you will develop a profound understanding of the potential
of reactive programming in complex real-life scenarios. This book empowers developers to
seamlessly integrate RxJS with popular web development frameworks and libraries such as
Angular and NestJS, serving as an invaluable guide for developing web applications that are
modern, progressive, resilient, responsive, performant, and interactive.
By the end of the book, you will have mastered the art of reactive programming principles, the
RxJS library, and working with Observables, while crafting code that reacts to changes in data
and events in a declarative and asynchronous manner.
xxii Preface
You’ll also delve into the art of side effect management, learning how to seamlessly perform
tasks such as logging, API calls, and DOM updates without disrupting your data flows. Finally,
you’ll explore the fascinating world of WebSockets as side effects and discover how to implement
heartbeat techniques to ensure connection integrity in a truly reactive way.
Chapter 2, Building User Interfaces with RxJS, shows you how to craft components such as reactive
audio players, infinite scroll experiences that captivate users, intuitive drag-and-drop interfaces,
responsive phone swipe components, and many more.
By harnessing RxJS to handle user input, create event streams, and connect to asynchronous
data, you’ll unlock the full potential of reactive UI components, enabling you to create seamless
user experiences.
Chapter 3, Understanding Reactive Animation Systems with RxJS, teaches you how RxJS can be used
to craft dynamic and interactive animations that captivate users.
You will learn how to model animation logic as streams of values, transforming and combining
them to achieve fluid, performant animations that run at 60 fps. You’ll explore techniques for
creating smooth transitions, choreographing complex sequences, and synchronizing animations
with other application events. You’ll also build engaging animation components, such as a
bouncing ball animation, animate loading button state transitions, and recreate the mesmerizing
effects of particles.js.
Chapter 4, Testing RxJS Applications, guides you through various techniques for testing your reactive
code effectively, including a deep dive into using Mock Service Worker (MSW) for seamless
integration testing and exploring NgRx state unit testing.
You will discover how to handle asynchronous data streams in your tests, master marble testing
to confidently verify complex scenarios and prevent regressions, and
learn how to simulate time-based operations with ease. You’ll also explore practical examples
of using MSW to mock API responses and streamline your integration testing workflow, delve
into the intricacies of NgRx state management, and learn how to write effective unit tests for
your state management logic. By the end of this chapter, you’ll be equipped to create a reliable
and maintainable RxJS code base.
Chapter 5, Performance Optimizations with RxJS, delves into managing data flow and strategically
using operators to streamline asynchronous operations. You will discover how to choose the right
operators to minimize redundant calculations and reduce rendering overhead.
This chapter also explores building a custom performance monitoring system to track Core Web
Vitals, gaining valuable insights into an application’s performance, and identifying areas for
improvement. You will learn how to leverage Web Workers alongside RxJS streams to offload
heavy calculations from the browser’s main thread, further enhancing performance. You will also
discover how to transform performance bottlenecks into optimized, efficient streams.
Chapter 6, Building Reactive State Management Systems with RxJS, explores how RxJS provides
a reactive approach to managing application state, promoting predictability, testability, and
reactive updates. This foundation will then enable you to build custom reactive state management
solutions from scratch.
You will learn how to navigate the complexities of state management in Angular applications
using powerful libraries such as NgRx, mastering even the most intricate state interactions.
You will learn about TanStack Query by building your own custom version of it, gaining deep
insights into asynchronous state management, and discovering how the async nature of
Observables fits perfectly into this paradigm.
Chapter 7, Building Progressive Web Apps with RxJS, explores how RxJS can be leveraged to enhance
your Angular apps with key progressive web app (PWA) features, including push notifications,
background synchronization, and offline capabilities.
You will learn how to use RxJS to manage push notifications effectively, delivering timely and
relevant updates to your users. You will implement background synchronization with Dexie.js
and RxJS to keep data up to date without interrupting the user’s workflow. Finally, you will learn
how to leverage RxDB and RxJS to provide a seamless user experience even when the network is
unavailable, ensuring your application remains accessible and functional at all times.
Chapter 8, Building Offline-First Applications with RxJS, delves into the crucial world of offline-first
applications and demonstrates how RxJS empowers you to achieve seamless offline experiences.
You will gain fine-grained control over offline data synchronization by mastering various strategies,
including cache-first, network-first, stale-while-revalidate, and cache-network race. You will learn
how each strategy impacts user experience and data reliability so that you can choose the best
approach for your specific needs.
Beyond basic synchronization, you will explore advanced techniques for handling data updates
with the optimistic update pattern. You will also learn how to provide an immediate response to
user actions, even before confirming with the server, while ensuring data integrity and a smooth
transition when the connection is restored.
Chapter 9, Going Real-Time with RxJS, dives into the world of WebSockets and demonstrates how
RxJS empowers you to create seamless real-time features in your Angular and NestJS applications.
You will learn how to use WebSockets to establish persistent client-server connections for
bidirectional communication and instant data updates, and explore practical examples such as
crafting a real-time dashboard that dynamically tracks and visualizes data updates. Then, you
will dive into building smooth gameplay for a multiplayer tic-tac-toe game. Finally, you will
craft a chat application with voice messaging capabilities for a truly immersive chat experience.
Chapter 10, Building Reactive NestJS Microservices with RxJS, explores how RxJS can bring reactive
programming elegance to building NestJS APIs.
You will learn how to model real-time data flows and build fault-tolerant microservices with RxJS’s
sophisticated error handling and resiliency patterns. You will expand your toolkit by integrating
asynchronous messaging platforms such as Kafka, enabling event-driven architectures and
handling high-volume data streams for seamless communication between services.
Finally, you will delve into gRPC, leveraging its efficiency for high-performance remote procedure
calls in your microservices architecture.
In Chapter 10, you will need to run a local Kafka server for one of the recipes. You can find detailed
information on this in the Kafka documentation at https://kafka.apache.org/quickstart.
Note
This book does not use Angular’s Resource API, as it was still in the experimental
phase at the time of publication.
If you are using the digital version of this book, we advise you to type the code yourself or
access the code from the book’s GitHub repository (a link is available in the next section).
Doing so will help you avoid any potential errors related to the copying and pasting of code.
We also have other code bundles from our rich catalog of books and videos available at
https://github.com/PacktPublishing/. Check them out!
Conventions used
There are a number of text conventions used throughout this book.
Code in text: Indicates code words in text, database table names, folder names, filenames, file
extensions, pathnames, dummy URLs, user input, and X/Twitter handles. Here is an example:
“Notice one more important change from the previous example with distinctUntilChanged.”
Bold: Indicates a new term, an important word, or words that you see onscreen. For instance,
words in menus or dialog boxes appear in bold. Here is an example: “RxJS marble diagrams are
a powerful tool for visualizing and understanding the behavior of Observables and operators in
reactive programming.”
Get in touch
Feedback from our readers is always welcome.
General feedback: If you have questions about any aspect of this book, email us at
customercare@packtpub.com and mention the book title in the subject of your message.
Errata: Although we have taken every care to ensure the accuracy of our content, mistakes do
happen. If you have found a mistake in this book, we would be grateful if you would report this
to us. Please visit www.packtpub.com/support/errata and fill in the form.
Piracy: If you come across any illegal copies of our works in any form on the internet, we would
be grateful if you would provide us with the location address or website name. Please contact us
at copyright@packt.com with a link to the material.
If you are interested in becoming an author: If there is a topic that you have expertise in and
you are interested in either writing or contributing to a book, please visit authors.packtpub.com.
Your review is important to us and the tech community and will help us make sure we’re
delivering excellent quality content.
Do you like to read on the go but are unable to carry your print books everywhere?
Is your eBook purchase not compatible with the device of your choice?
Don’t worry, now with every Packt book you get a DRM-free PDF version of that book at no cost.
Read anywhere, any place, on any device. Search, copy, and paste code from your favorite technical
books directly into your application.
The perks don’t stop there, you can get exclusive access to discounts, newsletters, and great free
content in your inbox daily.
https://packt.link/free-ebook/9781788624053
After working with RxJS for a while, and learning all about every operator in RxJS docs, have you
ever felt stuck or didn’t know how to get your RxJS game to the next level? This book is Packt(ed)
with advanced recipes and practical use cases to take you to that next level and make you ready
for any real-life challenge in the web development world. Buckle up, it’s going to be a fun ride!
This chapter explores techniques to manage the inevitable complexities of real-world reactive
programming. We’ll learn how to gracefully handle errors and maintain stream integrity. We will
delve into side effect management to perform tasks such as logging, API calls, and DOM updates
without disrupting data flows. Also, we will master strategies to isolate side effects and ensure
predictable, testable RxJS code. Finally, we will understand the role of WebSockets as side effects
and explore heartbeat techniques to ensure connection integrity in a reactive way.
2 Handling Errors and Side Effects in RxJS
Technical requirements
To follow along in this chapter, you’ll need the following:
• Angular v19+
• RxJS v7
• Node.js v22+
• npm v11+ or pnpm v10+
The code for recipes in this chapter is placed in the GitHub repository:
https://github.com/PacktPublishing/RxJS-Cookbook-for-Reactive-Programming/tree/main/Chapter01
How to do it…
In this example, we will build a small cooking recipe app, where we will load a list of recipes
from the mocked backend (BE), using MSW, and show them in the list. After that, we will implement two
search fields to find desired recipes by name and ingredient. We will do this by handling input
updates from both filters in a declarative way, then combining the query results and providing
filtered results in the end.
Chapter 1 3
ngAfterViewInit() {
  fromEvent<InputEvent>(
    this.searchNameInputElement.nativeElement, 'input')
    .pipe(
      map((searchInput: InputEvent) =>
        (searchInput.target as HTMLInputElement).value),
      startWith(''),
      debounceTime(500),
      distinctUntilChanged(),
      switchMap((searchName: string) =>
        this.recipesService.searchRecipes$(searchName)))
    .subscribe((recipes) => this.recipes = recipes);
}
1. With the map operator, we will extract the value of the input and state that the starting
value should be an empty string with the startWith operator.
2. To prevent sending unnecessary requests and increasing load on the server, we will
debounce user keystrokes by 500 milliseconds. Also, we will check whether the
search query has changed from the previous one (e.g., if we wanted to search for lasagna,
we would type the query "lasag", get the result, and then delete "g" and put "g" back in
the query within 500 milliseconds; we won’t send another request, because the query
hasn’t changed).
3. In the end, once we get the search query, we will use switchMap to take the query value
and send the request to the BE API.
Why switchMap?
The main reason we are using switchMap here is the cancellation effect. This is what
it means. Assume that a user types a search query, and we have an ongoing request.
Now, if the user changes the query to a new one, the previous request will be cancelled
automatically, since we are no longer interested in the previous result.
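To make the cancellation effect concrete, here is a minimal sketch. It assumes a hypothetical fakeSearch$ function in place of the real HTTP call; the names fakeSearch$, pending, and log exist only for this illustration and are not part of the recipe's service.

```typescript
import { Observable, Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Hypothetical stand-in for an HTTP search call: it records when a request
// starts and when it is cancelled (unsubscribed before completing).
const log: string[] = [];
const pending = new Map<string, (result: string) => void>();

function fakeSearch$(query: string): Observable<string> {
  return new Observable<string>((subscriber) => {
    log.push(`start:${query}`);
    let done = false;
    // Lets the "server" answer later by calling pending.get(query).
    pending.set(query, (result: string) => {
      done = true;
      subscriber.next(result);
      subscriber.complete();
    });
    // Teardown runs on unsubscribe; switchMap triggers it on a new query.
    return () => {
      if (!done) {
        log.push(`cancel:${query}`);
      }
    };
  });
}

const queries$ = new Subject<string>();
const results: string[] = [];

queries$
  .pipe(switchMap((query) => fakeSearch$(query)))
  .subscribe((result) => {
    results.push(result);
  });

queries$.next('las'); // first request starts
queries$.next('lasagna'); // first request is cancelled, second one starts
pending.get('lasagna')?.('lasagna recipes'); // only the latest answer arrives
```

After these calls, log contains start:las, cancel:las, and start:lasagna, and results contains only the answer for the latest query.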
Now, when we type a search query for recipes, we might see the results in the UI:
Here the startWith operator comes in handy as well, since combineLatest won’t emit any values
until both inputs emit at least once. That would mean that we would see the empty recipes list
initially without using startWith. This is what our code looks like after adding the second input:
const searchNameInputValue$ = fromEvent<InputEvent>(
  this.searchNameInputElement.nativeElement, 'input')
  .pipe(
    map((searchInput: InputEvent) =>
      (searchInput.target as HTMLInputElement).value),
    startWith('')
  );
const searchIngredientInputValue$ = fromEvent<InputEvent>(
  this.searchIngredientInputElement.nativeElement, 'input')
  .pipe(
    map((searchInput: InputEvent) =>
      (searchInput.target as HTMLInputElement).value),
    startWith('')
  );
combineLatest({
  searchName: searchNameInputValue$,
  searchIngredient: searchIngredientInputValue$
})
  .pipe(
    debounceTime(500),
    distinctUntilChanged(
      (prev, curr) =>
        prev.searchName === curr.searchName &&
        prev.searchIngredient === curr.searchIngredient),
    switchMap(({ searchName, searchIngredient }) =>
      this.recipesService.searchRecipes$(
        searchName, searchIngredient)))
  .subscribe((recipes) => this.recipes = recipes);
Notice one more important change from the previous example with distinctUntilChanged. One of
the most common mistakes when using this operator is assuming that it figures out on its own when
the stream has changed. By default, it compares consecutive values with strict equality (===),
which only works as expected for primitive values.
Previously, we emitted string values from the first search input, but now, since we are combining
the results of two search inputs, we have a stream of object values. Therefore, we must pass a
comparator that checks the relevant properties of the previous and current objects, in our case,
searchName and searchIngredient. Alternatively, when only a single property needs to be compared,
we could use the distinctUntilKeyChanged operator.
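The difference between reference equality and a property-by-property comparison can be shown in isolation. The sketch below is dependency-free; the RecipeFilters interface and the sameFilters name are illustrative assumptions, and the function has the shape expected by the comparator argument of distinctUntilChanged.

```typescript
// Shape of the combined filter value emitted by combineLatest in the recipe.
interface RecipeFilters {
  searchName: string;
  searchIngredient: string;
}

// Comparator in the shape distinctUntilChanged expects: it returns true when
// two consecutive emissions should be treated as equal (the newer is skipped).
function sameFilters(prev: RecipeFilters, curr: RecipeFilters): boolean {
  return (
    prev.searchName === curr.searchName &&
    prev.searchIngredient === curr.searchIngredient
  );
}

const first: RecipeFilters = { searchName: 'lasagna', searchIngredient: '' };
const second: RecipeFilters = { searchName: 'lasagna', searchIngredient: '' };

// Default strict equality sees two different objects...
const equalByReference = first === second;
// ...while the property-by-property comparator sees the same filters.
const equalByFields = sameFilters(first, second);

console.log({ equalByReference, equalByFields });
```

Because every combineLatest emission is a new object, the default comparison would never filter anything out, which is why the comparator is needed.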
If we open our app in the browser, now we can search recipes not only by name but also by
ingredient:
See also
• MSW: https://mswjs.io/
• fromEvent operator: https://rxjs.dev/api/index/function/fromEvent
• map: https://rxjs.dev/api/operators/map
• startWith operator: https://rxjs.dev/api/operators/startWith
• switchMap: https://rxjs.dev/api/operators/switchMap
• combineLatest function: https://rxjs.dev/api/index/function/combineLatest
• distinctUntilKeyChanged operator: https://rxjs.dev/api/operators/distinctUntilKeyChanged
How to do it…
In this example, we will build a small cooking recipe app, where we will load a list of recipes from
the mocked BE (using MSW) and show them in the list. Then after clicking on a specific recipe,
we will show that recipe on a new page with more details, which will require another layer of
communication with the BE to fetch those details.
getRecipeDetails$(
id: number
): Observable<{
recipe: Recipe,
details: RecipeDetails
}> {
return this.getRecipeById$(id).pipe(
3. In the end, we will use the map operator to combine the results from both endpoints, since
we’re going to need all that information in our details component.
If we click on one of the recipes, that will navigate us to the RecipeDetails page, with some
specific nutrient info for each recipe:
Also, we will leverage the power of the forkJoin operator to send image requests in parallel:
getRecipesWithImageInParallel$(): Observable<ImageUrl[]> {
return this.getRecipes$().pipe(
tap((recipes: Recipe[]) =>
this.recipes.next(recipes)),
switchMap((recipes: Recipe[]) => {
const imageRequests = recipes.map((recipe) =>
this.httpClient.get<ImageUrl>(
`/api/recipes/images?id=${recipe.id}`
)
);
return forkJoin(imageRequests);
}),
);
}
Now when we open our recipe in the browser, we can see placeholders instead of images, and in
the Network tab, we can see images being downloaded in parallel:
The way we can achieve this effect is by using the RxJS mergeMap concurrent mode:
public recipes = new BehaviorSubject<Recipe[]>([]);
getRecipesWithConcurrentImage$(): Observable<ImageUrl> {
  return this.getRecipes$().pipe(
    tap((recipes: Recipe[]) => this.recipes.next(recipes)),
    switchMap((recipes: Recipe[]) => {
      const imageIds = recipes.map(
        (recipe) => recipe.id);
      return from(imageIds).pipe(
        // Second argument: at most three image requests in flight at once.
        mergeMap((id) => this.httpClient.get<ImageUrl>(
          `/api/recipes/images?id=${id}`), 3)
      );
    })
  );
}
By extracting the IDs of all images from the first request, we can create a new inner Observable from
that array of IDs and send concurrent requests with mergeMap. Note the second parameter of
mergeMap: it sets the maximum number of inner requests that can be in flight at the
same time. By doing so, we can limit the number of concurrent requests and prevent our server from
being overwhelmed by ongoing requests. This means that at most three images are loading at any
moment, and a new request starts as soon as one of them completes, which is something we can
observe in our Dev Tools.
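The effect of the concurrency limit can be observed without a network. The following sketch replaces the HTTP calls with hand-controlled Subjects (an assumption made purely so that the order of events is deterministic); the names requests, started, and loaded are hypothetical.

```typescript
import { from, Subject } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// Hypothetical image "requests": one Subject per ID that we resolve by hand.
const ids = [1, 2, 3, 4, 5];
const requests = new Map<number, Subject<string>>();
ids.forEach((id) => requests.set(id, new Subject<string>()));

const started: number[] = [];
const loaded: string[] = [];

from(ids)
  .pipe(
    mergeMap((id) => {
      started.push(id); // the projection runs only when a slot is free
      return requests.get(id)!;
    }, 3) // at most three inner subscriptions at a time
  )
  .subscribe((url) => {
    loaded.push(url);
  });

// Only the first three requests have started; IDs 4 and 5 are buffered.
const startedInitially = [...started];

// Finishing one request frees a slot, so the next buffered ID starts.
const firstRequest = requests.get(1)!;
firstRequest.next('image-1.png');
firstRequest.complete();

const startedAfterFirstCompletes = [...started];
console.log(startedInitially, startedAfterFirstCompletes, loaded);
```

Here startedInitially is [1, 2, 3] and startedAfterFirstCompletes is [1, 2, 3, 4]: requests are not sent in fixed batches, but a buffered one starts whenever a running one completes.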
See also
• forkJoin operator: https://rxjs.dev/api/index/function/forkJoin
• mergeMap operator: https://rxjs.dev/api/operators/mergeMap
How to do it…
In this example, we are going to discover sophisticated resiliency patterns and learn how to deal
with network errors.
The strategy here is pretty simple. If there is a request error, catch the error, display a notification
in the UI, and gracefully complete the stream. We might also want in some instances to re-throw
the error, if we need it later in the stream chain:
throwError(() => new Error('Recipes could not be fetched.'))
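Both variants of this strategy can be sketched side by side. The example below is an illustration under assumptions, not the recipe's service code: notify stands in for the UI notification, and failingRequest$ stands in for a failed HTTP call.

```typescript
import { Observable, of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

interface Recipe {
  id: number;
  name: string;
}

const notifications: string[] = [];
// Hypothetical stand-in for a UI notification such as a snackbar.
const notify = (message: string): void => {
  notifications.push(message);
};

// Hypothetical failing request.
const failingRequest$: Observable<Recipe[]> = throwError(
  () => new Error('Network down')
);

// Variant 1: handle the error and end the stream gracefully with a fallback.
const handled$ = failingRequest$.pipe(
  catchError(() => {
    notify('Recipes could not be fetched.');
    return of([] as Recipe[]);
  })
);

// Variant 2: notify, then re-throw so that later operators can react as well.
const rethrown$ = failingRequest$.pipe(
  catchError(() => {
    notify('Recipes could not be fetched.');
    return throwError(() => new Error('Recipes could not be fetched.'));
  })
);

const handledOutcome = { values: [] as Recipe[][], completed: false };
handled$.subscribe({
  next: (recipes) => {
    handledOutcome.values.push(recipes);
  },
  complete: () => {
    handledOutcome.completed = true;
  },
});

const rethrownErrors: string[] = [];
rethrown$.subscribe({
  error: (e: Error) => {
    rethrownErrors.push(e.message);
  },
});
```

The first subscriber receives an empty list and a normal completion, while the second still receives an error, which is what makes a later retry possible.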
In our browser, we can open Dev Tools, disable network requests, and see this behavior in action,
alongside an error notification when requests fail:
If we just pass a number as a parameter, this indicates the number of retries that would happen
immediately one after the other.
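A quick way to see what the number means is to count subscriptions. This sketch assumes a hypothetical request that always fails; defer is used only so that every retry is counted as a fresh attempt.

```typescript
import { defer, throwError } from 'rxjs';
import { retry } from 'rxjs/operators';

let attempts = 0;

// defer builds a fresh (hypothetical) failing request for every subscription,
// so each retry is counted as a new attempt.
const alwaysFailing$ = defer(() => {
  attempts += 1;
  return throwError(() => new Error(`Attempt ${attempts} failed`));
});

const errors: string[] = [];

alwaysFailing$
  .pipe(retry(3)) // resubscribe immediately, up to three times
  .subscribe({
    error: (e: Error) => {
      errors.push(e.message);
    },
  });

// attempts is now 4: the original subscription plus three retries.
console.log(attempts, errors);
```

Only the error of the last attempt reaches the subscriber; the earlier ones are swallowed by retry.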
The way we can apply this operator is by chaining it after the catchError operator:
getRecipes$(): Observable<Recipe[]> {
return this.httpClient.get<Recipe[]>('/api/recipes')
.pipe(
catchError((error) => {
this._snackBar.open(
However, ideally, we would like to leave some time in between two retry attempts for our server
to recover. That leads us to the next pattern.
To implement this pattern, we will introduce a delay as part of the retry configuration. It’s a
callback function that has error and retryCount parameters. With the help of the RxJS timer function,
we will emit a notification after each delay, exponentially increasing the time between two
retry attempts. As a result, we will end up with four failed requests in the Network tab of Dev Tools
(one original and three retried requests).
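The delay calculation itself can be isolated as a pure function. The sketch below is one possible formula, not necessarily the one used in the recipe: the backoffDelayMs name, the 1-second base, and the 30-second cap are all assumptions.

```typescript
// Delay before the n-th retry. retryCount starts at 1, matching the value
// RxJS passes to the delay callback of the retry configuration.
// baseMs and maxMs are illustrative assumptions.
function backoffDelayMs(
  retryCount: number,
  baseMs: number = 1000,
  maxMs: number = 30000
): number {
  const exponential = baseMs * Math.pow(2, retryCount - 1);
  return Math.min(exponential, maxMs);
}

// Schedule for three retries: 1 s, 2 s, and 4 s.
const schedule = [1, 2, 3].map((retryCount) => backoffDelayMs(retryCount));
console.log(schedule);

// With RxJS, the function would plug into the retry configuration roughly as:
//   retry({
//     count: 3,
//     delay: (_error, retryCount) => timer(backoffDelayMs(retryCount)),
//   })
```

Capping the delay keeps a long outage from producing waits of several minutes between attempts; whether a cap is appropriate depends on the use case.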
If we experience several cascading errors in the system, we will open the circuit. In this OPEN state,
we will immediately reject any new request to the server for a certain period. After that period,
we enter the HALF_OPEN state, where we allow several test requests to check whether the system
has recovered in the meantime. If the system has recovered and the request is successful, we close
the circuit. Otherwise, we open it again for a certain period.
First, let’s have methods to represent these states and transitions to these states:
private state: 'CLOSED' | 'OPEN' | 'HALF_OPEN' = 'CLOSED';
private openCircuit() {
if (this.state === 'OPEN') return;
this.state = 'OPEN';
timer(15000)
private halfOpenCircuit() {
this.state = 'HALF_OPEN';
}
private closeCircuit() {
this.state = 'CLOSED';
}
Now we are ready to react to these states in case of network failures. In our recipe.service.ts,
we will start off by checking whether we should throw an error if we are in the OPEN state:
sendRequestIfCircuitNotOpen() {
if (this.state === 'OPEN') {
console.error('Circuit is open, aborting request');
return throwError(() =>
new Error('Circuit is open'));
}
return this.httpClient.get<Recipe[]>('/api/recipes');
}
getRecipesWithCircuitBreakerStrategy$():
Observable<Recipe[]> {
return defer(() =>
this.sendRequestIfCircuitNotOpen());
}
1. First, with the help of the defer function, we defer the execution of the request until we
figure out the current state of the circuit.
2. We check whether the state of the circuit is OPEN:
• If it is, we immediately throw an error and set up a timer for 15 seconds to enter
the HALF_OPEN state, so we can try again after that period
• If not, we will allow at least one request
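The role of defer in these steps can be demonstrated with a small sketch. It assumes a plain circuit object and a requestIfCircuitNotOpen function that returns a fixed list instead of calling the backend; both are hypothetical simplifications of the service shown above.

```typescript
import { defer, Observable, of, throwError } from 'rxjs';

type CircuitState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';
const circuit: { state: CircuitState } = { state: 'CLOSED' };

// Hypothetical stand-in for the HTTP call in the recipe.
function requestIfCircuitNotOpen(): Observable<string[]> {
  return circuit.state === 'OPEN'
    ? throwError(() => new Error('Circuit is open'))
    : of(['lasagna']);
}

// The factory passed to defer runs on every subscription, not when guarded$
// is created, so each subscriber sees the current circuit state.
const guarded$ = defer(() => requestIfCircuitNotOpen());

const outcomes: string[] = [];
const record = {
  next: (recipes: string[]): void => {
    outcomes.push(`ok:${recipes.length}`);
  },
  error: (e: Error): void => {
    outcomes.push(`error:${e.message}`);
  },
};

guarded$.subscribe(record); // circuit is CLOSED, so the request goes out
circuit.state = 'OPEN';
guarded$.subscribe(record); // same Observable, now rejected immediately
console.log(outcomes);
```

Without defer, the state check would run once, when the Observable is built, and a retry would keep resubscribing to a decision made earlier.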
return timer(2000);
},
}),
);
}
As part of the retry mechanism, here, we can notice that based on a circuit state, we control the
number of retries to be one or three. Also, if the current state was HALF_OPEN and we failed again,
or initially when we exceed three retries, we enter the OPEN state and throw an error. We enter
the retry block every two seconds to check whether there were any changes until the system
eventually recovers or we give up and show the error notification.
Once we enter the HALF_OPEN state, we can experiment in our browser by blocking network requests
and verifying the circuit behavior in action.
At the end, if the response was successful, we would simply close the circuit by calling the
closeCircuit method and show the recipes list.
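The full cycle of state transitions can also be modelled as a small, dependency-free state machine. This is a simplified sketch rather than the recipe's service: the CircuitBreaker class and its method names are hypothetical, the clock is injected so that the transitions can be exercised without real timers, and the threshold of three failures and the 15-second wait mirror the values used above.

```typescript
type CircuitState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

class CircuitBreaker {
  private state: CircuitState = 'CLOSED';
  private failures = 0;
  private openedAt = 0;
  private readonly now: () => number;
  private readonly failureThreshold: number;
  private readonly openForMs: number;

  constructor(now: () => number, failureThreshold = 3, openForMs = 15000) {
    this.now = now;
    this.failureThreshold = failureThreshold;
    this.openForMs = openForMs;
  }

  // An OPEN circuit becomes HALF_OPEN once the waiting period has elapsed.
  getState(): CircuitState {
    if (this.state === 'OPEN' && this.now() - this.openedAt >= this.openForMs) {
      this.state = 'HALF_OPEN';
    }
    return this.state;
  }

  canRequest(): boolean {
    return this.getState() !== 'OPEN';
  }

  recordSuccess(): void {
    this.failures = 0;
    this.state = 'CLOSED';
  }

  recordFailure(): void {
    this.failures += 1;
    // A failed trial request in HALF_OPEN reopens the circuit immediately.
    if (this.getState() === 'HALF_OPEN' || this.failures >= this.failureThreshold) {
      this.state = 'OPEN';
      this.openedAt = this.now();
    }
  }
}

let fakeTime = 0;
const breaker = new CircuitBreaker(() => fakeTime);
const observed: CircuitState[] = [];

breaker.recordFailure();
breaker.recordFailure();
breaker.recordFailure();
observed.push(breaker.getState()); // OPEN after three failures

fakeTime += 15000;
observed.push(breaker.getState()); // HALF_OPEN once 15 seconds have passed

breaker.recordSuccess();
observed.push(breaker.getState()); // CLOSED after a successful trial request
```

In a service, canRequest would guard the deferred request, while recordSuccess and recordFailure would be called from the success and error paths of the stream.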
There’s more…
• Exponential backoff pattern: The benefit of this approach is that we reduce the load on
the server during error cases and give the whole system time to recover in between. It also
improves the success rate by giving transient errors time to resolve before the next attempt.
• Circuit breaker pattern: The benefit of this approach might be improving system
responsiveness. Since we know that some services are unavailable, we will fail fast and
let users know about it. Also, it can help us with self-healing and with monitoring system
resiliency. The logic for entering the OPEN and HALF_OPEN states might differ from use case
to use case or based on personal preference. We might keep track of the number of successful
and failed requests and, based on the success rate, let the circuit go into an OPEN state or not.
• Fallback strategy: One more thing to keep in mind here is the fallback strategy, or what
should happen when requests are rejected. Should we return cached data, default values,
or something else? It’s totally up to you to decide what fits your needs best.
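As an illustration of the success-rate idea mentioned for the circuit breaker, the decision can be expressed as a pure function. The function name, the minimum sample size of 10 requests, and the 50% threshold are assumptions chosen for this sketch only.

```typescript
// Decide whether the circuit should open, based on recent request outcomes.
// minRequests and minSuccessRate are illustrative assumptions.
function shouldOpenCircuit(
  successes: number,
  failures: number,
  minRequests: number = 10,
  minSuccessRate: number = 0.5
): boolean {
  const total = successes + failures;
  if (total < minRequests) {
    return false; // not enough data to judge the service yet
  }
  return successes / total < minSuccessRate;
}

console.log(shouldOpenCircuit(2, 3)); // too few requests to decide
console.log(shouldOpenCircuit(3, 7)); // 30% success rate
console.log(shouldOpenCircuit(8, 2)); // 80% success rate
```

Requiring a minimum number of requests avoids opening the circuit because of one or two early failures.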
See also
• catchError operator: https://rxjs.dev/api/operators/catchError
• retry operator: https://rxjs.dev/api/operators/retry
• timer function: https://rxjs.dev/api/index/function/timer
• defer function: https://rxjs.dev/api/index/function/defer
• How to implement retry logic like a Pro article: https://dev.to/officialanurag/javascript-secrets-how-to-implement-retry-logic-like-a-pro-g57
• How to implement an exponential backoff retry strategy in Javascript article: https://advancedweb.hu/how-to-implement-an-exponential-backoff-retry-strategy-in-javascript/
How to do it…
In this recipe, we will implement a simple network logger to observe the ongoing network traffic
in the browser console. For that purpose, we will leverage Angular interceptors. Also, after each
error, we will send the error information to our custom analytics endpoint, to increase our system
observability and improve error tracking.
console.log(
`Request took %c${elapsed} ms`, 'color: #ffc26e');
console.log('%cResponse:', 'color: #d30b8e',
event);
}
}
return next(req).pipe(
  tap(() => console.log(
    '-----------------',
    `\nRequest for ${req.urlWithParams} started...`
  )),
  tap((event) => logSuccessfulResponse(event)),
  finalize(() => console.log('-----------------'))
);
};
return next(req).pipe(
tap(() =>
console.log(
' ----------------- ',
`\nRequest for ${req.urlWithParams} started...`
)
),
tap((event) => logSuccessfulResponse(event)),
catchError((error) => logFailedResponse(error)),
finalize(() => console.log(' ----------------- '))
);
Also, we might notice that whenever an error happens, we are sending that error to errorSubject.
Now, whenever a new error has been emitted to the Subject, we may notify our analytics service
and have our own custom error reporting mechanism within our system:
errorSubject
.pipe(
concatMap((error: HttpErrorResponse) =>
httpClient.post('/api/analytics', error)
)
).subscribe();
Now, when we open our browser console, we can observe network traffic and improve the
debugging experience. Also, note how in the case of an error, we would get one Error Response
from an endpoint, and one Successful Response from our analytics endpoint.
How to do it…
In this recipe, we are going to explore a reactive approach to the standard HTTP polling, as well
as long polling by building a small cooking recipe app. First, we will load a list of recipes from the
mocked BE (using MSW) and show them in the list. Then, we will implement a polling mechanism
that continuously checks whether there is new recipe data and refreshes the list with it.
If we open our console in the browser, we can observe a network request being sent every five
seconds:
Compared to the standard polling example, note here that we added the timeout operator to our
request:
switchMap(() => this.httpClient.get<T>(url).pipe(
timeout(interval),
)),
This means that if we establish a connection with the server and the response doesn’t arrive
within that interval, the request will time out, and then we will go into the retry mechanism.
If we open our browser console, we may observe this behavior in action. If the response hasn’t
arrived within a defined interval (in our case, five seconds), we will retry the same request three
times, until we eventually throw an error.
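To make the timeout-then-retry behavior concrete, here is a small worked model in plain TypeScript. It is only a sketch under assumptions: simulateRequest and PollOutcome are hypothetical names, and the model ignores real timers. Each attempt is treated as timed out when its latency is not shorter than the timeout, and the original request is followed by up to maxRetries retries.

```typescript
// Hypothetical model of "timeout, then retry up to N times".
// It does not use RxJS; it only mirrors the decision logic described above.
type PollOutcome =
  | { status: 'success'; attempts: number }
  | { status: 'error'; attempts: number };

// latenciesMs[i] is how long the i-th attempt would take to respond.
function simulateRequest(
  latenciesMs: number[],
  timeoutMs: number,
  maxRetries: number
): PollOutcome {
  const maxAttempts = maxRetries + 1; // the original request plus the retries
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    // A missing latency is treated as a response that never arrives.
    const latency = latenciesMs[attempt - 1] ?? Infinity;
    if (latency < timeoutMs) {
      return { status: 'success', attempts: attempt };
    }
  }
  return { status: 'error', attempts: maxAttempts };
}

// Two slow responses followed by a fast one: the second retry succeeds.
const recovered = simulateRequest([6000, 6000, 1000], 5000, 3);

// Every response is slower than five seconds: we give up with an error.
const failed = simulateRequest([6000, 6000, 6000, 6000], 5000, 3);
```

With a five-second timeout and three retries, recovered reports success on the third attempt, while failed reports an error after four attempts in total.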
In the mocks/handlers.ts file, there is a delay function from MSW that delays the
HTTP response for a certain amount of time, so we can play around with this delay.
If we set the delay to be under five seconds, in our example, we will see the list of
recipes being refreshed every five seconds. If we set the delay to over five seconds, we
can simulate slow network responses from BE services and see how our app would
behave in the case of the error retry mechanism.
See also
• interval function: https://rxjs.dev/api/index/function/interval
• timeout operator: https://rxjs.dev/api/operators/timeout
• retry operator: https://rxjs.dev/api/operators/retry
• delay operator: https://rxjs.dev/api/operators/delay
RxJS provides us with a webSocket factory function, a wrapper around the W3C-compatible
WebSocket object provided by the browser.
How to do it…
In this recipe, we are going to explore a reactive approach for handling real-time updates over
WebSocket by building a small cooking recipe app. We will load a list of recipes from the mocked BE
(using MSW) over a WebSocket connection and show them in the list. Also, we will automatically
update the list over WebSocket if there is a new data entry. Additionally, we will implement a
heartbeat mechanism, which is essential when we lose connection with WebSocket.
In our recipes.service.ts, we can see the implementation behind the connect method:
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
After successfully connecting to the socket and gaining the ability to parse the messages that
come over the TCP connection, we can subscribe to a specific channel or topic, which in
WebSocket terms is called multiplexing. This is often done for efficiency, to reduce the overhead
of establishing and maintaining multiple connections, or when constraints limit the number of
possible connections. In our case, we are interested in the list of recipes, so we will subscribe to
that channel.
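Conceptually, multiplexing means that many logical channels share one connection and each subscriber only sees the messages for its own channel. The following sketch illustrates that routing idea in plain TypeScript. ChannelRouter, Handler, and the channel names are hypothetical and are not part of the recipe; in RxJS itself, WebSocketSubject provides a multiplex method that takes a subscribe message factory, an unsubscribe message factory, and a message filter.

```typescript
// Hypothetical sketch: ChannelRouter and the channel names are illustrative
// assumptions, not the recipe's actual implementation.
interface Message {
  type: string;
  payload: unknown;
}

type Handler = (payload: unknown) => void;

class ChannelRouter {
  private readonly handlers = new Map<string, Handler[]>();

  // Register interest in one channel (message type).
  subscribe(type: string, handler: Handler): void {
    const existing = this.handlers.get(type) ?? [];
    this.handlers.set(type, [...existing, handler]);
  }

  // Deliver a message only to the handlers of its own channel.
  dispatch(message: Message): void {
    for (const handler of this.handlers.get(message.type) ?? []) {
      handler(message.payload);
    }
  }
}
```

A component interested only in recipes would call subscribe('recipes', ...) and would never be notified about messages of other types, such as a heartbeat, even though they arrive over the same connection.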
We can communicate over a WebSocket connection between a client and a server by
implementing the sendMessage method:
sendMessage(message: Message) {
this.socket$.next(message);
}
Now, back in our component, we can send a message with a "recipes" type, which will fetch us
the list of recipes:
this.recipesService.sendMessage({ type: 'recipes' });
this.recipesSubscription = this.recipesService.recipes$.subscribe(
(message: Message) => this.recipes = message.payload);
In our browser, when we open the console, we may observe the incoming messages from the
WebSocket connection:
In our recipes.service.ts, after a retry fails, we can send a heartbeat to our server:
this.socket$
.pipe(
retry({
count: 1,
delay: (error, retryCount) => {
console.log(
`Attempt ${retryCount}: Error occurred during ` +
'websocket connection, retrying...'
);
return of(error).pipe(delay(1000));
},
}),
catchError((err) => {
console.error(
'Error occurred during websocket connection:',
err
);
this.sendHeartbeat();
return of(null);
})
).subscribe();
this._snackBar.open(
'Lost connection to the WebSocket',
'Close');
this.close();
return EMPTY;
// Return EMPTY to complete the stream and stop
// further attempts after closing
})
)
),
).subscribe();
}
Now we can see that every five seconds, we will send a heartbeat message to the server to check
whether we were able to establish the connection again. If we have no luck, we eventually throw
an error and show an error notification stating that we have lost the connection. The time
interval and retry strategies are up to the specific use case or personal preference.
Additionally, the same resiliency strategies for building robust web apps in RxJS
from the Handling network errors recipe in this chapter can be applied to WebSockets as well.
See also
• WebSocket: https://rxjs.dev/api/webSocket/webSocket
• MDN WebSocket API: https://developer.mozilla.org/en-US/docs/Web/API/WebSocket
2
Building User Interfaces with RxJS
One of the areas where RxJS excels is handling user interactions and orchestrating events in
the browser. In this chapter, we’re going to explore how to build awesome and interactive UI
components that handle any interaction or side effect seamlessly.
Technical requirements
To complete this chapter, you’ll need the following:
• Angular v19+
• Angular Material
• RxJS v7
• Node.js v22+
• npm v11+ or pnpm v10+
The code for the recipes in this chapter can be found in this book’s GitHub repository:
https://github.com/PacktPublishing/RxJS-Cookbook-for-Reactive-Programming/tree/main/Chapter02.
How to do it…
To create a phone unlock component, we’ll create UI controls representing number pads and
identify key events to react to user actions. Once the user lifts their finger off the screen, we’ll
compare the result with the correct pattern to unlock our phone.
With a little bit of CSS magic, we can see the component in the UI:
From here, we can react to these events and figure out the coordinates of a touch event, check if
it’s intersecting with the number pad area, and highlight it in the UI:
const swipe$ = touchStart$.pipe(
switchMap(() =>
touchMove$.pipe(
takeUntil(touchEnd$),
map((touchMove) => ({
x: touchMove.touches[0].clientX,
y: touchMove.touches[0].clientY,
}))
)
),
);
Now, when we subscribe to those swipe coordinates, we can perform the required actions in
sequence, such as selecting the number pad and creating a dot trail:
swipe$.pipe(
tap((dot) => this.selectNumber(dot)),
mergeMap((dot) => this.createTrailDot(dot)),
).subscribe();
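The intersection check mentioned above, which decides whether a touch coordinate falls inside a number pad, can be sketched as follows. This is an illustrative sketch only: Point, Rect, NumberPad, containsPoint, and findTouchedPad are hypothetical names. The Rect shape matches the left, top, right, and bottom properties returned by getBoundingClientRect(), so a real DOMRect could be passed in its place.

```typescript
// Hypothetical helper types; the recipe's selectNumber method is not shown
// in the text, so this is only one possible way to do the hit test.
interface Point {
  x: number;
  y: number;
}

interface Rect {
  left: number;
  top: number;
  right: number;
  bottom: number;
}

interface NumberPad {
  value: number;
  rect: Rect;
}

// True when the point lies inside the rectangle (edges included).
function containsPoint(rect: Rect, point: Point): boolean {
  return (
    point.x >= rect.left &&
    point.x <= rect.right &&
    point.y >= rect.top &&
    point.y <= rect.bottom
  );
}

// Return the first pad under the finger, or undefined between pads.
function findTouchedPad(
  pads: NumberPad[],
  point: Point
): NumberPad | undefined {
  return pads.find((pad) => containsPoint(pad.rect, point));
}
```

Each coordinate emitted by swipe$ could be passed to a function like findTouchedPad, and the matching pad could then receive the selected class.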
By adding a selected class to each intersecting element, we can visually represent the selected
number pads:
return of('').pipe(
delay(1000),
bufferCount(100, 50),
finalize(() => dot.remove())
);
}
Now, in our browser’s Dev Tools, we can inspect the creation of the trail by looking at the DOM:
Pattern matching
Since this is pattern matching and not exact password matching, the phone can be
unlocked by inputting those buttons in any order, so long as those numbers in the
pattern are included.
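One way to express this order-independent check is a set inclusion test. The sketch below is an assumption about how such a comparison could look: matchesPattern is a hypothetical name, and it treats extra selected numbers as acceptable because the note only requires that the numbers in the pattern are included.

```typescript
// Hypothetical check: every number of the pattern must have been selected,
// in any order. Extra selected numbers are tolerated (an assumption).
function matchesPattern(selected: number[], pattern: number[]): boolean {
  const selectedSet = new Set(selected);
  return pattern.every((n) => selectedSet.has(n));
}

// Same numbers, different order: the phone unlocks.
const unlocked = matchesPattern([4, 1, 2, 3], [1, 2, 3, 4]);

// One number of the pattern is missing: the phone stays locked.
const locked = matchesPattern([1, 2], [1, 2, 3]);
```

If exact password matching were required instead, the comparison would need to check both the length and the position of every element.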
See also
• The fromEvent function: https://rxjs.dev/api/index/function/fromEvent
• The switchMap operator: https://rxjs.dev/api/operators/switchMap
• The takeUntil operator: https://rxjs.dev/api/operators/takeUntil
How to do it…
In this recipe, we’ll simulate upload progress to the backend API by implementing a progress
indicator that produces a random progress percentage until we get a response. If we still haven’t
received a response after we get to the very end of the progress bar, we’ll set its progress to 95%
and wait for the request to be completed.
With the help of the scan operator, we can decide whether we should produce the next increment
of a progress percentage or whether we shouldn’t go over 95%.
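The accumulator passed to scan could look like the following sketch. It is not the recipe's code: nextProgress, scanProgress, and MAX_SIMULATED_PROGRESS are hypothetical names, and scanProgress only imitates what scan does by emitting every intermediate accumulated value.

```typescript
// Hypothetical accumulator: add the random increment, but never go past 95%.
const MAX_SIMULATED_PROGRESS = 95;

function nextProgress(current: number, increment: number): number {
  return Math.min(current + increment, MAX_SIMULATED_PROGRESS);
}

// Imitates scan(nextProgress, 0): returns every intermediate value.
function scanProgress(increments: number[]): number[] {
  const emitted: number[] = [];
  let progress = 0;
  for (const increment of increments) {
    progress = nextProgress(progress, increment);
    emitted.push(progress);
  }
  return emitted;
}

// 30 -> 70 -> capped at 95 -> stays at 95 until the response arrives.
const simulated = scanProgress([30, 40, 50, 10]);
```

In the stream itself, the same accumulator would be used as scan(nextProgress, 0), with the final value of 100 coming from the HTTP response rather than from the simulation.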
this.httpClient.post<Recipe>(
'/api/recipes',
recipe
).pipe(
map(() => 100),
catchError(() => of(-1)),
finalize(() => this.unsubscribe$.next())
)
)
}
Once we call the postRecipe service method inside a component, we can track the request progress:
See also
• The interval function: https://rxjs.dev/api/index/function/interval
• The takeUntil operator: https://rxjs.dev/api/operators/takeUntil
• The scan operator: https://rxjs.dev/api/operators/scan
How to do it…
In this recipe, we’ll learn how to handle the Progressive Image pattern with ease with the help
of RxJS magic.
Now, we can merge these load events and stream them into the Progressive Image load:
loadingProgress$ = new BehaviorSubject<number>(0);
We’ll use startWith on the placeholder image and show it immediately in the UI while continuously
tracking the progress of the original image load. Once we get 100%, we’ll replace the placeholder
image source with the original image.
Common gotcha
In this recipe, for simplicity, we’ve chosen to artificially increase the download
progress of an image. The obvious drawback is that we don’t get the actual progress
of the image download. There’s a way to get the actual progress: by requesting the
image with responseType set to blob. More details can be found here:
https://stackoverflow.com/questions/14218607/javascript-loading-progress-of-an-image.
See also
• The Ultimate LQIP Technique, by Harry Roberts: https://csswizardry.com/2023/09/the-ultimate-lqip-lcp-technique/
• The HTML load event: https://developer.mozilla.org/en-US/docs/Web/API/Window/load_event
• The takeWhile operator: https://rxjs.dev/api/operators/takeWhile
• The startWith operator: https://rxjs.dev/api/operators/startWith
How to do it…
In this recipe, we’ll have a simple tab group of two tabs. Only when a tab is selected will we lazy-
load the component representing the contents of that tab. Each tab is represented in the URL, so
whenever we change tabs, we’re navigating to a separate page.
<mat-tab [label]="tab.label"></mat-tab>
</ng-container>
</mat-tab-group>
Now, inside tabs.component.ts, we need to define the activeTab and loading states, as well as
the content of a tab stream that we can subscribe to:
activeTab$ = new BehaviorSubject<TabConfig | null>(null);
activeTabContent$!: Observable<
typeof TabContentComponent |
typeof TabContent2Component |
null
>;
loadingTab$ = new BehaviorSubject<boolean>(false);
Now, we can hook into Angular Router events, filter events when navigation ends, and, based
on an active URL, mark the corresponding tab as active:
this.router.events.pipe(
filter((event) => event instanceof NavigationEnd),
takeUntil(this.destroy$)
).subscribe({
next: () => {
const activeTab = this.tabs.find(
(tab) => tab.route === this.router.url.slice(1)
);
this.activeTab$.next(activeTab || null);
},
});
return content$.pipe(delay(1000));
}
this.activeTabContent$ = this.activeTab$.pipe(
tap(() => this.loadingTab$.next(true)),
switchMap((tab) =>
this.loadTabContent(tab!).pipe(
startWith(null),
catchError((error) => {
this.errors$.next(error);
return of(null);
}),
finalize(() => this.loadingTab$.next(false))
)
),
shareReplay({ bufferSize: 1, refCount: true })
);
Inside the loadTabContent method, we’ll create an Observable out of the Angular component
that’s matched based on the current route. Once we’ve done this, we’re ready to stream into the tab
content whenever the active tab changes. We can do this by starting the loading state, switching
to the stream that’s loading content, and resetting the loading state once the content has arrived.
Now, all we need to do is represent the content in the UI. Back in our tabs.component.html file,
we can simply add the following code:
@if (loadingTab$ | async) {
<p>Loading...</p>
}
<ng-container
*ngComponentOutlet="activeTabContent$ | async"
></ng-container>
Now, by going to our browser, we’ll see that the content of a tab will only be loaded when we
click on that specific tab:
See also
• The of function: https://rxjs.dev/api/index/function/of
• The startWith operator: https://rxjs.dev/api/operators/startWith
• The Angular Router’s NavigationEnd event: https://angular.dev/api/router/NavigationEnd
• The Angular Material tab component: https://material.angular.io/components/tabs/overview
Getting ready
In this recipe, to provide support for tracking image upload progress, we need to run a small
Node.js server application located in the server folder. We can run this server application by
using the following command:
node index.js
After that, we’re ready to go to the client folder and dive into the reactive drag-and-drop
component.
How to do it…
In this recipe, we’ll define a drag-and-drop area for .png images. Then, we’ll add support for
multiple uploads to be made at the same time, show the upload progress of each image, and display
error messages if the format of the image isn’t correct. We’ll also implement a retry mechanism
in case a file upload fails over the network.
After getting the dropzoneElement reference with @ViewChild(), we can start reacting to the
drag-and-drop events in the dropzone area:
@ViewChild('dropzoneElement') dropzoneElement!: ElementRef;
ngAfterViewInit(): void {
const dropzone = this.dropzoneElement.nativeElement;
const dragenter$ = fromEvent<DragEvent>(
dropzone,
'dragenter'
);
const dragover$ = fromEvent<DragEvent>(
dropzone,
'dragover'
).pipe(
tap((event: DragEvent) => {
event.preventDefault();
event.dataTransfer!.dropEffect = 'copy';
(event.target as Element).classList.add('dragover');
})
);
const dragleave$ = fromEvent<DragEvent>(
dropzone,
'dragleave'
).pipe(
tap((event: DragEvent) => {
(event.target as Element).classList.remove('dragover');
})
);
const drop$ = fromEvent<DragEvent>(
dropzone,
'drop'
).pipe(
tap((event: DragEvent) => {
(event.target as Element).classList.remove('dragover');
})
);
While creating these events, we can track when the file(s) have entered the dropzone and when
they’re leaving. Based on this, we can style the component by adding the corresponding classes.
We’ve also defined the drop event so that we know when to stop reacting to the stream of
new images being dragged over.
return this.fileUploadService.validateFiles$(
files$);
}),
// ...the rest of the stream
Back in our FileUploadService service, we have a validation method that checks whether we’ve
uploaded a .png image:
validateFiles$(files: Observable<File>): Observable<{
valid: boolean,
file: FileWithProgress
}> {
return files.pipe(
map((file: File) => {
const newFile: FileWithProgress = new File(
[file],
file.name,
{ type: file.type }
);
if (file.type === 'image/png') {
newFile.progress = 0;
} else {
newFile.error = 'Invalid file type';
}
return newFile;
}),
map((file: FileWithProgress) => {
return of({
valid: !file.error,
file
});
}),
mergeAll()
);
}
Here, we check the file type. If it’s expected, we set the progress to 0 and start the upload. Otherwise,
we set the error message for that specific file upload.
),
repeat()
)
handleFileValidation$(file: FileWithProgress):
Observable<FileWithProgress | never> {
if (!file.valid) {
this._snackBar.open(
`Invalid file ${file.name} upload.`,
'Close',
{ duration: 4000 }
);
return EMPTY;
}
return this.fileUploadService
.uploadFileWithProgress$(file);
}
If the file is invalid, we’ll show the error in the UI and return EMPTY so that no upload is started for that file:
If it’s a valid file upload, then we initiate an upload request to our API. In Angular, if we want to
track the actual progress of a request, there are a few things we must do:
After applying all these steps, our uploadFile$ method should look like this:
uploadFile$(file: File): Observable<number> {
const formData = new FormData();
formData.append('upload', file);
const req = new HttpRequest(
'POST', '/api/recipes/upload', formData, {
reportProgress: true,
responseType: 'blob'
}
);
return this.httpClient.request(req).pipe(
map((event: HttpEvent<Blob>) =>
this.getFileUploadProgress(event)),
filter(progress => progress < 100),
);
}
Now, when we send this request, we’ll get a series of HTTP events that we can react to. If we check
the getFileUploadProgress method, we’ll see this in action:
getFileUploadProgress(event: HttpEvent<Blob>): number {
const { type } = event;
if (type === HttpEventType.Sent) {
return 0;
}
if (type === HttpEventType.UploadProgress) {
const percentDone = Math.round(
100 * event.loaded / event.total!);
return percentDone;
}
if (type === HttpEventType.Response) {
return 100;
}
return 0;
}
With this approach, we know the exact progress of the file upload due to the UploadProgress event.
Finally, we can call the uploadFileWithProgress$ method from our service and return each file
with progress information attached to each corresponding file:
uploadFileWithProgress$(file: FileWithProgress):
Observable<FileWithProgress> {
return this.uploadFile$(file).pipe(
map((progress: number) =>
this.createFileWithProgress(file, progress)),
endWith(this.createFileWithProgress(file, 100))
);
}
After emitting a progress value, we’ll return the file with information attached about its progress
so that we can display it in the UI.
{}
);
}
}
});
Once we open our browser and drag multiple valid .png images, we can handle those uploads
concurrently and observe their progress:
this.createFileWithProgress(file, progress)),
endWith(this.createFileWithProgress(file, 100)),
catchError(() => {
const newFile: FileWithProgress =
this.createFileWithProgress(
file,
-1,
'Upload failed'
);
return of(newFile);
})
);
}
If an upload error occurs, we’ll notice the retry button in the UI when we open our browser.
If the network recovers, we can trigger another upload request for the failed uploads:
See also
• The HTML input file: https://developer.mozilla.org/en-US/docs/Web/HTML/Element/input/file
• The interval function: https://rxjs.dev/api/index/function/interval
• The repeat operator: https://rxjs.dev/api/operators/repeat
• The scan operator: https://rxjs.dev/api/operators/scan
• The finalize operator: https://rxjs.dev/api/operators/finalize
• The merge operator: https://rxjs.dev/api/operators/merge
• The mergeAll operator: https://rxjs.dev/api/operators/mergeAll
• The endWith operator: https://rxjs.dev/api/operators/endWith
How to do it…
The essential thing to understand in this recipe is the native HTMLAudioElement and, based on
that, which events are the most important to react to.
For that audio element, in the audio-player.component.ts file, we’ll
define all the key events:
@ViewChild('audio') audioElement!:
ElementRef<HTMLAudioElement>;
ngAfterViewInit(): void {
const audio = this.audioElement.nativeElement;
const duration$ = fromEvent(audio,
'loadedmetadata').pipe(map(() => (
{ duration: audio.duration }))
);
const playPauseClick$ = fromEvent(audio, 'play').pipe(
map(() => ({ isPlaying: true }))
);
const pauseClick$ = fromEvent(audio, 'pause').pipe(
map(() => ({ isPlaying: false }))
);
const volumeChange$ = fromEvent(audio,
'volumechange').pipe(
map(() => ({ volume: audio.volume })),
);
const time$ = fromEvent(audio, 'timeupdate').pipe(
map(() => ({ time: audio.currentTime }))
);
const error$ = fromEvent(audio, 'error');
}
Using the audio element, we can react to play, pause, volumechange, and timeupdate events, as
well as the loadedmetadata event, which holds information about the duration of a song. Also, in
case a network interruption occurs while we fetch the audio file, or the audio file is corrupted,
we can subscribe to the error event from the audio element.
Now, we can combine all those events and hold the state of a song in a centralized place:
merge(
duration$,
playPauseClick$,
pauseClick$,
volumeChange$
).subscribe((state) =>
this.audioService.updateState(state));
Now, we can subscribe to all state changes in the component and have reactive audio player
controls over user actions.
Now, in the audio-player.component.html file, we can present either a play or pause icon based
on the following condition:
<button mat-fab class="play-pause-btn" (click)="playPause()">
@if (isPlaying) {
<mat-icon>pause</mat-icon>
} @else {
<mat-icon>play_arrow</mat-icon>
}
</button>
Now, we can emit the same event by changing the volume of the audio player by invoking the
changeVolume() method:
This will automatically update the volume state reactively on the audio player element.
this.updateState({
isPlaying: false,
currentTrackIndex: prevIndex
});
}
nextSong(): void {
let nextIndex =
this.audioState$.value.currentTrackIndex + 1;
const tracks = this.audioState$.value.tracks;
if (nextIndex >= tracks.length) {
nextIndex = 0; // Loop back to the beginning
}
this.updateState({
isPlaying: false,
currentTrackIndex: nextIndex
});
}
Also, when we come to the end of the list, we’ll loop to the beginning of the playlist.
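The wrap-around logic can also be written with modular arithmetic. The sketch below is an illustration under assumptions: nextTrackIndex and previousTrackIndex are hypothetical helper names, trackCount is assumed to be greater than zero, and wrapping from the first track back to the last one is an assumption, since the full previous-song logic is not shown here.

```typescript
// Hypothetical helpers; both assume trackCount > 0.

// Move forward and loop back to the beginning after the last track.
function nextTrackIndex(current: number, trackCount: number): number {
  return (current + 1) % trackCount;
}

// Move backward and loop to the last track before the first one.
// Adding trackCount keeps the left operand of % non-negative.
function previousTrackIndex(current: number, trackCount: number): number {
  return (current - 1 + trackCount) % trackCount;
}

// With three tracks, going forward from the last index wraps to 0,
// and going backward from index 0 wraps to 2.
const afterLast = nextTrackIndex(2, 3);
const beforeFirst = previousTrackIndex(0, 3);
```

Either helper could replace the explicit if check before calling updateState with the new currentTrackIndex.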
Inside the audio-player.component.ts component, we can subscribe to this state change and
change the song using the audio element:
this.audioService.audioState$.subscribe(({
currentTrackIndex,
tracks
}) => {
if (
tracks[currentTrackIndex].title !==
this.currentTrack.title
) {
this.audioElement.nativeElement.src =
tracks[currentTrackIndex].song;
this.currentTrack = tracks[currentTrackIndex];
}
});
This means that we have all the information we need about the current song, which means we
can display that data in our audio-player.component.html template.
In the UI, we can combine this current time information with the previous song metadata, show
it in a slider, and watch the song progress:
<p>{{ currentTime | time }}</p>
<audio #audio></audio>
<mat-slider [max]="duration" class="song">
<input matSliderThumb
[value]="currentTime"
(dragEnd)="skip($event)"
>
</mat-slider>
<p>{{ duration | time }}</p>
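The template above relies on a time pipe whose implementation is not shown here. As a rough idea of what such a pipe's transform function might do, here is a hypothetical formatter: formatTime is an assumed name, and the m:ss output format is an assumption as well. It guards against NaN because an audio element's duration is NaN until its metadata has loaded.

```typescript
// Hypothetical formatter that a "time" pipe could delegate to.
// Converts a number of seconds into a minutes:seconds label.
function formatTime(totalSeconds: number): string {
  // duration is NaN before metadata loads, so fall back to zero.
  if (!Number.isFinite(totalSeconds) || totalSeconds < 0) {
    return '0:00';
  }
  const whole = Math.floor(totalSeconds);
  const minutes = Math.floor(whole / 60);
  const seconds = whole % 60;
  return `${minutes}:${seconds.toString().padStart(2, '0')}`;
}

const shortLabel = formatTime(5);    // five seconds
const longLabel = formatTime(75.4);  // one minute and fifteen seconds
```

Here, shortLabel is '0:05' and longLabel is '1:15'.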
Finally, if we open our browser, we can inspect all these features and play our favorite jam:
See also
• The BehaviorSubject class: https://rxjs.dev/api/index/class/BehaviorSubject
• The fromEvent function: https://rxjs.dev/api/index/function/fromEvent
• The map operator: https://rxjs.dev/api/operators/map
• The HTML audio tag: https://developer.mozilla.org/en-US/docs/Web/HTML/Element/audio
How to do it…
In this recipe, we’ll have an array of notifications to represent incoming notifications based on a
user action, store them by ID, and remove them after a certain period. We’ll also provide support
to manually remove notifications from a stack.
addNotification(notification: Notification) {
this.addNotification$.next(notification);
}
removeNotification(id: string) {
this.removeNotification$.next(id);
}
So, whenever there’s an ongoing request for posting new data, we’ll combine these two actions
with the latest state of the notification stack with the help of the withLatestFrom operator and
update its state:
get notifications(): Observable<Notification[]> {
return merge(
this.addNotification$,
this.removeNotification$
).pipe(
withLatestFrom(this.notifications$),
map(([changedNotification, notifications]) => {
if (changedNotification instanceof Object) {
this.notifications$.next([
...notifications,
changedNotification
]);
} else {
this.notifications$.next(notifications.filter
(notification =>
notification.id !== changedNotification)
);
}
return this.notifications$.value;
})
)
}
Based on the latest emitted value’s type, we can decide whether a new notification needs to be
added or filtered from the stack.
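The same add-or-remove decision can be expressed as a pure function that takes the current stack and the change and returns the new stack. This is a sketch, not the recipe's code: AppNotification, NotificationChange, and applyChange are hypothetical names (AppNotification is used to avoid clashing with the browser's global Notification type), and the discrimination is done with typeof rather than instanceof Object.

```typescript
// Hypothetical pure reducer for the notification stack.
interface AppNotification {
  id: string;
  message: string;
  type: 'success' | 'error';
}

// A change is either a new notification or the ID of one to remove.
type NotificationChange = AppNotification | string;

function applyChange(
  stack: AppNotification[],
  change: NotificationChange
): AppNotification[] {
  if (typeof change === 'string') {
    // An ID was emitted: filter that notification out of the stack.
    return stack.filter((notification) => notification.id !== change);
  }
  // A notification object was emitted: put it on top of the stack.
  return [...stack, change];
}
```

A function like this could be used as the accumulator of a scan operator over the merged add and remove streams, which would keep the state update free of side effects inside map.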
In RecipeService, we must implement the service method for sending the request to the BE API.
If we get a successful response, we’ll perform a side effect to add a notification to the stack. If we
get an error, we’ll display a notification that’s of the error type:
getRecipes(): void {
this.httpClient.get<Recipe[]>('/api/recipes').pipe(
tap(() => {
this.notificationService.addNotification({
id: crypto.randomUUID(),
message: 'Recipe added successfully.',
type: 'success'
});
}),
catchError((error) => {
this.notificationService.addNotification({
id: crypto.randomUUID(),
message: 'Recipe could not be added.',
type: 'error'
});
return throwError(() =>
new Error('Recipe could not be added.'));
}),
).subscribe();
}
</div>
</div>
Now, when we open our browser, we’ll see incoming notifications stacked on each other:
See also
• The BehaviorSubject class: https://rxjs.dev/api/index/class/BehaviorSubject
• The Subject class: https://rxjs.dev/api/index/class/Subject
• The withLatestFrom operator: https://rxjs.dev/api/operators/withLatestFrom
• Web API Crypto’s randomUUID method: https://developer.mozilla.org/en-US/docs/Web/API/Crypto/randomUUID
How to do it…
In this recipe, we’re going to build a timeline component that shows the list of your favorite latest
cooking recipes. Since there are a lot of delicious recipes out there, this would be a huge list to
fetch initially. To increase the performance of the application and to improve the general UX, we
can implement an infinite scroll list so that once the user scrolls to the end of a list of 5 initial
recipes, we can get a set of 5 new recipes. After some time, we can send a new request to check
whether there are new recipes and refresh our timeline of recipe news.
As you can imagine, with the scroll event emissions, there’s the potential for performance
bottlenecks. We can limit the number of scroll events that are processed by the stream using
the auditTime operator. This is especially useful since we want to ensure that we are always
processing the latest scroll event, and auditTime will always emit the most recent value within
the specified time frame. Also, with observeOn(animationFrameScheduler), we can schedule
tasks to be executed just before the browser’s next repaint. This can be beneficial for animations
or any updates that cause a repaint as it can help to prevent jank and make the application feel
smoother.
You might be wondering why we used auditTime in our scroll stream and not
throttleTime. The key difference between these two operators is that auditTime
emits the last value in a time window, whereas throttleTime emits the first value
in a time window. Common use cases for throttleTime might include rate-limiting
API calls, handling button clicks to prevent accidental double clicks, and controlling
the frequency of animations.
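The difference between the two operators can be illustrated with a simplified, timer-free model. The functions below are hypothetical simulations, not RxJS code: simulateThrottleTime mimics the default leading-edge behavior of throttleTime, and simulateAuditTime mimics auditTime, assuming the timer of the last window is allowed to finish.

```typescript
// Simplified models of throttleTime and auditTime over timestamped values.
// Hypothetical helpers for illustration; real operators use schedulers.
interface TimedValue<T> {
  time: number; // milliseconds since the start
  value: T;
}

// Emit the first value, then ignore everything until the window ends.
function simulateThrottleTime<T>(
  events: TimedValue<T>[],
  duration: number
): T[] {
  const emitted: T[] = [];
  let windowEnd = -Infinity;
  for (const { time, value } of events) {
    if (time >= windowEnd) {
      emitted.push(value);
      windowEnd = time + duration;
    }
  }
  return emitted;
}

// Start a window on the first value, remember the latest value,
// and emit that latest value when the window ends.
function simulateAuditTime<T>(
  events: TimedValue<T>[],
  duration: number
): T[] {
  const emitted: T[] = [];
  let windowEnd: number | null = null;
  let latest: T | undefined;
  for (const { time, value } of events) {
    if (windowEnd !== null && time >= windowEnd) {
      emitted.push(latest as T); // the window closed before this event
      windowEnd = null;
    }
    if (windowEnd === null) {
      windowEnd = time + duration;
    }
    latest = value;
  }
  if (windowEnd !== null) {
    emitted.push(latest as T); // let the last window finish
  }
  return emitted;
}

// Scroll positions reported at 0, 30, 60, 120, and 130 ms.
const scrollEvents: TimedValue<number>[] = [
  { time: 0, value: 10 },
  { time: 30, value: 20 },
  { time: 60, value: 30 },
  { time: 120, value: 40 },
  { time: 130, value: 50 },
];

const throttled = simulateThrottleTime(scrollEvents, 100); // [10, 40]
const audited = simulateAuditTime(scrollEvents, 100);      // [30, 50]
```

With a 100 ms window, the throttled model keeps the first scroll position of each window, while the audited model keeps the last one, which is why auditTime suits a scroll stream where only the most recent position matters.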
Once we know we’re getting near the end of a list, we can trigger a loading state and the next
request with a new set of data.
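A near-the-end check for the list can be sketched as a pure function over the scroll metrics. The names ScrollMetrics and isNearBottom and the 100 px default threshold are assumptions made for illustration; the three properties correspond to the scrollTop, clientHeight, and scrollHeight properties of a DOM element.

```typescript
// Hypothetical near-bottom check; the threshold value is an assumption.
interface ScrollMetrics {
  scrollTop: number;    // how far the content has been scrolled
  clientHeight: number; // visible height of the scroll container
  scrollHeight: number; // total height of the scrollable content
}

function isNearBottom(
  { scrollTop, clientHeight, scrollHeight }: ScrollMetrics,
  thresholdPx = 100
): boolean {
  // The bottom edge of the viewport is within thresholdPx of the end.
  return scrollTop + clientHeight >= scrollHeight - thresholdPx;
}
```

Mapping every scroll event through a function like this, followed by distinctUntilChanged, would yield a stream that changes only when the user crosses the threshold.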
Step 2 – Controlling the next page and the loading state of the list
At the top of our RecipesList component, we’ll define the necessary states to control the whole
flow and know when we require the next page, when to show the loader, and when we’ve reached
the end of the list:
private page = 0;
public loading$ = new BehaviorSubject<boolean>(false);
public noMoreData$ = new Subject<void>();
private destroy$ = new Subject<void>();
Now, we can continue our isNearBottom$ stream, react to the next page, and specify when to
show the loader:
isNearBottom$.pipe(
filter((isNearBottom) =>
isNearBottom && !this.loading$.value),
tap(() => this.loading$.next(true)),
switchMap(() =>
this.recipesService.getRecipes(++this.page)
.pipe(
tap((recipes) => {
if (recipes.length === 0)
this.noMoreData$.next();
}),
finalize(() => this.loading$.next(false))
)
),
takeUntil(merge(this.destroy$, this.noMoreData$))
)
.subscribe((recipes) => (
this.recipes = [...this.recipes, ...recipes])
);
}
1. First, we check that we’re near the bottom of the page and that there isn’t already an
ongoing request.
2. We start a new request by showing a loading spinner.
3. We send a new request with the next page as a parameter.
4. When we get a successful response, we can check whether there’s no more data or we can
continue scrolling down the timeline.
Once we receive several new recipes, we can subscribe to that information inside
NewRecipesComponent and display it in the UI:
Now, once we click the 2 new recipes button, we can scroll to the top of the timeline and get
the newest data.
See also
• The fromEvent function: https://rxjs.dev/api/index/function/fromEvent
• The auditTime operator: https://rxjs.dev/api/operators/auditTime
• The animationFrameScheduler scheduler: https://rxjs.dev/api/index/const/animationFrameScheduler
• The observeOn operator: https://rxjs.dev/api/operators/observeOn
• The distinctUntilChanged operator: https://rxjs.dev/api/operators/distinctUntilChanged
• The switchMap operator: https://rxjs.dev/api/operators/switchMap
• The takeUntil operator: https://rxjs.dev/api/operators/takeUntil
3
Understanding Reactive Animation Systems with RxJS
Animations in web apps are much more than just decorative elements. When used thoughtfully,
they can significantly enhance the user experience (UX) and create a wow effect for the end users.
When it comes to animations, RxJS can really come in handy when creating complex animations
because of its superpower to orchestrate multiple events.
In this chapter, we are about to explore some of the coolest ways to add visual delight for our
users with RxJS animations by implementing the following recipes:
Technical requirements
To follow along in this chapter, you’ll need the following:
• Angular v19+
• RxJS v7
• Node.js v22+
• npm v11+ or pnpm v10+
The code for the recipes in this chapter is placed in the GitHub repository here:
https://github.com/PacktPublishing/RxJS-Cookbook-for-Reactive-Programming/tree/main/Chapter03
How to do it…
In this recipe, we’re going to implement a reactive bouncing ball animation, ensuring that it runs
at 60 frames per second. We are striving to be as realistic as possible with the physics of a
bouncing ball, applying gravity, falling velocity, and energy loss on each bounce.
Now we can pipe into the animationFrame$ stream and update the ball position on every new
animation frame:
ngAfterViewInit() {
const initialHeight = 0;
this.ballLoop$ = animationFrame$.pipe(
scan(({ y, dy}, velocity) => {
dy += gravity;
y += dy;
return { y, dy };
}, { y: initialHeight, dy: 0 }),
tap(({ y }) => {
ball.style.top = `${y}px`;
}),
);
this.ballLoop$.subscribe();
}
At this point, we will see an animation where we update the ball’s y position on each frame, where
dy is the vertical velocity of the ball, that is, the change in y per frame.
Anybody who paid attention in high school physics class will remember how gravity works. With a
gravitational acceleration of 9.81 m/s², a falling ball gains an additional 9.81 m/s of velocity
every second. In our recipe, if we treat one animation frame as one second of real time, we can
achieve a similar effect by adding the gravity value to dy on every frame. With dy += gravity, we
simulate how the ball gains velocity over time.
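To make the velocity accumulation concrete, here is a minimal sketch of the per-frame update written as a plain reducer, the kind of function that could be passed to scan. The names FallState and stepFall and the gravity value of 0.5 px per frame are assumptions chosen for illustration, not values taken from the recipe.

```typescript
// Hypothetical state shape: vertical position and vertical velocity, both in pixels.
interface FallState {
  y: number;
  dy: number;
}

// One animation frame: gravity is added to the velocity, then the velocity to the position.
function stepFall(state: FallState, gravity: number): FallState {
  const dy = state.dy + gravity;
  return { y: state.y + dy, dy };
}

// Simulate three frames with an assumed gravity of 0.5 px per frame.
let state: FallState = { y: 0, dy: 0 };
for (let frame = 0; frame < 3; frame++) {
  state = stepFall(state, 0.5);
}
console.log(state); // velocity grows by 0.5 per frame: dy ends at 1.5, y at 0.5 + 1 + 1.5 = 3
```

Keeping the update as a pure function makes the physics easy to check in isolation before it is wired into the animation stream.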
ngAfterViewInit() {
const ball = this.ballRef.nativeElement;
const container = document.documentElement;
let energyLoss = 0.8;
this.ballLoop$ = animationFrame$.pipe(
scan(({ y, dy}, velocity) => {
dy += gravity;
y += dy;
// Bounce off the ground
if (y + ball.offsetHeight >
container.clientHeight) {
y = container.clientHeight -
ball.offsetHeight;
// Reverse direction and reduce energy
dy = -dy * energyLoss;
this.bounceCount++;
}
return { y, dy };
Here, we can see that we have updated the logic so that we know when the ball hits the ground
(has reached the bottom of the viewport). At that point, it bounces off by reversing its direction
and factoring in the energy loss on each bounce.
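The bounce rule can be sketched as a pure function as well. This is an illustrative version under stated assumptions: BallState, stepBounce, and groundY (standing for the container height minus the ball height) are hypothetical names, and the bounce counter is kept in the state instead of on the component.

```typescript
// Hypothetical state: position, velocity, and the number of bounces so far.
interface BallState {
  y: number;
  dy: number;
  bounces: number;
}

// Advance one frame; on ground contact, clamp the ball to the ground,
// reverse its direction, and keep only a fraction of its speed.
function stepBounce(
  state: BallState,
  gravity: number,
  groundY: number,
  energyLoss: number
): BallState {
  let dy = state.dy + gravity;
  let y = state.y + dy;
  let bounces = state.bounces;
  if (y > groundY) {
    y = groundY;
    dy = -dy * energyLoss;
    bounces++;
  }
  return { y, dy, bounces };
}

// A ball 5 px above the ground, moving down at 10 px per frame, loses 20% of its speed.
console.log(stepBounce({ y: 95, dy: 10, bounces: 0 }, 0, 100, 0.8));
```

With energyLoss set to 0.8, every bounce keeps 80% of the impact speed, so successive bounces get lower until the ball settles.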
ngAfterViewInit() {
this.ballLoop$ = interval(
0,
animationFrameScheduler
).pipe(
// the rest of the stream
takeWhile(
({ y, dy }) =>
y < this.container.clientHeight -
ball.offsetHeight - 10 ||
Math.abs(dy) > 5
),
finalize(() => {
this.message.set(`Bouncing stopped after ${
this.bounceCount} bounces`);
}),
repeat({ delay: () => this.bounceRepeat$ })
);
}
startBouncing() {
this.bounceRepeat$.next();
}
Here, we are simply saying that the stream should stop once the ball is within 10 px of the
ground and its current velocity is no larger than 5 px per frame, up or down.
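The stop condition is easier to reason about when it is extracted into a named predicate. The sketch below mirrors the takeWhile callback shown above; the function name shouldKeepBouncing and its parameter names are assumptions made for illustration.

```typescript
// Returns true while the animation should continue: the ball is either still
// clearly above the ground or still moving faster than the minimum speed.
function shouldKeepBouncing(
  y: number,
  dy: number,
  groundY: number,
  nearGround = 10,
  minSpeed = 5
): boolean {
  return y < groundY - nearGround || Math.abs(dy) > minSpeed;
}

console.log(shouldKeepBouncing(50, 0, 100)); // true: the ball is still in the air
console.log(shouldKeepBouncing(95, 8, 100)); // true: near the ground but still fast
console.log(shouldKeepBouncing(95, 3, 100)); // false: near the ground and slow
```

Because the two conditions are joined with a logical OR, the stream completes only when both are false at the same time.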
Also, whenever we click on the START BOUNCING button, we will trigger the animation again
by leveraging the RxJS repeat operator. One more great thing about the repeat operator is that
it won’t trigger new animations if we repeatedly click on a button, since it will only re-subscribe
to the stream once the initial stream is completed.
See also
• RxJS animationFrameScheduler: https://rxjs.dev/api/index/const/animationFrameScheduler
• RxJS repeat operator: https://rxjs.dev/api/operators/repeat
How to do it…
In this recipe, we are going to re-create the default version of the particles.js animation, only
this time with the power and elegance of RxJS.
To increase the resolution of canvas elements, the dimensions of the canvas are higher than
the dimensions of the screen. Increasing the width and height of a canvas element increases its
resolution because it increases the number of pixels available for rendering. This results in more
detailed, higher-quality graphics, but it also requires more resources to process and render the
additional pixels.
Now we can call the generateParticle() method for each particle and draw the particles on the canvas:
ngOnInit() {
this.ctx = this.canvas.nativeElement.getContext('2d')!;
const initialParticles: Particle[] = Array.from(
{ length: 123 },
this.generateParticle,
this
);
const animationFrame$ = animationFrames();
this.particles$ = animationFrame$.pipe(
scan((particles: Particle[], event: IAnimationFrame) =>
{
return particles.map(particle => {
let newX = particle.x + particle.vx;
let newY = particle.y + particle.vy;
return {
...particle,
x: newX,
y: newY,
vx: particle.vx,
vy: particle.vy
};
});
}, initialParticles),
tap(particles => this.drawParticles(particles))
);
Here, you may notice that we are using the RxJS animationFrames function to ensure smooth
animation and render particles within the browser rendering cycle. Whenever we enter a
new rendering cycle, we can update the position of each particle on the canvas, and call the
drawParticles() method:
drawParticles(particles: Particle[]) {
this.ctx.clearRect(
0,
0,
this.canvas.nativeElement.width,
this.canvas.nativeElement.height
);
particles.forEach(particle =>
this.drawParticle(particle));
}
drawParticle(particle: Particle) {
this.ctx.beginPath();
this.ctx.arc(
particle.x,
particle.y,
particle.radius,
0,
Math.PI * 2
);
this.ctx.fillStyle = particle.color;
this.ctx.fill();
}
return particle;
}
Once we have implemented the checkWallCollision() method, we can detect wall collision for
each particle in the stream:
this.particles$ = animationFrame$.pipe(
scan((particles: Particle[],
event: IAnimationFrame) => {
return particles.map(particle => {
let newX = particle.x + particle.vx;
let newY = particle.y + particle.vy;
return {
...particle,
x: newX,
y: newY,
vx: particle.vx,
vy: particle.vy
};
});
}, initialParticles),
map(particles => particles.map(particle =>
this.detectWallCollision(
particle,
particle.x + particle.vx,
particle.y + particle.vy))
),
);
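The recipe’s own wall-collision method is only partially visible here, so the following is a hedged sketch of one common way to implement it, not the author’s code. The names MovingParticle and bounceOffWalls are hypothetical, and the canvas size is passed in explicitly so that the function stays pure.

```typescript
// Hypothetical particle shape with just the fields this sketch needs.
interface MovingParticle {
  x: number;
  y: number;
  vx: number;
  vy: number;
  radius: number;
}

// Reverse the velocity component that points into a wall and clamp the
// particle back inside the canvas bounds.
function bounceOffWalls(p: MovingParticle, width: number, height: number): MovingParticle {
  let { x, y, vx, vy } = p;
  if (x - p.radius < 0 || x + p.radius > width) {
    vx = -vx;
    x = Math.min(Math.max(x, p.radius), width - p.radius);
  }
  if (y - p.radius < 0 || y + p.radius > height) {
    vy = -vy;
    y = Math.min(Math.max(y, p.radius), height - p.radius);
  }
  return { ...p, x, y, vx, vy };
}

// A particle overlapping the right edge of a 100 x 100 canvas is pushed back and turned around.
console.log(bounceOffWalls({ x: 98, y: 50, vx: 3, vy: 1, radius: 5 }, 100, 100));
```

Clamping the position as well as flipping the velocity prevents a particle from getting stuck outside the canvas when it overshoots the edge by more than one frame of movement.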
drawConnections(particles: Particle[]) {
for (let i = 0; i < particles.length; i++) {
for (let j = i + 1; j < particles.length; j++) {
const particle1 = particles[i];
const particle2 = particles[j];
const distance = Math.sqrt(
(particle1.x - particle2.x) ** 2 +
(particle1.y - particle2.y) ** 2
);
if (distance <= 250) {
const opacity = 1 - distance / 250;
this.drawLine(particle1, particle2, opacity);
}
}
}
}
Who said we would never need the Pythagorean theorem in real life? In the drawConnections()
method, we calculate the distance between two particles from their coordinates. If they are
within 250 px of each other, we call the drawLine() method, which draws a connection between
the particles on the canvas, fading the line as the distance grows.
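The distance and opacity calculation from drawConnections() can be isolated into a small helper. This is a sketch for illustration: Point and connectionOpacity are hypothetical names, and Math.hypot is used as a shorthand for the square root of the summed squares.

```typescript
interface Point {
  x: number;
  y: number;
}

// Returns the line opacity for two particles, or null when they are too far
// apart to be connected. Opacity falls linearly from 1 (same spot) to 0 (maxDistance).
function connectionOpacity(a: Point, b: Point, maxDistance = 250): number | null {
  const distance = Math.hypot(a.x - b.x, a.y - b.y);
  return distance <= maxDistance ? 1 - distance / maxDistance : null;
}

// A 30-40-50 right triangle: the particles are 50 px apart.
console.log(connectionOpacity({ x: 0, y: 0 }, { x: 30, y: 40 }));
// A 300-400-500 right triangle: 500 px apart, so no connection is drawn.
console.log(connectionOpacity({ x: 0, y: 0 }, { x: 300, y: 400 })); // null
```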
};
}),
takeUntil(
merge(
fromEvent(this.canvas.nativeElement,
'mouseout'),
fromEvent(this.canvas.nativeElement,
'mouseleave')
)
)
);
Since the dimensions of the canvas are scaled compared to screen dimensions, we need to
transform mouse coordinates proportionally. After doing that, we can include the mouseMove$
stream in the main particles$ stream:
this.particles$ = merge(mouseMove$, animationFrame$).pipe(scan(
(particles: Particle[],
event: { x: number, y: number } | IAnimationFrame
) => {
if ('x' in event && 'y' in event) {
this.mouseX = event.x;
this.mouseY = event.y;
return particles;
}
return particles.map(particle => {
const mouseMoveCoordinates =
this.handleMouseInteraction(particle);
let newX = mouseMoveCoordinates?.newX ??
particle.x + particle.vx;
let newY = mouseMoveCoordinates?.newY ??
particle.y + particle.vy;
return {
...particle,
x: newX,
y: newY,
vx: particle.vx,
vy: particle.vy
};
});
}, initialParticles));
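The proportional transformation of mouse coordinates mentioned before this snippet can be sketched as follows. The helper name toCanvasCoordinates and the Box and Size types are assumptions; in a browser, the box would typically come from the canvas element’s getBoundingClientRect() and the size from its width and height attributes.

```typescript
// On-screen box of the canvas, in CSS pixels.
interface Box {
  left: number;
  top: number;
  width: number;
  height: number;
}

// Internal drawing resolution of the canvas.
interface Size {
  width: number;
  height: number;
}

// Convert viewport mouse coordinates into canvas drawing coordinates by
// removing the canvas offset and scaling by the resolution ratio.
function toCanvasCoordinates(
  clientX: number,
  clientY: number,
  box: Box,
  canvas: Size
): { x: number; y: number } {
  const scaleX = canvas.width / box.width;
  const scaleY = canvas.height / box.height;
  return {
    x: (clientX - box.left) * scaleX,
    y: (clientY - box.top) * scaleY,
  };
}

// A canvas drawn at twice its on-screen size doubles every offset.
console.log(
  toCanvasCoordinates(110, 60, { left: 10, top: 10, width: 400, height: 200 }, { width: 800, height: 400 })
); // { x: 200, y: 100 }
```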
Finally, we can call the handleMouseInteraction() method and move the particles away from
the hover radius:
handleMouseInteraction(
particle: Particle
): { newX: number; newY: number } | undefined {
// Mouse hover radius avoidance
const distanceToMouse = Math.sqrt(
(this.mouseX - particle.x) ** 2 +
(this.mouseY - particle.y) ** 2
);
return {
newX: particle.x,
newY: particle.y,
};
}
Another day of our lives without needing trigonometry’s sin, co... oh wait. The math professors
were right all along! Here is how we work out where to move a particle when the distance between
the particle and the mouse is less than the 200 px hover radius:
• Math.atan2 calculates the angle (in radians) between the particle and the mouse. This
angle is measured counterclockwise from the positive x axis to the line connecting the
particle and the mouse.
• Math.cos(angle) and Math.sin(angle) give the x and y components of the unit vector
pointing from the mouse to the particle. This vector is normalized, meaning it has a length
of 1.
• The dot product of the particle’s velocity vector and the normalized direction vector is
calculated. This measures how much of the particle’s velocity lies along the line between
the mouse and the particle.
• The new position of the particle is calculated by moving it 200 units away from the mouse
in the direction opposite to the angle, plus the adjusted velocity.
The trigonometry behind the particle movement involves calculating the angle and direction
from the particle to the mouse, determining the influence of the mouse on the particle’s velocity,
and then updating the particle’s position to move it away from the mouse. By doing so, the
particles avoid the mouse cursor, and their velocities are adjusted based on their relative positions.
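The core of the steps above can be sketched in a few lines. This is a simplified illustration, not the recipe’s handleMouseInteraction() method: the names Vec and repelFromMouse are hypothetical, and the velocity adjustment based on the dot product is deliberately left out so that only the angle calculation and the push to the edge of the hover radius remain.

```typescript
interface Vec {
  x: number;
  y: number;
}

// If the particle is inside the hover radius, move it to the edge of the radius
// along the line that runs from the mouse through the particle.
// Returns undefined when the particle is outside the radius and needs no change.
function repelFromMouse(particle: Vec, mouse: Vec, hoverRadius = 200): Vec | undefined {
  const dx = particle.x - mouse.x;
  const dy = particle.y - mouse.y;
  const distance = Math.hypot(dx, dy);
  if (distance >= hoverRadius) {
    return undefined;
  }
  // Angle of the line from the mouse to the particle, in radians
  const angle = Math.atan2(dy, dx);
  // cos/sin of that angle form a unit vector pointing away from the mouse
  return {
    x: mouse.x + Math.cos(angle) * hoverRadius,
    y: mouse.y + Math.sin(angle) * hoverRadius,
  };
}

// A particle 100 px to the right of the mouse is pushed out to 200 px.
console.log(repelFromMouse({ x: 100, y: 100 }, { x: 0, y: 100 }));
```

Returning undefined for untouched particles lets the caller fall back to the regular position update.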
See also
• Particles.js: https://vincentgarreau.com/particles.js/
How to do it…
In this recipe, we are going to get creative and build an upload button that gradually shows
the upload progress and controls the animation states in between until the upload is complete.
Here, we could get creative in how we generate the upload progress or retrieve the actual progress
from a server. The current approach is just a simulation, and the focus of this recipe is on animating
the upload progress.
Whenever we click a button, we are going to switch to the animationFrames() stream, which gives
us a timeframe to update and render performant animations. We are going to take the animation
frames stream as long as we are not in the DONE state. After that, we are going to combine that
stream with the latest stream from uploadProgress$ and increase the progress background
width of the button.
Finally, when we open our browser, we can see this animation in action:
See also
• RxJS animationFrames: https://rxjs.dev/api/index/function/animationFrames
4
Testing RxJS Applications
Code testing is one of the best and most vital practices in modern software development.
Establishing code testing as a practice ensures the reliability and correctness of our code, improves
code quality, makes our code more resilient to errors, and acts as a safety net when we want to
introduce a change in our code. We can use unit testing to verify that all components are working
as expected in isolation. Then, we can write integration tests to verify that components in our
system work together in certain scenarios or to check if they are well integrated with external
systems, such as backend APIs. Tests can also serve us as documentation of the expected behavior
of the code. All these benefits apply to testing RxJS code as well, especially for complex streams.
By testing Observables, subscriptions, and operators, we can verify that data flows as expected
and asynchronous operations are handled correctly.
Testing can be challenging, especially with RxJS, because of its asynchronous nature, where
operations execute over time and not in a predictable, linear fashion. That’s why we have a lot of
cool testing tools and libraries to make our lives easier when dealing with async code and event
coordination. Key elements in RxJS unit testing include the following:
Technical requirements
To follow along in this chapter, you’ll need the following:
• Angular v19+
• Karma
• Jasmine
• Jest
• RxJS v7
• MSW
• Node.js v22+
• npm v11+ or pnpm v10+
The code for the recipes in this chapter is available in the GitHub repository here:
https://github.com/PacktPublishing/RxJS-Cookbook-for-Reactive-Programming/tree/main/Chapter04
Getting ready
To demonstrate the power of RxJS marbles, we are going to write a marble unit test for the
Streamlining real-time updates with RxJS-powered notifications recipe from Chapter 2
(https://github.com/PacktPublishing/RxJS-Cookbook-for-Reactive-Programming/tree/main/Chapter02/rxjs-notification).
To understand the marble diagram syntax and help us follow along with this recipe, here is a
quick cheat sheet:
How to do it…
In this recipe, we will test the stream of notifications that are incoming asynchronously over time
and stacking in the UI. To accomplish this, we will present incoming notifications with marble
diagrams and then assert the given sequence of events to make sure we get timely state transitions,
which will be represented in the UI.
beforeEach(() => {
TestBed.configureTestingModule({
providers: [NotificationService]
});
// if (autodismiss) {
//   timer(timeout).subscribe(() =>
//     this.removeNotification(notification.id));
// }
}
Here, we will demonstrate how notifications are added to the stack over time and how we can
represent that timeline with RxJS marble diagrams:
it('should add notifications', fakeAsync(() => {
testScheduler.run(({ cold, expectObservable,
expectSubscriptions }) => {
const notification1:
Notification = { id: '1',
message: 'Recipe added successfully.',
type: 'success' };
const notification2:
Notification = { id: '2',
message: 'Recipe added successfully.',
type: 'success' };
const add1$ = cold('a', { a: notification1 });
1. First, we create a cold Observable for the first notification at the first frame of time. The
reason for that is that compared to hot Observables, cold Observables offer us better
control and predictability for testing asynchronous behavior since they start emitting
values only when we subscribe to the stream.
2. For the second notification, nothing happens during the first two frames, and the value is
emitted at the third frame.
Imagine this as a timeline of events that we can visually represent in our code, where each '-'
instance in the timeline represents 1 ms. We emit the first notification immediately. Meanwhile,
the second notification waits for two frames and then emits a value. When we look at a timeline
of events, we can clearly see that the order was as follows:
1. First frame – The first notification emits a value; the second notification waits. At this
point, we are expecting to have one notification in the stack.
2. Second frame – Nothing happens, we still have one notification in the stack.
3. Third frame – The second notification emits a value. Once we emit the second notification,
we expect to have two notifications in the stack.
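The frame-by-frame reasoning above can be checked with a tiny helper that reads marble diagrams and counts how many values have been emitted by each frame. This is only a sketch for building intuition: stackSizePerFrame is a hypothetical name, it understands only '-' and single-character values, and the diagrams 'a' and '--b' are assumed from the description of the two notifications.

```typescript
// For diagrams made of '-' (an empty frame) and single-character values, return
// the number of values emitted across all diagrams up to and including each frame.
function stackSizePerFrame(diagrams: string[]): number[] {
  const frameCount = Math.max(...diagrams.map((diagram) => diagram.length));
  const sizes: number[] = [];
  let size = 0;
  for (let frame = 0; frame < frameCount; frame++) {
    for (const diagram of diagrams) {
      const marble = diagram[frame];
      if (marble !== undefined && marble !== '-') {
        size++;
      }
    }
    sizes.push(size);
  }
  return sizes;
}

// First notification at frame 1, second notification at frame 3.
console.log(stackSizePerFrame(['a', '--b'])); // [1, 1, 2]
```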
service.addNotification(notification));
service.notifications$.subscribe();
testScheduler.schedule(() => {
expectObservable(service.notifications$).toBe(
expected$,
expectedValues
);
});
});
}));
1. We create a cold Observable for the first notification, which we emit right away.
2. After 1500 ms, we emit the second notification, which won’t be auto dismissible. That
means that the notification will stay in the stack until we remove it manually.
3. After 4000 ms, we emit the third notification.
Now, the fun part starts. At first, by looking at the expected timeline, some values might seem
strange, but there is a good reason behind these values. Every emission of a value takes 1 ms of
virtual time. That means that if we want to create a 1500 ms gap between two emitted values,
we can describe this timeline with the marble diagram 'a 1499ms b'. Now, if we look closely at
the expected timeline, 'a 1499ms b 2499ms c 999ms d 3999ms e', things start to make sense:
1. First, we emit a value for the first notification, which will be dismissed after 5000 ms. At
that point, we have the first notification in the stack.
2. We wait 1499 ms, then we emit the second notification (the emission itself takes 1 ms), which
won’t be auto-dismissed from the stack. At that point, we have the first two notifications in the stack.
3. The third notification should be emitted 4000 ms after the first notification. So far, 1500
ms have passed in virtual time. That means we must wait 2499 ms to emit a value for the
third notification. At that point, we have all three notifications in the stack.
4. So far, 4000 ms have passed in the timeline after we emitted the first value. That means
we wait for 999 ms until the first notification is auto-dismissed from the stack. Once that
happens, we have the second and third notifications left in the stack.
5. At this point in virtual time, 1000 ms have passed since we emitted the third notification.
We wait for 3999 ms more and remove the third notification from the stack. Finally, we
are left with only the second notification in the stack.
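The arithmetic in the steps above follows one rule: the gap written between two values is the difference between their emission times minus the 1 ms that the earlier emission occupies. As a worked example, the hypothetical helper below (toMarbleTimeline is not part of RxJS) rebuilds the expected timeline from the absolute emission times 0, 1500, 4000, 5000, and 9000 ms. It assumes that the first value is emitted at 0 ms and that the times are in ascending order.

```typescript
// Build a marble timeline from absolute emission times in milliseconds.
// Each emitted value takes up 1 ms of virtual time, so the written gap is
// (current time - previous time - 1).
function toMarbleTimeline(emissionTimesMs: number[]): string {
  return emissionTimesMs
    .map((time, index) => {
      const label = String.fromCharCode(97 + index); // 'a', 'b', 'c', ...
      if (index === 0) {
        return label; // assumed to be emitted at 0 ms
      }
      const gap = time - emissionTimesMs[index - 1] - 1;
      return `${gap}ms ${label}`;
    })
    .join(' ');
}

console.log(toMarbleTimeline([0, 1500, 4000, 5000, 9000]));
// a 1499ms b 2499ms c 999ms d 3999ms e
```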
That wasn’t as hard as it seemed at first, right? These marble diagrams are fun! By following this
pattern, we can describe any asynchronous stream of events with marble diagrams, which
makes our tests more descriptive.
See also
• Marble syntax: https://rxjs.dev/guide/testing/marble-testing#marble-syntax
• RxJS marble testing blog: https://betterprogramming.pub/rxjs-testing-write-unit-tests-for-observables-603af959e251
• Getting Started with Marble Tests course: https://rxjs-course.dev/course/testing/getting-started-with-marble-tests
• Ben Lesh’s article Hot vs Cold Observables: https://benlesh.medium.com/hot-vs-cold-observables-f8094ed53339
• Testing asynchronous RxJs operators article: https://medium.com/angular-in-depth/testing-asynchronous-rxjs-operators-5495784f249e
• How to test Observables article: https://medium.com/angular-in-depth/how-to-test-observables-a00038c7faad
• Marble testing helpers for RxJS and Jasmine: https://www.npmjs.com/package/jasmine-marbles
Getting ready
In this recipe, we’re going to test network request services that we implemented in Chapter 1
(https://github.com/PacktPublishing/RxJS-Cookbook-for-Reactive-Programming/tree/main/Chapter01/network-requests).
How to do it…
In order to mock HTTP calls to external services, we will use the @angular/common/http/testing
module and learn how to test requests being sent in sequence as well as in parallel.
afterEach(() => {
httpMock.verify();
});
Now, when we run our tests, instead of hitting the real backend, a request will be sent to our test
mock backend. We can also notice that we are injecting HttpTestingController, which helps
us to interact with the mocked backend, send requests to it, and assert responses that come back
from the mocked backend.
After each test execution, we call the httpMock.verify() function to confirm that all mocked HTTP
requests within the test have been properly handled and asserted, thus preventing accidental
omissions and maintaining test integrity.
The response from an HTTP call such as this would look something like this:
const mockResponse = [
  {
    "id": 1,
    "name": "Spaghetti Aglio e Olio",
    "description": "Simple yet flavorful pasta with garlic, olive oil, and chili flakes",
    "ingredients": ["spaghetti", "garlic", "olive oil",
      "chili flakes", "parmesan cheese",
      "parsley"]
  },
  {
    "id": 2,
    "name": "Chicken Tikka Masala",
    "description": "Creamy, spiced Indian curry with tender chicken pieces.",
    "ingredients": ["chicken breasts", "yogurt",
      "garam masala", "turmeric", "cumin",
      "tomatoes", "onion", "ginger",
      "garlic", "heavy cream"]
  },
];
Now, we can assert this response by calling the getRecipes$ method within the test.
In our integration test, we will define a test-case scenario for fetching a list of recipes:
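it('should fetch a list of recipes', async () => {
const recipes$ = service.getRecipes$();
const recipes = firstValueFrom(recipes$);
// Answer the pending request with the mocked payload
const req = httpMock.expectOne('/api/recipes');
expect(req.request.method).toBe('GET');
req.flush(mockResponse);
expect(await recipes).toEqual(mockResponse);
});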
This means that in our test, we must assert both ongoing requests and responses since we are
combining them as the result of the whole stream.
expect((await recipeDetails).recipe)
.toEqual(dummyRecipe);
expect((await recipeDetails).details)
.toEqual(dummyDetails);
});
Here, we can notice that we are asserting ongoing requests as well as the responses from both.
The getRecipes$ service will return the same response as we had in step 3. Once we get a list of
recipes, we will send multiple requests in parallel for each recipe’s image:
const dummyImages = [
{
"id": 1,
"url": "/assets/images/spaghetti.jpg"
},
{
"id": 2,
"url": "/assets/images/chicken_tikka_masala.jpg"
},
];
Now, when we test this HTTP call, we should be able to assert the image responses for each recipe:
it('should fetch recipes with images in parallel', async () => {
const recipeImages$ = service
.getRecipesWithImageInParallel$();
const recipeImages = firstValueFrom(recipeImages$);
const req = httpMock.expectOne('/api/recipes');
expect(req.request.method).toBe('GET');
req.flush(dummyRecipes);
dummyRecipes.forEach((recipe, index) => {
const imgReq = httpMock.expectOne(
`/api/recipes/images?id=${recipe.id}`);
expect(imgReq.request.method).toBe('GET');
imgReq.flush(dummyImages[index]);
});
expect(await recipeImages).toEqual(dummyImages);
});
Note that in this test, we use forEach to iterate over the recipes, expecting one image request
per recipe and flushing the matching image for each one.
The way we can test this error scenario is by flushing the error to httpMock and asserting the error
instance and message in the end:
it('should handle error when fetching recipes', async () => {
const recipes$ = service.getRecipes$();
const recipes = firstValueFrom(recipes$);
const req = httpMock.expectOne('/api/recipes');
expect(req.request.method).toBe('GET');
req.flush('Failed!', {
status: 500,
statusText: 'Error fetching recipes'
});
const recipesResponse = await recipes;
expect(recipesResponse).toBeInstanceOf(Error);
expect(recipesResponse.message).toEqual(
'Error fetching recipes');
});
See also
• Angular docs guide on testing: https://angular.dev/guide/http/testing#expecting-and-answering-requests
• Angular docs guide on testing services: https://angular.dev/guide/testing/services#httpclienttestingmodule
In this recipe, we’re going to explore how we can simplify this process even further with the
Mock Service Worker (MSW) library. In general, MSW acts like a proxy between the browser and
external services, so it is ideal for intercepting all outgoing requests and returning what we
desire instead of pinging the real backend service. This is especially useful for writing integration tests.
Getting ready
In this recipe, we’re going to test network request services that we implemented in Chapter 1
(https://github.com/PacktPublishing/RxJS-Cookbook-for-Reactive-Programming/tree/main/Chapter01/network-requests).
How to do it…
Since the MSW mock server we use in this recipe runs in Node.js, we first need to set up the test
configuration in the Angular project to use Jest instead of the default Jasmine and Karma setup,
which runs tests in a browser. The way we can do it
is by using the jest-preset-angular library. We’ll go through the same test-case scenarios we
looked at in the preceding recipe, but this time, we will see the power of MSW and how we can
simplify the testing process of RxJS side effects such as network requests and errors.
Jest setup
To keep the focus of the recipe on writing tests, we will skip the Jest setup part. But if you
need more info on how to do it, you can check this fantastic blog post by Tim Deschryver on
how to integrate Jest into an Angular application and library:
https://timdeschryver.dev/blog/integrate-jest-into-an-angular-application-and-library#.
service = TestBed.inject(RecipesService);
});
Now, the difference from the preceding recipe is that we no longer need to put
provideHttpClientTesting into the providers array or inject HttpTestingController.
Also, note that before running all tests, we need to call the listen() method on a mocked server
provided by MSW. This will start MSW and intercept all API calls to the real backend service.
This means that in our test, we must assert both ongoing requests and responses since we are
combining them as the result of the whole stream:
test('should fetch recipe details', (done) => {
const dummyRecipe = {
id: 1,
name: 'Spaghetti Aglio e Olio',
description: 'Simple yet flavorful pasta with garlic, olive oil, and chili flakes',
ingredients: ['spaghetti', 'garlic']
};
const dummyDetails = {
id: 1,
prepTime: 7200000,
cuisine: 'Italian',
diet: 'Vegetarian',
url: '/assets/images/spaghetti.jpg',
nutrition: {
calories: 450,
fat: 15,
carbs: 70,
protein: 10
}
};
1. We call the getRecipesDetails$ method to trigger two requests in sequence, one after
the other.
2. The requests are automatically intercepted by MSW, and a mocked response is returned.
3. With the firstValueFrom function, we extract the response value.
4. We use Jest assertions to check that both responses are the ones we expect to have.
Comparing this approach to that in the preceding recipe, note how simplified our integration test
looks, thanks to the power of MSW.
Here is where it gets a little bit tricky with MSW. There is no way to have both success and error
cases within the same MSW handler, so if we want to simulate an error within the test, we must
override the original handler with the error one to return a mocked error response.
After that, the process is the same as in the previous approach: we call the service method,
extract the response with the firstValueFrom function, and assert the error message.
See also
• jest-preset-angular library: https://www.npmjs.com/package/jest-preset-angular
• MSW docs: https://mswjs.io/
Getting ready
In Chapter 6, we will use NgRx as state management in Angular. Here, we will use the same recipe
and write unit tests for the provided functionality
(https://github.com/PacktPublishing/RxJS-Cookbook-for-Reactive-Programming/tree/main/Chapter06/ngrx-state-management).
How to do it…
In this recipe, we will go over the cooking recipe app example and write unit and integration tests
for different parts of NgRx, such as the following:
• Store
• Selectors
• Actions
• Effects
beforeEach(() => {
TestBed.configureTestingModule({
providers: [provideMockStore()]
});
store = TestBed.inject(MockStore);
});
});
An alternative way to do the same thing is to provide a selector state through provideMockStore
as follows:
provideMockStore({
  selectors: [
    {
      selector: selectRecipesState,
      value: [
        {
          "id": 1,
          "name": "Spaghetti Aglio e Olio",
          "description": "Simple yet flavorful pasta with garlic, olive oil, and chili flakes",
          "ingredients": ["spaghetti", "garlic", "olive oil", "chili flakes",
            "parmesan cheese", "parsley"],
          "image": "/assets/images/spaghetti.jpg"
        },
      ]
    },
  ]
}),
After that, we can subscribe to the store selector within the test. In that case, our test would look
like this:
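it('should select the recipes state', (done) => {
  store.select(selectRecipesState).subscribe(
    (recipes) => {
      // The selector emits the value mocked through provideMockStore
      expect(recipes).toEqual([
        {
          id: 1,
          name: 'Spaghetti Aglio e Olio',
          description: 'Simple yet flavorful pasta with garlic, olive oil, and chili flakes',
          ingredients: ['spaghetti', 'garlic', 'olive oil', 'chili flakes',
            'parmesan cheese', 'parsley'],
          image: '/assets/images/spaghetti.jpg'
        },
      ]);
      done();
    });
});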
beforeEach(() => {
TestBed.configureTestingModule({
imports: [],
providers: [
RecipesService,
RecipesEffects,
provideHttpClient(),
provideHttpClientTesting(),
provideMockActions(() => actions$)
]
});
effects = TestBed.inject(RecipesEffects);
recipesService = TestBed.inject(RecipesService);
});
When writing tests, it is always good practice to check the reverse case as well, where we cause
the test to fail on purpose, just to be sure that the test is working properly:
spyOn(recipesService, 'getRecipes$').and.returnValue(of([
  ...recipes,
  {
    "id": 6,
    "name": "Spaghetti Carbonara",
    "description": "Rich and creamy pasta with pancetta, eggs, and cheese.",
    "ingredients": ["spaghetti", "pancetta", "eggs",
      "parmesan cheese", "black pepper"],
    "image": "/assets/images/spaghetti_carbonara"
  }
]));
So, if we add, let’s say, one more recipe as a return value of a service, our test should fail with the
following error:
Now, we can safely say that our test is working as expected and not producing false positive test cases.
spyOn(recipesService, 'getRecipes$').and.returnValue(
throwError(() => 'error')
);
actions$ = of(loadRecipesAction());
const effectResult =
await firstValueFrom(effects.loadRecipes$);
expect(effectResult).toEqual(loadRecipesActionError({
error: error.message
}));
});
Testing practices
As per the NgRx documentation, there are multiple strategies to write unit and
integration tests, such as marble diagrams, TestScheduler, and ReplaySubject.
The strategy that has been chosen in this recipe is Observables, but we can
accommodate our tests to any strategy of preference.
See also
• NgRx testing strategies: https://ngrx.io/guide/effects/testing#testing-practices
• NgRx effect testing: https://ngrx.io/guide/effects/testing#examples
• NgRx testing selectors: https://ngrx.io/guide/store/testing#testing-selectors
https://packt.link/RxJSCookbook
5
Performance Optimizations with RxJS
One of the key aspects of having amazing user experiences across the web is web performance.
Performance optimization in RxJS involves carefully managing data flow and strategically
using operators to streamline asynchronous operations within your applications. Key techniques
include filtering unnecessary emissions, asynchronous handling, efficient data combination,
and preventing memory leaks. By employing these strategies, you can minimize redundant
calculations, reduce rendering overhead, and create a more responsive and smooth user experience.
Technical requirements
To complete this chapter, you’ll need the following:
• Angular v19+
• RxJS v7
• Node.js v22+
• npm v11+ or pnpm v10+
The code for the recipes in this chapter can be found in this book’s GitHub repository:
https://github.com/PacktPublishing/RxJS-Cookbook-for-Reactive-Programming/tree/main/Chapter05.
How to do it…
In this recipe, we’re going to address and prevent most of the potential performance bottlenecks
in one simple RxJS stream, such as search input that sends HTTP requests based on a search query.
We’ll also create a simple custom RxJS operator to benchmark the speed of stream execution.
Then, in the app.component.ts file, we can use the fromEvent function to create a stream of events:
@ViewChild('input') input!: ElementRef;
ngAfterViewInit(): void {
const input$ = fromEvent<KeyboardEvent>(
this.input.nativeElement, 'input' );
}
• Prevents memory leaks: It prevents potential memory leaks that could occur if multiple
inner Observables remain subscribed and active concurrently, consuming resources even
when they’re no longer needed.
• Prevents resource accumulation: Without switchMap, you might end up with multiple
pending asynchronous operations accumulating if the source Observable emits values
quickly. The switchMap operator ensures only the latest operation is active, preventing
resource wastage.
• Simplifies nested subscriptions: In complex data flows, you might need to subscribe to
multiple Observables in a nested fashion. The switchMap operator flattens this nesting,
making the code more readable and maintainable.
• Avoids subscription management overhead: Manual subscription management in nested
scenarios can be error-prone and lead to memory leaks if it’s not handled meticulously.
The switchMap operator automates this process, reducing the risk of leaks.
Understanding the difference between hot and cold Observables is crucial for managing
resources and data flows effectively, as well as avoiding unexpected behaviors in your RxJS
applications. We can think of a cold Observable as a factory that creates a product line each
time someone wants a product. Cold Observables are more resource-efficient in simple
scenarios, since they produce data only when needed and execute once per subscription. We
refer to cold Observables as unicast, as each subscriber receives an independent sequence of
data. We can think of a hot Observable as a radio station that’s always broadcasting, whether
someone is listening or not. Hot Observables are more performant on a larger scale and when
implementing caching mechanisms. That’s why we refer to hot Observables as multicast: the
stream is executed once, and all subscribers get the same values.
1. We keep bufferSize set to 1 so that we only replay the latest cached value.
2. We pass refCount to keep track of the number of active subscribers. When the subscriber
count drops to 0, the underlying Observable is automatically unsubscribed, and its
resources are cleaned up.
3. Finally, we unsubscribe from the stream with takeUntil once the component has been
destroyed.
Another commonly used technique is to use the takeUntil operator, which helps us manage the
life cycle of Observable subscriptions gracefully. While it’s often used for component destruction,
it can have broader applications for controlling the flow of data based on external events or
conditions.
One of the most common misconceptions about the takeUntil operator is its place-
ment within the stream. Order matters! The position of takeUntil determines which
operators are affected by the notifier and in what order. The most common and
straightforward placement is at the end of the operator chain. This allows the en-
tire stream of operators to process values until the notifier Observable emits. If we
place takeUntil in the middle of a stream, we can selectively control which parts
of the stream are affected by the notifier. It can also be placed within a higher-order
Observable such as switchMap, mergeMap, or concatMap to control the lifespan of
inner Observables. Being able to place the takeUntil operator flexibly gives us
fine-grained control over our Observable streams and helps manage their life cycle
effectively.
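The effect of placement can be sketched with two otherwise identical streams. This is an
illustrative example, not code from the recipe; source$, inner$, and destroy$ are hypothetical
Subjects driven by hand.

```typescript
import { Subject, switchMap, takeUntil } from 'rxjs';

const destroy$ = new Subject<void>();
const source$ = new Subject<number>();
const inner$ = new Subject<string>();

const received: string[] = [];

// takeUntil BEFORE switchMap: only the outer source is stopped by the notifier.
source$
  .pipe(
    takeUntil(destroy$),
    switchMap(() => inner$),
  )
  .subscribe((value) => received.push(`early:${value}`));

// takeUntil LAST: the whole chain, including the inner Observable, is stopped.
source$
  .pipe(
    switchMap(() => inner$),
    takeUntil(destroy$),
  )
  .subscribe((value) => received.push(`last:${value}`));

source$.next(1); // both streams subscribe to inner$
inner$.next('a'); // both streams receive 'a'
destroy$.next(); // notifier fires
inner$.next('b'); // only the "early" stream is still listening to inner$
```

When takeUntil sits before switchMap, the inner subscription outlives the notifier, which is
a common source of leaks. Placing it last is the safer default.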
return subscription;
});
}
Once we place the startMeasurePerformance operator at the beginning of the stream and mark
the start of Performance Monitor, we can place measurePerformance at the end of the stream:
export function measurePerformance<T>() {
return (source$: Observable<T>) => new Observable<T>(
observer => {
const subscription = source$.subscribe({
next(value) {
observer.next(value);
performance.mark('end');
performance.measure(
'Measure between start and end',
'start',
'end'
);
return subscription;
});
}
See also
• Dmytro Mezhenskyi’s video about the hidden pitfalls of shareReplay: https://www.youtube.com/watch?v=mVKAzhlqTx8&t
• Joshua Morony’s fantastic video about improved performance with the share and
shareReplay operators: https://www.youtube.com/watch?v=H542ZSyubrE
• Dominic Elm’s advanced caching mechanism blog: https://blog.thoughtram.io/angular/2018/03/05/advanced-caching-with-rxjs.html
• The shareReplay operator: https://rxjs.dev/api/index/function/shareReplay
• The share operator: https://rxjs.dev/api/index/function/share
• Mastering RxJS Memory Leaks: The Leak Detective Handbook: https://hackernoon.com/mastering-rxjs-memory-leaks-the-leak-detective-handbook
• Async Pipe in Angular: https://medium.com/@softwaretechsolution/async-pipe-in-angular-bf0c691faaf2
How to do it…
In this recipe, we’ll build a small custom performance analytics observer to track the Core Web
Vitals metrics. These metrics help us identify performance bottlenecks, which we can then fix
to improve the performance of our web application. Once we’ve gathered the necessary metric
information, we can send that data to our API analytics service.
);
return this.performanceObserver$;
}
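A stream of performance entries like the one returned above could be sketched as follows.
This is a browser-oriented sketch under stated assumptions, not the book’s implementation:
observePerformance is a hypothetical helper, and wrapping PerformanceObserver with
fromEventPattern is one possible approach.

```typescript
import { Observable, fromEventPattern, mergeMap } from 'rxjs';

// Hypothetical helper: emits every PerformanceEntry of the given entry type,
// for example 'largest-contentful-paint', 'layout-shift', or 'first-input'.
function observePerformance(entryType: string): Observable<PerformanceEntry> {
  return fromEventPattern<PerformanceObserverEntryList>(
    (handler) => {
      // A new PerformanceObserver is created for each subscription.
      const observer = new PerformanceObserver((list) => handler(list));
      observer.observe({ type: entryType, buffered: true });
      // The returned observer is passed to the remove handler below.
      return observer;
    },
    (_handler, observer: PerformanceObserver) => observer.disconnect(),
  ).pipe(
    // Flatten each entry list into individual entries.
    mergeMap((list) => list.getEntries()),
  );
}

// Example usage (runs only once subscribed, in a browser that supports the entry type):
const largestContentfulPaint$ = observePerformance('largest-contentful-paint');
```

Because the observer is created inside the add handler, nothing is observed until someone
subscribes, and unsubscribing disconnects the observer.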
Now, we can merge all these streams, gather performance metrics for our custom analytics, and
gain important insights about our web application’s page load times, cumulative layout shifts,
user timing interactions, and more:
loading = true;
ngOnInit(): void {
setTimeout(() => {
// Simulate Cumulative Layout Shift
this.loading = false;
}, 2000);
merge(
this.firstInputPaint$,
this.largestContentfulPaint$,
this.firstContentfulPaint$,
this.cumulativeLayoutShift$
).subscribe((entry) => console.log(entry));
}
See also
• The PerformanceObserver Web API: https://developer.mozilla.org/en-US/docs/Web/API/PerformanceObserver
• Web Vitals articles: https://web.dev/articles/vitals
• RxJS’s fromEventPattern operator: https://rxjs.dev/api/index/function/fromEventPattern
How to do it…
In this recipe, we’ll simulate computationally intensive operations by creating a web worker and
running one million iterations of some simple transformations.
In our example, the web worker will run one million iterations. To simulate some form of data
processing, we’ll do the following:
After that, we’re ready to create a stream of message events and react to them. But if we open Dev
Tools and start using the application, after some time, the application will get slower and slower,
even though we’re processing a long task in the background.
This means that we can take message subscriptions and apply additional performance
optimizations.
• We get the whole MessageEvent interface as a result of the stream, but we’re only interested
in data.
• Null values are included in the result.
• There are duplicated successive values.
• The number of events is causing the console to break due to memory overload.
We can address and fix these issues by applying certain RxJS operators:
const message$ = fromEvent<MessageEvent>(
worker,
'message'
).pipe(
filter(( { data } ) => data !== null),
map(({ data }) => data),
distinctUntilChanged(),
bufferCount(1000),
throttleTime(10),
share()
);
Now, our message stream is much more efficient and performant in delivering worker messages.
See also
• Angular docs guide about Web Workers: https://angular.dev/ecosystem/web-workers#adding-a-web-worker
• MDN Web Workers docs: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Using_web_workers
• RxJS’s bufferCount operator: https://rxjs.dev/api/operators/bufferCount
• RxJS’s throttleTime operator: https://rxjs.dev/api/operators/throttleTime
• RxJS’s share operator: https://rxjs.dev/api/operators/share
6
Building Reactive State
Management Systems with RxJS
State management is hard, and for good reason. If we imagine the perfect state management library,
the main features of that library would be scalability, predictability, performance, maintainability,
developer experience, and flexibility. RxJS can help us with all of that. It can make our state more
scalable by efficiently managing the complexity of data flows and user interactions throughout
our app. We can model state changes as streams of events in a structured way, so it is easier to
reason about state complexity. Also, RxJS enables us to express state changes declaratively and
react to state changes in a natural way, where changes are propagated automatically throughout
the system as they occur, which can lead to more efficient state updates, performance-wise. All
of this makes state changes more predictable and testable, as well as easier to debug.
In this chapter, we’re going to cover some of the most advanced and sophisticated ways of
managing RxJS state:
Technical requirements
To follow along with this chapter, you’ll need the following:
• Angular v19+
• RxJS v7
• Node.js v22+
• npm v11+ or pnpm v10+
The code for the recipes in this chapter can be found in the GitHub repository here:
https://github.com/PacktPublishing/RxJS-Cookbook-for-Reactive-Programming/tree/main/Chapter06
• Global state access: Effortless access to the state from any component
• Predictable state updates: Without unwanted side effects
• Scalability: Easily grow your app to an infinite number of new modules/features, managing
data flow complexity with ease
• Reactivity: Carry out an automatic component state refresh whenever there is a state
update
• Granular control: The ability to create slices of smaller, more manageable states, each
responsible for a specific domain
• Performance: Targeted re-renders for efficient state updates, providing the best possible
user experience
• Handling errors and side effects: Gracefully handling network requests and creating
resilient error handling
• Developer experience: Clear and concise usage, easy mental model when thinking about
state changes, and integrated debugging tools
We know that there are a lot of libraries that solve these problems, but to truly appreciate a
library, let’s try to create our very own minimum Redux-like state management from scratch and
understand the underlying complexity behind a system like this.
How to do it…
In this example, we will build a small Cooking Recipes app, where we will load a list of recipes
from the mocked data (using MockServiceWorker) and show them in the list. Also, we will have
the ability to know which recipe is selected at the moment, and we will represent that in a separate
state. For this recipe, we are not using any library for state management, but rather building our
own custom one.
orders: [],
}
};
private state$ = new BehaviorSubject<AppState>(
this.initialState
);
}
constructor() {
this.actions$.pipe(
withLatestFrom(this.state$),
map(([action, state]) => rootReducer(
state, action))
).subscribe((state: AppState) => {
this.state$.next(state);
});
}
After dispatching an action, we notify all the subscribers and react to the new action
based on its type. In the constructor, we use withLatestFrom to combine each dispatched action
with the latest state that we have already defined in Step 1, and then apply a pure reducer
function to make that state transition in an immutable way.
Let’s say that in our recipes.actions.ts, we have action creators like so:
export interface Action<T = any> {
type: string;
payload?: T;
error?: Error
}
We will dispatch loadRecipesAction initially when the component mounts. As a result, a root
reducer will react to a corresponding action and apply state transition changes. We define our
on function as follows:
return state;
};
}
{},
structuredClone(state),
{ error: payload, loading: false }
})),
)
We have created a reducer function by going through all possible actions, filtering out the action
that actually happened, and applying a reducer function on the action that was dispatched.
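The idea of filtering handlers by action type can be sketched without any library. This is a
simplified illustration, not the recipe’s exact code: the on and createReducer signatures,
the RecipesState shape, and the action type strings are assumptions made for the example.

```typescript
interface Action<T = unknown> {
  type: string;
  payload?: T;
}

type Reducer<S> = (state: S, action: Action) => S;

interface Handler<S> {
  type: string;
  reducer: Reducer<S>;
}

// Pairs an action type with the reducer that handles it.
function on<S>(type: string, reducer: Reducer<S>): Handler<S> {
  return { type, reducer };
}

// Builds one reducer that applies only the handlers matching the dispatched action.
function createReducer<S>(initialState: S, ...handlers: Handler<S>[]) {
  return (state: S = initialState, action: Action): S =>
    handlers
      .filter((handler) => handler.type === action.type)
      .reduce((current, handler) => handler.reducer(current, action), state);
}

interface RecipesState {
  recipes: string[];
  loading: boolean;
}

const recipesReducer = createReducer<RecipesState>(
  { recipes: [], loading: false },
  on<RecipesState>('LOAD_RECIPES', (state) => ({ ...state, loading: true })),
  on<RecipesState>('LOAD_RECIPES_SUCCESS', (state, action) => ({
    ...state,
    recipes: action.payload as string[],
    loading: false,
  })),
);

const loading = recipesReducer(undefined, { type: 'LOAD_RECIPES' });
const loaded = recipesReducer(loading, {
  type: 'LOAD_RECIPES_SUCCESS',
  payload: ['Pasta'],
});
// An unknown action matches no handler, so the same state object is returned.
const untouched = recipesReducer(loaded, { type: 'UNKNOWN' });
```

Each handler returns a new object instead of mutating the previous state, which keeps the
transitions predictable and easy to test.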
For now, there is a way for us to dispatch a synchronous action and transition state, but what
about handling a network request that loadRecipesAction should trigger?
Now we are ready to handle the side effect in RecipesService, which is responsible for
communication with the backend:
export class RecipesService {
loadRecipes$ = this.recipeStore.createEffect(() => {
return this.recipeStore.actions$
.pipe(
ofType(LOAD_RECIPES),
exhaustMap(() => this.getRecipes().pipe(
map(res => loadRecipesActionSuccess(res)),
catchError((error: Error) =>
// gracefully error exit and continue stream
of(loadRecipesActionError(
error.message ?? error
))
)
))
);
});
getRecipes(): Observable<Recipe[]> {
return this.http.get<Recipe[]>('/api/recipes');
}
loadRecipes(): void {
this.recipeStore.dispatch(loadRecipesAction());
}
}
In the preceding code, we have a small utility function, ofType, similar to NgRx, which will filter
incoming actions, and handle a side effect for a specific action:
export function ofType(type: string) {
return (source: Observable<Action>) => source.pipe(
filter(action => action.type === type)
);
}
What might be of interest here is the usage of the RxJS exhaustMap operator, which can help us
with the following:
When handling concurrent requests in our effect, we can achieve almost the same
result with the switchMap operator. The only difference is that if the same request is
triggered before an ongoing request has finished, switchMap will cancel the ongoing one
and start the new request. On the other hand, exhaustMap will wait for the first request
to complete and ignore any requests triggered in the meantime. The approach we choose
here might vary between different product use cases or personal preference.
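The difference between the two operators can be sketched with a hand-driven simulation. This
is an illustrative example only; simulate, trigger$, and the pending map are hypothetical names,
and each "request" is a Subject that we resolve manually.

```typescript
import { Subject, exhaustMap, switchMap } from 'rxjs';

function simulate(strategy: 'exhaustMap' | 'switchMap'): string[] {
  const trigger$ = new Subject<number>();
  const pending = new Map<number, Subject<string>>();
  const results: string[] = [];

  // Hypothetical request: returns a Subject that we resolve by hand below.
  const request = (id: number) => {
    const response$ = new Subject<string>();
    pending.set(id, response$);
    return response$;
  };

  trigger$
    .pipe(strategy === 'exhaustMap' ? exhaustMap(request) : switchMap(request))
    .subscribe((value) => results.push(value));

  trigger$.next(1); // the first request starts
  trigger$.next(2); // a second trigger arrives while request 1 is in flight
  pending.get(1)?.next('response 1');
  pending.get(2)?.next('response 2'); // request 2 never started under exhaustMap
  return results;
}

const withExhaustMap = simulate('exhaustMap'); // keeps request 1, ignores trigger 2
const withSwitchMap = simulate('switchMap'); // cancels request 1, keeps request 2
```

With exhaustMap only the first response arrives, while switchMap delivers only the response
to the most recent trigger.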
Great, now our reducer will pick up the LOAD_RECIPES_SUCCESS action, extract the response
data payload, and update the state. In case of an error, the LOAD_RECIPES_ERROR action will be
dispatched, and the state will be updated with the error message.
What might be interesting here is the usage of the shareReplay operator, which is especially
useful in scenarios where we want to multicast the same cached value to multiple subscribers.
What this means in our case is that whenever we call the same selector in a different part of
the application, those late subscribers would catch up on the previously emitted values. This
way, we optimize app performance by returning the latest cached value immediately, without
the need to calculate the same slice of state again.
Under the hood, this operator leverages ReplaySubject, and in that way manages
to multicast emitted values from the source Observable to multiple subscribers.
By default, shareReplay keeps its buffered values cached indefinitely. Even when
the number of subscribers drops to 0, it will not unsubscribe from the source
Observable. If we are not careful, this might cause memory leaks in our
application, but we might want this behavior when we want to avoid expensive
computations for Observables that are frequently used, such as state selectors. If
we want to have programmatic control over this behavior, shareReplay accepts
a configuration object (with options such as bufferSize and refCount) as an
argument. Also, if we would like unsubscriptions to be automatically handled by
RxJS, we might consider using the share operator.
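A selector that shares its latest value could be sketched as follows. This is a minimal
sketch, not the recipe’s implementation: the AppState shape, the module-level state$, and the
projections counter are assumptions, and refCount: true is chosen here to show the automatic
cleanup described above.

```typescript
import {
  BehaviorSubject,
  Observable,
  distinctUntilChanged,
  map,
  shareReplay,
} from 'rxjs';

interface AppState {
  recipes: string[];
  selectedRecipe: string | null;
}

const state$ = new BehaviorSubject<AppState>({
  recipes: ['Pasta'],
  selectedRecipe: null,
});

let projections = 0; // counts how often the selector function actually runs

function selectState$<R>(selector: (state: AppState) => R): Observable<R> {
  return state$.pipe(
    map((state) => {
      projections += 1;
      return selector(state);
    }),
    // Skip emissions when the selected slice did not change.
    distinctUntilChanged(),
    // Replay the latest slice to late subscribers; release the source at 0 subscribers.
    shareReplay({ bufferSize: 1, refCount: true }),
  );
}

const recipes$ = selectState$((state) => state.recipes);
const first: string[][] = [];
const second: string[][] = [];
const subA = recipes$.subscribe((recipes) => first.push(recipes));
const subB = recipes$.subscribe((recipes) => second.push(recipes)); // replayed, not recomputed

// Updating an unrelated slice runs the selector again but emits nothing new.
state$.next({ ...state$.value, selectedRecipe: 'Pasta' });

subA.unsubscribe();
subB.unsubscribe(); // with refCount: true, the subscription to state$ is released
```

Dropping refCount (or setting it to false) would keep the subscription to state$ alive after
the last unsubscribe, which trades cleanup for a permanently warm cache.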
constructor(
private recipeStore: RecipesStoreService,
private recipesService: RecipesService
) { }
ngOnInit() {
this.recipesService.loadRecipes();
this.recipeStore.selectState$((
state: Partial<AppState>
) => state.recipesState?.recipes).subscribe(
(recipes: Partial<AppState>) => {
this.recipes = recipes as Recipe[];
}
);
}
}
And voilà!
Now, if we open our app in a browser, we can see the list of cooking recipes that represent the
client state.
return newState;
};
}
To apply a meta-reducer to each reducer, we will wrap it around rootReducer, so it becomes the
following:
export const rootReducer = logMetaReducer(combineReducers({
recipesState: recipesReducer,
ordersState: recipeOrderReducer
}), this);
In our console, we can see our meta-reducer in action and observe all dispatched actions and
state transitions!
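A logging meta-reducer wrapped around combined reducers could be sketched as follows. This is
a simplified stand-in under stated assumptions: the combineReducers and logMetaReducer
signatures shown here (including the optional log callback) are hypothetical and may differ
from the ones used in the recipe.

```typescript
interface Action {
  type: string;
  payload?: unknown;
}

type Reducer<S> = (state: S, action: Action) => S;

// Combines slice reducers into one reducer over the whole state object.
function combineReducers<S extends object>(
  reducers: { [K in keyof S]: Reducer<S[K]> },
): Reducer<S> {
  return (state, action) => {
    const next = {} as S;
    for (const key of Object.keys(reducers) as Array<keyof S>) {
      next[key] = reducers[key](state[key], action);
    }
    return next;
  };
}

// Meta-reducer: wraps a reducer and reports every action and resulting state.
function logMetaReducer<S>(
  reducer: Reducer<S>,
  log: (message: string, value: unknown) => void = console.log,
): Reducer<S> {
  return (state, action) => {
    const newState = reducer(state, action);
    log('action', action);
    log('new state', newState);
    return newState;
  };
}

interface AppState {
  recipesState: { count: number };
  ordersState: { count: number };
}

const logs: string[] = [];
const rootReducer = logMetaReducer(
  combineReducers<AppState>({
    recipesState: (state, action) =>
      action.type === 'ADD_RECIPE' ? { count: state.count + 1 } : state,
    ordersState: (state) => state,
  }),
  (message) => logs.push(message), // collect messages instead of printing them
);

const nextState = rootReducer(
  { recipesState: { count: 0 }, ordersState: { count: 0 } },
  { type: 'ADD_RECIPE' },
);
```

Because the meta-reducer only wraps the reducer function, it can be added or removed without
touching any of the slice reducers.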
See also
• MSW: https://mswjs.io/
• The shareReplay operator: https://rxjs.dev/api/index/function/shareReplay
• The share operator: https://rxjs.dev/api/index/function/share
• The exhaustMap operator: https://rxjs.dev/api/operators/exhaustMap
• The switchMap operator: https://rxjs.dev/api/operators/switchMap
• The structuredClone global browser function: https://developer.mozilla.org/en-US/docs/Web/API/structuredClone
Now that we have a high-level overview of NgRx, let’s dive into a practical example of how we
can deal with state management challenges.
Important note
At the time of writing this recipe, the latest Angular and NgRx versions were v18. In
the examples, we are using the standalone approach to writing components and
providing services with appConfig. If you are using the older module approach, the
code may vary a bit, but the main approach and concepts relating to state manage-
ment should stay the same.
How to do it…
In this example, we will build a small Cooking Recipes app, where we will load a list of recipes
from the mocked backend (using Mock Service Worker) and show them in the list. Also, we will have
the ability to know which recipe is selected at the moment. The recipe is pretty much the same
as the previous one, but this time, we are leveraging NgRx as a solution for state management, so
we can compare the reduced code complexity and edge cases we have to pay attention to, once
we have a full production-ready state management solution. Also, we will explore some of the
additional features that NgRx offers us, to make our code even more scalable, robust, and up to
the latest trends. In the end, we will connect our store to Angular Router, so we are aware of all
navigations and router states.
AppState is an interface where we will define the structure of our global state:
In our recipes.reducers.ts, we can now define our feature reducer, which will control how the
state would transition depending on dispatched actions:
import { map, catchError, of, exhaustMap } from "rxjs";
import { Actions, createEffect, ofType } from "@ngrx/effects";
);
In the preceding code, we defined the initial app state, created recipeReducer, which will handle
state transitions, and in the end, connected our reducer to the feature state. Also, we can notice
that if we dispatch loadRecipesAction, we will set loading to true to show the loader,
and once the request has completed, we will set loading back to false to hide the spinner and
store the received data in the state. A list of actions that we can dispatch is declared in
recipes.actions.ts, and it looks like this:
Now we are ready to dispatch actions from our components. In RecipeListComponent, after
injecting Store from @ngrx/store, we can just dispatch the action that will start
fetching recipes:
this.store.dispatch(loadRecipesAction());
this.actions$.pipe(
ofType(loadRecipesAction),
exhaustMap(() => this.recipesService.getRecipes$().pipe(
map(recipes => loadRecipeActionSuccess({
recipes
})),
catchError((error: Error) =>
of(loadRecipeActionError({
error: error.message
}))
)
))
)
);
}
In this effect, we may observe that when an action of a certain type happens, we can trigger a
corresponding side effect. NgRx has a built-in ofType operator that filters the action stream,
letting through only the actions we want to react to. So, when loadRecipesAction happens, we call
our backend service and react to success and error cases. Like in the preceding recipe, note the
use of the RxJS exhaustMap operator.
• Readability and maintainability: We can extract logic for extracting the slices of state
we need outside of a component
• Memoization: Selectors created with createSelector are memoized by default, so we can have
performance improvements, especially with complex state structures
• Composability: Selectors can be combined to have even more granular control
• Testability: Since selectors are pure functions and have no side effects, they can easily
be tested
• Type safety: This helps us with intelligent code completion and early error detection,
improving the overall developer experience
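The memoization and composability benefits listed above can be sketched with a simplified
stand-in. The createMemoizedSelector helper below is hypothetical and is not NgRx’s actual
implementation; it only remembers the last input slice and the last result.

```typescript
// Simplified stand-in for a memoized selector (assumption, not the NgRx source).
function createMemoizedSelector<S, A, R>(
  input: (state: S) => A,
  projector: (slice: A) => R,
): (state: S) => R {
  let initialized = false;
  let lastInput: A | undefined;
  let lastResult: R | undefined;
  return (state) => {
    const slice = input(state);
    // Recompute only when the input slice is a different reference.
    if (!initialized || slice !== lastInput) {
      lastInput = slice;
      lastResult = projector(slice);
      initialized = true;
    }
    return lastResult as R;
  };
}

interface Recipe {
  name: string;
  vegan: boolean;
}

interface AppState {
  recipes: Recipe[];
  loading: boolean;
}

let projectorRuns = 0;

// Composability: a derived selector is built on top of a simpler one.
const selectRecipes = (state: AppState) => state.recipes;
const selectVeganRecipes = createMemoizedSelector(selectRecipes, (recipes) => {
  projectorRuns += 1;
  return recipes.filter((recipe) => recipe.vegan);
});

const state: AppState = {
  recipes: [
    { name: 'Lentil soup', vegan: true },
    { name: 'Carbonara', vegan: false },
  ],
  loading: false,
};

const firstCall = selectVeganRecipes(state);
// Only `loading` changed, so the recipes slice keeps the same reference.
const secondCall = selectVeganRecipes({ ...state, loading: true });
```

Since both selectors are pure functions, they can be tested by passing in a plain state object,
without a store or a component.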
At this point, we have already managed to reproduce the same behavior from our previous recipe.
Talk about reduced complexity, right?
Now, let’s explore additional features of NgRx and further improve our Cooking Recipes app.
} as RootStoreConfig<AppState, Action>),
provideRouterStore()
]
}
A nice addition to this is the router selectors, which we can use right out of the box:
export const {
selectCurrentRoute, // select the current route
selectFragment, // select the current route fragment
selectQueryParams, // select the current route query params
selectQueryParam, // factory function to select a query param
selectRouteParams, // select the current route params
selectRouteParam, // factory function to select a route param
selectRouteData, // select the current route data
selectRouteDataParam, // factory function to select a route data param
selectUrl, // select the current url
selectTitle, // select the title if available
} = getRouterSelectors();
Also, if we open Redux DevTools, we can see router events being dispatched while we are navigating
around the app.
Now, in our recipes.meta-reducer.ts, we can implement our custom debug function that will
act as middleware between an action and a reducer.
Note that we are running this middleware only in dev mode, not in production:
import { isDevMode } from '@angular/core';
import { ActionReducer, MetaReducer } from '@ngrx/store';
After applying metaReducer in the app config, we may observe in the browser console all actions
being dispatched with the payload and current state.
See also
• NgRx docs: https://ngrx.io/docs
• Redux DevTools: https://chromewebstore.google.com/detail/redux-devtools/lmhkpmbekcpmknklioeibfkpmmfibljd?pli=1
One thing that we may notice from the first usage when working with TanStack Query is that it
only supports promises and not Observables. But asynchronous programming and handling side
effects are where RxJS can really excel. So, let’s build our very own custom, minimalistic version
of TanStack Query, but this time with the help of RxJS for managing async data flow.
How to do it…
In this example, we will build a small Cooking Recipes app in Angular, where we will load a list
of recipes from the mocked backend (using MSW) and show them in the list. The way we are going
to manage the state is by replicating a small set of features of TanStack Query, and we will call
it RxJS-Query.
• Declarative queries
• Automatic caching by query keys
• Smart background re-fetches – on window focus, network regain, or query key change
• The Stale-while-revalidate caching strategy
• Composable query keys
• Request cancelation
• Deduplication of requests
• Retry mechanism on request failures
• Garbage collection
• Small dev tools component to visualize cache store
constructor(
private queryClient: QueryClientService,
private recipesService: RecipesService
) {}
ngOnInit(): void {
this.queryClient.injectQuery(
['recipes'],
() => this.recipesService.getRecipes$(),
{ staleTime: 1000 * 5 }
);
}
}
): Observable<QueryState<T>> {
const compositeKey = JSON.stringify(key);
let cachedValue = this.cache.get(compositeKey);
const { staleTime = 0, retryNo = 3, gcTime = 30000 } = options;
let { state$ } = cachedValue || {};
if (!state$) {
// start a new state stream
}
return state$.asObservable();
}
public injectQuery<T>(
queryKey: string[],
queryFn: () => Observable<T>,
queryOptions: QueryOptions,
): Observable<QueryState<T>> {
return this.query<T>(queryKey, queryFn, queryOptions);
}
1. First, we will create a Map cache, which will store our queries and Observable streams by
an array of query keys (or one big composable key). Each time we call a query function,
first we will check whether there is a cache value already there, and if it is, then we will
simply return an Observable value from the cache.
2. Now, let’s check what happens if there is no previous cached value, and we need to trigger
a new request (the code inside the !state$ if block):
state$ = new BehaviorSubject<QueryState<T>>({
isLoading: true,
isFetching: true,
});
this.cache.set(
compositeKey,
{ state$, lastFetched: 0 }
);
return state$.pipe(
filter((state: QueryState<T>) => state.isFetching),
switchMap((val) =>
merge(
of(val),
queryFn().pipe(
retry(retryNo),
map((data) => {
this.cache.set(compositeKey, {
state$,
lastFetched: Date.now() + staleTime,
});
return {
data,
isFetching: false,
isLoading: false
};
}),
)
),
),
shareReplay(1),
);
1. First, we set a new state with initial loading values. If you were wondering why we
need both the isLoading and isFetching states, it’s because we need isLoading
to represent when the data is loading for the first time, and we need to display a
spinner in the UI. We use isFetching when we do have data, but we are doing
smart background re-fetches to revalidate stale data.
2. Then, we set the cache immediately, and not when the request has finished. By
doing so, we deduplicate identical requests (if multiple components
are calling the same query). Also, the switchMap operator will cancel the previous
request if a new one comes in.
3. Then, we merge the loading state with queryFn, in order not to lose the previous
loading state to show in the UI and to run in parallel queryFn, which sends the
request. After the request has been completed, we will extract the response data
payload, update the cache with values, and set staleTime (how long we consider
data to be fresh from that point on).
4. In the end, we use shareReplay(1) to cache the latest state and replay it to late subscribers.
The stale-while-revalidate response directive indicates that the cache could reuse
a stale response while it revalidates it to a cache.
– MDN docs
This means that whenever we request data by a query key, first we would check whether that
data is fresh, and if it is, we would return data immediately. But if it is stale, we would still display
the data, and do a background update to fetch fresh data, without disrupting the current user
flow. Once the background revalidation completes and fresh data is available, TanStack Query
seamlessly updates the UI to reflect the latest information. Key benefits of this approach are the
following:
• Improved perceived performance: Users experience faster initial load times because they
receive cached data immediately
• Reduced server load: Fewer unnecessary requests are made to the server since cached
data is utilized
• Always up-to-date data: The background revalidation ensures that the displayed data is
eventually updated without requiring user interaction
In our example, we are going to achieve the same effect by defining staleTime, in
milliseconds, as one of the QueryOptions:
type QueryOptions = {
refetchOnWindowFocus?: boolean;
refetchOnReconnect?: boolean;
staleTime?: number;
retryNo?: number;
gcTime?: number;
// ... other options
};
Now, our condition for bypassing the Map cache and fetching fresh data might be the following:
if (!state$ || Date.now() > cachedValue.lastFetched)
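The freshness check can be sketched in isolation. This is a minimal sketch with assumed names
(CacheEntry, shouldFetch); it follows the convention used in the query code, where lastFetched
stores the fetch time plus staleTime, that is, the moment at which the data becomes stale.

```typescript
interface CacheEntry {
  // Timestamp (ms) until which the cached data is considered fresh.
  lastFetched: number;
}

// Hypothetical helper: true when there is no entry or the entry has gone stale.
function shouldFetch(entry: CacheEntry | undefined, now: number = Date.now()): boolean {
  return !entry || now > entry.lastFetched;
}

const cache = new Map<string, CacheEntry>();
const key = JSON.stringify(['recipes']);
const staleTime = 5000;

const beforeFirstFetch = shouldFetch(cache.get(key), 1000); // no entry yet

// Simulate a fetch that finished at t = 1000 ms.
cache.set(key, { lastFetched: 1000 + staleTime });

const whileFresh = shouldFetch(cache.get(key), 3000); // still inside the stale window
const afterStale = shouldFetch(cache.get(key), 7000); // the stale window has passed
```

Passing the current time as a parameter keeps the check deterministic, which makes the stale
window easy to unit test.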
• Window focus
• Network regain after lost connection
• Component mount
• Change of a query key
If there is a query key change, then we will simply treat that as a new entry and store it under
the new key in our Map cache. Component mount is also straightforward; once the component
is mounted, we will call the query function. But the missing parts are the first two triggers. Let’s
react to those events in an RxJS way:
private queryTriggers(
key: string[],
{ refetchOnWindowFocus = true, refetchOnReconnect = true }:
QueryOptions,
): void {
const focus$ = refetchOnWindowFocus
? fromEvent(window, 'focus')
: from([]);
const networkReconnect$ = refetchOnReconnect
? fromEvent(window, 'online')
: from([]);
merge(focus$, networkReconnect$).subscribe(
() => this.refetch(key));
}
We are using the RxJS fromEvent operator to create a stream of events; specifically, whenever the
window regains focus or the network connection is restored, a new event will be emitted, and we
will be notified about that. Once that happens, we call our RxJS-Query to do a background re-fetch
and get us fresh data. Also, we could opt out of that default behavior by setting the
refetchOnWindowFocus and refetchOnReconnect options to false:
public refetch(key: string[]): void {
const compositeKey = JSON.stringify(key);
const cachedValue = this.cache.get(compositeKey);
let { state$, lastFetched } = cachedValue;
Since our state is a BehaviorSubject, we can trigger a new HTTP request by calling its next
method with the isFetching flag set to true.
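Triggering a background re-fetch through the state stream could be sketched as follows. This
is an illustrative example under stated assumptions: the standalone refetch function and the
requests counter are hypothetical, and the counter stands in for the HTTP call that the
query pipeline would make.

```typescript
import { BehaviorSubject, filter } from 'rxjs';

interface QueryState<T> {
  data?: T;
  isLoading: boolean;
  isFetching: boolean;
}

// State with data already loaded, so nothing is being fetched right now.
const state$ = new BehaviorSubject<QueryState<string[]>>({
  data: ['Pasta'],
  isLoading: false,
  isFetching: false,
});

let requests = 0;

// Stand-in for the query pipeline: every "fetching" state would start a request.
state$
  .pipe(filter((state) => state.isFetching))
  .subscribe(() => {
    requests += 1;
  });

function refetch(): void {
  // Keep the stale data visible and only flip the fetching flag.
  state$.next({ ...state$.value, isFetching: true });
}

refetch();
```

Spreading the current value keeps the stale data and isLoading untouched, so the UI can keep
showing the old list while the new request is in flight.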
if (state$) {
this.cache.delete(compositeKey);
}
}
The way we can call this function after that gcTime period is by leveraging the power of RxJS
schedulers.
What is a scheduler?
An RxJS scheduler helps us control the timing and execution of asynchronous
operations within streams. Schedulers provide fine-grained control over concurrency
and resource management by letting us define the execution context in which an
Observable notifies its Observers.
In our example, at the end of our state stream, we can use the finalize operator and call
asyncScheduler (time-based scheduler):
finalize(() =>
asyncScheduler.schedule(
() => this.removeQuery(key),
gcTime
),
)
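Scheduling delayed work with asyncScheduler can be sketched on its own. This is a minimal
sketch with assumed names (cache, pendingCleanup); it also shows that the returned
Subscription can cancel the cleanup, for example if a new subscriber arrives before gcTime
has elapsed, which is a design choice the recipe does not necessarily make.

```typescript
import { asyncScheduler } from 'rxjs';

const compositeKey = JSON.stringify(['recipes']);
const cache = new Map<string, string>([[compositeKey, 'cached state']]);
const gcTime = 30000;

// Schedule the removal of the cache entry after gcTime milliseconds.
const pendingCleanup = asyncScheduler.schedule(() => {
  cache.delete(compositeKey);
}, gcTime);

// schedule() returns a Subscription, so the pending work can be cancelled.
pendingCleanup.unsubscribe();
```

Without the unsubscribe call, the entry would be deleted once gcTime has passed.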
Now that we have covered the main features of our RxJS Query, take a look at the full example
here: https://github.com/PacktPublishing/RxJS-Cookbook-for-Reactive-Programming/tree/main/Chapter06/rxjs-query.
After this solid foundation, we could easily replicate the rest of TanStack Query’s functionality and
scale our RxJS Query, such as with mutations, optimistic updates, parallel requests, dependent
requests, pagination support, or offline support.
See also
• TanStack’s documentation: https://tanstack.com/query/latest/docs/framework/angular/overview
• merge: https://rxjs.dev/api/index/function/merge
• The catchError operator: https://rxjs.dev/api/operators/catchError
• The shareReplay operator: https://rxjs.dev/api/index/function/shareReplay
• HTTP Cache-Control Header: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control
• The fromEvent operator: https://rxjs.dev/api/index/function/fromEvent
• RxJS schedulers: https://rxjs.dev/guide/scheduler
7
Building Progressive
Web Apps with RxJS
Progressive Web Apps (PWAs) are web applications that use modern web capabilities to deliver
a native app experience to users. They combine the best of web and mobile apps, providing a
seamless, reliable, and engaging user experience across different devices. What makes PWAs
special are features such as push notifications, offline access, background data sync, native app
experiences, and so on.
In this chapter, we will implement some of the core PWA features by covering the following recipes:
Technical requirements
To follow along in this chapter, you’ll need the following:
• Angular v19+
• RxJS v7
• Node.js v22+ and npm v11+
• RxDB
• NestJS v11+
• Dexie.js
The code for the recipes in this chapter is placed in the GitHub repository here:
https://github.com/PacktPublishing/RxJS-Cookbook-for-Reactive-Programming/tree/main/Chapter07.
To convert an Angular application to a PWA, we can run one simple command that will add all
the necessary configurations and a basic service worker to our project:
ng add @angular/pwa
Since service workers are one of the requirements for an app to be a PWA, and they are enabled
only in production mode, we must build our app and serve it locally to observe each recipe in
action. In each recipe, we will use the simple http-server library to host the client app locally
and observe the offline mode in the browser by running the following command in the terminal:
npm run build && http-server ${build-location} -c-1 -o
How to do it…
In order to deliver timely food order push notifications to our clients, we will combine RxJS power
both in the Angular client app and in the NestJS backend app. In this recipe, we need a backend
app as well because we need to deliver push notifications in a secure, authenticated way. The
common practice in the industry to achieve a web server identifying itself to the push service is
by leveraging Voluntary Application Server Identification (VAPID) keys.
After that, we can pick the results from the console and use the keys on the backend side (or load
them from environment variables or a secure secret manager):
const vapidKeys = {
publicKey: '//your public key',
privateKey: '//your private key',
};
const options = {
vapidDetails: {
subject: 'mailto:example_email@example.com',
publicKey: vapidKeys.publicKey,
privateKey: vapidKeys.privateKey,
},
};
interface PublicKeyResponse {
publicKey: string;
}
subscribeToNotifications() {
this.http.get<PublicKeyResponse>(
'http://localhost:3000/api/publicKey').pipe(
switchMap((res: PublicKeyResponse) => {
return this.swPush.requestSubscription({
serverPublicKey: res.publicKey
});
}),
catchError(err => {
console.error(
'Could not subscribe to notifications', err);
return EMPTY;
})
)
.subscribe();
}
}
After calling requestSubscription(), the browser will prompt the user to allow push
notifications:
In this object, we will define what information each notification will have and what user
actions it will offer. Notice that when a food order is accepted, the notification has a title,
a body, and an icon. When the courier is on the way, the notification also has an action button,
which leads to the PWA geolocation page, where we can see the current location of our food delivery.
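An object with this shape could be sketched as follows. This is an assumption-based sketch:
the titles, bodies, icon paths, the track action, and the /geolocation URL are hypothetical
values, and the PushPayload interface is a simplified view of the payload format that the
Angular service worker expects (a top-level notification key, with optional actions and an
onActionClick entry under data).

```typescript
enum OrderStatus {
  ACCEPTED = 'ACCEPTED',
  COURIER_ON_THE_WAY = 'COURIER_ON_THE_WAY',
  DELIVERED = 'DELIVERED',
}

// Simplified payload shape (assumption) for Angular service worker notifications.
interface PushPayload {
  notification: {
    title: string;
    body: string;
    icon: string;
    actions?: { action: string; title: string }[];
    data?: {
      onActionClick: Record<string, { operation: string; url: string }>;
    };
  };
}

const orderNotification: Record<OrderStatus, PushPayload> = {
  [OrderStatus.ACCEPTED]: {
    notification: {
      title: 'Order accepted',
      body: 'The restaurant has started preparing your food.',
      icon: '/icons/accepted.png', // hypothetical asset path
    },
  },
  [OrderStatus.COURIER_ON_THE_WAY]: {
    notification: {
      title: 'Courier on the way',
      body: 'Your order has been picked up.',
      icon: '/icons/courier.png', // hypothetical asset path
      // The action button that opens the geolocation page.
      actions: [{ action: 'track', title: 'Track delivery' }],
      data: {
        onActionClick: {
          track: { operation: 'openWindow', url: '/geolocation' },
        },
      },
    },
  },
  [OrderStatus.DELIVERED]: {
    notification: {
      title: 'Order delivered',
      body: 'Enjoy your meal!',
      icon: '/icons/delivered.png', // hypothetical asset path
    },
  },
};

// The backend serializes the payload before handing it to the push service.
const serialized = JSON.stringify(orderNotification[OrderStatus.COURIER_ON_THE_WAY]);
```

Keeping all payloads in one lookup object keyed by order status lets the backend pick the
right notification with a single index expression.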
In our backend endpoint that should deliver the notifications, we can define the RxJS stream that
calls the sendNotification() method from the web-push package:
@Post('/api/subscriptions')
async addSubscription(@Body() sub: NotificationSubscription) {
  return of(orderNotification[OrderStatus.ACCEPTED]).pipe(
    // Send the "accepted" notification first
    switchMap((notification) =>
      webpush.sendNotification(
        sub, JSON.stringify(notification), options)),
    // Wait until the order has been processed
    delayWhen(() => this.foodOrderService.processOrder()),
    switchMap(() =>
      webpush.sendNotification(
        sub,
        JSON.stringify(
          orderNotification[OrderStatus.COURIER_ON_THE_WAY]),
        options)),
    delay(4000),
    switchMap(() =>
      webpush.sendNotification(
        sub,
        JSON.stringify(orderNotification[OrderStatus.DELIVERED]),
        options)),
  );
}
See also
• Angular University’s complete guide on push notifications: https://blog.angular-university.io/angular-push-notifications/
• Angular documentation on push notifications: https://angular.dev/ecosystem/service-workers/push-notifications
• The SwPush class documentation: https://angular.dev/api/service-worker/SwPush
• Firebase cloud messaging: https://firebase.google.com/docs/cloud-messaging
How to do it…
In this recipe, we will simulate the PWA’s background sync of data by leveraging Angular’s
interceptors and Dexie.js, a small wrapper around the browser’s IndexedDB database.
The way we can intercept each recipe request is by using Angular’s interceptors. In interceptors/
background-sync.interceptor.ts, we have generated a starting point for our background sync:
}
return next(req);
};
constructor() {
super('RecipesDB');
this.version(1).stores({
recipes: '++id'
});
this.recipes = this.table('recipes');
}
}
return EMPTY;
}),
concatMap(async ({ id, ...data }: RecipeResponse) => {
console.log(id);
const existingRecipe = await db.recipes.get(id);
const isRecipeStale = existingRecipe &&
existingRecipe.lastUpdated < data.timestamp;
if (!existingRecipe) {
console.log(`Adding recipe with id ${
id} in IndexedDB.`);
return db.recipes.add({ id, data,
lastUpdated: data.timestamp });
}
if (isRecipeStale) {
console.log(`Updating recipe with id ${
id} in IndexedDB.`);
}
console.log(`Recipe with id ${
id} already exists in IndexedDB.`);
return EMPTY;
}),
catchError((error) => {
console.error('Error while syncing data:', error);
return EMPTY;
})
);
When we get a backend response, we will check whether the specific food recipe already exists in
IndexedDB. If not, we can add that recipe to the database. However, if it does exist, then we are
going to check whether the recipe data is stale so that we know whether we need to update that
recipe. Finally, if the recipe exists in the database and the data is fresh, we will gracefully exit and
continue the stream since no changes need to be made in the database.
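The three outcomes described above can be isolated into a small pure function. This is a sketch that assumes timestamps are plain numbers. The StoredRecipe and IncomingRecipe shapes and the decideSyncAction name are illustrative, and only the lastUpdated and timestamp fields come from the recipe's code:

```typescript
// Hypothetical shapes: only `lastUpdated` and `timestamp` appear in the recipe.
interface StoredRecipe {
  id: number;
  lastUpdated: number;
}

interface IncomingRecipe {
  id: number;
  timestamp: number;
}

type SyncAction = 'add' | 'update' | 'skip';

// Decides what to do with a recipe that just arrived from the backend.
function decideSyncAction(
  existing: StoredRecipe | undefined,
  incoming: IncomingRecipe
): SyncAction {
  if (!existing) {
    return 'add'; // not in IndexedDB yet
  }
  if (existing.lastUpdated < incoming.timestamp) {
    return 'update'; // stored copy is older than the backend copy
  }
  return 'skip'; // stored copy is already fresh
}

console.log(decideSyncAction(undefined, { id: 1, timestamp: 200 }));
console.log(decideSyncAction({ id: 1, lastUpdated: 100 }, { id: 1, timestamp: 200 }));
console.log(decideSyncAction({ id: 1, lastUpdated: 200 }, { id: 1, timestamp: 200 }));
```

Separating the decision from the database calls makes the staleness rule easy to test without IndexedDB.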
When we open our browser and inspect DevTools, we can check out the results:
Now that we have data stored in IndexedDB, we can create reactive queries against it. In
app.component.ts, we can create a reactive query and display the recipe data in the UI:
import { liveQuery } from 'dexie';
recipes$ = liveQuery(() => db.recipes.toArray());
Finally, when we run the application build and open our browser in offline mode, we will still
see the recipes data retrieved from IndexedDB.
Since it’s built on top of RxJS, RxDB is reactive by default. We can write reactive database queries
that will update automatically whenever the underlying data changes. RxDB provides Observables
that act as change streams, allowing us to listen to any data modifications such as document
insertions, updates, and deletes.
By combining RxJS with its change streams, RxDB enables you to build applications that react to
data changes in real time, keeping the UI always in sync. Besides offline capabilities, this makes
RxDB a perfect fit to build a PWA.
Getting ready
Since RxDB provides a wide range of possibilities, its documentation is a great learning resource
for exploring all features. You can check it out here if your needs go beyond building a PWA:
https://rxdb.info/quickstart.html.
How to do it…
In this recipe, we will keep the scope to how to set up RxDB and leverage RxJS as much as we can.
We will build a small offline-first cooking app that keeps data in sync.
• Schema metadata provides information about the schema, including its title, description,
primary key, type, and version
• The properties definition specifies the structure and types of the properties in the recipe
object
• The required property lists the properties that are mandatory for a valid recipe object
await recipesDatabase.addCollections({
recipes: {
schema: recipeSchema
}
});
return db;
}
First, we call the createRxDatabase method where we pass the database name and storage. We
use the Dexie.js plugin, which is a wrapper around the browser’s IndexedDB database.
After the database creation, we add a recipes collection with data schema. Now, we are ready
to store recipe data in IndexedDB.
After inserting data into the RxDB collection, we can subscribe to ChangeEvent and track all
changes in our client database:
db.$.subscribe((changeEvent:
ChangeStreamEvent<RxDocument>) => {
console.dir(changeEvent);
});
Also, in rxdb.service.ts, we can create an RxJS stream whenever we want to add a new recipe
and call the RxDB insert() method:
export class RxDBService {
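// defer() re-runs initDatabase() on each retry, and retry() sits before
// catchError() so that the error is not swallowed before a retry can happen.
db$: Observable<RxDatabase> = defer(() => this.initDatabase())
.pipe(
retry(3),
catchError((error) => {
console.error('Error initializing database:',
error);
return EMPTY;
}),
shareReplay({ bufferSize: 1, refCount: true })
);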
addRecipe$ = new Subject<Recipe>();
constructor() {
this.addRecipe$
.pipe(
withLatestFrom(this.db$),
switchMap(([recipe, db]) =>
db.collections['recipes'].insert(recipe)),
catchError((error) => {
console.error('Error inserting recipe:', error);
return EMPTY;
})
)
.subscribe();
}
addRecipe(data: Recipe): void {
this.addRecipe$.next(data);
}
}
Since the recipe title is the primary key, we will get an error if we try to add a recipe whose
title already exists.
After that, back in rxdb.service.ts, we can call the find() method on a collection and pass a
variety of selectors with Query Builder to search for the desired recipe by title:
recipes$ = new BehaviorSubject<Recipe[]>([]);
findRecipe$ = new Subject<string>();
constructor() {
this.findRecipe$
.pipe(
withLatestFrom(this.db$),
switchMap(
Here, we can notice that we have searched the recipes collection to find a recipe whose title is an
exact match with the desired recipe. After that, with $, we can observe query results and update
the UI accordingly.
There’s more...
Here are a few points to note:
• Database migrations: As your application grows and your data schema evolves, RxDB
provides tools to manage these changes gracefully. You can define migration strategies
that automatically update your existing data to match the new schema, preventing data
loss and ensuring a smooth transition for your users. This also extends to the underlying
storage engine. If you need to switch from one storage type to another, RxDB allows you
to migrate all your data without any hiccups.
• Real-time replication: RxDB allows your application to seamlessly synchronize data with
various backends such as GraphQL, CouchDB, and even other RxDB instances, making
real-time collaboration a breeze. It achieves this through a simple, yet powerful replication
protocol that ensures data consistency across all connected clients. Even better, RxDB
excels in offline scenarios. Your app can continue to modify data while offline, and RxDB
will automatically synchronize those changes with the server once a connection is
re-established.
• Conflict resolution: RxDB acts as a safety net for your data when multiple users or offline
devices make changes to the same information. This ensures that your data stays consistent
even with many simultaneous edits, preventing data loss and confusion.
See also
• RxDB as a database for PWAs: https://rxdb.info/articles/progressive-web-app-database.html
• RxDB as offline first: https://rxdb.info/offline-first.html
• RxQuery: https://rxdb.info/rx-query.html
• RxDB conflict resolution:
https://rxdb.info/transactions-conflicts-revisions.html#conflicts
8
Building Offline-First Applications with RxJS
We live in an increasingly connected world, and because of this, it’s easy to overlook the
significance of offline functionality. However, the reality is that internet connectivity isn’t always
guaranteed. Whether it’s due to limited network coverage, or simply being in a remote location,
there are countless scenarios where users might find themselves without an internet connection.
This is where offline-first web apps come into play.
Offline-first web apps are designed to function seamlessly, even when users are offline. They
prioritize providing core functionality and data access without relying on a constant internet
connection. This approach has numerous benefits:
• Enhanced user experience: By ensuring that users can interact with an app regardless of
network availability, offline-first apps deliver a smoother and more consistent experience.
This eliminates the frustration of waiting for content to load or being unable to perform
basic tasks when offline.
• Increased accessibility: Offline capabilities make web apps more inclusive, especially in
areas with unreliable or limited internet access. This expands the potential user base and
ensures that everyone can benefit from the app’s features.
• Improved performance: Offline-first apps often load faster and respond more quickly,
as they leverage cached data and resources. This leads to a more efficient and enjoyable
user experience.
• Reduced data costs: For users with metered or expensive data plans, offline functionality
can help minimize data usage. This is particularly valuable in regions where data costs
are high.
• Data synchronization: Offline-first apps typically implement mechanisms to synchronize
data with servers when connectivity is restored. This ensures that user data remains up
to date and consistent across devices.
To deal with the complexity of designing offline web apps, we can use different offline
strategies, each tailored to specific use cases:
Stale-While-Revalidate:
• Blog posts with occasional edits: Display the cached post while fetching potential updates
• Dashboards with live data: Show cached metrics initially, then refresh with live server data
• Content with moderate update frequency: Articles that might get minor edits or updates
Cache-Network-Race:
• Websites with mixed content: Static elements (header, footer) alongside dynamic content
(articles, comments)
• Applications with real-time features: Display cached data instantly while fetching live updates
• Situations prioritizing speed above all: When the fastest possible response is crucial,
regardless of initial content freshness
Optimistic Update:
• Interactive forms and input fields: Provide immediate feedback by updating the UI before
server confirmation (e.g., adding a comment, sending a message)
• Simple actions with high user expectations for speed: Liking a post, adding an item to a cart,
favoriting an item
Technical requirements
To complete this chapter, you’ll need:
• Angular v19+
• Angular Material
• RxJS v7
• Node.js v22+
• npm v11+ or pnpm v10+
The code for recipes in this chapter is placed in the GitHub repository at
https://github.com/PacktPublishing/RxJS-Cookbook-for-Reactive-Programming/tree/main/Chapter08.
To make Angular applications work offline, we can run one simple command, which will add all
the necessary configurations and a basic service worker to our project:
ng add @angular/pwa
Since service workers are enabled only in production mode, to observe each recipe in action, we
must build our app and serve it locally. In each recipe, we will use the simple http-server library
to host the client app locally and observe offline mode in the browser by running the following
command in the terminal:
npm run build && http-server ${build-location} -c-1 -o
All the recipes in this chapter have one thing in common: whenever a recipe includes a step
called Going offline, that step is required in order to check the offline behavior.
All the strategies in this chapter (except the optimistic update pattern) also require an Angular
interceptor. All of our logic to simulate offline mode will live in the offline.interceptor.ts file.
In Angular, we can simply use the following command to generate our request interceptor:
ng generate interceptor interceptors/offline
How to do it…
We are going to leverage Angular’s interceptors and simulate the same behavior as if we were
implementing the Cache-First offline strategy inside a service worker.
return continueRequest$;
}),
map((response: unknown) => {
return new HttpResponse({
status: 200,
body: response
})
})
);
}
return EMPTY;
}),
// exponential backoff if request fails (check Chapter01)
);
1. First, we create an Observable stream out of the values returned from cache storage with
the my-app-cache key. Then, we combine that stream with the intercepted request,
continueRequest$.
2. Then, we check whether we are online.
3. If that is the case, once we get the response from the network, we open our cache storage
and store the fresh response there under the key of the corresponding request URL.
Now, if we open a browser and Dev Tools, we can start playing around:
On the Network tab, we can observe that there is an ongoing network request when we are online,
which is expected. What we also might notice is that on the Application tab, the cache storage
is visually represented. We can also see our cache entry, my-app-cache:
Finally, if we go into offline mode, we will see that there is no ongoing or failing network request,
but the data is still there, and the whole app works just as it did when we were online.
How to do it…
We are going to leverage Angular’s interceptors and simulate the same behavior as if we were
implementing a network-first offline strategy inside a service worker.
return continueRequest$.pipe(
withLatestFrom(openCache$),
map(([response, cache]) => {
if (response instanceof HttpResponse) {
cache.put(
req.url,
new Response(JSON.stringify(response.body)));
}
return response;
}),
catchError(() => cacheFallback(req, openCache$))
);
1. First, we create an Observable stream out of the values returned from the cache storage
with the my-app-cache key. Then, we combine that stream with the intercepted request,
continueRequest$.
2. We send the request, and once we get the response from the network, we open our cache
storage and store the fresh response there under the key of the corresponding request
URL.
3. When we go offline, the request will fail, and then we will fall back to the cache.
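The order of decisions in these steps can be sketched without RxJS or the browser. This is a simplified, synchronous model under stated assumptions: a plain object stands in for cache storage, and a fetcher function that throws stands in for a failed HTTP request. The CacheStore, Fetcher, and networkFirst names are illustrative and are not part of the recipe's code:

```typescript
// Stand-ins (assumptions): a plain object replaces cache storage, and a
// synchronous fetcher that throws replaces a failing network request.
type CacheStore = Record<string, string>;
type Fetcher = (url: string) => string;

function networkFirst(
  cache: CacheStore,
  url: string,
  fetchFromNetwork: Fetcher
): string | undefined {
  try {
    const fresh = fetchFromNetwork(url);
    cache[url] = fresh; // refresh the cache with the latest response
    return fresh;
  } catch {
    return cache[url]; // the request failed: fall back to the cached value
  }
}

const recipeCache: CacheStore = {};
const onlineFetcher: Fetcher = () => '["pasta","soup"]';
const offlineFetcher: Fetcher = () => {
  throw new Error('Network unavailable');
};

const whileOnline = networkFirst(recipeCache, '/api/recipes', onlineFetcher);
const whileOffline = networkFirst(recipeCache, '/api/recipes', offlineFetcher);
console.log(whileOnline === whileOffline);
```

In the interceptor, the try branch corresponds to the map that writes to the cache, and the catch branch corresponds to catchError with the cache fallback.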
return EMPTY;
}),
map((response: unknown) => {
return new HttpResponse({
status: 200,
body: response
})
})
);
}
When we are offline and the network request fails, we do the following:
Now, if we open a browser and Dev Tools, we can start playing around:
When we are online, we can see that the network request is being sent. The network response will
refresh our cache data so that we can have the latest possible values once we go offline.
Once we go offline, we can see that the network request fails, but since we have a cache fallback,
everything works as expected, and we show the latest data from the cache.
An obvious drawback of this approach might be use cases where the network is slow. We would
wait for the slow network response, although we might have data from the cache ready to be
shown in the UI. The next two strategies aim to resolve that challenge.
How to do it…
We are going to leverage Angular’s interceptors and simulate the same behavior as if we were
implementing the Stale-While-Revalidate offline strategy inside a service worker. The strategy
here is to create two streams that will emit values in order, first for the cache data, and the second
for the network response data.
return EMPTY;
}),
map((response: unknown) => new HttpResponse({
body: response
})),
);
1. First, we open the cache, my-app-cache, and create an Observable stream from it.
2. Then, we check whether there is a match in the cache with the current request URL.
3. If there is cache data, we return it as a response.
4. If there is no cache data, we gracefully complete the stream without the result.
return response;
}),
// exponential backoff if request fails (check Chapter01)
);
Now, we can use the RxJS concat operator to control the sequence of events for the Stale-While-
Revalidate offline strategy:
return concat(
dataFromCache$,
continueRequest$
);
This means that if we experience offline conditions, first we show the cache data to the user, and
after we do that, we send a request in the background to check whether we are back online and
refresh the data.
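The ordering that concat enforces can be modeled with a function that returns the emitted values in sequence. This is a synchronous sketch under the same kind of assumptions as before: a plain object replaces cache storage and a throwing fetcher simulates being offline. All names here are illustrative:

```typescript
type CacheStore = Record<string, string>;
type Fetcher = (url: string) => string;

// Returns the values a subscriber would see, in order: the cached (stale)
// value first, then the fresh network value if the request succeeds.
function staleWhileRevalidate(
  cache: CacheStore,
  url: string,
  fetchFromNetwork: Fetcher
): string[] {
  const emissions: string[] = [];
  const cached: string | undefined = cache[url];
  if (cached !== undefined) {
    emissions.push(cached); // plays the role of dataFromCache$
  }
  try {
    const fresh = fetchFromNetwork(url); // plays the role of continueRequest$
    cache[url] = fresh; // revalidate: store the fresh value for next time
    emissions.push(fresh);
  } catch {
    // Offline: the subscriber keeps the stale value it already received.
  }
  return emissions;
}

const swrCache: CacheStore = { '/api/recipes': 'stale recipes' };
console.log(staleWhileRevalidate(swrCache, '/api/recipes', () => 'fresh recipes'));
```

With an empty cache, the first step is skipped and only the network value is emitted, which mirrors the cache stream completing without a result.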
If we open a browser and Dev Tools, we can see this strategy in action:
On the Network tab, we can see that there is an ongoing network request when we are online,
which is expected. This is when we prepare for offline conditions and fill in our cache with the
latest data:
Once we go into the offline mode, we can observe the Stale-While-Revalidate strategy in action.
We show cache data immediately, but we are checking in the background if we are back online.
Once we are back online, we are going to see fresh data from the server.
How to do it…
We are going to leverage Angular’s interceptors and simulate the same behavior as if we were
implementing the Racing Cache and Network offline strategy inside a service worker.
return NEVER;
}),
map((response: unknown) => {
return new HttpResponse({
status: 200,
body: response
})
})
);
1. First, we open the cache, my-app-cache, and create an Observable stream from it.
2. Then, we check whether there is a match in the cache with the current request URL.
3. If there is a match, we return the cached data as a response.
4. If there is no cache data, we simply continue the stream without emitting any value
(and without completing the stream). We can achieve this effect by returning NEVER, an
Observable that stays open indefinitely without emitting values. This is intended to
prevent premature stream completion when no cache is available.
const continueRequestWithCacheSave$ = next(req).pipe(
withLatestFrom(openCache$),
map(([response, cache]) => {
if (response instanceof HttpResponse) {
cache.put(
req.url,
new Response(JSON.stringify(response.body)));
}
return response;
}),
catchError(() => dataFromCache$),
);
1. We create an Observable stream of our interceptor’s next function and combine it with
a promise that we get as a result of opening cache storage.
2. We send the request, and once we get the response from the network, we open our cache
storage and store the fresh response there under the key of the corresponding request URL.
3. If the request fails when we go offline, we will refer to the cached value.
We end up with a response from whichever stream wins the race. Since we have a two-second delay
inside our MSW network handler, the cache will win in every case except initially, when there is
no cache data. We can test this and play around with it by putting a delay of more than two
seconds at the beginning of the dataFromCache$ stream.
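To see why the empty-cache case returns NEVER rather than EMPTY, it helps to model the race as a comparison of timings. The sketch below is a simplified model, not RxJS itself: it assumes that the earliest emission wins, and that a source that completes without emitting ends the race before anyone can win, which is the premature completion the recipe guards against. The Source type and the raceOutcome function are illustrative names:

```typescript
type Source =
  | { label: string; kind: 'emits'; afterMs: number }
  | { label: string; kind: 'completes-empty' } // behaves like EMPTY
  | { label: string; kind: 'silent' }; // behaves like NEVER

// Returns which source decides the race in this simplified model.
function raceOutcome(sources: Source[]): string {
  let outcome = 'pending';
  let decidedAt = Infinity;
  for (const source of sources) {
    if (source.kind === 'silent') {
      continue; // never emits and never completes, so it cannot decide the race
    }
    const at = source.kind === 'emits' ? source.afterMs : 0;
    if (at < decidedAt) {
      decidedAt = at;
      outcome =
        source.kind === 'emits'
          ? `${source.label} wins`
          : `completed by ${source.label}`;
    }
  }
  return outcome;
}

const network: Source = { label: 'network', kind: 'emits', afterMs: 2000 };

// Cache hit: reading from the cache is faster than the delayed network.
console.log(raceOutcome([{ label: 'cache', kind: 'emits', afterMs: 5 }, network]));
// Cache miss modeled with NEVER: the network is free to win.
console.log(raceOutcome([{ label: 'cache', kind: 'silent' }, network]));
// Cache miss modeled with EMPTY: the race ends before the network responds.
console.log(raceOutcome([{ label: 'cache', kind: 'completes-empty' }, network]));
```

The five-millisecond cache read is an arbitrary figure chosen for illustration, while the two-second network delay matches the MSW handler mentioned above.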
Initially, we can see that network request wins, since we had nothing in the cache storage. But if
we refresh the page and look at Dev Tools, we can see that reading from the cache is faster than
a network request, so this time, the cache wins:
Figure 8.9: Racing the cache and the network in offline mode
Finally, if we go into offline mode, we can see the Racing Cache and Network strategy in action.
First, we will try to send the request, see that the request is failing, and then go to the cache.
See also
• Http-server library: https://github.com/http-party/http-server#readme
• Big thank you to Jake Archibald for the inspiration for this chapter with his blog Offline
Cookbook: https://jakearchibald.com/2014/offline-cookbook/
• MDN Cache API docs: https://developer.mozilla.org/en-US/docs/Web/API/Cache
• Angular Service Worker guide:
https://angular.dev/ecosystem/service-workers/getting-started
How to do it…
In this recipe, we are going to implement a custom operator called optimisticUpdate, which
will handle a POST request in a way that we immediately show the result to the user, then in the
background continue with the network request. If we are offline and the request fails, we will
do a retry after a certain period of time. If the request fails the second time, then we will provide
a rollback option, which in most cases would be that we remove the item from the list that we
added optimistically.
})
)
);
};
}
1. First, as operator arguments, we provide the recipe object that we are about to send to the
backend, and a rollback function (what we should do if the request fails).
2. Then, with the concat operator, we combine two streams that should execute in order:
one stream emits the original object we are about to post (the optimistic value), and the
other is the actual request to the backend.
3. We immediately return the original object as the first emitted value and show that recipe
in the UI optimistically.
4. After that, we start dealing with the ongoing request. In case of request failure, we define
the retry mechanism as needed: in this case, we will have one retry after three seconds.
5. If we are offline, and the request fails, then we catch that error and provide a rollback
option.
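The same flow can be traced with a small synchronous model that records every UI state. This is a sketch and not the recipe's optimisticUpdate operator: it skips the three-second wait, uses a send function that throws to simulate a failed request, and the Recipe shape, postOptimistically, and the render callback are illustrative names:

```typescript
// Hypothetical minimal shape; the recipe's Recipe type is not shown here.
interface Recipe {
  title: string;
}

function postOptimistically(
  list: Recipe[],
  recipe: Recipe,
  send: (recipe: Recipe) => void, // throws when the request fails
  render: (list: Recipe[]) => void, // called whenever the UI should update
  retries = 1
): Recipe[] {
  const optimistic = list.concat([recipe]);
  render(optimistic); // show the new recipe before the server confirms it

  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      send(recipe);
      return optimistic; // confirmed: keep the optimistic state
    } catch {
      // Request failed: the loop retries until attempts are exhausted.
    }
  }

  const rolledBack = optimistic.filter((item) => item !== recipe);
  render(rolledBack); // rollback: remove the recipe we added optimistically
  return rolledBack;
}

const uiStates: string[] = [];
postOptimistically(
  [{ title: 'Soup' }],
  { title: 'Pasta' },
  () => {
    throw new Error('offline');
  },
  (list) => uiStates.push(list.map((r) => r.title).join(', '))
);
console.log(uiStates);
```

The recorded states show the recipe appearing immediately and then disappearing once the retry has also failed.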
postRecipe(): void {
this.http.post<Recipe>('/api/recipes', this.recipe).pipe(
optimisticUpdate(
this.recipe,
(originalItem: Recipe, error: Error) => {
// Rollback UI changes here
this.recipes$.next(error);
}
),
filter((recipe) => !!recipe)
).subscribe(
(recipe: Recipe) => this.recipes$.next(recipe)
);
}
Here, we can see that in case of a postRecipe method call, we immediately put the new potential
recipe value inside of a Subject. If there’s an error, we have a rollback option to show the error
message, instead of a recipe.
If we open a browser and Dev Tools, we can start playing around with this strategy. We immediately
see the new recipe in the list, although we are offline:
After three seconds and a retry, the request still fails, so we remove the recipe item from the list:
9
Going Real-Time with RxJS
We live in a world of technology, where we want to always feel connected, and where information
changes quickly. That is why it is an important requirement for modern web apps to provide us
with dynamic and engaging experiences, without the need for manual browser refresh. Instant
live sports event updates, smooth chat experiences, online multiplayer games, and tools where
we can collaborate easily: these are all examples of real-time web apps.
One of the main technologies we can use to achieve real-time magic is WebSocket. WebSocket is
a powerful technology that enables real-time, two-way communication between a web browser
and a server. This persistent connection allows for instant data transfer, making it ideal for
applications that require live updates and interactions. In this chapter, we will see in action
WebSocket’s efficiency, low latency, and full-duplex communication capabilities, which make it a
powerful tool.
But what about developer experience when building these kinds of user experiences? That is where
RxJS fits in, to orchestrate real-time data effectively, in an elegant, asynchronous, and reactive way.
Technical requirements
To follow along with this chapter, you’ll need:
• RxJS v7
• Angular v19+
• NestJS v11+
• Node.js v22+
• npm v11+ or pnpm v10+
The code for the recipes in this chapter can be found in the GitHub repository here:
https://github.com/PacktPublishing/RxJS-Cookbook-for-Reactive-Programming/tree/main/Chapter09.
How to do it…
In this recipe, we will visualize food recipe orders over a period of a year, by subscribing to the
WebSocket connection, and observe live changes as data arrives.
constructor() {}
connect() {
if (!this.socket$ || this.socket$.closed) {
this.socket$ = webSocket<Message>({
url: environment.wsEndpoint,
deserializer: (e) => JSON.parse(e.data) as Message,
});
this.orders$ = this.socket$.multiplex(
() => ({ subscribe: 'orders' }),
() => ({ unsubscribe: 'orders' }),
(message) => message.type === 'orders'
);
}
}
}
type TRetryOptions = {
count: number;
delayTime: number;
}
type TMessage = {
type: string;
payload?: unknown;
}
retry({
count,
delay: (err, retryAttempt) => {
console.error('Socket connection failed:', err);
return timer(retryAttempt * delayTime);
},
}),
catchError((err: Error) => {
console.error('Socket connection failed:', err);
return of({ type: 'error', payload: err } as TMessage);
})
);
}
Now we can pipe into the orders$ WebSocket stream and apply the retryConnection operator:
this.orders$ = this.socket$.multiplex(
() => ({ subscribe: 'orders' }), // Subscription message
() => ({ unsubscribe: 'orders' }), // Unsubscription message
(message) => message.type === 'orders' // Filter function
).pipe(
retryConnection<Message>({
count: 5,
delayTime: 1000,
}),
);
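The delay callback in retryConnection multiplies the attempt number by delayTime, so the waiting time grows linearly. The sketch below lists the resulting schedule, assuming the attempt counter starts at 1. The exponential variant is included only for comparison and is not part of this recipe, and both function names are illustrative:

```typescript
// Delays produced by `timer(retryAttempt * delayTime)` for each attempt,
// assuming retryAttempt counts from 1 up to `count`.
function linearRetryDelays(count: number, delayTime: number): number[] {
  const delays: number[] = [];
  for (let retryAttempt = 1; retryAttempt <= count; retryAttempt++) {
    delays.push(retryAttempt * delayTime);
  }
  return delays;
}

// For comparison only: an exponential backoff that doubles on every attempt.
function exponentialRetryDelays(count: number, delayTime: number): number[] {
  const delays: number[] = [];
  for (let retryAttempt = 1; retryAttempt <= count; retryAttempt++) {
    delays.push(delayTime * Math.pow(2, retryAttempt - 1));
  }
  return delays;
}

console.log(linearRetryDelays(5, 1000)); // 1000, 2000, 3000, 4000, 5000
console.log(exponentialRetryDelays(5, 1000)); // 1000, 2000, 4000, 8000, 16000
```

With count set to 5 and delayTime set to 1000, the linear schedule waits 15 seconds in total before the error reaches catchError.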
ngOnInit() {
this.recipesService.connect();
ngAfterViewInit(): void {
this.recipesService.orders$.subscribe((message: Message) => {
this.orders = [...this.orders, ...message.payload];
this.chart.updateSeries([{
name: 'Orders',
data: this.orders,
}]);
});
}
ngOnDestroy() {
this.recipesService.close();
}
Finally, if we open our browser, we can observe all data as it arrives over a socket.
See also…
• The ApexCharts library: https://apexcharts.com/
• The WebSocket function: https://rxjs.dev/api/webSocket/webSocket
How to do it…
In this recipe, we’re going to build a minimalistic clone of the Instagram chat app. We will be able
to communicate with our friends using this chat, check if our friends are typing, and check friends’
online status. We are going to use Angular and NestJS to handle all WebSocket events, since both
frameworks have first-class support with RxJS. We will use NestJS to handle server-side handling
of WebSocket communication, and, of course, Angular for the client side.
Now, we are ready to create our WebSocket gateway. When we go to chat.gateway.ts, we can
see how to handle new, incoming connections:
import {
ConnectedSocket,
OnGatewayConnection,
OnGatewayDisconnect,
WebSocketGateway,
WebSocketServer
} from '@nestjs/websockets';
import * as WebSocket from 'ws';
@WebSocketGateway(8080)
export class ChatGateway implements OnGatewayConnection,
OnGatewayDisconnect {
@WebSocketServer()
server: WebSocket.Server;
constructor(
private chatConnectionService: ChatConnectionService,
private chatService: ChatService,
) {}
We are using the @WebSocketGateway decorator to declare that the ChatGateway class will handle
WebSocket connections on port 8080. The ChatGateway class also implements the
OnGatewayConnection interface, which provides a lifecycle hook that allows us to execute code
whenever a new client connects to our WebSocket server. With the @ConnectedSocket decorator,
we get access to the client socket.
@Injectable()
export class ChatConnectionService implements OnModuleInit {
private clients$ = new BehaviorSubject<WebSocket[]>([]);
private clientOneId = 'b9ec382c-a624-40ba-9865-a81be0d390a8';
private clientTwoId = 'e1426280-0169-4647-b7d1-5e061a23a0d8';
if (clients.length >= 2) {
// only 2 people in chat
client.close();
return;
}
In the clients$ BehaviorSubject, we keep track of all client connections to WebSocket. Whenever
there is a new connection, we check whether we already have two chat participants. If so, we
close the new connection. Otherwise, we assign a new random ID to the new client, since the ws
module doesn’t have a native way to track client IDs.
In this way, we can broadcast a new connect event to all clients, with messages containing
information about the connection.
@Injectable({
providedIn: 'root'
})
export class ChatService {
private socket$: WebSocketSubject<Message>;
public clientId$!: Observable<Message>;
constructor() {
this.connect();
}
connect() {
if (!this.socket$ || this.socket$.closed) {
this.socket$ = webSocket<IWsMessage<IMessage>>({
url: 'ws://localhost:8080',
deserializer: (e) => JSON.parse(e.data) as IWsMessage<IMessage>,
});
this.clientConnection$ = this.socket$.multiplex(
() => ({ subscribe: 'connect' }),
() => ({ unsubscribe: 'connect' }),
(message) => message.event === 'connect'
);
}
Initially, we will establish a WebSocket connection and subscribe to the connect topic. After a
successful connection, here we would get all relevant events regarding ongoing client connections.
clientId
}});
}
By doing so, we are sending a message event to our WebSocket for processing. Back in our backend
chat.gateway.ts, we can subscribe to the message event by defining a new event handler:
import {
MessageBody,
SubscribeMessage,
} from '@nestjs/websockets';
this.chatService.sendTopicMessage(topic, {
id: crypto.randomUUID(),
message,
clientId,
timestamp: new Date(),
});
}
}
With the @MessageBody decorator, we extract the data we have sent over the socket from the
client, in our case, the chat topic key.
Whenever there is a new message, we want to send that message to the chat topic. It is time to
create a reactive stream of incoming messages assigned to that topic in chat.service.ts:
private topics: {
[topicKey: string]: ReplaySubject<Message | { typing: string } | any>;
} = {
onModuleInit() {
const chatTopic$ = this.topics['chat'].pipe(
shareReplay({ bufferSize: 1, refCount: true }),
);
messages$.subscribe(
(response: { event: string; data: Message[] }) => {
this.chatConnectionService.broadcastMessage(response);
},
);
}
A convenient way to keep a chat history is to leverage RxJS’s ReplaySubject, since every new
connection can see all previous events. Also, we can leverage the shareReplay operator to
multicast chat history to all connected clients. Now we can broadcast incoming messages to all
clients and show them to all chat participants.
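The replay behavior that makes ReplaySubject suitable for chat history can be illustrated with a few lines of plain TypeScript. The MiniReplay class below is a teaching sketch, not RxJS's implementation: it keeps a bounded buffer of past values and hands them to every new subscriber before delivering live ones:

```typescript
// A tiny stand-in for the replay idea: late subscribers first receive the
// buffered history, then every new value as it arrives.
class MiniReplay<T> {
  private buffer: T[] = [];
  private listeners: Array<(value: T) => void> = [];
  private readonly bufferSize: number;

  constructor(bufferSize: number = Infinity) {
    this.bufferSize = bufferSize;
  }

  next(value: T): void {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift(); // drop the oldest value once the buffer is full
    }
    this.listeners.forEach((listener) => listener(value));
  }

  // Returns a function that removes the listener again.
  subscribe(listener: (value: T) => void): () => void {
    this.buffer.forEach((value) => listener(value)); // replay the history
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }
}

const chatTopic = new MiniReplay<string>();
chatTopic.next('Hi!');
chatTopic.next('Are you there?');

// A client that connects later still sees both earlier messages.
const lateClientMessages: string[] = [];
chatTopic.subscribe((message) => lateClientMessages.push(message));
chatTopic.next('Yes, just joined.');
console.log(lateClientMessages);
```

Passing a buffer size to the constructor limits how much history a late subscriber receives, in the same spirit as the buffer size argument of ReplaySubject.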
Back in our frontend app, in chat.service.ts, we can multiplex RxJS WebSocket to subscribe
to our chat topic:
public chat$!: Observable<Message>;
this.chat$ = this.socket$.multiplex(
() => ({ subscribe: 'chat' }),
() => ({ unsubscribe: 'chat' }),
ngOnInit(): void {
this.chatService
.getChatSocket$()
.subscribe(({ data }: WsMessage) => {
this.messages = data;
});
}
}
We can then open two client browsers next to each other and start chatting:
this.chatService.sendTopicMessage(topic, {
clientId,
isTyping,
});
}
Since we are sending new data to the same topic we have set up in step 3, our chat.service.ts
lifecycle hook finally looks like this:
onModuleInit() {
const chatTopic$ = this.topics['chat'].pipe(
shareReplay({ bufferSize: 1, refCount: true }),
);
merge(messages$, typing$).subscribe(
(response: { event: string; data: Message[] }) => {
this.latestMessages$.next(response.data);
this.chatConnectionService.broadcastMessage(response);
},
);
}
What is different from what we had in step 3 is that we are merging the typing$ stream with the
chat messages. Now we know when the other client has started typing, and we can show that
information on the frontend:
isTyping = false;
ngOnInit(): void {
this.chatService.getChatSocket$().pipe(
filter(({ data }: IWsMessage<IChatEvent | IMessage[]>) => (
data as IChatEvent).clientId !== this.clientId),
).subscribe(({ data }) => {
if ('isTyping' in data) {
this.isTyping = data.isTyping;
return;
}
}
Now, when we open two browsers side by side and start typing in one application, the other client
will see that the first client has started typing.
To achieve this effect, we can leverage the OnGatewayDisconnect lifecycle hook from NestJS:
handleDisconnect(@ConnectedSocket() client: WebSocket): void {
this.chatConnectionService.handleDisconnect(client);
}
Whenever a client disconnects from a socket, we can call the handleDisconnect() method from
chat-connection.service.ts:
Here, we simply filter out the disconnected client and notify the clients$ BehaviorSubject about
this change. Now, we can extend the message that we are broadcasting with the isOnline property:
client.send(
JSON.stringify({
event: 'connect',
data: {
clientId: client.id,
otherClientId:
client.id === this.clientOneId
? this.clientTwoId
: this.clientOneId,
isOnline: clients.length === 2,
},
}),
);
We can easily test this behavior by closing the window of one of the clients.
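The disconnect handling described above can be sketched in plain TypeScript. This is a minimal illustration, not the recipe's actual chat-connection.service.ts: the ChatClient shape and the removeClient() and buildConnectPayload() helpers are hypothetical names, and the clients$ ReplaySubject is left out so that the filtering and the isOnline flag can be seen in isolation.

```typescript
// Hypothetical client shape: only `id` matters for this sketch.
interface ChatClient {
  id: string;
}

// Drop the client whose socket was closed (helper name is an assumption).
function removeClient(clients: ChatClient[], disconnectedId: string): ChatClient[] {
  return clients.filter((client) => client.id !== disconnectedId);
}

// Build the 'connect' payload for one client. isOnline is true only while
// both chat participants are connected, matching the broadcast shown above.
function buildConnectPayload(client: ChatClient, clients: ChatClient[]) {
  let otherClientId: string | null = null;
  for (const candidate of clients) {
    if (candidate.id !== client.id) {
      otherClientId = candidate.id;
      break;
    }
  }
  return {
    event: 'connect',
    data: {
      clientId: client.id,
      otherClientId,
      isOnline: clients.length === 2,
    },
  };
}

const alice: ChatClient = { id: 'alice' };
const bob: ChatClient = { id: 'bob' };
const afterBobLeft = removeClient([alice, bob], bob.id);
console.log(buildConnectPayload(alice, afterBobLeft).data.isOnline); // false
```

In the real service, the filtered list would be pushed into clients$ so that every remaining client receives the updated isOnline value.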
In this step, we are going to introduce an error-handling mechanism to queue messages during
connection outages and attempt to resend them once the connection is re-established. We will
use an RxJS ReplaySubject to store messages and replay historical messages once we establish
a connection again.
Back in our chat.service.ts, we are going to extend the sendChatMessage() method to handle
the case when the socket is closed:
private socketOfflineMessages$ = new ReplaySubject<IWsMessage<IMessage>>(100);
return;
}
Now that we have messages stored in socketOfflineMessages$, we can observe when we establish
a WebSocket connection again in the connect() method:
this.socket$ = webSocket<IWsMessage<IMessage>>({
url: 'ws://localhost:8080',
deserializer: (e) => JSON.parse(e.data) as IWsMessage<IMessage>,
openObserver: {
next: () => {
this.socketOfflineMessages$.subscribe(
(message: IWsMessage<IMessage>) => {
this.socket$.next(message);
});
},
},
});
Notice that the RxJS webSocket function accepts openObserver as part of its configuration
object, which we can leverage to resend all the messages that were queued while the connection
was down.
For more information about WebSocket reconnection strategies, take a look at the Implementing
real-time data visualization charts recipe.
return merge(
fromEvent(mediaRecorder, 'dataavailable'),
fromEvent(mediaRecorder, 'stop'),
);
})
);
Here, we can observe that we are using the MediaDevices Web API by requesting permission to
use the microphone on our device and creating the Observable stream micRecording$. Once we
allow microphone permission in our browser, we can create a new MediaRecorder, start recording,
and stop the recording after 5 seconds.
Now that we have MediaRecorder available, we can create event streams from the dataavailable
and stop events:
return audioChunkEvent$.pipe(
map((audioEvent: BlobEvent | Event) => {
if ('data' in audioEvent) {
audioChunks.push(audioEvent.data);
return EMPTY;
}
return EMPTY;
})
).subscribe();
If the dataavailable event happens, we append the new audio chunk to the chunks collected so
far. If the recording stops, we gather the recorded audio by creating a new file blob out of
the recorded content, encoding the result into Base64 format as a data URL, and returning the
loadend stream event down the Observable pipeline.
Now, at the end of the stream, we have the result available as a data URL that we can send as a
voice message through our chat topic to the WebSocket gateway:
sendVoiceMessageToServer(progressEvent: Event, clientId: string) {
if (!progressEvent) return;
const reader = progressEvent.target as FileReader;
const base64AudioMessage = reader.result as string;
Our backend will store the voice message in the same ReplaySubject as the rest of the messages,
which means that we would get new WebSocket messages the same way as before, with just a
little modification in the chat messages subscription:
this.chatService.getChatSocket$().pipe(
filter(({ data }: IWsMessage<IChatEvent | IMessage[]>) =>
(data as IChatEvent).clientId !== this.clientId),
).subscribe(({ data }) => {
if ('isTyping' in data) {
this.isTyping = data.isTyping;
return;
}
return chatMessage;
});
});
Here, we will check whether the message starts with data:audio, which indicates that we are
dealing with a data URL or voice message. Now, we can differentiate between a regular message
and voice message, and in the case of a voice message, pass the source to the HTML audio element.
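The check described above can be sketched as a small pure function. This is a minimal sketch under stated assumptions: the ChatMessage shape, the RenderedMessage union, and the toRenderedMessage() helper are hypothetical names for illustration, and the recipe's own IMessage type may look different.

```typescript
// Assumed message shape for this sketch; the recipe's IMessage may differ.
interface ChatMessage {
  clientId: string;
  message: string;
}

type RenderedMessage =
  | { kind: 'voice'; clientId: string; audioSrc: string }
  | { kind: 'text'; clientId: string; text: string };

const VOICE_PREFIX = 'data:audio';

// A voice message arrives as a Base64 data URL, so its content begins with "data:audio".
function toRenderedMessage(chatMessage: ChatMessage): RenderedMessage {
  const isVoice = chatMessage.message.slice(0, VOICE_PREFIX.length) === VOICE_PREFIX;
  if (isVoice) {
    // The data URL can be bound directly to the src of an HTML audio element.
    return { kind: 'voice', clientId: chatMessage.clientId, audioSrc: chatMessage.message };
  }
  return { kind: 'text', clientId: chatMessage.clientId, text: chatMessage.message };
}

const rendered = toRenderedMessage({
  clientId: 'alice',
  message: 'data:audio/webm;base64,AAAA',
});
console.log(rendered.kind); // voice
```

Modeling the result as a discriminated union lets a template switch on kind instead of repeating the prefix check in the view.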
Finally, when we open our browser, we can record a voice message to our friend:
See also…
• NestJS gateways: https://docs.nestjs.com/websockets/gateways
• RxJS webSocket function: https://rxjs.dev/api/webSocket/webSocket
• MDN MediaRecorder Web API: https://developer.mozilla.org/en-US/docs/Web/API/MediaRecorder
• MDN data URLs: https://developer.mozilla.org/en-US/docs/Web/URI/Schemes/data
• MDN HTML audio element: https://developer.mozilla.org/en-US/docs/Web/HTML/Element/audio
How to do it…
To build this online game, we are going to use Angular and NestJS to handle all WebSocket events,
since both frameworks have first-class support for RxJS. We will use NestJS on the server side
to handle players’ moves, and Angular as the client app that shows the 3x3 grid for the game,
handles user interaction, and sends WebSocket events to our API.
We will also need to follow the instructions for creating a NestJS WebSocket gateway and
connecting to WebSocket from the client app described in steps 1 and 2 of the Crafting a modern
chat application recipe.
In the case of spectators, we will just notify them that the game has started. Otherwise, we will
assign X to the first player and O to the second one:
onModuleInit(): void {
this.clients$
.pipe(
tap((clients) => {
clients.forEach((client) => {
client.send(
JSON.stringify({
event: 'join',
data: {
player: client.player,
board: this.board,
nextPlayer: this.currentPlayer,
},
}),
);
});
}),
)
.subscribe();
}
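The role assignment described above can be sketched as follows. This is a minimal sketch, assuming roles are decided purely by join order; assignRole() and buildJoinMessage() are hypothetical helper names, and the 'spectator' label is an assumption, since the recipe only says that spectators are notified that the game has started.

```typescript
type PlayerRole = 'X' | 'O' | 'spectator';
type BoardCell = 'X' | 'O' | null;

// The first connection plays X, the second plays O, everyone else only watches.
function assignRole(joinIndex: number): PlayerRole {
  if (joinIndex === 0) return 'X';
  if (joinIndex === 1) return 'O';
  return 'spectator';
}

// Serialize a 'join' event with the same fields as the broadcast shown above.
function buildJoinMessage(
  joinIndex: number,
  board: BoardCell[],
  nextPlayer: 'X' | 'O',
): string {
  return JSON.stringify({
    event: 'join',
    data: { player: assignRole(joinIndex), board, nextPlayer },
  });
}

const emptyBoard: BoardCell[] = [null, null, null, null, null, null, null, null, null];
console.log(buildJoinMessage(0, emptyBoard, 'X'));
console.log([0, 1, 2].map((index) => assignRole(index)).join(',')); // X,O,spectator
```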
Finally, it’s game time! If we put two browser windows next to each other, we can see that both
players have joined the match.
Notice that we are preventing the same player from playing twice in a row. If it is the correct
player’s turn, we send the index of the clicked field on the game board.
In our game.gateway.ts, we will create an event handler for the move event:
@SubscribeMessage('move')
handleMove(@MessageBody() data: number): void {
this.moves$.next(data);
}
Whenever we have a new move from a player, our system will react by updating the board state
and switching the player’s turn:
const playerMoves$ = this.moves$.pipe(
withLatestFrom(this.clients$),
filter(([move]) => this.board[move] === null),
map(([move, clients]) => {
const nextPlayer = this.currentPlayer === 'X' ? 'O' : 'X';
this.board[move] = this.currentPlayer;
clients.forEach(client => {
client.send(JSON.stringify({ event: 'boardUpdate',
data: { move, currentPlayer: this.currentPlayer, nextPlayer } }));
});
this.currentPlayer = nextPlayer;
return clients;
}),
shareReplay({ bufferSize: 1, refCount: true }),
).subscribe();
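The board update in this pipeline boils down to a small state transition, which can be sketched without RxJS or sockets. This is a minimal sketch, assuming the state is kept immutable; GameState and applyMove() are hypothetical names, whereas the recipe mutates this.board and this.currentPlayer directly.

```typescript
type Mark = 'X' | 'O';

interface GameState {
  board: (Mark | null)[];
  currentPlayer: Mark;
}

// Apply one move: reject it if the field is taken, otherwise place the mark and switch turns.
function applyMove(state: GameState, move: number): GameState {
  if (move < 0 || move >= state.board.length || state.board[move] !== null) {
    return state; // same role as the filter() step in the pipeline above
  }
  const board = state.board.slice();
  board[move] = state.currentPlayer;
  return { board, currentPlayer: state.currentPlayer === 'X' ? 'O' : 'X' };
}

const initialState: GameState = {
  board: new Array<Mark | null>(9).fill(null),
  currentPlayer: 'X',
};

// X takes the center, O tries the same field (ignored), then O takes the corner.
const finalState = [4, 4, 0].reduce((state, move) => applyMove(state, move), initialState);
console.log(finalState.board[4], finalState.board[0], finalState.currentPlayer); // X O X
```

A pure transition like this could also be plugged into the scan operator, which would keep the board state inside the stream instead of on the service instance.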
By sending the boardUpdate event, our frontend app can accept that event and react to the latest
player move. We send the index of the played field, the player who made the move, and whose
turn is next. Back in our client app game.service.ts:
public boardUpdate$!: Observable<WsMessage>;
this.boardUpdate$ = this.socket$.multiplex(
() => ({ subscribe: 'boardUpdate' }),
() => ({ unsubscribe: 'boardUpdate' }),
(message) => message.event === 'boardUpdate'
);
getBoardUpdate$() {
return this.boardUpdate$;
}
Now, we can subscribe to the latest board state in game-board.component.ts and update the UI:
currentPlayerTurn = 'X';
board = Array(9).fill(null);
ngOnInit() {
this.wsService.getBoardUpdate$().subscribe(({ data }: WsMessage) => {
const { move, currentPlayer, nextPlayer } = data;
this.board[move] = currentPlayer;
this.currentPlayerTurn = nextPlayer;
});
}
When we open two separate browser tabs and start playing moves, we can see real-time updates
in the UI:
clients.forEach((client) => {
client.send(JSON.stringify({ event: 'winner', data: winner }));
});
this.resetGame();
}
}
return this.board[a];
}
}
}
We check for a winner by listing all winning combinations upfront and simply seeing whether the
current state of the board matches one of them. Also, we are completing the players’ moves$
subject to prevent memory leaks after the game ends.
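The winner check described above can be sketched as a standalone function. This is a minimal sketch rather than the recipe's exact method: WINNING_LINES, findWinner(), and isDraw() are hypothetical names, and the board is assumed to be a flat array of nine cells indexed row by row.

```typescript
type Cell = 'X' | 'O' | null;

// All eight winning combinations on a 3x3 board: rows, columns, and diagonals.
const WINNING_LINES: ReadonlyArray<readonly [number, number, number]> = [
  [0, 1, 2], [3, 4, 5], [6, 7, 8],
  [0, 3, 6], [1, 4, 7], [2, 5, 8],
  [0, 4, 8], [2, 4, 6],
];

// Return the mark that fills a complete line, or null when nobody has won yet.
function findWinner(board: ReadonlyArray<Cell>): Cell {
  for (const [a, b, c] of WINNING_LINES) {
    const mark = board[a] ?? null;
    if (mark !== null && mark === board[b] && mark === board[c]) {
      return mark;
    }
  }
  return null;
}

// A draw means the board is full and no line is complete.
function isDraw(board: ReadonlyArray<Cell>): boolean {
  return board.every((cell) => cell !== null) && findWinner(board) === null;
}

const topRowX: Cell[] = ['X', 'X', 'X', 'O', 'O', null, null, null, null];
console.log(findWinner(topRowX)); // X
```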
In case of a draw, we can send all clients the draw event and show the restart game button. Back
in our game.service.ts, we can simply check for a draw:
checkDraw(board: Array<string>): boolean {
return board.every(cell => cell !== null) && !this.checkWinner(board);
}
Once we open the UI, we can see the restart button, and we can start the game over:
See also…
• NestJS gateways: https://docs.nestjs.com/websockets/gateways
• RxJS webSocket function: https://rxjs.dev/api/webSocket/webSocket
10
Building Reactive NestJS Microservices with RxJS
With NestJS, we can build scalable, responsive, message-driven, and resilient server-side
applications efficiently. Since NestJS has first-class support for RxJS, this can take our reactive
game to a whole new level. RxJS can help us enhance responsiveness by handling large volumes of
concurrent requests and gracefully handling failures and recovery mechanisms. It can also help us
with handling asynchronous operations, which can be especially useful in an event-driven system.
Technical requirements
To follow along with this chapter, you’ll need:
• RxJS v7
• NestJS v11+
• Kafka v2.13-3.8.0
• KafkaJS v2.2.4
• grpc-js v1.12.2
• Node.js v22+
• npm v11+ or pnpm v10+
The code for recipes in this chapter can be found in the GitHub repository here:
https://github.com/PacktPublishing/RxJS-Cookbook-for-Reactive-Programming/tree/main/Chapter10.
Getting ready
Check out Chapter 1 for more HTTP communication strategies in RxJS.
How to do it...
In this recipe, we are going to have two NestJS microservices, namely recipes-api and orders-
api, communicating with each other synchronously, and will delve deep into resiliency patterns.
We are going to explore different techniques for how we can create REST APIs that are robust
and reliable.
In step 2, we described how to handle an individual service method when handling HTTP
communication. But what if we want to have the same resiliency strategy across the whole
microservice?
return request$;
}
return queueRequest$;
}
}
Notice that if resources are available, we process the request immediately. Otherwise, if there
are more ongoing requests to the microservice than the allowed concurrency, we put those
requests into a queue and schedule their processing for when the service has more resources
available:
const queueRequest$: Observable<T> = new Observable((observer) => {
this.requestQueue.push(() => {
return next.handle().pipe(
catchError((err) => {
observer.error(err);
return of(err);
}),
map((result) => {
observer.next(result);
return result;
}),
);
});
return queueRequest$;
Finally, when we check the queue, we take the request that has been waiting the longest, run
it, and free its slot once the request task completes:
private processQueue() {
if (
this.requestQueue.length > 0 &&
this.activeRequests < this.MAX_CONCURRENT_REQUESTS
) {
const task = this.requestQueue.shift()!;
this.activeRequests++;
task().subscribe({
complete: () => {
this.activeRequests--;
this.processQueue();
},
});
}
}
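The queueing behavior of the bulkhead can be sketched as a synchronous state machine, which makes the bookkeeping easy to follow. This is a minimal sketch in plain TypeScript with no RxJS or NestJS: BulkheadQueue, submit(), and release() are hypothetical names, and release() stands in for the complete callback of the subscription above.

```typescript
type StartTask = () => void;

class BulkheadQueue {
  private active = 0;
  private readonly waiting: StartTask[] = [];
  private readonly maxConcurrent: number;

  constructor(maxConcurrent: number) {
    this.maxConcurrent = maxConcurrent;
  }

  // Start the task right away if a slot is free, otherwise park it in the queue.
  submit(start: StartTask): void {
    if (this.active < this.maxConcurrent) {
      this.active++;
      start();
    } else {
      this.waiting.push(start);
    }
  }

  // Called when a running task finishes: free its slot and start the oldest waiting task.
  release(): void {
    if (this.active === 0) return; // nothing is running, so there is nothing to release
    this.active--;
    const next = this.waiting.shift();
    if (next) {
      this.active++;
      next();
    }
  }

  get activeCount(): number {
    return this.active;
  }

  get waitingCount(): number {
    return this.waiting.length;
  }
}

const startedTasks: string[] = [];
const bulkhead = new BulkheadQueue(3);
for (const id of ['a', 'b', 'c', 'd']) {
  bulkhead.submit(() => {
    startedTasks.push(id);
  });
}
console.log(startedTasks.join(',')); // a,b,c
bulkhead.release();
console.log(startedTasks.join(',')); // a,b,c,d
```

The fourth task starts only after one of the first three releases its slot, which is the isolation guarantee the bulkhead pattern is meant to provide.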
Another approach is to leverage the power of the RxJS mergeMap operator, which has built-in
concurrency support. In that case, we could simply use this pattern, and our interceptor would
look like this:
import { Observable, Subject, mergeMap } from 'rxjs';
@Injectable()
export class BulkheadInterceptor implements NestInterceptor {
private readonly MAX_CONCURRENT_REQUESTS = 3;
private requestQueue = new Subject<Observable<any>>();
constructor() {
this.requestQueue
.pipe(mergeMap((task) => task, this.MAX_CONCURRENT_REQUESTS))
.subscribe();
}
return this.requestQueue.asObservable();
}
}
See also...
• Resiliency patterns on the NestJS blog: https://medium.com/@ali-chishti/resiliencypatterns-nestjs-b39351f8dea8
• Resiliency best practices: https://dev.to/lucasnscr/resilience-and-best-patterns-4mo
Kafka’s real-time data processing makes it perfect for many use cases, like:
• User activity tracking – companies like Uber, Netflix, and Spotify use Kafka to monitor
user interactions, clicks, and page views.
• Event bus for microservices – Kafka can act as a message queue and help multiple
microservices communicate and stay synchronized.
• Observability – Kafka can be easily integrated with observability tools to track the health
and performance of the system, as well as process and analyze error data. Cloudflare uses
Kafka for analytics purposes.
• Real-time apps – Kafka can process payments, stock trades, geolocation, or any other
data in real time. It can also easily be integrated with data analytic tools like Apache Spark.
• IoT sensor data – Kafka can be used to collect and analyze data from sensors in devices
like smart home appliances.
Getting ready
To follow along with this recipe efficiently, we will cover some basic terms and definitions when
it comes to Kafka.
• Topic – an immutable log of events kept in order. Each record in a topic can carry a key,
which Kafka uses to decide which partition the record goes to, along with a value, a timestamp,
and metadata.
• Producer – sources of data that are publishing/writing records of data to Kafka topics.
• Consumer – subscribes to the topic and processes data. Consumers can group and
aggregate data.
• Broker – a Kafka server that hosts Kafka topics and handles data storage and message
delivery.
• Cluster – a group of brokers working together to provide fault tolerance and scalability.
• Partition – topics are divided into partitions, ordered sequences of records that
can be distributed across different brokers. This enables load balancing and fault tolerance.
• ZooKeeper – a centralized service that manages the coordination and state of the Kafka
cluster. It helps with things like electing leaders for partitions and managing broker
registrations.
Also, in Kafka’s official documentation, there is a Quickstart on how to download and start a Kafka
server locally: https://kafka.apache.org/quickstart.
How to do it...
In this recipe, we will create a reactive Kafka producer and consumer. We will leverage the power
of RxJS to process the stream of data, and to craft a resiliency mechanism that will be useful in
a distributed system.
constructor() {
this.kafka = new Kafka({
brokers: ['localhost:9092'],
retry: {
retries: 0
}
});
this.producer = this.kafka.producer({
allowAutoTopicCreation: true
});
}
In the brokers array, we list all Kafka brokers running in the background.
You may notice that we have disabled Kafka’s built-in retry mechanism. You might
want to do this when there is a need for a custom backoff retry pattern. The reason
we are doing it here is for learning purposes when implementing resilient Kafka
connections with RxJS. More info about KafkaJS’s default retry mechanism can be
found in the documentation: https://kafka.js.org/docs/retry-detailed.
We also get access to the Kafka producer. Now, if we call the connect() method on the producer,
we will be connected to the Kafka broker, and we can produce our first messages.
});
},
});
this.consumers.push(consumer);
}
async onApplicationShutdown() {
Now, whenever we call a consumer method from different services, we are going to take several
crucial steps:
Now that we have done the very basic setup of the Kafka message broker, we are ready to add a
little bit of reactivity with the magic of RxJS!
The request body payload that we are sending as each message would look something like this:
{
"message": "{ \"id\": 3, \"name\": \"Pasta alla Gricia\", \"description\":
\"A Roman pasta dish featuring guanciale (cured pork jowl), pecorino
romano cheese, and black pepper.\", \"ingredients\": [\"pasta (rigatoni or
bucatini)\", \"guanciale\", \"pecorino romano cheese\", \"black pepper\"]
}"
}
Now that all incoming messages are pushed into an RxJS subject, we can send new messages
to the Kafka broker in a reactive way. In the init lifecycle hook of the service’s
message-broker.ts, we can subscribe to the new messages:
this.kafkaMessage$.pipe(
concatMap((kafkaMessage) => from(this.producer.send(kafkaMessage))),
).subscribe();
Here, we are creating an Observable stream from the return value of the producer’s send method.
By utilizing the concatMap operator, we will not only send all incoming messages to the Kafka
broker but also keep the order of messages.
This is a basic example of sending new messages to the Kafka broker, and it assumes that the
producer stays connected. What about making a reactive producer that is resilient to
connection errors?
async onModuleInit() {
setTimeout(() => {
console.log('Disconnecting producer...');
this.producer.disconnect();
}, 40000);
setTimeout(() => {
console.log('Connecting producer...');
this.producer.connect();
}, 50000);
this.handleBrokerConnection();
}
In a scenario like this, it is crucial that the producer stays resilient and keeps collecting incoming
messages. Once it is connected back to the broker, the producer should send all messages and
continue accepting the new ones.
After the producer has successfully connected, we can listen to the producer’s CONNECT and
DISCONNECT events. When the connection status changes, we keep track of the producer’s active
state in a separate subject:
this.producer.on(this.producer.events.CONNECT, () => {
this.producerActiveState$.next(true);
});
Also, whenever there is a producer disconnection, we will switch to the producerConnect$ stream,
as part of our reconnection strategy.
producerDisconnect$.pipe(
switchMap(() => producerConnect$)
).subscribe(() => this.producerActiveState$.next(false));
Now the RxJS magic starts! Based on these states, we can decide what we want to do with the
incoming messages:
const acceptIncomingMessages$ = this.kafkaMessage$.pipe(windowToggle(
producerActive$, () => producerInactive$));
const bufferIncomingMessages$ = this.kafkaMessage$.pipe(bufferToggle(
producerInactive$, () => producerActive$));
With the help of the windowToggle operator, we get fine-grained control over when we start
collecting new stream events and when we stop. If the producer is disconnected, we stop the
regular emission of messages. However, at that point, we want to collect all incoming messages
in a buffer, so that none of them are lost and they can be sent once the producer reconnects.
For that purpose, we can leverage the bufferToggle operator, which keeps track of all messages
while we are in the producerInactive$ state. After that, we can merge those two streams and
send messages to the broker in both cases:
merge(
acceptIncomingMessages$,
bufferIncomingMessages$
).pipe(
mergeAll(),
concatMap((kafkaMessage) => from(this.producer.send(kafkaMessage))),
catchError(() => of('Error sending messages to Kafka!')),
).subscribe();
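The combined effect of the two toggled streams can be sketched without RxJS as a simple gate. This is a plain TypeScript stand-in, not the windowToggle and bufferToggle operators themselves: ProducerGate, push(), and setActive() are hypothetical names, and the send callback stands in for the call to the producer.

```typescript
class ProducerGate<T> {
  private active: boolean;
  private pending: T[] = [];
  private readonly send: (batch: T[]) => void;

  constructor(send: (batch: T[]) => void, initiallyActive: boolean) {
    this.send = send;
    this.active = initiallyActive;
  }

  // While the producer is active, messages pass straight through;
  // while it is inactive, they are collected in a buffer instead.
  push(message: T): void {
    if (this.active) {
      this.send([message]);
    } else {
      this.pending.push(message);
    }
  }

  // On reconnection, flush everything that was buffered during the outage, in order.
  setActive(active: boolean): void {
    this.active = active;
    if (active && this.pending.length > 0) {
      const batch = this.pending;
      this.pending = [];
      this.send(batch);
    }
  }
}

const sentBatches: string[][] = [];
const gate = new ProducerGate<string>((batch) => {
  sentBatches.push(batch);
}, true);

gate.push('m1');
gate.setActive(false); // producer disconnected
gate.push('m2');
gate.push('m3');
gate.setActive(true); // producer reconnected
gate.push('m4');
console.log(JSON.stringify(sentBatches)); // [["m1"],["m2","m3"],["m4"]]
```

The buffered batch plays the role of the array emitted by bufferToggle, which is why the pipeline above needs mergeAll() to flatten it back into single messages.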
The lossy backpressure strategy has a simpler form where the consumer starts dropping messages
when overwhelmed. This strategy might be useful in situations where receiving the latest data is
more important than processing every single message, e.g., real-time stock prices.
In our use case, we have decided that we will go with lossless backpressure, since we don’t want
to lose any messages:
merge(
acceptIncomingMessages$,
bufferIncomingMessages$
).pipe(
mergeAll(),
bufferTime(2000),
filter(messages => messages.length > 0),
concatMap((kafkaMessage) => from(this.producer.sendBatch(
{ topicMessages: kafkaMessage }
))),
catchError(() => of('Error sending messages to Kafka!')),
).subscribe();
1. We are using the bufferTime operator for lossless backpressure, and every two seconds,
we are collecting all messages into a buffer.
2. We are filtering time frames without any messages, meaning empty buffers.
3. Instead of producing one message, we are going to send a batch of messages with the
method sendBatch().
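The batching steps above can be sketched on recorded timestamps instead of a live stream. This is a minimal sketch, assuming fixed two-second windows that start at time zero; TimedMessage and batchByTime() are hypothetical names, and the real bufferTime operator opens its windows relative to the moment of subscription.

```typescript
interface TimedMessage {
  at: number; // arrival time in milliseconds since the start of the stream
  value: string;
}

// Group messages into consecutive windows of windowMs milliseconds.
// Windows without messages produce no batch, like the filter() step above.
function batchByTime(messages: TimedMessage[], windowMs: number): string[][] {
  const sorted = messages.slice().sort((a, b) => a.at - b.at);
  const batches: string[][] = [];
  let currentWindow: number | null = null;
  let current: string[] = [];
  for (const message of sorted) {
    const windowIndex = Math.floor(message.at / windowMs);
    if (windowIndex !== currentWindow) {
      current = [];
      batches.push(current);
      currentWindow = windowIndex;
    }
    current.push(message.value);
  }
  return batches;
}

const batches = batchByTime(
  [
    { at: 100, value: 'a' },
    { at: 1500, value: 'b' },
    { at: 2100, value: 'c' },
    { at: 6500, value: 'd' },
  ],
  2000,
);
console.log(JSON.stringify(batches)); // [["a","b"],["c"],["d"]]
```

Each inner array corresponds to one sendBatch() call, so four incoming messages result in three round trips to the broker instead of four.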
Now, when we send a few HTTP requests, we can observe in the console that messages are
successfully being sent to the Kafka broker and the consumer is being notified.
But as we can see, messages under the same topic arrive asynchronously, one at a time. Wouldn’t
it be nice if we could collect all of them under the same topic, and have an array of messages
that we can process?
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const kafkaMessage = {
topic,
compression: CompressionTypes.GZIP,
offset: message.offset,
messages: [{ value: message.value.toString() }]
};
this.messages$.next(kafkaMessage);
},
});
Whenever there is a new message, our messages$ subject will be notified about it. We can consume
those messages reactively by doing the following:
consumeMessages(): void {
this.messages$.asObservable().pipe(
groupBy(message => message.topic, {
connector: () => new ReplaySubject(100) }),
concatMap(group$ => group$.pipe(
scan((acc, cur) => ({
topic: group$.key,
messages: acc.messages ? cur.messages.concat(acc.messages) : cur.messages,
}), {} as KafkaConsumedMessage))
),
),
catchError((error) => {
console.log('Error consuming messages from Kafka!');
return EMPTY;
}),
tap(console.log),
).subscribe();
Here, we can see that with the help of the groupBy operator, we can collect all messages under the
same topic. After that, we use concatMap to have messages in order, while also transforming the
inner stream to put all the messages into the same array with the scan operator. An alternative
approach might be to use the mergeScan operator alongside the map operator.
When we send new POST requests to our API, we will see that messages are efficiently grouped
into the array under the same topic and ready for processing.
We might also notice the order of the messages, where the most recent ones are at the beginning
of the array.
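The grouping and accumulation described above can be sketched as a plain reducer. This is a minimal sketch without RxJS: ConsumedMessage, TopicState, and accumulateByTopic() are hypothetical names, and the reducer combines what groupBy and scan do in the pipeline into a single step over an array.

```typescript
interface ConsumedMessage {
  topic: string;
  messages: { value: string }[];
}

type TopicState = { [topic: string]: ConsumedMessage | undefined };

// Merge one incoming message into the state for its topic.
// New messages are placed first, so the most recent ones lead the array.
function accumulateByTopic(state: TopicState, incoming: ConsumedMessage): TopicState {
  const previous = state[incoming.topic];
  const messages = previous
    ? incoming.messages.concat(previous.messages)
    : incoming.messages;
  const next: TopicState = { ...state };
  next[incoming.topic] = { topic: incoming.topic, messages };
  return next;
}

const incomingMessages: ConsumedMessage[] = [
  { topic: 'recipes', messages: [{ value: 'first' }] },
  { topic: 'orders', messages: [{ value: 'order-1' }] },
  { topic: 'recipes', messages: [{ value: 'second' }] },
];

const grouped = incomingMessages.reduce(
  (state, message) => accumulateByTopic(state, message),
  {} as TopicState,
);
const recipes = grouped['recipes'];
console.log(recipes ? recipes.messages.map((m) => m.value).join(',') : ''); // second,first
```

Swapping the two operands of concat() would keep the messages in arrival order instead.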
}), {} as KafkaConsumedMessage)
)
),
catchError((error) => {
console.log('Error consuming messages from Kafka!');
return EMPTY;
}),
tap(console.log),
).subscribe();
In a real-life scenario, if the processing of a message fails, we want to keep track of all
failed processing events, send those events to the monitoring service, and schedule them for
re-processing or additional checks.
For those purposes, we will implement the dead-letter queue pattern. This is a mechanism for
handling messages that the consumer failed to process by sending them to a separate topic. We
will extend the previous implementation of the consumer’s run() method from step 7:
private readonly dlq$ = new ReplaySubject<KafkaMessage>();
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const kafkaMessage = {
topic,
compression: CompressionTypes.GZIP,
offset: message.offset,
messages: [{ value: message.value.toString() }]
};
let parsedMessage = null;
try {
parsedMessage = this.deserializeMessage(kafkaMessage);
} catch (error) {
this.dlq$.next({
topic: 'dlq',
compression: CompressionTypes.GZIP,
messages: [{ value: message.value.toString() }],
error
});
return;
}
this.messages$.next(parsedMessage);
},
});
The deserializeMessage() method tries to call JSON.parse() on every message. If deserialization
fails, we send that message to the dlq topic, along with information about the message and the
error that occurred.
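The routing decision behind the dead-letter queue can be sketched independently of KafkaJS. This is a minimal sketch under stated assumptions: RawMessage, DeadLetter, and routeMessage() are hypothetical names, and the two callbacks stand in for the messages$ and dlq$ subjects of the recipe.

```typescript
interface RawMessage {
  topic: string;
  value: string;
}

interface DeadLetter {
  topic: 'dlq';
  originalTopic: string;
  value: string;
  error: string;
}

// Try to deserialize the message. Valid payloads continue down the normal path,
// while unparsable ones are diverted to the dead-letter queue together with the error.
function routeMessage(
  raw: RawMessage,
  onParsed: (payload: unknown) => void,
  onDeadLetter: (deadLetter: DeadLetter) => void,
): void {
  let payload: unknown;
  try {
    payload = JSON.parse(raw.value);
  } catch (error) {
    onDeadLetter({
      topic: 'dlq',
      originalTopic: raw.topic,
      value: raw.value,
      error: error instanceof Error ? error.message : String(error),
    });
    return;
  }
  onParsed(payload);
}

const parsedPayloads: unknown[] = [];
const deadLetters: DeadLetter[] = [];
const collectParsed = (payload: unknown) => {
  parsedPayloads.push(payload);
};
const collectDeadLetter = (deadLetter: DeadLetter) => {
  deadLetters.push(deadLetter);
};

routeMessage({ topic: 'recipes', value: '{"id":3}' }, collectParsed, collectDeadLetter);
routeMessage({ topic: 'recipes', value: '{ not json' }, collectParsed, collectDeadLetter);
console.log(parsedPayloads.length, deadLetters.length); // 1 1
```

Keeping the original topic and raw value in the dead letter makes it possible to replay the message later, once the cause of the failure is fixed.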
Whenever we have a new message sent to the dlq subject, we can react to that event by scheduling
a new value emission:
async onModuleInit() {
this.dlq$.pipe(
delay(5000),
// send DLQ error to monitoring service every night at 2am
subscribeOn(asyncScheduler),
materialize(),
tap(console.log)
).subscribe();
}
Finally, when we try to send a message that contains the error for processing, we would see the
following log in the console:
See also
• Apache Kafka in 6 minutes: https://www.youtube.com/watch?v=Ch5VhJzaoaI
• What is Kafka? by IBM: https://www.youtube.com/watch?v=aj9CDZm0Glc
• Kafka in 100 Seconds by Fireship: https://www.youtube.com/watch?v=uvb00oaa3k8
• System Design: Apache Kafka In 3 Minutes by ByteByteGo: https://www.youtube.com/watch?v=HZklgPkboro
• Kafka Deep Dive w/ a Ex-Meta Staff Engineer: https://www.youtube.com/watch?v=DU8o-OTeoCc
gRPC uses Protocol Buffers as a language-agnostic mechanism for serializing structured data,
offering efficient and compact data encoding. Compared to JSON, the standard way of exchanging
data on the web, Protocol Buffers use a binary format that is typically smaller on the wire and
faster to encode and decode. Also, gRPC can help us when building type-safe APIs that scale,
since Protocol Buffers enforce type-checking, which leads to reduced errors and improved code
quality.
Use cases where we can leverage the gRPC protocol include communication between microservices,
real-time applications like chat apps and video streaming, and IoT applications.
Getting ready
If you haven’t used gRPC before, their official documentation might be a great starting point:
https://grpc.io/docs/what-is-grpc/introduction/. Also, NestJS provides documentation
on how to set up a gRPC microservice: https://docs.nestjs.com/microservices/grpc.
How to do it...
Ever wondered how food delivery apps like Wolt or Uber Eats give us real-time information
about the location of a courier who is delivering our order? Well, now it’s time to find out how
that works under the hood. In this recipe, we are going to build a gRPC microservice in a NestJS
server app that accepts the food order and, with a little bit of RxJS magic, streams to the client
the results of different states of food preparation.
service FoodOrderService {
rpc CreateOrder (stream OrderRequest)
returns (stream OrderResponse);
}
message OrderRequest {
string item = 1;
int32 quantity = 2;
}
message OrderResponse {
string id = 1;
string item = 2;
int32 quantity = 3;
string status = 4;
}
package: 'order',
protoPath: join(__dirname, './proto/order.proto'),
url: 'localhost:5000',
onLoadPackageDefinition: (pkg, server) => {
new ReflectionService(pkg).addToServer(server);
},
},
});
await app.listen();
}
bootstrap();
Here, we define all the necessary options for effective gRPC communication, like transport, package
name, path to the .proto file, and server URL. You may also notice that we set up gRPC server
reflection. This allows clients to discover the available services and their methods on the
server without needing the Protocol Buffers file. The benefit of this approach is a smoother
developer experience; gRPC services also become easier to test and debug.
@GrpcStreamMethod('FoodOrderService', 'CreateOrder')
createOrder(
stream: Observable<OrderRequest>
): Observable<OrderResponse> {
return stream.pipe(mergeMap((data: OrderRequest) =>
this.orderService.createOrder(data), 3));
}
}
1. With the @GrpcStreamMethod decorator, we can match the service and RPC method from
our .proto file.
2. The controller accepts a new OrderRequest as an Observable stream, which means that
we can pipe into that stream and leverage RxJS.
3. We can also return OrderResponse as an Observable stream.
4. With the power of the RxJS mergeMap operator, we can handle the concurrency of incoming
requests. This means that no more than three orders can be processed at the same time,
and other requests will be queued and processed once fewer than three requests are
ongoing.
return currentOrder$.asObservable();
}
Once we accept a new order, we immediately change its status to PENDING and notify the client
that the order is being processed. Over time, we will switch this status over the gRPC stream to
other statuses, like ACCEPTED, PREPARING, COURIER_ON_THE_WAY, or CANCELLED.
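The status changes described above can be modeled as a small transition table. This is a minimal sketch under loud assumptions: the recipe names the statuses but does not spell out which transitions are allowed, so the NEXT_STATUSES table and the canTransition() helper are hypothetical, and a string union is used here in place of the recipe's OrderStatus enum.

```typescript
type OrderStatus =
  | 'PENDING'
  | 'ACCEPTED'
  | 'PREPARING'
  | 'COURIER_ON_THE_WAY'
  | 'DELIVERED'
  | 'CANCELLED';

// Assumed transition table: an order moves forward one step at a time and
// can be cancelled until the courier has picked it up.
const NEXT_STATUSES: Record<OrderStatus, OrderStatus[]> = {
  PENDING: ['ACCEPTED', 'CANCELLED'],
  ACCEPTED: ['PREPARING', 'CANCELLED'],
  PREPARING: ['COURIER_ON_THE_WAY', 'CANCELLED'],
  COURIER_ON_THE_WAY: ['DELIVERED'],
  DELIVERED: [],
  CANCELLED: [],
};

function canTransition(from: OrderStatus, to: OrderStatus): boolean {
  return NEXT_STATUSES[from].indexOf(to) !== -1;
}

// A status with no outgoing transitions ends the stream for that order.
function isTerminal(status: OrderStatus): boolean {
  return NEXT_STATUSES[status].length === 0;
}

console.log(canTransition('PENDING', 'ACCEPTED')); // true
console.log(canTransition('DELIVERED', 'PREPARING')); // false
```

A guard like this could sit in front of every next() call on the status stream, so that an out-of-order update never reaches the gRPC client.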
Now, once we have gRPC streaming in place, we can use the power of RxJS to simulate reactive
stream changes over time, update the order status, and notify the client about the changes:
const id = crypto.randomUUID();
const newOrder: OrderResponse = { orderRequest, id,
status: OrderStatus.PENDING };
const orderStatus$ = new BehaviorSubject<OrderResponse>(newOrder);
of(newOrder)
.pipe(
delay(1000),
map((order: OrderResponse) => {
order.status = OrderStatus.ACCEPTED;
orderStatus$.next(order);
return order;
}),
delay(1000),
map((order: OrderResponse) => {
order.status = OrderStatus.PREPARING;
orderStatus$.next(order);
return order;
}),
delay(10000),
map((order: OrderResponse) => {
order.status = OrderStatus.DELIVERED;
orderStatus$.next(order);
return order;
}),
tap(() => orderStatus$.complete()),
takeUntil(this.stop$),
)
.subscribe();
return orderStatus$.asObservable();
}
}
Here, we are simulating the food preparation process by simply delaying the notifications about
the order status changes. At the end of a stream, we will complete the orderStatus$ stream,
because we want to notify the controller that the request has completed, and we can move on to
the next concurrent requests.
return interval(2000).pipe(
map(i => {
const orderWithLocation = {
order,
location: { lat: 40.7128 + i * 0.1, lng: -74.0060 + i * 0.1 }
};
this.orderStatus$.next(orderWithLocation);
return orderWithLocation;
}),
startWith({ ...order, location: { lat: 40.7128, lng: -74.0060 } }),
takeUntil(merge(timer(10001), this.stop$)),
);
}),
We can see that every two seconds, we update the location of the courier and notify the gRPC
client about the change. Also, since we have an inner Observable created with the interval
function, we need to be careful about memory leaks. In cases where we need fine-grained
control over when an Observable stream should end, we can use operators like takeUntil, which
completes the stream and unsubscribes from the source as soon as a notifier emits (in our case,
the emission from timer after 10,001 milliseconds, or the stop$ signal when we stop the service).
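The timing of the location updates can be checked with a little arithmetic. This is a minimal sketch in plain TypeScript, not a simulation of the RxJS scheduler: courierLocationAt() and emissionTimes() are hypothetical helpers, and the reading that the extra millisecond exists to let the tick at 10,000 ms through is an assumption about the author's intent.

```typescript
interface CourierLocation {
  lat: number;
  lng: number;
}

// Location reported on a given tick, using the same starting point and step as above.
function courierLocationAt(tick: number): CourierLocation {
  return { lat: 40.7128 + tick * 0.1, lng: -74.006 + tick * 0.1 };
}

// Times (in ms) at which a periodic source fires strictly before the cutoff.
function emissionTimes(periodMs: number, cutoffMs: number): number[] {
  const times: number[] = [];
  for (let time = periodMs; time < cutoffMs; time += periodMs) {
    times.push(time);
  }
  return times;
}

const ticks = emissionTimes(2000, 10001);
console.log(ticks.join(',')); // 2000,4000,6000,8000,10000
console.log(ticks.length); // 5

// The last update before the cutoff corresponds to tick index 4.
const lastLocation = courierLocationAt(ticks.length - 1);
console.log(lastLocation.lat.toFixed(4), lastLocation.lng.toFixed(4)); // 41.1128 -73.6060
```

With a cutoff of exactly 10,000 ms, this arithmetic yields only four updates, which suggests why the recipe adds one millisecond to the timer.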
Finally, when we test our gRPC controller using an API testing tool like Postman, we can see how
we are streaming data efficiently and giving clients timely notifications about changes within
the system.
After sending the request message, we can track our food orders in real time and know exactly
when to expect our delicious meals.
See also...
• Getting started with gRPC from Google: https://www.youtube.com/watch?v=cSGBbwvW1y4
• What is RPC? gRPC Introduction. from ByteByteGo: https://www.youtube.com/watch?v=gnchfOojMk4
• How LinkedIn Improved Latency by 60%: https://www.linkedin.com/pulse/how-linkedin-improved-latency-60-deep-dive-power-google-syed-vln8c/
• Why top companies are switching to gRPC: https://www.youtube.com/shorts/t0ONFCY6NWI
• Prime Reacts on gRPC: https://www.youtube.com/watch?v=9IxE2UQqJCw
• NestJS documentation on gRPC streaming: https://docs.nestjs.com/microservices/grpc#grpc-streaming
• Guide on how to use Postman to test gRPC endpoints: https://learning.postman.com/docs/sending-requests/grpc/grpc-request-interface/
• gRPC web: https://github.com/grpc/grpc-web