Skip to content

Consumer Configuration

To understand consumers and how to create one, refer to the Consumers section.

High-level concepts covered in this configuration section include:

ConceptDescription
ConsumerA class that consumes one or more messages types, one for each implementation of IConsumer<TMessage>
Batch ConsumerA class that consumes multiple messages in batches, by implementing IConsumer<Batch<TMessage>>
Job ConsumerA class that consumes a job message, specified by the IJobConsumer<TJob> interface
Consumer DefinitionA class, derived from ConsumerDefinition<TConsumer> that configures settings and the consumer’s receive endpoint
Receive EndpointReceives messages from a broker queue and delivers those messages to consumer types configured on the receive endpoint

Consumers can be added many ways allowing either a simple of fine-grained approach to registration. Consumers are added inside the AddMassTransit configuration, but before the transport.

using MassTransit;
services.AddMassTransit(x =>
{
x.AddConsumer<MyConsumer>();
x.Using[Transport]((context, cfg) =>
{
// transport, middleware, other configuration
cfg.ConfigureEndpoints(context);
});
});

Adds a single consumer, with all defaults

AddConsumer<MyConsumer>();
AddConsumer(typeof(MyConsumer));

Adds a consumer, with a consumer definition.

AddConsumer<MyConsumer, MyConsumerDefinition>();
AddConsumer(typeof(MyConsumer), typeof(MyConsumerDefinition));

Adds a consumer with a matching consumer definition and configures the consumer pipeline.

AddConsumer<MyConsumer, MyConsumerDefinition>(cfg =>
{
cfg.ConcurrentMessageLimit = 8;
});

Adds the specified consumers and consumer definitions. When consumer definitions are included they will be added with the matching consumer type.

void AddConsumers(params Type[] types);

Adds all consumers and consumer definitions in the specified an assembly or assemblies.

void AddConsumers(params Assembly[] assemblies);

Adds the consumers and any matching consumer definitions in the specified an assembly or assemblies that pass the filter. The filter is only called for consumer types.

void AddConsumers(Func<Type, bool> filter, params Assembly[] assemblies);

The consumer endpoint can be configured using the AddConsumer method by calling the Endpoint extension method. For more details, see the endpoint configuration section.

If you want your consumer to process multiple messages at a time, you can configure a Batch Consumer. This is a consumer that implements IConsumer<Batch<TMessage>>.

AddConsumer<MyBatchConsumer>(cfg =>
{
cfg.Options<BatchOptions>(options => options
.SetMessageLimit(100)
.SetTimeLimit(s: 1)
.SetTimeLimitStart(BatchTimeLimitStart.FromLast)
.GroupBy<MyMessage, string>(x => x.CustomerId)
.SetConcurrencyLimit(10));
});
PropertyTypeDefaultDescription
MessageLimitint10Max number of messages in a batch
ConcurrencyLimitint1number of concurrent batches
TimeLimitTimeSpan1 secmaximum time to wait before delivering a partial batch
TimeLimitStartTimeSpanFrom Firststarting point
GroupKeyProviderobject?nullthe property to group by

If you are configuring a job consumer, the job options should be configured. See the job consumers section for details.

By default, MassTransit can automatically configure receive endpoints for all consumers by calling ConfigureEndpoints. You can customize this behavior using a ConsumerDefinition (see below)[#create-a-consumer-definition] or by specifying the endpoint configuration as part of the AddConsumer method as shown above

using MassTransit;
services.AddMassTransit(x =>
{
// Step 1: Add Consumers Here
// Step 2: Select a Transport
x.Using[Transport]((context, cfg) => {
// Step 3: Configure the Transport
// Step 4: Configure Endpoints
// All consumers registered in step 1, will get
// default endpoints created.
cfg.ConfigureEndpoints(context);
});
});

To manually configure a consumer on a receive endpoint, use one of the following methods. You may want to do this for the following reasons.

  • Group Consumers onto a specific queue, vs the default of one queue per consumer
cfg.ReceiveEndpoint("manually-configured", e =>
{
// configure endpoint-specific settings first
e.SomeEndpointSetting = someValue;
// configure any required middleware components next
e.UseMessageRetry(r => r.Interval(5, 1000));
// configure the consumer last
e.ConfigureConsumer<MyConsumer>(context);
});
// configure any remaining consumers, sagas, etc.
cfg.ConfigureEndpoints(context);

Endpoint Configuration is Custom by Transport

ConfigureConsumer<T>(context);

Configures the consumer on the receive endpoint.

ConfigureConsumer<T>(context, consumer =>
{
// configure consumer-specific middleware
});

Configures the consumer on the receive endpoint and applies the additional consumer configuration to the consumer pipeline.

ConfigureConsumers(context);

Configures all consumers that haven’t been configured on the receive endpoint.

Consumer definitions are used to configure consumers and their endpoints. Definitions can be configured explicitly using the AddConsumer method and are also discovered automatically using any of the assembly/namespace scanning AddConsumers methods.

The constructor can be used to override the default endpoint name or to limit the number of messages consumed concurrently. The ConfigureConsumer method is used to configure both the Receive endpoint and the consumer pipeline.

public class SubmitOrderConsumer :
IConsumer<SubmitOrder>
{
readonly ILogger<SubmitOrderConsumer> _logger;
public SubmitOrderConsumer(ILogger<SubmitOrderConsumer> logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext<SubmitOrder> context)
{
_logger.LogInformation("Order Submitted: {OrderId}", context.Message.OrderId);
await context.Publish<OrderSubmitted>(new
{
context.Message.OrderId
});
}
}
public class SubmitOrderConsumerDefinition :
ConsumerDefinition<SubmitOrderConsumer>
{
public SubmitOrderConsumerDefinition()
{
// override the default endpoint name
EndpointName = "order-service";
// limit the number of messages consumed concurrently
// this applies to the consumer only, not the endpoint
ConcurrentMessageLimit = 4;
}
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<SubmitOrderConsumer> consumerConfigurator, IRegistrationContext context)
{
// configure message retry with millisecond intervals
endpointConfigurator.UseMessageRetry(r => r.Intervals(100,200,500,800,1000));
// use the outbox to prevent duplicate events from being published
endpointConfigurator.UseInMemoryOutbox(context);
}
}

In this example, the consumer will be configured to use message retry, an in-memory outbox, and will be limited to four messages consumed concurrently.

ConceptDescription
EndpointDefinitionUsed internally to override an endpoint definition (commonly used for configuring multiple consumers on a shared endpoint)
EndpointNameThe name of the queue for this consumer
ConcurrentMessageLimitThe number of messages this consumer can process concurrently