Outbox Configuration
In-Memory Outbox
The in-memory outbox is used to store messages that are published or sent using ConsumeContext, IPublishEndpoint, and ISendEndpointProvider. See
the outbox section for details on how the in-memory outbox works.
The in-memory outbox is configured using the UseInMemoryOutbox method on the receive endpoint. It should always be used with message retry, message
redelivery, or a combination of both.
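
The sketch below illustrates which messages the in-memory outbox holds. SubmitOrderConsumer is the consumer registered in the configuration examples in this section; the SubmitOrder and OrderSubmitted contracts are hypothetical and exist only for illustration. Assuming UseInMemoryOutbox is configured on the endpoint, the publish inside Consume is held in memory until the consumer completes, and is discarded if the consumer throws, so a retried attempt should not publish the event twice.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contracts, used only for this illustration.
public record SubmitOrder(Guid OrderId);
public record OrderSubmitted(Guid OrderId);

public class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        // With the in-memory outbox on the endpoint, this publish is buffered
        // instead of being sent to the broker right away.
        await context.Publish(new OrderSubmitted(context.Message.OrderId));

        // Any exception thrown after this point discards the buffered message;
        // message retry then runs the consumer again from the start.
    }
}
```

The buffering is in memory only, so it does not protect against a process crash between the consumer completing and the messages reaching the broker. The transactional outbox covers that case.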
Configure for receive endpoints
To configure the outbox with retry:

    services.AddMassTransit(x =>
    {
        x.AddConsumer<SubmitOrderConsumer>();

        x.AddConfigureEndpointsCallback((context, name, cfg) =>
        {
            cfg.UseMessageRetry(r => r.Immediate(5));
            cfg.UseInMemoryOutbox(context);
        });

        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.ConfigureEndpoints(context);
        });
    });

To configure the outbox with both redelivery and retry:

    services.AddMassTransit(x =>
    {
        x.AddConsumer<SubmitOrderConsumer>();

        x.AddConfigureEndpointsCallback((context, name, cfg) =>
        {
            cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
            cfg.UseMessageRetry(r => r.Immediate(5));
            cfg.UseInMemoryOutbox(context);
        });

        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.ConfigureEndpoints(context);
        });
    });

Configure for a specific consumer
If multiple consumers are configured on the same receive endpoint (which could potentially get you on the naughty list), you may want to apply the retry and/or outbox configuration to one consumer only. In this scenario, the same configuration can be applied specifically to that consumer.
To configure the outbox for a specific consumer only, use the consumer configurator as shown.

    services.AddMassTransit(x =>
    {
        x.AddConsumer<SubmitOrderConsumer>();
        // SomeOtherConsumer must also be registered so ConfigureConsumer can resolve it
        x.AddConsumer<SomeOtherConsumer>();

        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.ReceiveEndpoint("submit-order", e =>
            {
                // retry and the outbox apply to SubmitOrderConsumer only
                e.ConfigureConsumer<SubmitOrderConsumer>(context, c =>
                {
                    c.UseMessageRetry(r => r.Immediate(5));
                    c.UseInMemoryOutbox(context);
                });

                e.ConfigureConsumer<SomeOtherConsumer>(context);
            });
        });
    });

Transactional Outbox
The transactional outbox is explained in the concepts section. This section covers how to configure the transactional outbox using any of the supported databases.
Bus Outbox Options
The bus outbox has its own configuration settings, which are common across all supported databases.

    x.AddEntityFrameworkOutbox<RegistrationDbContext>(o =>
    {
        o.UseBusOutbox(options =>
        {
            options.MessageDeliveryLimit = 100;
            options.MessageDeliveryTimeout = TimeSpan.FromSeconds(45);
            options.ConcurrentDeliveryLimit = 10;
        });
    });

Options are the same for Entity Framework Core and MongoDB.
| Option | Description |
|---|---|
| MessageDeliveryLimit | The number of messages to deliver per batch from the outbox to the broker |
| MessageDeliveryTimeout | Transport Send timeout when delivering messages to the transport |
| ConcurrentDeliveryLimit | The number of messages to deliver concurrently (defaults to 1 to ensure message order) |
| DisableDeliveryService() | Disable the outbox message delivery service, removing the hosted service from the service collection |
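
A minimal sketch of how application code might use the bus outbox once it is enabled. It assumes the RegistrationDbContext used in the configuration above; the RegistrationService class and the RegistrationSubmitted contract are hypothetical names introduced for illustration. The assumption here is that, with the bus outbox enabled, messages published through a scoped IPublishEndpoint are added to the DbContext and only become deliverable once SaveChangesAsync commits them.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract, used only for this illustration.
public record RegistrationSubmitted(Guid RegistrationId);

// Hypothetical application service resolved from a scoped container.
public class RegistrationService
{
    readonly RegistrationDbContext _dbContext;
    readonly IPublishEndpoint _publishEndpoint;

    public RegistrationService(RegistrationDbContext dbContext, IPublishEndpoint publishEndpoint)
    {
        _dbContext = dbContext;
        _publishEndpoint = publishEndpoint;
    }

    public async Task Submit(Guid registrationId)
    {
        // The message is written to the outbox tables, not sent to the broker.
        await _publishEndpoint.Publish(new RegistrationSubmitted(registrationId));

        // Saving commits the outbox message together with any entity changes;
        // the delivery service picks it up afterwards.
        await _dbContext.SaveChangesAsync();
    }
}
```

If SaveChangesAsync is never called, or the transaction rolls back, the message should not reach the broker, which is the point of routing it through the database.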
Consumer Outbox Options
The consumer outbox options can be configured for each Receive Endpoint using the configuration callback as shown.

    x.AddConfigureEndpointsCallback((context, name, cfg) =>
    {
        cfg.UseEntityFrameworkOutbox<RegistrationDbContext>(context, options =>
        {
            options.MessageDeliveryLimit = 100;
            options.MessageDeliveryTimeout = TimeSpan.FromSeconds(45);
            options.ConcurrentDeliveryLimit = 10;
        });
    });

Options are the same for Entity Framework Core and MongoDB.
| Option | Description |
|---|---|
| MessageDeliveryLimit | The number of messages to deliver per batch from the outbox to the broker |
| MessageDeliveryTimeout | Transport Send timeout when delivering messages to the transport |
| ConcurrentDeliveryLimit | The number of messages to deliver concurrently (defaults to 1 to ensure message order) |
Entity Framework
The Transactional Outbox for Entity Framework Core uses three tables in the DbContext to store messages that are subsequently delivered to the message broker.
| Table | Description |
|---|---|
| InboxState | Tracks received messages by MessageId for each endpoint |
| OutboxMessage | Stores messages published or sent using ConsumeContext, IPublishEndpoint, and ISendEndpointProvider |
| OutboxState | Tracks delivery of outbox messages by the delivery service (similar to InboxState but for messages sent outside of a consumer via the bus outbox) |
Configure Entity Framework Outbox
The code below is based upon the sample application.
The outbox components are included in the MassTransit.EntityFrameworkCore NuGet package. The code below configures both the bus outbox and the consumer
outbox using the default settings. In this case, PostgreSQL is the database engine.

    x.AddEntityFrameworkOutbox<RegistrationDbContext>(o =>
    {
        // configure which database lock provider to use (Postgres, SqlServer, or MySql)
        o.UsePostgres();

        // enable the bus outbox
        o.UseBusOutbox();
    });

To configure the DbContext with the appropriate tables, use the extension methods shown below:

    public class RegistrationDbContext :
        DbContext
    {
        public RegistrationDbContext(DbContextOptions<RegistrationDbContext> options)
            : base(options)
        {
        }

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            base.OnModelCreating(modelBuilder);

            modelBuilder.AddInboxStateEntity();
            modelBuilder.AddOutboxMessageEntity();
            modelBuilder.AddOutboxStateEntity();
        }
    }

To configure the outbox on a receive endpoint, configure the receive endpoint as shown below. The configuration below uses a SagaDefinition to configure the
receive endpoint, which is added to MassTransit along with the saga state machine.

    public class RegistrationStateDefinition :
        SagaDefinition<RegistrationState>
    {
        protected override void ConfigureSaga(IReceiveEndpointConfigurator endpointConfigurator,
            ISagaConfigurator<RegistrationState> consumerConfigurator, IRegistrationContext context)
        {
            endpointConfigurator.UseMessageRetry(r => r.Intervals(100, 500, 1000, 1000, 1000, 1000, 1000));

            endpointConfigurator.UseEntityFrameworkOutbox<RegistrationDbContext>(context);
        }
    }

The definition is added with the saga state machine:

    x.AddSagaStateMachine<RegistrationStateMachine, RegistrationState, RegistrationStateDefinition>()
        .EntityFrameworkRepository(r =>
        {
            r.ExistingDbContext<RegistrationDbContext>();
            r.UsePostgres();
        });

The Entity Framework outbox adds a hosted service which removes delivered InboxState entries after the DuplicateDetectionWindow has elapsed. The Bus Outbox includes an additional hosted service that delivers the outbox messages to the broker.
The outbox can also be added to all consumers using a configure endpoints callback:

    x.AddConfigureEndpointsCallback((context, name, cfg) =>
    {
        cfg.UseEntityFrameworkOutbox<RegistrationDbContext>(context);
    });

Configuration Options
The available outbox settings are listed below.
| Setting | Description |
|---|---|
| DuplicateDetectionWindow | The amount of time a message remains in the inbox for duplicate detection (based on MessageId) |
| IsolationLevel | The transaction isolation level to use (Serializable by default) |
| LockStatementProvider | The lock statement provider, needed to execute pessimistic locks. Set via UsePostgres, UseSqlServer (the default), or UseMySql |
| QueryDelay | The delay between queries when the outbox tables are idle (no delay is used when the outbox is active). |
| QueryMessageLimit | The number of outbox messages to query from the database in a single query |
| QueryTimeout | The database query timeout |
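
The settings in the table can be applied inside the AddEntityFrameworkOutbox callback. The sketch below is illustrative only: it assumes each setting is exposed as a property setter named as in the table, the values are arbitrary rather than recommendations, and the OutboxRegistration class and AddTunedOutbox method are hypothetical names. RegistrationDbContext is the DbContext shown earlier on this page.

```csharp
using System;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical helper class, used only to make the sketch self-contained.
public static class OutboxRegistration
{
    public static IServiceCollection AddTunedOutbox(this IServiceCollection services)
    {
        return services.AddMassTransit(x =>
        {
            x.AddEntityFrameworkOutbox<RegistrationDbContext>(o =>
            {
                // keep inbox entries for 30 minutes for duplicate detection
                o.DuplicateDetectionWindow = TimeSpan.FromMinutes(30);

                // poll once per second while the outbox tables are idle
                o.QueryDelay = TimeSpan.FromSeconds(1);

                // read at most 100 outbox messages per query
                o.QueryMessageLimit = 100;
                o.QueryTimeout = TimeSpan.FromSeconds(30);

                // sets the lock statement provider for PostgreSQL
                o.UsePostgres();

                o.UseBusOutbox();
            });

            x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
        });
    }
}
```

A shorter QueryDelay reduces delivery latency when the outbox is idle, at the cost of more frequent queries against the database.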
MongoDB Outbox
Configure MongoDB Outbox
The code below is based upon the sample application.
The outbox components are included in the MassTransit.MongoDb NuGet package. The code below configures both the bus outbox and the consumer outbox using the
default settings.

    x.AddMongoDbOutbox(o =>
    {
        o.QueryDelay = TimeSpan.FromSeconds(1);

        o.ClientFactory(provider => provider.GetRequiredService<IMongoClient>());
        o.DatabaseFactory(provider => provider.GetRequiredService<IMongoDatabase>());

        o.DuplicateDetectionWindow = TimeSpan.FromSeconds(30);

        o.UseBusOutbox();
    });

To configure the transactional outbox for a specific consumer, use a consumer definition:

    public class ValidateRegistrationConsumerDefinition :
        ConsumerDefinition<ValidateRegistrationConsumer>
    {
        protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
            IConsumerConfigurator<ValidateRegistrationConsumer> consumerConfigurator, IRegistrationContext context)
        {
            endpointConfigurator.UseMessageRetry(r => r.Intervals(10, 50, 100, 1000, 1000, 1000, 1000, 1000));

            endpointConfigurator.UseMongoDbOutbox(context);
        }
    }

To configure the transactional outbox for all configured receive endpoints, use a configure endpoints callback:

    x.AddConfigureEndpointsCallback((context, name, cfg) =>
    {
        cfg.UseMongoDbOutbox(context);
    });

Configuration Options
The MongoDB Outbox has several options that can be configured.
| Setting | Description |
|---|---|
| DuplicateDetectionWindow | The amount of time a message remains in the inbox for duplicate detection (based on MessageId) |
| QueryDelay | The delay between queries when the outbox tables are idle (no delay is used when the outbox is active). |
| QueryMessageLimit | The number of outbox messages to query from the database in a single query |
| QueryTimeout | The database query timeout |
| Connection | The connection string used by the database client factory |
| DatabaseName | The database name for the outbox collection |
| CollectionNameFormatter | Specify the collection name formatter to override the default collection name format |
| DatabaseFactory | Specify a database factory (Connection and DatabaseName will be ignored) |
| ClientFactory | Specify a MongoDB client factory |
Outbox Delivery Service
The outbox delivery service is responsible for delivering messages to the broker that were published/sent from the bus outbox. The service is enabled by default, but can be disabled in scenarios where you may want to isolate message delivery to a specific application service.

    x.AddEntityFrameworkOutbox<ReliableDbContext>(o =>
    {
        o.UseBusOutbox(bo => bo.DisableDeliveryService());
    });

Inbox Cleanup Service
The inbox cleanup service is responsible for removing expired inbox entries from the database after the duplicate detection window period has elapsed. This service is enabled by default, but can be disabled in scenarios where you may want to isolate inbox cleanup to a specific application service. In practice, only a single instance of the cleanup service needs to be running.

    x.AddEntityFrameworkOutbox<ReliableDbContext>(o =>
    {
        o.DisableInboxCleanupService();
    });
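
Both services can be disabled together in a process that should only write to the outbox, leaving delivery and cleanup to a separate worker. The sketch below combines the two calls shown above under that assumption; the ApiOutboxRegistration class and AddWriteOnlyOutbox method are hypothetical names, and ReliableDbContext is the DbContext from the preceding examples.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical helper for a process that publishes through the bus outbox
// but leaves delivery and inbox cleanup to another application service.
public static class ApiOutboxRegistration
{
    public static IServiceCollection AddWriteOnlyOutbox(this IServiceCollection services)
    {
        return services.AddMassTransit(x =>
        {
            x.AddEntityFrameworkOutbox<ReliableDbContext>(o =>
            {
                // no inbox cleanup in this process
                o.DisableInboxCleanupService();

                // messages are still written to the outbox, but this process
                // does not deliver them to the broker
                o.UseBusOutbox(bo => bo.DisableDeliveryService());
            });

            x.UsingRabbitMq((context, cfg) => cfg.ConfigureEndpoints(context));
        });
    }
}
```

With this arrangement, at least one other process must register the same outbox with the delivery service left enabled, otherwise the outbox messages are never sent to the broker.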