Middleware, Pipes, and Filters
MassTransit is built around a lightweight, composable middleware architecture based on pipes and filters. This model allows behaviors such as retries, redelivery, logging, serialization, and container scope management to be layered together in a predictable and extensible way.
MassTransit’s middleware architecture is based on three core concepts:
- Pipe – the execution path
- Filter – a unit of behavior in the path
- PipeContext – the state flowing through the pipe
What is a pipe?
Section titled “What is a pipe?”A pipe is the execution path that a message (or operation) travels through. It is composed of a sequence of filters, each responsible for a specific concern.
When a message is consumed, it is not delivered directly to your consumer. Instead, it flows through a series of filters, including:
- Deserialization
- Retry (and redelivery)
- Outbox
- Consumer invocation
The pipe context is sent to each filter, and that filter then passes control to the next filter. This structure makes the system highly modular and allows new behavior to be inserted without changing existing components.
What is a filter?
Section titled “What is a filter?”A filter is a unit of middleware that operates on a specific type of context. Filters may do any of the following:
- Inspecting the pipe context
- Performing some business logic
- Modifying and/or intercepting the pipe context
- Passing control to the next filter
- Handling/observing exceptions
A filter should always call the next filter in the pipe. Except when the filter is intentionally preventing the pipe from continuing.
A simple filter is created by a class that implements IFilter<T>:
public class TimingFilter : IFilter<ConsumeContext>{ public async Task Send(ConsumeContext context, IPipe<ConsumeContext> next) { var start = DateTime.UtcNow;
await next.Send(context);
var elapsed = DateTime.UtcNow - start; Console.WriteLine($"Message processed in {elapsed}"); }
public void Probe(ProbeContext context) { context.CreateFilterScope("timing"); }}Key points:
- Filters are async
- Filters must call next.Send(context) to continue the pipeline
- Probe is used for introspection
Filters implement the IFilter<T> interface, where T is the specific PipeContext type they operate on.
public interface IFilter<T> where T : class, PipeContext{ void Probe(ProbeContext context);
Task Send(T context, IPipe<T> next);}Filters can wrap behavior around the rest of the pipeline, making them ideal for:
- Retries
- Timeouts
- Logging
- Metrics
- Security checks
- Transaction boundaries
What is PipeContext?
Section titled “What is PipeContext?”At the core of the pipe architecture is the PipeContext. The PipeContext is the shared state container that flows
through the pipeline. It includes:
- Cancellation token
- Payload container
The main context types used by a bus include:
| Context Type | Purpose |
|---|---|
ReceiveContext | Raw transport message and metadata |
ConsumeContext | Message being consumed by a consumer |
SendContext | Message being sent to an endpoint |
PublishContext | Message being published |
ExecuteContext | Routing slip execution |
CompensateContext | Routing slip compensation |
Each stage of the messaging pipeline uses a more specific context type. For example, the ExecuteContext includes
both a ConsumeContext and a ReceiveContext.
The BasePipeContext abstract class provides the basic functionality for all pipe contexts.
public interface CustomContext : PipeContext{ string Content { get; }}
public class CustomPipeContext : BasePipeContext, CustomContext{ public string Content { get; set; }}
await pipe.Send(new CustomPipeContext { Content = "Hello" });In addition to the CancellationToken, the PipeContext interface provides a number of methods for accessing and manipulating payloads.
public interface PipeContext{ CancellationToken CancellationToken { get; }
bool HasPayloadType(Type payloadType);
bool TryGetPayload<T>(out T payload) where T : class;
T GetOrAddPayload<T>(PayloadFactory<T> payloadFactory) where T : class;
T AddOrUpdatePayload<T>(PayloadFactory<T> addFactory, UpdatePayloadFactory<T> updateFactory) where T : class;}Context payloads
Section titled “Context payloads”One of the most powerful features of PipeContext is its payload system. Payloads are stored in a type-based collection,
with last-in, first-out (LIFO) semantics.
A payload is an object stored in a context that can be:
- Added by one filter
- Used by another filter later in the pipeline
- Updated by a third filter
- Inspected by the original filter on return
Payloads are used internally for:
- Retry state
- Transaction context
- Outbox coordination
- Transport connections
- Diagnostic activities
This allows filters to track state and cooperate without tightly coupling to each other.
For example:
- A transport filter adds a connection payload
- A retry filter adds retry metadata (
RetryContext) - An outbox filter adds transactional state (
OutboxContext)
Payloads are identified by type, and can be retrieved, added, and updated using the PipeContext methods.
if (context.TryGetPayload<MyPayload>(out var payload)){ // use payload}Or you can add a payload if it doesn’t exist:
context.GetOrAddPayload(() => new MyPayload());Or you can add or update an existing payload:
context.AddOrUpdatePayload(() => new MyPayload(), payload => payload.SomeValue = newValue);This pattern allows features to be added to the pipeline without requiring changes to the core context types.
Context type checking
Section titled “Context type checking”The payload methods can also be used to check if a pipe context is another type of context. For example, to see if
the SendContext is a RabbitMqSendContext, the TryGetPayload method should be used instead of trying to pattern
match or cast the context parameter.
public async Task Send(SendContext context, IPipe<SendContext> next){ if(context.TryGetPayload<RabbitMqSendContext>(out var rabbitMqSendContext)) rabbitMqSendContext.Priority = 3;
return next.Send(context);}User-defined payloads are easily added so that the following filters can use them. The example below adds a payload to the pipe context.
public class SomePayload{ public int Value { get; set; }}
public async Task Send(SendContext context, IPipe<SendContext> next){ var payload = context.GetOrAddPayload(() => new SomePayload{Value = 27});
return next.Send(context);}Core contexts
Section titled “Core contexts”MassTransit uses pipes throughout the system. Some examples:
ReceiveContext
Section titled “ReceiveContext”Handles messages received from the transport.
- Deserializing message
- Moving skipped messages to the dead letter queue
- Moving faulted messages to the error queue
ConsumeContext
Section titled “ConsumeContext”Handles message consumption.
- Faulted message redelivery
- Faulted message retry
- In-memory or transactional outbox
- Consumer execution
SendContext
Section titled “SendContext”Handles sending messages:
- Message serialization
- Adding message headers
PublishContext
Section titled “PublishContext”Handles publishing events:
- Message serialization
- Adding message headers
Custom Middleware
Section titled “Custom Middleware”Developers can add custom filters to the pipeline to:
- Implement cross-cutting concerns
- Auditing, logging, and tracing
- Message validation or enrichment
- Control over execution flow, before and after consumers
- Authorization or security checks
- Multi-tenant routing
Filters can be added at different levels:
- Bus-level
- Endpoint-level
- Consumer-level
- Saga-level
This allows precise control over where behavior is applied.
Scoped filters extend this model by integrating with dependency injection, allowing middleware to safely use scoped services and per-message state.
To create middleware filters, refer to the guide.