Building End-to-End Diagnostics and Tracing: Trace Context

In the last post, I walked through the overall problem we run into when diagnosing issues in distributed systems: mainly that it can be difficult to determine causality, because we don't have the "stack trace" we'd get from a single in-process application.

To create a sort of "trace" in a distributed system, we need some way to build breadcrumbs into our communications. When one system communicates with another, and that system calls another, we need some way to link those requests together:

Distributed systems communicating

When a downstream system experiences a failure, how do we link that failure back to the original request and every request in between? This is where distributed tracing comes in.

Commercial products and OSS projects filled the void, but this created a problem: each product, OSS or not, had its own way of attaching additional context to each request so that requests could be linked together. The solution to causality is rather simple. We just need some context from the parent system/process that initiated the current request, and that context can be as simple as passing a unique identifier for the current request along to all subsequent requests.

Very recently (February 2020), a new W3C standard, Trace Context, exited "draft" status and became a "Recommendation". This standard mainly describes:

  • What is the ID of the current request?
  • What is the ID of the parent request?

It also allows requests to include some vendor-specific state information (the tracestate header), but the most important parts are those identifiers. With an ID and a parent ID, we can now build a directed acyclic graph, very similar to what we'd see in a Git commit history.
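On the wire, both identifiers travel together in a single traceparent header made of four dash-separated hex fields: version, trace-id, parent-id and trace-flags. A minimal sketch of splitting one apart, assuming only the version 00 layout; TraceParentHeader is a hypothetical helper (not part of .NET or NServiceBus), and it skips some of the validation rules in the specification:

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical helper: splits a W3C traceparent value (version 00 layout)
// into version, trace-id, parent-id and trace-flags.
public sealed class TraceParentHeader
{
    // 2 hex - 32 hex - 16 hex - 2 hex, lowercase only
    private static readonly Regex Pattern = new Regex(
        @"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})\z");

    public string Version { get; }
    public string TraceId { get; }    // shared by every request in the trace
    public string ParentId { get; }   // identifies the caller's span
    public string TraceFlags { get; }

    private TraceParentHeader(string version, string traceId, string parentId, string traceFlags)
    {
        Version = version;
        TraceId = traceId;
        ParentId = parentId;
        TraceFlags = traceFlags;
    }

    // Bit 0 of trace-flags is the "sampled" flag.
    public bool Sampled => (Convert.ToByte(TraceFlags, 16) & 0x01) != 0;

    public static bool TryParse(string value, out TraceParentHeader result)
    {
        result = null;
        if (value == null) return false;

        var match = Pattern.Match(value);
        if (!match.Success) return false;

        var version = match.Groups[1].Value;
        var traceId = match.Groups[2].Value;
        var parentId = match.Groups[3].Value;

        // Version ff and all-zero IDs are treated as invalid by the spec.
        if (version == "ff"
            || traceId == new string('0', 32)
            || parentId == new string('0', 16))
        {
            return false;
        }

        result = new TraceParentHeader(version, traceId, parentId, match.Groups[4].Value);
        return true;
    }
}
```

Parsing the example value from the specification, 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01, yields trace-id 4bf92f3577b34da6a3ce929d0e0e4736, parent-id 00f067aa0ba902b7, and the sampled flag set.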

Trace Context in .NET Core

In order to flow tracing identifiers through a request pipeline, regardless of the technology of the "in" and "out" request, we need some means of capturing the incoming tracing identifiers (on headers), storing them, and flowing them to outgoing headers. The basic pieces for this flow are:

  1. Incoming requests pull trace identifiers and store them in an Activity
  2. Activity.Current includes any additional information for the current activity
  3. Outgoing requests read information from Activity.Current and place on outgoing trace headers.
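The three steps above can be sketched outside of any framework. This is a minimal sketch, assuming .NET Core 3.0 or later and System.Diagnostics.Activity; TraceFlowSketch and HandleAndForward are hypothetical names, and plain dictionaries stand in for real message or HTTP headers:

```csharp
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical sketch: dictionaries stand in for real transport headers.
public static class TraceFlowSketch
{
    public static Dictionary<string, string> HandleAndForward(
        IDictionary<string, string> incomingHeaders)
    {
        // W3C IDs are opt-in on .NET Core 3.x
        Activity.DefaultIdFormat = ActivityIdFormat.W3C;

        // 1. Pull the trace identifier off the incoming headers into an Activity
        var activity = new Activity("Sketch.Receive");
        if (incomingHeaders.TryGetValue("traceparent", out var traceParent))
        {
            activity.SetParentId(traceParent);
        }
        activity.Start();

        try
        {
            // 2. Activity.Current now holds this activity for the rest of the flow
            var current = Activity.Current;

            // 3. An outgoing request copies Activity.Current's Id onto its own header
            return new Dictionary<string, string> { ["traceparent"] = current.Id };
        }
        finally
        {
            activity.Stop();
        }
    }
}
```

Fed the example header from the W3C specification, the outgoing traceparent should keep the same trace-id but carry a new span ID in its third segment.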

One of the big pushes for observability in .NET Core 3.0 was to enable this W3C standard. Although it's not turned on by default for backwards compatibility reasons, if you turn it on:

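using System.Diagnostics;

public class Program
{
    public static void Main(string[] args)
    {
        // Opt in to W3C trace-context IDs (not the default in .NET Core 3.x)
        Activity.DefaultIdFormat = ActivityIdFormat.W3C;

        CreateHostBuilder(args).Build().Run();
    }

    // CreateHostBuilder omitted
}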

That will use the W3C standard format for identifiers, but we still need to consume and propagate these trace identifiers. Luckily, we can look at how ASP.NET Core and HttpClient do this in their own diagnostics code.
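To see what the DefaultIdFormat switch actually changes, here is a small sketch (IdFormatSketch is a hypothetical name) that starts a root Activity under each format and returns its Id. Because DefaultIdFormat is process-wide, the sketch restores the previous value afterwards:

```csharp
using System.Diagnostics;

// Hypothetical sketch: compare root Activity IDs under each ID format.
public static class IdFormatSketch
{
    public static string RootIdFor(ActivityIdFormat format)
    {
        var previous = Activity.DefaultIdFormat;
        Activity.DefaultIdFormat = format;
        try
        {
            // No parent and no Activity.Current, so DefaultIdFormat decides the format
            var activity = new Activity("Sketch.Root").Start();
            activity.Stop();
            return activity.Id;
        }
        finally
        {
            Activity.DefaultIdFormat = previous;
        }
    }
}
```

A W3C Id has the traceparent shape 00-{32 hex trace-id}-{16 hex span-id}-{2 hex flags}, 55 characters in total, while a Hierarchical Id starts with a "|" character, which is the format the older Request-Id header carries.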

There's a lot more going on in that code that we'll get to soon, but first things first, we need middleware for NServiceBus to:

  • Start an Activity for incoming requests and set its parent ID from the traceparent header
  • Set the traceparent header for outgoing requests

Luckily for us, NServiceBus has a robust middleware API, Behaviors, that makes it easy to add these pieces.

Incoming Requests to Activity

The first step in the process for diagnostics is to start an Activity at the beginning of processing a message and stop it at the end. We need to go one step further to add the appropriate parent ID. We can do this with a behavior defined for incoming messages:

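using System;
using System.Diagnostics;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

public class ConsumerDiagnostics 
    : Behavior<IIncomingPhysicalMessageContext>
{
    public override async Task Invoke(
        IIncomingPhysicalMessageContext context,
        Func<Task> next)
    {
        var activity = StartActivity(context);

        try
        {
            // Run the rest of the pipeline, including the message handlers
            await next().ConfigureAwait(false);
        }
        finally
        {
            // Always stop the activity, even if a handler throws
            StopActivity(activity, context);
        }
    }

    // StartActivity and StopActivity are shown below
}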

Behaviors in NServiceBus are similar to ASP.NET Core middleware. You get two parameters: the first is the context of the operation being performed, and the second is a delegate that invokes the next step in the chain.
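The "context plus next delegate" shape is what lets a behavior wrap everything that runs after it. A tiny, framework-free pipeline shows the ordering; this is a hypothetical model of the pattern (MiniPipeline, Step, Timing and Handler are made-up names), not NServiceBus's actual pipeline implementation:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical model of the behavior pattern: each step gets a context
// and a delegate that runs the remaining steps. Not NServiceBus code.
public static class MiniPipeline
{
    public delegate Task Step(List<string> context, Func<Task> next);

    public static Task Run(List<string> context, IReadOnlyList<Step> steps)
    {
        // Build the chain from the end so that steps[0] runs first
        Func<Task> next = () => Task.CompletedTask;
        for (var i = steps.Count - 1; i >= 0; i--)
        {
            var step = steps[i];
            var inner = next;
            next = () => step(context, inner);
        }
        return next();
    }

    // Shaped like ConsumerDiagnostics: work before and after next()
    public static async Task Timing(List<string> log, Func<Task> next)
    {
        log.Add("start");
        try
        {
            await next().ConfigureAwait(false);
        }
        finally
        {
            log.Add("stop");
        }
    }

    // Stands in for the message handler at the end of the chain
    public static Task Handler(List<string> log, Func<Task> next)
    {
        log.Add("handle");
        return next();
    }
}
```

Running Timing followed by Handler records start, handle, stop: the code before next() runs first, and the finally block runs only after everything downstream has completed.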

The StartActivity method needs to do two things: start an Activity, and pull the traceparent header off the incoming message so it can become the Activity's parent:

private static Activity StartActivity(IIncomingPhysicalMessageContext context)
{
    var activity = new Activity(Constants.ConsumerActivityName);

    if (!context.MessageHeaders.TryGetValue(
        Constants.TraceParentHeaderName, 
        out var requestId))
    {
        context.MessageHeaders.TryGetValue(
            Constants.RequestIdHeaderName, 
            out requestId);
    }

    if (!string.IsNullOrEmpty(requestId))
    {
        // This is the magic 
        activity.SetParentId(requestId);
        
        if (context.MessageHeaders.TryGetValue(
            Constants.TraceStateHeaderName, 
            out var traceState))
        {
            activity.TraceStateString = traceState;
        }
    }

    // The current activity gets an ID with the W3C format
    activity.Start();

    return activity;
}

We first create an activity with a good name; in my case I chose NServiceBus.Diagnostics.Receive. There aren't a ton of recommendations about naming activities, but the name should be meaningful to the overall operation being performed. Activity names are hierarchical for future purposes, so we want to adhere to some sort of namespacing. The ASP.NET Core name is Microsoft.AspNetCore.Hosting.HttpRequestIn, and HttpClient's is System.Net.Http.HttpRequestOut.

After creating the Activity, I try to pull the traceparent header value out. I'm also trying to be a good citizen and fall back to the older Request-Id header value. Once I have this, I can set the ParentId on the Activity. Finally, if it exists, I pull the tracestate value and stuff it into the Activity's TraceStateString. There are some more things in store for distributed tracing related to additional correlation context items, but for now, I'll leave those alone.
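What SetParentId buys us can be checked in isolation. This sketch (ParentIdSketch is a hypothetical name) repeats the core of StartActivity without NServiceBus, with the header values passed in directly as an assumption:

```csharp
using System.Diagnostics;

// Hypothetical sketch of StartActivity's core, with header values passed in.
public static class ParentIdSketch
{
    public static Activity StartFromHeaders(string traceParent, string traceState)
    {
        var activity = new Activity("Sketch.Receive");

        if (!string.IsNullOrEmpty(traceParent))
        {
            // Must happen before Start(); the ID format and trace-id come from this value
            activity.SetParentId(traceParent);
            activity.TraceStateString = traceState;
        }

        return activity.Start();
    }
}
```

Given a W3C traceparent, the started activity uses the W3C ID format, reports the incoming value as its ParentId, and reuses the incoming trace-id as its own TraceId, so everything downstream stays in the same trace.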

Then I start the activity, at which point Activity.Current represents this new activity. Stopping the activity is straightforward; the only thing I really need to care about is setting the Activity's end time:

private static void StopActivity(Activity activity,
    IIncomingPhysicalMessageContext context)
{
    if (activity.Duration == TimeSpan.Zero)
    {
        activity.SetEndTime(DateTime.UtcNow);
    }

    activity.Stop();
}

Setting an appropriate end time for the activity will mean more later on when we start raising diagnostic events, but we want to make sure the duration of the event is just around calling the next item in the pipeline.
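As far as I can tell, Activity.Stop() only fills in an end time when none has been set yet, so an explicit SetEndTime call is what determines the recorded Duration. A small sketch (EndTimeSketch is a hypothetical name) that pins the end time at a fixed offset from the start:

```csharp
using System;
using System.Diagnostics;

// Hypothetical sketch: an explicit end time survives the later Stop() call.
public static class EndTimeSketch
{
    public static TimeSpan StopWithExplicitEnd(TimeSpan elapsed)
    {
        var activity = new Activity("Sketch.Timed").Start();

        // StartTimeUtc is set by Start(); the end time must also be UTC
        activity.SetEndTime(activity.StartTimeUtc + elapsed);

        // Duration is already non-zero, so Stop() leaves it alone
        activity.Stop();
        return activity.Duration;
    }
}
```

Passing 50 milliseconds returns a Duration of exactly 50 milliseconds. This is also why the Duration == TimeSpan.Zero guard in StopActivity is safe: it only sets the end time when nothing else already has.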

That's incoming requests, what about outgoing ones?

Propagating trace context in outgoing messages

Just like we have incoming behaviors for messages, NServiceBus has outgoing behaviors as well. We just need to reverse the flow from above and set the traceparent header on outgoing messages from the current Activity:

public class ProducerDiagnostics : Behavior<IOutgoingPhysicalMessageContext>
{
    public override async Task Invoke(
        IOutgoingPhysicalMessageContext context, 
        Func<Task> next)
    {
        var activity = StartActivity(context);

        // Write trace headers before the message is handed to the transport
        InjectHeaders(activity, context);

        try
        {
            await next().ConfigureAwait(false);
        }
        finally
        {
            StopActivity(activity, context);
        }
    }

    // StartActivity, InjectHeaders and StopActivity are shown below
}

Starting the activity is much simpler now:

private static Activity StartActivity(IOutgoingPhysicalMessageContext context)
{
    var activity = new Activity(Constants.ProducerActivityName);

    activity.Start();

    return activity;
}

But wait, we're not setting the parent ID! For outgoing messages, we don't need to. If there's a currently running activity, our new activity will automatically have its ParentId set to Activity.Current.Id, so we're good to go without managing all that ourselves.
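A quick way to convince yourself of this automatic parenting: start one activity, then start another while the first is still current. In this sketch (AutoParentSketch is a hypothetical name) the outer activity plays the role of the incoming message and the inner one the outgoing send:

```csharp
using System.Diagnostics;

// Hypothetical sketch: an Activity started while another is current
// picks up that activity as its parent without SetParentId.
public static class AutoParentSketch
{
    public static (Activity Parent, Activity Child) StartNested()
    {
        Activity.DefaultIdFormat = ActivityIdFormat.W3C;

        var parent = new Activity("Sketch.Receive").Start(); // e.g. an incoming message
        var child = new Activity("Sketch.Send").Start();     // no SetParentId call

        child.Stop();   // Activity.Current goes back to parent
        parent.Stop();  // and then back to whatever was current before
        return (parent, child);
    }
}
```

The child's Parent is the outer activity, its ParentId equals the outer activity's Id, and both share a TraceId while having different SpanIds.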

Next, we need to inject the headers of the current activity into the outgoing request:

private static void InjectHeaders(
    Activity activity,
    IOutgoingPhysicalMessageContext context)
{
    if (activity.IdFormat == ActivityIdFormat.W3C)
    {
        if (!context.Headers.ContainsKey(Constants.TraceParentHeaderName))
        {
            context.Headers[Constants.TraceParentHeaderName] = activity.Id;
            if (activity.TraceStateString != null)
            {
                context.Headers[Constants.TraceStateHeaderName] =
                    activity.TraceStateString;
            }
        }
    }
    else
    {
        if (!context.Headers.ContainsKey(Constants.RequestIdHeaderName))
        {
            context.Headers[Constants.RequestIdHeaderName] = 
                activity.Id;
        }
    }
}

The new request's parent ID will be this activity's ID, and that new parent ID will be consumed by downstream systems as well.

The magic here is Activity.Current, a static property backed by an AsyncLocal, which means that anything sharing the same async execution context sees the same Activity.Current value.
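The async-local behavior is easy to observe: the value survives an await that may resume on another thread, and it flows into work queued with Task.Run, because both capture the execution context. A minimal sketch (AsyncFlowSketch and CaptureAsync are hypothetical names):

```csharp
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical sketch: Activity.Current flows with the async execution context.
public static class AsyncFlowSketch
{
    public static async Task<(string Started, string AfterAwait, string InTaskRun)> CaptureAsync()
    {
        var activity = new Activity("Sketch.Flow").Start();
        try
        {
            await Task.Yield(); // the continuation may resume on another thread
            var afterAwait = Activity.Current?.Id;

            // Task.Run captures the ExecutionContext, so the AsyncLocal value flows too
            var inTaskRun = await Task.Run(() => Activity.Current?.Id);

            return (activity.Id, afterAwait, inTaskRun);
        }
        finally
        {
            activity.Stop();
        }
    }
}
```

All three IDs come back identical. That is what lets the outgoing behavior, running deep inside a message handler, find the activity the incoming behavior started.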

Stopping the activity looks exactly the same as the incoming requests:

private static void StopActivity(
    Activity activity, 
    IOutgoingPhysicalMessageContext context)
{
    if (activity.Duration == TimeSpan.Zero)
    {
        activity.SetEndTime(DateTime.UtcNow);
    }

    activity.Stop();
}

To enable these behaviors, you can use NServiceBus Features to add these behaviors to the processing pipeline automatically:

public class DiagnosticsFeature : Feature
{
    public DiagnosticsFeature()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        context.Pipeline.Register(new ConsumerDiagnostics(), 
            "Parses incoming W3C trace information from incoming messages.");
        context.Pipeline.Register(new ProducerDiagnostics(), 
            "Appends W3C trace information to outgoing messages.");
    }
}

I enable this feature by default, with the future idea that anyone that references this package/assembly will get this behavior opted in. With all of this in place, how does this look in practice?

A Dummy Distributed System

I wanted to simulate all of these different kinds of flows, using a variety of hosts and communication styles:

  • Incoming HTTP to Outgoing NServiceBus
  • Incoming NServiceBus to Outgoing HTTP
  • Incoming NServiceBus to Outgoing NServiceBus

Incoming HTTP will be a regular ASP.NET Core application and host, and the incoming NServiceBus will be a worker service. I wanted to capture all manners of communication with these two applications:

Dummy distributed system communicating via HTTP and AMQP

My Web Server is a web application with this diagnostics code added, plus the NServiceBus extension for .NET Core generic hosting. I created a simple API controller that uses an injected NServiceBus IMessageSession to send an AMQP message via RabbitMQ:

[HttpGet]
public async Task<ActionResult> Get(string message)
{
    var command = new SaySomething
    {
        Message = message
    };

    _logger.LogInformation("Sending message {message}", command.Message);

    await _messageSession.Send(command);

    return Accepted();
}

The handler for this message in the worker service makes an HTTP call and then sends a Reply:

public class SaySomethingHandler : IHandleMessages<SaySomething>
{
    private readonly ILogger<SaySomethingHandler> _logger;

    private static readonly HttpClient _httpClient = new HttpClient
    {
        BaseAddress = new Uri("https://localhost:5001")
    };


    public SaySomethingHandler(ILogger<SaySomethingHandler> logger) 
        => _logger = logger;

    public async Task Handle(SaySomething message, IMessageHandlerContext context)
    {
        var content = await _httpClient.GetStringAsync("/weatherforecast/today");

        dynamic json = Deserialize<ExpandoObject>(content);

        var temp = (int)json.temperatureF.GetInt32();

        _logger.LogInformation("Saying {message} and the weather today is {weather}F", message.Message, temp);

        await context.Reply(new SaySomethingResponse
        {
            Message = $"Back at ya {message.Message}"
        });
    }
}

The API endpoint is rather dumb; it returns a random entry from the dummy weather data I stuck in a database:

[HttpGet("today")]
public async Task<WeatherForecast> GetToday()
{
    var forecastCount = await _dbContext.Forecasts.CountAsync();

    var rng = new Random();

    return await _dbContext.Forecasts.Skip(rng.Next(forecastCount)).FirstAsync();
}

And the Reply handler doesn't do anything fun, but it ends the distributed flow:

public class SaySomethingResponseHandler 
    : IHandleMessages<SaySomethingResponse>
{
    private readonly ILogger<SaySomethingResponseHandler> _logger;

    public SaySomethingResponseHandler(ILogger<SaySomethingResponseHandler> logger)
        => _logger = logger;

    public Task Handle(SaySomethingResponse message, IMessageHandlerContext context)
    {
        _logger.LogInformation("Received {message}", message.Message);

        return Task.CompletedTask;
    }
}

With all the pieces in place, let's trace the flow from the initial request all the way through each receiver and out again.

Tracing the flow

It all starts by initiating the request from the Swagger UI:

Initial request in Swagger UI

Logging the Activity.Current.Id and Activity.Current.ParentId:

Initial request activity ID and parent ID

We see that the current activity ID has a value, but the parent ID does not. This makes sense - the Swagger UI doesn't track activities and does not pass a traceparent header along.

With the message sent, let's look at the message in RabbitMQ to see if it has a traceparent value that matches the above:

RabbitMQ message with matching traceparent header value

It does! Let's now run the whole system end-to-end and watch the activity IDs in our logs:

Aggregate logs for both web and worker service with activity IDs

We can see that all of our activity IDs share the same trace-id segment, while the parent-id segments differ (a technical detail, but in tracing terms each of those identifies a span).
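The same property can be checked without RabbitMQ or a web server. A minimal simulation, assuming nothing but System.Diagnostics.Activity (MultiHopSketch is a hypothetical name): each hop consumes the previous hop's traceparent value and emits its own, like the web to worker to web flow above, with the first hop having no incoming header, just like the Swagger UI request:

```csharp
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical simulation of several hops, each reading the previous hop's
// traceparent value and writing its own Activity.Id for the next hop.
public static class MultiHopSketch
{
    public static List<Activity> Simulate(int hops)
    {
        Activity.DefaultIdFormat = ActivityIdFormat.W3C;
        var activities = new List<Activity>();
        string traceParent = null; // the first hop has no incoming header

        for (var i = 0; i < hops; i++)
        {
            var activity = new Activity("Sketch.Hop");
            if (traceParent != null)
            {
                activity.SetParentId(traceParent);
            }
            activity.Start();

            traceParent = activity.Id; // what this hop writes on its outgoing header
            activity.Stop();
            activities.Add(activity);
        }

        return activities;
    }
}
```

Every hop after the first reports the previous hop's Id as its ParentId, all hops share one TraceId, and each hop has its own SpanId, which matches what the aggregated logs show.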

With our tracing identifiers correctly propagating, we've laid the groundwork to start putting Humpty Dumpty together again. In the next post, we'll look at how we can raise diagnostic events so that something can see these traces without directly instrumenting our traffic.