Sample code from this series

So far in this series, we've looked at the ins and outs of moving beyond distributed transactions using persisted messages as a means of coordination between different documents (or resources). One common question in the example I give is "how do I actually make sure either both operations happen or neither?" To answer this question, we need to recognize that this "all-or-nothing" approach is a kind of transaction. But we've already said we're trying to avoid distributed transactions!

We won't be building a new kind of distributed transaction. Instead, we'll build a transaction that lives longer than any single request: a long-lived transaction. To implement a long-lived transaction, we need to look at the Saga pattern, first described in the original Sagas paper (Garcia-Molina, Salem). The most common example of a saga I've seen described is booking a vacation. When you book a vacation, you need to:

  • Book a flight
  • Book a hotel
  • Reserve a car

You can't do all three at once - that's like getting all three companies on a conference call and reaching consensus together. Not going to happen! Instead, we build this overall business transaction as a series of requests and compensating actions in case something goes wrong:

  • Cancel flight
  • Cancel hotel
  • Cancel car reservation

Our saga operations can be linear (controller pattern) or parallel (observer pattern). In microservices terms, these are orchestration and choreography, respectively.

In order to satisfy our saga constraints, our requests must:

  • Be idempotent
  • Be able to abort

And the compensating requests must:

  • Be idempotent
  • Not be able to abort
  • Be commutative
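To make these constraints concrete, here's a minimal sketch using the vacation example. The FlightDesk class and its members are hypothetical and in-memory only; they aren't part of the sample code. Book is idempotent and can abort, while Cancel is idempotent, never aborts, and gives the same end result whether it arrives before or after Book:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory flight desk, used only to illustrate the constraints.
public class FlightDesk
{
    private readonly HashSet<Guid> _booked = new HashSet<Guid>();
    private readonly HashSet<Guid> _cancelled = new HashSet<Guid>();

    public FlightDesk(int seats) => SeatsLeft = seats;

    public int SeatsLeft { get; private set; }

    // Request: idempotent, and allowed to abort (returns false).
    public bool Book(Guid bookingId)
    {
        if (_cancelled.Contains(bookingId)) return false; // the cancel arrived first
        if (_booked.Contains(bookingId)) return true;     // duplicate delivery
        if (SeatsLeft == 0) return false;                 // abort: no seats left

        SeatsLeft--;
        _booked.Add(bookingId);
        return true;
    }

    // Compensation: idempotent, never aborts, and safe in either order.
    public void Cancel(Guid bookingId)
    {
        if (!_cancelled.Add(bookingId)) return;       // duplicate delivery
        if (_booked.Remove(bookingId)) SeatsLeft++;   // give the seat back
    }
}
```

Remembering cancelled booking IDs is what makes the pair commutative: a Book that shows up late for an already-cancelled booking is simply refused.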

In our example, we have a model that basically assumes success. We:

  • Approve an order
  • Deduct stock

Let's modify this a bit to create an order fulfillment saga. For this saga, we can fulfill an order if and only if:

  • Our order is approved
  • We have enough stock

If our order is rejected, we need to release the stock. If we don't have enough stock, we need to un-approve (cancel) our order. And keeping with our example, we need something to coordinate this activity - our saga. But rather than just call it some generic "saga", let's give it a meaningful name - OrderFulfillmentSaga (the OrderFulfillment document in the code that follows).

This saga will coordinate the activities of the order request and stock. And because we need this saga to have the same communication properties of the other two documents, we can simply model this saga as just another document with our inbox/outbox!
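As a rough picture of what that document holds, here's a sketch of the saga's state, pieced together from the properties the handlers later in this post read and write. The DocumentBase shown is only a stand-in with a bare outbox (an assumption on my part); the one from earlier in the series also carries the inbox and the message processing logic:

```csharp
using System;
using System.Collections.Generic;

// Stand-in for the series' DocumentBase: just enough to hold an outbox.
public abstract class DocumentBase
{
    public Guid Id { get; set; }
    public List<object> Outbox { get; } = new List<object>();

    protected void Send(object message) => Outbox.Add(message);
}

// The saga's state: one flag per fact it has learned or request it has sent.
public class OrderFulfillment : DocumentBase
{
    public Guid OrderId { get; set; }
    public bool IsCancelled { get; set; }
    public bool OrderApproved { get; set; }
    public bool OrderRejected { get; set; }
    public bool CancelOrderRequested { get; set; }
    public List<LineItem> LineItems { get; set; } = new List<LineItem>();

    public class LineItem
    {
        public int ProductId { get; set; }
        public int AmountRequested { get; set; }
        public bool StockConfirmed { get; set; }
        public bool StockReturnRequested { get; set; }
    }
}
```

Everything the saga needs to decide its next step lives in this one document, so each incoming message can be handled with a single-document read and write.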

The overall flow will be:

  • Once an order is created, kick off a new order fulfillment saga
  • This saga will coordinate actions between the stock and order request
  • If the order is rejected, the saga needs to initiate a stock return
  • If there is not enough stock, the saga needs to cancel the order

Let's start with kicking off the saga!

Kicking things off

When should we kick the saga off? It's tempting to do this in the initial request that creates a new order request, but remember - we can't put more than one document in a transaction unless we're very sure fulfillment and order requests will live close together in our Cosmos DB instance (in practice, in the same logical partition). That means we need to use our document messages to communicate with a saga - even if it doesn't exist!

We don't want to fulfill an order request twice, and for our simple scenario let's just assume an order request can't be "retried". Originally, our OrderRequest created an ItemPurchased document message for each item - we'll remove that in favor of a single OrderCreated document message:

public class OrderCreated : IDocumentMessage  
{
    public Guid Id { get; set; }
    public Guid OrderId { get; set; }

    public List<LineItem> LineItems { get; set; }

    public class LineItem
    {
        public int ProductId { get; set; }
        public int Quantity { get; set; }
    }
}

We could just have the OrderId and have the receiver then load up the OrderRequest, but for simplicity's sake (and assuming you can't change the order after it's created), we'll treat this information as immutable and keep it in the message. Now when we create an OrderRequest, we'll also send this message:

public class OrderRequest : DocumentBase  
{
    public OrderRequest(ShoppingCart cart)
    {
        Id = Guid.NewGuid();
        Customer = new Customer
        {
            FirstName = "Jane",
            MiddleName = "Mary",
            LastName = "Doe"
        };
        Items = cart.Items.Select(li => new LineItem
        {
            ProductId = li.Key,
            Quantity = li.Value.Quantity,
            ListPrice = li.Value.ListPrice,
            ProductName = li.Value.ProductName
        }).ToList();
        Status = Status.New;

        Send(new OrderCreated
        {
            Id = Guid.NewGuid(),
            OrderId = Id,
            LineItems = Items
                .Select(item => new OrderCreated.LineItem
                {
                    ProductId = item.ProductId,
                    Quantity = item.Quantity
                })
                .ToList()
        });
    }

    // Properties and other members omitted for brevity.
}

It's not much different than our original order creation - we're just now including the document message to initiate the order fulfillment saga.

Our handler for this document message needs to find the right OrderFulfillment saga document and let the saga handle the message:

public class OrderCreatedHandler : IDocumentMessageHandler<OrderCreated>  
{
    private readonly IDocumentDBRepository<OrderFulfillment> _repository;

    public OrderCreatedHandler(IDocumentDBRepository<OrderFulfillment> repository)
        => _repository = repository;

    public async Task Handle(OrderCreated message)
    {
        var orderFulfillment = (await _repository
                .GetItemsAsync(s => s.OrderId == message.OrderId))
            .FirstOrDefault();

        if (orderFulfillment == null)
        {
            orderFulfillment = new OrderFulfillment
            {
                Id = Guid.NewGuid(),
                OrderId = message.OrderId
            };

            await _repository.CreateItemAsync(orderFulfillment);
        }

        orderFulfillment.Handle(message);

        await _repository.UpdateItemAsync(orderFulfillment);
    }
}

Not shown here - but we do need to make sure we only have a single fulfillment saga per order, so we can configure OrderId as a unique key in Cosmos DB (keeping in mind that Cosmos DB enforces unique keys within a logical partition).

The orderFulfillment.Handle method needs to start the saga and request stock:

public void Handle(OrderCreated message)  
{
    Process(message, m =>
    {
        if (IsCancelled)
            return;

        LineItems = m.LineItems
            .Select(li => new LineItem
            {
                ProductId = li.ProductId,
                AmountRequested = li.Quantity
            })
            .ToList();

        foreach (var lineItem in LineItems)
        {
            Send(new StockRequest
            {
                Id = Guid.NewGuid(),
                ProductId = lineItem.ProductId,
                AmountRequested = lineItem.AmountRequested,
                OrderFulfillmentId = Id
            });
        }
    });
}

In my example, I've made the OrderFulfillment saga coordinate with Stock with our StockRequest. This is instead of Stock listening for OrderCreated itself. My general thought here is that fulfillment manages the requests/returns for stock, and any business logic around that.

I also have a little check at the beginning - if an order is cancelled, we don't want to send out stock requests. This is the piece that's enforcing commutative requests - we might receive an order rejected notice before receiving the order created notice! When it comes to messaging, I always assume messages are received out of order, which means our business logic needs to be able to handle these situations.
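Here's a trimmed-down sketch of that guard in isolation. FulfillmentSketch is hypothetical: it has no inbox and records outgoing messages as plain strings, just to show what happens when the rejection arrives before the creation:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, trimmed-down saga: no inbox, strings instead of message types.
public class FulfillmentSketch
{
    public bool IsCancelled { get; private set; }
    public List<string> Outbox { get; } = new List<string>();

    public void OnOrderRejected() => IsCancelled = true;

    public void OnOrderCreated(IEnumerable<int> productIds)
    {
        if (IsCancelled)
            return; // the rejection won the race: request nothing

        foreach (var productId in productIds)
            Outbox.Add($"StockRequest:{productId}");
    }
}
```

If the messages arrive the other way around, the stock requests have already gone out, and it becomes the job of the cancellation path later in this post to hand that stock back.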

Handling stock requests

Our original Stock implementation was quite naive, but this time we want to more intelligently handle orders. In our stock handler, we'll still have a document per product, but now it can make a decision based on the quantity available:

public void Handle(StockRequest message)  
{
    Process(message, e =>
    {
        if (QuantityAvailable >= e.AmountRequested)
        {
            QuantityAvailable -= e.AmountRequested;
            Send(new StockRequestConfirmed
            {
                Id = Guid.NewGuid(),
                OrderFulfillmentId = e.OrderFulfillmentId,
                ProductId = ProductId
            });
        }
        else
        {
            Send(new StockRequestDenied
            {
                Id = Guid.NewGuid(),
                OrderFulfillmentId = e.OrderFulfillmentId,
                ProductId = ProductId
            });
        }
    });
}

Because we're using document messages with our inbox de-duping messages, we don't need to worry about processing the stock request twice. Our simple logic just checks the stock, and if it's successful we can deduct the stock and return a StockRequestConfirmed message. If not, we can return a StockRequestDenied message.
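As a reminder of why a duplicate delivery is harmless, here's a minimal sketch of inbox de-duplication. This is not the Process implementation from earlier in the series; it assumes the inbox is simply a set of already-handled message IDs stored with the document, and the Stock class is cut down to the deduction alone:

```csharp
using System;
using System.Collections.Generic;

public interface IDocumentMessage
{
    Guid Id { get; }
}

public abstract class DocumentBase
{
    // Assumed inbox shape: IDs of messages this document has already handled.
    private readonly HashSet<Guid> _inbox = new HashSet<Guid>();

    protected void Process<T>(T message, Action<T> action) where T : IDocumentMessage
    {
        if (_inbox.Contains(message.Id))
            return; // already processed: skip the side effects

        action(message);
        _inbox.Add(message.Id);
    }
}

public class StockRequest : IDocumentMessage
{
    public Guid Id { get; set; }
    public int AmountRequested { get; set; }
}

public class Stock : DocumentBase
{
    public int QuantityAvailable { get; set; }

    public void Handle(StockRequest message)
    {
        Process(message, e =>
        {
            if (QuantityAvailable >= e.AmountRequested)
                QuantityAvailable -= e.AmountRequested;
        });
    }
}
```

Handling the same StockRequest twice deducts the stock once, because the second delivery finds its ID in the inbox and returns early.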

A successful order fulfillment

Our original logic said that "an order can be fulfilled if the order is approved and we have enough stock". Approving an order is a human decision, so we have a basic form for doing so:

@if (Model.Order.Status == Status.New)
{
    <form asp-controller="Order" asp-action="Reject" asp-route-id="@Model.Order.Id" method="post">
        <input type="submit" value="Reject"/>
    </form>
    <form asp-controller="Order" asp-action="Approve" asp-route-id="@Model.Order.Id" method="post">
        <input type="submit" value="Approve"/>
    </form>
}

And when the order is approved, we just delegate to MediatR to handle this request:

public class ApproveOrder  
{
    public class Request : IRequest
    {
        public Guid Id { get; set; }
    }

    public class Handler : IRequestHandler<Request>
    {
        private readonly IDocumentDBRepository<OrderRequest> _orderRepository;

        public Handler(IDocumentDBRepository<OrderRequest> orderRepository)
        {
            _orderRepository = orderRepository;
        }

        public async Task<Unit> Handle(Request request, CancellationToken cancellationToken)
        {
            var orderRequest = await _orderRepository.GetItemAsync(request.Id);

            orderRequest.Approve();

            await _orderRepository.UpdateItemAsync(orderRequest);

            return Unit.Value;
        }
    }
}

Which then delegates to our document to approve the order request:

public void Approve()  
{
    if (Status == Status.Approved)
        return;

    if (Status == Status.Rejected)
        throw new InvalidOperationException("Cannot approve a rejected order.");

    Status = Status.Approved;
    Send(new OrderApproved
    {
        Id = Guid.NewGuid(),
        OrderId = Id
    });
}

We only want to send out the OrderApproved message once, so just some basic status checking handles that.

On the order fulfillment side:

public void Handle(OrderApproved message)  
{
    Process(message, m =>
    {
        OrderApproved = true;

        if (IsCancelled)
        {
            ProcessCancellation();
        }
        else
        {
            CheckForSuccess();
        }
    });
}

Each time we receive some external notification, we need to process the success/failure path, which I'll come back to in a bit. Our handler for StockRequestConfirmed will be similar, except we're tracking stock on a line item by line item basis:

public void Handle(StockRequestConfirmed message)  
{
    Process(message, m =>
    {
        var lineItem = LineItems.Single(li => li.ProductId == m.ProductId);
        lineItem.StockConfirmed = true;

        if (IsCancelled)
        {
            ReturnStock(lineItem);
        }
        else
        {
            CheckForSuccess();
        }
    });
}

The CheckForSuccess method will look to see if all the order fulfillment requirements are met:

private void CheckForSuccess()  
{
    if (IsCancelled)
        return;

    if (LineItems.All(li => li.StockConfirmed) && OrderApproved)
    {
        Send(new OrderFulfillmentSuccessful
        {
            Id = Guid.NewGuid(),
            OrderId = OrderId
        });
    }
}

Only if all of our stock has been confirmed and our order has been approved will we send a message back to the Order document to then finally complete the order:

public void Handle(OrderFulfillmentSuccessful message)  
{
    Process(message, m =>
    {
        if (Status == Status.Rejected || Status == Status.Cancelled)
            return;

        Status = Status.Completed;
    });
}

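The overall message flow looks something like this:

  • OrderRequest sends OrderCreated to OrderFulfillment
  • OrderFulfillment sends a StockRequest to Stock for each line item
  • Stock replies to OrderFulfillment with StockRequestConfirmed
  • OrderRequest sends OrderApproved to OrderFulfillment (before or after the stock replies)
  • OrderFulfillment sends OrderFulfillmentSuccessful back to OrderRequest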

For each step along the way, we've got idempotency handled for us by the inbox/outbox structures. However, we still need to handle out-of-order messages, which is why you'll see success/fail checks every time we receive a notification.
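To see why checking on every notification works, here's a small sketch of the success path on its own. SuccessPathSketch is a hypothetical stand-in for the saga: it assumes each message is delivered once (the inbox's job) and records outgoing messages as strings:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the saga's success path only.
public class SuccessPathSketch
{
    // ProductId -> whether stock has been confirmed for that line item.
    private readonly Dictionary<int, bool> _stockConfirmed;

    public bool OrderApproved { get; private set; }
    public List<string> Outbox { get; } = new List<string>();

    public SuccessPathSketch(params int[] productIds) =>
        _stockConfirmed = productIds.ToDictionary(id => id, id => false);

    public void OnOrderApproved()
    {
        OrderApproved = true;
        CheckForSuccess();
    }

    public void OnStockConfirmed(int productId)
    {
        _stockConfirmed[productId] = true;
        CheckForSuccess();
    }

    // Runs after every notification; only the last missing fact triggers it.
    private void CheckForSuccess()
    {
        if (OrderApproved && _stockConfirmed.Values.All(confirmed => confirmed))
            Outbox.Add("OrderFulfillmentSuccessful");
    }
}
```

Whichever message happens to arrive last is the one that completes the conditions, so the success message goes out exactly once regardless of arrival order.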

Now that we've got the success path taken care of, let's look at the failure paths.

Cancelling the order fulfillment

The first way our order fulfillment can be cancelled is if an order is rejected. From the web app, our Order document handles a rejection:

public void Reject()  
{
    if (Status == Status.Rejected)
        return;

    if (Status == Status.Approved)
        throw new InvalidOperationException("Cannot reject an approved order.");

    if (Status == Status.Completed)
        throw new InvalidOperationException("Cannot reject a completed order.");

    Status = Status.Rejected;
    Send(new OrderRejected
    {
        Id = Guid.NewGuid(),
        OrderId = Id
    });
}

Our order sends an OrderRejected document message that our order fulfillment document receives:

public void Handle(OrderRejected message)  
{
    Process(message, m =>
    {
        OrderRejected = true;

        Cancel();
    });
}

The Cancel method marks the order fulfillment as cancelled and then processes the cancellation:

private void Cancel()  
{
    IsCancelled = true;

    ProcessCancellation();
}

Similarly, a notification of StockRequestDenied will cancel the order fulfillment:

public void Handle(StockRequestDenied message)  
{
    Process(message, m =>
    {
        Cancel();
    });
}

In order to process our order fulfillment cancellation, we need to do a couple of things. First, we need to notify our Order document that it needs to be cancelled. And for any Stock items that were fulfilled, we need to return that stock:

private void ProcessCancellation()  
{
    if (!CancelOrderRequested && !OrderRejected)
    {
        CancelOrderRequested = true;
        Send(new CancelOrderRequest
        {
            Id = Guid.NewGuid(),
            OrderId = OrderId
        });
    }

    foreach (var lineItem in LineItems.Where(li => li.StockConfirmed))
    {
        ReturnStock(lineItem);
    }
}

Each step along the way, we keep track of what messages we've sent out so that we don't send notifications twice. To return stock:

private void ReturnStock(LineItem lineItem)  
{
    if (lineItem.StockReturnRequested)
        return;

    lineItem.StockReturnRequested = true;
    Send(new StockReturnRequested
    {
        Id = Guid.NewGuid(),
        ProductId = lineItem.ProductId,
        AmountToReturn = lineItem.AmountRequested
    });
}

If a stock item has already had a return requested, we just skip it. Finally, the order can receive the cancel order request:

public void Handle(CancelOrderRequest message)  
{
    Process(message, m =>
    {
        if (Status == Status.Rejected)
            return;

        Status = Status.Cancelled;
    });
}
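The Stock side of a return isn't shown in this post. Here's a minimal sketch of what it might look like, assuming a handler that mirrors the StockRequest one; the HashSet is only a stand-in for the inbox, and the handler itself is my assumption rather than the sample's code:

```csharp
using System;
using System.Collections.Generic;

public class StockReturnRequested
{
    public Guid Id { get; set; }
    public int ProductId { get; set; }
    public int AmountToReturn { get; set; }
}

public class Stock
{
    // Stand-in for the inbox: IDs of messages already handled.
    private readonly HashSet<Guid> _inbox = new HashSet<Guid>();

    public int ProductId { get; set; }
    public int QuantityAvailable { get; set; }

    // Assumed compensating handler: it always succeeds, it never aborts.
    public void Handle(StockReturnRequested message)
    {
        if (!_inbox.Add(message.Id))
            return; // duplicate delivery: stock was already returned

        QuantityAvailable += message.AmountToReturn;
    }
}
```

Because the saga only requests a return for line items whose stock was confirmed, the return can't reach Stock before the matching deduction has happened.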

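With our failure flow in place, the message flow looks something like this:

  • OrderRequest sends OrderRejected to OrderFulfillment, or Stock sends StockRequestDenied
  • OrderFulfillment sends CancelOrderRequest to OrderRequest (skipped if the order itself was rejected)
  • OrderFulfillment sends StockReturnRequested to Stock for each confirmed line item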

Our order fulfillment saga can now handle the complex process of managing stock and order approvals, keeping track of each step along the way and dealing with success/failure when it receives the notifications. It handles idempotency, retries, and commutative/out-of-order messages.
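To check that bookkeeping end to end, here's a condensed sketch of the cancellation path. CancellationSketch is a hypothetical stand-in that mirrors the flags used above, leaves out the inbox, and records outgoing messages as strings:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the saga's cancellation path only.
public class CancellationSketch
{
    private class LineItem
    {
        public int ProductId;
        public bool StockConfirmed;
        public bool StockReturnRequested;
    }

    private readonly List<LineItem> _lineItems;
    private bool _isCancelled;
    private bool _orderRejected;
    private bool _cancelOrderRequested;

    public List<string> Outbox { get; } = new List<string>();

    public CancellationSketch(params int[] productIds) =>
        _lineItems = productIds.Select(id => new LineItem { ProductId = id }).ToList();

    public void OnOrderRejected()
    {
        _orderRejected = true;
        Cancel();
    }

    public void OnStockDenied() => Cancel();

    public void OnStockConfirmed(int productId)
    {
        var lineItem = _lineItems.Single(li => li.ProductId == productId);
        lineItem.StockConfirmed = true;
        if (_isCancelled)
            ReturnStock(lineItem); // a confirmation that arrived after cancelling
    }

    private void Cancel()
    {
        _isCancelled = true;
        if (!_cancelOrderRequested && !_orderRejected)
        {
            _cancelOrderRequested = true;
            Outbox.Add("CancelOrderRequest");
        }
        foreach (var lineItem in _lineItems.Where(li => li.StockConfirmed))
            ReturnStock(lineItem);
    }

    private void ReturnStock(LineItem lineItem)
    {
        if (lineItem.StockReturnRequested)
            return; // already asked for this stock back
        lineItem.StockReturnRequested = true;
        Outbox.Add($"StockReturnRequested:{lineItem.ProductId}");
    }
}
```

Whether a denial arrives before or after a confirmation for another line item, the sketch ends up with the same set of outgoing messages: one CancelOrderRequest and one return per confirmed line item.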

In the next post, we'll look at how we can implement the inbox/outbox pattern for other resources, allowing us to bridge to other kinds of databases where a distributed transaction is just plain impossible.