Create a middleware filter
MassTransit is built around a lightweight, composable middleware architecture based on pipes and filters. This model allows behaviors such as retries, redelivery, logging, serialization, and container scope management to be layered together in a predictable and extensible way.
For more details, refer to the middleware section of the documentation.
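To make the pipes-and-filters idea concrete before looking at MassTransit's own interfaces, here is a minimal sketch of the pattern in plain C#. Every type in it (ISimplePipe, ISimpleFilter, FilterPipe, EmptyPipe, TraceFilter, PipeDemo) is a simplified stand-in invented for illustration and is not a MassTransit type. It only shows how each filter wraps the rest of the pipe and decides when to call it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Simplified stand-ins for illustration only; these are NOT MassTransit types.
public interface ISimplePipe<T>
{
    Task Send(T context);
}

public interface ISimpleFilter<T>
{
    Task Send(T context, ISimplePipe<T> next);
}

// Links one filter to the remainder of the pipe.
public sealed class FilterPipe<T> : ISimplePipe<T>
{
    readonly ISimpleFilter<T> _filter;
    readonly ISimplePipe<T> _next;

    public FilterPipe(ISimpleFilter<T> filter, ISimplePipe<T> next)
    {
        _filter = filter;
        _next = next;
    }

    public Task Send(T context) => _filter.Send(context, _next);
}

// Terminates the pipe; nothing runs after the last filter.
public sealed class EmptyPipe<T> : ISimplePipe<T>
{
    public Task Send(T context) => Task.CompletedTask;
}

// Records when it runs, before and after the rest of the pipe.
public sealed class TraceFilter : ISimpleFilter<List<string>>
{
    readonly string _name;

    public TraceFilter(string name) => _name = name;

    public async Task Send(List<string> context, ISimplePipe<List<string>> next)
    {
        context.Add($"{_name}:before");
        await next.Send(context);
        context.Add($"{_name}:after");
    }
}

public static class PipeDemo
{
    public static async Task<List<string>> Run()
    {
        // Build from the inside out so that "outer" runs first.
        ISimplePipe<List<string>> pipe = new EmptyPipe<List<string>>();
        pipe = new FilterPipe<List<string>>(new TraceFilter("inner"), pipe);
        pipe = new FilterPipe<List<string>>(new TraceFilter("outer"), pipe);

        var trace = new List<string>();
        await pipe.Send(trace);

        // trace: outer:before, inner:before, inner:after, outer:after
        return trace;
    }
}
```

Because each filter awaits the next one, behavior such as retry or exception logging can wrap everything that runs after it, which is the property the filters in the rest of this page rely on.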
Create a basic filter
When a middleware filter is added to or inserted into the message pipeline, it has complete control of the execution context and the flow of the message. Pipelines execute asynchronously, and filters are expected to perform their work asynchronously.
public class ExceptionLoggerFilter<T> :
    IFilter<T>
    where T : class, PipeContext
{
    long _exceptionCount;
    long _successCount;
    long _attemptCount;

    public void Probe(ProbeContext context)
    {
        var scope = context.CreateFilterScope("exceptionLogger");
        scope.Add("attempted", _attemptCount);
        scope.Add("succeeded", _successCount);
        scope.Add("faulted", _exceptionCount);
    }

    /// <summary>
    /// Send is called for each context that is sent through the pipeline
    /// </summary>
    /// <param name="context">The context sent through the pipeline</param>
    /// <param name="next">The next filter in the pipe, must be called or the pipe ends here</param>
    public async Task Send(T context, IPipe<T> next)
    {
        try
        {
            Interlocked.Increment(ref _attemptCount);

            // here the next filter in the pipe is called
            await next.Send(context);

            Interlocked.Increment(ref _successCount);
        }
        catch (Exception ex)
        {
            Interlocked.Increment(ref _exceptionCount);

            await Console.Out.WriteLineAsync($"An exception occurred: {ex.Message}");

            // propagate the exception up the call stack
            throw;
        }
    }
}

Create an extension method
Middleware filters are configured using extension methods to make them easy to discover. It is highly recommended that the extension method name begin with the prefix Use, which sets it apart from other methods and follows the naming convention established by MassTransit.
For the example middleware filter above, the configuration method might resemble the code below.
x.UsingInMemory((context, cfg) =>
{
    cfg.UseExceptionLogger(); // <-- our new method

    cfg.ConfigureEndpoints(context);
});

The extension method creates the pipe specification for the middleware filter, which can be added to a compatible pipe. There are several different pipe configurators available, depending on the type of pipe being configured.
| PipeContext Type | Usage |
|---|---|
| ReceiveContext | Used by the receive pipeline (prior to message deserialization) |
| ConsumeContext | Used by the consume pipeline |
| SendContext | Used by the send pipeline |
| PublishContext | Used by the publish pipeline |
public static class ExampleMiddlewareConfiguratorExtensions
{
    public static void UseExceptionLogger<T>(this IPipeConfigurator<T> configurator)
        where T : class, PipeContext
    {
        configurator.AddPipeSpecification(new ExceptionLoggerSpecification<T>());
    }
}

Create a pipe specification
The pipe specification is a class that adds the filter to the pipeline. It can also carry additional logic, such as optional settings configured with a closure syntax similar to the other configuration classes in MassTransit.
public class ExceptionLoggerSpecification<T> :
    IPipeSpecification<T>
    where T : class, PipeContext
{
    public IEnumerable<ValidationResult> Validate()
    {
        // no validation required, but you could validate here
        return [];
    }

    public void Apply(IPipeBuilder<T> builder)
    {
        builder.AddFilter(new ExceptionLoggerFilter<T>());
    }
}

Once the filter and its configuration classes are created, they can be used in a compatible pipe configuration method.
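As a sketch of the "optional settings" and validation mentioned above, the variant below passes a settings object through a closure and rejects an empty value during validation. ExceptionLoggerOptions, PrefixedExceptionLoggerFilter, PrefixedExceptionLoggerSpecification, and the Prefix setting are hypothetical names introduced here for illustration. The sketch also assumes the MassTransit v8 namespace layout and that the Failure validation helper is available on the specification; verify both against the version you use.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.Configuration;

// Hypothetical settings class, not part of MassTransit
public class ExceptionLoggerOptions
{
    public string Prefix { get; set; } = "An exception occurred";
}

public class PrefixedExceptionLoggerFilter<T> :
    IFilter<T>
    where T : class, PipeContext
{
    readonly string _prefix;

    public PrefixedExceptionLoggerFilter(string prefix)
    {
        _prefix = prefix;
    }

    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("prefixedExceptionLogger");
    }

    public async Task Send(T context, IPipe<T> next)
    {
        try
        {
            await next.Send(context);
        }
        catch (Exception ex)
        {
            await Console.Out.WriteLineAsync($"{_prefix}: {ex.Message}");

            // propagate the exception up the call stack
            throw;
        }
    }
}

public class PrefixedExceptionLoggerSpecification<T> :
    IPipeSpecification<T>
    where T : class, PipeContext
{
    readonly ExceptionLoggerOptions _options;

    public PrefixedExceptionLoggerSpecification(ExceptionLoggerOptions options)
    {
        _options = options;
    }

    public IEnumerable<ValidationResult> Validate()
    {
        // Failure(...) is assumed to be MassTransit's validation helper
        if (string.IsNullOrWhiteSpace(_options.Prefix))
            yield return this.Failure("Prefix", "must not be empty");
    }

    public void Apply(IPipeBuilder<T> builder)
    {
        builder.AddFilter(new PrefixedExceptionLoggerFilter<T>(_options.Prefix));
    }
}

public static class PrefixedExceptionLoggerExtensions
{
    // The closure lets callers adjust the settings before the specification is added
    public static void UseExceptionLogger<T>(this IPipeConfigurator<T> configurator,
        Action<ExceptionLoggerOptions> configure)
        where T : class, PipeContext
    {
        var options = new ExceptionLoggerOptions();
        configure?.Invoke(options);

        configurator.AddPipeSpecification(new PrefixedExceptionLoggerSpecification<T>(options));
    }
}
```

With this overload in place, a call such as cfg.UseExceptionLogger(o => o.Prefix = "Consume fault") would configure the filter, and an empty prefix would be reported when the configuration is validated instead of surfacing later at runtime.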
Create a message type filter
In many cases, the message type is used by a filter. To create an instance of a generic filter that includes the message type, use the configuration observer. The filter can then include the message type as a generic type parameter.
public class MessageFilter<T> :
    IFilter<ConsumeContext<T>>
    where T : class
{
    public void Probe(ProbeContext context)
    {
        var scope = context.CreateFilterScope("messageFilter");
    }

    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        // do something

        await next.Send(context);
    }
}

The extension method for the above is shown below. Instead of adding a specification, the extension method creates an observer that adds the pipe specification when any message type is configured.
public static class MessageFilterConfigurationExtensions
{
    public static void UseMessageFilter(this IConsumePipeConfigurator configurator)
    {
        if (configurator == null)
            throw new ArgumentNullException(nameof(configurator));

        var observer = new MessageFilterConfigurationObserver(configurator);
    }
}

Create a configuration observer
The observer then adds the pipe specification when a message type is configured.
public class MessageFilterConfigurationObserver :
    ConfigurationObserver,
    IMessageConfigurationObserver
{
    public MessageFilterConfigurationObserver(IConsumePipeConfigurator receiveEndpointConfigurator)
        : base(receiveEndpointConfigurator)
    {
        Connect(this);
    }

    public void MessageConfigured<TMessage>(IConsumePipeConfigurator configurator)
        where TMessage : class
    {
        var specification = new MessageFilterPipeSpecification<TMessage>();

        configurator.AddPipeSpecification(specification);
    }
}

Create a generic specification
Then, in the specification, the appropriate filter can be created and added to the pipeline with the message type as a generic type parameter.
public class MessageFilterPipeSpecification<T> :
    IPipeSpecification<ConsumeContext<T>>
    where T : class
{
    public void Apply(IPipeBuilder<ConsumeContext<T>> builder)
    {
        var filter = new MessageFilter<T>();

        builder.AddFilter(filter);
    }

    public IEnumerable<ValidationResult> Validate()
    {
        yield break;
    }
}

Create a scoped filter
Many of the built-in filters are created and added to the pipeline during configuration. This approach is typically sufficient; however, there are scenarios where the filter needs access to other components at runtime.
Using a scoped filter allows a new filter instance to be resolved from the container for each message. If a current scope is not available, a new scope will be created using the root container.
Supported scoped types
Scoped filters can be either an open generic class implementing one of the supported filter contexts or a concrete class implementing a filter context for one or more valid message types.
For example, a scoped open generic consume filter would be defined as shown below.
public class MyConsumeFilter<TMessage> : IFilter<ConsumeContext<TMessage>>

A concrete consume filter can also be defined.

public class MyMessageConsumeFilter : IFilter<ConsumeContext<MyMessage>>

Supported pipe contexts
Scoped filters are added using one of the following methods, which are specific to the filter context type.
| Type | Usage |
|---|---|
| ConsumeContext<T> | UseConsumeFilter(typeof(MyConsumeFilter<>), context) |
| SendContext<T> | UseSendFilter(typeof(MyConsumeFilter<>), context) |
| PublishContext<T> | UsePublishFilter(typeof(MyConsumeFilter<>), context) |
| ExecuteContext<TArguments> | UseExecuteActivityFilter(typeof(MyConsumeFilter<>), context) |
| CompensateContext<TLog> | UseCompensateActivityFilter(typeof(MyConsumeFilter<>), context) |
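The filter type shown in each row is a placeholder; pass a filter that implements IFilter for the context type in the first column. More information can be found in the Middleware section.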
UseConsumeFilter
To create a ConsumeContext<T> filter and add it to the receive endpoint:
public class MyConsumeFilter<T> :
    IFilter<ConsumeContext<T>>
    where T : class
{
    public MyConsumeFilter(IMyDependency dependency) { }

    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        await next.Send(context);
    }

    public void Probe(ProbeContext context) { }
}

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddScoped<IMyDependency, MyDependency>();

        services.AddMassTransit(x =>
        {
            x.AddConsumer<MyConsumer>();

            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.ReceiveEndpoint("input-queue", e =>
                {
                    e.UseConsumeFilter(typeof(MyConsumeFilter<>), context);

                    e.ConfigureConsumer<MyConsumer>();
                });
            });
        });
    }
}

To configure a scoped filter for a specific message type (or types) and configure it on all receive endpoints:
public class MyMessageConsumeFilter :
    IFilter<ConsumeContext<MessageA>>,
    IFilter<ConsumeContext<MessageB>>
{
    public MyMessageConsumeFilter(IMyDependency dependency) { }

    public async Task Send(ConsumeContext<MessageA> context, IPipe<ConsumeContext<MessageA>> next)
    {
        await next.Send(context);
    }

    public async Task Send(ConsumeContext<MessageB> context, IPipe<ConsumeContext<MessageB>> next)
    {
        await next.Send(context);
    }

    public void Probe(ProbeContext context) { }
}

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddScoped<IMyDependency, MyDependency>();

        services.AddMassTransit(x =>
        {
            x.AddConsumer<MyConsumer>();

            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.UseConsumeFilter<MyMessageConsumeFilter>(context);

                cfg.ConfigureEndpoints(context);
            });
        });
    }
}

To use an open generic filter but only configure the filter for specific message types:
public class MyCommandFilter<T> :
    IFilter<ConsumeContext<T>>
    where T : class, ICommand
{
    public MyCommandFilter(IMyDependency dependency) { }

    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        await next.Send(context);
    }

    public void Probe(ProbeContext context) { }
}

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddScoped<IMyDependency, MyDependency>();

        services.AddMassTransit(x =>
        {
            x.AddConsumer<MyConsumer>();

            x.UsingRabbitMq((context, cfg) =>
            {
                // Specify a conditional expression to only
                // add the filter for certain message types
                // (the lambda parameter is named f to avoid shadowing the outer x)
                cfg.UseConsumeFilter(typeof(MyCommandFilter<>), context,
                    f => f.Include(type => type.HasInterface<ICommand>()));

                cfg.ConfigureEndpoints(context);
            });
        });
    }
}

UseSendFilter
To create a SendContext<T> filter and add it to the send pipeline:
public class MySendFilter<T> :
    IFilter<SendContext<T>>
    where T : class
{
    public MySendFilter(IMyDependency dependency) { }

    public async Task Send(SendContext<T> context, IPipe<SendContext<T>> next)
    {
        await next.Send(context);
    }

    public void Probe(ProbeContext context) { }
}

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddScoped<IMyDependency, MyDependency>();

        services.AddMassTransit(x =>
        {
            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.UseSendFilter(typeof(MySendFilter<>), context);
            });
        });
    }
}

UsePublishFilter
To create a PublishContext<T> filter and add it to the publish pipeline:

public class MyPublishFilter<T> :
    IFilter<PublishContext<T>>
    where T : class
{
    public MyPublishFilter(IMyDependency dependency) { }

    public async Task Send(PublishContext<T> context, IPipe<PublishContext<T>> next)
    {
        await next.Send(context);
    }

    public void Probe(ProbeContext context) { }
}

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddScoped<IMyDependency, MyDependency>();

        services.AddMassTransit(x =>
        {
            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.UsePublishFilter(typeof(MyPublishFilter<>), context);
            });
        });
    }
}

UseExecuteActivityFilter
Same as above, but for the execute pipeline of routing slip activities, using ExecuteContext<TArguments>.
UseCompensateActivityFilter
Same as above, but for the compensate pipeline of routing slip activities, using CompensateContext<TLog>.
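Since the activity filters have no example of their own, here is a minimal sketch of what an execute activity filter might look like, following the same shape as the consume, send, and publish filters above. MyExecuteFilter, IMyDependency, and ActivityFilterConfiguration are hypothetical names, and the sketch assumes that UseExecuteActivityFilter can be called on the bus configurator in the same way as the other scoped filter methods; a compensate filter would use CompensateContext<TLog> and UseCompensateActivityFilter instead.

```csharp
using System.Threading.Tasks;
using MassTransit;

// Hypothetical dependency resolved from the container scope
public interface IMyDependency
{
}

// Hypothetical scoped filter for the execute pipeline of routing slip activities
public class MyExecuteFilter<TArguments> :
    IFilter<ExecuteContext<TArguments>>
    where TArguments : class
{
    readonly IMyDependency _dependency;

    public MyExecuteFilter(IMyDependency dependency)
    {
        _dependency = dependency;
    }

    public async Task Send(ExecuteContext<TArguments> context, IPipe<ExecuteContext<TArguments>> next)
    {
        // do something before the activity executes

        await next.Send(context);
    }

    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("myExecuteFilter");
    }
}

public static class ActivityFilterConfiguration
{
    // Assumes the same call shape as the other scoped filter methods
    public static void Configure(IBusRegistrationContext context, IBusFactoryConfigurator cfg)
    {
        cfg.UseExecuteActivityFilter(typeof(MyExecuteFilter<>), context);
    }
}
```

The Configure helper would be called from inside the transport callback, for example x.UsingRabbitMq((context, cfg) => ActivityFilterConfiguration.Configure(context, cfg)), alongside the activity registrations.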
Composing consume and publish/send filters
A common use case with scoped filters is transferring data from a consumed message to the messages sent or published by the consumer. This data may be extracted from headers, or could include context or authorization information that needs to be passed from a consumed message context to sent or published messages. In these situations, there may be some special requirements to ensure everything works as expected.
The following example has both consume and send filters, which use a shared dependency to communicate data to outbound messages.
public class MyConsumeFilter<T> :
    IFilter<ConsumeContext<T>>
    where T : class
{
    public MyConsumeFilter(MyDependency dependency) { }

    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        // the next filter must be called or the pipe ends here
        await next.Send(context);
    }

    public void Probe(ProbeContext context) { }
}

public class MySendFilter<T> :
    IFilter<SendContext<T>>
    where T : class
{
    public MySendFilter(MyDependency dependency) { }

    public async Task Send(SendContext<T> context, IPipe<SendContext<T>> next)
    {
        // the next filter must be called or the pipe ends here
        await next.Send(context);
    }

    public void Probe(ProbeContext context) { }
}

public class MyDependency
{
    public string SomeValue { get; set; }
}

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddScoped<MyDependency>();

        services.AddMassTransit(x =>
        {
            x.AddConsumer<MyConsumer>();

            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.UseSendFilter(typeof(MySendFilter<>), context);

                cfg.ReceiveEndpoint("input-queue", e =>
                {
                    e.UseConsumeFilter(typeof(MyConsumeFilter<>), context);
                    e.ConfigureConsumer<MyConsumer>(context);
                });
            });
        });
    }
}

Using the InMemoryOutbox with scoped filters
When using the InMemoryOutbox with scoped publish or send filters, UseMessageScope must be configured before the InMemoryOutbox. If UseMessageRetry is used, it must come before UseMessageScope.

Because the InMemoryOutbox delays publishing and sending messages until after the consumer or saga completes, the container scope created for the consumer or saga would otherwise already be disposed by the time those messages are sent. The UseMessageScope filter creates the scope before the InMemoryOutbox, and that scope is then used by the consumer or saga and any scoped filters (consume, publish, or send).
The updated receive endpoint configuration using the InMemoryOutbox is shown below.
cfg.ReceiveEndpoint("input-queue", e =>
{
    e.UseMessageRetry(r => r.Intervals(100, 500, 1000, 2000));
    e.UseMessageScope(context);
    e.UseInMemoryOutbox(context);

    e.UseConsumeFilter(typeof(MyConsumeFilter<>), context);
    e.ConfigureConsumer<MyConsumer>(context);
});
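With the scope shared as configured above, the consume and send filters from the composing example can pass data through MyDependency. The sketch below shows one way that might look: the consume filter copies a header value into the scoped dependency, and the send filter writes it to each outbound message. The header name X-Some-Value and the filter names HeaderConsumeFilter and HeaderSendFilter are hypothetical, chosen only for illustration.

```csharp
using System.Threading.Tasks;
using MassTransit;

public class MyDependency
{
    public string SomeValue { get; set; }
}

// Hypothetical filter: captures a header from the consumed message
public class HeaderConsumeFilter<T> :
    IFilter<ConsumeContext<T>>
    where T : class
{
    readonly MyDependency _dependency;

    public HeaderConsumeFilter(MyDependency dependency)
    {
        _dependency = dependency;
    }

    public Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        // "X-Some-Value" is a hypothetical header name; null when the header is absent
        _dependency.SomeValue = context.Headers.Get<string>("X-Some-Value");

        return next.Send(context);
    }

    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("headerConsumeFilter");
    }
}

// Hypothetical filter: copies the captured value to outbound messages
public class HeaderSendFilter<T> :
    IFilter<SendContext<T>>
    where T : class
{
    readonly MyDependency _dependency;

    public HeaderSendFilter(MyDependency dependency)
    {
        _dependency = dependency;
    }

    public Task Send(SendContext<T> context, IPipe<SendContext<T>> next)
    {
        // Only set the header when the consume filter captured a value
        if (_dependency.SomeValue != null)
            context.Headers.Set("X-Some-Value", _dependency.SomeValue);

        return next.Send(context);
    }

    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("headerSendFilter");
    }
}
```

This only works when both filters resolve the same MyDependency instance, which is why the dependency is registered as scoped and why the scope must still be alive when the outbox finally sends the messages.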