Skip to content

Topology

In MassTransit, topology is how message types are used to configure broker topics (exchanges in RabbitMQ) and queues. Topology is also used to access specific broker capabilities, such as RabbitMQ direct exchanges and routing keys, Azure Service Bus partition keys, and so on.

Topology is separate from the send, publish, and consume pipelines which are focused on middleware concerns. Topology allows conventions to be created and applied to message-specific configuration as messages are published and sent.

Message types are a core concept in MassTransit, so making it easy to configure how topology uses those message types seemed obvious.

MassTransit has built-in defaults for naming messaging entities (these are things like exchanges, topics, etc.). The defaults can be overridden as well. For instance, to change the topic name used by a message, call the SetEntityName method.

services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
cfg.Message<OrderSubmitted>(x =>
{
x.SetEntityName("omg-we-got-one");
});
});
});

To change the default entity name formatter, create a class that implements IEntityNameFormatter and configure the message topology to use it.

class FancyNameFormatter :
IEntityNameFormatter
{
public FancyNameFormatter(IEntityNameFormatter original)
{
_original = original;
}
public string FormatEntityName<T>()
{
if(T is OrderSubmitted)
return "we-got-one";
return _original.FormatEntityName<T>();
}
}
services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
cfg.MessageTopology.SetEntityNameFormatter(new FancyNameFormatter());
});
});

It’s also possible to create a message-specific entity name formatter, by implementing IMessageEntityNameFormatter<T> and specifying it during configuration.

class FancyNameFormatter<T> :
IMessageEntityNameFormatter<T>
{
public string FormatEntityName()
{
// seriously, please don't do this, like, ever.
return type(T).Name.ToString();
}
}
services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
cfg.Message<OrderSubmitted>(x =>
{
x.SetEntityNameFormatter(new FancyNameFormatter<OrderSubmitted>());
});
});
});

EntityName is an optional attribute used to override the default entity name for a message type. If present, the entity name will be used when creating the topic or exchange for the message.

[EntityName("order-submitted")]
public record LegacyOrderSubmittedEvent
{
}

ConfigureConsumeTopology is an optional attribute that may be specified on a message type to indicate whether the topic or exchange for the message type should be created and subscribed to the queue when consumed on a receive endpoint.

[ConfigureConsumeTopology(false)]
public record DeleteRecord
{
}

ExcludeFromTopology is an optional attribute that may be specified on a message type to indicate whether the topic or exchange for the message type should be created when publishing an implementing type or sub-type. In the example below, publishing the ReformatHardDrive command would not create the ICommand topic or exchange on the message broker.

[ExcludeFromTopology]
public interface ICommand
{
}
public record ReformatHardDrive :
ICommand
{
}

As an alternative to using the ExcludeFromTopology attribute, configure the publish topology during bus configuration.

x.UsingRabbitMq((context,cfg) =>
{
cfg.Publish<ICommand>(p => p.Exclude = true);
});

ExcludeFromImplementedTypes is an optional attribute that may be specified on a base message type to prevent scope filters being created for the message type.

[ExcludeFromImplementedTypes]
public interface ICommand
{
}
public record ReformatHardDrive :
ICommand
{
}

Topology does not cover sending messages beyond delivering messages to a queue. MassTransit sends messages via a send endpoint, which is retrieved using the endpoint’s address only.

The exception to this is when the transport supports additional capabilities on send, such as the partitioning of messages. With RabbitMQ this would include specifying the RoutingKey, and with Azure Service Bus this would include specifying the PartitionKey or the SessionId.

Conventions are used to apply topology to messages without requiring explicit configuration of every message type.

A basic example of a convention is the default CorrelationId convention, which is automatically applied to all sent messages. As message types are sent, the convention is used to determine if the message contains a property that could be considered a CorrelationId, and uses that property to set the CorrelationId header on the message envelope.

