Skip to content

Middleware, Pipes, and Filters

MassTransit is built using a network of pipes and filters to dispatch messages. A pipe is composed of a series of filters, each of which is a key atom and is described below.

Middleware components are configured using extension methods on any pipe configurator IPipeConfigurator<T>, and the extension methods all begin with Use to separate them from other methods.

To understand how middleware filters are built, an understanding of filters and pipes is needed.

A filter is a middleware filter that performs a specific function and should adhere to the single responsibility principal – do one thing, one thing only (and hopefully do it well). By sticking to this approach, developers are able to opt in to each behavior without including unnecessary or unwanted functionality.

There are many filters included with MassTransit. In fact, all message flows in MassTransit are composed of pipes and filters.

Developers can create their own filters. To create a filter, create a class that implements IFilter<T>.

public interface IFilter<T>
where T : class, PipeContext
{
void Probe(ProbeContext context);
Task Send(T context, IPipe<T> next);
}

The Probe method is used to interrogate the filter about its behavior. This should describe the filter in a way that a developer would understand its role when looking at a network graph. For example, a transaction filter may add the following to the context.

public void Probe(ProbeContext context)
{
context.CreateFilterScope("transaction");
}

The Send method is used to send contexts through the pipe to each filter. Context is the actual context, and next is used to pass the context to the next filter in the pipe. Send returns a Task, and should always follow the .NET guidelines for asynchronous methods.

When a middleware filter is added to or inserted into the message pipeline, it has absolute and complete control of the execution context and flow of the message. Pipelines execute asynchronously and expect that asynchronous operations will be performed.

public class ExceptionLoggerFilter<T> :
IFilter<T>
where T : class, PipeContext
{
long _exceptionCount;
long _successCount;
long _attemptCount;
public void Probe(ProbeContext context)
{
var scope = context.CreateFilterScope("exceptionLogger");
scope.Add("attempted", _attemptCount);
scope.Add("succeeded", _successCount);
scope.Add("faulted", _exceptionCount);
}
/// <summary>
/// Send is called for each context that is sent through the pipeline
/// </summary>
/// <param name="context">The context sent through the pipeline</param>
/// <param name="next">The next filter in the pipe, must be called or the pipe ends here</param>
public async Task Send(T context, IPipe<T> next)
{
try
{
Interlocked.Increment(ref _attemptCount);
// here the next filter in the pipe is called
await next.Send(context);
Interlocked.Increment(ref _successCount);
}
catch (Exception ex)
{
Interlocked.Increment(ref _exceptionCount);
await Console.Out.WriteLineAsync($"An exception occurred: {ex.Message}");
// propagate the exception up the call stack
throw;
}
}
}

Middleware filters are configured using extension methods to make them easy to discover. It is highly recommended that the extension method name begins with Use to separate it from other methods and to follow the established naming convention set by MassTransit.

For the example middleware filter above, the configuration method might resemble the code below.

x.UsingInMemory((context,cfg) =>
{
cfg.UseExceptionLogger(); // <-- our new method
cfg.ConfigureEndpoints(context);
});

The extension method creates the pipe specification for the middleware filter, which can be added to a compatible pipe. There are several different pipe configurators available, depending on the type of pipe being configured.

PipeContext TypePipeConfigurator Type
ReceiveContextUsed by the receive pipeline (prior to message deserialization)
ConsumeContextUsed by the consume pipeline
SendContextUsed by the send pipeline
PublishContextUsed by the publish pipeline
public static class ExampleMiddlewareConfiguratorExtensions
{
public static void UseExceptionLogger<T>(this IPipeConfigurator<T> configurator)
where T : class, PipeContext
{
configurator.AddPipeSpecification(new ExceptionLoggerSpecification<T>());
}
}

The pipe specification is a class that adds the filter to the pipeline. Additional logic can be included, such as configuring optional settings, etc. using a closure syntax similar to the other configuration classes in MassTransit.

public class ExceptionLoggerSpecification<T> :
IPipeSpecification<T>
where T : class, PipeContext
{
public IEnumerable<ValidationResult> Validate()
{
// no validation required, but you could validate here
return Enumerable.Empty<ValidationResult>();
}
public void Apply(IPipeBuilder<T> builder)
{
builder.AddFilter(new ExceptionLoggerFilter<T>());
}
}

Once the filter and its configuration classes are created, they can be used in a compatible pipe configuration method.

In many cases, the message type is used by a filter. To create an instance of a generic filter that includes the message type, use the configuration observer. The filter can then include the message type as a generic type parameter.

public class MessageFilter<T> :
IFilter<ConsumeContext<T>>
where T : class
{
public void Probe(ProbeContext context)
{
var scope = context.CreateFilterScope("messageFilter");
}
public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
{
// do something
await next.Send(context);
}
}

