Producers
An application or service can produce messages using two different methods. A message can be sent or a message can be published. The behavior of each method is very different, but it’s easy to understand by looking at the type of messages involved with each particular method.
When a message is sent, it is delivered to a specific endpoint using a DestinationAddress. When a message is published, it is not sent to a specific endpoint, but is instead broadcasted to any consumers that have subscribed to the message type. For these two separate behaviors, we describe messages sent as commands and messages published as events.
See the messages concept page for more information about message names.
To send a message, the DestinationAddress is used to deliver the message to an endpoint — such as a queue. One of the Send method overloads on the
ISendEndpoint interface is called, which will then send the message to the transport. An ISendEndpoint is obtained from one of the following objects:
-
The
ConsumeContextof the message being consumedThis ensures that the correlation headers, message headers, and trace information is propagated to the sent message.
-
An
ISendEndpointProviderinstanceThis may be passed as an argument, but is typically specified on the constructor of an object that is resolved using a dependency injection container.
-
The
IBusThe last resort, and should only be used for messages that are being sent by an initiator — a process that is initiating a business process.
Once the Send method has been called (only once or repeatedly to send a series of messages), the ISendEndpoint reference should fall out of scope.
For instance, an IBus instance is a send endpoint provider, but it should never be used by a consumer to obtain an ISendEndpoint. The ConsumeContext can
also provide send endpoints, and should be used since it is closer to the consumer.
Get Send Endpoint
Section titled “Get Send Endpoint”To get a send endpoint from a send endpoint provider, call the GetSendEndpoint method as shown below. The method is async, so be sure to await the
result.
public record SubmitOrder{ public string OrderId { get; init; }}
public async Task SendOrder(ISendEndpointProvider sendEndpointProvider){ var endpoint = await sendEndpointProvider.GetSendEndpoint(_serviceAddress);
await endpoint.Send(new SubmitOrder { OrderId = "123" });}There are many overloads for the Send method. Because MassTransit is built around filters and pipes, pipes are used to customize the message delivery behavior
of Send. There are also some useful overloads (via extension methods) to make sending easier and less noisy due to the pipe construction, etc.
Send with timeout
Section titled “Send with timeout”If there is a connectivity issue between the application and the broker, Send will retry until the connection is restored. The returned Task will wait until
the broker has acknowledged the message. All ISendPoint methods have an optional CancellationToken parameter that can be used to cancel the send operation.
To specify a timeout, you can use a CancellationTokenSource with a TimeSpan to create a CancellationToken.
var timeout = TimeSpan.FromSeconds(30);using var source = new CancellationTokenSource(timeout);
await endpoint.Send(new SubmitOrder { OrderId = "123" }, source.Token);Typically, send operations complete quickly, only taking a few milliseconds. If the timeout elapses and the token is canceled, the send operation will throw an
OperationCanceledException.
CancellationTokenSourceis a disposable type, so it is important to use ausingblock/statement to ensure the object is properly disposed.
Send a batch of messages
Section titled “Send a batch of messages”The SendBatch method can be used to send a batch of messages to the same destination. This is useful for sending a large number of messages to the same
endpoint. It’s nicer than calling Send repeatedly in a loop awaiting each send operation, and uses Task.WhenAll to wait for all send operations to complete.
public class SubmitOrder{ public Guid OrderId { get; set; } public string CustomerNumber { get; set; } public decimal Amount { get; set; }}
public async Task SendOrders(ISendEndpointProvider sendEndpointProvider, IEnumerable<SubmitOrder> orders){ var endpoint = await sendEndpointProvider.GetSendEndpoint(new Uri("queue:submit-order?bind=false"));
await endpoint.SendBatch(orders);}When using SendBatch, a few key points:
- All messages go to the same endpoint
- Each message ends up as its own message on the broker
- The transport may batch the messages together into a single broker API call
- Send middleware (outbox, scoped filters, etc.) is still applied to each message individually
- Returns a single
Taskthat completes when all messages have been sent
You can also publish a batch of messages using PublishBatch.
Endpoint Address
Section titled “Endpoint Address”An endpoint address is a fully qualified URI that may include transport-specific details. For example, an endpoint on a local RabbitMQ server would be:
rabbitmq://localhost/input-queueTransport-specific details may include query parameters, such as:
rabbitmq://localhost/input-queue?durable=falseThis would configure the queue as non-durable, where messages would only be stored in memory and therefore would not survive a broker restart.
Short Addresses
Section titled “Short Addresses”Starting with MassTransit v6, short addresses are supported. For instance, to get a send endpoint for a queue on RabbitMQ, the caller would only have to specify:
GetSendEndpoint(new Uri("queue:input-queue"))This would return a send endpoint for the input-queue exchange, which would be bound to the input-queue queue. Both the exchange and the queue would be created if either did not exist. This short syntax eliminates the need to know the scheme, host, port, and virtual host of the broker; only the queue and/or exchange details are required.
Each transport has a specific set of supported short addresses.
Supported Address Schemes
Section titled “Supported Address Schemes”| Short Address | RabbitMQ | Azure Service Bus | ActiveMQ | Amazon SQS |
|---|---|---|---|---|
| queue:name | ||||
| topic:name | ||||
| exchange:name |
Publish
Section titled “Publish”Messages are published similarly to how messages are sent, but in this case, a single IPublishEndpoint is used. The same rules for endpoints apply, the
closest instance of the publish endpoint should be used. So the ConsumeContext for consumers, and IBus for applications that are published outside a
consumer context.
The same guidelines apply for publishing messages, the closest object should be used.
-
The
ConsumeContextof the message being consumedThis ensures that the correlation headers, message headers, and trace information is propagated to the published message.
-
An
IPublishEndpointinstanceThis may be passed as an argument, but is typically specified on the constructor of an object that is resolved using a dependency injection container.
-
The
IBusThe last resort, and should only be used for messages that are being published by an initiator — a process that is initiating a business process.
To publish a message, see the code below:
public record OrderSubmitted{ public string OrderId { get; init; } public DateTime OrderDate { get; init; }}
public async Task NotifyOrderSubmitted(IPublishEndpoint publishEndpoint){ await publishEndpoint.Publish<OrderSubmitted>(new() { OrderId = "27", OrderDate = DateTime.UtcNow, });}If you are planning to publish messages from within your consumers, this example would suit better:
public class SubmitOrderConsumer : IConsumer<SubmitOrder>{ private readonly IOrderSubmitter _orderSubmitter;
public SubmitOrderConsumer(IOrderSubmitter submitter) => _orderSubmitter = submitter;
public async Task Consume(ConsumeContext<SubmitOrder> context) { await _orderSubmitter.Process(context.Message);
await context.Publish<OrderSubmitted>(new() { OrderId = context.Message.OrderId, OrderDate = DateTime.UtcNow }) }}Publish also supports cancellation, including timeouts. See the note above for details.
Publish a batch of messages
Section titled “Publish a batch of messages”The PublishBatch method can be used to publish a batch of messages. This is useful for publishing a large number of messages. It’s nicer than calling
Publish repeatedly in a loop awaiting each send operation, and uses Task.WhenAll to wait for all publish operations to complete.
public class OrderSubmitted{ public Guid OrderId { get; set; } public string CustomerNumber { get; set; } public decimal Amount { get; set; }}
public async Task PublishOrdersSubmitted(IPublishEndpoint publishEndpoint, IEnumerable<OrderSubmitted> orders){ await publishEndpoint.PublishBatch(orders);}When using PublishBatch, a few key points:
- Each message is delivered to its respective topic/exchange
- Each message ends up as its own message (or messages, with fan out) on the broker
- The transport may batch the messages together into a single broker API call
- Publish middleware (outbox, scoped filters, etc.) is still applied to each message individually
- Returns a single
Taskthat completes when all messages have been acknowledged
Use a message initializer
Section titled “Use a message initializer”Messages can be initialized by MassTransit using an anonymous object passed as an object to the publish or send methods. While originally designed to support the initialization of interface-based message types, anonymous objects can also be used to initialize message types defined using classes or records.
Object Properties
Section titled “Object Properties”Send, Publish, and most of the methods that behave in similar ways (scheduling, responding to requests, etc.) all support passing an object of values
which is used to set the properties on the specified interface. A simple example is shown below.
Consider this example message contract to submit an order.
public record SubmitOrder{ public Guid OrderId { get; init; } public DateTime OrderDate { get; init; } public string OrderNumber { get; init; } public decimal OrderAmount { get; init; }}To send this message to an endpoint:
await endpoint.Send<SubmitOrder>(new // <-- notice no (){ OrderId = NewId.NextGuid(), OrderDate = DateTime.UtcNow, OrderNumber = "18001", OrderAmount = 123.45m});The anonymous object properties are matched by name, and there is an extensive set of type conversions that may be used to match the types defined by the
interface. Most numeric, string, and date/time conversions are supported, as well as several advanced conversions (including variables, and asynchronous
Task<T> results).
Collections, including arrays, lists, and dictionaries, are broadly supported, including the conversion of list elements, as well as dictionary keys and values. For instance, a dictionary of (int,decimal) could be converted on the fly to (long, string) using the default format conversions.
Nested objects are also supported, for instance, if a property was of type Address and another anonymous object was created (or any type whose property names
match the names of the properties on the message contract), those properties would be set on the message contract.
Interface Messages
Section titled “Interface Messages”MassTransit supports interface message types, and there are convenience methods to initialize the interface without requiring the creation of a class implementing the interface.
public interface SubmitOrder{ public string OrderId { get; init; } public DateTime OrderDate { get; init; } public decimal OrderAmount { get; init; }}
public async Task SendOrder(ISendEndpoint endpoint){ await endpoint.Send<SubmitOrder>(new { OrderId = "27", OrderDate = DateTime.UtcNow, OrderAmount = 123.45m });}Headers
Section titled “Headers”Header values can be specified in the anonymous object using a double-underscore (pronounced ‘dunder’ apparently) property name. For instance, to set the
message time-to-live, specify a property with the duration. Remember, any value that can be converted to a TimeSpan works!
public record GetOrderStatus{ public Guid OrderId { get; init; }}
var response = await requestClient.GetResponse<OrderStatus>(new{ __TimeToLive = 15000, // 15 seconds, or in this case, 15000 milliseconds OrderId = orderId,});actually, that’s a bad example since the request client already sets the message expiration, but the concept is the same.
To add a custom header value, a special property name format is used. In the name, underscores are converted to dashes, and double underscores are converted to underscores. In the following example:
var response = await requestClient.GetResponse<OrderStatus>(new{ __Header_X_B3_TraceId = zipkinTraceId, __Header_X_B3_SpanId = zipkinSpanId, OrderId = orderId,});This would include set the headers used by open tracing (or Zipkin, as shown above) as part of the request message so the service could share in the span/trace.
In this case, X-B3-TraceId and X-B3-SpanId would be added to the message envelope, and depending upon the transport, copied to the transport headers as
well.
Variables
Section titled “Variables”MassTransit also supports variables, which are special types added to the anonymous object. Following the example above, the initialization could be changed to
use variables for the OrderId and OrderDate. Variables are consistent throughout the message creation, using the same variable multiple times returns the
value. For instance, the identifier created to set the OrderId would be the same used to set the OrderId in each item.
public record OrderItem{ public Guid OrderId { get; init; } public string ItemNumber { get; init; }}
public record SubmitOrder{ public Guid OrderId { get; init; } public DateTime OrderDate { get; init; } public string OrderNumber { get; init; } public decimal OrderAmount { get; init; } public OrderItem[] OrderItems { get; init; }}
await endpoint.Send<SubmitOrder>(new{ OrderId = InVar.Id, OrderDate = InVar.Timestamp, OrderNumber = "18001", OrderAmount = 123.45m, OrderItems = new[] { new { OrderId = InVar.Id, ItemNumber = "237" }, new { OrderId = InVar.Id, ItemNumber = "762" } }});Async Properties
Section titled “Async Properties”Message initializers are asynchronous, which makes it possible to do some pretty cool things, including waiting for Task input properties to complete and use the result to initialize the property. An example is shown below.
public record OrderUpdated{ public Guid CorrelationId { get; init; } public DateTime Timestamp { get; init; } public Guid OrderId { get; init; } public Customer Customer { get; init; }}
public async Task<CustomerInfo> LoadCustomer(Guid orderId){ // work happens up in here}
await context.Publish<OrderUpdated>(new{ InVar.CorrelationId, InVar.Timestamp, OrderId = context.Message.OrderId, Customer = LoadCustomer(context.Message.OrderId)});The property initializer will wait for the task result and then use it to initialize the property (converting all the types, etc. as it would any other object).
While it is of course possible to await the call to
LoadCustomer, properties are initialized in parallel, and thus, allowing the initializer to await the Task can result in better overall performance. Your mileage may vary, however.
Send Headers
Section titled “Send Headers”There are a variety of message headers available which are used for correlation and tracking of messages. It is also possible to override some default behaviors
of MassTransit when a fault occurs. For instance, a fault is normally published when a consumer throws an exception. If instead the application wants faults
delivered to a specific address, the FaultAddress can be specified via a header. How this is done is shown below.
public record SubmitOrder{ public string OrderId { get; init; } public DateTime OrderDate { get; init; } public decimal OrderAmount { get; init; }}
public async Task SendOrder(ISendEndpoint endpoint){ await endpoint.Send<SubmitOrder>(new { OrderId = "27", OrderDate = DateTime.UtcNow, OrderAmount = 123.45m }, context => context.FaultAddress = new Uri("rabbitmq://localhost/order_faults"));}Since a message initializer is being used, this can actually be simplified.
public async Task SendOrder(ISendEndpoint endpoint){ await endpoint.Send<SubmitOrder>(new { OrderId = "27", OrderDate = DateTime.UtcNow, OrderAmount = 123.45m,
// header names are prefixed with __, and types are converted as needed __FaultAddress = "rabbitmq://localhost/order_faults" });}[1]
[2] http://spring.io/blog/2011/04/01/routing-topologies-for-performance-and-scalability-with-rabbitmq/
[3] http://codebetter.com/drusellers/2011/05/08/brain-dump-conventional-routing-in-rabbitmq/