Consumers
Consumer is a widely used noun for something that consumes something. In MassTransit, a consumer consumes one or more message types when configured on or connected to a receive endpoint. MassTransit includes many consumer types, including consumers, sagas, saga state machines, routing slip activities, handlers, and job consumers.
Message Consumers
Section titled “Message Consumers”A message consumer, the most common consumer type, is a class that consumes one or more message types. For each message type, the class implements
IConsumer<TMessage> and the Consume method.
public interface IConsumer<in TMessage> : IConsumer where TMessage : class{ Task Consume(ConsumeContext<TMessage> context);}Messages must be reference types, either a record, interface, or class. See the messages concept page for more details.
An example message consumer that consumes the SubmitOrder message type is shown below.
class SubmitOrderConsumer : IConsumer<SubmitOrder>{ public async Task Consume(ConsumeContext<SubmitOrder> context) { await context.Publish<OrderSubmitted>(new { context.Message.OrderId }); }}To add a consumer and automatically configure a receive endpoint for the consumer, call one of the AddConsumer methods and call ConfigureEndpoints as shown below.
services.AddMassTransit(x =>{ x.AddConsumer<SubmitOrderConsumer>();
x.UsingInMemory((context, cfg) => { cfg.ConfigureEndpoints(context); });});Automatic receive endpoint configuration by calling ConfigureEndpoints is highly recommended. Several optional configuration options can be used to change the default conventions and customize endpoints, covered in the configuration section.
Consumer Lifecycle
Section titled “Consumer Lifecycle”MassTransit embraces The Hollywood Principle, “Don’t call us, we’ll call you.” Control flows from MassTransit to the developer’s code in response to an event,
which in this case is the delivery of a message by the transport. This behavior is similar to ASP.NET, which creates controllers and invokes action methods on
receipt of an HTTP request. When a message is delivered from the transport on a receive endpoint and the message type is consumed by the consumer, MassTransit
creates a container scope, resolves a consumer instance, and invokes the Consume method passing a ConsumeContext<T> containing the message.
When a message is delivered from the broker to the transport, the message remains in the queue and is locked by the broker. This lock ensures that the message won’t be delivered to another consumer instance, even on different bus instances reading from the same queue (the competing consumer pattern).
The Consume method returns a Task that is awaited by MassTransit. MassTransit automatically renews the message lock until the control returns from the
consumer’s Consume method to prevent message redelivery. This works well for most cases where the processing time is short or when the broker connection is
reliable (of course, the network is reliable is the first fallacy of distributed systems). If the broker connection is lost, and the lock cannot be renewed, the
message will be redelivered to another consumer.
If you need to perform long-running processing, it’s recommended to use a job consumer.
Once the consumer completes, MassTransit acknowledges the message so that the broker can remove it from the queue.
If the Consume method throws an exception or is canceled (explicitly, or via an OperationCanceledException), the consumer instance is released and
the exception is thrown back up the message pipeline. If the exception does not trigger a retry, the default pipeline will move the message to an error queue.
Consumer Broker Topology
Section titled “Consumer Broker Topology”When a consumer is configured on a receive endpoint, the consumer message types (one for each IConsumer<T>) are used to configure the Consume topology.
The Consume topology is then used to configure the broker so that published messages are delivered to the queue.
Broker topology varies by transport. For example, when using RabbitMQ this configuration would create a SubmitOrder exchange for the message type and a
binding from that exchange to an exchange with the same name as the queue (the latter exchange then being bound directly to the queue).
If the queue is persistent (AutoDelete = false, the default), the topology remains in place even after the bus has stopped. When the bus is
recreated and started, the broker entities are configured again to ensure they are correct. Any messages waiting in the queue will be delivered to the Receive
endpoint once the bus is started.
Message Handlers
Section titled “Message Handlers”A message handler is a simple consumer of a single message type that is used to simulate the behavior of a consumer in a unit test. Message handlers are
configured using the AddHandler method.
services.AddMassTransit(x =>{ x.AddHandler<SubmitOrder>(async (SubmitOrder message, IPublishEndpoint publishEndpoint) => { await publishEndpoint.Publish<OrderSubmitted>(new { message.OrderId }); });
x.UsingInMemory((context, cfg) => { cfg.ConfigureEndpoints(context); });});Message handlers can be configured using various overloads of the AddHandler method, allowing both synchronous and asynchronous handlers. Services can be
injected into the handler by specifying them as parameters to the method (for example, the IPublishEndpoint parameter in the example above).
Batch Consumers
Section titled “Batch Consumers”In some scenarios, high message volume can lead to consumer resource bottlenecks. If a system is publishing thousands of messages per second, and has a consumer writing the content of those messages to some type of storage, the storage system might not be optimized for thousands of individual writes per second. It may, however, perform better if writes are performed in batches. For example, receiving one hundred messages and then writing the content of those messages using a single storage operation may be significantly more efficient (and faster).
MassTransit supports receiving multiple messages and delivering those messages to the consumer in a batch.
To create a batch consumer, consume the Batch<T> interface, where T is the message type. That consumer can then be configured using the container
integration, with the batch options specified in a consumer definition. The example below consumes a batch of OrderAudit events, up to 100 at a time, and up
to 10 concurrent batches.
class BatchMessageConsumer : IConsumer<Batch<Message>>{ public async Task Consume(ConsumeContext<Batch<Message>> context) { for(int i = 0; i < context.Message.Length; i++) { ConsumeContext<Message> message = context.Message[i]; } }}Definitions
Section titled “Definitions”Consumer definitions are used to specify the behavior of consumers so that they can be automatically configured. Definitions may be explicitly added by AddConsumer or discovered automatically using any of the AddConsumers methods.
An example consumer definition is shown below. For a complete configuration reference, see the configuration section.
public class SubmitOrderConsumerDefinition : ConsumerDefinition<SubmitOrderConsumer>{ public SubmitOrderConsumerDefinition() { // override the default endpoint name, for whatever reason EndpointName = "ha-submit-order";
// limit the number of messages consumed concurrently // this applies to the consumer only, not the endpoint ConcurrentMessageLimit = 4; }
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<DiscoveryPingConsumer> consumerConfigurator) { endpointConfigurator.UseMessageRetry(r => r.Interval(5, 1000)); endpointConfigurator.UseInMemoryOutbox(); }}Skipped Messages
Section titled “Skipped Messages”Skipped messages are messages that are delivered to a Receive endpoint that does not have a consumer for the message type. This may be the result of any of the following:
- A consumer previously configured on the Receive endpoint was moved/removed
- A message type was removed from a consumer (an
IConsumer<T>on a consumer, or anEvent<T>on a saga state machine) - A message was mistakenly sent to the Receive endpoint
If this occurs, the unconsumed message is moved to a _skipped queue (prefixed by the original queue name). The original message content is retained, and additional headers are added to identify the host that skipped the message.