Topology
In MassTransit, topology is how message types are used to configure broker topics (exchanges in RabbitMQ) and queues. Topology is also used to access specific broker capabilities, such as RabbitMQ direct exchanges and routing keys, Azure Service Bus partition keys, and so on.
Topology is separate from the send, publish, and consume pipelines which are focused on middleware concerns. Topology allows conventions to be created and applied to message-specific configuration as messages are published and sent.
Configure message topology
Section titled “Configure message topology”Message types are a core concept in MassTransit, so making it easy to configure how topology uses those message types seemed obvious.
Configure a message entity name
Section titled “Configure a message entity name”MassTransit has built-in defaults for naming messaging entities (these are things like exchanges, topics, etc.). The defaults can be overridden as well. For
instance, to change the topic name used by a message, call the SetEntityName method.
services.AddMassTransit(x =>{ x.UsingRabbitMq((context, cfg) => { cfg.Message<OrderSubmitted>(x => { x.SetEntityName("omg-we-got-one"); }); });});Create a custom entity name formatter
Section titled “Create a custom entity name formatter”To change the default entity name formatter, create a class that implements IEntityNameFormatter and configure the message topology to use it.
class FancyNameFormatter : IEntityNameFormatter{ public FancyNameFormatter(IEntityNameFormatter original) { _original = original; }
public string FormatEntityName<T>() { if(T is OrderSubmitted) return "we-got-one";
return _original.FormatEntityName<T>(); }}services.AddMassTransit(x =>{ x.UsingRabbitMq((context, cfg) => { cfg.MessageTopology.SetEntityNameFormatter(new FancyNameFormatter()); });});It’s also possible to create a message-specific entity name formatter, by implementing IMessageEntityNameFormatter<T> and specifying it during configuration.
class FancyNameFormatter<T> : IMessageEntityNameFormatter<T>{ public string FormatEntityName() { // seriously, please don't do this, like, ever. return type(T).Name.ToString(); }}services.AddMassTransit(x =>{ x.UsingRabbitMq((context, cfg) => { cfg.Message<OrderSubmitted>(x => { x.SetEntityNameFormatter(new FancyNameFormatter<OrderSubmitted>()); }); });});Add attributes to message types
Section titled “Add attributes to message types”EntityName
Section titled “EntityName”EntityName is an optional attribute used to override the default entity name for a message type. If present, the entity name will be used when creating the topic or exchange for the message.
[EntityName("order-submitted")]public record LegacyOrderSubmittedEvent{}ConfigureConsumeTopology
Section titled “ConfigureConsumeTopology”ConfigureConsumeTopology is an optional attribute that may be specified on a message type to indicate whether the topic or exchange for the message type should be created and subscribed to the queue when consumed on a receive endpoint.
[ConfigureConsumeTopology(false)]public record DeleteRecord{}ExcludeFromTopology
Section titled “ExcludeFromTopology”ExcludeFromTopology is an optional attribute that may be specified on a message type to indicate whether the topic or exchange for the message type should be
created when publishing an implementing type or sub-type. In the example below, publishing the ReformatHardDrive command would not create the ICommand topic
or exchange on the message broker.
[ExcludeFromTopology]public interface ICommand{}
public record ReformatHardDrive : ICommand{}As an alternative to using the ExcludeFromTopology attribute, configure the publish topology during bus configuration.
x.UsingRabbitMq((context,cfg) =>{ cfg.Publish<ICommand>(p => p.Exclude = true);});ExcludeFromImplementedTypes
Section titled “ExcludeFromImplementedTypes”ExcludeFromImplementedTypes is an optional attribute that may be specified on a base message type to prevent scope filters being created for the message type.
[ExcludeFromImplementedTypes]public interface ICommand{}
public record ReformatHardDrive : ICommand{}Topology does not cover sending messages beyond delivering messages to a queue. MassTransit sends messages via a send endpoint, which is retrieved using the endpoint’s address only.
The exception to this is when the transport supports additional capabilities on send, such as the partitioning of messages. With RabbitMQ this would include
specifying the RoutingKey, and with Azure Service Bus this would include specifying the PartitionKey or the SessionId.
Conventions are used to apply topology to messages without requiring explicit configuration of every message type.
A basic example of a convention is the default CorrelationId convention, which is automatically applied to all sent messages. As message types are sent, the
convention is used to determine if the message contains a property that could be considered a CorrelationId, and uses that property to set the CorrelationId
header on the message envelope.
For example, the following message contains a property named CorrelationId, which is an obvious choice. Note that the CorrelatedBy<Guid> interface is not
part of the message contract.
public record OrderCreated{ public Guid CorrelationId { get; init; }}If there isn’t a property named CorrelationId, the convention also checks for CommandId and EventId and uses that property to set the header value (the
type must be a Guid, or a Guid?, no magic type conversion happening here).
If the message implements the CorrelatedBy<Guid> interface, that would be used before referencing any properties by name.
During bus creation, it is possible to explicitly configure a message type (or any of the message type’s inherited interfaces) to use a specific property for
the CorrelationId. In the example below, the OrderId property is specified as the CorrelationId.
public record OrderSubmitted{ public Guid OrderId { get; init; } public Guid CustomerId { get; init; }}
Bus.Factory.CreateUsingRabbitMq(..., cfg =>{ cfg.Send<OrderSubmitted>(x => { x.UseCorrelationId(context => context.Message.OrderId); });});The CorrelationId topology convention is implemented here, which can be used as an example of how to create your own conventions, or add additional CorrelationId detectors to the existing convention.
To learn about other topology conventions, check out:
Publish
Section titled “Publish”Topology is a key part of publishing messages, and is responsible for how the broker’s facilities are configured.
The publish topology defines many aspects of broker configuration, including:
- RabbitMQ Exchange names or Azure Service Bus Topic names
- Formatted, based upon the message type
- Explicit, based upon the configuration
- RabbitMQ Exchange Bindings or Azure Service Bus Topic Subscriptions
When Publish is called, the topology is also used to:
- Populate the
RoutingKeyof the message sent to the RabbitMQ exchange - Populate the
PartitionIdorSessionIdof the message sent to the Azure Service Bus topic
Deploy publish topology
Section titled “Deploy publish topology”MassTransit has a DeployPublishTopology option that can be specified when configuring the bus. When true, all configured publish topologies are deployed to
the
broker when the bus is started.
services.AddMassTransit(x =>{ x.UsingRabbitMq((context, cfg) => { cfg.Publish<OrderSubmitted>(); cfg.Publish<PackageShipped>();
cfg.DeployPublishTopology = true; });});In the example above, the publish topology is deployed when the bus is started. Exchanges will be created for the OrderSubmitted and PackageShipped message
types (and any inherited interfaces).
Consume
Section titled “Consume”Each receive endpoint has a consume topology, which is configured as consumers are added. Depending upon the transport, additional methods may be available to support exchange bindings, topic subscriptions, etc.
Consume topology uses the publish topology to ensure consistent naming of exchanges/topics for message types.
Access the bus topology
Section titled “Access the bus topology”Once the bus is created, access to topology is via the Topology property on IBus. The message, publish, and send topologies are accessible on this interface. It is also possible to retrieve a message’s publish address. The Topology property may support other interfaces, such as a transport-specific host topology. Pattern matching can be used to check the host topology type as shown below.
if (bus.Topology is IServiceBusBusTopology serviceBusTopology){
}Deploy the broker topology
Section titled “Deploy the broker topology”There are some scenarios, such as when using Azure Functions, where it may be necessary to deploy the topology to the broker separately, without actually
starting the service (and thereby consuming messages). To support this, MassTransit has a DeployTopologyOnly flag that can be specified when configuring the
bus. When used with the DeployAsync method, a simple console application can be created that creates all the exchanges/topics, queues, and
subscriptions/bindings.
To deploy the broker topology using a console application, see the example below.
services.AddMassTransit(x =>{ x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));
x.SetKebabCaseEndpointNameFormatter();
x.UsingRabbitMq((context, cfg) => { cfg.DeployTopologyOnly = true;
cfg.ConfigureEndpoints(context); });});IBusControl is used to deploy the topology:
var busControl = provider.GetRequiredService<IBusControl>();
try{ using var source = new CancellationTokenSource(TimeSpan.FromMinutes(2));
await busControl.DeployAsync(source.Token);
Console.WriteLine("Topology Deployed");}catch (Exception ex){ Console.WriteLine("Failed to deploy topology: {0}", ex);}