Building End-to-End Diagnostics: Activity and Span Correlation

In the last post, we looked at hooking up our diagnostics events (and Activities) to OpenTelemetry, where our main task was creating the appropriate span attributes based on the OpenTelemetry Span conventions. When everything runs, we get lovely correlated events based on our W3C tracecontext headers flowing through, as our Activity maintains a link to its parent, regardless of where that parent activity came from.

The Activity to Span connection works great, as long as we can maintain a link between the OTel "Span" and the Activity Start/Stop events themselves, and we're effectively able to start and stop the right activity when something interesting happens in our observed code.

Sometimes, it's not so easy to correlate the "start" and "stop" events for a given observed event. Consider our NServiceBus consumer activity logic:

public override async Task Invoke(
    IIncomingPhysicalMessageContext context,
    Func<Task> next)
{
    var activity = StartActivity(context);

    try
    {
        await next().ConfigureAwait(false);
    }
    finally
    {
        StopActivity(activity, context);
    }
}

For the Start and Stop activities, we have the reference to the activity we started in that activity variable. We can do this because the infrastructure hook for consuming a message wraps the entire operation, and we're passed a continuation (that next parameter). But what happens if the infrastructure hooks we have for "start" and "stop" are physically separate? How do we know which Activity to stop once the observed event completes?

I ran into this exact situation with the MongoDB .NET Driver, which does have a mechanism to raise internal diagnostic events, but the "start" and "stop" events arrive in separate callbacks in your code.

Correlating Events

The main interface to listen to internal diagnostic events in the MongoDB .NET Driver is the IEventSubscriber interface:

public interface IEventSubscriber
{
    bool TryGetEventHandler<TEvent>(out Action<TEvent> handler);
}

We can register subscribers, but we only get this one method that gets called based on different TEvent types. That means that we don't get a single method for opening a connection or issuing a command. Those are separate calls with separate callbacks.
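To make the shape of that single method more concrete, here is a minimal sketch of a hand-rolled subscriber that dispatches on TEvent. It is not the driver's code: the interface is re-declared locally so the snippet compiles on its own, and DemoStartedEvent, DemoSucceededEvent, and ManualSubscriber are hypothetical names standing in for the real event types.

```csharp
using System;

// Local stand-in for the driver's interface so this sketch compiles without the MongoDB package.
public interface IEventSubscriber
{
    bool TryGetEventHandler<TEvent>(out Action<TEvent> handler);
}

// Hypothetical event types; the real driver events carry much more data.
public sealed class DemoStartedEvent { public int RequestId { get; set; } }
public sealed class DemoSucceededEvent { public int RequestId { get; set; } }

public sealed class ManualSubscriber : IEventSubscriber
{
    public int StartedCount { get; private set; }
    public int SucceededCount { get; private set; }

    public bool TryGetEventHandler<TEvent>(out Action<TEvent> handler)
    {
        // The caller asks per event type; we answer with a delegate or decline.
        if (typeof(TEvent) == typeof(DemoStartedEvent))
        {
            handler = (Action<TEvent>)(object)new Action<DemoStartedEvent>(OnStarted);
            return true;
        }

        if (typeof(TEvent) == typeof(DemoSucceededEvent))
        {
            handler = (Action<TEvent>)(object)new Action<DemoSucceededEvent>(OnSucceeded);
            return true;
        }

        handler = null;
        return false;
    }

    // "Start" and "stop" land in two unrelated methods, with nothing shared between them.
    private void OnStarted(DemoStartedEvent e) => StartedCount++;
    private void OnSucceeded(DemoSucceededEvent e) => SucceededCount++;
}

public static class ManualSubscriberDemo
{
    public static void Main()
    {
        var subscriber = new ManualSubscriber();

        if (subscriber.TryGetEventHandler<DemoStartedEvent>(out var onStarted))
        {
            onStarted(new DemoStartedEvent { RequestId = 1 });
        }

        Console.WriteLine(subscriber.StartedCount);
        Console.WriteLine(subscriber.TryGetEventHandler<string>(out _));
    }
}
```

The two private handlers share nothing but the subscriber instance, which is exactly the correlation problem we need to solve.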

First, let's get some boilerplate to handle these events. There's a helper class, ReflectionEventSubscriber, that lets us define methods with a signature that accepts the event objects, and the subscriber uses reflection to find these methods. The events we're looking to handle are CommandStartedEvent, CommandSucceededEvent, and CommandFailedEvent. Our event subscriber implementation can wrap this reflection-based event subscriber:

using System;
using System.Reflection;
using MongoDB.Driver.Core.Events;

public class DiagnosticsActivityEventSubscriber : IEventSubscriber
{
    private readonly ReflectionEventSubscriber _subscriber;

    public DiagnosticsActivityEventSubscriber() =>
        _subscriber = new ReflectionEventSubscriber(this, 
            bindingFlags: BindingFlags.Instance | BindingFlags.NonPublic);

    public bool TryGetEventHandler<TEvent>(out Action<TEvent> handler) 
        => _subscriber.TryGetEventHandler(out handler);

    private void Handle(CommandStartedEvent @event)
    {
        // Start activity
    }

    private void Handle(CommandSucceededEvent @event)
    {
        // Stop activity
    }

    private void Handle(CommandFailedEvent @event)
    {
        // Stop activity
    }
}

As we can see above, the methods to handle the start/stop activities are physically separate, but we need to share our started Activity between these! We can store the started activity in a field, but when the stop events come in, we need to make sure we stop the right activity.

To find the "right" activity, we need some way to correlate the information in the CommandStartedEvent object with the CommandSucceededEvent/CommandFailedEvent objects. Luckily, the MongoDB drivers for other languages are quite similar to .NET, and some of those other drivers have OpenTelemetry implementations. From that code, we can see that these events have a RequestId that uniquely identifies this request against the MongoDB server. This could work!

Implementing our Event Subscriber

First, we need the plumbing of our diagnostic source in our DiagnosticsActivityEventSubscriber class to raise the diagnostic events:

public class DiagnosticsActivityEventSubscriber : IEventSubscriber
{
    public const string ActivityName = "MongoDB.Driver.Core.Events.Command";

    private static readonly DiagnosticSource _diagnosticListener
        = new DiagnosticListener(ActivityName);

Next, we need somewhere to place our started Activity objects, correlated by the RequestId property. The most straightforward solution is a ConcurrentDictionary keyed by this value:

private readonly ConcurrentDictionary<int, Activity> _activityMap
    = new ConcurrentDictionary<int, Activity>();

Now, when we start our activity, we follow the normal steps we did in our NServiceBus example, with the added step of adding the started activity to our dictionary:

private void Handle(CommandStartedEvent @event)
{
    var activity = new Activity(ActivityName);

    if (_diagnosticListener.IsEnabled(CommandStarted.EventName, @event))
    {
        _diagnosticListener.StartActivity(activity, new CommandStarted(@event));
    }
    else
    {
        activity.Start();
    }

    _activityMap.TryAdd(@event.RequestId, activity);
}
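To see what a consumer of this diagnostic source would receive, here is a small sketch that uses only System.Diagnostics. The names RecordingObserver, ListenerDemo, and "Demo.Command" are made up for illustration, and an anonymous object stands in for the CommandStarted payload.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Records the name of every event written to the listener it is subscribed to.
public sealed class RecordingObserver : IObserver<KeyValuePair<string, object>>
{
    public List<string> EventNames { get; } = new List<string>();

    public void OnNext(KeyValuePair<string, object> value) => EventNames.Add(value.Key);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class ListenerDemo
{
    // Hypothetical name, standing in for ActivityName in the subscriber.
    public const string DemoActivityName = "Demo.Command";

    public static List<string> Run()
    {
        var observer = new RecordingObserver();

        using (var listener = new DiagnosticListener(DemoActivityName))
        using (listener.Subscribe(observer))
        {
            var activity = new Activity(DemoActivityName);

            // Same shape as the handler above: only raise the event if someone is listening.
            if (listener.IsEnabled(DemoActivityName + ".Start"))
            {
                listener.StartActivity(activity, new { RequestId = 1 });
            }
            else
            {
                activity.Start();
            }

            listener.StopActivity(activity, new { RequestId = 1 });
        }

        return observer.EventNames;
    }

    public static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

StartActivity and StopActivity write events named after the activity's operation name with ".Start" and ".Stop" appended, which is what a listener on the other side keys off of.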

With the activity started and stored locally, we can stop it by pulling it back out of the local dictionary:

private void Handle(CommandSucceededEvent @event)
{
    if (_activityMap.TryRemove(@event.RequestId, out var activity))
    {
        _diagnosticListener.StopActivity(activity, new CommandSucceeded(@event));
    }
}

private void Handle(CommandFailedEvent @event)
{
    if (_activityMap.TryRemove(@event.RequestId, out var activity))
    {
        _diagnosticListener.StopActivity(activity, new CommandFailed(@event));
    }
}

Initially, I tried to use Activity.Current to pull out the activity, but that won't always be correct if the async context hasn't flowed through to my event handler (it did not). Instead, with this correlated dictionary, I can ensure I stop the correct activities and my eventual OTel spans will correlate as well.
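A stripped-down sketch shows the difference, assuming two commands whose completions arrive out of order. ActivityCorrelator and the "find"/"insert" operation names are hypothetical, and the MongoDB events are replaced with plain integers for the request IDs.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;

public sealed class ActivityCorrelator
{
    private readonly ConcurrentDictionary<int, Activity> _activityMap
        = new ConcurrentDictionary<int, Activity>();

    public void Start(int requestId, string operationName)
    {
        var activity = new Activity(operationName);
        activity.Start();
        _activityMap.TryAdd(requestId, activity);
    }

    // Returns the stopped activity, or null when no start was recorded for this id.
    public Activity Stop(int requestId)
    {
        if (!_activityMap.TryRemove(requestId, out var activity))
        {
            return null;
        }

        activity.Stop();
        return activity;
    }
}

public static class CorrelatorDemo
{
    public static void Main()
    {
        var correlator = new ActivityCorrelator();
        correlator.Start(1, "find");
        correlator.Start(2, "insert");

        // The ambient activity is whichever one started most recently...
        Console.WriteLine(Activity.Current.OperationName);

        // ...but request 1 completes first, and the map hands back the matching activity.
        Console.WriteLine(correlator.Stop(1).OperationName);
        Console.WriteLine(correlator.Stop(2).OperationName);
    }
}
```

Here Activity.Current points at "insert" at the moment "find" completes, so stopping whatever is current would end the wrong activity. Looking up by request ID does not depend on the ambient context at all.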

OpenTelemetry Integration

On the OTel side, we have a similar issue where the "Current Span" assumes a common execution context. But since context isn't shared, we have to implement our ListenerHandler in a similar fashion, but this time, correlate a TelemetrySpan:

internal class CommandListener : ListenerHandler
{
    public CommandListener(string sourceName, Tracer tracer) 
        : base(sourceName, tracer)
    {
    }

    private readonly ConcurrentDictionary<int, TelemetrySpan> _spanMap
        = new ConcurrentDictionary<int, TelemetrySpan>();

Now when we receive the OnStartActivity call, we have to perform a similar operation to store our TelemetrySpan based on that RequestId:

public override void OnStartActivity(Activity activity, object payload)
{
    if (!(payload is CommandStarted message))
    {
        AdapterEventSource.Log.NullPayload("CommandListener.OnStartActivity");
        return;
    }

    Tracer.StartActiveSpanFromActivity($"mongodb.{message.Event.CommandName}",
        activity, 
        SpanKind.Client, 
        out var span);

    SetSpanAttributes(span, message);

    _spanMap.TryAdd(message.Event.RequestId, span);
}

From here, stopping the activity means pulling the span out of our dictionary instead of accessing the CurrentSpan:

public override void OnStopActivity(Activity activity, object payload)
{
    if (!(payload is CommandSucceeded message))
    {
        AdapterEventSource.Log.NullPayload("CommandListener.OnStopActivity");
        return;
    }

    if (_spanMap.TryRemove(message.Event.RequestId, out var span))
    {
        span.End();
    }
}

public override void OnException(Activity activity, object payload)
{
    if (!(payload is CommandFailed message))
    {
        AdapterEventSource.Log.NullPayload("CommandListener.OnException");
        return;
    }

    if (_spanMap.TryRemove(message.Event.RequestId, out var span))
    {
        span.Status = Status.Unknown.WithDescription(message.Event.Failure.Message);
        SetSpanAttributes(span, message);
        span.End();
    }
}

Although it took a bit more work to store the Activity and TelemetrySpan locally, doing so ensured that we correlated the correct instances for eventual publishing to our tracing adapters. If we had relied only on the Current properties, we'd be stopping the wrong activities and spans, resulting in a very wonky-looking request graph.

The last piece is registering my subscriber with MongoDB, but it's quite specific to that driver so you can check out the source code to see how it's all registered and hooked up.
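For a rough idea of what that registration involves, here is a sketch that uses the driver's ClusterConfigurator hook. It assumes the MongoDB.Driver package and the DiagnosticsActivityEventSubscriber class from this post; MongoClientFactory is a hypothetical wrapper of my own, and the actual source may wire things up differently.

```csharp
using MongoDB.Driver;

public static class MongoClientFactory
{
    public static MongoClient Create(string connectionString)
    {
        var settings = MongoClientSettings.FromConnectionString(connectionString);

        // Hand our subscriber to the driver so it receives the command events.
        settings.ClusterConfigurator = builder =>
            builder.Subscribe(new DiagnosticsActivityEventSubscriber());

        return new MongoClient(settings);
    }
}
```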

In the next (and last) post, I'll walk through my sample application and see how these adapters will be hooked up and used to visualize complicated traces in Zipkin, Jaeger, and Application Insights.