Message Scheduler Configuration
Time is important, particularly in distributed applications. Sophisticated systems need to schedule things, and MassTransit has extensive scheduling support.
MassTransit supports two different methods of message scheduling:
- Scheduler-based, using either Quartz.NET or Hangfire, where the scheduler runs in a service and schedules messages using a queue.
- Transport-based, using the transport's built-in message scheduling/delay capabilities. In some cases, such as RabbitMQ, this requires an additional plug-in to be installed and configured.
Recurring schedules are only supported by Quartz.NET or Hangfire.
Configure the transport-based scheduler
Using RabbitMQ
    services.AddMassTransit(x =>
    {
        x.AddDelayedMessageScheduler();

        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.UseDelayedMessageScheduler();

            cfg.ConfigureEndpoints(context);
        });
    });

Using Azure Service Bus
    services.AddMassTransit(x =>
    {
        x.AddServiceBusMessageScheduler();

        x.UsingAzureServiceBus((context, cfg) =>
        {
            cfg.UseServiceBusMessageScheduler();

            cfg.ConfigureEndpoints(context);
        });
    });

Using Amazon SQS
    services.AddMassTransit(x =>
    {
        x.AddDelayedMessageScheduler();

        x.UsingAmazonSqs((context, cfg) =>
        {
            cfg.UseDelayedMessageScheduler();

            cfg.ConfigureEndpoints(context);
        });
    });

Using the SQL transport
    services.AddMassTransit(x =>
    {
        x.AddSqlMessageScheduler();

        // Can use either SQL Server or PostgreSQL
        x.UsingPostgres((context, cfg) =>
        {
            cfg.UseSqlMessageScheduler();

            cfg.ConfigureEndpoints(context);
        });
    });

Using ActiveMQ
    services.AddMassTransit(x =>
    {
        x.AddDelayedMessageScheduler();

        x.UsingActiveMq((context, cfg) =>
        {
            cfg.UseDelayedMessageScheduler();

            cfg.ConfigureEndpoints(context);
        });
    });

Configure the scheduler-based scheduler
Using a scheduler endpoint
When using a scheduler endpoint, the message scheduler sends its scheduling messages directly to the scheduler endpoint's queue.
    services.AddMassTransit(x =>
    {
        Uri schedulerEndpoint = new Uri("queue:scheduler&bind=false");

        x.AddMessageScheduler(schedulerEndpoint);

        // the transport-specific configuration method should be called
        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.UseMessageScheduler(schedulerEndpoint);

            cfg.ConfigureEndpoints(context);
        });
    });

Using a publish endpoint
When using a publish endpoint, the scheduler publishes messages to the scheduler endpoint. The transport's message routing ensures that the message is delivered to the scheduler queue.

    services.AddMassTransit(x =>
    {
        x.AddPublishMessageScheduler();

        // the transport-specific configuration method should be called
        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.UsePublishMessageScheduler();

            cfg.ConfigureEndpoints(context);
        });
    });

Using the message scheduler
From a consumer
To schedule messages from a consumer, use any of the ConsumeContext extension methods, such as ScheduleSend.

    public class ScheduleNotificationConsumer :
        IConsumer<ScheduleNotification>
    {
        public async Task Consume(ConsumeContext<ScheduleNotification> context)
        {
            Uri notificationService = new Uri("queue:notification-service&bind=false");

            await context.ScheduleSend<SendNotification>(notificationService,
                context.Message.DeliveryTime, new()
                {
                    EmailAddress = context.Message.EmailAddress,
                    Body = context.Message.Body
                });
        }
    }

    public record ScheduleNotification
    {
        public DateTime DeliveryTime { get; init; }
        public string EmailAddress { get; init; }
        public string Body { get; init; }
    }

    public record SendNotification
    {
        public string EmailAddress { get; init; }
        public string Body { get; init; }
    }

The message scheduler specified during bus configuration is used to schedule the message.
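When the delivery time is relative rather than absolute, ScheduleSend can also be called with a TimeSpan delay. The following is a minimal sketch of that variant, not a definitive implementation. The message contracts (OrderSubmitted, SendReminder), the queue name reminder-service, and the 15-minute delay are hypothetical and exist only for illustration.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contracts, used only for this sketch.
public record OrderSubmitted
{
    public Guid OrderId { get; init; }
}

public record SendReminder
{
    public Guid OrderId { get; init; }
}

public class OrderSubmittedConsumer :
    IConsumer<OrderSubmitted>
{
    // Hypothetical destination queue and delay.
    static readonly Uri ReminderService = new Uri("queue:reminder-service");
    static readonly TimeSpan ReminderDelay = TimeSpan.FromMinutes(15);

    public async Task Consume(ConsumeContext<OrderSubmitted> context)
    {
        // Schedule delivery relative to the current time instead of
        // passing an absolute DateTime.
        await context.ScheduleSend(ReminderService, ReminderDelay,
            new SendReminder { OrderId = context.Message.OrderId });
    }
}
```

As with the absolute-time call, the scheduler configured on the bus decides how the delay is honored, so the consumer code stays the same whichever scheduler is in use.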
From a container scope
To schedule messages from a bus, use IMessageScheduler from the container (or create a new one using the bus and the appropriate scheduler).

    await using var scope = provider.CreateAsyncScope();

    var scheduler = scope.ServiceProvider.GetRequiredService<IMessageScheduler>();

    await scheduler.SchedulePublish<SendNotification>(
        DateTime.UtcNow + TimeSpan.FromSeconds(30), new()
        {
            EmailAddress = "frank@nul.org",
            Body = "Thank you for signing up for our awesome newsletter!"
        });

    public record SendNotification
    {
        public string EmailAddress { get; init; }
        public string Body { get; init; }
    }

Schedule recurring messages
Using Quartz.NET or Hangfire, you can schedule a message to be sent or published periodically. This functionality requires some knowledge of cron expressions.
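For readers new to the Quartz.NET cron format, it may help to see how an expression breaks into fields. Quartz.NET expressions have six required fields (seconds, minutes, hours, day-of-month, month, day-of-week) and an optional seventh (year). The helper below is a hypothetical illustration that only labels the fields of an expression; it is not part of MassTransit or Quartz.NET and does not validate field values.

```csharp
using System;
using System.Collections.Generic;

public static class QuartzCronFields
{
    // Field order used by Quartz.NET cron expressions; the trailing year field is optional.
    static readonly string[] Names =
    {
        "seconds", "minutes", "hours", "day-of-month", "month", "day-of-week", "year"
    };

    // Splits an expression on spaces and pairs each part with its field name.
    // This only labels the fields; it does not check that the values are valid.
    public static IReadOnlyDictionary<string, string> Label(string expression)
    {
        string[] parts = expression.Split(' ', StringSplitOptions.RemoveEmptyEntries);
        if (parts.Length < 6 || parts.Length > 7)
            throw new FormatException($"Expected 6 or 7 fields, found {parts.Length}.");

        var fields = new Dictionary<string, string>();
        for (var i = 0; i < parts.Length; i++)
            fields[Names[i]] = parts[i];

        return fields;
    }
}
```

Applied to the expression "0 0/1 * 1/1 * ? *" used in this section, the minutes field is "0/1" (every minute, starting at minute 0), day-of-month is "1/1" (every day, starting on the 1st), and day-of-week is "?" (no specific value).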
A recurring message should have a unique ScheduleId along with an optional ScheduleGroup.
    public class PollExternalSystemSchedule :
        DefaultRecurringSchedule
    {
        public PollExternalSystemSchedule()
        {
            ScheduleId = "PollExternalSystem";
            CronExpression = "0 0/1 * 1/1 * ? *"; // this means every minute
        }
    }

    public record PollExternalSystem;

To schedule a recurring message, use the IRecurringMessageScheduler interface, which can be resolved from the container (IServiceProvider). This interface is scoped, so it must be resolved from a valid container scope.
If using it in a consumer, add IRecurringMessageScheduler as a constructor dependency.
    var scheduler = scope.ServiceProvider.GetService<IRecurringMessageScheduler>();

    var message = await scheduler.ScheduleRecurringSend(
        InputQueueAddress, new PollExternalSystemSchedule(), new PollExternalSystem());

When you stop your service, or otherwise need to tell the Quartz service to stop sending these recurring messages, you can cancel the recurring schedule using the return value of ScheduleRecurringSend or, as shown here, the schedule's ScheduleId and ScheduleGroup.

    await scheduler.CancelScheduledRecurringMessage("PollExternalSystem", null);
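The constructor-dependency approach mentioned above for consumers might look like the following. This is a sketch under stated assumptions: StartPolling is a hypothetical trigger message, the queue name poll-external-system is made up for illustration, and the MassTransit.Scheduling namespace for DefaultRecurringSchedule is assumed and may differ between MassTransit versions.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.Scheduling; // assumed namespace of DefaultRecurringSchedule

// Hypothetical message that asks the service to begin polling.
public record StartPolling;

// Recurring message and schedule, repeated from this section's example.
public record PollExternalSystem;

public class PollExternalSystemSchedule :
    DefaultRecurringSchedule
{
    public PollExternalSystemSchedule()
    {
        ScheduleId = "PollExternalSystem";
        CronExpression = "0 0/1 * 1/1 * ? *"; // every minute
    }
}

public class StartPollingConsumer :
    IConsumer<StartPolling>
{
    // Hypothetical queue that should receive the recurring message.
    static readonly Uri PollingQueue = new Uri("queue:poll-external-system");

    readonly IRecurringMessageScheduler _scheduler;

    // The scoped scheduler is injected by the container for each consumed message.
    public StartPollingConsumer(IRecurringMessageScheduler scheduler)
    {
        _scheduler = scheduler;
    }

    public async Task Consume(ConsumeContext<StartPolling> context)
    {
        await _scheduler.ScheduleRecurringSend(PollingQueue,
            new PollExternalSystemSchedule(), new PollExternalSystem());
    }
}
```

Because the consumer is resolved inside a container scope for every message, no explicit scope handling is needed here, unlike the IServiceProvider example.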