Building Messaging Endpoints in Azure: A Generic Host

In the last post, we looked at the many ways we can consume messages in Azure as a message endpoint. Unfortunately, there's not an easy answer. We don't have a simple, PaaS solution to a background service. We basically have some very bare serverless options, some shimmed options, and IaaS options through containers.

Regardless of how we might want to deploy a message endpoint, we first need to build out something to host our generic endpoint. For our systems, we nearly always reach for NServiceBus to construct our message endpoint, since it provides nearly all of the enterprise integration patterns out-of-the-box, things we would have to write ourselves.

With the release of ASP.NET Core 2.1, we have a new option for building a host, one provided by .NET Core itself: the .NET Generic Host.

.NET Generic Host

In ASP.NET Core projects, you have a file Program.cs that you probably don't pay much attention to. Inside of this file you'll see the Web Host, with something that looks like:

public class Program
{
    public static void Main(string[] args)
    {
        CreateWebHostBuilder(args).Build().Run();
    }

    public static IWebHostBuilder CreateWebHostBuilder(string[] args) =>
        WebHost.CreateDefaultBuilder(args)
            .UseStartup<Startup>();
}

Inside of that CreateDefaultBuilder method are quite a few sensible defaults for web applications. Configuration sources, logging, hosting: there's a ton going on. But for generic hosting, we don't need the web-specific parts. We may not even have any HTTP traffic to handle.

We could just write a blank console application, but doing so means we lose all of the other nice things provided by the host - a unified logging, configuration, and DI model. Ideally we can still use all of those services.Xyz methods we've used in our ASP.NET Core applications.

The simplest version of our generic host would be:

public static async Task Main(string[] args)
{
    var host = new HostBuilder()
        .Build();

    await host.RunAsync();
}

A production application will have a bit more going on of course - logging, configuration, instrumentation, and services configuration. Our host needs to run something, and for that, we'll use background tasks with hosted services:

public class MyHostedService : IHostedService {
    public Task StartAsync(CancellationToken cancellationToken) {
       // start work
       return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken) {
       // stop work
       return Task.CompletedTask;
    }
}

Then we add our hosted service as part of the host:

var host = new HostBuilder()
        .ConfigureServices(s => {
             s.AddHostedService<MyHostedService>();
        })
        .Build();

await host.RunAsync();
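
For work that runs continuously rather than just starting and stopping, the BackgroundService base class in Microsoft.Extensions.Hosting handles the start/stop plumbing for us. Below is a minimal sketch; HeartbeatService and its five-second delay are hypothetical and only there to show the shape of the loop:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

// Hypothetical worker, not part of the original sample.
public class HeartbeatService : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Loop until the host signals shutdown through the token.
        while (!stoppingToken.IsCancellationRequested)
        {
            Console.WriteLine($"Still running at {DateTimeOffset.UtcNow:O}");

            try
            {
                await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // The delay was cancelled because the host is stopping.
                break;
            }
        }
    }
}
```

It registers the same way as any other hosted service, with s.AddHostedService<HeartbeatService>().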

Integrating NServiceBus

Now that we have a way to build a generic host and a background service, we need to build out the NServiceBus host as part of it. NServiceBus is host-agnostic, so it can be hosted in any environment that can run .NET/.NET Core. That means we have a little bit of work to do to get NServiceBus integrated into the generic host. We also want to share the same DI container between NServiceBus and the generic host.

First, we'll be using a few packages to help support our host:

The first is all the generic NServiceBus functionality, the second bridges the MS DI container with NServiceBus, and the final one is our transport package (NServiceBus supports numerous transports, Azure Service Bus just being one of them).

In our code, we eventually want to get to something like:

services.AddNServiceBus("endpoint_name");

And have everything work from there. The NServiceBus folks have a sample app that shows a working implementation of exactly this, so we'll adapt that sample for our needs:

namespace AddNServiceBus
{
    using System;
    using System.Threading;
    using Microsoft.Extensions.DependencyInjection;
    using NServiceBus;

    public static class AddNServiceBusServiceCollectionExtensions
    {
        public static IServiceCollection AddNServiceBus(this IServiceCollection services, string endpointName, Action<EndpointConfiguration> configuration)
        {
            var endpointConfiguration = new EndpointConfiguration(endpointName);
            configuration(endpointConfiguration);
            return services.AddNServiceBus(endpointConfiguration);
        }

        static IServiceCollection AddNServiceBus(this IServiceCollection services, EndpointConfiguration configuration)
        {
            services.AddSingleton(configuration);
            services.AddSingleton(new SessionAndConfigurationHolder(configuration));
            services.AddHostedService<NServiceBusService>();
            services.AddSingleton(provider =>
            {
                var management = provider.GetService<SessionAndConfigurationHolder>();
                if (management.MessageSession != null)
                {
                    return management.MessageSession;
                }

                var timeout = TimeSpan.FromSeconds(30);
                // SpinWait is here to accommodate the WebHost vs GenericHost difference
                // Closure here should be fine under the assumption we always fast track above once initialized
                if (!SpinWait.SpinUntil(() => management.MessageSession != null || management.StartupException != null,
                    timeout))
                {
                    throw new TimeoutException($"Unable to resolve the message session within '{timeout.ToString()}'. If you are trying to resolve the session within hosted services it is encouraged to use `Lazy<IMessageSession>` instead of `IMessageSession` directly");
                }

                management.StartupException?.Throw();

                return management.MessageSession;
            });
            services.AddSingleton(provider => new Lazy<IMessageSession>(provider.GetService<IMessageSession>));
            return services;
        }
    }
}

It's quite a bit of code, but one of the problems right now is that the NServiceBus message session needs to be registered with the container - but startup doesn't really allow us to separate this out. The best we can do is provide a Lazy<T>, so that the message session is created the first time it's asked for.

This will be improved in the future - the NServiceBus folks are pursuing separating the starting of the NServiceBus host from registration, but for now, we have to do some tricks to "wait" until the host is ready for sending/receiving before returning the session. If we don't need to send messages outside of our hosted service, we can skip the whole IMessageSession registration business. I don't like to get surprised, though, so the registration above can be used in any host, web or otherwise.
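
To make the Lazy<IMessageSession> advice concrete, here is a rough sketch of another hosted service that sends a message on startup. GreeterService and SayHello are hypothetical names, and the sketch assumes the endpoint has a handler for SayHello. The host constructs every hosted service before starting any of them, so taking IMessageSession directly in the constructor would hit the SpinWait above before the endpoint has had a chance to start:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using NServiceBus;

// Hypothetical message type, for illustration only.
public class SayHello : ICommand
{
    public string Name { get; set; }
}

// Hypothetical hosted service; register it after AddNServiceBus so the
// endpoint has already started by the time StartAsync runs.
public class GreeterService : IHostedService
{
    readonly Lazy<IMessageSession> session;

    public GreeterService(Lazy<IMessageSession> session)
    {
        // Only the Lazy wrapper is stored; nothing is resolved yet.
        this.session = session;
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        // Reading .Value runs the IMessageSession factory registered above.
        // SendLocal assumes this endpoint has a handler for SayHello.
        return session.Value.SendLocal(new SayHello { Name = "world" });
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }
}
```

Hosted services start in registration order, so adding this one after the AddNServiceBus call means the message session should already be available when .Value is read.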

Next we have our NServiceBusService, the IHostedService implementation:

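using System;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using NServiceBus;

class NServiceBusService : IHostedService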
{
	public NServiceBusService(SessionAndConfigurationHolder holder)
	{
		this.holder = holder;
	}

	public async Task StartAsync(CancellationToken cancellationToken)
	{
		try
		{
			endpoint = await Endpoint.Start(holder.EndpointConfiguration)
				.ConfigureAwait(false);
		}
		catch (Exception e)
		{
			holder.StartupException = ExceptionDispatchInfo.Capture(e);
			return;
		}

		holder.MessageSession = endpoint;
	}

	public async Task StopAsync(CancellationToken cancellationToken)
	{
		try
		{
			if (endpoint != null)
			{
				await endpoint.Stop()
					.ConfigureAwait(false);
			}
		}
		finally
		{
			holder.MessageSession = null;
			holder.StartupException = null;
		}
	}

	readonly SessionAndConfigurationHolder holder;
	IEndpointInstance endpoint;
}

This isn't doing a whole lot, but it's using the SessionAndConfigurationHolder as a holder for the endpoint started in StartAsync. It's effectively property injection, and it's necessary because we can't register the message session until the endpoint has started, yet we're already past the point of container configuration when the NServiceBus host starts. It's a little ugly, but we typically put this kind of code in a common project/package that all of our endpoints share.
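
The SessionAndConfigurationHolder type itself isn't shown above. A minimal sketch, with its members inferred from how the registration code and NServiceBusService use it (so treat the exact shape as an assumption), could be:

```csharp
using System.Runtime.ExceptionServices;
using NServiceBus;

// Shape inferred from usage in the code above; not copied from a library.
class SessionAndConfigurationHolder
{
    public SessionAndConfigurationHolder(EndpointConfiguration endpointConfiguration)
    {
        EndpointConfiguration = endpointConfiguration;
    }

    // Set once at registration time, read when the hosted service starts.
    public EndpointConfiguration EndpointConfiguration { get; }

    // Populated by NServiceBusService.StartAsync once the endpoint is running.
    public IMessageSession MessageSession { get; set; }

    // Captured if Endpoint.Start fails, and rethrown when the session is resolved.
    public ExceptionDispatchInfo StartupException { get; set; }
}
```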

With this in place, we can configure our endpoint to start.

Configuring NServiceBus with the generic host

Now with our configuration extension set up, we can call it as part of starting our host, configuring the endpoint as needed:

public class Program
{
    public static Task Main(string[] args)
    {
        return CreateHostBuilder(args).Build().RunAsync();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureAppConfiguration(cfg => {
                // Use user secrets, Azure App Configuration etc
            })
            .ConfigureServices((hostContext, services) =>
            {
                services.AddNServiceBus("MyMessagingEndpoint", endpointConfig => {
                    // Bridge MS DI
                    UpdateableServiceProvider container;

                    endpointConfig.UseContainer<ServicesBuilder>(customizations =>
                    {
                        customizations.ExistingServices(services);
                        customizations.ServiceProviderFactory(svcs => {
                            container = new UpdateableServiceProvider(svcs);
                            return container;
                        });
                    });

                    var transport = endpointConfig.UseTransport<AzureServiceBusTransport>();
                    var connectionString = hostContext.Configuration.GetConnectionString("AzureServiceBus");
                    transport.ConnectionString(connectionString);

                    // Other endpoint configuration
                });
            });
}

We bridge MS DI and NServiceBus per the package instructions, and finally configure our Azure Service Bus transport. Because we're using a unified configuration model, we can grab our connection string from the configuration model. Our configuration model can be pulled from any of the configuration provider sources, like appsettings.json, environment variables, user secrets, Azure App Configuration - wherever we like.
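
As an example of where that connection string might live, GetConnectionString("AzureServiceBus") reads the ConnectionStrings:AzureServiceBus key, so an appsettings.json along these lines would work. The value below is a placeholder, not a real namespace or key:

```json
{
  "ConnectionStrings": {
    "AzureServiceBus": "Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
  }
}
```

The same key can be supplied as an environment variable named ConnectionStrings__AzureServiceBus, which is usually the better place for a secret once the endpoint is deployed.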

With our generic host ready to go, we can now determine how best to deploy it to the variety of hosting options in Azure. Next up, let's look at hosting our endpoint in the IaaS model - Azure Container Instances.