
Producers

An application or service can produce messages using two different methods. A message can be sent or a message can be published. The behavior of each method is very different, but it’s easy to understand by looking at the type of messages involved with each particular method.

When a message is sent, it is delivered to a specific endpoint using a DestinationAddress. When a message is published, it is not sent to a specific endpoint, but is instead broadcast to any consumers that have subscribed to the message type. To distinguish these two behaviors, we describe sent messages as commands and published messages as events.

See the messages concept page for more information about message names.

To send a message, the DestinationAddress is used to deliver the message to an endpoint — such as a queue. One of the Send method overloads on the ISendEndpoint interface is called, which will then send the message to the transport. An ISendEndpoint is obtained from one of the following objects:

  1. The ConsumeContext of the message being consumed

    This ensures that the correlation headers, message headers, and trace information are propagated to the sent message.

  2. An ISendEndpointProvider instance

    This may be passed as an argument, but is typically specified on the constructor of an object that is resolved using a dependency injection container.

  3. The IBus

    This is the last resort, and should only be used for messages that are being sent by an initiator (a process that is initiating a business process).

Once the Send method has been called (only once or repeatedly to send a series of messages), the ISendEndpoint reference should fall out of scope.

For instance, an IBus instance is a send endpoint provider, but it should never be used by a consumer to obtain an ISendEndpoint. The ConsumeContext can also provide send endpoints, and should be used since it is closer to the consumer.
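As a rough sketch of the guidance above, a consumer can resolve the send endpoint from its own ConsumeContext instead of reaching for IBus. The OrderAccepted and ReserveInventory contracts, the consumer name, and the queue:reserve-inventory address are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contracts, used only for this illustration.
public record OrderAccepted
{
    public string OrderId { get; init; }
}

public record ReserveInventory
{
    public string OrderId { get; init; }
}

public class OrderAcceptedConsumer :
    IConsumer<OrderAccepted>
{
    public async Task Consume(ConsumeContext<OrderAccepted> context)
    {
        // The ConsumeContext is itself a send endpoint provider, so the
        // endpoint is resolved from the context rather than from IBus.
        // "queue:reserve-inventory" is an assumed destination address.
        var endpoint = await context.GetSendEndpoint(new Uri("queue:reserve-inventory"));

        await endpoint.Send(new ReserveInventory { OrderId = context.Message.OrderId });
    }
}
```

Because the endpoint comes from the context, the headers of the consumed message are carried over to the command that is sent.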

To get a send endpoint from a send endpoint provider, call the GetSendEndpoint method as shown below. The method is async, so be sure to await the result.

public record SubmitOrder
{
    public string OrderId { get; init; }
}

// _serviceAddress is assumed to be a Uri field holding the destination endpoint address
public async Task SendOrder(ISendEndpointProvider sendEndpointProvider)
{
    var endpoint = await sendEndpointProvider.GetSendEndpoint(_serviceAddress);

    await endpoint.Send(new SubmitOrder { OrderId = "123" });
}

There are many overloads for the Send method. Because MassTransit is built around filters and pipes, pipes are used to customize the message delivery behavior of Send. There are also some useful overloads (via extension methods) that make sending easier and hide the noise of constructing a pipe.

If there is a connectivity issue between the application and the broker, Send will retry until the connection is restored. The returned Task will wait until the broker has acknowledged the message. All ISendEndpoint methods have an optional CancellationToken parameter that can be used to cancel the send operation.

To specify a timeout, you can use a CancellationTokenSource with a TimeSpan to create a CancellationToken.

var timeout = TimeSpan.FromSeconds(30);
using var source = new CancellationTokenSource(timeout);
await endpoint.Send(new SubmitOrder { OrderId = "123" }, source.Token);

Typically, send operations complete quickly, only taking a few milliseconds. If the timeout elapses and the token is canceled, the send operation will throw an OperationCanceledException.

CancellationTokenSource is a disposable type, so it is important to use a using block/statement to ensure the object is properly disposed.
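A minimal sketch of handling the timeout is shown here. The TryWithTimeout helper and its class are hypothetical names, not part of MassTransit; the operation argument stands in for a call such as token => endpoint.Send(message, token).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SendTimeoutSketch
{
    // Runs a cancellable operation and reports whether it completed
    // before the timeout elapsed.
    public static async Task<bool> TryWithTimeout(
        Func<CancellationToken, Task> operation, TimeSpan timeout)
    {
        // Disposed when the method exits, even if the operation throws.
        using var source = new CancellationTokenSource(timeout);

        try
        {
            await operation(source.Token);
            return true;
        }
        catch (OperationCanceledException) when (source.IsCancellationRequested)
        {
            // The token was canceled because the timeout elapsed.
            return false;
        }
    }
}
```

A caller might write await SendTimeoutSketch.TryWithTimeout(token => endpoint.Send(order, token), TimeSpan.FromSeconds(30)) and decide what to do when the result is false, such as logging the failure or retrying later.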

The SendBatch method can be used to send a batch of messages to the same destination. This is useful for sending a large number of messages to the same endpoint. It’s nicer than calling Send repeatedly in a loop awaiting each send operation, and uses Task.WhenAll to wait for all send operations to complete.

public class SubmitOrder
{
    public Guid OrderId { get; set; }
    public string CustomerNumber { get; set; }
    public decimal Amount { get; set; }
}

public async Task SendOrders(ISendEndpointProvider sendEndpointProvider, IEnumerable<SubmitOrder> orders)
{
    var endpoint = await sendEndpointProvider.GetSendEndpoint(new Uri("queue:submit-order?bind=false"));

    await endpoint.SendBatch(orders);
}

When using SendBatch, a few key points:

  • All messages go to the same endpoint
  • Each message ends up as its own message on the broker
  • The transport may batch the messages together into a single broker API call
  • Send middleware (outbox, scoped filters, etc.) is still applied to each message individually
  • Returns a single Task that completes when all messages have been sent
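For comparison, sending each message and waiting for all of them can be written by hand roughly as follows. This is only an illustration of the Task.WhenAll pattern described above, not MassTransit's actual SendBatch implementation; SendBatchSketch and SendAll are hypothetical names.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

public static class SendBatchSketch
{
    // Starts a send for every message, then returns a single Task that
    // completes once all of the individual sends have completed.
    public static Task SendAll<T>(ISendEndpoint endpoint, IEnumerable<T> messages,
        CancellationToken cancellationToken = default)
        where T : class
    {
        return Task.WhenAll(messages.Select(message => endpoint.Send<T>(message, cancellationToken)));
    }
}
```

Unlike a loop that awaits each Send before starting the next, the sends here are all started before any of them is awaited.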

You can also publish a batch of messages using PublishBatch.

An endpoint address is a fully qualified URI that may include transport-specific details. For example, an endpoint on a local RabbitMQ server would be:

rabbitmq://localhost/input-queue

Transport-specific details may include query parameters, such as:

rabbitmq://localhost/input-queue?durable=false

This would configure the queue as non-durable, where messages would only be stored in memory and therefore would not survive a broker restart.

Starting with MassTransit v6, short addresses are supported. For instance, to get a send endpoint for a queue on RabbitMQ, the caller would only have to specify:

GetSendEndpoint(new Uri("queue:input-queue"))

This would return a send endpoint for the input-queue exchange, which would be bound to the input-queue queue. Both the exchange and the queue would be created if either did not exist. This short syntax eliminates the need to know the scheme, host, port, and virtual host of the broker; only the queue and/or exchange details are required.
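To see which parts each address form carries, the standard System.Uri class can take both addresses apart. This is a small sketch using only the .NET base class library; EndpointAddressSketch and Describe are hypothetical names.

```csharp
using System;

public static class EndpointAddressSketch
{
    // Splits an endpoint address into its scheme, host, path, and query.
    public static (string Scheme, string Host, string Path, string Query) Describe(Uri address)
    {
        return (address.Scheme, address.Host, address.AbsolutePath, address.Query);
    }
}
```

For rabbitmq://localhost/input-queue?durable=false, this yields the scheme rabbitmq, the host localhost, the path /input-queue, and the query ?durable=false. For the short address queue:input-queue, the scheme is queue and no host is present, which is why the broker details do not need to be known by the caller.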

Each transport has a specific set of supported short addresses.

Short Address | RabbitMQ | Azure Service Bus | ActiveMQ | Amazon SQS
queue:name
topic:name
exchange:name

Messages are published similarly to how messages are sent, but in this case, a single IPublishEndpoint is used. The same guidelines apply as for send endpoints: use the closest publish endpoint available, which is the ConsumeContext inside a consumer and the IBus for applications that publish outside a consumer context. An IPublishEndpoint can be obtained from one of the following objects:

  1. The ConsumeContext of the message being consumed

    This ensures that the correlation headers, message headers, and trace information are propagated to the published message.

  2. An IPublishEndpoint instance

    This may be passed as an argument, but is typically specified on the constructor of an object that is resolved using a dependency injection container.

  3. The IBus

    This is the last resort, and should only be used for messages that are being published by an initiator (a process that is initiating a business process).

To publish a message, see the code below:

public record OrderSubmitted
{
    public string OrderId { get; init; }
    public DateTime OrderDate { get; init; }
}

public async Task NotifyOrderSubmitted(IPublishEndpoint publishEndpoint)
{
    await publishEndpoint.Publish<OrderSubmitted>(new()
    {
        OrderId = "27",
        OrderDate = DateTime.UtcNow,
    });
}

If you are planning to publish messages from within your consumers, the following example is a better fit:

public class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    private readonly IOrderSubmitter _orderSubmitter;

    public SubmitOrderConsumer(IOrderSubmitter submitter)
        => _orderSubmitter = submitter;

    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        await _orderSubmitter.Process(context.Message);

        // publish using the ConsumeContext so headers flow to the event
        await context.Publish<OrderSubmitted>(new()
        {
            OrderId = context.Message.OrderId,
            OrderDate = DateTime.UtcNow
        });
    }
}

Publish also supports cancellation, including timeouts. See the note above for details.
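A short sketch of a publish with a timeout, under the assumption that the same CancellationTokenSource approach used for Send applies. The PublishTimeoutSketch class and the 30-second value are illustrative choices only.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

public record OrderSubmitted
{
    public string OrderId { get; init; }
    public DateTime OrderDate { get; init; }
}

public static class PublishTimeoutSketch
{
    public static async Task NotifyOrderSubmitted(IPublishEndpoint publishEndpoint, string orderId)
    {
        // The 30-second timeout is an arbitrary value chosen for illustration.
        using var source = new CancellationTokenSource(TimeSpan.FromSeconds(30));

        // The token is passed as the last argument, as it is with Send.
        await publishEndpoint.Publish(new OrderSubmitted
        {
            OrderId = orderId,
            OrderDate = DateTime.UtcNow
        }, source.Token);
    }
}
```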

The PublishBatch method can be used to publish a batch of messages. This is useful for publishing a large number of messages. It’s nicer than calling Publish repeatedly in a loop awaiting each publish operation, and uses Task.WhenAll to wait for all publish operations to complete.

public class OrderSubmitted
{
    public Guid OrderId { get; set; }
    public string CustomerNumber { get; set; }
    public decimal Amount { get; set; }
}

public async Task PublishOrdersSubmitted(IPublishEndpoint publishEndpoint, IEnumerable<OrderSubmitted> orders)
{
    await publishEndpoint.PublishBatch(orders);
}

When using PublishBatch, a few key points:

  • Each message is delivered to its respective topic/exchange
  • Each message ends up as its own message (or messages, with fan out) on the broker
  • The transport may batch the messages together into a single broker API call
  • Publish middleware (outbox, scoped filters, etc.) is still applied to each message individually
  • Returns a single Task that completes when all messages have been acknowledged

Messages can be initialized by MassTransit using an anonymous object passed as an object to the publish or send methods. While originally designed to support the initialization of interface-based message types, anonymous objects can also be used to initialize message types defined using classes or records.

Send, Publish, and most of the methods that behave in similar ways (scheduling, responding to requests, etc.) all support passing an object of values which is used to set the properties on the specified interface. A simple example is shown below.

Consider this example message contract to submit an order.

public record SubmitOrder
{
    public Guid OrderId { get; init; }
    public DateTime OrderDate { get; init; }
    public string OrderNumber { get; init; }
    public decimal OrderAmount { get; init; }
}

To send this message to an endpoint:

await endpoint.Send<SubmitOrder>(new // <-- notice no ()
{
    OrderId = NewId.NextGuid(),
    OrderDate = DateTime.UtcNow,
    OrderNumber = "18001",
    OrderAmount = 123.45m
});

The anonymous object properties are matched by name, and there is an extensive set of type conversions that may be used to match the types defined by the interface. Most numeric, string, and date/time conversions are supported, as well as several advanced conversions (including variables, and asynchronous Task<T> results).

Collections, including arrays, lists, and dictionaries, are broadly supported, including the conversion of list elements, as well as dictionary keys and values. For instance, a dictionary of (int,decimal) could be converted on the fly to (long, string) using the default format conversions.

Nested objects are also supported, for instance, if a property was of type Address and another anonymous object was created (or any type whose property names match the names of the properties on the message contract), those properties would be set on the message contract.
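As a sketch of the nested case, assume a contract with an Address property. The Address record, the ShippingAddress property, and the sample values are hypothetical; the inner anonymous object is matched to Address by property name.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical contracts, used only for this illustration.
public record Address
{
    public string Street { get; init; }
    public string City { get; init; }
}

public record SubmitOrder
{
    public Guid OrderId { get; init; }
    public Address ShippingAddress { get; init; }
}

public static class NestedInitializerSketch
{
    public static Task SendOrder(ISendEndpoint endpoint)
    {
        return endpoint.Send<SubmitOrder>(new
        {
            OrderId = NewId.NextGuid(),
            // A nested anonymous object whose property names match Address.
            ShippingAddress = new
            {
                Street = "123 Main St",
                City = "Tulsa"
            }
        });
    }
}
```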

MassTransit supports interface message types, and there are convenience methods to initialize the interface without requiring the creation of a class implementing the interface.

public interface SubmitOrder
{
    public string OrderId { get; init; }
    public DateTime OrderDate { get; init; }
    public decimal OrderAmount { get; init; }
}

public async Task SendOrder(ISendEndpoint endpoint)
{
    await endpoint.Send<SubmitOrder>(new
    {
        OrderId = "27",
        OrderDate = DateTime.UtcNow,
        OrderAmount = 123.45m
    });
}

Header values can be specified in the anonymous object using a double-underscore (pronounced ‘dunder’ apparently) property name. For instance, to set the message time-to-live, specify a property with the duration. Remember, any value that can be converted to a TimeSpan works!

public record GetOrderStatus
{
    public Guid OrderId { get; init; }
}

var response = await requestClient.GetResponse<OrderStatus>(new
{
    __TimeToLive = 15000, // 15 seconds, or in this case, 15000 milliseconds
    OrderId = orderId,
});

Admittedly, that’s a bad example, since the request client already sets the message expiration, but the concept is the same.

To add a custom header value, a special property name format is used. In the name, underscores are converted to dashes, and double underscores are converted to underscores. In the following example:

var response = await requestClient.GetResponse<OrderStatus>(new
{
    __Header_X_B3_TraceId = zipkinTraceId,
    __Header_X_B3_SpanId = zipkinSpanId,
    OrderId = orderId,
});

This would set the headers used by open tracing (or Zipkin, as shown above) as part of the request message so the service could share in the span/trace. In this case, X-B3-TraceId and X-B3-SpanId would be added to the message envelope and, depending upon the transport, copied to the transport headers as well.
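The naming rule for custom headers can be illustrated with a small stand-alone function. This is a sketch of the rule as described here, not MassTransit's own code; HeaderNameSketch and ToHeaderName are hypothetical names.

```csharp
using System;

public static class HeaderNameSketch
{
    const string Prefix = "__Header_";

    // Converts a property name such as "__Header_X_B3_TraceId" into the
    // header name it represents: single underscores become dashes and
    // double underscores become underscores.
    public static string ToHeaderName(string propertyName)
    {
        if (!propertyName.StartsWith(Prefix, StringComparison.Ordinal))
            throw new ArgumentException("Expected a name starting with " + Prefix, nameof(propertyName));

        var name = propertyName.Substring(Prefix.Length);

        // Protect double underscores first so they are not turned into two dashes.
        const string marker = "\u0001";

        return name.Replace("__", marker).Replace("_", "-").Replace(marker, "_");
    }
}
```

With this rule, __Header_X_B3_TraceId maps to X-B3-TraceId, and a name such as __Header_My__Custom_Header maps to My_Custom-Header.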

MassTransit also supports variables, which are special types added to the anonymous object. Following the example above, the initialization could be changed to use variables for the OrderId and OrderDate. Variables are consistent throughout the message creation: using the same variable multiple times returns the same value. For instance, the identifier created to set the OrderId would be the same one used to set the OrderId in each item.

public record OrderItem
{
    public Guid OrderId { get; init; }
    public string ItemNumber { get; init; }
}

public record SubmitOrder
{
    public Guid OrderId { get; init; }
    public DateTime OrderDate { get; init; }
    public string OrderNumber { get; init; }
    public decimal OrderAmount { get; init; }
    public OrderItem[] OrderItems { get; init; }
}

await endpoint.Send<SubmitOrder>(new
{
    OrderId = InVar.Id,
    OrderDate = InVar.Timestamp,
    OrderNumber = "18001",
    OrderAmount = 123.45m,
    OrderItems = new[]
    {
        new { OrderId = InVar.Id, ItemNumber = "237" },
        new { OrderId = InVar.Id, ItemNumber = "762" }
    }
});

Message initializers are asynchronous, which makes it possible to do some pretty cool things, including waiting for Task input properties to complete and using the result to initialize the property. An example is shown below.

public record OrderUpdated
{
    public Guid CorrelationId { get; init; }
    public DateTime Timestamp { get; init; }
    public Guid OrderId { get; init; }
    public Customer Customer { get; init; }
}

public async Task<CustomerInfo> LoadCustomer(Guid orderId)
{
    // work happens up in here
}

await context.Publish<OrderUpdated>(new
{
    InVar.CorrelationId,
    InVar.Timestamp,
    OrderId = context.Message.OrderId,
    Customer = LoadCustomer(context.Message.OrderId)
});

The property initializer will wait for the task result and then use it to initialize the property (converting all the types, etc. as it would any other object).

While it is of course possible to await the call to LoadCustomer, properties are initialized in parallel, and thus, allowing the initializer to await the Task can result in better overall performance. Your mileage may vary, however.

There are a variety of message headers available which are used for correlation and tracking of messages. It is also possible to override some default behaviors of MassTransit when a fault occurs. For instance, a fault is normally published when a consumer throws an exception. If instead the application wants faults delivered to a specific address, the FaultAddress can be specified via a header. How this is done is shown below.

public record SubmitOrder
{
    public string OrderId { get; init; }
    public DateTime OrderDate { get; init; }
    public decimal OrderAmount { get; init; }
}

public async Task SendOrder(ISendEndpoint endpoint)
{
    await endpoint.Send<SubmitOrder>(new
    {
        OrderId = "27",
        OrderDate = DateTime.UtcNow,
        OrderAmount = 123.45m
    }, context => context.FaultAddress = new Uri("rabbitmq://localhost/order_faults"));
}

Since a message initializer is being used, this can actually be simplified.

public async Task SendOrder(ISendEndpoint endpoint)
{
    await endpoint.Send<SubmitOrder>(new
    {
        OrderId = "27",
        OrderDate = DateTime.UtcNow,
        OrderAmount = 123.45m,

        // header names are prefixed with __, and types are converted as needed
        __FaultAddress = "rabbitmq://localhost/order_faults"
    });
}
