Life Beyond Distributed Transactions: An Apostate's Implementation - Document Example
Posts in this series:
- A Primer
- Document Coordination
- Document Example
- Dispatching Example
- Failures and Retries
- Failure Recovery
- Sagas
- Relational Resources
- Conclusion
In the last post, I walked through the "happy path" scenario of coordinated communication/activities between multiple resources that otherwise can't participate in a transaction. In this post, I'll walk through a code example of building out document coordination in Azure Cosmos DB. My starting point is this set of code for approving an order and updating stock:
[HttpPost]
public async Task<IActionResult> Approve(Guid id)
{
    var orderRequest = await _orderRepository.GetItemAsync(id);
    orderRequest.Approve();
    await _orderRepository.UpdateItemAsync(orderRequest);
    foreach (var lineItem in orderRequest.Items)
    {
        var stock = (await _stockRepository
            .GetItemsAsync(s => s.ProductId == lineItem.ProductId))
            .FirstOrDefault();
        stock.QuantityAvailable -= lineItem.Quantity;
        await _stockRepository.UpdateItemAsync(stock);
    }
    return RedirectToPage("/Orders/Show", new { id });
}
The repositories in my example come straight from the sample application you can download from the Azure Portal, and just wrap the underlying DocumentClient.
Modeling our document
First, we need to baseline our document messages. These objects can be POCOs, but we still need some base information. Since we want to enforce idempotent actions, we need to be able to distinguish between different messages. The easiest way to do so is with a unique identifier per message:
public interface IDocumentMessage
{
    Guid Id { get; }
}
Since our documents need to store and process messages in an inbox/outbox, we need to build out our base Document class to include these items. We could also build a completely separate object for our inbox/outbox, but for simplicity's sake, we'll just use a base class:
public abstract class DocumentBase
{
    [JsonProperty(PropertyName = "id")]
    public Guid Id { get; set; }
    private HashSet<IDocumentMessage> _outbox
        = new HashSet<IDocumentMessage>(DocumentMessageEqualityComparer.Instance);
    private HashSet<IDocumentMessage> _inbox
        = new HashSet<IDocumentMessage>(DocumentMessageEqualityComparer.Instance);
    public IEnumerable<IDocumentMessage> Outbox
    {
        get => _outbox;
        protected set => _outbox = value == null
            ? new HashSet<IDocumentMessage>(DocumentMessageEqualityComparer.Instance)
            : new HashSet<IDocumentMessage>(value, DocumentMessageEqualityComparer.Instance);
    }
    public IEnumerable<IDocumentMessage> Inbox
    {
        get => _inbox;
        protected set => _inbox = value == null
            ? new HashSet<IDocumentMessage>(DocumentMessageEqualityComparer.Instance)
            : new HashSet<IDocumentMessage>(value, DocumentMessageEqualityComparer.Instance);
    }
    // The Send and Process methods shown later in this post are also members of this class.
}
Each of our mailboxes is a HashSet, to ensure we enforce uniqueness of document messages inside our document. We wrap our mailboxes in a couple of convenience properties for storage purposes (since our documents are serialized using JSON.NET, we have to model appropriately for its serialization needs).
We're using a custom equality comparer for document messages based on that interface and ID we added earlier:
public class DocumentMessageEqualityComparer
    : IEqualityComparer<IDocumentMessage>
{
    public static readonly DocumentMessageEqualityComparer Instance
        = new DocumentMessageEqualityComparer();
    public bool Equals(IDocumentMessage x, IDocumentMessage y)
    {
        // Guard against nulls before comparing IDs.
        if (ReferenceEquals(x, y)) return true;
        if (x is null || y is null) return false;
        return x.Id == y.Id;
    }
    public int GetHashCode(IDocumentMessage obj)
    {
        return obj.Id.GetHashCode();
    }
}
With this, we can make sure that our document messages only exist in our inbox/outboxes once (assuming we can pick unique GUIDs).
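To make the uniqueness guarantee concrete, here is a minimal sketch of how a HashSet built with this comparer treats two distinct objects that carry the same Id. The interface and comparer are restated so the snippet compiles on its own. SampleMessage, its Note property, and ComparerDemo are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;

public interface IDocumentMessage
{
    Guid Id { get; }
}

// Hypothetical message type, used only for this illustration.
public class SampleMessage : IDocumentMessage
{
    public Guid Id { get; set; }
    public string Note { get; set; }
}

public class DocumentMessageEqualityComparer : IEqualityComparer<IDocumentMessage>
{
    public static readonly DocumentMessageEqualityComparer Instance
        = new DocumentMessageEqualityComparer();

    public bool Equals(IDocumentMessage x, IDocumentMessage y) => x.Id == y.Id;

    public int GetHashCode(IDocumentMessage obj) => obj.Id.GetHashCode();
}

public static class ComparerDemo
{
    // Adds two distinct instances that share one Id, then one with a fresh Id,
    // and returns how many messages the set ends up holding.
    public static int CountAfterDuplicateAdd()
    {
        var outbox = new HashSet<IDocumentMessage>(DocumentMessageEqualityComparer.Instance);
        var id = Guid.NewGuid();

        outbox.Add(new SampleMessage { Id = id, Note = "first" });
        outbox.Add(new SampleMessage { Id = id, Note = "second copy" }); // not added: same Id
        outbox.Add(new SampleMessage { Id = Guid.NewGuid(), Note = "different message" });

        return outbox.Count;
    }

    public static void Main()
    {
        Console.WriteLine(CountAfterDuplicateAdd());
    }
}
```

The set should end up with two messages: the second Add sees an equal Id and leaves the set unchanged, even though the object itself is a different instance.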
Next, we need to be able to send a message in our DocumentBase class:
protected void Send(IDocumentMessage documentMessage)
{
    if (_outbox == null)
        _outbox = new HashSet<IDocumentMessage>(DocumentMessageEqualityComparer.Instance);
    _outbox.Add(documentMessage);
}
We have to check that the outbox exists and create it if it doesn't (due to serialization, it might not exist), then simply add the document message to the outbox.
To process a document message, we need to make sure this action is idempotent. To check for idempotency, we'll examine our Inbox before executing the action. We can wrap this all up in a single method that our derived documents will use:
protected void Process<TDocumentMessage>(
    TDocumentMessage documentMessage,
    Action<TDocumentMessage> action)
    where TDocumentMessage : IDocumentMessage
{
    if (_inbox == null)
        _inbox = new HashSet<IDocumentMessage>(DocumentMessageEqualityComparer.Instance);
    if (_inbox.Contains(documentMessage))
        return;
    action(documentMessage);
    _inbox.Add(documentMessage);
}
Our derived documents will need to call this method to process their messages with the idempotency check. Once a message is processed successfully, we'll add it to the inbox. And since our transaction boundary is the document, if something fails, the action never happened and the message never gets stored to the inbox. Only by keeping our inbox, outbox, and business data inside a single transaction boundary can we guarantee that they all succeed or fail together.
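As a rough illustration of the inbox check, the sketch below handles the same message twice and applies its effect only once. It uses a trimmed-down DocumentBase that keeps just the inbox and Process. SampleStock, QuantityReserved, and ProcessDemo are hypothetical names that are not part of the sample application.

```csharp
using System;
using System.Collections.Generic;

public interface IDocumentMessage
{
    Guid Id { get; }
}

public class DocumentMessageEqualityComparer : IEqualityComparer<IDocumentMessage>
{
    public static readonly DocumentMessageEqualityComparer Instance
        = new DocumentMessageEqualityComparer();

    public bool Equals(IDocumentMessage x, IDocumentMessage y) => x.Id == y.Id;

    public int GetHashCode(IDocumentMessage obj) => obj.Id.GetHashCode();
}

// Trimmed-down stand-in for the post's DocumentBase: only the inbox and Process.
public abstract class DocumentBase
{
    private readonly HashSet<IDocumentMessage> _inbox
        = new HashSet<IDocumentMessage>(DocumentMessageEqualityComparer.Instance);

    protected void Process<TDocumentMessage>(
        TDocumentMessage documentMessage,
        Action<TDocumentMessage> action)
        where TDocumentMessage : IDocumentMessage
    {
        if (_inbox.Contains(documentMessage))
            return; // already handled: skip the action entirely

        action(documentMessage);
        _inbox.Add(documentMessage); // recorded only after the action succeeds
    }
}

// Hypothetical message and document types for this illustration.
public class QuantityReserved : IDocumentMessage
{
    public Guid Id { get; set; }
    public int Quantity { get; set; }
}

public class SampleStock : DocumentBase
{
    public int QuantityAvailable { get; set; }

    public void Handle(QuantityReserved message)
        => Process(message, m => QuantityAvailable -= m.Quantity);
}

public static class ProcessDemo
{
    // Delivers the same message twice and returns the remaining quantity.
    public static int HandleSameMessageTwice()
    {
        var stock = new SampleStock { QuantityAvailable = 10 };
        var message = new QuantityReserved { Id = Guid.NewGuid(), Quantity = 3 };

        stock.Handle(message);
        stock.Handle(message); // duplicate delivery, ignored by the inbox check

        return stock.QuantityAvailable;
    }

    public static void Main()
    {
        Console.WriteLine(HandleSameMessageTwice());
    }
}
```

Starting from 10 and reserving 3, the quantity should be 7 after both calls, because the second delivery is found in the inbox and skipped. This sketch only covers the in-memory check; the all-or-nothing behavior described above still depends on saving the whole document in one write.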
Refactoring our action
Now that we have our basic mechanism for storing and processing messages, we can refactor our original action. It was basically split into two actions: one approving the order, and another updating stock.
We need to "send" a message from our Order to Stock. But what should that message look like? A few options:
- Command, "update stock"
- Event, "order approved"
- Event, "item purchased"
If I go with a command, I'm coupling the primary action with the intended side effect. But what if this side effect needs to change? Be removed? I don't want to burden the main Order logic with that.
What about the first event, "order approved"? I could go with this - but looking at the work done and communication, Stock doesn't care that an order was approved, it only really cares if an item was purchased. Approvals are really the internal business rules of an order, but the ultimate side effect is that items finally become "purchased" at this point in time. So if I used "order approved", I'd be coupling Stock to the internal business rules of Order. Even though it's an event, "order approved" concerns internal business processes that other documents/services shouldn't care about.
Finally, we have "item purchased". This most closely matches what Stock cares about, and removes any kind of process coupling between these two aggregates. If I went with the macro event, "order approved", I'd still have to translate that to what it means for Stock.
With this in mind, I'll create a document message representing this event:
public class ItemPurchased : IDocumentMessage
{
    public int ProductId { get; set; }
    public int Quantity { get; set; }
    public Guid Id { get; set; }
}
I know how much of which product was purchased, and that's enough for Stock to deal with the consequences of that event.
My Order class then models its Approve method to include sending these new messages:
public void Approve()
{
    Status = Status.Approved;
    foreach (var lineItem in Items)
    {
        Send(new ItemPurchased
        {
            ProductId = lineItem.ProductId,
            Quantity = lineItem.Quantity,
            Id = Guid.NewGuid()
        });
    }
}
I don't have an idempotency check here (if the order is already approved, do nothing), but you get the idea.
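One way that guard could look is sketched below, under some assumptions: Order here is a cut-down stand-in with a plain list as its outbox, and the Status enum, LineItem, and ApproveDemo are hypothetical shapes rather than the sample application's real types. The guard matters because every call to Approve creates messages with new Ids, so the outbox's uniqueness check alone would not stop a second approval from queuing a second set of messages.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public interface IDocumentMessage
{
    Guid Id { get; }
}

public class ItemPurchased : IDocumentMessage
{
    public int ProductId { get; set; }
    public int Quantity { get; set; }
    public Guid Id { get; set; }
}

// Hypothetical stand-ins; the real types have more members.
public enum Status { New, Approved }

public class LineItem
{
    public int ProductId { get; set; }
    public int Quantity { get; set; }
}

public class Order
{
    // A plain list stands in for the HashSet-backed outbox of DocumentBase.
    private readonly List<IDocumentMessage> _outbox = new List<IDocumentMessage>();

    public Status Status { get; private set; }
    public List<LineItem> Items { get; } = new List<LineItem>();
    public IEnumerable<IDocumentMessage> Outbox => _outbox;

    private void Send(IDocumentMessage message) => _outbox.Add(message);

    public void Approve()
    {
        // Guard: approving an already approved order does nothing,
        // so no second batch of ItemPurchased messages is queued.
        if (Status == Status.Approved)
            return;

        Status = Status.Approved;
        foreach (var lineItem in Items)
        {
            Send(new ItemPurchased
            {
                ProductId = lineItem.ProductId,
                Quantity = lineItem.Quantity,
                Id = Guid.NewGuid()
            });
        }
    }
}

public static class ApproveDemo
{
    // Approves a two-line order twice and returns the number of queued messages.
    public static int OutboxCountAfterDoubleApprove()
    {
        var order = new Order();
        order.Items.Add(new LineItem { ProductId = 771, Quantity = 1 });
        order.Items.Add(new LineItem { ProductId = 772, Quantity = 2 });

        order.Approve();
        order.Approve(); // second call hits the guard

        return order.Outbox.Count();
    }

    public static void Main()
    {
        Console.WriteLine(OutboxCountAfterDoubleApprove());
    }
}
```

With two line items, the outbox should hold two messages after both calls; without the guard it would hold four.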
On the Stock side, I need to add a method to process the ItemPurchased message:
public void Handle(ItemPurchased message)
{
    Process(message, e =>
    {
        QuantityAvailable -= e.Quantity;
    });
}
Finally, we need some way of linking our ItemPurchased message with the Stock, and that's the intent of an IDocumentMessageHandler:
public interface IDocumentMessageHandler<in T>
    where T : IDocumentMessage
{
    Task Handle(T message);
}
The part of our action that loaded up each Stock is the code we'll put into our handler:
public class UpdateStockFromItemPurchasedHandler
    : IDocumentMessageHandler<ItemPurchased>
{
    private readonly IDocumentDBRepository<Stock> _repository;
    public UpdateStockFromItemPurchasedHandler(
        IDocumentDBRepository<Stock> repository)
        => _repository = repository;
    public async Task Handle(ItemPurchased message)
    {
        var stock = (await _repository
            .GetItemsAsync(s => s.ProductId == message.ProductId))
            .Single();
        stock.Handle(message);
        await _repository.UpdateItemAsync(stock);
    }
}
Not that exciting, as our document will handle the real business logic of handling the request. This class just connects the dots between an IDocumentMessageHandler and some DocumentBase instance.
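To see the handler's load, handle, save flow without Cosmos DB, it can be exercised against an in-memory repository. The sketch below rests on several assumptions: the IDocumentDBRepository shape is inferred from the two calls the handler makes, InMemoryStockRepository and HandlerDemo are hypothetical, and Stock is simplified to track processed message Ids directly instead of deriving from DocumentBase.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading.Tasks;

public interface IDocumentMessage { Guid Id { get; } }

public interface IDocumentMessageHandler<in T> where T : IDocumentMessage
{
    Task Handle(T message);
}

public class ItemPurchased : IDocumentMessage
{
    public int ProductId { get; set; }
    public int Quantity { get; set; }
    public Guid Id { get; set; }
}

// Assumed repository shape, inferred from the calls the handler makes.
public interface IDocumentDBRepository<T>
{
    Task<IEnumerable<T>> GetItemsAsync(Expression<Func<T, bool>> predicate);
    Task UpdateItemAsync(T item);
}

// Simplified Stock: remembers processed message Ids itself.
public class Stock
{
    private readonly HashSet<Guid> _inbox = new HashSet<Guid>();
    public int ProductId { get; set; }
    public int QuantityAvailable { get; set; }

    public void Handle(ItemPurchased message)
    {
        if (_inbox.Contains(message.Id))
            return; // already processed
        QuantityAvailable -= message.Quantity;
        _inbox.Add(message.Id);
    }
}

// Hypothetical in-memory repository; "saving" is a no-op on shared instances.
public class InMemoryStockRepository : IDocumentDBRepository<Stock>
{
    private readonly List<Stock> _items;

    public InMemoryStockRepository(params Stock[] items) => _items = items.ToList();

    public Task<IEnumerable<Stock>> GetItemsAsync(Expression<Func<Stock, bool>> predicate)
        => Task.FromResult<IEnumerable<Stock>>(_items.AsQueryable().Where(predicate).ToList());

    public Task UpdateItemAsync(Stock item) => Task.CompletedTask;
}

public class UpdateStockFromItemPurchasedHandler : IDocumentMessageHandler<ItemPurchased>
{
    private readonly IDocumentDBRepository<Stock> _repository;

    public UpdateStockFromItemPurchasedHandler(IDocumentDBRepository<Stock> repository)
        => _repository = repository;

    public async Task Handle(ItemPurchased message)
    {
        var stock = (await _repository
            .GetItemsAsync(s => s.ProductId == message.ProductId))
            .Single();
        stock.Handle(message);
        await _repository.UpdateItemAsync(stock);
    }
}

public static class HandlerDemo
{
    // Delivers one ItemPurchased twice and returns the remaining stock.
    public static async Task<int> RunAsync()
    {
        var stock = new Stock { ProductId = 771, QuantityAvailable = 5 };
        var handler = new UpdateStockFromItemPurchasedHandler(new InMemoryStockRepository(stock));
        var message = new ItemPurchased { Id = Guid.NewGuid(), ProductId = 771, Quantity = 1 };

        await handler.Handle(message);
        await handler.Handle(message); // redelivery of the same message

        return stock.QuantityAvailable;
    }

    public static void Main() => Console.WriteLine(RunAsync().GetAwaiter().GetResult());
}
```

Starting from 5 units, the stock should sit at 4 after both deliveries, since the handler stays thin and the document itself decides whether the message still needs applying.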
With these basic building blocks, we'll modify our action to only update the Order instance:
[HttpPost]
public async Task<IActionResult> Approve(Guid id)
{
    var orderRequest = await _orderRepository.GetItemAsync(id);
    orderRequest.Approve();
    await _orderRepository.UpdateItemAsync(orderRequest);
    return RedirectToPage("/Orders/Show", new { id });
}
Now when we approve our order, we only create messages in the outbox, which get persisted along with the order. If I look at the saved order in Cosmos DB, I can verify the outbox messages are persisted:
{
    "Items": [
        {
            "Quantity": 1,
            "ListPrice": 3399.99,
            "ProductId": 771,
            "ProductName": "Mountain-100 Silver, 38",
            "Subtotal": 3399.99
        }
    ],
    "Status": 2,
    "Total": 3399.99,
    "Customer": {
        "FirstName": "Jane",
        "LastName": "Doe",
        "MiddleName": "Mary"
    },
    "id": "8bf4bda2-3796-431e-9936-8511243352d2",
    "Outbox": {
        "$type": "System.Collections.Generic.HashSet`1[[AdventureWorksCosmos.UI.Infrastructure.IDocumentMessage, AdventureWorksCosmos.UI]], System.Core",
        "$values": [
            {
                "$type": "AdventureWorksCosmos.UI.Models.Orders.ItemPurchased, AdventureWorksCosmos.UI",
                "ProductId": 771,
                "Quantity": 1,
                "Id": "987ce801-e7cf-4abf-aba7-83d7eed00610"
            }
        ]
    },
    "Inbox": {
        "$type": "System.Collections.Generic.HashSet`1[[AdventureWorksCosmos.UI.Infrastructure.IDocumentMessage, AdventureWorksCosmos.UI]], System.Core",
        "$values": []
    },
    "_rid": "lJFnANVMlwADAAAAAAAAAA==",
    "_self": "dbs/lJFnAA==/colls/lJFnANVMlwA=/docs/lJFnANVMlwADAAAAAAAAAA==/",
    "_etag": "\"02002652-0000-0000-0000-5b48f2140000\"",
    "_attachments": "attachments/",
    "_ts": 1531507220
}
In order to get that polymorphic behavior for my IDocumentMessage collections, I needed to configure the JSON serializer settings in my repository:
_client = new DocumentClient(new Uri(Endpoint), Key,
    new JsonSerializerSettings
    {
        TypeNameHandling = TypeNameHandling.Auto
    });
With these pieces in place, I've removed the process coupling between updating an order's status and updating stock items using document messaging. Of course, we don't actually have anything dispatching our messages. We'll cover the infrastructure for dispatching messages in the next post.