Building End-to-End Diagnostics: OpenTelemetry Integration

In the last post, we looked at providing diagnostic event hooks into our code at specific points so that "something" could listen in. For our purposes, we want to listen in to surface telemetry data, but diagnostic events have far wider uses, such as logging, testing, and metrics. In this post, we focus on telemetry through the OpenTelemetry project.

So why OpenTelemetry? Isn't it just another standard we need to follow? As someone who uses NServiceBus with a wide variety of clients, I'm exposed to a number of different observability tools. To name a few:

  • Zipkin
  • Jaeger
  • Dynatrace
  • Application Insights

If you're a library author, like NServiceBus, and you want to enable observability through these tools, you'd have to create and maintain packages for each one of those products. Instead, if you could target some common API, like you can today for:

  • Logging
  • Dependency Injection
  • Configuration

Then you don't have to maintain some extensive matrix of support. You target one common API, and the implementation providers can plug in to that common model. OpenTelemetry is just this - providing a standardized API for tracing primitives (and eventually, much more). The .NET SDK for OpenTelemetry (in alpha at the moment, in beta for most other runtimes/platforms) provides that API, as well as convenient bridges for the diagnostic events API.

Plugging into OpenTelemetry

The diagnostic events by themselves provide quite a bit of value, but we need to listen to them in order to do something. That's where OpenTelemetry comes in - the "something" listening to diagnostic events. Since the OpenTelemetry .NET SDK is currently in alpha, I'm going to pull the OpenTelemetry alpha package from the OpenTelemetry MyGet repository:

<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <TargetFramework>netstandard2.0</TargetFramework>
  </PropertyGroup>

  <ItemGroup>
    <PackageReference Include="OpenTelemetry" Version="0.2.0-alpha.231" />
  </ItemGroup>

  <ItemGroup>
    <ProjectReference Include="..\NServiceBus.Diagnostics\NServiceBus.Diagnostics.csproj" />
  </ItemGroup>

</Project>

The library I created serves as the listener to diagnostic events, and also provides an extension to OpenTelemetry to register those listeners. But first, what do we need to create? With the current OpenTelemetry SDK, we need 3 things:

  • A ListenerHandler that receives Activity-based diagnostic events
  • An Adapter that subscribes a ListenerHandler to diagnostic events
  • An extension method on TracerBuilder that registers our Adapter with OpenTelemetry

ListenerHandlers

For NServiceBus, we have two kinds of activities - sending and processing messages. That means we'll have two telemetry events, two spans, and two listeners. The OpenTelemetry SDK includes a helper for dealing with the Activity/OpenTelemetry bridge, and that's a base ListenerHandler class. Note: this is all subject to change as it's all alpha, but the primitives defined in the OpenTelemetry Specification are at least in beta.

That base ListenerHandler class has methods to override - when an Activity starts, stops, raises an exception event, or any other custom event. For us, we only have start/stop events, so we can create a class that overrides those two methods:

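internal class SendMessageListener : ListenerHandler
{
    public SendMessageListener(string sourceName, Tracer tracer) : base(sourceName, tracer)
    {
    }

    // Called for the "Start" diagnostic event; the payload is a BeforeSendMessage.
    public override void OnStartActivity(Activity activity, object payload)
    {
        ProcessEvent(activity, payload as BeforeSendMessage);
    }

    // Called for the "Stop" diagnostic event; the payload is an AfterSendMessage.
    public override void OnStopActivity(Activity activity, object payload)
    {
        ProcessEvent(activity, payload as AfterSendMessage);
    }

    // The two ProcessEvent overloads are covered in the rest of this section.
}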

The payload is the argument fed into StartActivity in our diagnostic listener, allowing our OpenTelemetry listener to have an actual class to work with. The processing of those events needs to create OpenTelemetry spans from the activities, and add attributes to the span.
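For reference, the producing side of that contract can be sketched roughly as follows. This is a minimal sketch under assumptions, not the actual NServiceBus.Diagnostics code: the BeforeSendMessage and AfterSendMessage classes here are simplified stand-ins that carry only a message id, and SendDiagnostics, its Send method, and RecordingObserver are hypothetical names used for illustration. Only DiagnosticListener, Activity, StartActivity, and StopActivity are real System.Diagnostics APIs.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Simplified stand-ins for the payload types (assumption: the real ones
// carry the NServiceBus pipeline context rather than just an id).
public sealed class BeforeSendMessage
{
    public BeforeSendMessage(string messageId) { MessageId = messageId; }
    public string MessageId { get; }
}

public sealed class AfterSendMessage
{
    public AfterSendMessage(string messageId) { MessageId = messageId; }
    public string MessageId { get; }
}

// Hypothetical producer that wraps a send operation in an Activity.
public static class SendDiagnostics
{
    public const string SourceName = "NServiceBus.Diagnostics.Send";

    public static readonly DiagnosticListener Listener = new DiagnosticListener(SourceName);

    public static void Send(string messageId, Action send)
    {
        Activity activity = null;

        // Only pay for the Activity when somebody is listening.
        if (Listener.IsEnabled(SourceName))
        {
            activity = new Activity(SourceName);
            // Starts the activity and writes "<SourceName>.Start" with the payload.
            Listener.StartActivity(activity, new BeforeSendMessage(messageId));
        }

        try
        {
            send();
        }
        finally
        {
            if (activity != null)
            {
                // Writes "<SourceName>.Stop" with the payload, then stops the activity.
                Listener.StopActivity(activity, new AfterSendMessage(messageId));
            }
        }
    }
}

// Hypothetical observer that records the event name and payload type it receives.
public sealed class RecordingObserver : IObserver<KeyValuePair<string, object>>
{
    public List<string> Events { get; } = new List<string>();

    public void OnNext(KeyValuePair<string, object> evt)
        => Events.Add($"{evt.Key} -> {evt.Value?.GetType().Name}");

    public void OnCompleted() { }

    public void OnError(Exception error) { }
}
```

With a RecordingObserver subscribed to SendDiagnostics.Listener, a single call to SendDiagnostics.Send raises two events: one ending in ".Start" carrying a BeforeSendMessage, and one ending in ".Stop" carrying an AfterSendMessage. Those payload objects are what the listener's "payload as ..." casts receive.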

Our ProcessEvent method is a bit long, so let's break it up into parts:

private void ProcessEvent(Activity activity, BeforeSendMessage payload)
{
    if (payload == null)
    {
        AdapterEventSource.Log.NullPayload("SendMessageListener.OnStartActivity");
        return;
    }

    var span = StartSpanFromActivity(activity, payload);

    if (span.IsRecording)
    {
        SetSpanAttributes(activity, payload, span);
    }
}

If the payload isn't the type we expect, the cast yields null, so we log that and return. Otherwise, we start an OpenTelemetry span from the activity. Finally, if the span is recording, we apply the span attributes.

The StartSpanFromActivity method is:

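private TelemetrySpan StartSpanFromActivity(Activity activity, BeforeSendMessage payload)
{
    // The intent header tells us whether this is a Send, Publish, Reply, etc.
    payload.Context.Headers.TryGetValue(Headers.MessageIntent, out var intent);

    // Resolve each routing strategy to a human-readable destination.
    var routes = payload.Context.RoutingStrategies
        .Select(r => r.Apply(payload.Context.Headers))
        .Select(t =>
        {
            switch (t)
            {
                case UnicastAddressTag u:
                    return u.Destination;
                case MulticastAddressTag m:
                    return m.MessageType.Name;
                default:
                    return null;
            }
        })
        // Skip unrecognized address tags so the name has no empty entries.
        .Where(route => route != null)
        .ToList();

    // Fall back to the activity's operation name when there is no intent header.
    var operationName = $"{intent ?? activity.OperationName} {string.Join(", ", routes)}";

    Tracer.StartActiveSpanFromActivity(operationName, activity, SpanKind.Producer, out var span);
    return span;
}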

I wanted to have a more meaningful name here, so I had to do a bit of munging of the incoming data to detect what "kind" of message we're sending, based on the intent of the message (Send/Publish/Reply etc.). I also wanted to record where this operation was being sent to, so I also record the routes. The reason this is a little wonky is that NServiceBus is an abstraction over messaging, so I don't have access to, for example, the inner workings of RabbitMQ or Azure Service Bus or MSMQ.
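To make the naming concrete, the same logic can be isolated from the NServiceBus types. This is only a sketch under assumptions: SpanNames and ForSend are hypothetical names, and the routes are assumed to be already resolved to plain strings (a destination address for unicast, a message type name for multicast).

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper that mirrors how the span name is assembled.
public static class SpanNames
{
    public static string ForSend(string intent, string fallbackName, IEnumerable<string> routes)
    {
        // Ignore routes that could not be resolved to a name.
        var destinations = routes.Where(route => !string.IsNullOrEmpty(route));

        // Prefer the message intent; fall back to the activity's operation name.
        return $"{intent ?? fallbackName} {string.Join(", ", destinations)}";
    }
}
```

Under these assumptions, SpanNames.ForSend("Send", "NServiceBus.Diagnostics.Send", new[] { "Sales" }) yields "Send Sales", and a publish of an OrderPlaced event with no intent-specific destination other than the message type yields "Publish OrderPlaced".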

The final result, however, is a span with a kind of SpanKind.Producer, started from the Activity we pass in. The basic guidance for setting the span and its attributes comes from the OpenTelemetry span conventions around messaging.

With the span started, we now want to apply the span attributes. Following the span guidance, we set several well-known attributes:

private static void SetSpanAttributes(Activity activity, BeforeSendMessage payload, TelemetrySpan span)
{
    span.SetAttribute("messaging.message_id", payload.Context.MessageId);
    span.SetAttribute("messaging.message_payload_size_bytes", payload.Context.Body.Length);

    span.ApplyContext(payload.Context.Builder.Build<ReadOnlySettings>(), payload.Context.Headers);

    foreach (var tag in activity.Tags)
    {
        span.SetAttribute($"messaging.nservicebus.{tag.Key.ToLowerInvariant()}", tag.Value);
    }
}

We also fill in the span with any tags passed in through the Activity. Translating those extra tags into span attributes lets the diagnostics hooks add any extra details they want, and those details then show up in telemetry events. There are many more attributes set in the ApplyContext method, but it's very specific to NServiceBus internals.
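The tag translation on its own looks roughly like this. It's a sketch, assuming a plain dictionary in place of the OpenTelemetry span; ActivityTagMapper and ToAttributes are hypothetical names, while Activity.AddTag and Activity.Tags are the real System.Diagnostics members.

```csharp
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical helper: maps Activity tags to prefixed, lower-cased attribute names.
public static class ActivityTagMapper
{
    public const string Prefix = "messaging.nservicebus.";

    public static IReadOnlyDictionary<string, string> ToAttributes(Activity activity)
    {
        var attributes = new Dictionary<string, string>();

        foreach (var tag in activity.Tags)
        {
            // Same convention as SetSpanAttributes: prefix plus lower-cased tag key.
            attributes[Prefix + tag.Key.ToLowerInvariant()] = tag.Value;
        }

        return attributes;
    }
}
```

For example, a hook that calls activity.AddTag("OriginatingEndpoint", "Sales") ends up contributing an attribute named "messaging.nservicebus.originatingendpoint" with the value "Sales".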

Finally, to end the span, the ProcessEvent method takes in the AfterSendMessage payload but doesn't really do anything with it - the current Span already tracks the given Activity so we don't have anything additional to add in its data:

private void ProcessEvent(Activity activity, AfterSendMessage payload)
{
    Tracer.CurrentSpan.End();
}

The corresponding ProcessMessageListener is quite similar, and you can find that code over on GitHub. With our listeners in place, we now need the hooks into OpenTelemetry to register our listeners with its configuration.

Registering Our Listeners

The last two steps in OpenTelemetry integration are to provide the bridge from "what diagnostic events should I be listening to" to the ListenerHandler classes we created above. Listening to diagnostic events isn't the most straightforward thing in the world - it uses a global registry and a series of nested observables to subscribe to (more on that in a future post), and you need to be mindful to clean up your subscriptions to avoid memory leaks.
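To give a feel for what that involves, here is a minimal sketch of subscribing by hand using only System.Diagnostics. The class names NamedSourceSubscription and RecordingObserver are hypothetical, and this is not how the OpenTelemetry SDK implements it; it only illustrates the two levels of observables and the cleanup they require.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical helper: subscribes an event observer to one named diagnostic source.
public sealed class NamedSourceSubscription : IObserver<DiagnosticListener>, IDisposable
{
    private readonly string _sourceName;
    private readonly IObserver<KeyValuePair<string, object>> _eventObserver;
    private readonly List<IDisposable> _eventSubscriptions = new List<IDisposable>();
    private readonly object _gate = new object();
    private IDisposable _allListenersSubscription;

    public NamedSourceSubscription(string sourceName, IObserver<KeyValuePair<string, object>> eventObserver)
    {
        _sourceName = sourceName;
        _eventObserver = eventObserver;
    }

    public void Subscribe()
    {
        // Outer observable: the global registry of every DiagnosticListener in the process.
        _allListenersSubscription = DiagnosticListener.AllListeners.Subscribe(this);
    }

    // Called once per listener, for existing listeners and for ones created later.
    public void OnNext(DiagnosticListener listener)
    {
        if (listener.Name != _sourceName)
        {
            return;
        }

        lock (_gate)
        {
            // Inner observable: the events written by that one listener.
            _eventSubscriptions.Add(listener.Subscribe(_eventObserver));
        }
    }

    public void OnCompleted() { }

    public void OnError(Exception error) { }

    public void Dispose()
    {
        // Both levels hold references to us, so both must be released.
        _allListenersSubscription?.Dispose();

        lock (_gate)
        {
            foreach (var subscription in _eventSubscriptions)
            {
                subscription.Dispose();
            }

            _eventSubscriptions.Clear();
        }
    }
}

// Hypothetical observer that records the names of the events it receives.
public sealed class RecordingObserver : IObserver<KeyValuePair<string, object>>
{
    public List<string> EventNames { get; } = new List<string>();

    public void OnNext(KeyValuePair<string, object> evt) => EventNames.Add(evt.Key);

    public void OnCompleted() { }

    public void OnError(Exception error) { }
}
```

Forgetting either Dispose call keeps the observer rooted by the global registry or by the listener, which is the leak mentioned above.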

Luckily, the OpenTelemetry .NET SDK has this nailed down for us: we just need to create a disposable class that uses a DiagnosticSourceSubscriber to listen to specific named diagnostic events. Here's the one for the "Send" messages:

public class NServiceBusSendAdapter : IDisposable
{
    private readonly DiagnosticSourceSubscriber _diagnosticSourceSubscriber;

    public NServiceBusSendAdapter(Tracer tracer)
    {
        _diagnosticSourceSubscriber = new DiagnosticSourceSubscriber(
            new SendMessageListener("NServiceBus.Diagnostics.Send", tracer),
            null);
        _diagnosticSourceSubscriber.Subscribe();
    }

    public void Dispose() 
        => _diagnosticSourceSubscriber?.Dispose();
}

We create an instance of the DiagnosticSourceSubscriber, which is the main helper for bridging diagnostic events and activities, and pass in an instance of our SendMessageListener class we saw earlier. The name of the source, NServiceBus.Diagnostics.Send, is the root name of our diagnostic event.

When the diagnostic events get raised, their names are suffixed with "Start" and "Stop", and the base listener class uses that convention to call the OnStartActivity and OnStopActivity methods. If you compare this code with say, the Azure SDK code for listening to diagnostic events, ours is MUCH easier to make sense of.
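The suffix convention can be sketched without the OpenTelemetry SDK. This is an illustration under assumptions, not the SDK's actual implementation: StartStopHandler and RecordingHandler are hypothetical classes, and the dispatch relies on Activity.Current being the activity that was passed to StartActivity or StopActivity.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical base class that routes events by the suffix of their name.
public abstract class StartStopHandler : IObserver<KeyValuePair<string, object>>
{
    public void OnNext(KeyValuePair<string, object> evt)
    {
        // StartActivity/StopActivity write their event while the activity is current.
        var activity = Activity.Current;
        if (activity == null)
        {
            return;
        }

        if (evt.Key.EndsWith(".Start", StringComparison.Ordinal))
        {
            OnStartActivity(activity, evt.Value);
        }
        else if (evt.Key.EndsWith(".Stop", StringComparison.Ordinal))
        {
            OnStopActivity(activity, evt.Value);
        }
    }

    public void OnCompleted() { }

    public void OnError(Exception error) { }

    protected abstract void OnStartActivity(Activity activity, object payload);

    protected abstract void OnStopActivity(Activity activity, object payload);
}

// Hypothetical handler that records which callback ran, for which activity and payload.
public sealed class RecordingHandler : StartStopHandler
{
    public List<string> Calls { get; } = new List<string>();

    protected override void OnStartActivity(Activity activity, object payload)
        => Calls.Add($"start:{activity.OperationName}:{payload}");

    protected override void OnStopActivity(Activity activity, object payload)
        => Calls.Add($"stop:{activity.OperationName}:{payload}");
}
```

Subscribing a RecordingHandler to a DiagnosticListener and then calling StartActivity and StopActivity on it with an Activity named "Demo.Send" results in one start call followed by one stop call, each receiving the payload that was passed in.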

Finally, we need to create an extension to OpenTelemetry configuration for our adapters to register them with OpenTelemetry:

public static class TraceBuilderExtensions
{
    public static TracerBuilder AddNServiceBusAdapter(this TracerBuilder builder)
        => builder
            .AddAdapter(t => new NServiceBusReceiveAdapter(t))
            .AddAdapter(t => new NServiceBusSendAdapter(t));
}

With that in place, we can now register our adapters in our application startup. But before we get to that, what happens when our underlying infrastructure does not play nicely with this model? In the next post, I'll walk through adding telemetry to the Mongo .NET driver, where async context is not preserved and we have to do some more work to correlate diagnostic events.