Mediator Configuration
MassTransit includes a mediator implementation, with full support for consumers, handlers, and sagas (including saga state machines). MassTransit’s Mediator runs in-process and in-memory, no transport is used. For maximum performance, messages are passed by reference instead of being serialized, and control flows directly from the Publish/Send call to the consumer. If a consumer throws an exception, the Publish/Send method throws and the exception should be handled by the caller.
Configure Mediator
Section titled “Configure Mediator”Mediator is configured via the AddMediator extension method on IServiceCollection.
using MassTransit;using MassTransit.Mediator;
services.AddMediator(cfg =>{ cfg.AddConsumer<SubmitOrderConsumer>(); cfg.AddConsumer<OrderStatusConsumer>();});Using Mediator
Section titled “Using Mediator”Once created, Mediator doesn’t need to be started or stopped and can be used immediately. IMediator and IScopedMediator combine several other interfaces
into a single interface, including IPublishEndpoint, ISendEndpoint, and IClientFactory.
MassTransit dispatches the command to the consumer asynchronously. Once the Consume method completes, the Publish/Send method will complete. If the consumer throws an exception, it will be propagated back to the caller.
Scoped Mediator
Section titled “Scoped Mediator”The main mediator interface IMediator is registered as a singleton. In most cases, IScopedMediator should be used instead. IScopedMediator is registered
as scoped, and will flow the current container scope through to the consumer(s). The scoped mediator interface will share the scope (HttpContext or manually
created) of the calling component with the entire pipeline.
By default, when using IMediator each consumer has its own scope. By using IScopedMediator, the scope is shared between all consumers.
Connect and disconnect consumers
Section titled “Connect and disconnect consumers”Consumers can be connected and disconnected from mediator at run-time, allowing components and services to temporarily consume messages. Use the ConnectConsumer method to connect a consumer. The handle can be used to disconnect the consumer.
var handle = mediator.ConnectConsumer<SubmitOrderConsumer>();Submit requests
Section titled “Submit requests”TODO add example of using
Request<T>in contract
To send a request using the mediator, a request client can be created from IMediator. The example below configures two consumers and then sends the SubmitOrder command, followed by the GetOrderStatus request.
Guid orderId = NewId.NextGuid();
await mediator.Send<SubmitOrder>(new { OrderId = orderId });
var client = mediator.CreateRequestClient<GetOrderStatus>();
var response = await client.GetResponse<OrderStatus>(new { OrderId = orderId });The OrderStatusConsumer, along with the message contracts, is shown below.
public record GetOrderStatus{ public Guid OrderId { get; init; }}
public record OrderStatus{ public Guid OrderId { get; init; } public string Status { get; init; }}
class OrderStatusConsumer : IConsumer<GetOrderStatus>{ public async Task Consume(ConsumeContext<GetOrderStatus> context) { await context.RespondAsync<OrderStatus>(new { context.Message.OrderId, Status = "Pending" }); }}Just like Send, the request is executed asynchronously. If an exception occurs, the exception will be propagated back to the caller. If the request times out, or if the request is canceled, the GetResponse method will throw an exception (either a RequestTimeoutException or an OperationCanceledException).
Create a middleware pipeline
Section titled “Create a middleware pipeline”MassTransit’s Mediator is built using the same components used to create a bus, which means all the same middleware components can be configured. For instance, to configure the Mediator pipeline, such as adding a scoped filter, see the example below.
public class ValidateOrderStatusFilter<T> : IFilter<SendContext<T>> where T : class{ public Task Send(SendContext<T> context, IPipe<SendContext<T>> next) { if (context.Message is GetOrderStatus getOrderStatus && getOrderStatus.OrderId == Guid.Empty) throw new ArgumentException("The OrderId must not be empty");
return next.Send(context); }
public void Probe(ProbeContext context) { }}The filter can be added to the Mediator pipeline using the ConfigureMediator method.
using MassTransit;using MassTransit.Mediator;
services.AddMediator(cfg =>{ cfg.ConfigureMediator((context, mcfg) => { mcfg.UseSendFilter(typeof(ValidateOrderStatusFilter<>), context); });});