Job Consumers
A Job Consumer is a specialized consumer type created to handle long-running tasks, usually referred to as jobs. Unlike regular consumers that hold a message lock on the broker and process messages quickly, job consumers are expected to handle messages that may take an extended time to complete. They provide additional functionality to manage long-running tasks, including retrying exceptions, limiting concurrency, and guaranteeing job completion even in the presence of service disruptions.
Job consumers meet the need for long-running tasks by allowing them to run asynchronously without blocking the broker. When you need to process a message that takes a long time, perhaps several minutes, hours, or even longer, a job consumer is the best option.
When should I use a job consumer?
Section titled “When should I use a job consumer?”When determining if you should use a job consumer, consider the following factor: How long does the task take to complete?
- If your task takes less than 5 minutes, a standard consumer is usually enough. Brokers like RabbitMQ, Azure, or SQS can hold a message lock for around 5 minutes, which is plenty of time for most tasks.
- If your task exceeds 5 minutes, that’s when you should consider using a job consumer. As tasks exceed the broker’s lock time, you risk message redelivery or failures due to lock timeouts. Job consumers are specifically designed to handle these scenarios without worrying about broker timeouts.
That said, it’s important to recognize that job consumer overhead is not insignificant. There is additional bookkeeping required to manage job state, track and retry failures, and limit concurrency. Confirm that the benefits outweigh the extra complexity before adopting job consumers for your long-running tasks.
What do I need to use job consumers?
Section titled “What do I need to use job consumers?”Job consumers were built entirely using MassTransit, and they use many of the framework’s features. Saga state machines are used to manage job state, the message scheduler is used to schedule jobs, temporary queues are used by job consumer instances, and a saga repository is used to store job state.
To use job consumers, you’ll need:
- A saga repository for the job saga state machines
- A message scheduler to schedule jobs
The job saga state machines only need to run as a single service per bus (broker). All job consumers using the same message broker should share a single saga repository. The job consumers themselves can be configured to run on multiple instances of the same bus.
Create a job consumer
Section titled “Create a job consumer”Job consumers implement the IJobConsumer<TJob> interface, where TJob is the job’s message type.
public interface IJobConsumer<in TJob> : IConsumer where TJob : class{ Task Run(JobContext<TJob> context);}Create a class that implements the IJobConsumer<TJob> interface and implement the Run method.
public class ConvertVideoJobConsumer : IJobConsumer<ConvertVideo>{ public async Task Run(JobContext<ConvertVideo> context) { await Task.Delay(30000, context.CancellationToken); }}The Run method is called when a job is started. The JobContext<TJob> parameter contains the job’s identifier, properties, and cancellation token.
Once your job consumer has been created, you can configure the job consumer on the bus.
Job Context
Section titled “Job Context”In a job consumer, JobContext<TJob> is the job consumer version of ConsumeContext<T>. Since job consumers run separate of the message broker and no message
locks are held, a separate context is used. In addition to the standard message context properties, the job context also includes the following properties.
| Property | Description |
|---|---|
| JobId | The job’s identifier assigned when the job was submitted |
| AttemptId | Uniquely identifies this job attempt |
| RetryAttempt | If greater than zero, the retry attempt of the job |
| LastProgressValue | If a previous job attempt updated the progress, the last updated value stored for the job |
| LastProgressLimit | If a previous job attempt updated the progress, the last updated limit stored for the job |
| ElapsedTime | How long the current job attempt has been running |
| JobProperties | The properties added when the job was submitted |
| JobTypeProperties | The properties configured by the JobOptions<T> |
| InstanceProperties | The properties configured by the JobOptions<T> on a specific job consumer instance |
Job Cancellation
Section titled “Job Cancellation”When a job is canceled, the CancellationToken on JobContext is canceled. Job consumers should check for cancellation using
IsCancellationRequested and when it is safe to cancel call:
context.CancellationToken.ThrowIfCancellationRequested()
This ensures the job is properly reported as canceled to the job saga state machines.
Job Progress
Section titled “Job Progress”New in MassTransit v8.3.0
Job consumers can track progress and that progress is saved by the job saga. If a job is canceled or faults, the most recently saved progress is included
in the JobContext passed to the job consumer if the job is retried.
To save progress, call SetJobProgress as shown below.
public class ConvertVideoJobConsumer : IJobConsumer<ConvertVideo>{ public async Task Run(JobContext<ConvertVideo> context) { // some aspects of the content being process long length = File.Length;
await context.SetJobProgress(0, length);
for (int index = 1; index <= length; index++) { // do something
context.SetJobProgress(index, length); } }}Job State
Section titled “Job State”New in MassTransit v8.3.0
Job consumers can save state in the job saga. In the event that a job is canceled or faults, when the job is retried the previously saved state will be
included in the JobContext passed to the job consumer.
To save the job state when a job is canceled:
public class ConvertVideoJobConsumer : IJobConsumer<ConvertVideo>{ public async Task Run(JobContext<ConvertVideo> context) { // some aspects of the content being process long length = File.Length;
int index = 1; try { await context.SetJobProgress(0, length);
for (; index <= length; index++) { context.CancellationToken.ThrowIfCancellationRequested();
// do something
context.SetJobProgress(index, length); } } catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested) { await context.SaveJobState(new ConsumerState { LastIndex = index }); throw; } }
class ConsumerState { public long LastIndex { get; set; } }}When the job is started, the consumer can check if a previously saved job state exists, and use it to continue where processing left off.
public class ConvertVideoJobConsumer : IJobConsumer<ConvertVideo>{ public async Task Run(JobContext<ConvertVideo> context) { // some aspects of the content being process long length = File.Length;
int index = context.TryGetJobState(out ConsumerState? state) ? state.LastIndex + 1 : 1;
// elided, see above }}The job state type (in this case, ConsumerState) is only relevant to the job consumer and is stored as a serialized dictionary in the job saga.
Job Commands
Section titled “Job Commands”Submit Job
Section titled “Submit Job”To submit a job, call the SubmitJob extension method on an IPublishEndpoint as shown below. This is a fire-and-forget method, no response is sent.
[HttpPut("{path}")]public async Task<IActionResult> FireAndForgetSubmitJob(string path, [FromServices] IPublishEndpoint publishEndpoint){ var jobId = await publishEndpoint.SubmitJob<ConvertVideo>(new ConvertVideo { Path = path });
return Ok(new { jobId, path });}To wait for a response indicating the job submission was successful (not really necessary, but commonly used), use the request client, IRequestClient<T>,
submit a job using the SubmitJob extension method as shown below. The RequestId generated by the request client will be used as the JobId.
[HttpPost("{path}")]public async Task<IActionResult> SubmitJob(string path, [FromServices] IRequestClient<ConvertVideo> client){ var jobId = await client.SubmitJob(new ConvertVideo { Path = path });
return Ok(new { jobId, path });}Additionally, a jobId can be specified if the IRequestClient<SubmitJob<TJob>> interface is used instead.
[HttpPost("{path}")]public async Task<IActionResult> SubmitJob(string path, [FromServices] IRequestClient<SubmitJob<ConvertVideo>> client){ var jobId = NewId.NextGuid();
await client.SubmitJob(jobId, new ConvertVideo { Path = path });
return Ok(new { jobId, path });}To submit a job including job properties (such as a tenantId or other property value typically reflecting some cross-cutting concern or environmental setting such a data center location, country, etc.), use the overload as shown.
[HttpPost("{path}")]public async Task<IActionResult> SubmitJob(string path, [FromServices] IRequestClient<SubmitJob<ConvertVideo>> client){ var jobId = NewId.NextGuid();
await client.SubmitJob(jobId, new ConvertVideo { Path = path }, x => x.Set("TenantId", _tenantId));
return Ok(new { jobId, path });}Cancel Job
Section titled “Cancel Job”To cancel a submitted job, call the CancelJob extension method on an IPublishEndpoint as shown.
[HttpPut("{jobId}")]public async Task<IActionResult> CancelJob(Guid jobId, [FromServices] IPublishEndpoint publishEndpoint){ var jobId = await publishEndpoint.CancelJob(jobId);
return Ok();}Retry Job
Section titled “Retry Job”To retry a faulted or canceled job, call the RetryJob extension method on an IPublishEndpoint as shown.
[HttpPut("{jobId}")]public async Task<IActionResult> RetryJob(Guid jobId, [FromServices] IPublishEndpoint publishEndpoint){ var jobId = await publishEndpoint.RetryJob(jobId);
return Ok();}Finalize Job
Section titled “Finalize Job”When a job is canceled or faults, the job is not removed from the saga repository. To remove a job in the Canceled or Faulted state, use the FinalizeJob
method as shown.
[HttpPut("{jobId}")]public async Task<IActionResult> FinalizeJob(Guid jobId, [FromServices] IPublishEndpoint publishEndpoint){ var jobId = await publishEndpoint.FinalizeJob(jobId);
return Ok();}Schedule Job
Section titled “Schedule Job”New in MassTransit v8.3.0
By default, submitted jobs will run as soon as possible. Jobs can also be scheduled by specifying a start time, using the ScheduleJob method.
[HttpPost("{path}")]public async Task<IActionResult> SubmitJob(string path, [FromServices] IRequestClient<SubmitJob<ConvertVideo>> client){ await client.ScheduleJob(DateTimeOffset.Now.AddMinutes(15), new ConvertVideo { Path = path });
return Ok(new { jobId, path });}Recurring Jobs
Section titled “Recurring Jobs”New in MassTransit v8.3.0
MassTransit supports recurring jobs, which are useful when a consumer needs to run on a predefined schedule. Recurring jobs use regular job consumers and are scheduled using the transport’s built-in message scheduling and the job saga state machines.
Recurring jobs are configuring using a cron expression. Cron expressions are a well known way to define a schedule and can be built using any of the
online cron expression builders. Cron expressions can be very expressive. For instance, 0 0,15,30,45 * * * 1,3,5 would
mean at 0, 15, 30, and 45 minutes past the hour, only on Monday, Wednesday, and Friday.
Schedule Recurring Job
Section titled “Schedule Recurring Job”To schedule a recurring job, use the AddOrUpdateRecurringJob method as shown below.
public async Task ConfigureRecurringJobs(IPublishEndpoint endpoint){ await endpoint.AddOrUpdateRecurringJob("RoutineMaintenance", new RoutineMaintenanceCommand(), "0 0,15,30,45 * * * 1,3,5");}
public record RoutingMaintenanceCommand;A simple expression builder is also available, which can be used to generate a cron expression.
public async Task ConfigureRecurringJobs(IPublishEndpoint endpoint){ await endpoint.AddOrUpdateRecurringJob("RoutineMaintenance", new RoutineMaintenanceCommand(), x => x.Every(minutes: 15));}Recurring jobs can also be confined to a period of time, using start and end dates. Specifying a start date will apply the cron expression to run at the first opportunity after the specified start date. Specifying an end date will ensure the job is not run after that date.
public async Task ConfigureRecurringJobs(IPublishEndpoint endpoint){ await endpoint.AddOrUpdateRecurringJob("RoutineMaintenance", new RoutineMaintenanceCommand(), x => { x.Start = new DateTimeOffset(2024, 1, 1, 0, 0, 0, TimeSpan.Zero); x.End = new DateTimeOffset(2025, 1, 1, 0, 0, 0, TimeSpan.Zero); x.Every(minutes: 30); });}Calling AddOrUpdateRecurringJob can be used to update the job message or the schedule. If the schedule is changed, the next run will be rescheduled using the
newly specified cron expression.
Run Recurring Job
Section titled “Run Recurring Job”To force a recurring job to run immediately, use the RunRecurringJob method.
public async Task RunJobNow(IPublishEndpoint publishEndpoint){ await publishEndpoint.RunRecurringJob<RoutineMaintenanceCommand>("RoutineMaintenance");}Once the job has completed, the job’s next job run will be scheduled using the previously supplied cron expression.
Job Distribution Strategy
Section titled “Job Distribution Strategy”New in MassTransit v8.3.0
To support more complex job consumer scenarios, MassTransit enables the use of a custom job distribution strategy. This strategy is employed by the job type
saga to decide which job consumer bus instance should handle a particular job. By configuring JobProperties and InstanceProperties within JobOptions<T>,
you
can control how jobs are assigned to specific consumer instances. For example, you might allocate jobs from premium customers to consumer instances running on
premium hardware, ensuring that resource-intensive jobs are handled by more capable instances.
To use a custom strategy, create a class that implements IJobDistributionStrategy.
public class MachineTypeJobDistributionStrategy : IJobDistributionStrategy{ public Task<ActiveJob?> IsJobSlotAvailable(ConsumeContext<AllocateJobSlot> context, JobTypeInfo jobTypeInfo) { object? strategy = null; jobTypeInfo.Properties?.TryGetValue("DistributionStrategy", out strategy);
return strategy switch { "MachineType" => MachineType(context, jobTypeInfo), _ => DefaultJobDistributionStrategy.Instance.IsJobSlotAvailable(context, jobTypeInfo) }; }
Task<ActiveJob?> MachineType(ConsumeContext<AllocateJobSlot> context, JobTypeInfo jobTypeInfo) { var customerType = context.GetHeader("CustomerType");
var machineType = customerType switch { "Premium" => "S-Class", _ => "E-Class" };
var instances = from i in jobTypeInfo.Instances join a in jobTypeInfo.ActiveJobs on i.Key equals a.InstanceAddress into ai where (ai.Count() < jobTypeInfo.ConcurrentJobLimit && string.IsNullOrEmpty(dataCenter)) || (i.Value.Properties.TryGetValue("MachineType", out var mt) && mt is string mtext && mtext == machineType) orderby ai.Count(), i.Value.Used select new { Instance = i.Value, InstanceAddress = i.Key, InstanceCount = ai.Count() };
var firstInstance = instances.FirstOrDefault(); if (firstInstance == null) return Task.FromResult<ActiveJob?>(null);
return Task.FromResult<ActiveJob?>(new ActiveJob { JobId = context.Message.JobId, InstanceAddress = firstInstance.InstanceAddress }); }}Then register the strategy using the TryAddJobDistributionStrategy method:
services.TryAddJobDistributionStrategy<MachineTypeJobDistributionStrategy>();The strategy must be registered where the job saga state machines are registered and is not required on the job consumer bus instances.
The job distribution strategy is resolved from the container as a scoped service and any class dependencies will be resolved using the current scope
of the job type saga state machine. This allows dependencies to be injected, including the current DbContext if using Entity Framework Core.
Job Strategy Options
Section titled “Job Strategy Options”To support the use of job distribution strategies, new properties were added to JobOptions<TJob>. Following the example above, the MachineType property
should be added at startup.
x.AddConsumer<ConvertVideoConsumer>(c => c.Options<JobOptions<ConvertVideo>>(options => options .SetRetry(r => r.Interval(3, TimeSpan.FromSeconds(30))) .SetJobTimeout(TimeSpan.FromMinutes(10)) .SetConcurrentJobLimit(10) .SetJobTypeProperties(p => p.Set("DistributionStrategy", "MachineType")) .SetInstanceProperties(p => p.Set("MachineType", "S-Class")));The properties should be set using environmental information, such as machine type, data center location, or whatever makes sense for the desired strategy.
JobProperties apply to the job type and InstanceProperties apply to the job consumer bus instance (the bus instance containing the job consumer).
See the job consumer options section for more information on configuring job consumers.