Azure Cosmos DB - Change Feed Support

Azure Cosmos DB is a fast and flexible globally replicated database service that is used for
storing high-volume transactional and operational data with predictable single-digit
millisecond latency for reads and writes. This makes it well-suited for IoT, gaming, retail, and
operational logging applications.
A common design pattern in these applications is to track changes made to Azure Cosmos
DB data, and update materialized views, perform real-time analytics, archive data to cold
storage, and trigger notifications on certain events based on these changes.
Azure Cosmos DB has a feature called "change feed" that lets you build efficient and scalable
solutions for each of these patterns. It captures changes (inserts and updates) to a collection
and provides a unified API to access the captured change events.

With change feed support, Azure Cosmos DB provides a sorted list of documents within an
Azure Cosmos DB collection in the order in which they were modified. This feed can be used
to listen for modifications to data within the collection and perform actions such as:
1. Trigger a call to an API when a document is inserted or modified
2. Perform real-time (stream) processing on updates
3. Synchronize data with a cache, search engine, or data warehouse

Change feed: Event Sourcing with Cosmos DB


Storing your data is just the beginning of the adventure. With change feed support, you can
integrate with many different services depending on what you need to do once changes
appear.
Example #1: You are building an online shopping website and need to trigger an email
notification once a customer completes a purchase. Whether you prefer to use Azure
Functions, Azure Notification Hubs, Azure App Service, or your custom-built microservices,
change feed allows seamless integration by surfacing changes in the order that they occur.
Example #2: You are storing data from an autonomous vehicle and need to detect
abnormalities in incoming sensor data. As new entries are stored in Cosmos DB, these
changes that appear on the change feed can be directly processed by Azure HDInsight,
Apache Spark, or Apache Storm. With change feed support, you can apply intelligent
processing in real time as data is stored in Cosmos DB.

Example #3: Due to architecture changes, you need to change the partition key for your
Cosmos DB collection. Change feed allows you to move your data to a new collection while
processing incoming changes. The result is zero downtime while you move data from
anywhere to Cosmos DB.

The change feed in Azure Cosmos DB enables you to build efficient and scalable solutions
for each of these patterns, as shown in the following image:

From an architecture point of view, the change feed can be used as an event sourcing
mechanism: applications subscribe to the feed of change events. The change feed is enabled
by default in Cosmos DB, and there are three different ways to subscribe to it:
1. Azure Functions – Serverless Approach
2. Using Change Feed Processor SDK
3. Using Cosmos SQL SDK

1. Using Azure Functions


Setting up the change feed using Azure Functions is straightforward because it is a
trigger-based mechanism. You can configure an Azure Function in the portal by navigating to
the Cosmos DB collection and clicking ‘Add Azure Function’ in the blade. This creates an
Azure Function from a template with the minimum code required to subscribe to the change feed.
Sample Code:
using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class CosmosDBTriggerFunction
{
    [FunctionName("CosmosDBTriggerFunction")]
    public static void Run([CosmosDBTrigger(
        databaseName: "CosmosDBChangeFeed",
        collectionName: "IoT",
        ConnectionStringSetting = "DBConnection",
        LeaseCollectionName = "leases")] IReadOnlyList<Document> input, ILogger log)
    {
        if (input != null && input.Count > 0)
        {
            // One trigger invocation can carry several changed documents.
            log.LogInformation("Documents modified " + input.Count);

            foreach (var document in input)
            {
                log.LogInformation("Document Id " + document.Id);
            }
        }
    }
}

local.settings.json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "DBConnection": "AccountEndpoint=https://oralcare-fulfillment.documents.azure.com:443/;AccountKey=<your-account-key>;"
  }
}

The above function is triggered when a change occurs in the collection (a new document is
inserted or an existing document is updated). One trigger invocation may contain more than
one changed document. The IReadOnlyList parameter receives the list of changed documents,
and the function body can apply business logic to them in a loop.
To resume the feed from the last processed change, the serverless function needs to
persist checkpoint information. When the Azure Function is created, a Cosmos DB collection
is also created to store this checkpoint information. This collection is known as the lease
collection. It stores the continuation information per partition and helps coordinate
multiple subscribers per collection.
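
The checkpoint idea can be illustrated with a small in-memory sketch. This is not the real lease collection or its document schema: InMemoryLeaseStore, CheckpointDemo, and the integer positions are hypothetical stand-ins for the per-partition continuation information described above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for a lease collection: one checkpoint per partition.
public class InMemoryLeaseStore
{
    private readonly Dictionary<string, int> checkpoints = new Dictionary<string, int>();

    // Position after the last processed change, or 0 if the partition has no checkpoint yet.
    public int GetCheckpoint(string partitionId) =>
        checkpoints.TryGetValue(partitionId, out int position) ? position : 0;

    public void SaveCheckpoint(string partitionId, int position) =>
        checkpoints[partitionId] = position;
}

public static class CheckpointDemo
{
    // Returns only the changes after the stored checkpoint, then advances the checkpoint.
    public static List<string> ReadNewChanges(
        InMemoryLeaseStore leases, string partitionId, IReadOnlyList<string> partitionFeed)
    {
        int start = leases.GetCheckpoint(partitionId);
        List<string> batch = partitionFeed.Skip(start).ToList();
        leases.SaveCheckpoint(partitionId, partitionFeed.Count);
        return batch;
    }

    public static void Main()
    {
        var leases = new InMemoryLeaseStore();
        var feed = new List<string> { "doc1 inserted", "doc2 inserted" };

        Console.WriteLine(ReadNewChanges(leases, "0", feed).Count); // 2: first read sees everything

        feed.Add("doc1 updated");
        Console.WriteLine(ReadNewChanges(leases, "0", feed).Count); // 1: only the new change
    }
}
```

Because the checkpoint is stored outside the reader, a restarted subscriber picks up where the previous one stopped instead of reprocessing the whole feed.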
In practice, you do not need to worry much about the structure of the lease collection,
because the Azure Function uses it internally to coordinate work and to resume each change
feed from the right checkpoint. The serverless implementation abstracts away many details,
and it is the option recommended in the Microsoft documentation.

2. Using Change Feed Processor SDK


The Azure Cosmos DB change feed processor library helps you distribute event processing
across multiple consumers. It simplifies reading changes across partitions with multiple
threads working in parallel, and it manages that work automatically using a lease mechanism.
The main benefit of the library is that you don’t have to manage each partition and its
continuation token, and you don’t have to poll each container manually. Because it handles
much of the complexity of coordinating subscribers, it is a good fit for advanced
subscription scenarios.
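
For contrast, here is a rough sketch of the manual work the library takes over, written against the Microsoft.Azure.DocumentDB SQL SDK (the third subscription option listed earlier). The endpoint and key are placeholders, the database and collection names are borrowed from the Azure Functions sample, and the in-memory checkpoints dictionary is an assumption made for illustration; a real reader would persist the tokens.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Linq;

public static class ManualChangeFeedReader
{
    // Placeholder values; replace with your own account details.
    private const string Endpoint = "https://<your-account>.documents.azure.com:443/";
    private const string Key = "<your-account-key>";

    public static async Task Main()
    {
        using (var client = new DocumentClient(new Uri(Endpoint), Key))
        {
            Uri collectionUri = UriFactory.CreateDocumentCollectionUri("CosmosDBChangeFeed", "IoT");

            // Step 1: enumerate the partition key ranges yourself.
            var ranges = new List<PartitionKeyRange>();
            string rangeContinuation = null;
            do
            {
                FeedResponse<PartitionKeyRange> page = await client.ReadPartitionKeyRangeFeedAsync(
                    collectionUri, new FeedOptions { RequestContinuation = rangeContinuation });
                ranges.AddRange(page);
                rangeContinuation = page.ResponseContinuation;
            } while (rangeContinuation != null);

            // Step 2: read each range and track its continuation token yourself.
            var checkpoints = new Dictionary<string, string>();
            foreach (PartitionKeyRange range in ranges)
            {
                IDocumentQuery<Document> query = client.CreateDocumentChangeFeedQuery(
                    collectionUri,
                    new ChangeFeedOptions { PartitionKeyRangeId = range.Id, StartFromBeginning = true });

                while (query.HasMoreResults)
                {
                    FeedResponse<Document> changes = await query.ExecuteNextAsync<Document>();
                    foreach (Document doc in changes)
                    {
                        Console.WriteLine("Changed: " + doc.Id);
                    }

                    // Kept in memory only in this sketch; persist it to resume after a restart.
                    checkpoints[range.Id] = changes.ResponseContinuation;
                }
            }
        }
    }
}
```

This sketch reads each range once and sequentially; polling, parallelism, and sharing ranges between several readers would all have to be added by hand, which is the coordination the processor library provides.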
Implementing the change feed processor library

There are four main components of implementing the change feed processor library:
1. The monitored container: The monitored container has the data from which the
change feed is generated. Any inserts and changes to the monitored container are
reflected in the change feed of the container.
2. The lease container: The lease container coordinates processing the change feed
across multiple workers. A separate container is used to store the leases with one
lease per partition. It is advantageous to store this lease container on a different
account with the write region closer to where the change feed processor is running.
3. The processor host: Each host determines how many partitions to process based on
how many other instances of hosts have active leases.
4. The consumers: Consumers, or workers, are threads that perform the change feed
processing initiated by each host. Each processor host can have multiple consumers.
Each consumer reads the change feed from the partition it is assigned to and notifies
its host of changes and expired leases.
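
One way to picture how a host decides its share of partitions is an even split. The rule below is an assumption made for illustration, not the library's documented balancing algorithm, and LeaseBalancingSketch and TargetLeaseCount are hypothetical names.

```csharp
using System;

public static class LeaseBalancingSketch
{
    // Assumed even-split rule: each host aims for ceil(partitions / activeHosts) leases.
    public static int TargetLeaseCount(int partitionCount, int activeHostCount)
    {
        if (activeHostCount <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(activeHostCount));
        }

        return (partitionCount + activeHostCount - 1) / activeHostCount;
    }

    public static void Main()
    {
        Console.WriteLine(TargetLeaseCount(10, 1)); // 10: a single host takes every lease
        Console.WriteLine(TargetLeaseCount(10, 3)); // 4: with three hosts, no host needs more than four
    }
}
```

Under this rule, starting a second or third host lowers each host's target, so existing hosts hold fewer leases and the load spreads out.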

Install the NuGet package:
Install-Package Microsoft.Azure.DocumentDB.ChangeFeedProcessor

Code Snippet: Initializing the Change Feed Processor Builder
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Documents.ChangeFeedProcessor;

static async Task RunProcessingAsync()
{
    // Collection whose changes are monitored
    DocumentCollectionInfo feedCollectionInfo = new DocumentCollectionInfo()
    {
        DatabaseName = Constants.CosmosDb_DatabaseName,
        CollectionName = Constants.CosmosDb_CollectionName,
        Uri = new Uri(Constants.CosmosDb_Uri),
        MasterKey = Constants.CosmosDb_Key
    };

    // Collection that stores the leases (one per partition)
    DocumentCollectionInfo leaseCollectionInfo = new DocumentCollectionInfo()
    {
        DatabaseName = Constants.CosmosDb_DatabaseName,
        CollectionName = "leases",
        Uri = new Uri(Constants.CosmosDb_Uri),
        MasterKey = Constants.CosmosDb_Key
    };

    var builder = new ChangeFeedProcessorBuilder();
    var processor = await builder
        .WithHostName("ProductChangeObserverHost")
        .WithFeedCollection(feedCollectionInfo)
        .WithLeaseCollection(leaseCollectionInfo)
        .WithObserver<ProductChangeObserver>()
        .BuildAsync();

    await processor.StartAsync();
    Console.WriteLine("Change Feed Processor started. Press <Enter> key to stop...");
    Console.ReadLine();
    await processor.StopAsync();
}

In the above code, the monitored collection and the lease collection are specified, and the
change feed processor is built with the minimum required details. At a minimum, you must
give the builder an observer, either as an observer type through WithObserver, as shown
here, or as an IChangeFeedObserverFactory. The change feed processor library manages the
rest, such as how leases for different partitions are shared between subscribers.

Code Snippet for Change Feed Processor Observer


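using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
using Microsoft.Azure.Documents.Client;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class ProductChangeObserver : IChangeFeedObserver
{
    // Running total of processed documents across all partitions
    private static int totalDocs = 0;

    // CreateCollectionIfNotExistsAsync, Initialize, InsertData and Event are
    // helper members of the sample project and are not shown here.
    public async Task OpenAsync(IChangeFeedObserverContext context)
    {
        await CreateCollectionIfNotExistsAsync(
            Constants.CosmosDb_Uri, Constants.CosmosDb_Key,
            Constants.CosmosDb_DatabaseName, Constants.CosmosDb_CollectionNameLog,
            Constants.CosmosDb_Throughput);

        // Setting policies for write location for Cosmos DB
        ConnectionPolicy connectionPolicy = new ConnectionPolicy();
        connectionPolicy.UserAgentSuffix = " samples-net/3";
        connectionPolicy.ConnectionMode = ConnectionMode.Direct;
        connectionPolicy.ConnectionProtocol = Protocol.Tcp;
        connectionPolicy.PreferredLocations.Add(LocationNames.SouthIndia);

        // Setting up the database container that receives the changed events
        Initialize(Constants.CosmosDb_DatabaseName, Constants.CosmosDb_CollectionNameLog,
            Constants.CosmosDb_Uri, Constants.CosmosDb_Key, connectionPolicy);
    }

    // Required by IChangeFeedObserver; nothing to clean up in this sample.
    public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
    {
        return Task.CompletedTask;
    }

    public Task ProcessChangesAsync(IChangeFeedObserverContext context,
        IReadOnlyList<Document> docs, CancellationToken cancellationToken)
    {
        Console.ForegroundColor = ConsoleColor.Green;
        Console.WriteLine("Change feed: PartitionId {0} total {1} doc(s)",
            context.PartitionKeyRangeId, Interlocked.Add(ref totalDocs, docs.Count));

        foreach (Document doc in docs)
        {
            // Fetching all the columns for changed event data
            JObject data = JObject.Parse(JsonConvert.SerializeObject(doc));
            Event e = new Event()
            {
                PID = doc.Id,
                Name = (string)data["Name"]
            };
            InsertData(e);
        }

        return Task.CompletedTask;
    }
}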

Summary
The Cosmos DB change feed is a powerful feature for subscribing to changes in your data.
There are three different ways to subscribe to it, as described above.
The table below summarizes the options and features.
