Building End-to-End Diagnostics: Activity and Span Correlation
Posts in this series:
- An Intro
- Trace Context
- Diagnostic Events
- OpenTelemetry Integration
- Activity and Span Correlation
- Visualization with Exporters
- User-Defined Context with Correlation Context
- ActivitySource and OpenTelemetry 1.0
In the last post, we looked at hooking up our diagnostic events (and Activities) to OpenTelemetry, where our main task was creating the appropriate span attributes based on the OpenTelemetry span conventions. When everything runs, we get nicely correlated events thanks to the W3C Trace Context headers flowing through, because each Activity maintains a link to its parent, regardless of where that parent activity came from.
The Activity-to-Span connection works great as long as we can maintain a link between the OTel span and the Activity start/stop events themselves, that is, as long as we can start and stop the right activity when something interesting happens in our observed code.
Sometimes, it's not so easy to correlate the "start" and "stop" events for a given observed operation. Consider our NServiceBus consumer activity logic:
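public override async Task Invoke(
    IIncomingPhysicalMessageContext context,
    Func<Task> next)
{
    // Start the activity before handing control to the rest of the pipeline
    var activity = StartActivity(context);
    try
    {
        await next().ConfigureAwait(false);
    }
    finally
    {
        // The same local reference is stopped, even if next() throws
        StopActivity(activity, context);
    }
}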
For the start and stop, we hold a reference to the activity we started in the activity variable. We can do this because the infrastructure hook for consuming a message wraps the entire operation, and we're passed a continuation (the next parameter). But what happens if the infrastructure hooks we have for "start" and "stop" are physically separate? How do we know which Activity to stop once the observed operation completes?
I ran into this exact situation with the MongoDB .NET Driver, which does have a mechanism to raise internal diagnostic events, but the code that handles the start of an operation is physically separate from the code that handles its end.
Correlating Events
The main interface for listening to internal diagnostic events in the MongoDB .NET Driver is the IEventSubscriber interface:
public interface IEventSubscriber
{
    bool TryGetEventHandler<TEvent>(out Action<TEvent> handler);
}
We can register subscribers, but we only get this one method, which gets called for each TEvent type the driver raises. That means we don't get a single callback that wraps an entire operation such as opening a connection or issuing a command; the start and the end of each operation arrive as separate calls with separate callbacks.
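To make the shape of this API concrete, here is a minimal hand-written subscriber. It is a sketch, not driver code: it declares a local copy of the IEventSubscriber signature and two hypothetical event records (StartedEvent, SucceededEvent) standing in for the driver's command events. Because the caller asks for one handler per event type, the start and the end of a command land in two unrelated delegates, with nothing shared between them.

```csharp
using System;
using System.Collections.Generic;

// Local mirror of the driver's IEventSubscriber signature (assumption for this sketch).
public interface IEventSubscriber
{
    bool TryGetEventHandler<TEvent>(out Action<TEvent> handler);
}

// Hypothetical stand-ins for CommandStartedEvent / CommandSucceededEvent.
public sealed record StartedEvent(int RequestId);
public sealed record SucceededEvent(int RequestId);

public sealed class LoggingSubscriber : IEventSubscriber
{
    public List<string> Log { get; } = new();

    public bool TryGetEventHandler<TEvent>(out Action<TEvent> handler)
    {
        // The caller asks once per event type; each type gets its own, unrelated delegate.
        if (typeof(TEvent) == typeof(StartedEvent))
        {
            Action<StartedEvent> h = e => Log.Add($"started {e.RequestId}");
            handler = (Action<TEvent>)(object)h;
            return true;
        }
        if (typeof(TEvent) == typeof(SucceededEvent))
        {
            Action<SucceededEvent> h = e => Log.Add($"succeeded {e.RequestId}");
            handler = (Action<TEvent>)(object)h;
            return true;
        }
        // Not interested in any other event type.
        handler = null;
        return false;
    }
}
```

Anything the two delegates need to share, such as the Activity started for a command, has to live somewhere outside them.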
First, let's get some boilerplate in place to handle these events. There's a helper class, ReflectionEventSubscriber, that lets us define handler methods that each accept one event type; the subscriber uses reflection to find these methods. The events we're looking to handle are CommandStartedEvent, CommandSucceededEvent, and CommandFailedEvent. Our event subscriber implementation can wrap this reflection-based event subscriber:
using System;
using System.Reflection;
using MongoDB.Driver.Core.Events;
public class DiagnosticsActivityEventSubscriber : IEventSubscriber
{
    private readonly ReflectionEventSubscriber _subscriber;
    // Let the driver's helper find the private Handle(...) overloads below via reflection
    public DiagnosticsActivityEventSubscriber() =>
        _subscriber = new ReflectionEventSubscriber(this,
            bindingFlags: BindingFlags.Instance | BindingFlags.NonPublic);
    public bool TryGetEventHandler<TEvent>(out Action<TEvent> handler)
        => _subscriber.TryGetEventHandler(out handler);
    private void Handle(CommandStartedEvent @event)
    {
        // Start activity
    }
    private void Handle(CommandSucceededEvent @event)
    {
        // Stop activity
    }
    private void Handle(CommandFailedEvent @event)
    {
        // Stop activity
    }
}
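The driver's ReflectionEventSubscriber does this method lookup for us. As a rough illustration of the idea only (not its actual implementation), a reflection-based lookup can search a target object for a method named Handle whose single parameter is TEvent and bind it to a delegate. MiniReflectionSubscriber, Demo, CommandBegan, and CommandEnded are hypothetical names for this sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical, simplified reflection-based subscriber; not the driver's real code.
public sealed class MiniReflectionSubscriber
{
    private readonly object _target;
    private readonly BindingFlags _flags;

    public MiniReflectionSubscriber(object target, BindingFlags flags)
    {
        _target = target;
        _flags = flags;
    }

    public bool TryGetEventHandler<TEvent>(out Action<TEvent> handler)
    {
        // Look for a method "Handle(TEvent)" visible under the requested binding flags.
        MethodInfo method = _target.GetType().GetMethod(
            "Handle", _flags, binder: null, types: new[] { typeof(TEvent) }, modifiers: null);
        if (method == null || method.ReturnType != typeof(void))
        {
            handler = null;
            return false;
        }
        // Bind the method to the target instance so it can be invoked like any Action<TEvent>.
        handler = (Action<TEvent>)method.CreateDelegate(typeof(Action<TEvent>), _target);
        return true;
    }
}

public sealed record CommandBegan(int RequestId);
public sealed record CommandEnded(int RequestId);

public sealed class Demo
{
    public List<string> Seen { get; } = new();

    // Private handlers, found only because the lookup is given BindingFlags.NonPublic.
    private void Handle(CommandBegan e) => Seen.Add($"began {e.RequestId}");
    private void Handle(CommandEnded e) => Seen.Add($"ended {e.RequestId}");
}
```

Passing BindingFlags.NonPublic is what lets the lookup see private handlers, which is why the subscriber above passes it as well.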
As we can see in DiagnosticsActivityEventSubscriber, the methods that handle the start and stop events are physically separate, but we need to share our started Activity between them! We could store the started activity in a field, but several commands can be in flight at once, so a single field would simply be overwritten. When the stop events come in, we need to make sure we stop the right activity.
To find the "right" activity, we need some way to correlate the information in the CommandStartedEvent object with the CommandSucceededEvent / CommandFailedEvent objects. Luckily, the MongoDB drivers for other languages are quite similar to the .NET one, and some of those drivers have OpenTelemetry implementations. From that code, we can see that these events have a RequestId that uniquely identifies the request against the MongoDB server. This could work!
Implementing our Event Subscriber
First, we need the plumbing for our diagnostic source in the DiagnosticsActivityEventSubscriber class to raise the diagnostic events:
public class DiagnosticsActivityEventSubscriber : IEventSubscriber
{
    public const string ActivityName = "MongoDB.Driver.Core.Events.Command";
    private static readonly DiagnosticSource _diagnosticListener
        = new DiagnosticListener(ActivityName);
Next, we need somewhere to place our started Activity objects, correlated by the RequestId property. The most straightforward solution is a ConcurrentDictionary keyed by that value:
private readonly ConcurrentDictionary<int, Activity> _activityMap
    = new ConcurrentDictionary<int, Activity>();
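Because commands can finish in any order, each end event has to find its own start. The sketch below isolates just that bookkeeping with plain System.Diagnostics.Activity objects; ActivityCorrelator, OnStarted, and OnFinished are hypothetical names, and the activities are created but never started, to keep Activity.Current out of the picture.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;

// Hypothetical helper that only demonstrates request-id correlation.
public sealed class ActivityCorrelator
{
    private readonly ConcurrentDictionary<int, Activity> _activityMap = new();

    public Activity OnStarted(int requestId)
    {
        var activity = new Activity("Demo.Command");
        // TryAdd returns false if this id is already in flight; the map keeps the first one.
        _activityMap.TryAdd(requestId, activity);
        return activity;
    }

    // Returns the activity recorded for requestId, or null if there is none.
    public Activity OnFinished(int requestId) =>
        _activityMap.TryRemove(requestId, out var activity) ? activity : null;

    public int InFlight => _activityMap.Count;
}
```

An end event with no recorded start (or a second end for the same id) simply finds nothing, which is exactly what the if (TryRemove(...)) guards in the stop handlers rely on.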
Now, when we start our activity, we follow the same steps as in our NServiceBus example, with one extra step: adding the started activity to our dictionary:
private void Handle(CommandStartedEvent @event)
{
    var activity = new Activity(ActivityName);
    if (_diagnosticListener.IsEnabled(CommandStarted.EventName, @event))
    {
        _diagnosticListener.StartActivity(activity, new CommandStarted(@event));
    }
    else
    {
        activity.Start();
    }
    // Remember the activity so the matching succeeded/failed event can find it
    _activityMap.TryAdd(@event.RequestId, activity);
}
With the activity started and stored, we can stop it by pulling it back out of the dictionary:
private void Handle(CommandSucceededEvent @event)
{
    // Look up the activity started for this same request, not Activity.Current
    if (_activityMap.TryRemove(@event.RequestId, out var activity))
    {
        _diagnosticListener.StopActivity(activity, new CommandSucceeded(@event));
    }
}
private void Handle(CommandFailedEvent @event)
{
    if (_activityMap.TryRemove(@event.RequestId, out var activity))
    {
        _diagnosticListener.StopActivity(activity, new CommandFailed(@event));
    }
}
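Here are the two halves wired together as a self-contained sketch that runs without MongoDB. It assumes hypothetical Started/Finished records in place of the driver's events and the post's payload wrappers, and it raises each start and finish from its own Task.Run, so no ambient context is shared between them. Unlike the post's void handlers, the Handle methods here return the activity purely so the demo can check that each finish stopped the activity its own start created. CommandEvents, Observer<T>, CorrelationRun, and the "Demo.Command" name are made up for the example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical stand-ins for the driver's command events / payload wrappers.
public sealed record Started(int RequestId);
public sealed record Finished(int RequestId);

public sealed class CommandEvents
{
    public const string ActivityName = "Demo.Command";
    private readonly ConcurrentDictionary<int, Activity> _activityMap = new();

    public DiagnosticListener Listener { get; } = new DiagnosticListener(ActivityName);

    // Returns the started activity only so the demo can compare it later.
    public Activity Handle(Started e)
    {
        var activity = new Activity(ActivityName);
        if (Listener.IsEnabled(ActivityName + ".Start", e))
            Listener.StartActivity(activity, e); // calls Start(), then writes "Demo.Command.Start"
        else
            activity.Start();
        _activityMap.TryAdd(e.RequestId, activity);
        return activity;
    }

    // Returns the stopped activity, or null if no start was recorded for this id.
    public Activity Handle(Finished e)
    {
        if (!_activityMap.TryRemove(e.RequestId, out var activity))
            return null;
        Listener.StopActivity(activity, e); // writes "Demo.Command.Stop", then calls Stop()
        return activity;
    }
}

// Tiny IObserver adapter so the demo can subscribe with a lambda.
public sealed class Observer<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    public Observer(Action<T> onNext) => _onNext = onNext;
    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class CorrelationRun
{
    public static async Task<(bool Matched, List<string> Events)> RunAsync()
    {
        var source = new CommandEvents();
        var events = new List<string>();
        using var subscription = source.Listener.Subscribe(
            new Observer<KeyValuePair<string, object>>(kv =>
            {
                // Record the event name and whether Activity.Current was set at that moment.
                lock (events)
                    events.Add($"{kv.Key} current={(Activity.Current == null ? "null" : "set")}");
            }));

        // Each callback runs on its own flow, like the driver's separate callbacks.
        var started1 = await Task.Run(() => source.Handle(new Started(1)));
        var started2 = await Task.Run(() => source.Handle(new Started(2)));
        // Finish in the opposite order.
        var stopped2 = await Task.Run(() => source.Handle(new Finished(2)));
        var stopped1 = await Task.Run(() => source.Handle(new Finished(1)));

        bool matched = ReferenceEquals(started1, stopped1) && ReferenceEquals(started2, stopped2);
        return (matched, events);
    }
}
```

The observer's log also shows the listener's point of view: Activity.Current is set when each .Start event is written, but it is null when each .Stop event is written, so a listener that looked up its span through the current context would have nothing to end.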
Initially, I tried to use Activity.Current to pull out the activity, but that isn't reliable: Activity.Current follows the async execution context, so it is only correct if that context flows through to the event handler (here, it did not). Instead, with this correlated dictionary, I can ensure I stop the correct activities, and my eventual OTel spans will correlate as well.
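The Activity.Current behavior is easy to reproduce. Activity.Current is backed by an AsyncLocal, so an activity started on one asynchronous flow (here, inside Task.Run) is not visible back on the caller's flow. This is only an illustration of the mechanism; it doesn't show how the MongoDB driver actually schedules its event callbacks. CurrentDoesNotFlowBack is a hypothetical name.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class CurrentDoesNotFlowBack
{
    // Starts an activity on another flow and reports what each side sees as Activity.Current.
    public static async Task<(bool SeenInside, bool SeenOutside)> RunAsync()
    {
        Activity started = null;
        bool seenInside = await Task.Run(() =>
        {
            started = new Activity("Demo.Command").Start();
            // Same flow that called Start(), so Current points at it here.
            return Activity.Current == started;
        });
        // Back on the caller's flow, the AsyncLocal change made inside Task.Run is not visible.
        bool seenOutside = Activity.Current == started;
        started.Stop();
        return (seenInside, seenOutside);
    }
}
```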
OpenTelemetry Integration
On the OTel side, we have a similar issue: the "current span" assumes a shared execution context. Since that context isn't shared here, we have to implement our ListenerHandler in a similar fashion, but this time correlating a TelemetrySpan:
internal class CommandListener : ListenerHandler
{
    public CommandListener(string sourceName, Tracer tracer)
        : base(sourceName, tracer)
    {
    }
    private readonly ConcurrentDictionary<int, TelemetrySpan> _spanMap
        = new ConcurrentDictionary<int, TelemetrySpan>();
Now when we receive the OnStartActivity call, we perform a similar operation, storing our TelemetrySpan keyed by that same RequestId:
public override void OnStartActivity(Activity activity, object payload)
{
    if (!(payload is CommandStarted message))
    {
        AdapterEventSource.Log.NullPayload("CommandListener.OnStartActivity");
        return;
    }
    Tracer.StartActiveSpanFromActivity($"mongodb.{message.Event.CommandName}",
        activity,
        SpanKind.Client,
        out var span);
    SetSpanAttributes(span, message);
    _spanMap.TryAdd(message.Event.RequestId, span);
}
From here, stopping the activity means pulling the span out of our dictionary instead of accessing the CurrentSpan:
public override void OnStopActivity(Activity activity, object payload)
{
    if (!(payload is CommandSucceeded message))
    {
        AdapterEventSource.Log.NullPayload("CommandListener.OnStopActivity");
        return;
    }
    if (_spanMap.TryRemove(message.Event.RequestId, out var span))
    {
        span.End();
    }
}
public override void OnException(Activity activity, object payload)
{
    if (!(payload is CommandFailed message))
    {
        AdapterEventSource.Log.NullPayload("CommandListener.OnExceptionActivity");
        return;
    }
    if (_spanMap.TryRemove(message.Event.RequestId, out var span))
    {
        span.Status = Status.Unknown.WithDescription(message.Event.Failure.Message);
        SetSpanAttributes(span, message);
        span.End();
    }
}
Although it took a bit more work to store the Activity and TelemetrySpan instances locally, doing so ensured that we correlated the correct instances for eventual publishing to our tracing adapters. If we had relied on the Current properties alone, we'd be stopping the wrong activities and spans, resulting in a very wonky-looking request graph.
The last piece is registering the subscriber with MongoDB, but that part is quite specific to the driver, so check out the source code to see how it's all registered and hooked up.
In the next post, I'll walk through my sample application and show how these adapters are hooked up and used to visualize complicated traces in Zipkin, Jaeger, and Application Insights.