Document-Level Optimistic Concurrency in MongoDB

I've had a number of projects now that have used MongoDB, and each time I've needed to dig deep into the transaction support. Beyond transactions, though, I also needed to understand Mongo's concurrency and locking model. Mongo takes locks at the global, database, and collection level, and the WiredTiger storage engine adds document-level concurrency control for individual writes, but nothing holds a lock on a document between the time you read it and the time you write it back.

If two processes read the same document and then each write back an updated copy, both writes succeed: the second simply replaces the first, and the first process's change is silently lost.

Although Mongo has had a lot of work done to strengthen its distributed story (see the Jepsen analysis), it has no built-in support for optimistic concurrency control at the document level. With SQL Server, you can use Snapshot Isolation so that an update fails if another transaction has modified the same data since your transaction started. With Cosmos DB, an etag value is used to compare the version of the document being written against the version that's stored.

Luckily, it's fairly straightforward to implement document-level optimistic concurrency control (OCC) ourselves. But first, let's prove that without OCC, we can get bad writes.

Without OCC

To start, I'm going to create a very simple document, just an identifier and a counter:

public class Counter
{
    public Guid Id { get; set; }
    public int Value { get; set; }
}

Next, I'll spin off 100 tasks that each load the counter and increment its value, keeping track of the number of writes and the final value:

var document = await collection.AsQueryable().Where(doc => doc.Id == id).SingleAsync();

Console.WriteLine($"Before  : {document.Value}");

var tasks = Enumerable.Range(0, 100).Select(async i =>
{
    var loaded = await collection.AsQueryable().Where(doc => doc.Id == id).SingleAsync();

    loaded.Value++;

    long result;
    do
    {
        // Keep re-sending the (possibly stale) copy until Mongo reports it changed the document
        result = (await collection.ReplaceOneAsync(c => c.Id == id, loaded,
            new ReplaceOptions { IsUpsert = false })).ModifiedCount;
    } while (result != 1);

    return result;
}).ToList();

var total = await Task.WhenAll(tasks);

document = await collection.AsQueryable().Where(doc => doc.Id == id).SingleAsync();

Console.WriteLine($"After   : {document.Value}");
Console.WriteLine($"Modified: {total.Sum(r => r)}");

Each task loads the document and increments the value by 1. However, multiple tasks can execute at once, so two tasks might read the same value, each add 1 to it, and write back the same result, so the counter goes up by 1 instead of 2. To make sure every task's write actually lands, I keep re-sending the replace until MongoDB reports that it modified the document. In a real-world application, the delay between a read and its corresponding write would open the same window without any forced retries.
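The interleaving is easier to see with the timing taken out. The sketch below is a hypothetical, single-threaded replay: a plain Dictionary stands in for the collection (no MongoDB involved), and the names `store`, `readByA`, and `readByB` are made up for illustration. Two "tasks" both read the counter before either one writes, then each writes back its own incremented copy:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the collection: id -> stored Value.
// No database here; the point is only the order of the reads and writes.
var store = new Dictionary<string, int> { ["counter"] = 0 };

// Task A and task B both read before either one writes.
int readByA = store["counter"];   // 0
int readByB = store["counter"];   // 0

// Each increments its own copy and writes the whole value back, like ReplaceOneAsync.
store["counter"] = readByA + 1;   // stored value is now 1
store["counter"] = readByB + 1;   // B writes 1 again, silently replacing A's increment

int finalValue = store["counter"];
Console.WriteLine($"Writes: 2, Value: {finalValue}");   // prints "Writes: 2, Value: 1"
```

Both writes "succeed", yet only one increment survives, which is the same lost update the 100-task run hits at a larger scale.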

When I run the 100 tasks, I should see an initial value of 0, a final value of 100, and a modified count of 100. But I don't, because some writes overwrote each other:

Before  : 0
After   : 92
Modified: 100

I modified 100 times, but the counter only made it up to 92! Let's add some optimistic concurrency to improve things.

With OCC

Some OCC implementations use a timestamp as the version, but timestamps often aren't precise enough (two writes can land within the same tick), so instead I'm using a monotonically increasing counter as my version. It starts at zero and goes up from there:

public class Counter
{
    public Guid Id { get; set; }
    public int Version { get; set; }
    public int Value { get; set; }
}

Version design can get a lot more complex than this; I'm keeping things simple here, but we can get as complicated as we want. Now when I update, I increment both the counter value and the version, and when I send the update, I include an additional filter clause against the version I originally read:

var tasks = Enumerable.Range(0, 100).Select(async i =>
{
    var loaded = await collection.AsQueryable().Where(doc => doc.Id == id).SingleAsync();
    var version = loaded.Version;

    loaded.Value++;
    loaded.Version++;

    // Only replace if nobody has bumped the version since we read it
    var result = await collection.ReplaceOneAsync(
        c => c.Id == id && c.Version == version,
        loaded, new ReplaceOptions { IsUpsert = false });

    return result;
}).ToList();

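I removed the "retry" so that stale copies are never re-sent, and with the version check in place, the final values line up: every write that succeeded is reflected in the counter, and the rest were rejected instead of overwriting someone else's change: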

Before  : 0
After   : 92
Modified: 92
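Why the counts now line up is clearer when the same two-reader interleaving is replayed against a version check. This is again a hypothetical in-memory sketch: `ReplaceIfVersion` is a made-up helper that mimics a `ReplaceOneAsync` whose filter includes the version that was read, returning the modified count. The single-threaded Dictionary stands in for the server, which evaluates the filter and applies the replace as one atomic operation on a single document.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the collection: id -> (Version, Value).
var store = new Dictionary<string, (int Version, int Value)> { ["counter"] = (0, 0) };

// Hypothetical helper mimicking ReplaceOneAsync(c => c.Id == id && c.Version == expectedVersion, ...):
// the replacement lands only if the stored version still equals the version that was read.
// Returns the modified count: 1 if the write landed, 0 if the filter matched nothing.
long ReplaceIfVersion(string id, int expectedVersion, (int Version, int Value) replacement)
{
    if (store[id].Version != expectedVersion)
        return 0;
    store[id] = replacement;
    return 1;
}

// Task A and task B both read version 0 / value 0 before either writes.
var a = store["counter"];
var b = store["counter"];

long modifiedA = ReplaceIfVersion("counter", a.Version, (a.Version + 1, a.Value + 1));
long modifiedB = ReplaceIfVersion("counter", b.Version, (b.Version + 1, b.Value + 1));

int finalValue = store["counter"].Value;
Console.WriteLine($"A modified: {modifiedA}, B modified: {modifiedB}, Value: {finalValue}");
// prints "A modified: 1, B modified: 0, Value: 1"
```

The second writer isn't blocked or merged; its stale write is simply rejected, which is why a run without retries ends with After equal to Modified.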

However, if I really wanted to make sure that all of those updates get in, I'd need to retry the entire operation, including the read:

var tasks = Enumerable.Range(0, 100).Select(async i =>
{
    ReplaceOneResult result;

    do
    {
        // Reload on every attempt to pick up the latest value and version
        var loaded = await collection.AsQueryable()
            .Where(doc => doc.Id == id)
            .SingleAsync();
        var version = loaded.Version;

        loaded.Value++;
        loaded.Version++;

        result = await collection.ReplaceOneAsync(
            c => c.Id == id && c.Version == version, loaded,
            new ReplaceOptions { IsUpsert = false });
    } while (result.ModifiedCount != 1);

    return result;
}).ToList();

With a simple retry in place, each attempt reloads the document and picks up the refreshed version, and now all my numbers add up:

Before  : 0
After   : 100
Modified: 100

Exactly what I was looking for!
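The reload-and-retry loop can also be sketched without a database, which makes it easy to run repeatedly. Assumptions: a ConcurrentDictionary stands in for the collection, and its `TryUpdate(key, newValue, comparisonValue)` method, which atomically replaces the value only if it still equals `comparisonValue`, plays the role of the version-filtered `ReplaceOneAsync`. The `attempts` counter is added purely to show how often tasks collided.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for the collection: id -> (Version, Value).
var store = new ConcurrentDictionary<string, (int Version, int Value)>();
store["counter"] = (0, 0);

int attempts = 0;

var tasks = Enumerable.Range(0, 100).Select(_ => Task.Run(() =>
{
    while (true)
    {
        Interlocked.Increment(ref attempts);

        // Reload on every attempt so we always start from the latest version.
        var loaded = store["counter"];
        var updated = (Version: loaded.Version + 1, Value: loaded.Value + 1);

        // TryUpdate swaps in `updated` only if the stored tuple still equals `loaded`,
        // the in-memory analogue of an Id + Version filter matching one document.
        if (store.TryUpdate("counter", updated, loaded))
            return 1L;

        // Another task bumped the version first: go round again.
    }
})).ToList();

long[] modified = await Task.WhenAll(tasks);

var current = store["counter"];
Console.WriteLine($"After   : {current.Value}");
Console.WriteLine($"Modified: {modified.Sum()}");
Console.WriteLine($"Attempts: {attempts}");
```

After and Modified should both come out at 100 on every run, because each task loops until its compare-and-swap succeeds; Attempts varies from run to run. The loop is unbounded for simplicity; a production version would probably cap the retries and surface a conflict error instead.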

A general solution

This works just fine as long as I "remember" to include the version check in the filter, but there's a better way if we want a general solution. For that, I'd follow the same approach as in the Life Beyond Distributed Transactions series: introduce a Repository, Unit of Work, and Identity Map. These can all start out very simple, but they let me encapsulate all of the version checking and management in one place instead of forcing every caller to remember that clause.
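A full Repository, Unit of Work, and Identity Map is more than a short example can hold, but the core idea, moving every version check into one place, fits in a small sketch. Everything here is hypothetical: a Dictionary stands in for the collection, each unit of work is reduced to its identity map (the version of each document when it was loaded), and `Load`/`Save` are illustrative helpers, not part of the MongoDB driver. Callers only deal in values; `Save` alone knows about versions and reports a conflict instead of overwriting.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the collection: id -> (Version, Value).
var store = new Dictionary<Guid, (int Version, int Value)>();

// A stripped-down, hypothetical unit of work is represented only by its identity map:
// the version of each document as it was when this unit of work loaded it.
int Load(Dictionary<Guid, int> session, Guid id)
{
    var doc = store[id];
    session[id] = doc.Version;    // remember the version we saw
    return doc.Value;
}

// Save owns all of the version bookkeeping, so callers never write a version check.
void Save(Dictionary<Guid, int> session, Guid id, int newValue)
{
    int expected = session[id];
    int actual = store[id].Version;
    // Same outcome as a ReplaceOneAsync with an Id + Version filter matching nothing.
    if (actual != expected)
        throw new InvalidOperationException(
            $"Counter {id} changed since it was loaded (expected version {expected}, found {actual}).");

    store[id] = (expected + 1, newValue);
    session[id] = expected + 1;
}

var counterId = Guid.NewGuid();
store[counterId] = (0, 0);

// Two independent units of work load the same counter.
var sessionA = new Dictionary<Guid, int>();
var sessionB = new Dictionary<Guid, int>();
int valueA = Load(sessionA, counterId);
int valueB = Load(sessionB, counterId);

Save(sessionA, counterId, valueA + 1);      // succeeds: version 0 -> 1

bool conflictDetected = false;
try
{
    Save(sessionB, counterId, valueB + 1);  // B still holds version 0
}
catch (InvalidOperationException ex)
{
    conflictDetected = true;
    Console.WriteLine(ex.Message);
}

var current = store[counterId];
Console.WriteLine($"Value: {current.Value}, Version: {current.Version}, Conflict: {conflictDetected}");
// last line prints "Value: 1, Version: 1, Conflict: True"
```

Against MongoDB, `Save` would presumably issue the `ReplaceOneAsync` with the Id + Version filter and treat a `MatchedCount` of 0 on the returned `ReplaceOneResult` as the conflict. Whether to throw, as here, or reload and retry then becomes a policy decided in one place rather than at every call site.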

If there's even a remote possibility that you'll see concurrent updates, you'll likely go down the path of optimistic concurrency. Luckily, a basic solution isn't that much code.