For example, the following message contains a property named CorrelationId, which is an obvious choice. Note that the CorrelatedBy<Guid> interface is not part of the message contract.

public record OrderCreated
{
public Guid CorrelationId { get; init; }
}

If there isn’t a property named CorrelationId, the convention also checks for CommandId and EventId and uses that property to set the header value (the type must be a Guid, or a Guid?, no magic type conversion happening here).

If the message implements the CorrelatedBy<Guid> interface, that would be used before referencing any properties by name.

During bus creation, it is possible to explicitly configure a message type (or any of the message type’s inherited interfaces) to use a specific property for the CorrelationId. In the example below, the OrderId property is specified as the CorrelationId.

public record OrderSubmitted
{
public Guid OrderId { get; init; }
public Guid CustomerId { get; init; }
}
Bus.Factory.CreateUsingRabbitMq(..., cfg =>
{
cfg.Send<OrderSubmitted>(x =>
{
x.UseCorrelationId(context => context.Message.OrderId);
});
});

The CorrelationId topology convention is implemented here, which can be used as an example of how to create your own conventions, or add additional CorrelationId detectors to the existing convention.

To learn about other topology conventions, check out:

Topology is a key part of publishing messages, and is responsible for how the broker’s facilities are configured.

The publish topology defines many aspects of broker configuration, including:

  • RabbitMQ Exchange names or Azure Service Bus Topic names
    • Formatted, based upon the message type
    • Explicit, based upon the configuration
  • RabbitMQ Exchange Bindings or Azure Service Bus Topic Subscriptions

When Publish is called, the topology is also used to:

  • Populate the RoutingKey of the message sent to the RabbitMQ exchange
  • Populate the PartitionId or SessionId of the message sent to the Azure Service Bus topic

MassTransit has a DeployPublishTopology option that can be specified when configuring the bus. When true, all configured publish topologies are deployed to the broker when the bus is started.

services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
cfg.Publish<OrderSubmitted>();
cfg.Publish<PackageShipped>();
cfg.DeployPublishTopology = true;
});
});

In the example above, the publish topology is deployed when the bus is started. Exchanges will be created for the OrderSubmitted and PackageShipped message types (and any inherited interfaces).

Each receive endpoint has a consume topology, which is configured as consumers are added. Depending upon the transport, additional methods may be available to support exchange bindings, topic subscriptions, etc.

Consume topology uses the publish topology to ensure consistent naming of exchanges/topics for message types.

Once the bus is created, access to topology is via the Topology property on IBus. The message, publish, and send topologies are accessible on this interface. It is also possible to retrieve a message’s publish address. The Topology property may support other interfaces, such as a transport-specific host topology. Pattern matching can be used to check the host topology type as shown below.

if (bus.Topology is IServiceBusBusTopology serviceBusTopology)
{
}

There are some scenarios, such as when using Azure Functions, where it may be necessary to deploy the topology to the broker separately, without actually starting the service (and thereby consuming messages). To support this, MassTransit has a DeployTopologyOnly flag that can be specified when configuring the bus. When used with the DeployAsync method, a simple console application can be created that creates all the exchanges/topics, queues, and subscriptions/bindings.

To deploy the broker topology using a console application, see the example below.

services.AddMassTransit(x =>
{
x.AddConsumer<SubmitOrderConsumer>(typeof(SubmitOrderConsumerDefinition));
x.SetKebabCaseEndpointNameFormatter();
x.UsingRabbitMq((context, cfg) =>
{
cfg.DeployTopologyOnly = true;
cfg.ConfigureEndpoints(context);
});
});

IBusControl is used to deploy the topology:

var busControl = provider.GetRequiredService<IBusControl>();
try
{
using var source = new CancellationTokenSource(TimeSpan.FromMinutes(2));
await busControl.DeployAsync(source.Token);
Console.WriteLine("Topology Deployed");
}
catch (Exception ex)
{
Console.WriteLine("Failed to deploy topology: {0}", ex);
}