Azure Cosmos DB - Change Feed Support
Azure Cosmos DB is a fast and flexible globally replicated database service that is used for
storing high-volume transactional and operational data with predictable single-digit
millisecond latency for reads and writes. This makes it well-suited for IoT, gaming, retail, and
operational logging applications.
A common design pattern in these applications is to track changes made to Azure Cosmos
DB data and, based on these changes, update materialized views, perform real-time analytics,
archive data to cold storage, and trigger notifications on certain events.
Azure Cosmos DB has a feature called change feed, which captures changes to documents
(inserts and updates) and provides a unified API to access those captured change events.
With change feed support, Azure Cosmos DB provides a sorted list of documents within an
Azure Cosmos DB collection in the order in which they were modified. This feed can be used
to listen for modifications to data within the collection and perform actions such as:
1. Trigger a call to an API when a document is inserted or modified
2. Perform real-time (stream) processing on updates
3. Synchronize data with a cache, search engine, or data warehouse
Example #3: Due to architecture changes, you need to change the partition key of your
Cosmos DB collection. The change feed lets you move your data to a new collection while
continuing to process incoming changes, so the move happens with zero downtime.
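To make the ordering described above concrete, here is a minimal in-memory sketch in C#. It is not the Cosmos DB API: the class InMemoryFeedContainer and its Upsert and ReadChanges methods are hypothetical names, it models a single partition, and the increasing sequence number only stands in for the ordering Cosmos DB maintains internally. The sketch assumes that a document updated several times appears once in the feed, at the position of its latest change.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of a single partition's change feed (illustration only).
public class InMemoryFeedContainer
{
    // id -> latest body and the sequence number of its latest change
    private readonly Dictionary<string, (string Body, long Seq)> _docs = new();
    private long _lastSeq;

    // An insert and an update are both "changes": each gets a new, higher sequence number.
    public void Upsert(string id, string body)
    {
        _lastSeq++;
        _docs[id] = (body, _lastSeq);
    }

    // Returns documents changed after 'afterSeq', oldest change first.
    // A document updated several times appears only once, at its latest position.
    public List<(string Id, string Body, long Seq)> ReadChanges(long afterSeq) =>
        _docs.Where(kv => kv.Value.Seq > afterSeq)
             .OrderBy(kv => kv.Value.Seq)
             .Select(kv => (Id: kv.Key, Body: kv.Value.Body, Seq: kv.Value.Seq))
             .ToList();
}
```

Passing the sequence number of the last processed entry back into ReadChanges returns only newer changes, which is the same idea as the checkpoints kept in the lease collection.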
The change feed in Azure Cosmos DB enables you to build efficient and scalable solutions
for each of these patterns, as shown in the following image:
From an architecture point of view, the change feed can be used as an event sourcing
mechanism: applications subscribe to the change event feed. The change feed is enabled by
default in Cosmos DB, and there are three different ways to subscribe to it:
1. Azure Functions – Serverless Approach
2. Using Change Feed Processor SDK
3. Using the Cosmos DB SQL API SDK
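[FunctionName("CosmosDBTriggerFunction")]
public static void Run([CosmosDBTrigger(
    databaseName: "CosmosDBChangeFeed", collectionName: "IoT",
    ConnectionStringSetting = "DBConnection",
    LeaseCollectionName = "leases", CreateLeaseCollectionIfNotExists = true)]
    IReadOnlyList<Document> input, ILogger log)
{
    // A single invocation can deliver several changed documents
    foreach (var doc in input) log.LogInformation($"Changed document: {doc.Id}");
}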
local.settings.json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "DBConnection": "AccountEndpoint=https://oralcare-fulfillment.documents.azure.com:443/;AccountKey=<your-account-key>;"
  }
}
The above function is triggered when a change occurs in the collection (a new document is
inserted or an existing document is updated). A single trigger invocation may contain more
than one changed document: the IReadOnlyList<Document> parameter receives the list of
changed documents, and the function applies its business logic to each of them in a loop.
To resume reading the feed from the last checkpoint, the serverless function needs to
persist checkpoint information. For this, the trigger uses a separate Cosmos DB collection,
known as the lease collection, which it can create automatically when the function starts
(CreateLeaseCollectionIfNotExists = true). The lease collection stores the continuation
information per partition and helps coordinate multiple subscribers of the same
collection.
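The checkpointing idea can be sketched in a few lines of C#. This is a hypothetical, in-memory illustration and not how the Functions runtime is implemented: LeaseStore plays the role of the lease collection and keeps one continuation per partition, and FeedReader.ProcessPartition reads everything after that continuation, passes the batch to a handler (the equivalent of the loop in the function body), and then advances the checkpoint.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the lease collection: one continuation per partition.
public class LeaseStore
{
    private readonly Dictionary<int, int> _continuations = new();

    // A partition that has never been read starts at position 0.
    public int GetContinuation(int partition) =>
        _continuations.TryGetValue(partition, out var c) ? c : 0;

    public void SaveContinuation(int partition, int continuation) =>
        _continuations[partition] = continuation;
}

public static class FeedReader
{
    // Reads every change after the stored checkpoint for one partition,
    // hands the whole batch to the handler, then advances the checkpoint.
    // Returns the number of changes processed.
    public static int ProcessPartition(
        IReadOnlyList<string> partitionFeed, int partition,
        LeaseStore leases, Action<IReadOnlyList<string>> handler)
    {
        int start = leases.GetContinuation(partition);
        if (start >= partitionFeed.Count) return 0; // nothing new since the checkpoint

        var batch = new List<string>();
        for (int i = start; i < partitionFeed.Count; i++) batch.Add(partitionFeed[i]);

        handler(batch);                                          // business logic first...
        leases.SaveContinuation(partition, partitionFeed.Count); // ...then persist the checkpoint
        return batch.Count;
    }
}
```

Because the checkpoint is saved only after the handler returns, a batch whose handler throws would be read again on the next call. That is a design choice of this sketch; it is not a statement about the retry behavior of the Azure Functions trigger.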
In practice, you rarely need to worry about the structure of the lease collection, because
the Azure Function uses it internally to coordinate the work and to read the right change
feed from the right checkpoint. The serverless implementation abstracts away many of these
details, and according to Microsoft's documentation it is the recommended option.
The second option, the change feed processor library, has four main components:
1. The monitored container: The monitored container has the data from which the
change feed is generated. Any inserts and changes to the monitored container are
reflected in the change feed of the container.
2. The lease container: The lease container coordinates processing the change feed
across multiple workers. A separate container is used to store the leases with one
lease per partition. It is advantageous to store this lease container on a different
account with the write region closer to where the change feed processor is running.
3. The processor host: Each host determines how many partitions to process based on
how many other instances of hosts have active leases.
4. The consumers: Consumers, or workers, are threads that perform the change feed
processing initiated by each host. Each processor host can have multiple consumers.
Each consumer reads the change feed from the partition it is assigned to and notifies
its host of changes and expired leases.
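How a host decides how many partitions to take can be illustrated with a simplified C# sketch. LeaseBalancer.Rebalance is a hypothetical helper, not part of the change feed processor library: each host computes a fair share (partitions divided by active hosts, rounded up so every partition can be covered) and claims unowned leases until it holds that many.

```csharp
// Hypothetical, simplified lease balancing (illustration only).
public static class LeaseBalancer
{
    // owners[p] holds the name of the host that owns partition p's lease,
    // or null when the lease is unowned or has expired.
    // Assumes activeHosts >= 1.
    public static void Rebalance(string host, string[] owners, int activeHosts)
    {
        // Fair share: ceiling of partitions / active hosts
        int target = (owners.Length + activeHosts - 1) / activeHosts;

        // Count the leases this host already holds
        int owned = 0;
        foreach (var o in owners) if (o == host) owned++;

        // Claim unowned leases until the fair share is reached
        for (int p = 0; p < owners.Length && owned < target; p++)
        {
            if (owners[p] == null) { owners[p] = host; owned++; }
        }
    }
}
```

With four partitions and two hosts, each host ends up with two leases. If a third host joins after every lease is owned, this sketch gives it nothing until some leases expire (are set back to null); it does not model a host taking leases away from busier hosts.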
Install the library from NuGet (Package Manager Console):
Install-Package Microsoft.Azure.DocumentDB.ChangeFeedProcessor
Code snippet: initializing the change feed processor builder
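static async Task RunProcessingAsync()
{
    // The monitored (feed) collection
    var feedCollectionInfo = new DocumentCollectionInfo
    {
        Uri = new Uri(Constants.CosmosDb_Uri),
        DatabaseName = Constants.CosmosDb_DatabaseName,
        CollectionName = Constants.CosmosDb_CollectionName,
        MasterKey = Constants.CosmosDb_Key
    };
    // The lease collection that stores one lease per partition
    var leaseCollectionInfo = new DocumentCollectionInfo
    {
        Uri = new Uri(Constants.CosmosDb_Uri),
        DatabaseName = Constants.CosmosDb_DatabaseName,
        CollectionName = "leases",
        MasterKey = Constants.CosmosDb_Key
    };
    var builder = new ChangeFeedProcessorBuilder();
    var processor = await builder
        .WithHostName("ProductChangeObserverHost")
        .WithFeedCollection(feedCollectionInfo)
        .WithLeaseCollection(leaseCollectionInfo)
        .WithObserver<ProductChangeObserver>()
        .BuildAsync();
    await processor.StartAsync();
    Console.ReadLine(); // keep processing until Enter is pressed
    await processor.StopAsync();
}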
In the above code, the monitored collection and the lease collection are defined, and the
change feed processor is built with the minimum required details. At a minimum, the builder
also needs an observer: either a type passed through WithObserver<T>(), as here, or an
IChangeFeedObserverFactory passed through WithObserverFactory(). The change feed processor
library manages the rest, such as distributing the leases of different partitions among
the running hosts.
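Code snippet: implementing the observer (ProductChangeObserver)
public class ProductChangeObserver : IChangeFeedObserver
{
    // Runs when the observer starts processing a lease. CreateCollectionIfNotExistsAsync,
    // Initialize, InsertData and Product come from the author's project (not shown)
    public Task OpenAsync(IChangeFeedObserverContext context)
    {
        CreateCollectionIfNotExistsAsync(
            Constants.CosmosDb_Uri, Constants.CosmosDb_Key,
            Constants.CosmosDb_DatabaseName, Constants.CosmosDb_CollectionNameLog,
            Constants.CosmosDb_Throughput).Wait();
        var connectionPolicy = new ConnectionPolicy();
        connectionPolicy.ConnectionMode = ConnectionMode.Direct;
        connectionPolicy.ConnectionProtocol = Protocol.Tcp;
        connectionPolicy.PreferredLocations.Add(LocationNames.SouthIndia);
        // Setting up the database container for feeding changed events
        Initialize(Constants.CosmosDb_DatabaseName, Constants.CosmosDb_CollectionNameLog,
            Constants.CosmosDb_Uri, Constants.CosmosDb_Key, connectionPolicy);
        return Task.CompletedTask;
    }
    public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason) => Task.CompletedTask;
    // Runs for each batch of changed documents from the assigned partition
    public Task ProcessChangesAsync(IChangeFeedObserverContext context,
        IReadOnlyList<Document> docs, CancellationToken cancellationToken)
    {
        foreach (Document doc in docs)
        {
            Console.ForegroundColor = ConsoleColor.Green;
            JObject data = JObject.Parse(doc.ToString()); // Newtonsoft.Json
            var e = new Product { PID = doc.Id, Name = (string)data["Name"] };
            InsertData(e);
        }
        return Task.CompletedTask;
    }
}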
Summary
The Cosmos DB change feed is a powerful way to subscribe to changes in a collection, and
there are three different ways to consume it, as described above.
The table below summarizes the options and their features.