Building End-to-End Diagnostics: OpenTelemetry Integration
Posts in this series:
- An Intro
- Trace Context
- Diagnostic Events
- OpenTelemetry Integration
- Activity and Span Correlation
- Visualization with Exporters
- User-Defined Context with Correlation Context
- ActivitySource and OpenTelemetry 1.0
In the last post, we looked at providing diagnostic event hooks into our code at specific points so that "something" could listen in. For our purposes, we want to listen in to surface telemetry data, but the uses are far wider for diagnostic events, such as logging, testing, and metrics. We want to focus on telemetry through the OpenTelemetry project.
So why OpenTelemetry? Is it just another standard we need to follow? As someone who uses NServiceBus with a wide variety of clients, I'm exposed to a number of different observability tools. To name a few:
- Zipkin
- Jaeger
- Dynatrace
- Application Insights
If you're the author of a library like NServiceBus, and you want to enable observability through these tools, you'd have to create and maintain a package for each one of those products. Instead, what if you could target some common API, like you can today for:
- Logging
- Dependency Injection
- Configuration
Then you don't need an extensive matrix of support. You target one common API, and the implementation providers can plug in to that common model. OpenTelemetry is just this - a standardized API for tracing primitives (and eventually, much more). The OpenTelemetry .NET SDK (in alpha at the moment, in beta for most other runtimes/platforms) implements that API, as well as convenient bridges for the diagnostics event API.
Plugging into OpenTelemetry
The diagnostic events by themselves provide quite a bit of value, but we need to listen to them in order to do something. That's where OpenTelemetry comes in - the "something" listening to diagnostic events. Since the OpenTelemetry .NET SDK is currently in alpha, I'm going to use the OpenTelemetry MyGet repository and reference the OpenTelemetry alpha package:
<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFramework>netstandard2.0</TargetFramework>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="OpenTelemetry" Version="0.2.0-alpha.231" />
  </ItemGroup>
  <ItemGroup>
    <ProjectReference Include="..\NServiceBus.Diagnostics\NServiceBus.Diagnostics.csproj" />
  </ItemGroup>
</Project>
The library I created serves as the listener to diagnostic events, and provides an extension to OpenTelemetry to register those listeners. But first, what do we need to create? With the current OpenTelemetry SDK, we need 3 things:
- A ListenerHandler that receives Activity-based diagnostic events
- An Adapter that subscribes a ListenerHandler to diagnostic events
- Extension method to TracerBuilder that registers our Adapter with OpenTelemetry
ListenerHandlers
For NServiceBus, we have two kinds of activities - sending and processing messages. That means we'll have two telemetry events, two spans, and two listeners. The OpenTelemetry SDK includes a helper for dealing with the Activity/OpenTelemetry bridge, and that's a base ListenerHandler class. Note: this is all subject to change as it's all alpha, but the primitives defined in the OpenTelemetry Specification are at least in beta.
That base ListenerHandler class has methods to override - when an Activity starts, stops, raises an exception event, or any other custom event. For us, we only have start/stop events, so we can create a class that overrides those two methods:
internal class SendMessageListener : ListenerHandler
{
    public SendMessageListener(string sourceName, Tracer tracer) : base(sourceName, tracer)
    {
    }

    // Raised when the "send" Activity starts; the payload comes from the diagnostic event.
    public override void OnStartActivity(Activity activity, object payload)
    {
        ProcessEvent(activity, payload as BeforeSendMessage);
    }

    // Raised when the "send" Activity stops.
    public override void OnStopActivity(Activity activity, object payload)
    {
        ProcessEvent(activity, payload as AfterSendMessage);
    }

    // The ProcessEvent overloads are shown in the sections that follow.
}
The payload is the argument fed into StartActivity in our diagnostic listener, allowing our OpenTelemetry listener to have an actual class to work with. The processing of those events needs to create OpenTelemetry spans from the activities, and add attributes to the span.
Our ProcessEvent method is a bit long, so let's break it up into parts:
private void ProcessEvent(Activity activity, BeforeSendMessage payload)
{
    if (payload == null)
    {
        AdapterEventSource.Log.NullPayload("SendMessageListener.OnStartActivity");
        return;
    }

    var span = StartSpanFromActivity(activity, payload);

    if (span.IsRecording)
    {
        SetSpanAttributes(activity, payload, span);
    }
}
If the payload isn't the type we expect, we log that and return. Otherwise, we start an OpenTelemetry span from the activity. Finally, if we detect that the span is recording, we apply the span attributes.
The StartSpanFromActivity method is:
private TelemetrySpan StartSpanFromActivity(Activity activity, BeforeSendMessage payload)
{
    // The intent header tells us the kind of operation (Send/Publish/Reply etc.)
    payload.Context.Headers.TryGetValue(Headers.MessageIntent, out var intent);

    // Resolve each routing strategy to a readable destination name
    var routes = payload.Context.RoutingStrategies
        .Select(r => r.Apply(payload.Context.Headers))
        .Select(t =>
        {
            switch (t)
            {
                case UnicastAddressTag u:
                    return u.Destination;
                case MulticastAddressTag m:
                    return m.MessageType.Name;
                default:
                    return null;
            }
        })
        .ToList();

    // Fall back to the Activity's operation name when there is no intent header
    var operationName = $"{intent ?? activity.OperationName} {string.Join(", ", routes)}";

    Tracer.StartActiveSpanFromActivity(operationName, activity, SpanKind.Producer, out var span);

    return span;
}
I wanted to have a more meaningful name here, so I had to do a bit of munging of the incoming data to detect what "kind" of message we're sending here from the intent of the message (Send/Publish/Reply etc.). I also wanted to record where this operation was being sent to, so I also record the routes. The reason this is a little wonky is that NServiceBus is an abstraction over messaging, so I don't have access for example to the inner workings of RabbitMQ or Azure Service Bus or MSMQ.
The final result however is a Span with a kind of SpanKind.Producer started from the Activity we pass in. The basic guidance of setting the span and its attributes comes from the OpenTelemetry span conventions around messaging.
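To make the naming rule easier to see in isolation, here's a minimal sketch that builds the same kind of span name without any NServiceBus types. RouteTag, UnicastRoute, MulticastRoute, and SpanNames are hypothetical stand-ins I made up for illustration, and unlike the code above, this version assumes we'd rather skip routes we can't describe than leave an empty entry in the name:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for the NServiceBus routing tags, for illustration only.
public abstract class RouteTag { }

public sealed class UnicastRoute : RouteTag
{
    public UnicastRoute(string destination) => Destination = destination;
    public string Destination { get; }
}

public sealed class MulticastRoute : RouteTag
{
    public MulticastRoute(Type messageType) => MessageType = messageType;
    public Type MessageType { get; }
}

public static class SpanNames
{
    // Builds "<intent or fallback name> <route, route, ...>".
    public static string Build(string intent, string fallbackName, IEnumerable<RouteTag> routes)
    {
        var names = routes
            .Select(DescribeRoute)
            .Where(name => name != null); // assumption: drop routes we can't describe

        // TrimEnd removes the trailing space left when there are no routes at all.
        return $"{intent ?? fallbackName} {string.Join(", ", names)}".TrimEnd();
    }

    private static string DescribeRoute(RouteTag route)
    {
        switch (route)
        {
            case UnicastRoute u: return u.Destination;
            case MulticastRoute m: return m.MessageType.Name;
            default: return null;
        }
    }
}
```

With these stand-ins, an intent of Send and a single unicast route to Billing produces the name "Send Billing". With no intent, the fallback name takes its place.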
With the span started, we now want to apply the span attributes. Following the span guidance, we set several well-known attributes:
private static void SetSpanAttributes(Activity activity, BeforeSendMessage payload, TelemetrySpan span)
{
    span.SetAttribute("messaging.message_id", payload.Context.MessageId);
    span.SetAttribute("messaging.message_payload_size_bytes", payload.Context.Body.Length);

    span.ApplyContext(payload.Context.Builder.Build<ReadOnlySettings>(), payload.Context.Headers);

    foreach (var tag in activity.Tags)
    {
        span.SetAttribute($"messaging.nservicebus.{tag.Key.ToLowerInvariant()}", tag.Value);
    }
}
We also fill in the span with any tags passed in through the Activity. Translating those extra tags into span attributes lets the diagnostics hooks add any extra details they want, which then show up in telemetry events. There are many more attributes being set in the ApplyContext method, but it's very specific to NServiceBus internals.
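The tag translation is easy to try on its own with nothing but System.Diagnostics. This is only a sketch: TagMapping is a hypothetical helper, and a plain dictionary stands in for the span's attributes so that no OpenTelemetry package is needed:

```csharp
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical helper for illustration; a dictionary stands in for span attributes.
public static class TagMapping
{
    public static Dictionary<string, string> ToAttributes(Activity activity)
    {
        var attributes = new Dictionary<string, string>();

        foreach (var tag in activity.Tags)
        {
            // Same key convention as above: fixed prefix plus the lower-cased tag key.
            attributes[$"messaging.nservicebus.{tag.Key.ToLowerInvariant()}"] = tag.Value;
        }

        return attributes;
    }
}
```

A tag added with activity.AddTag("CorrelationId", "abc") ends up under the key messaging.nservicebus.correlationid.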
Finally, to end the span, the ProcessEvent method takes in the AfterSendMessage payload but doesn't really do anything with it - the current Span already tracks the given Activity so we don't have anything additional to add in its data:
private void ProcessEvent(Activity activity, AfterSendMessage payload)
{
    Tracer.CurrentSpan.End();
}
The corresponding ProcessMessageListener is quite similar, and you can find that code over on GitHub. With our listeners in place, we now need the hooks into OpenTelemetry to register our listeners with its configuration.
Registering Our Listeners
The last two steps in OpenTelemetry integration are to provide the bridge from "what diagnostic events should I be listening to" to the ListenerHandler classes we created above. Listening to diagnostic events isn't the most straightforward thing in the world - it uses a global registry and a series of nested observables to subscribe to (more on that in a future post), and you need to be mindful to clean up your subscriptions to avoid memory leaks.
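To give a feel for what "nested observables" means, here's a rough sketch that uses only System.Diagnostics. DiagnosticListener.AllListeners is the global registry, and each DiagnosticListener it hands back is itself observable. CallbackObserver and NamedSourceSubscription are hypothetical names of mine, not anything from the OpenTelemetry SDK:

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Minimal IObserver<T> that forwards OnNext to a delegate.
public sealed class CallbackObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    public CallbackObserver(Action<T> onNext) => _onNext = onNext;
    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

// Hypothetical helper: subscribes to one named diagnostic source and keeps
// track of every subscription so that Dispose can release all of them.
public sealed class NamedSourceSubscription : IDisposable
{
    private readonly List<IDisposable> _inner = new List<IDisposable>();
    private readonly IDisposable _outer;

    public NamedSourceSubscription(string sourceName, Action<string, object> onEvent)
    {
        // Outer observable: the global registry of every DiagnosticListener.
        _outer = DiagnosticListener.AllListeners.Subscribe(
            new CallbackObserver<DiagnosticListener>(listener =>
            {
                if (listener.Name != sourceName) return;

                // Inner observable: the events raised by that one listener.
                var subscription = listener.Subscribe(
                    new CallbackObserver<KeyValuePair<string, object>>(
                        e => onEvent(e.Key, e.Value)));

                lock (_inner) _inner.Add(subscription);
            }));
    }

    public void Dispose()
    {
        // Stop watching the registry first, then release the per-listener subscriptions.
        _outer.Dispose();
        lock (_inner)
        {
            foreach (var subscription in _inner) subscription.Dispose();
            _inner.Clear();
        }
    }
}
```

The part that's easy to get wrong is the cleanup: both the outer and the inner subscriptions are IDisposable, and forgetting either one leaves the observer reachable for the lifetime of the process.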
Luckily, the OpenTelemetry .NET SDK has this nailed down for us. We just need to create a disposable class that uses a DiagnosticSourceSubscriber to listen to specific named diagnostic events. Here's the one for the "Send" messages:
public class NServiceBusSendAdapter : IDisposable
{
    private readonly DiagnosticSourceSubscriber _diagnosticSourceSubscriber;

    public NServiceBusSendAdapter(Tracer tracer)
    {
        _diagnosticSourceSubscriber = new DiagnosticSourceSubscriber(
            new SendMessageListener("NServiceBus.Diagnostics.Send", tracer),
            null);
        _diagnosticSourceSubscriber.Subscribe();
    }

    public void Dispose()
        => _diagnosticSourceSubscriber?.Dispose();
}
We create an instance of the DiagnosticSourceSubscriber, which is the main helper for bridging diagnostic events and activities, and pass in an instance of our SendMessageListener class we saw earlier. The name of the source, NServiceBus.Diagnostics.Send, is the root name of our diagnostic event.
When the diagnostic events get raised, their names are suffixed with "Start" and "Stop", and the base listener class uses that convention to call the OnStartActivity and OnStopActivity methods. If you compare this code with, say, the Azure SDK code for listening to diagnostic events, ours is MUCH easier to make sense of.
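The suffix convention can be sketched with plain System.Diagnostics types. DiagnosticSource.StartActivity and StopActivity raise events named after the Activity's operation name plus ".Start" and ".Stop", so an observer can route on the end of the event name. SuffixDispatchingObserver and RecordingObserver below are hypothetical stand-ins for illustration, not the SDK's actual base class:

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Illustrative stand-in for a base listener: routes each diagnostic event
// to a virtual method based on the suffix of the event name.
public abstract class SuffixDispatchingObserver : IObserver<KeyValuePair<string, object>>
{
    public void OnNext(KeyValuePair<string, object> e)
    {
        // The code raising the event has already made its Activity current.
        var activity = Activity.Current;
        if (activity == null) return;

        if (e.Key.EndsWith(".Start", StringComparison.Ordinal))
            OnStartActivity(activity, e.Value);
        else if (e.Key.EndsWith(".Stop", StringComparison.Ordinal))
            OnStopActivity(activity, e.Value);
    }

    public void OnError(Exception error) { }
    public void OnCompleted() { }

    protected virtual void OnStartActivity(Activity activity, object payload) { }
    protected virtual void OnStopActivity(Activity activity, object payload) { }
}

// Records which callbacks fired, so the dispatch can be observed.
public sealed class RecordingObserver : SuffixDispatchingObserver
{
    public List<string> Calls { get; } = new List<string>();

    protected override void OnStartActivity(Activity activity, object payload)
        => Calls.Add($"start:{activity.OperationName}");

    protected override void OnStopActivity(Activity activity, object payload)
        => Calls.Add($"stop:{activity.OperationName}");
}
```

Subscribing a RecordingObserver to a DiagnosticListener and then calling StartActivity followed by StopActivity for an Activity named Demo.Send records start:Demo.Send and then stop:Demo.Send.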
Finally, we need to create an extension to OpenTelemetry configuration for our adapters to register them with OpenTelemetry:
public static class TraceBuilderExtensions
{
    public static TracerBuilder AddNServiceBusAdapter(this TracerBuilder builder)
        => builder
            .AddAdapter(t => new NServiceBusReceiveAdapter(t))
            .AddAdapter(t => new NServiceBusSendAdapter(t));
}
With that in place, we can now register our adapters in our application startup. But before we get to that, what happens when our underlying infrastructure does not play nicely with this model? In the next post, I'll walk through adding telemetry to the Mongo .NET driver, where async context is not preserved and we have to do some more work to correlate diagnostic events.