First steps with the Change Feed pull model in Azure Cosmos DB
Tatsuro Shibamura
Posted on February 5, 2021
What is your favorite feature in Azure Cosmos DB? Mine is the Change Feed. The Change Feed is a very flexible feature, and we sometimes use it as an ultra-high-performance queue that guarantees ordering within each partition key.
You can find various use cases in the official documentation. Please read it at least once.
The beauty of the Change Feed is that you can read changes in the order in which they were made (within each partition key) for as long as the container exists, and because the underlying API is a pull model, multiple readers can read the same Change Feed at the same time.
In this article, I will talk about a new way to read the Change Feed.
Push model (Change Feed Processor)
The most commonly used method of reading the Change Feed is the Push model.
With the Change Feed Processor and Azure Functions, your method or function is invoked whenever changes are read from the Change Feed. The changes are pushed to your code, which is why this is called the Push model.
The Push model is a great and easy way to consume changes, but there are use cases where you want to control how much and how often you read from the Change Feed.
In my case, I needed to control the amount and frequency of data read from the Change Feed and written to Azure Data Lake Storage Gen2.
Pull model (iterator style)
For use cases that the push model does not cover, the Azure Cosmos DB .NET SDK v3 adds a pull model for reading the Change Feed.
The pull model is a much thinner API than the Change Feed Processor, but it gives developers more control and flexibility.
It is probably easiest to understand by looking at code that uses the pull model.
using System;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

class Program
{
    static async Task Main(string[] args)
    {
        var connectionString = "AccountEndpoint=https://***.documents.azure.com:443/;AccountKey=***;";
        var cosmosClient = new CosmosClient(connectionString, new CosmosClientOptions
        {
            SerializerOptions = new CosmosSerializationOptions
            {
                PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase
            }
        });
        var container = cosmosClient.GetContainer("HackAzure", "TodoItem");

        // Read the Change Feed from the current time
        var iterator = container.GetChangeFeedIterator<TodoItem>(ChangeFeedStartFrom.Now());

        // HasMoreResults is always true for a Change Feed iterator
        while (iterator.HasMoreResults)
        {
            try
            {
                // Read the next batch of changes from the Change Feed
                var items = await iterator.ReadNextAsync();
                foreach (var todoItem in items)
                {
                    Console.WriteLine($"{DateTime.Now}: {todoItem.Id},{todoItem.Title},{todoItem.Body}");
                }
            }
            catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotModified)
            {
                // When there are no new changes, this SDK version throws a CosmosException
                // with StatusCode set to NotModified; wait 5 seconds before polling again
                await Task.Delay(5000);
            }
        }
    }
}

// Assumed shape of TodoItem (its definition is not shown in the original sample)
public class TodoItem
{
    public string Id { get; set; }
    public string Title { get; set; }
    public string Body { get; set; }
}
The basic usage is to call the GetChangeFeedIterator<T> method and loop over the returned FeedIterator<T>.
The actual reading of the Change Feed is done by ReadNextAsync. You can control how fast you consume the Change Feed by how often you call this method.
Running this code is a handy way to check how the Change Feed actually behaves. Note that, by default, the pull model reads the Change Feed for the entire container.
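To control both how much and how often you read, which was my Data Lake use case, one option is to read one page per fixed interval. The sketch below assumes that ChangeFeedRequestOptions.PageSizeHint sets the preferred number of items per page, and it uses the same GetChangeFeedIterator overload as the samples in this article. In later SDK releases GetChangeFeedIterator also takes a ChangeFeedMode argument and, as far as I know, reports "no new changes" through the response's StatusCode instead of throwing, so the loop checks for both. PacedChangeFeedReader and its parameters are hypothetical names for illustration.

```csharp
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

// Hypothetical helper: reads one page of changes per interval.
public static class PacedChangeFeedReader
{
    public static async Task RunAsync<T>(
        Container container,
        Func<FeedResponse<T>, Task> handleBatch,
        int pageSizeHint,
        TimeSpan interval,
        CancellationToken cancellationToken)
    {
        // Assumption: PageSizeHint is the preferred (not guaranteed) number of items per page.
        var options = new ChangeFeedRequestOptions { PageSizeHint = pageSizeHint };
        var iterator = container.GetChangeFeedIterator<T>(ChangeFeedStartFrom.Now(), options);

        while (iterator.HasMoreResults && !cancellationToken.IsCancellationRequested)
        {
            try
            {
                var page = await iterator.ReadNextAsync(cancellationToken);

                // Newer SDKs signal "no new changes" through the status code.
                if (page.StatusCode != HttpStatusCode.NotModified)
                {
                    await handleBatch(page);
                }
            }
            catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotModified)
            {
                // The SDK version used in this article throws when there are no new changes.
            }

            // Wait after every read, not only when caught up; this is what limits the read rate.
            // Cancellation surfaces as a TaskCanceledException from here or from ReadNextAsync.
            await Task.Delay(interval, cancellationToken);
        }
    }
}
```

The design choice is the unconditional delay: the first sample only waits once it has caught up, so it drains a backlog as fast as it can, whereas this loop caps throughput at roughly one page per interval.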
Change Feed start position
In the sample code, the Change Feed is read starting from data updated after the current time. There are several ways to set the start position of the Change Feed, and you choose one by calling the corresponding method of the ChangeFeedStartFrom class.
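The following four methods are provided.
Beginning: read from the beginning of the container's Change Feed
ContinuationToken: resume from a continuation token returned by an earlier read
Now: read only changes made from now on
Time: read changes made after a given point in time (UTC)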
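To make the four start positions concrete, here is a small helper that picks one of them. The ChangeFeedStartFrom factory methods are the SDK's; the decision order (saved token first, then Beginning, then a UTC timestamp, then Now) and the name StartPositions.Choose are my own illustration.

```csharp
using System;
using Microsoft.Azure.Cosmos;

// Hypothetical helper that chooses one of the four ChangeFeedStartFrom options.
public static class StartPositions
{
    public static ChangeFeedStartFrom Choose(string savedToken, bool fromBeginning, DateTime? sinceUtc)
    {
        if (savedToken != null)
        {
            // Resume exactly where a previous read stopped.
            return ChangeFeedStartFrom.ContinuationToken(savedToken);
        }

        if (fromBeginning)
        {
            // Start from the beginning of the container's Change Feed.
            return ChangeFeedStartFrom.Beginning();
        }

        if (sinceUtc.HasValue)
        {
            // Changes made after the given point in time; pass a UTC DateTime.
            return ChangeFeedStartFrom.Time(sinceUtc.Value);
        }

        // Only changes made after the iterator is created.
        return ChangeFeedStartFrom.Now();
    }
}
```

For example, StartPositions.Choose(null, false, DateTime.UtcNow.AddHours(-1)) would read the last hour of changes, and passing a token saved by an earlier run takes precedence over everything else.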
Except for ContinuationToken, these options work much like the start settings of the push model with the Change Feed Processor or Azure Functions. ContinuationToken will be explained later.
Before that, let me explain a little about the overloaded methods.
All methods except ContinuationToken have overloads that accept a FeedRange. A FeedRange represents a range of the container's data; the ranges returned by Container.GetFeedRangesAsync correspond to the physical partitions of Cosmos DB, so a FeedRange is also the unit of parallel processing.
If you know that the Push model creates a lease for each physical partition, it is easy to see that a FeedRange plays the same role.
Because the pull model is a thin wrapper, reading the Change Feed in parallel takes more work on your side, but in exchange we gain the ability to read the Change Feed for a single partition key only.
// Read only the data whose partition key is "shibayan"
var startFrom = ChangeFeedStartFrom.Now(FeedRange.FromPartitionKey(new PartitionKey("shibayan")));
var iterator = container.GetChangeFeedIterator<TodoItem>(startFrom);
By using the FeedRange.FromPartitionKey method, you can read the Change Feed for a specific partition key only. This was not possible with the Push model.
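Since parallel reading is left to you in the pull model, here is a minimal sketch of one way to do it: ask the container for its FeedRanges with GetFeedRangesAsync and run one iterator per range concurrently. It assumes the same SDK surface as the samples above, starts every range from Now(), and deliberately leaves out checkpointing, error handling beyond NotModified, and distributing ranges across machines. ParallelChangeFeedReader and its parameters are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

// Hypothetical helper: one Change Feed iterator per FeedRange, read concurrently.
public static class ParallelChangeFeedReader
{
    public static async Task RunAsync<T>(
        Container container,
        Func<FeedRange, T, Task> handleItem,
        TimeSpan pollInterval,
        CancellationToken cancellationToken)
    {
        // The ranges reflect the container's physical partitions at the time of the call.
        IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync(cancellationToken);

        var workers = ranges.Select(range =>
            ReadRangeAsync(container, range, handleItem, pollInterval, cancellationToken));
        await Task.WhenAll(workers);
    }

    private static async Task ReadRangeAsync<T>(
        Container container,
        FeedRange range,
        Func<FeedRange, T, Task> handleItem,
        TimeSpan pollInterval,
        CancellationToken cancellationToken)
    {
        var iterator = container.GetChangeFeedIterator<T>(ChangeFeedStartFrom.Now(range));

        while (iterator.HasMoreResults && !cancellationToken.IsCancellationRequested)
        {
            var gotChanges = false;
            try
            {
                var page = await iterator.ReadNextAsync(cancellationToken);
                if (page.StatusCode != HttpStatusCode.NotModified)
                {
                    gotChanges = true;
                    // Items within one range are handled sequentially to keep their order.
                    foreach (var item in page)
                    {
                        await handleItem(range, item);
                    }
                }
            }
            catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotModified)
            {
                // No new changes in this range yet (the SDK version in this article throws here).
            }

            if (!gotChanges)
            {
                await Task.Delay(pollInterval, cancellationToken);
            }
        }
    }
}
```

Handling items one at a time inside a range preserves the per-partition-key ordering, while different ranges proceed independently, which is roughly what the leases give you in the Push model.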
Handling continuation tokens
Finally, let me explain continuation tokens. Unlike the Push model, the pull model does not manage the read position for you with a Lease DB, so you need to build the equivalent of the Lease DB yourself.
using System;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

class Program
{
    static async Task Main(string[] args)
    {
        var connectionString = "AccountEndpoint=https://***.documents.azure.com:443/;AccountKey=***;";
        var cosmosClient = new CosmosClient(connectionString, new CosmosClientOptions
        {
            SerializerOptions = new CosmosSerializationOptions
            {
                PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase
            }
        });
        var container = cosmosClient.GetContainer("HackAzure", "TodoItem");

        // Read the previous continuation token
        var continuationToken = LoadContinuationToken();

        // If there is no continuation token, start from the current time
        var startFrom = continuationToken != null
            ? ChangeFeedStartFrom.ContinuationToken(continuationToken)
            : ChangeFeedStartFrom.Now();
        var iterator = container.GetChangeFeedIterator<TodoItem>(startFrom);

        while (iterator.HasMoreResults)
        {
            try
            {
                var items = await iterator.ReadNextAsync();

                // Always update the continuation token after reading the Change Feed
                continuationToken = items.ContinuationToken;

                foreach (var todoItem in items)
                {
                    Console.WriteLine($"{DateTime.Now}: {todoItem.Id},{todoItem.Title}");
                }
            }
            catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotModified)
            {
                // Caught up: keep the token we already have, and fall back to the
                // one in the exception headers only if we have none yet
                continuationToken ??= ex.Headers.ContinuationToken;
                break;
            }
        }

        // Save the last obtained continuation token
        SaveContinuationToken(continuationToken);
    }

    private static string LoadContinuationToken()
    {
        // TODO: Need to implement your own.
        // Returning null means "no saved position", so reading starts from Now().
        return null;
    }

    private static void SaveContinuationToken(string continuationToken)
    {
        // TODO: Need to implement your own.
    }
}

// Assumed shape of TodoItem (its definition is not shown in the original sample)
public class TodoItem
{
    public string Id { get; set; }
    public string Title { get; set; }
    public string Body { get; set; }
}
How you load and save continuation tokens depends on your use case. I solved this by creating a Lease DB similar to the one used by the Push model.
Except in cases such as a one-time data migration, you need to persist the continuation token.
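As a much simpler stand-in for a Lease DB, assuming a single reader running on one machine, the token can be kept in a local file. FileContinuationTokenStore is a hypothetical helper that only illustrates the load/save contract; for several readers or several machines, a shared store such as a Cosmos DB container (the Lease DB approach described above) is the better fit.

```csharp
using System;
using System.IO;

// Hypothetical file-based continuation token store for a single reader.
public sealed class FileContinuationTokenStore
{
    private readonly string _path;

    public FileContinuationTokenStore(string path)
    {
        _path = path ?? throw new ArgumentNullException(nameof(path));
    }

    // Returns null when nothing has been saved yet, so the caller starts from Now().
    public string Load()
    {
        return File.Exists(_path) ? File.ReadAllText(_path) : null;
    }

    public void Save(string continuationToken)
    {
        if (continuationToken == null)
        {
            // Nothing new to record; keep the previously saved position.
            return;
        }

        // Write to a temporary file first and then replace the target, so a crash
        // during the write is unlikely to leave a truncated token behind.
        // File.Move with overwrite requires .NET Core 3.0 or later.
        var tempPath = _path + ".tmp";
        File.WriteAllText(tempPath, continuationToken);
        File.Move(tempPath, _path, overwrite: true);
    }
}
```

In the pull-model program above, LoadContinuationToken could return new FileContinuationTokenStore("changefeed.token").Load(), and SaveContinuationToken could call Save(continuationToken) on a store pointing at the same file.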
Enjoy Cosmos DB Change Feed!