
Bus Configuration

MassTransit is usable in most .NET application types. MassTransit is easily configured in ASP.NET Core or .NET Generic Host applications (using .NET 6 or later).

To use MassTransit, add the MassTransit package (from NuGet) and start with the AddMassTransit method shown below.

using MassTransit;

services.AddMassTransit(x =>
{
    // A Transport
    x.UsingRabbitMq((context, cfg) =>
    {
    });
});

In this configuration, the following variables are used:

| Variable | Description |
|----------|-------------|
| x | Configure the bus instance (not transport specific) and the underlying service collection |
| context | The configured bus context, also implements IServiceProvider |
| cfg | Configure the bus specific to the transport (each transport has its own interface type) |
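
To show where these pieces sit in a complete application, here is a minimal sketch of a Program.cs for a .NET Generic Host worker. It assumes top-level statements and swaps in the in-memory transport only so that no broker is needed; a real application would keep UsingRabbitMq or another transport.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

// Sketch only: the in-memory transport is an assumption made to avoid a broker dependency
await Host.CreateDefaultBuilder(args)
    .ConfigureServices((hostContext, services) =>
    {
        services.AddMassTransit(x =>
        {
            // x: bus-level configuration (consumers, formatters, and so on)
            x.UsingInMemory((context, cfg) =>
            {
                // context: resolves registrations; cfg: transport-specific bus configuration
                cfg.ConfigureEndpoints(context);
            });
        });
    })
    .Build()
    .RunAsync();
```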

Adding MassTransit, as shown above, will configure the service collection with required components, including:

| Interface | Notes |
|-----------|-------|
| IScopedBus (New in v9) | Combines ISendEndpointProvider, IPublishEndpoint, and IScopedClientFactory into a single interface that can be used in any scoped context (with or without a consumer) |
| ISendEndpointProvider | Send messages from consumer dependencies, ASP.NET Controllers |
| IPublishEndpoint | Publish messages from consumer dependencies, ASP.NET Controllers |
| IScopedClientFactory | Used to create request clients (singleton, or within scoped consumers) |
| IRequestClient | Used to send requests |
| ConsumeContext | Available in any message scope, such as a consumer, saga, or activity, is also an ISendEndpointProvider and IPublishEndpoint |
| IReceiveEndpointConnector | Used to connect additional receive endpoints to the bus after the bus is started |
| IBus | The bus instance, not typically used in modern applications |
| IBusControl | Used to start/stop the bus (not typically used) |
| IClientFactory | Used to create request clients (singleton, or within scoped consumers) |
| MassTransit Hosted Service | Automatically starts and stops the bus with the .NET host |
| MassTransit Health Checks | Health checks for each bus instance (including MultiBus) |
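
As an illustration of how one of these interfaces is typically consumed, the sketch below injects IPublishEndpoint into an application service. OrderSubmitted and OrderService are hypothetical names introduced for this example; the service is assumed to be registered as scoped in the same container.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract, used only for this sketch
public record OrderSubmitted
{
    public Guid OrderId { get; init; }
}

// Hypothetical application service, assumed to be registered as scoped
public class OrderService
{
    readonly IPublishEndpoint _publishEndpoint;

    public OrderService(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    public Task SubmitAsync(Guid orderId, CancellationToken cancellationToken)
    {
        // Publish the event to all subscribed receive endpoints
        return _publishEndpoint.Publish(new OrderSubmitted { OrderId = orderId }, cancellationToken);
    }
}
```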

MassTransit uses Microsoft.Extensions.Logging for all log output. Therefore, MassTransit will use any existing logging configuration. See Logging for more information.

MassTransit adds a hosted service so that the generic host can start and stop the bus (or buses if multiple bus instances are configured). The host options can be configured via MassTransitHostOptions using the Options pattern as shown below.

services.AddOptions<MassTransitHostOptions>()
    .Configure(options =>
    {
    });

| Option | Description |
|--------|-------------|
| WaitUntilStarted | By default, MassTransit connects to the broker asynchronously. When set to true, the MassTransit Hosted Service will block startup until the broker connection has been established. |
| StartTimeout | By default, MassTransit waits infinitely until the broker connection is established. If specified, MassTransit will give up after the timeout has expired. |
| StopTimeout | MassTransit waits infinitely for the bus to stop, including any active message consumers. If specified, MassTransit will force the bus to stop after the timeout has expired. |
| ConsumerStopTimeout | If specified, the ConsumeContext.CancellationToken will be canceled after the specified timeout when the bus is stopping. This allows long-running consumers to observe the cancellation token and react accordingly. Must be <= the StopTimeout. |
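
A filled-in version of the options callback might look like the following sketch. The timeout values are arbitrary choices for illustration, and HostOptionsSetup is a hypothetical wrapper class; note that ConsumerStopTimeout is kept below StopTimeout as the table requires.

```csharp
using System;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public static class HostOptionsSetup
{
    public static void ConfigureMassTransitHost(IServiceCollection services)
    {
        services.AddOptions<MassTransitHostOptions>()
            .Configure(options =>
            {
                // Block host startup until the broker connection is established
                options.WaitUntilStarted = true;

                // Give up on startup if the broker is unreachable for 10 seconds
                options.StartTimeout = TimeSpan.FromSeconds(10);

                // Force the bus to stop after 30 seconds
                options.StopTimeout = TimeSpan.FromSeconds(30);

                // Cancel ConsumeContext.CancellationToken after 10 seconds of stopping
                options.ConsumerStopTimeout = TimeSpan.FromSeconds(10);
            });
    }
}
```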

Each supported transport can be configured via a .Host() method or via the .NET Options Pattern.
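
For RabbitMQ, the .Host() approach can be sketched as follows. The host name, virtual host, and guest credentials are placeholder assumptions for a local broker, and TransportSetup is a hypothetical wrapper class.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public static class TransportSetup
{
    public static void AddBus(IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            x.UsingRabbitMq((context, cfg) =>
            {
                // Placeholder connection details for a local broker
                cfg.Host("localhost", "/", h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                cfg.ConfigureEndpoints(context);
            });
        });
    }
}
```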

To consume messages, one or more consumers must be added and receive endpoints configured for the added consumers. MassTransit connects each receive endpoint to a queue on the message broker.

To add a consumer and automatically configure a receive endpoint for the consumer, call one of the AddConsumer methods and call ConfigureEndpoints as shown below.

services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer>();
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});

Any consumer dependencies should be added to the container separately. Consumers are registered as scoped, and dependencies should be registered as scoped when possible, unless they are singletons.
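
The following sketch shows one way a consumer and its dependency could be wired together. IOrderRepository, InMemoryOrderRepository, the shape of SubmitOrder, and the Registration class are all hypothetical and exist only to make the example self-contained.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical message contract
public record SubmitOrder
{
    public Guid OrderId { get; init; }
}

// Hypothetical dependency used by the consumer
public interface IOrderRepository
{
    Task SaveAsync(Guid orderId, CancellationToken cancellationToken);
}

public class InMemoryOrderRepository : IOrderRepository
{
    public Task SaveAsync(Guid orderId, CancellationToken cancellationToken) => Task.CompletedTask;
}

public class SubmitOrderConsumer : IConsumer<SubmitOrder>
{
    readonly IOrderRepository _repository;

    public SubmitOrderConsumer(IOrderRepository repository)
    {
        _repository = repository;
    }

    public Task Consume(ConsumeContext<SubmitOrder> context)
    {
        return _repository.SaveAsync(context.Message.OrderId, context.CancellationToken);
    }
}

public static class Registration
{
    public static void AddOrderHandling(IServiceCollection services)
    {
        // The dependency is registered separately, as scoped, to match the consumer's lifetime
        services.AddScoped<IOrderRepository, InMemoryOrderRepository>();

        services.AddMassTransit(x =>
        {
            x.AddConsumer<SubmitOrderConsumer>();
            x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
        });
    }
}
```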

MassTransit will automatically configure a receive endpoint for the SubmitOrderConsumer using the name returned by the configured endpoint name formatter. When the bus is started, the receive endpoint will be started and messages will be delivered from the queue by the transport to an instance of the consumer.

All consumer types can be added, including consumers, sagas, saga state machines, and routing slip activities. If a job consumer is added, additional configuration is required.

To exclude a consumer, saga, or routing slip activity from automatic configuration, use the ExcludeFromConfigureEndpoints extension method when adding the consumer:

x.AddConsumer<SubmitOrderConsumer>()
    .ExcludeFromConfigureEndpoints();

Alternatively, the ExcludeFromConfigureEndpoints attribute may be specified on the consumer.

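[ExcludeFromConfigureEndpoints]
public class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    // Handler body omitted; IConsumer<T> requires a Consume method
    public Task Consume(ConsumeContext<SubmitOrder> context) => Task.CompletedTask;
}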

As shown in the example above, using ConfigureEndpoints is the preferred approach to configure receive endpoints. By registering consumers, sagas, and routing slip activities along with their optional definitions, MassTransit is able to configure receive endpoints for all registered consumer types. Receive endpoint names are generated using an endpoint name formatter (unless otherwise specified in a definition), and each receive endpoint is configured.

As receive endpoints are configured, one or more consumer types are configured on each receive endpoint. If multiple consumer types share the same endpoint name, those consumer types will be configured on the same receive endpoint. For each consumer type, its respective consumer, saga, or activity definition will be applied to the receive endpoint.

To apply receive endpoint settings or configure middleware for all receive endpoints configured by ConfigureEndpoints, a callback can be added.

x.AddConfigureEndpointsCallback((name, cfg) =>
{
    cfg.UseMessageRetry(r => r.Immediate(2));
});

When ConfigureEndpoints is called, any registered callbacks will be called for every receive endpoint. Each callback will only be called once per receive endpoint.

To conditionally apply transport-specific settings, the cfg parameter can be pattern-matched to the transport type as shown below.

x.AddConfigureEndpointsCallback((name, cfg) =>
{
    if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
        rmq.SetQuorumQueue(3);

    cfg.UseMessageRetry(r => r.Immediate(2));
});

When designing a distributed application, it is important to understand how consumers are mapped to queues.

By default, MassTransit uses a simple approach that maps each consumer to a queue with the name based on the consumer’s type name. The ConfigureEndpoints method configures an endpoint (which maps directly to a queue) for each consumer (a consumer can be a message consumer, saga, or routing slip activity). If a routing slip activity supports compensation, separate execution and compensation endpoints are configured.

For most applications, this approach is just right. It ensures that every consumer can be configured independently, including retries, delivery, and the outbox. It also ensures that messages for a consumer do not get stuck behind another consumer’s messages sharing the same queue.

However, there are some situations where it is desirable to configure consumers differently.

Configuring multiple consumers on a single queue, while supported and sensible in certain scenarios, is a choice that requires careful consideration. Sharing a single queue for multiple consumers ties those consumers to a single, shared resource (the queue). The queue may become a bottleneck, and it may be challenging to determine which consumer is responsible for each message type.

The recommendation is to configure multiple consumers on a single queue only when:

  • The consumers’ business functions are closely related
  • The consumers consume distinct message types
  • The consumers have different dependencies (e.g., database access or validation)

An example where this makes the most sense is when consumer dependencies are different, but message order (while not always guaranteed) is important.
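
One way to place several consumers on a single queue is to configure them explicitly on the same receive endpoint, as in this sketch. The consumer classes, message contracts, queue name address-service, and the AddressRegistration class are hypothetical.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical message contracts (distinct message types)
public record CreateAddress;
public record UpdateAddress;

// Hypothetical consumers with closely related business functions
public class CreateAddressConsumer : IConsumer<CreateAddress>
{
    public Task Consume(ConsumeContext<CreateAddress> context) => Task.CompletedTask;
}

public class UpdateAddressConsumer : IConsumer<UpdateAddress>
{
    public Task Consume(ConsumeContext<UpdateAddress> context) => Task.CompletedTask;
}

public static class AddressRegistration
{
    public static void AddAddressHandling(IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            x.AddConsumer<CreateAddressConsumer>();
            x.AddConsumer<UpdateAddressConsumer>();

            x.UsingRabbitMq((context, cfg) =>
            {
                // Both consumers share one queue, so their messages are read from the same resource
                cfg.ReceiveEndpoint("address-service", e =>
                {
                    e.ConfigureConsumer<CreateAddressConsumer>(context);
                    e.ConfigureConsumer<UpdateAddressConsumer>(context);
                });
            });
        });
    }
}
```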

In situations where it is preferable to consume multiple message types from a single queue, create a consumer that consumes multiple message types by adding more IConsumer<T> interface implementations to the consumer class.

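public class AddressConsumer :
    IConsumer<CreateAddress>,
    IConsumer<UpdateAddress>
{
    // Handler bodies omitted; each IConsumer<T> requires its own Consume method
    public Task Consume(ConsumeContext<CreateAddress> context) => Task.CompletedTask;
    public Task Consume(ConsumeContext<UpdateAddress> context) => Task.CompletedTask;
}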

This is how saga state machines work. Events declared in the state machine are configured on a single queue (yes, this too is a configurable behavior).

Configuring all consumers on a single queue is never a good idea and is highly discouraged. While it is supported, it is unlikely to be operationally sustainable.

ConfigureEndpoints uses an IEndpointNameFormatter to format the queue names for all supported consumer types. The default endpoint name formatter returns PascalCase class names without the namespace. There are several built-in endpoint name formatters included. For the SubmitOrderConsumer, the receive endpoint names would be formatted as shown below. Note that class suffixes such as Consumer, Saga, and Activity are trimmed from the endpoint name by default.

| Format | Configuration | Name |
|--------|---------------|------|
| Default | SetDefaultEndpointNameFormatter | SubmitOrder |
| Snake Case | SetSnakeCaseEndpointNameFormatter | submit_order |
| Kebab Case | SetKebabCaseEndpointNameFormatter | submit-order |
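
Selecting one of the built-in formatters is a single call on the bus configurator, made before ConfigureEndpoints runs. The sketch below picks kebab case; SubmitOrder, SubmitOrderConsumer, and FormatterSetup are minimal hypothetical stand-ins so the fragment is self-contained.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical message and consumer
public record SubmitOrder;

public class SubmitOrderConsumer : IConsumer<SubmitOrder>
{
    public Task Consume(ConsumeContext<SubmitOrder> context) => Task.CompletedTask;
}

public static class FormatterSetup
{
    public static void AddBus(IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            x.AddConsumer<SubmitOrderConsumer>();

            // Per the table above, the consumer's queue should be named submit-order
            x.SetKebabCaseEndpointNameFormatter();

            x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
        });
    }
}
```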

The endpoint name formatters can also be customized by constructing a new instance and configuring MassTransit to use it.

x.SetEndpointNameFormatter(new KebabCaseEndpointNameFormatter(prefix: "Dev", includeNamespace: false));

By specifying a prefix, the endpoint name would be dev-submit-order. This is useful when sharing a single broker with multiple developers (Amazon SQS is account-wide, for instance).

The previous examples use conventions to configure receive endpoints. Alternatively, receive endpoints can be explicitly configured.

When configuring endpoints manually, ConfigureEndpoints should be excluded or be called after any explicitly configured receive endpoints.

To explicitly configure endpoints, use the ConfigureConsumer or ConfigureConsumers method.

services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer>();
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ReceiveEndpoint("order-service", e =>
        {
            e.ConfigureConsumer<SubmitOrderConsumer>(context);
        });
    });
});
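
Explicit and conventional configuration can be combined, provided ConfigureEndpoints comes last as noted earlier. In this sketch, CancelOrderConsumer is a hypothetical second consumer; the assumption is that ConfigureEndpoints only creates endpoints for consumers that were not already configured explicitly.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical messages and consumers
public record SubmitOrder;
public record CancelOrder;

public class SubmitOrderConsumer : IConsumer<SubmitOrder>
{
    public Task Consume(ConsumeContext<SubmitOrder> context) => Task.CompletedTask;
}

public class CancelOrderConsumer : IConsumer<CancelOrder>
{
    public Task Consume(ConsumeContext<CancelOrder> context) => Task.CompletedTask;
}

public static class MixedSetup
{
    public static void AddBus(IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            x.AddConsumer<SubmitOrderConsumer>();
            x.AddConsumer<CancelOrderConsumer>();

            x.UsingRabbitMq((context, cfg) =>
            {
                // Explicitly configured endpoint first
                cfg.ReceiveEndpoint("order-service", e =>
                {
                    e.ConfigureConsumer<SubmitOrderConsumer>(context);
                });

                // Called afterwards so the remaining consumer gets a conventional endpoint
                cfg.ConfigureEndpoints(context);
            });
        });
    }
}
```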

Receive endpoints have transport-independent settings that can be configured.

| Name | Description | Default |
|------|-------------|---------|
| PrefetchCount | Number of unacknowledged messages delivered by the broker | max(CPU count x 2, 16) |
| ConcurrentMessageLimit | Number of concurrent messages delivered to consumers | (none, uses PrefetchCount) |
| ConfigureConsumeTopology | Create exchanges/topics on the broker and bind them to the receive endpoint | true |
| ConfigureMessageTopology | Create exchanges/topics on the broker and bind them to the receive endpoint for a specific message type | true |
| PublishFaults | Publish Fault<T> events when consumers fault | true |
| DefaultContentType | The default content type for received messages | See serialization |
| SerializerContentType | The default content type for sending/publishing messages | See serialization |

The PrefetchCount, ConcurrentMessageLimit, and serialization settings can be specified at the bus level and will be applied to all receive endpoints.

In the following example, the PrefetchCount is set to 32 and the ConcurrentMessageLimit is set to 28.

services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer>();
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.PrefetchCount = 32; // applies to all receive endpoints
        cfg.ReceiveEndpoint("order-service", e =>
        {
            e.ConcurrentMessageLimit = 28; // only applies to this endpoint
            e.ConfigureConsumer<SubmitOrderConsumer>(context);
        });
    });
});

When using ConfigureConsumer with a consumer that has a definition, the EndpointName, PrefetchCount, and Temporary properties of the consumer definition are not used.

Some consumers only need to receive messages while connected, and any messages published while disconnected should be discarded. This can be achieved by using a TemporaryEndpointDefinition to configure the receive endpoint.

services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer>();
    x.UsingInMemory((context, cfg) =>
    {
        cfg.ReceiveEndpoint(new TemporaryEndpointDefinition(), e =>
        {
            e.ConfigureConsumer<SubmitOrderConsumer>(context);
        });
        cfg.ConfigureEndpoints(context);
    });
});

To dynamically configure a receive endpoint on an existing bus, resolve the IReceiveEndpointConnector interface (either via dependency injection or from an IServiceProvider interface) and call ConnectReceiveEndpoint as shown.

var connector = provider.GetRequiredService<IReceiveEndpointConnector>();
var handle = connector.ConnectReceiveEndpoint("queue-name", (context, cfg) =>
{
    cfg.ConfigureConsumer<MyConsumer>(context);
});

// optional, the handle can be used to wait for the receive endpoint to finish connecting
await handle.Ready;

There is also an overload for connecting the receive endpoint using an endpoint definition. For instance, to create a temporary receive endpoint (typically used for fan-out event consumers), pass the endpoint definition as shown.

var connector = provider.GetRequiredService<IReceiveEndpointConnector>();
var endpointNameFormatter = provider.GetService<IEndpointNameFormatter>() ?? DefaultEndpointNameFormatter.Instance;
var handle = connector.ConnectReceiveEndpoint(new TemporaryEndpointDefinition(), endpointNameFormatter, (context, cfg) =>
{
    cfg.ConfigureConsumer<UpdateCacheEventConsumer>(context);
});

See the endpoint name formatters section above for details.

To configure the endpoint for a consumer registration, or override the endpoint configuration in the definition, the Endpoint method can be added to the consumer registration. This will create an endpoint definition for the consumer and register it in the container. This method is available on consumer and saga registrations, with separate execute and compensate endpoint methods for activities.

services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>()
        .Endpoint(e =>
        {
            // override the default endpoint name
            e.Name = "order-service-extreme";
            // more options shown below
        });
    x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
});

The configurable settings for an endpoint include:

| Property | Description |
|----------|-------------|
| Name | The receive endpoint (queue) name |
| InstanceId | If specified, should be unique for each bus instance to enable fan-out (instead of load balancing) |
| Temporary | If true, the endpoint will be automatically removed after the bus has stopped. |
| PrefetchCount | Number of unacknowledged messages delivered by the broker |
| ConcurrentMessageLimit | Number of concurrent messages delivered to consumers |
| ConfigureConsumeTopology | Create exchanges/topics on the broker and bind them to the receive endpoint |
| AddConfigureEndpointCallback | (Added in v8.3.0) Adds a callback method that will be invoked when the receive endpoint is configured. |
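
Several of these properties can be set together on a consumer registration, as in this sketch. The specific values, the InstanceId string, and the EndpointSetup class are arbitrary illustrations; SubmitOrder and SubmitOrderConsumer are hypothetical stand-ins.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical message and consumer
public record SubmitOrder;

public class SubmitOrderConsumer : IConsumer<SubmitOrder>
{
    public Task Consume(ConsumeContext<SubmitOrder> context) => Task.CompletedTask;
}

public static class EndpointSetup
{
    public static void AddBus(IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            x.AddConsumer<SubmitOrderConsumer>()
                .Endpoint(e =>
                {
                    e.Name = "order-service-extreme";
                    e.Temporary = false;               // keep the queue after the bus stops
                    e.ConcurrentMessageLimit = 8;      // at most 8 messages consumed concurrently
                    e.PrefetchCount = 16;              // up to 16 unacknowledged messages from the broker
                    e.InstanceId = "something-unique"; // should differ per bus instance for fan-out
                });

            x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
        });
    }
}
```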

When the endpoint is configured after the AddConsumer method, the configuration then overrides the endpoint configuration in the consumer definition. However, it cannot override the EndpointName if it is specified in the constructor. The order of precedence for endpoint naming is explained below.

  1. Specifying EndpointName = "submit-order-extreme" in the constructor, which cannot be overridden

    x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>();

    public SubmitOrderConsumerDefinition()
    {
        EndpointName = "submit-order-extreme";
    }

  2. Specifying .Endpoint(x => x.Name = "submit-order-extreme") in the consumer registration, chained to AddConsumer

    x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>()
        .Endpoint(x => x.Name = "submit-order-extreme");

    public SubmitOrderConsumerDefinition()
    {
        Endpoint(x => x.Name = "not used");
    }

  3. Specifying Endpoint(x => x.Name = "submit-order-extreme") in the constructor, which creates an endpoint definition

    x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>();

    public SubmitOrderConsumerDefinition()
    {
        Endpoint(x => x.Name = "submit-order-extreme");
    }

  4. Unspecified, the endpoint name formatter is used (in this case, the endpoint name is SubmitOrder using the default formatter)

    x.AddConsumer<SubmitOrderConsumer, SubmitOrderConsumerDefinition>();

    public SubmitOrderConsumerDefinition()
    {
    }

The AddMassTransit method adds an IHealthCheck to the service collection that can be used to monitor the health of the bus. The health check is added with the tags ready and masstransit.

To configure health checks, map the ready and live endpoints in your ASP.NET application.

app.MapHealthChecks("/health/ready", new HealthCheckOptions()
{
    Predicate = (check) => check.Tags.Contains("ready"),
});
app.MapHealthChecks("/health/live", new HealthCheckOptions());

Example Output

{
  "status": "Healthy",
  "totalDuration": "00:00:00.2134026",
  "entries": {
    "masstransit-bus": {
      "data": {
        "Endpoints": {
          "rabbitmq://localhost/dev-local/SubmitOrder": {
            "status": "Healthy",
            "description": "ready"
          }
        }
      },
      "description": "Ready",
      "duration": "00:00:00.1853530",
      "status": "Healthy",
      "tags": [
        "ready",
        "masstransit"
      ]
    }
  }
}
  • When everything works correctly, MassTransit will report Healthy.
  • If any problems occur on application startup, MassTransit will report Unhealthy. This can cause an orchestrator to restart your application.
  • If any problems occur while the application is working (for example, application loses connection to broker), MassTransit will report Degraded.

Health Checks can be further configured using ConfigureHealthCheckOptions:

builder.Services.AddMassTransit(bus =>
{
    bus.ConfigureHealthCheckOptions(options =>
    {
        options.Name = "masstransit";
        options.MinimalFailureStatus = HealthStatus.Unhealthy;
        options.Tags.Add("health");
    });
});

| Setting | Description | Default value |
|---------|-------------|---------------|
| Name | Set the health check name, overrides the default bus type name. | Bus name |
| MinimalFailureStatus | The minimal HealthStatus that will be reported when the health check fails. | Unhealthy |
| Tags | A list of tags that can be used to filter sets of health checks. | "ready", "masstransit" |

By default, MassTransit reports all three statuses depending on the application state.

If MinimalFailureStatus is set to Healthy, MassTransit will log any issues, but the health check will always report Healthy. If MinimalFailureStatus is set to Degraded, MassTransit will report Degraded if any issues occur, but never report Unhealthy.

Tags inside options will override default tags. You will need to add ready and masstransit tags manually if you want to keep them.
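
As a sketch of the two points above, the following configuration caps the reported failure status at Degraded and re-adds the default tags alongside a custom one. The messaging tag and the HealthCheckSetup class are hypothetical; the in-memory transport is used only to keep the fragment free of broker settings.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;

public static class HealthCheckSetup
{
    public static void AddBus(IServiceCollection services)
    {
        services.AddMassTransit(bus =>
        {
            bus.ConfigureHealthCheckOptions(options =>
            {
                // Report Degraded rather than Unhealthy when issues occur
                options.MinimalFailureStatus = HealthStatus.Degraded;

                // Custom tags replace the defaults, so the defaults are added back by hand
                options.Tags.Add("ready");
                options.Tags.Add("masstransit");
                options.Tags.Add("messaging"); // hypothetical extra tag
            });

            bus.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
        });
    }
}
```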