Skip to content

Mediator Configuration

MassTransit includes a mediator implementation, with full support for consumers, handlers, and sagas (including saga state machines). MassTransit’s Mediator runs in-process and in-memory, no transport is used. For maximum performance, messages are passed by reference instead of being serialized, and control flows directly from the Publish/Send call to the consumer. If a consumer throws an exception, the Publish/Send method throws and the exception should be handled by the caller.

Mediator is configured via the AddMediator extension method on IServiceCollection.

using MassTransit;
using MassTransit.Mediator;
services.AddMediator(cfg =>
{
cfg.AddConsumer<SubmitOrderConsumer>();
cfg.AddConsumer<OrderStatusConsumer>();
});

Once created, Mediator doesn’t need to be started or stopped and can be used immediately. IMediator and IScopedMediator combine several other interfaces into a single interface, including IPublishEndpoint, ISendEndpoint, and IClientFactory.

MassTransit dispatches the command to the consumer asynchronously. Once the Consume method completes, the Publish/Send method will complete. If the consumer throws an exception, it will be propagated back to the caller.

The main mediator interface IMediator is registered as a singleton. In most cases, IScopedMediator should be used instead. IScopedMediator is registered as scoped, and will flow the current container scope through to the consumer(s). The scoped mediator interface will share the scope (HttpContext or manually created) of the calling component with the entire pipeline.

By default, when using IMediator each consumer has its own scope. By using IScopedMediator, the scope is shared between all consumers.

Consumers can be connected and disconnected from mediator at run-time, allowing components and services to temporarily consume messages. Use the ConnectConsumer method to connect a consumer. The handle can be used to disconnect the consumer.

var handle = mediator.ConnectConsumer<SubmitOrderConsumer>();

TODO add example of using Request<T> in contract

To send a request using the mediator, a request client can be created from IMediator. The example below configures two consumers and then sends the SubmitOrder command, followed by the GetOrderStatus request.

Guid orderId = NewId.NextGuid();
await mediator.Send<SubmitOrder>(new { OrderId = orderId });
var client = mediator.CreateRequestClient<GetOrderStatus>();
var response = await client.GetResponse<OrderStatus>(new { OrderId = orderId });

The OrderStatusConsumer, along with the message contracts, is shown below.

public record GetOrderStatus
{
public Guid OrderId { get; init; }
}
public record OrderStatus
{
public Guid OrderId { get; init; }
public string Status { get; init; }
}
class OrderStatusConsumer :
IConsumer<GetOrderStatus>
{
public async Task Consume(ConsumeContext<GetOrderStatus> context)
{
await context.RespondAsync<OrderStatus>(new
{
context.Message.OrderId,
Status = "Pending"
});
}
}

Just like Send, the request is executed asynchronously. If an exception occurs, the exception will be propagated back to the caller. If the request times out, or if the request is canceled, the GetResponse method will throw an exception (either a RequestTimeoutException or an OperationCanceledException).

MassTransit’s Mediator is built using the same components used to create a bus, which means all the same middleware components can be configured. For instance, to configure the Mediator pipeline, such as adding a scoped filter, see the example below.

public class ValidateOrderStatusFilter<T> :
IFilter<SendContext<T>>
where T : class
{
public Task Send(SendContext<T> context, IPipe<SendContext<T>> next)
{
if (context.Message is GetOrderStatus getOrderStatus && getOrderStatus.OrderId == Guid.Empty)
throw new ArgumentException("The OrderId must not be empty");
return next.Send(context);
}
public void Probe(ProbeContext context)
{
}
}

The filter can be added to the Mediator pipeline using the ConfigureMediator method.

using MassTransit;
using MassTransit.Mediator;
services.AddMediator(cfg =>
{
cfg.ConfigureMediator((context, mcfg) =>
{
mcfg.UseSendFilter(typeof(ValidateOrderStatusFilter<>), context);
});
});