The extension method for the above is shown below. Instead of adding a specification, the extension method creates an observer that adds the pipe specification when any message type is configured.

public static class MessageFilterConfigurationExtensions
{
public static void UseMessageFilter(this IConsumePipeConfigurator configurator)
{
if (configurator == null)
throw new ArgumentNullException(nameof(configurator));
var observer = new MessageFilterConfigurationObserver(configurator);
}
}

The observer then adds the pipe specification when a message type is configured.

public class MessageFilterConfigurationObserver :
ConfigurationObserver,
IMessageConfigurationObserver
{
public MessageFilterConfigurationObserver(IConsumePipeConfigurator receiveEndpointConfigurator)
: base(receiveEndpointConfigurator)
{
Connect(this);
}
public void MessageConfigured<TMessage>(IConsumePipeConfigurator configurator)
where TMessage : class
{
var specification = new MessageFilterPipeSpecification<TMessage>();
configurator.AddPipeSpecification(specification);
}
}

Then, in the specification, the appropriate filter can be created and added to the pipeline with the message type as a generic type parameter.

public class MessageFilterPipeSpecification<T> :
IPipeSpecification<ConsumeContext<T>>
where T : class
{
public void Apply(IPipeBuilder<ConsumeContext<T>> builder)
{
var filter = new MessageFilter<T>();
builder.AddFilter(filter);
}
public IEnumerable<ValidationResult> Validate()
{
yield break;
}
}

Filters are combined in sequence to form a pipe. A pipe configurator, along with a pipe builder, is used to configure and build a pipe.

public interface CustomContext :
PipeContext
{
string SomeThing { get; }
}
IPipe<CustomContext> pipe = Pipe.New<CustomContext>(x =>
{
x.UseFilter(new CustomFilter(...));
})

The IPipe interface is similar to IFilter, but a pipe hides the next parameter as it is part of the pipe’s structure. It is the pipe’s responsibility to pass the appropriate next parameter to the individual filters in the pipe.

public interface IPipe<T>
where T : class, PipeContext
{
Task Send(T context);
}

Send can be called, passing a context instance as shown.

public class BaseCustomContext :
BasePipeContext,
CustomContext
{
public string SomeThing { get; set; }
}
await pipe.Send(new BaseCustomContext { SomeThing = "Hello" });

The context type has a PipeContext constraint, which is another core atom in GreenPipes. A pipe context can include payloads, which are kept in a last-in, first-out (LIFO) collection. Payloads are identified by type, and can be retrieved, added, and updated using the PipeContext methods:

public interface PipeContext
{
/// <summary>
/// Used to cancel the execution of the context
/// </summary>
CancellationToken CancellationToken { get; }
/// <summary>
/// Checks if a payload is present in the context
/// </summary>
bool HasPayloadType(Type payloadType);
/// <summary>
/// Retrieves a payload from the pipe context
/// </summary>
/// <typeparam name="T">The payload type</typeparam>
/// <param name="payload">The payload</param>
/// <returns></returns>
bool TryGetPayload<T>(out T payload)
where T : class;
/// <summary>
/// Returns an existing payload or creates the payload using the factory method provided
/// </summary>
/// <typeparam name="T">The payload type</typeparam>
/// <param name="payloadFactory">The payload factory is the payload is not present</param>
/// <returns>The payload</returns>
T GetOrAddPayload<T>(PayloadFactory<T> payloadFactory)
where T : class;
/// <summary>
/// Either adds a new payload, or updates an existing payload
/// </summary>
/// <param name="addFactory">The payload factory called if the payload is not present</param>
/// <param name="updateFactory">The payload factory called if the payload already exists</param>
/// <typeparam name="T">The payload type</typeparam>
/// <returns></returns>
T AddOrUpdatePayload<T>(PayloadFactory<T> addFactory, UpdatePayloadFactory<T> updateFactory)
where T : class;

The payload methods are also used to check if a pipe context is another type of context. For example, to see if the SendContext is a RabbitMqSendContext, the TryGetPayload method should be used instead of trying to pattern match or cast the context parameter.

public async Task Send(SendContext context, IPipe<SendContext> next)
{
if(context.TryGetPayload<RabbitMqSendContext>(out var rabbitMqSendContext))
rabbitMqSendContext.Priority = 3;
return next.Send(context);
}

User-defined payloads are easily added so that the following filters can use them. The example below adds a payload to the pipe context.

public class SomePayload
{
public int Value { get; set; }
}
public async Task Send(SendContext context, IPipe<SendContext> next)
{
var payload = context.GetOrAddPayload(() => new SomePayload{Value = 27});
return next.Send(context);
}