Life Beyond Distributed Transactions: An Apostate's Implementation - Dispatching Example

In the last post, we looked at refactoring our documents to use messaging to communicate changes. We're still missing one piece, however - the dispatcher.

Our dispatcher is the main component that facilitates document communication. For a given document, it needs to:

  • Read messages out of a document's outbox
  • Find the document message handler for each message, and invoke it
  • Manage failures for a document message handler

We'll tackle that last piece in a future post. Before we get to the dispatcher, though, there's one problem we need to solve first - where does the dispatcher get its list of documents that need dispatching?

Introducing a unit of work

For a given request, we'll load up a document and effect some change with it. We already have a pinch point in which our documents are loaded - the repository. If we want to dispatch document messages in the same request, we'll need to keep track of the documents that we've loaded in that request. For this, we can use a Unit of Work.

Any ORM that you use will implement this pattern - for Entity Framework, for example, the DbContext is your Unit of Work. For Cosmos DB's SDK, there really isn't a concept of these ORM patterns. We have to introduce them ourselves.

Our unit of work will keep track of documents for a given session/request, letting us interact with the loaded documents during the dispatch phase of a request. Our Unit of Work will also serve as an identity map - the thing that makes sure that when we load a document in a request, it's only loaded once. Here's our basic IUnitOfWork interface:

public interface IUnitOfWork
{
    T Find<T>(Guid id) where T : DocumentBase;
    void Register(DocumentBase document);
    void Register(IEnumerable<DocumentBase> documents);
    Task Complete();
}

The implementation holds the "identity map" as a simple HashSet:

public class UnitOfWork : IUnitOfWork
{
    private readonly ISet<DocumentBase> _identityMap 
        = new HashSet<DocumentBase>(DocumentBaseEqualityComparer.Instance);
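
The DocumentBaseEqualityComparer used above isn't shown in this post. As a minimal sketch, assuming two documents count as "the same" when they share a runtime type and an Id, it might look something like this. The DocumentBase and Stock types here are trimmed-down stand-ins for the real ones, and the equality rule itself is my assumption rather than the series' actual code:

```csharp
using System;
using System.Collections.Generic;

// Minimal stand-in for the series' DocumentBase; only Id matters here.
public abstract class DocumentBase
{
    public Guid Id { get; set; }
}

// Hypothetical comparer: documents are equal when they have the same
// runtime type and the same Id, regardless of object reference.
public sealed class DocumentBaseEqualityComparer : IEqualityComparer<DocumentBase>
{
    public static readonly DocumentBaseEqualityComparer Instance
        = new DocumentBaseEqualityComparer();

    private DocumentBaseEqualityComparer() { }

    public bool Equals(DocumentBase x, DocumentBase y)
    {
        if (ReferenceEquals(x, y)) return true;
        if (x is null || y is null) return false;
        return x.GetType() == y.GetType() && x.Id == y.Id;
    }

    // Must agree with Equals: combine the same two ingredients.
    public int GetHashCode(DocumentBase obj)
        => obj is null
            ? 0
            : unchecked((obj.GetType().GetHashCode() * 397) ^ obj.Id.GetHashCode());
}

// Tiny concrete document, used only to try the comparer out.
public sealed class Stock : DocumentBase { }
```

With a comparer like this, a HashSet<DocumentBase> refuses a second instance carrying an Id it already tracks (Add returns false), which is the behavior we want from an identity map.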

Then we can register an instance with our UnitOfWork:

public void Register(DocumentBase document)
{
    _identityMap.Add(document);
}

public void Register(IEnumerable<DocumentBase> documents)
{
    foreach (var document in documents)
    {
        Register(document);
    }
}

Finding an existing DocumentBase just searches our identity map:

public T Find<T>(Guid id) 
    where T : DocumentBase
    => _identityMap.OfType<T>().FirstOrDefault(ab => ab.Id == id);

We'll come back to the Complete method, because this will be the part where we dispatch. We still need the part where we register our documents in the unit of work, and this will be in our repository implementation:

public async Task<T> GetItemAsync(Guid id)
{
    try
    {
        var root = _unitOfWork.Find<T>(id);

        if (root != null)
            return root;

        Document document = await _client.ReadDocumentAsync(UriFactory.CreateDocumentUri(DatabaseId, CollectionId, id.ToString()));
        var item = (T)(dynamic)document;

        _unitOfWork.Register(item);

        return item;
    }
    catch (DocumentClientException e)
    {
        if (e.StatusCode == System.Net.HttpStatusCode.NotFound)
        {
            return null;
        }

        throw;
    }
}

We'll repeat this for any method in our repository that loads a document, registering and looking up in our unit of work.
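
For repository methods that return several documents, the same "look up, else register" step has to run per document. Here's a rough sketch of how that could be factored out. TrackAll and InMemoryUnitOfWork are hypothetical names that don't appear in the sample code, and IUnitOfWork is trimmed to the two members the helper needs:

```csharp
using System;
using System.Collections.Generic;

public abstract class DocumentBase
{
    public Guid Id { get; set; }
}

// Trimmed copy of the IUnitOfWork interface from this post.
public interface IUnitOfWork
{
    T Find<T>(Guid id) where T : DocumentBase;
    void Register(DocumentBase document);
}

public static class UnitOfWorkExtensions
{
    // Hypothetical helper: for each freshly loaded document, hand back the
    // instance already tracked in this request if there is one; otherwise
    // register the new instance and hand that back.
    public static IReadOnlyList<T> TrackAll<T>(
        this IUnitOfWork unitOfWork, IEnumerable<T> loaded)
        where T : DocumentBase
    {
        var result = new List<T>();
        foreach (var document in loaded)
        {
            var existing = unitOfWork.Find<T>(document.Id);
            if (existing != null)
            {
                result.Add(existing);
                continue;
            }

            unitOfWork.Register(document);
            result.Add(document);
        }
        return result;
    }
}

// Bare-bones in-memory unit of work, only here to exercise the helper.
public sealed class InMemoryUnitOfWork : IUnitOfWork
{
    private readonly Dictionary<Guid, DocumentBase> _identityMap
        = new Dictionary<Guid, DocumentBase>();

    public T Find<T>(Guid id) where T : DocumentBase
        => _identityMap.TryGetValue(id, out var document) ? document as T : null;

    public void Register(DocumentBase document)
    {
        // Keep the first instance we saw; never replace a tracked document.
        if (!_identityMap.ContainsKey(document.Id))
            _identityMap.Add(document.Id, document);
    }
}

public sealed class Stock : DocumentBase { }
```

A query method in the repository could then deserialize its results and return unitOfWork.TrackAll(results), so callers always get the one tracked instance per document.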

With a means to track our documents, let's see how we'll dispatch.

Dispatching document messages

Our dispatcher's fairly straightforward - the only wrinkle is that we need to surface any exception to the caller. Instead of just crashing when something goes awry, we'll return the exception and let the caller decide how to handle the failure:

public interface IDocumentMessageDispatcher
{
    Task<Exception> Dispatch(DocumentBase document);
}

If I'm dispatching a document message to three handlers, I don't want one handler to prevent dispatching to the others.

We have another challenge - our interface is not generic for dispatching, but the handlers and repositories are! We'll have to do some generics tricks to unwrap our base type to the correct generic types. The basic flow will be:

  • For each document message:
      • Find document message handlers
      • Call the handler
      • Remove the document message from the outbox
      • Save the document

Here's our basic implementation:

public async Task<Exception> Dispatch(DocumentBase document)
{
    var repository = GetRepository(document.GetType());
    foreach (var documentMessage in document.Outbox.ToArray())
    {
        try
        {
            var handler = GetHandler(documentMessage);

            await handler.Handle(documentMessage, _serviceFactory);

            document.ProcessDocumentMessage(documentMessage);

            await repository.Update(document);
        }
        catch (Exception ex)
        {
            return ex;
        }
    }
    return null;
}

We first build a repository based on the document type. Next, we loop through each document message in the outbox. For each document message, we'll find the handler(s) and call them. Once those succeed, we'll process our document message (removing it from the outbox) and update our document. We want to update for each document message in the outbox - if there are 3 document messages in the outbox, we save 3 times, so that once a message completes we don't have to go back to it if something goes wrong later.

The GetHandler method is a bit wonky, because we're bridging generics. Basically, we create a non-generic version of the document message handlers:

private abstract class DomainEventDispatcherHandler
{
    public abstract Task Handle(
        IDocumentMessage documentMessage, 
        ServiceFactory factory);
}

Then create a generic version that inherits from this:

private class DomainEventDispatcherHandler<T> : DomainEventDispatcherHandler
    where T : IDocumentMessage
{
    public override Task Handle(IDocumentMessage documentMessage, ServiceFactory factory)
    {
        return HandleCore((T)documentMessage, factory);
    }

    private static async Task HandleCore(T domainEvent, ServiceFactory factory)
    {
        var handlers = factory.GetInstances<IDocumentMessageHandler<T>>();
        foreach (var handler in handlers)
        {
            await handler.Handle(domainEvent);
        }
    }
}

I've used this pattern countless times, basically to satisfy the compiler. I've tried dynamic too but it introduces other problems. Then to call this, our GetHandler instantiates the generic version, but returns the non-generic base class:

private static DomainEventDispatcherHandler GetHandler(
    IDocumentMessage documentMessage)
{
    var genericDispatcherType = typeof(DomainEventDispatcherHandler<>)
        .MakeGenericType(documentMessage.GetType());

    return (DomainEventDispatcherHandler)
        Activator.CreateInstance(genericDispatcherType);
}

With this, I can have non-generic code still call into generics. I'll do something similar with the repository:

private abstract class DocumentDbRepo
{
    public abstract Task<DocumentBase> FindById(Guid id);
    public abstract Task Update(DocumentBase document);
}
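
The generic half of this repository bridge, and the GetRepository method that builds it, aren't shown here. A sketch of how they could fit together follows. Everything beyond the DocumentDbRepo base class is an assumption on my part: the UpdateItemAsync method on the repository interface, the Create factory, and the Func<Type, object> resolver standing in for whatever the container provides. The in-memory repository exists only so the sketch can run on its own:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public abstract class DocumentBase
{
    public Guid Id { get; set; }
}

// Assumed shape of the generic repository; UpdateItemAsync is hypothetical.
public interface IDocumentDBRepository<T> where T : DocumentBase
{
    Task<T> GetItemAsync(Guid id);
    Task UpdateItemAsync(T item);
}

// Non-generic face that the dispatcher talks to.
public abstract class DocumentDbRepo
{
    public abstract Task<DocumentBase> FindById(Guid id);
    public abstract Task Update(DocumentBase document);

    // Hypothetical factory: closes the generic bridge over the runtime
    // document type and resolves the matching generic repository.
    public static DocumentDbRepo Create(Type documentType, Func<Type, object> resolve)
    {
        var repositoryType = typeof(IDocumentDBRepository<>).MakeGenericType(documentType);
        var repository = resolve(repositoryType)
            ?? throw new InvalidOperationException($"No repository for {documentType.Name}");

        var bridgeType = typeof(DocumentDbRepo<>).MakeGenericType(documentType);
        return (DocumentDbRepo)Activator.CreateInstance(bridgeType, new[] { repository });
    }
}

// Generic half: casts back to T and forwards to the real repository.
public class DocumentDbRepo<T> : DocumentDbRepo where T : DocumentBase
{
    private readonly IDocumentDBRepository<T> _repository;

    public DocumentDbRepo(IDocumentDBRepository<T> repository)
        => _repository = repository;

    public override async Task<DocumentBase> FindById(Guid id)
        => await _repository.GetItemAsync(id);

    public override Task Update(DocumentBase document)
        => _repository.UpdateItemAsync((T)document);
}

// In-memory repository, only here to exercise the bridge.
public sealed class InMemoryRepository<T> : IDocumentDBRepository<T> where T : DocumentBase
{
    private readonly Dictionary<Guid, T> _items = new Dictionary<Guid, T>();

    public Task<T> GetItemAsync(Guid id)
        => Task.FromResult(_items.TryGetValue(id, out var item) ? item : null);

    public Task UpdateItemAsync(T item)
    {
        _items[item.Id] = item;
        return Task.CompletedTask;
    }
}

public sealed class Stock : DocumentBase { }
```

It's the same trick as the handler bridge: reflection happens once, when the generic type is closed, and everything after that is ordinary virtual dispatch.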

With these bridges in place, my dispatcher can interact with the concrete generic repositories and handlers. The final piece is the document cleaning up its outbox:

public void ProcessDocumentMessage(
    IDocumentMessage documentMessage)
{
    _outbox?.Remove(documentMessage);
}

With our dispatcher done, and our unit of work in place, we can now focus on the piece that will invoke our unit of work.
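
Before moving on, here's roughly what the Complete method on the unit of work could look like now that a dispatcher exists. This is a sketch under assumptions, not the series' final implementation: it keeps dispatching tracked documents until no outbox has anything left, and it simply skips a document for the rest of the request once its dispatch fails. It also assumes a successful Dispatch removes the messages it handled, as ours does. UnitOfWorkSketch, FakeDispatcher, and the trimmed types are stand-ins so the example runs on its own:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Trimmed stand-ins for types from the series.
public interface IDocumentMessage { }

public abstract class DocumentBase
{
    public Guid Id { get; set; } = Guid.NewGuid();
    public ISet<IDocumentMessage> Outbox { get; } = new HashSet<IDocumentMessage>();
}

public interface IDocumentMessageDispatcher
{
    Task<Exception> Dispatch(DocumentBase document);
}

public class UnitOfWorkSketch
{
    private readonly List<DocumentBase> _identityMap = new List<DocumentBase>();
    private readonly IDocumentMessageDispatcher _dispatcher;

    public UnitOfWorkSketch(IDocumentMessageDispatcher dispatcher)
        => _dispatcher = dispatcher;

    public void Register(DocumentBase document) => _identityMap.Add(document);

    public async Task Complete()
    {
        // Documents whose dispatch failed; we leave them alone for the
        // rest of this request (retries are a separate concern).
        var failed = new HashSet<DocumentBase>();

        while (true)
        {
            // Handlers may load other documents and fill their outboxes,
            // so re-scan the tracked documents on every pass.
            var document = _identityMap
                .FirstOrDefault(d => !failed.Contains(d) && d.Outbox.Any());

            if (document == null)
                return;

            var exception = await _dispatcher.Dispatch(document);
            if (exception != null)
                failed.Add(document);
        }
    }
}

// Test double: "handles" everything by emptying the outbox, except for
// the one document it is told to fail on.
public sealed class FakeDispatcher : IDocumentMessageDispatcher
{
    public DocumentBase FailFor { get; set; }
    public List<DocumentBase> Dispatched { get; } = new List<DocumentBase>();

    public Task<Exception> Dispatch(DocumentBase document)
    {
        Dispatched.Add(document);
        if (ReferenceEquals(document, FailFor))
            return Task.FromResult<Exception>(new InvalidOperationException("handler failed"));

        document.Outbox.Clear();
        return Task.FromResult<Exception>(null);
    }
}

public sealed class Stock : DocumentBase { }
public sealed class ItemPurchased : IDocumentMessage { }
```

The skip set is what keeps one failing document from stalling the rest: its messages stay in its outbox, and every other tracked document still gets dispatched.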

Building a MediatR behavior

We want our unit of work to complete with each request once everything is "done". For ASP.NET Core applications, this might mean some kind of filter. For us, I want the dispatching to work in really any context, so one possibility is to use a MediatR behavior to wrap our MediatR handler. A filter would work too of course, but we'd need to mimic our filters in tests if we want everything to still get dispatched appropriately.

The behavior is pretty straightforward:

public class UnitOfWorkBehavior<TRequest, TResponse> 
    : IPipelineBehavior<TRequest, TResponse>
{
    private readonly IUnitOfWork _unitOfWork;

    public UnitOfWorkBehavior(IUnitOfWork unitOfWork)
    {
        _unitOfWork = unitOfWork;
    }

    public async Task<TResponse> Handle(
        TRequest request, 
        CancellationToken token, 
        RequestHandlerDelegate<TResponse> next)
    {
        var response = await next();

        await _unitOfWork.Complete();

        return response;
    }
}

We do the main work, then once that's finished, complete our unit of work.

That's all of our infrastructure pieces, and the last part is registering these components with the DI container at startup:

services.AddMediatR(typeof(Startup));

services.AddScoped(typeof(IDocumentDBRepository<>), typeof(DocumentDBRepository<>));
services.AddScoped<IUnitOfWork, UnitOfWork>();
services.AddScoped<IDocumentMessageDispatcher, DocumentMessageDispatcher>();
services.AddScoped(typeof(IPipelineBehavior<,>), typeof(UnitOfWorkBehavior<,>));
services.Scan(c =>
{
    c.FromAssembliesOf(typeof(Startup))
        .AddClasses(t => t.AssignableTo(typeof(IDocumentMessageHandler<>)))
        .AsImplementedInterfaces()
        .WithTransientLifetime();
});

We add our MediatR handlers using the MediatR.Extensions.Microsoft.DependencyInjection package, followed by our generic repository, unit of work, dispatcher, and unit of work behavior. Finally, we add all of the IDocumentMessageHandler implementations using Scrutor, which makes our lives much easier by registering all the handlers in one go.

With all this in place, we can run and verify that our handlers fire and we can see the message in the inbox of the Stock item:

{
    "QuantityAvailable": 99,
    "ProductId": 771,
    "id": "cfbb6333-ed9f-49e7-8640-bb920d5c9106",
    "Outbox": {
        "$type": "System.Collections.Generic.HashSet`1[[AdventureWorksCosmos.UI.Infrastructure.IDocumentMessage, AdventureWorksCosmos.UI]], System.Core",
        "$values": []
    },
    "Inbox": {
        "$type": "System.Collections.Generic.HashSet`1[[AdventureWorksCosmos.UI.Infrastructure.IDocumentMessage, AdventureWorksCosmos.UI]], System.Core",
        "$values": [
            {
                "$type": "AdventureWorksCosmos.UI.Models.Orders.ItemPurchased, AdventureWorksCosmos.UI",
                "ProductId": 771,
                "Quantity": 1,
                "Id": "2ab2108c-9698-49e8-93de-a3ced453836a"
            }
        ]
    },
    "_rid": "WQk4AKSQMwACAAAAAAAAAA==",
    "_self": "dbs/WQk4AA==/colls/WQk4AKSQMwA=/docs/WQk4AKSQMwACAAAAAAAAAA==/",
    "_etag": "\"060077c2-0000-0000-0000-5b71d8a10000\"",
    "_attachments": "attachments/",
    "_ts": 1534187681
}

We now have effective document messaging between our documents!

Well, almost.

In the next post, we'll walk through what to do when things go wrong: failures and retries.