Building End-to-End Diagnostics: ActivitySource and OpenTelemetry 1.0

Posts in this series:

It's a few months, and quite a lot has changed in the tracing landscape in .NET. As OpenTelemetry marches towards 1.0, the .NET team had to make a decision. Given there is a lot of code out there using the existing tracing/telemetry APIs in both the .NET codebase and various SDKs, how should .NET 5 best support the OpenTelemetry specification for:

  • Logging
  • Tracing
  • Metrics

The tracing specification is solid at this point (but not yet "1.0"), and the tracing specification is not just the W3C standards on the wire, but the span APIs themselves.

Since the Activity API already exists, and it's already being used, it was far easier to support a "Span" concept as part of an Activity and close the gaps in functionality.

The second major addition in the .NET 5 release was the addition of the ActivitySource/ActivityListener APIs, which make it quite a bit simpler to raise and listen to events for Activity start/stop.

The overall plan was to convert from DiagnosticListener -based activity interaction to the new ActivitySource API. But before we do that, let's look at the current state of our packages.

Current Packages

Before I made any changes, you would see the code that started the activity be fairly light, something like:

_diagnosticListener.OnActivityImport(activity, context);

if (_diagnosticListener.IsEnabled(StartActivityName, context))
{
    _diagnosticListener.StartActivity(activity, context);
}
else
{
    activity.Start();
}

The code starting the Activity would need to do all the work to see if the diagnostic listener was enabled, and if it were, then use the listener to start the activity. You could then pass in a context object that could be...anything. The code then receiving the listener event would then do whatever it wanted, and in the case of OpenTelemetry, it would enrich the event with additional tags, create a Span, then record it.

That left my two packages as:

  • Xyz.Extensions.Diagnostics - raise a diagnostic listener event
  • Xyz.Extensions.OpenTelemetry - listen to the diagnostic listener event, add tags, and enlist with OpenTelemetry infrastructure

In the "new" world, we really want to treat our generated Activity as our first-class "Span", and not something that some other package needs to muck around with. This means a substantial change for our packages:

  • Xyz.Extensions.Diagnostics - create ActivitySource, add tags, start/stop event through ActivitySource
  • Xyz.Extensions.OpenTelemetry - ??? maybe nothing?

I'll come back to the second package, but the big change is moving all the tag creation code from the OpenTelemetry package over to the Diagnostics one.

This is a good thing, because now we don't have to rely on some external package to add interesting telemetry information to our Activity, we've already added it!

Converting to ActivitySource

Instead of a DiagnosticListener, we'll want to use an ActivitySource, and for this part, I'll follow the guidance on this API from OpenTelemetry. We create an ActivitySource that we'll share for all activities in our application:

internal static class NServiceBusActivitySource
{
    private static readonly AssemblyName AssemblyName 
        = typeof(NServiceBusActivitySource).Assembly.GetName();
    internal static readonly ActivitySource ActivitySource 
        = new (AssemblyName.Name, AssemblyName.Version.ToString());
}

Per conventions, the activity source name should be the name of the assembly creating the activities. That makes it much easier to "discover" activities, you don't have to expose a constant or search through source code to discern the name.

In our main methods for dealing with the Activity, we can take advantage of Activity implementing IDisposable to automatically stop the activity:

public override async Task Invoke(
    IIncomingPhysicalMessageContext context,
    Func<Task> next)
{
    using (StartActivity(context))
    {
        await next().ConfigureAwait(false);

        if (_diagnosticListener.IsEnabled(EventName))
        {
            _diagnosticListener.Write(EventName, context);
        }
    }
}

I've kept a DiagnosticListener here for backward compatibility purposes. The StartActivity method takes a lot of the header business we did earlier, with one slight variation - using the renamed W3C  Baggage spec:

if (context.MessageHeaders.TryGetValue(Headers.BaggageHeaderName, out var baggageValue)
   || context.MessageHeaders.TryGetValue(Headers.CorrelationContextHeaderName, out baggageValue))
{
    var baggage = baggageValue.Split(',');
    if (baggage.Length > 0)
    {
        foreach (var item in baggage)
        {
            if (NameValueHeaderValue.TryParse(item, out var baggageItem))
            {
                baggageItems.Add(new KeyValuePair<string, string?>(baggageItem.Name, HttpUtility.UrlDecode(baggageItem.Value)));
            }
        }
    }
}

Finally, starting our activity is a little different. We use the ActivitySource to start, passing along the parentId if we found it in the incoming headers:

var activity = parentId == null
    ? NServiceBusActivitySource.ActivitySource.StartActivity(
        ActivityNames.IncomingPhysicalMessage, 
        ActivityKind.Consumer)
    : NServiceBusActivitySource.ActivitySource.StartActivity(
        ActivityNames.IncomingPhysicalMessage, 
        ActivityKind.Consumer, 
        parentId);

