Middleware, Pipes, and Filters
MassTransit is built using a network of pipes and filters to dispatch messages. A pipe is composed of a series of filters, each of which is a key atom and is described below.
Middleware components are configured using extension methods on any pipe configurator IPipeConfigurator<T>, and the extension methods all begin with Use to
separate them from other methods.
To understand how middleware filters are built, an understanding of filters and pipes is needed.
Filters
Section titled “Filters”A filter is a middleware filter that performs a specific function and should adhere to the single responsibility principal – do one thing, one thing only (and hopefully do it well). By sticking to this approach, developers are able to opt in to each behavior without including unnecessary or unwanted functionality.
There are many filters included with MassTransit. In fact, all message flows in MassTransit are composed of pipes and filters.
Developers can create their own filters. To create a filter, create a class that implements IFilter<T>.
public interface IFilter<T> where T : class, PipeContext{ void Probe(ProbeContext context);
Task Send(T context, IPipe<T> next);}The Probe method is used to interrogate the filter about its behavior. This should describe the filter in a way that a developer would understand its role when looking at a network graph. For example, a transaction filter may add the following to the context.
public void Probe(ProbeContext context){ context.CreateFilterScope("transaction");}The Send method is used to send contexts through the pipe to each filter. Context is the actual context, and next is used to pass the context to the next filter in the pipe. Send returns a Task, and should always follow the .NET guidelines for asynchronous methods.
Create a middleware filter
Section titled “Create a middleware filter”When a middleware filter is added to or inserted into the message pipeline, it has absolute and complete control of the execution context and flow of the message. Pipelines execute asynchronously and expect that asynchronous operations will be performed.
public class ExceptionLoggerFilter<T> : IFilter<T> where T : class, PipeContext{ long _exceptionCount; long _successCount; long _attemptCount;
public void Probe(ProbeContext context) { var scope = context.CreateFilterScope("exceptionLogger"); scope.Add("attempted", _attemptCount); scope.Add("succeeded", _successCount); scope.Add("faulted", _exceptionCount); }
/// <summary> /// Send is called for each context that is sent through the pipeline /// </summary> /// <param name="context">The context sent through the pipeline</param> /// <param name="next">The next filter in the pipe, must be called or the pipe ends here</param> public async Task Send(T context, IPipe<T> next) { try { Interlocked.Increment(ref _attemptCount);
// here the next filter in the pipe is called await next.Send(context);
Interlocked.Increment(ref _successCount); } catch (Exception ex) { Interlocked.Increment(ref _exceptionCount);
await Console.Out.WriteLineAsync($"An exception occurred: {ex.Message}");
// propagate the exception up the call stack throw; } }}Create the configuration method
Section titled “Create the configuration method”Middleware filters are configured using extension methods to make them easy to discover. It is highly recommended that the extension method name begins with
Use to separate it from other methods and to follow the established naming convention set by MassTransit.
For the example middleware filter above, the configuration method might resemble the code below.
x.UsingInMemory((context,cfg) =>{ cfg.UseExceptionLogger(); // <-- our new method
cfg.ConfigureEndpoints(context);});The extension method creates the pipe specification for the middleware filter, which can be added to a compatible pipe. There are several different pipe configurators available, depending on the type of pipe being configured.
| PipeContext Type | PipeConfigurator Type |
|---|---|
| ReceiveContext | Used by the receive pipeline (prior to message deserialization) |
| ConsumeContext | Used by the consume pipeline |
| SendContext | Used by the send pipeline |
| PublishContext | Used by the publish pipeline |
public static class ExampleMiddlewareConfiguratorExtensions{ public static void UseExceptionLogger<T>(this IPipeConfigurator<T> configurator) where T : class, PipeContext { configurator.AddPipeSpecification(new ExceptionLoggerSpecification<T>()); }}Create the pipe specification
Section titled “Create the pipe specification”The pipe specification is a class that adds the filter to the pipeline. Additional logic can be included, such as configuring optional settings, etc. using a closure syntax similar to the other configuration classes in MassTransit.
public class ExceptionLoggerSpecification<T> : IPipeSpecification<T> where T : class, PipeContext{ public IEnumerable<ValidationResult> Validate() { // no validation required, but you could validate here return Enumerable.Empty<ValidationResult>(); }
public void Apply(IPipeBuilder<T> builder) { builder.AddFilter(new ExceptionLoggerFilter<T>()); }}Once the filter and its configuration classes are created, they can be used in a compatible pipe configuration method.
Create a message type filter
Section titled “Create a message type filter”In many cases, the message type is used by a filter. To create an instance of a generic filter that includes the message type, use the configuration observer. The filter can then include the message type as a generic type parameter.
public class MessageFilter<T> : IFilter<ConsumeContext<T>> where T : class{ public void Probe(ProbeContext context) { var scope = context.CreateFilterScope("messageFilter"); }
public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next) { // do something
await next.Send(context); }}The extension method for the above is shown below. Instead of adding a specification, the extension method creates an observer that adds the pipe specification when any message type is configured.
public static class MessageFilterConfigurationExtensions{ public static void UseMessageFilter(this IConsumePipeConfigurator configurator) { if (configurator == null) throw new ArgumentNullException(nameof(configurator));
var observer = new MessageFilterConfigurationObserver(configurator); }}Create the message type observer
Section titled “Create the message type observer”The observer then adds the pipe specification when a message type is configured.
public class MessageFilterConfigurationObserver : ConfigurationObserver, IMessageConfigurationObserver{ public MessageFilterConfigurationObserver(IConsumePipeConfigurator receiveEndpointConfigurator) : base(receiveEndpointConfigurator) { Connect(this); }
public void MessageConfigured<TMessage>(IConsumePipeConfigurator configurator) where TMessage : class { var specification = new MessageFilterPipeSpecification<TMessage>();
configurator.AddPipeSpecification(specification); }}Create the generic specification
Section titled “Create the generic specification”Then, in the specification, the appropriate filter can be created and added to the pipeline with the message type as a generic type parameter.
public class MessageFilterPipeSpecification<T> : IPipeSpecification<ConsumeContext<T>> where T : class{ public void Apply(IPipeBuilder<ConsumeContext<T>> builder) { var filter = new MessageFilter<T>();
builder.AddFilter(filter); }
public IEnumerable<ValidationResult> Validate() { yield break; }}Filters are combined in sequence to form a pipe. A pipe configurator, along with a pipe builder, is used to configure and build a pipe.
public interface CustomContext : PipeContext{ string SomeThing { get; }}
IPipe<CustomContext> pipe = Pipe.New<CustomContext>(x =>{ x.UseFilter(new CustomFilter(...));})The IPipe interface is similar to IFilter, but a pipe hides the next parameter as it is part of the pipe’s structure. It is the pipe’s responsibility to
pass the appropriate next parameter to the individual filters in the pipe.
public interface IPipe<T> where T : class, PipeContext{ Task Send(T context);}Send can be called, passing a context instance as shown.
public class BaseCustomContext : BasePipeContext, CustomContext{ public string SomeThing { get; set; }}
await pipe.Send(new BaseCustomContext { SomeThing = "Hello" });PipeContext
Section titled “PipeContext”The context type has a PipeContext constraint, which is another core atom in GreenPipes. A pipe context can include payloads, which are kept in a
last-in, first-out (LIFO) collection. Payloads are identified by type, and can be retrieved, added, and updated using the PipeContext methods:
public interface PipeContext{ /// <summary> /// Used to cancel the execution of the context /// </summary> CancellationToken CancellationToken { get; }
/// <summary> /// Checks if a payload is present in the context /// </summary> bool HasPayloadType(Type payloadType);
/// <summary> /// Retrieves a payload from the pipe context /// </summary> /// <typeparam name="T">The payload type</typeparam> /// <param name="payload">The payload</param> /// <returns></returns> bool TryGetPayload<T>(out T payload) where T : class;
/// <summary> /// Returns an existing payload or creates the payload using the factory method provided /// </summary> /// <typeparam name="T">The payload type</typeparam> /// <param name="payloadFactory">The payload factory is the payload is not present</param> /// <returns>The payload</returns> T GetOrAddPayload<T>(PayloadFactory<T> payloadFactory) where T : class;
/// <summary> /// Either adds a new payload, or updates an existing payload /// </summary> /// <param name="addFactory">The payload factory called if the payload is not present</param> /// <param name="updateFactory">The payload factory called if the payload already exists</param> /// <typeparam name="T">The payload type</typeparam> /// <returns></returns> T AddOrUpdatePayload<T>(PayloadFactory<T> addFactory, UpdatePayloadFactory<T> updateFactory) where T : class;The payload methods are also used to check if a pipe context is another type of context. For example, to see if the SendContext is a RabbitMqSendContext,
the TryGetPayload method should be used instead of trying to pattern match or cast the context parameter.
public async Task Send(SendContext context, IPipe<SendContext> next){ if(context.TryGetPayload<RabbitMqSendContext>(out var rabbitMqSendContext)) rabbitMqSendContext.Priority = 3;
return next.Send(context);}User-defined payloads are easily added so that the following filters can use them. The example below adds a payload to the pipe context.
public class SomePayload{ public int Value { get; set; }}
public async Task Send(SendContext context, IPipe<SendContext> next){ var payload = context.GetOrAddPayload(() => new SomePayload{Value = 27});
return next.Send(context);}