Bus Configuration
MassTransit is usable in most .NET application types. MassTransit is easily configured in ASP.NET Core or .NET Generic Host applications (using .NET 6 or later).
To use MassTransit, add the MassTransit package (from NuGet) and start with the AddMassTransit method shown below.
using MassTransit;
services.AddMassTransit(x =>{ // A Transport x.UsingRabbitMq((context, cfg) => { });});In this configuration, the following variables are used:
| Variable | Description |
|---|---|
| x | Configure the bus instance (not transport specific) and the underlying service collection |
| context | The configured bus context, also implements IServiceProvider |
| cfg | Configure the bus specific to the transport (each transport has its own interface type |
Adding MassTransit, as shown above, will configure the service collection with required components, including:
Scoped Container Registrations
Section titled “Scoped Container Registrations”| Interface | Notes |
|---|---|
| IScopedBus (New in v9) | Combines ISendEndpointProvider, IPublishEndpoint, and IScopedClientFactory into a single interface that can be used in any scoped context (with or without a consumer) |
| ISendEndpointProvider | Send messages from consumer dependencies, ASP.NET Controllers |
| IPublishEndpoint | Publish messages from consumer dependencies, ASP.NET Controllers |
| IScopedClientFactory | Used to create request clients (singleton, or within scoped consumers) |
| IRequestClient | Used to send requests |
| ConsumeContext | Available in any message scope, such as a consumer, saga, or activity, is also an ISendEndpointProvider and IPublishEndpoint |
Singleton Container Registrations
Section titled “Singleton Container Registrations”| Interface | Notes |
|---|---|
| IReceiveEndpointConnector | Used to connect additional receive endpoints to the bus after the bus is started |
| IBus | The bus instance, not typically used in modern applications |
| IBusControl | Used to start/stop the bus (not typically used) |
| IClientFactory | Used to create request clients (singleton, or within scoped consumers) |
| MassTransit Hosted Service | Automatically starts and stops the bus with the .NET host |
| MassTransit Health Checks | Health checks for each bus instance (including MultiBus) |
MassTransit uses Microsoft.Extensions.Logging for all log output. Therefore, MassTransit
will use any existing logging configuration. See Logging for more information.
Host Options
Section titled “Host Options”MassTransit adds a hosted service so that the generic host can start and stop the bus (or buses if multiple bus instances are configured). The host options can be configured via MassTransitHostOptions using the Options pattern as shown below.
services.AddOptions<MassTransitHostOptions>() .Configure(options => { });| Option | Description |
|---|---|
| WaitUntilStarted | By default, MassTransit connects to the broker asynchronously. When set to true, the MassTransit Hosted Service will block startup until the broker connection has been established. |
| StartTimeout | By default, MassTransit waits infinitely until the broker connection is established. If specified, MassTransit will give up after the timeout has expired. |
| StopTimeout | MassTransit waits infinitely for the bus to stop, including any active message consumers. If specified, MassTransit will force the bus to stop after the timeout has expired. |
| ConsumerStopTimeout | If specified, the ConsumeContext.CancellationToken will be canceled after the specified timeout when the bus is stopping. This allows long-running consumers to observe the cancellation token and react accordingly. Must be <= the StopTimeout |
Transport Options
Section titled “Transport Options”Each supported transport can be configured via a .Host() method or via the
.NET Options Pattern.
- Rabbit MQ: RabbitMqTransportOptions
- Azure Service Bus: AzureServiceBusTransportOptions
- Amazon SQS: AmazonSqsTransportOptions
Consumer Registration
Section titled “Consumer Registration”To consume messages, one or more consumers must be added and receive endpoints configured for the added consumers. MassTransit connects each receive endpoint to a queue on the message broker.
To add a consumer and automatically configure a receive endpoint for the consumer, call one of the AddConsumer methods and call ConfigureEndpoints as shown below.
services.AddMassTransit(x =>{ x.AddConsumer<SubmitOrderConsumer>();
x.UsingRabbitMq((context, cfg) => { cfg.ConfigureEndpoints(context); });});Any consumer dependencies should be added to the container separately. Consumers are registered as scoped, and dependencies should be registered as scoped when possible, unless they are singletons.
MassTransit will automatically configure a receive endpoint for the SubmitOrderConsumer using the name returned by the configured endpoint name formatter.
When the bus is started, the Receive endpoint will be started and messages will be delivered from the queue by the transport to an instance of the consumer.
All consumer types can be added, including consumers, sagas, saga state machines, and routing slip activities. If a job consumer is added, additional configuration is required.
To exclude a consumer, saga, or routing slip activity from automatic configuration, use the ExcludeFromConfigureEndpoints extension method when adding the consumer:
x.AddConsumer<SubmitOrderConsumer>() .ExcludeFromConfigureEndpoints()Alternatively, the ExcludeFromConfigureEndpoints attribute may be specified on the consumer.
[ExcludeFromConfigureEndpoints]public class SubmitOrderConsumer : IConsumer<SubmitOrder>{}Configure Endpoints
Section titled “Configure Endpoints”As shown in the example above, using ConfigureEndpoints is the preferred approach to configure receive endpoints. By registering consumers, sagas, and routing
slip activities along with their optional definitions, MassTransit is able to configure receive endpoints for all registered consumer types. Receive endpoint
names are generated using an endpoint name formatter (unless otherwise specified in a definition), and each receive endpoint is
configured.
As receive endpoints are configured, one or more consumer types are configured on each receive endpoint. If multiple consumer types share the same endpoint name, those consumer types will be configured on the same receive endpoint. For each consumer type, its respective consumer, saga, or activity definition will be applied to the receive endpoint.
Configure Endpoints Callback
Section titled “Configure Endpoints Callback”To apply receive endpoint settings or configure middleware for all receive endpoints configured by ConfigureEndpoints, a callback can be added.
x.AddConfigureEndpointsCallback((name, cfg) =>{ cfg.UseMessageRetry(r => r.Immediate(2));});When ConfigureEndpoints is called, any registered callbacks will be called for every receive endpoint. Each callback will only be called once per
receive endpoint.
To conditionally apply transport-specific settings, the cfg parameter can be pattern-matched to the transport type as shown below.
x.AddConfigureEndpointsCallback((name, cfg) =>{ if (cfg is IRabbitMqReceiveEndpointConfigurator rmq) rmq.SetQuorumQueue(3);
cfg.UseMessageRetry(r => r.Immediate(2));});Endpoint Mapping Strategies
Section titled “Endpoint Mapping Strategies”When designing a distributed application, it is important to understand how consumers are mapped to queues.
One Consumer per Queue
Section titled “One Consumer per Queue”By default, MassTransit uses a simple approach that maps each consumer to a queue with the name based on the consumer’s type name. The ConfigureEndpoints
method configures an endpoint (which maps directly to a queue) for each consumer (a consumer can be a message consumer, saga, or routing slip activity). If a
routing slip activity supports compensation, separate execution and compensation endpoints are configured.
For most applications, this approach is just right. It ensures that every consumer can be configured independently, including retries, delivery, and the outbox. It also ensures that messages for a consumer do not get stuck behind another consumer’s messages sharing the same queue.
However, there are some situations where it is desirable to configure consumers differently.
Multiple Consumers per Queue
Section titled “Multiple Consumers per Queue”Configuring multiple consumers on a single queue, while supported and sensible in certain scenarios, is a choice that requires careful consideration. Sharing a single queue for multiple consumers ties those consumers to a single, shared resource (the queue). The queue may become a bottleneck, and it may be challenging to determine which consumer is responsible for each message type.
The recommendation is to configure multiple consumers on a single queue only when:
- The consumer’s business functions are closely related
- The consumers consume distinct message types
- The consumers have different dependencies (e.g., database access or validation)
An example where this makes the most sense is when consumer dependencies are different, but message order (while not always guaranteed) is important.
Consume Multiple Message Types
Section titled “Consume Multiple Message Types”In situations where it is preferable to consume multiple message types from a single queue, create a consumer that consumes multiple message types by adding
more IConsumer<T> interface implementations to the consumer class.
public class AddressConsumer : IConsumer<CreateAddress>, IConsumer<UpdateAddress>{}This is how saga state machines work. Events declared in the state machine are configured on a single queue (yes, this too is a configurable behavior).
All Consumers on a Single Queue
Section titled “All Consumers on a Single Queue”This is never a good idea and is highly discouraged. While it is supported, it’s unlikely to be operationally sustainable.
Endpoint Name Formatters
Section titled “Endpoint Name Formatters”ConfigureEndpoints uses an IEndpointNameFormatter to format the queue names for all supported consumer types. The default endpoint name formatter returns
PascalCase class names without the namespace. There are several built-in endpoint name formatters included. For the SubmitOrderConsumer, the receive
endpoint names would be formatted as shown below. Note that class suffixes such as Consumer, Saga, and Activity are trimmed from the endpoint name by
default.
| Format | Configuration | Name |
|---|---|---|
| Default | SetDefaultEndpointNameFormatter | SubmitOrder |
| Snake Case | SetSnakeCaseEndpointNameFormatter | submit_order |
| Kebab Case | SetKebabCaseEndpointNameFormatter | submit-order |
The endpoint name formatters can also be customized by constructing a new instance and configuring MassTransit to use it.
x.SetEndpointNameFormatter(new KebabCaseEndpointNameFormatter(prefix: "Dev", includeNamespace: false));By specifying a prefix, the endpoint name would be dev-submit-order. This is useful when sharing a single broker with multiple developers (Amazon SQS is
account-wide, for instance).
Receive Endpoints
Section titled “Receive Endpoints”The previous examples use conventions to configure receive endpoints. Alternatively, receive endpoints can be explicitly configured.
When configuring endpoints manually, ConfigureEndpoints should be excluded or be called after any explicitly configured receive endpoints.
To explicitly configure endpoints, use the ConfigureConsumer or ConfigureConsumers method.
services.AddMassTransit(x =>{ x.AddConsumer<SubmitOrderConsumer>();
x.UsingRabbitMq((context, cfg) => { cfg.ReceiveEndpoint("order-service", e => { e.ConfigureConsumer<SubmitOrderConsumer>(context); }); });});Receive endpoints have transport-independent settings that can be configured.
| Name | Description | Default |
|---|---|---|
| PrefetchCount | Number of unacknowledged messages delivered by the broker | max(CPU Count x 2,16) |
| ConcurrentMessageLimit | Number of concurrent messages delivered to consumers | (none, uses PrefetchCount) |
| ConfigureConsumeTopology | Create exchanges/topics on the broker and bind them to the receive endpoint | true |
| ConfigureMessageTopology | Create exchanges/topics on the broker and bind them to the receive endpoint for a specific message type | true |
| PublishFaults | Publish Fault<T> events when consumers fault | true |
| DefaultContentType | The default content type for received messages | See serialization |
| SerializerContentType | The default content type for sending/publishing messages | See serialization |
The PrefetchCount, ConcurrentMessageLimit, and serialization settings can be specified at the bus level and will be applied to all receive endpoints.
In the following example, the PrefetchCount is set to 32 and the ConcurrentMessageLimit is set to 28.
services.AddMassTransit(x =>{ x.AddConsumer<SubmitOrderConsumer>();
x.UsingRabbitMq((context, cfg) => { cfg.PrefetchCount = 32; // applies to all receive endpoints
cfg.ReceiveEndpoint("order-service", e => { e.ConcurrentMessageLimit = 28; // only applies to this endpoint e.ConfigureConsumer<SubmitOrderConsumer>(context); }); });});When using ConfigureConsumer with a consumer that has a definition, the EndpointName, PrefetchCount, and Temporary properties of the consumer definition are not used.
Temporary Endpoints
Section titled “Temporary Endpoints”Some consumers only need to receive messages while connected, and any messages published while disconnected should be discarded. This can be achieved by using a TemporaryEndpointDefinition to configure the receive endpoint.
services.AddMassTransit(x =>{ x.AddConsumer<SubmitOrderConsumer>();
x.UsingInMemory((context, cfg) => { cfg.ReceiveEndpoint(new TemporaryEndpointDefinition(), e => { e.ConfigureConsumer<SubmitOrderConsumer>(context); });
cfg.ConfigureEndpoints(context); });});Dynamic Endpoints
Section titled “Dynamic Endpoints”To dynamically configure a receive endpoint on an existing bus, resolve the IReceiveEndpointConnector interface (either via dependency injection or from an
IServiceProvider interface) and call ConnectReceiveEndpoint as shown.
var connector = provider.GetRequiredService<IReceiveEndpointConnector>();
var handle = connector.ConnectReceiveEndpoint("queue-name", (context, cfg) =>{ cfg.ConfigureConsumer<MyConsumer>(context);});
// optional, the handle can be used to wait for the receive endpoint to finish connectingawait handle.Ready;There is also an override for connecting the receive endpoint using an endpoint definition. For instance, to create a temporary receive endpoint (typically used for fan-out event consumers), pass the endpoint definition as shown.
var connector = provider.GetRequiredService<IReceiveEndpointConnector>();var endpointNameFormatter = provider.GetService<IEndpointNameFormatter>() ?? DefaultEndpointNameFormatter.Instance;
var handle = connector.ConnectReceiveEndpoint(new TemporaryEndpointDefinition(), endpointNameFormatter, (context, cfg) =>{ cfg.ConfigureConsumer<UpdateCacheEventConsumer>(context);});See the endpoint name formatters section above for details.
Endpoint Configuration
Section titled “Endpoint Configuration”To configure the endpoint for a consumer registration, or override the endpoint configuration in the definition, the Endpoint method can be added to the
consumer registration. This will create an endpoint definition for the consumer and register it in the container. This method is available on consumer and saga
registrations, with separate execute and compensate endpoint methods for activities.
services.AddMassTransit(x =>{ x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>() .Endpoint(e => { // override the default endpoint name e.Name = "order-service-extreme";
// more options shown below });
x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));});The configurable settings for an endpoint include:
| Property | Description |
|---|---|
| Name | The receive endpoint (queue) name |
| InstanceId | If specified, should be unique for each bus instance To enable fan-out (instead of load balancing) |
| Temporary | If true, the endpoint will be automatically removed after the bus has stopped. |
| PrefetchCount | Number of unacknowledged messages delivered by the broker |
| ConcurrentMessageLimit | Number of concurrent messages delivered to consumers |
| ConfigureConsumeTopology | Create exchanges/topics on the broker and bind them to the receive endpoint |
| AddConfigureEndpointCallback | (Added in v8.3.0) Adds a callback method that will be invoked when the receive endpoint is configured. |
Endpoint Naming
Section titled “Endpoint Naming”When the endpoint is configured after the AddConsumer method, the configuration then overrides the endpoint configuration in the consumer definition. However,
it cannot override the EndpointName if it is specified in the constructor. The order of precedence for endpoint naming is explained below.
-
Specifying
EndpointName = "submit-order-extreme"in the constructor which cannot be overriddenx.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>()public SubmitOrderConsumerDefinition(){EndpointName = "submit-order-extreme";} -
Specifying
.Endpoint(x => x.Name = "submit-order-extreme")in the consumer registration, chained toAddConsumerx.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>().Endpoint(x => x.Name = "submit-order-extreme");public SubmitOrderConsumerDefinition(){Endpoint(x => x.Name = "not used");} -
Specifying
Endpoint(x => x.Name = "submit-order-extreme")in the constructor, which creates an endpoint definitionx.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>()public SubmitOrderConsumerDefinition(){Endpoint(x => x.Name = "submit-order-extreme");} -
Unspecified, the endpoint name formatter is used (in this case, the endpoint name is
SubmitOrderusing the default formatter)x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>()public SubmitOrderConsumerDefinition(){}
Health Checks
Section titled “Health Checks”The AddMassTransit method adds an IHealthCheck to the service collection that you can use to monitor your health. The health check is added with the tags
ready and masstransit.
To configure health checks, map the ready and live endpoints in your ASP.NET application.
app.MapHealthChecks("/health/ready", new HealthCheckOptions(){ Predicate = (check) => check.Tags.Contains("ready"),});
app.MapHealthChecks("/health/live", new HealthCheckOptions());Example Output
{ "status": "Healthy", "totalDuration": "00:00:00.2134026", "entries": { "masstransit-bus": { "data": { "Endpoints": { "rabbitmq://localhost/dev-local/SubmitOrder": { "status": "Healthy", "description": "ready" } } }, "description": "Ready", "duration": "00:00:00.1853530", "status": "Healthy", "tags": [ "ready", "masstransit" ] } }}- When everything works correctly, MassTransit will report
Healthy. - If any problems occur on application startup, MassTransit will report
Unhealthy. This can cause an orchestrator to restart your application. - If any problems occur while the application is working (for example, application loses connection to broker), MassTransit will report
Degraded.
Health Check Options
Section titled “Health Check Options”Health Checks can be further configured using ConfigureHealthCheckOptions:
builder.Services.AddMassTransit(bus =>{ bus.ConfigureHealthCheckOptions(options => { options.Name = "masstransit"; options.MinimalFailureStatus = HealthStatus.Unhealthy; options.Tags.Add("health"); });
}| Setting | Description | Default value |
|---|---|---|
| Name | Set the health check name, overrides the default bus type name. | Bus name. |
| MinimalFailureStatus | The minimal HealthStatus that will be reported when the health check fails. | Unhealthy |
| Tags | A list of tags that can be used to filter sets of health checks. | ”ready”, “masstransit” |
By default, MassTransit reports all three statuses depending on the application state.
If MinimalFailureStatus is set to Healthy, MassTransit will log any issues, but the health check will always report Healthy.
If MinimalFailureStatus is set to Degraded, MassTransit will report Degraded if any issues occur, but never report Unhealthy.
Tags inside options will override default tags. You will need to add ready and masstransit tags manually if you want to keep them.