Sibelius Seraphini
Posted on July 30, 2024
Change streams allow applications to access real-time data changes without the prior complexity and risk of manually tailing the oplog
.
Change streams enable you to update other data sources like a search index in elastic search, or publish events to subscribers.
A naive implementation would listen to some collections, but won't handle the case your service that is listening crashes or has some downtime when doing a rollout deployment.
This article provides a more robust solution that will avoid making you lose any change stream event, even when your service crashes or get some downtime.
Robust Change Streams using a Resume Token
const stream = model.watch([], {
fullDocument: 'updateLookup',
});
stream.on('change', (data) => {})
A stream will emit a ChangeStreamData for each data change in your collection.
For each data change it will also provide a resumeToken
, that will let you resume the processing from this point in time.
A resumeToken
is like a cursor in cursor-based pagination.
The implementation to use a resume token is easy as that:
const changeStreamListen = async <T extends any>(
model: Model<T>,
fn: (data: ChangeStreamData<T>) => void,
) => {
const name = model.collection.name;
const key = `resumetoken:${name}`;
const resumeToken = await RedisCache.get(key);
const getResumeOptions = () => {
if (resumeToken) {
return {
resumeAfter: resumeToken,
};
}
return {};
};
const stream = model.watch([], {
fullDocument: 'updateLookup',
...getResumeOptions(),
});
stream.on('change', changeStreamMiddleware(name, fn)).on('error', (err) => {
// eslint-disable-next-line
console.log('change error:', err);
Sentry.setExtra('error', err);
Sentry.captureException(err);
});
};
This implementation saves the resume token for a given collection in Redis, and tries to resume from it if available.
In Summary
Change Streams can simplify a lot your software architecture.
Providing a robust implementation of it is important to make sure you won't lose any data change.
References
Woovi
Woovi is a Startup that enables shoppers to pay as they like. Woovi provides instant payment solutions for merchants to accept orders to make this possible.
If you want to work with us, we are hiring!
Posted on July 30, 2024
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.