if (activity == null)
{
    return activity;
}

Now instead of checking the diagnosticListener to see if any one is listening, StartActivity returns null when there are no listeners. We can simply return.

If there is someone listening, we can enrich our Activity with data:

activity.TraceStateString = traceStateString;

_activityEnricher.Enrich(activity, context);

foreach (var baggageItem in baggageItems)
{
    activity.AddBaggage(baggageItem.Key, baggageItem.Value);
}

return activity;

I've encapsulated all of the context reading of headers and setting of tags in a separate ActivityEnricher object:

public void Enrich(Activity activity, IIncomingPhysicalMessageContext context)
{
    var destinationName = _settings.LogicalAddress();
    const string operationName = "process";
    activity.DisplayName = $"{destinationName} {operationName}";

    activity.AddTag("messaging.message_id", context.Message.MessageId);
    activity.AddTag("messaging.operation", operationName);
    activity.AddTag("messaging.destination", destinationName);
    activity.AddTag("messaging.message_payload_size_bytes", context.Message.Body.Length.ToString());

I've included everything I could match on the OpenTelemetry semantic conventions.

There was one extra piece, however, which was configuring for "extra" information that I didn't want to turn on by default.

Configuring Span Attributes

In my original OpenTelemetry integration, I included a way to record the message contents as part of your trace. However, now that the Activity needs to include all the relevant information, I needed to have a way to configure the tags when it's actually started and stopped.

To do so, I integrated with the configuration of the extension point of this feature in NServiceBus. I first created a class to hold all of the configuration settings (just one for now):

public class InstrumentationOptions
{
    public bool CaptureMessageBody { get; set; }
}

Next, I had my Activity enrichment class take the settings as a constructor argument:

internal class SettingsActivityEnricher : IActivityEnricher
{
    private readonly ReadOnlySettings _settings;
    private readonly InstrumentationOptions _options;

    public SettingsActivityEnricher(ReadOnlySettings settings)
    {
        _settings = settings;
        _options = settings.Get<InstrumentationOptions>();
    }

Now when I enrich the Activity, I'll also look at these settings:

if (activity.IsAllDataRequested 
    && _options.CaptureMessageBody)
{
    activity.AddTag("messaging.message_payload", Encoding.UTF8.GetString(context.Message.Body));
}

Then when I register the NServiceBus behavior, I'll look for these settings:

public DiagnosticsFeature()
{
    Defaults(settings => settings.SetDefault<InstrumentationOptions>(new InstrumentationOptions
    {
        CaptureMessageBody = false
    }));
    EnableByDefault();
}

protected override void Setup(FeatureConfigurationContext context)
{
    var activityEnricher = new SettingsActivityEnricher(context.Settings);

    context.Pipeline.Register(new IncomingPhysicalMessageDiagnostics(activityEnricher), "Parses incoming W3C trace information from incoming messages.");
    context.Pipeline.Register(new OutgoingPhysicalMessageDiagnostics(activityEnricher), "Appends W3C trace information to outgoing messages.");

All this code is very specific to NServiceBus, so for any custom diagnostics, they'll need to plug in to whatever extensibility model that exists for the middleware they're building. For MongoDB for example, I had a direct constructor:

var clientSettings = MongoClientSettings.FromUrl(mongoUrl);
var options = new InstrumentationOptions { CaptureCommandText = true };
clientSettings.ClusterConfigurator = cb => cb.Subscribe(new DiagnosticsActivityEventSubscriber(options));
var mongoClient = new MongoClient(clientSettings);

With all this in place, now I just needed to integrate with OpenTelemetry (though this is really optional).

OpenTelemetry Integration

Now that OpenTelemetry and System.Diagnostics.DiagnosticSource are more aligned, registering and listening to activities in OpenTelemetry is as simple as registering a source:

services.AddOpenTelemetryTracing(builder =>  builder
    .AddAspNetCoreInstrumentation()
    .AddSqlClientInstrumentation(opt => opt.SetTextCommandContent = true)
    .AddSource("NServiceBus.Extensions.Diagnostics")
    .AddZipkinExporter(o =>
    {
        o.Endpoint = new Uri("http://localhost:9411/api/v2/spans");
        o.ServiceName = Program.EndpointName;
    })

Or, to make things a little easier, I created a tiny little package, the old NServiceBus.Extensions.Diagnostics.OpenTelemetry one that wraps that one line:

public static class TracerProviderBuilderExtensions
{
    public static TracerProviderBuilder AddNServiceBusInstrumentation(this TracerProviderBuilder builder)
        => builder.AddSource("NServiceBus.Extensions.Diagnostics");
}

It's not required, but at this point, this is all we need to bridge ActivitySource-enabled telemetry with OpenTelemetry.

So that's all for now - OpenTelemetry marches to 1.0, and once it does, I'll update my end-to-end example (it's on the RC now).