End-to-End Integration Testing with NServiceBus: How It Works

In my last post, I walked through setting up end-to-end integration testing with NServiceBus, and how we can use it to black box test message endpoints similar to how the ASP.NET Core integration testing works. In this post, I want to walk through how it all works underneath the covers.

The major challenge in testing an async messaging flow is observability - how do we know when the work is started or completed? Our tests are written in an imperative, top-down, "Arrange-Act-Assert" style but the system under test (SUT) is intentionally asynchronous.

In my end-to-end diagnostics series, I walked through adding observability to our message endpoints, and it turns out, we can leverage this same mechanism to observe our SUT in our integration tests.

What is done?

In order to know what we need to test, we need to understand what it means for the "Act" step in a test to complete successfully. In a normal synchronous interaction, the "Act" is guaranteed to complete when the method returns. Whether the call is synchronous or awaited, the test doesn't move on until the response is returned.

With async messaging, our calling code only blocks until the message is "ack'd":

var firstMessage = new FirstMessage {Message = "Hello World"};

var session = _factory.Services.GetService<IMessageSession>();

await session.SendLocal(firstMessage);

// OK now what?

We could do something silly like await Task.Delay(something??), but that's not terribly efficient. Those delays add up quickly, and the work may have finished far earlier without our knowing it.

In an end-to-end message endpoint scenario, there are two top-level indications that my work is complete:

  • When a specific message is processed
  • When a specific message is sent

Depending on my scenario, when one of these conditions is met, it's now safe to make my assertions of the messages or the handler's expected side effects.

When we're waiting on an external source of signals, we don't want to wait forever. We still want a timeout - a maximum amount of time to wait for those observable conditions to be met, after which the test fails.

For those that have used various BDD or QA testing tools, this is pretty common for UI tests. We wait for the browser to make its calls and update things, and wait for some expected UI state to move on, but not wait forever. It's not much different, except instead of waiting for some DOM element to show up, we're waiting for messages.
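As a rough illustration of "wait for a signal, but not forever", here is a BCL-only sketch that races a signal task against a timer instead of sleeping for a fixed Task.Delay. SignalWait and WaitForSignalAsync are hypothetical names for this example only; they are not part of NServiceBus or the fixture in this post.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SignalWait
{
    // Hypothetical helper: returns true if `signal` completes within `timeout`,
    // false if the timeout elapses first. Exceptions from `signal` are rethrown.
    public static async Task<bool> WaitForSignalAsync(Task signal, TimeSpan timeout)
    {
        using var cts = new CancellationTokenSource();
        var timer = Task.Delay(timeout, cts.Token);

        // Whichever task finishes first wins the race
        var first = await Task.WhenAny(signal, timer);

        // Stop the timer so it doesn't keep running after the signal arrived
        cts.Cancel();

        if (first != signal)
        {
            return false;
        }

        await signal; // propagate any exception from the signal
        return true;
    }
}
```

In a test, `signal` would typically be the Task of a TaskCompletionSource completed by whatever observes the expected message. The test finishes as soon as the signal arrives, and only the failure case pays the full timeout.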

Observing Messages

To be notified of messages sent or processed, we can first look at the diagnostic events we created in the end-to-end tracing posts. Our "Act" code needs to look for specific messages sent/received. Unfortunately, the diagnostic event we used for telemetry is a bit lower level than we want - for example, it is transport-oriented, with byte[] message bodies and the like.

Luckily, NServiceBus has an additional pipeline hook for logical messages sent/received, and this context object has more relevant information we'd want in our tests. We can then expose more diagnostic events at the logical message level, instead of the physical level. First, a behavior for processing messages:

public class IncomingLogicalMessageDiagnostics : Behavior<IIncomingLogicalMessageContext>
{
    private readonly DiagnosticListener _diagnosticListener;
    private const string EventName = ActivityNames.IncomingLogicalMessage + ".Processed";

    public IncomingLogicalMessageDiagnostics(DiagnosticListener diagnosticListener)
        => _diagnosticListener = diagnosticListener;

    public IncomingLogicalMessageDiagnostics()
        : this(new DiagnosticListener(ActivityNames.IncomingLogicalMessage))
    {
    }

    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        await next().ConfigureAwait(false);

        if (_diagnosticListener.IsEnabled(EventName))
        {
            _diagnosticListener.Write(EventName, context);
        }
    }
}

In this example, I'm not using Activity information, I'm simply raising a single diagnostic event since the physical message behavior already does all of the telemetry work. We await the next step in the pipeline, and raise the event. This ensures our diagnostic event only raises when the message is successfully handled.

The parameter we include in the diagnostic event, the IIncomingLogicalMessageContext, includes the deserialized message and its type information. That way if we want to observe a specific message type with specific data, we can.
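To see why writing the event after `await next()` matters, the following sketch uses a plain DiagnosticListener and a stand-in for the behavior's Invoke method. It is a simulation, not NServiceBus: InvokeAsync, ActionObserver, and the "Demo.IncomingLogicalMessage" listener name are hypothetical. When `next` throws, the Write call is never reached, so only the successfully handled message is observed.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

public static class WriteAfterNextDemo
{
    // Hypothetical helper: adapts a delegate to IObserver<T>
    // (lambda-friendly Subscribe overloads come from System.Reactive, not the BCL)
    private sealed class ActionObserver<T> : IObserver<T>
    {
        private readonly Action<T> _onNext;
        public ActionObserver(Action<T> onNext) => _onNext = onNext;
        public void OnNext(T value) => _onNext(value);
        public void OnError(Exception error) { }
        public void OnCompleted() { }
    }

    // Stand-in for a behavior's Invoke: run the rest of the pipeline first,
    // then write the diagnostic event
    public static async Task InvokeAsync(
        DiagnosticListener listener, string eventName, object payload, Func<Task> next)
    {
        await next().ConfigureAwait(false); // if the handler throws, we stop here...

        if (listener.IsEnabled(eventName))  // ...so failed messages never raise the event
        {
            listener.Write(eventName, payload);
        }
    }

    // Pushes one successful and one failing "message" through InvokeAsync
    // and returns the payloads that were observed
    public static async Task<List<string>> RunAsync()
    {
        var seen = new List<string>();
        using var listener = new DiagnosticListener("Demo.IncomingLogicalMessage");
        using var subscription = listener.Subscribe(
            new ActionObserver<KeyValuePair<string, object>>(e => seen.Add((string)e.Value)));

        await InvokeAsync(listener, "Processed", "ok-message", () => Task.CompletedTask);

        try
        {
            await InvokeAsync(listener, "Processed", "failed-message",
                () => throw new InvalidOperationException("handler failed"));
        }
        catch (InvalidOperationException)
        {
            // Expected: the simulated handler failed
        }

        return seen;
    }
}
```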

Next, we create a similar pipeline behavior for outgoing logical messages:

public class OutgoingLogicalMessageDiagnostics : Behavior<IOutgoingLogicalMessageContext>
{
    private readonly DiagnosticListener _diagnosticListener;
    private const string EventName = ActivityNames.OutgoingLogicalMessage + ".Sent";

    public OutgoingLogicalMessageDiagnostics(DiagnosticListener diagnosticListener)
        => _diagnosticListener = diagnosticListener;

    public OutgoingLogicalMessageDiagnostics()
        : this(new DiagnosticListener(ActivityNames.OutgoingLogicalMessage)) { }

    public override async Task Invoke(IOutgoingLogicalMessageContext context, Func<Task> next)
    {
        await next().ConfigureAwait(false);

        if (_diagnosticListener.IsEnabled(EventName))
        {
            _diagnosticListener.Write(EventName, context);
        }
    }
}

Almost exactly the same! With our diagnostic events defined and set up in behaviors (this is in the NServiceBus.Extensions.Diagnostics package), we can listen in on these events in our integration test.

Observing Diagnostics Events in a Test

I boiled the "wait for an event" logic down to a single method in a fixture:

private static async Task<ObservedMessageContexts> ExecuteAndWait<TMessageContext>(
    Func<Task> testAction,
    Func<TMessageContext, bool> predicate,
    TimeSpan? timeout = null)
    where TMessageContext : IPipelineContext
{

This method takes a generic parameter constrained to IPipelineContext, which will be either the incoming or the outgoing logical message context. The test action is passed in as a Func<Task> because it must run only after the event observation is set up. Finally, to know when to "stop" observing, the method takes a predicate over the pipeline context and a timeout.

The return value is a collection of all the observed message contexts - so the "Assert" step can look at everything sent/received.

First, I want to set a default timeout (there is none when a debugger is attached) and initialize some collectors:

timeout = Debugger.IsAttached
    ? (TimeSpan?)null
    : timeout ?? TimeSpan.FromSeconds(10);

var incomingMessageContexts = new List<IIncomingLogicalMessageContext>();
var outgoingMessageContexts = new List<IOutgoingLogicalMessageContext>();
var obs = Observable.Empty<object>();

The DiagnosticListener API is a two-level set of observables. The first level fires when a diagnostic listener is created, and the second fires when that listener publishes diagnostic events. It's a little confusing to be sure, but not terrible. First, we subscribe to all listeners and pick out the two listener names we care about:

using var allListenerSubscription = DiagnosticListener.AllListeners
    .Subscribe(listener =>
    {
        switch (listener.Name)
        {
            case ActivityNames.IncomingLogicalMessage:
                // Subscribe to the incoming listener

                break;
            case ActivityNames.OutgoingLogicalMessage:
                // Subscribe to the outgoing listener

                break;
        }
    });

When we run our application, our Subscribe callback will be called for every diagnostic listener that starts up - the ASP.NET Core ones, HttpClient ones, SqlClient ones, and so on - but we only care about the NServiceBus logical message listeners.
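Stripped of Rx and NServiceBus, the two levels look roughly like this. It's a minimal sketch assuming only the BCL's System.Diagnostics.DiagnosticListener; ActionObserver is a hypothetical helper (the BCL has no lambda-based Subscribe), and the listener names are made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

public static class TwoLevelDemo
{
    // Hypothetical helper: adapts a delegate to IObserver<T>
    private sealed class ActionObserver<T> : IObserver<T>
    {
        private readonly Action<T> _onNext;
        public ActionObserver(Action<T> onNext) => _onNext = onNext;
        public void OnNext(T value) => _onNext(value);
        public void OnError(Exception error) { }
        public void OnCompleted() { }
    }

    public static List<string> Run()
    {
        var received = new List<string>();
        var eventSubscriptions = new List<IDisposable>();

        // Level 1: called once for every DiagnosticListener, existing and future
        using var allListeners = DiagnosticListener.AllListeners.Subscribe(
            new ActionObserver<DiagnosticListener>(listener =>
            {
                // Skip ASP.NET Core, HttpClient, SqlClient, ... and keep only ours
                if (listener.Name != "Demo.IncomingLogicalMessage") return;

                // Level 2: the listener is itself an IObservable of (event name, payload) pairs
                eventSubscriptions.Add(listener.Subscribe(
                    new ActionObserver<KeyValuePair<string, object>>(e => received.Add($"{e.Key}: {e.Value}"))));
            }));

        using (var wanted = new DiagnosticListener("Demo.IncomingLogicalMessage"))
        using (var unrelated = new DiagnosticListener("Demo.SomethingElse"))
        {
            wanted.Write("Processed", "FirstMessage");
            unrelated.Write("Processed", "ignored"); // nobody subscribed to this listener
        }

        eventSubscriptions.ForEach(s => s.Dispose());
        return received;
    }
}
```

Only the event from the listener whose name matched is recorded; the unrelated listener is reported at level 1 but never subscribed to at level 2.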

Once we have this set up, we need to do two things inside each of those switch blocks:

  • Capture the observable so that we can await it based on our predicate later
  • Capture all contexts in those lists

For each, we can rely on Reactive Extensions via the System.Reactive.Core library. Here's the incoming logical message block:

var incomingObs = listener
    .Select(e => e.Value)
    .Cast<IIncomingLogicalMessageContext>();

incomingObs.Subscribe(incomingMessageContexts.Add);

if (typeof(TMessageContext) ==  typeof(IIncomingLogicalMessageContext))
{
    obs = obs.Merge(incomingObs);
}

break;

The listener passed in is an IObservable<KeyValuePair<string, object>>, where the object is the message context. First, we use Rx's LINQ-style operators (Select and Cast) to treat this observable as an IObservable<IIncomingLogicalMessageContext>. Next, we add a single subscription that simply captures each IIncomingLogicalMessageContext value in the list we created earlier.

Because a diagnostic listener is created per endpoint, and a single test can span multiple hosts and endpoints, we Merge each listener's observable into the single obs variable we declared earlier.

The outgoing message switch block looks almost identical:

var outgoingObs = listener
    .Select(e => e.Value)
    .Cast<IOutgoingLogicalMessageContext>();

outgoingObs.Subscribe(outgoingMessageContexts.Add);

if (typeof(TMessageContext) == typeof(IOutgoingLogicalMessageContext))
{
    obs = obs.Merge(outgoingObs);
}

break;

Both directions are always captured into their lists, but a given test only waits on one of them, so the generic parameter decides which observable gets merged into obs.

Now that we've got our captured and merged observable, we can construct our final observable, which applies the predicate and the timeout, if one exists (it won't when we're debugging):

var finalObs = obs.Cast<TMessageContext>().TakeUntil(predicate);
if (timeout != null)
{
    finalObs = finalObs.Timeout(timeout.Value);
}

With our observable in place, we can execute our test action and await our observable's completion (either by satisfying our predicate or timing out):

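// Subscribe before running the action, so no event raised while it runs is missed
// (ToTask comes from System.Reactive.Threading.Tasks)
var completion = finalObs.ToTask();

await testAction();

// Wait until the predicate is satisfied or the timeout fires
await completion;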

return new ObservedMessageContexts(
    incomingMessageContexts, 
    outgoingMessageContexts);

Once the final observable sequence completes, we return to our test the lists of incoming and outgoing message contexts we captured earlier.

With this in place, our tests have a simple filter they can pass in based on their logical message type, with all the sent/received messages to assert against:

[Fact]
public async Task Should_execute_orchestration_saga()
{
    var client = _fixture.WebAppHost.CreateClient();

    var message = Guid.NewGuid().ToString();

    var response =
        await ExecuteAndWaitForHandled<SaySomethingResponse>(() =>
            client.GetAsync($"saysomething?message={message}"), 
            TimeSpan.FromSeconds(30));

    var saySomethingResponses = response.ReceivedMessages.OfType<SaySomethingResponse>().ToList();
    saySomethingResponses.Count.ShouldBe(1);
    saySomethingResponses[0].Message.ShouldContain(message);
}
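For comparison, here is a hedged, Rx-free sketch of the same technique, using a TaskCompletionSource in place of TakeUntil and a timer race in place of Timeout. It is not the fixture from this post: ExecuteAndWaitAsync, EventWaiter, ActionObserver, and the listener names are hypothetical, and payloads are plain objects rather than NServiceBus message contexts. It subscribes to every listener with the given name, mirroring one listener per endpoint, and all subscriptions are in place before the test action runs.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class EventWaiter
{
    // Hypothetical helper: adapts a delegate to IObserver<T>
    private sealed class ActionObserver<T> : IObserver<T>
    {
        private readonly Action<T> _onNext;
        public ActionObserver(Action<T> onNext) => _onNext = onNext;
        public void OnNext(T value) => _onNext(value);
        public void OnError(Exception error) { }
        public void OnCompleted() { }
    }

    // Records every payload written to any listener named `listenerName` and returns
    // once `predicate` matches one; throws TimeoutException if none matches in time.
    public static async Task<List<object>> ExecuteAndWaitAsync(
        string listenerName, Func<Task> testAction, Func<object, bool> predicate, TimeSpan timeout)
    {
        var gate = new object();
        var observed = new List<object>();
        var subscriptions = new List<IDisposable>();
        var matched = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        var eventObserver = new ActionObserver<KeyValuePair<string, object>>(e =>
        {
            lock (gate) { observed.Add(e.Value); }
            if (predicate(e.Value)) matched.TrySetResult(true); // plays the role of TakeUntil
        });

        // Level 1: every listener, existing or future; each endpoint has its own instance
        IDisposable allListeners = DiagnosticListener.AllListeners.Subscribe(
            new ActionObserver<DiagnosticListener>(listener =>
            {
                if (listener.Name != listenerName) return;
                lock (gate) { subscriptions.Add(listener.Subscribe(eventObserver)); } // Level 2
            }));

        try
        {
            await testAction();

            // Plays the role of Timeout: race the match against a timer
            using var cts = new CancellationTokenSource();
            var first = await Task.WhenAny(matched.Task, Task.Delay(timeout, cts.Token));
            cts.Cancel();
            if (first != matched.Task)
            {
                throw new TimeoutException($"No matching event from '{listenerName}' within {timeout}.");
            }
        }
        finally
        {
            allListeners.Dispose();
            lock (gate) { subscriptions.ForEach(s => s.Dispose()); }
        }

        lock (gate) { return new List<object>(observed); }
    }
}
```

The Rx version reads more declaratively, but spelling it out this way shows what TakeUntil and Timeout are doing on our behalf.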

It took a little bit to get the observables to work correctly, but once in, we can easily add end-to-end integration testing for complex messaging scenarios. Observability has a long tail of payoffs, as I found with integration testing.