Skip to content

Middleware, Pipes, and Filters

MassTransit is built around a lightweight, composable middleware architecture based on pipes and filters. This model allows behaviors such as retries, redelivery, logging, serialization, and container scope management to be layered together in a predictable and extensible way.

MassTransit’s middleware architecture is based on three core concepts:

  • Pipe – the execution path
  • Filter – a unit of behavior in the path
  • PipeContext – the state flowing through the pipe

A pipe is the execution path that a message (or operation) travels through. It is composed of a sequence of filters, each responsible for a specific concern.

When a message is consumed, it is not delivered directly to your consumer. Instead, it flows through a series of filters, including:

  • Deserialization
  • Retry (and redelivery)
  • Outbox
  • Consumer invocation

The pipe context is sent to each filter, and that filter then passes control to the next filter. This structure makes the system highly modular and allows new behavior to be inserted without changing existing components.

A filter is a unit of middleware that operates on a specific type of context. Filters may do any of the following:

  • Inspecting the pipe context
  • Performing some business logic
  • Modifying and/or intercepting the pipe context
  • Passing control to the next filter
  • Handling/observing exceptions

A filter should always call the next filter in the pipe. Except when the filter is intentionally preventing the pipe from continuing.

A simple filter is created by a class that implements IFilter<T>:

public class TimingFilter :
IFilter<ConsumeContext>
{
public async Task Send(ConsumeContext context, IPipe<ConsumeContext> next)
{
var start = DateTime.UtcNow;
await next.Send(context);
var elapsed = DateTime.UtcNow - start;
Console.WriteLine($"Message processed in {elapsed}");
}
public void Probe(ProbeContext context)
{
context.CreateFilterScope("timing");
}
}

Key points:

  • Filters are async
  • Filters must call next.Send(context) to continue the pipeline
  • Probe is used for introspection

Filters implement the IFilter<T> interface, where T is the specific PipeContext type they operate on.

public interface IFilter<T>
where T : class, PipeContext
{
void Probe(ProbeContext context);
Task Send(T context, IPipe<T> next);
}

Filters can wrap behavior around the rest of the pipeline, making them ideal for:

  • Retries
  • Timeouts
  • Logging
  • Metrics
  • Security checks
  • Transaction boundaries

At the core of the pipe architecture is the PipeContext. The PipeContext is the shared state container that flows through the pipeline. It includes:

  • Cancellation token
  • Payload container

The main context types used by a bus include:

Context TypePurpose
ReceiveContextRaw transport message and metadata
ConsumeContextMessage being consumed by a consumer
SendContextMessage being sent to an endpoint
PublishContextMessage being published
ExecuteContextRouting slip execution
CompensateContextRouting slip compensation

Each stage of the messaging pipeline uses a more specific context type. For example, the ExecuteContext includes both a ConsumeContext and a ReceiveContext.

The BasePipeContext abstract class provides the basic functionality for all pipe contexts.

public interface CustomContext :
PipeContext
{
string Content { get; }
}
public class CustomPipeContext :
BasePipeContext,
CustomContext
{
public string Content { get; set; }
}
await pipe.Send(new CustomPipeContext { Content = "Hello" });

In addition to the CancellationToken, the PipeContext interface provides a number of methods for accessing and manipulating payloads.

public interface PipeContext
{
CancellationToken CancellationToken { get; }
bool HasPayloadType(Type payloadType);
bool TryGetPayload<T>(out T payload)
where T : class;
T GetOrAddPayload<T>(PayloadFactory<T> payloadFactory)
where T : class;
T AddOrUpdatePayload<T>(PayloadFactory<T> addFactory, UpdatePayloadFactory<T> updateFactory)
where T : class;
}

One of the most powerful features of PipeContext is its payload system. Payloads are stored in a type-based collection, with last-in, first-out (LIFO) semantics.

A payload is an object stored in a context that can be:

  • Added by one filter
  • Used by another filter later in the pipeline
  • Updated by a third filter
  • Inspected by the original filter on return

Payloads are used internally for:

  • Retry state
  • Transaction context
  • Outbox coordination
  • Transport connections
  • Diagnostic activities

This allows filters to track state and cooperate without tightly coupling to each other.

For example:

  • A transport filter adds a connection payload
  • A retry filter adds retry metadata (RetryContext)
  • An outbox filter adds transactional state (OutboxContext)

Payloads are identified by type, and can be retrieved, added, and updated using the PipeContext methods.

if (context.TryGetPayload<MyPayload>(out var payload))
{
// use payload
}

Or you can add a payload if it doesn’t exist:

context.GetOrAddPayload(() => new MyPayload());

Or you can add or update an existing payload:

context.AddOrUpdatePayload(() => new MyPayload(),
payload => payload.SomeValue = newValue);

This pattern allows features to be added to the pipeline without requiring changes to the core context types.

The payload methods can also be used to check if a pipe context is another type of context. For example, to see if the SendContext is a RabbitMqSendContext, the TryGetPayload method should be used instead of trying to pattern match or cast the context parameter.

public async Task Send(SendContext context, IPipe<SendContext> next)
{
if(context.TryGetPayload<RabbitMqSendContext>(out var rabbitMqSendContext))
rabbitMqSendContext.Priority = 3;
return next.Send(context);
}

User-defined payloads are easily added so that the following filters can use them. The example below adds a payload to the pipe context.

public class SomePayload
{
public int Value { get; set; }
}
public async Task Send(SendContext context, IPipe<SendContext> next)
{
var payload = context.GetOrAddPayload(() => new SomePayload{Value = 27});
return next.Send(context);
}

MassTransit uses pipes throughout the system. Some examples:

Handles messages received from the transport.

  • Deserializing message
  • Moving skipped messages to the dead letter queue
  • Moving faulted messages to the error queue

Handles message consumption.

  • Faulted message redelivery
  • Faulted message retry
  • In-memory or transactional outbox
  • Consumer execution

Handles sending messages:

  • Message serialization
  • Adding message headers

Handles publishing events:

  • Message serialization
  • Adding message headers

Developers can add custom filters to the pipeline to:

  • Implement cross-cutting concerns
  • Auditing, logging, and tracing
  • Message validation or enrichment
  • Control over execution flow, before and after consumers
  • Authorization or security checks
  • Multi-tenant routing

Filters can be added at different levels:

  • Bus-level
  • Endpoint-level
  • Consumer-level
  • Saga-level

This allows precise control over where behavior is applied.

Scoped filters extend this model by integrating with dependency injection, allowing middleware to safely use scoped services and per-message state.

To create middleware filters, refer to the guide.