
Job Consumers

A Job Consumer is a specialized consumer type created to handle long-running tasks, usually referred to as jobs. Unlike regular consumers that hold a message lock on the broker and process messages quickly, job consumers are expected to handle messages that may take an extended time to complete. They provide additional functionality to manage long-running tasks, including retrying exceptions, limiting concurrency, and guaranteeing job completion even in the presence of service disruptions.

Job consumers meet the need for long-running tasks by allowing them to run asynchronously without blocking the broker. When you need to process a message that takes a long time, perhaps several minutes, hours, or even longer, a job consumer is the best option.

When determining if you should use a job consumer, consider the following factor: How long does the task take to complete?

  • If your task takes less than 5 minutes, a standard consumer is usually enough. Brokers like RabbitMQ, Azure Service Bus, or Amazon SQS can hold a message lock for around 5 minutes, which is plenty of time for most tasks.
  • If your task exceeds 5 minutes, that’s when you should consider using a job consumer. As tasks exceed the broker’s lock time, you risk message redelivery or failures due to lock timeouts. Job consumers are specifically designed to handle these scenarios without worrying about broker timeouts.

That said, it’s important to recognize that job consumer overhead is not insignificant. There is additional bookkeeping required to manage job state, track and retry failures, and limit concurrency. Confirm that the benefits outweigh the extra complexity before adopting job consumers for your long-running tasks.

Job consumers were built entirely using MassTransit, and they use many of the framework’s features. Saga state machines are used to manage job state, the message scheduler is used to schedule jobs, temporary queues are used by job consumer instances, and a saga repository is used to store job state.

To use job consumers, you’ll need the job saga state machines, a saga repository to store job state, and a message scheduler.

The job saga state machines only need to run as a single service per bus (broker). All job consumers using the same message broker should share a single saga repository. The job consumers themselves can be configured to run on multiple instances of the same bus.

Job consumers implement the IJobConsumer<TJob> interface, where TJob is the job’s message type.

public interface IJobConsumer<in TJob> :
    IConsumer
    where TJob : class
{
    Task Run(JobContext<TJob> context);
}

Create a class that implements the IJobConsumer<TJob> interface and implement the Run method.

public class ConvertVideoJobConsumer :
    IJobConsumer<ConvertVideo>
{
    public async Task Run(JobContext<ConvertVideo> context)
    {
        // Simulate 30 seconds of work; the delay ends early if the job is canceled
        await Task.Delay(30000, context.CancellationToken);
    }
}

The Run method is called when a job is started. The JobContext<TJob> parameter contains the job’s identifier, properties, and cancellation token.

Once your job consumer has been created, you can configure the job consumer on the bus.

In a job consumer, JobContext<TJob> is the job consumer version of ConsumeContext<T>. Since job consumers run separately from the message broker and no message locks are held, a separate context is used. In addition to the standard message context properties, the job context also includes the following properties.

Property            Description
JobId               The job’s identifier, assigned when the job was submitted
AttemptId           Uniquely identifies this job attempt
RetryAttempt        If greater than zero, the retry attempt of the job
LastProgressValue   If a previous job attempt updated the progress, the last updated value stored for the job
LastProgressLimit   If a previous job attempt updated the progress, the last updated limit stored for the job
ElapsedTime         How long the current job attempt has been running
JobProperties       The properties added when the job was submitted
JobTypeProperties   The properties configured by the JobOptions<T>
InstanceProperties  The properties configured by the JobOptions<T> on a specific job consumer instance

When a job is canceled, the CancellationToken on JobContext is canceled. Job consumers should check for cancellation using IsCancellationRequested and, when it is safe to cancel, call:

context.CancellationToken.ThrowIfCancellationRequested()

This ensures the job is properly reported as canceled to the job saga state machines.
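The cooperative cancellation pattern described above can be sketched with plain .NET types. This is a minimal illustration, not MassTransit code: CancellableWork and ProcessAsync are hypothetical names, and the token passed in stands in for context.CancellationToken. The token is only checked between units of work, where stopping leaves nothing half-finished.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a long-running job body; not a MassTransit type.
public static class CancellableWork
{
    // Processes items one at a time and returns how many completed.
    public static async Task<int> ProcessAsync(int itemCount, CancellationToken token)
    {
        var completed = 0;
        for (var i = 0; i < itemCount; i++)
        {
            // Safe point: no item is partially processed here, so it is safe to stop.
            // Throws OperationCanceledException if cancellation was requested.
            token.ThrowIfCancellationRequested();

            await Task.Yield(); // placeholder for one unit of real work
            completed++;
        }

        return completed;
    }
}
```

Calling CancellableWork.ProcessAsync(3, CancellationToken.None) runs all three items, while passing an already canceled token surfaces an OperationCanceledException to the caller before any item is processed.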

New in MassTransit v8.3.0

Job consumers can track progress and that progress is saved by the job saga. If a job is canceled or faults, the most recently saved progress is included in the JobContext passed to the job consumer if the job is retried.

To save progress, call SetJobProgress as shown below.

public class ConvertVideoJobConsumer :
    IJobConsumer<ConvertVideo>
{
    public async Task Run(JobContext<ConvertVideo> context)
    {
        // Size of the content being processed (assumes Path refers to a local file; requires System.IO)
        long length = new FileInfo(context.Job.Path).Length;

        await context.SetJobProgress(0, length);

        for (long index = 1; index <= length; index++)
        {
            // do something

            // Save the progress so that a retry can see how far this attempt got
            await context.SetJobProgress(index, length);
        }
    }
}

New in MassTransit v8.3.0

Job consumers can save state in the job saga. In the event that a job is canceled or faults, when the job is retried the previously saved state will be included in the JobContext passed to the job consumer.

To save the job state when a job is canceled:

public class ConvertVideoJobConsumer :
    IJobConsumer<ConvertVideo>
{
    public async Task Run(JobContext<ConvertVideo> context)
    {
        // Size of the content being processed (assumes Path refers to a local file; requires System.IO)
        long length = new FileInfo(context.Job.Path).Length;
        long index = 1;

        try
        {
            await context.SetJobProgress(0, length);

            for (; index <= length; index++)
            {
                context.CancellationToken.ThrowIfCancellationRequested();

                // do something

                await context.SetJobProgress(index, length);
            }
        }
        catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested)
        {
            // The item at index did not complete, so record the last one that did
            await context.SaveJobState(new ConsumerState { LastIndex = index - 1 });

            // Rethrow so the job is reported as canceled
            throw;
        }
    }

    class ConsumerState
    {
        public long LastIndex { get; set; }
    }
}

When the job is started, the consumer can check if a previously saved job state exists, and use it to continue where processing left off.

public class ConvertVideoJobConsumer :
    IJobConsumer<ConvertVideo>
{
    public async Task Run(JobContext<ConvertVideo> context)
    {
        // Size of the content being processed (assumes Path refers to a local file; requires System.IO)
        long length = new FileInfo(context.Job.Path).Length;

        // Resume after the last saved index, or start from the beginning
        long index = context.TryGetJobState(out ConsumerState? state)
            ? state.LastIndex + 1
            : 1;

        // elided, see above
    }
}

The job state type (in this case, ConsumerState) is only relevant to the job consumer and is stored as a serialized dictionary in the job saga.
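To illustrate what storing state as a serialized dictionary implies for the state type, the sketch below round-trips a state object through a dictionary using System.Text.Json. It is an assumption-laden illustration and not MassTransit's actual serialization: StateRoundTrip and its methods are hypothetical names, and the real serializer and dictionary value types may differ. The practical point is that state types work best as simple classes with public, serializable properties.

```csharp
using System.Collections.Generic;
using System.Text.Json;

public class ConsumerState
{
    public long LastIndex { get; set; }
}

// Hypothetical helper; shows the general idea only.
public static class StateRoundTrip
{
    // Flattens a state object into a property-name/value dictionary.
    public static Dictionary<string, JsonElement> ToDictionary<T>(T state)
    {
        var json = JsonSerializer.Serialize(state);
        return JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(json)!;
    }

    // Rebuilds the typed state from the dictionary.
    public static T FromDictionary<T>(Dictionary<string, JsonElement> values)
    {
        var json = JsonSerializer.Serialize(values);
        return JsonSerializer.Deserialize<T>(json)!;
    }
}
```

With this helper, a ConsumerState whose LastIndex is 42 becomes a dictionary with a single "LastIndex" entry, and converting that dictionary back yields an equivalent ConsumerState.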

To submit a job, call the SubmitJob extension method on an IPublishEndpoint as shown below. This is a fire-and-forget method; no response is sent.

[HttpPut("{path}")]
public async Task<IActionResult> FireAndForgetSubmitJob(string path, [FromServices] IPublishEndpoint publishEndpoint)
{
    var jobId = await publishEndpoint.SubmitJob<ConvertVideo>(new ConvertVideo
    {
        Path = path
    });

    return Ok(new
    {
        jobId,
        path
    });
}

To wait for a response confirming that the job was submitted (not strictly necessary, but commonly done), use the request client, IRequestClient<T>, and submit the job using the SubmitJob extension method as shown below. The RequestId generated by the request client is used as the JobId.

[HttpPost("{path}")]
public async Task<IActionResult> SubmitJob(string path, [FromServices] IRequestClient<ConvertVideo> client)
{
    var jobId = await client.SubmitJob(new ConvertVideo
    {
        Path = path
    });

    return Ok(new
    {
        jobId,
        path
    });
}

Additionally, a jobId can be specified if the IRequestClient<SubmitJob<TJob>> interface is used instead.

[HttpPost("{path}")]
public async Task<IActionResult> SubmitJob(string path, [FromServices] IRequestClient<SubmitJob<ConvertVideo>> client)
{
    // Generate the job identifier up front so the caller controls it
    var jobId = NewId.NextGuid();

    await client.SubmitJob(jobId, new ConvertVideo
    {
        Path = path
    });

    return Ok(new
    {
        jobId,
        path
    });
}

To submit a job including job properties (such as a tenantId or another value that typically reflects a cross-cutting concern or environmental setting, such as a data center location or country), use the overload as shown.

[HttpPost("{path}")]
public async Task<IActionResult> SubmitJob(string path, [FromServices] IRequestClient<SubmitJob<ConvertVideo>> client)
{
    var jobId = NewId.NextGuid();

    // The third argument adds job properties that travel with the job
    await client.SubmitJob(jobId, new ConvertVideo
    {
        Path = path
    }, x => x.Set("TenantId", _tenantId));

    return Ok(new
    {
        jobId,
        path
    });
}

To cancel a submitted job, call the CancelJob extension method on an IPublishEndpoint as shown.

[HttpPut("{jobId}")]
public async Task<IActionResult> CancelJob(Guid jobId, [FromServices] IPublishEndpoint publishEndpoint)
{
    await publishEndpoint.CancelJob(jobId);

    return Ok();
}

To retry a faulted or canceled job, call the RetryJob extension method on an IPublishEndpoint as shown.

[HttpPut("{jobId}")]
public async Task<IActionResult> RetryJob(Guid jobId, [FromServices] IPublishEndpoint publishEndpoint)
{
    await publishEndpoint.RetryJob(jobId);

    return Ok();
}

When a job is canceled or faults, the job is not removed from the saga repository. To remove a job in the Canceled or Faulted state, use the FinalizeJob method as shown.

[HttpPut("{jobId}")]
public async Task<IActionResult> FinalizeJob(Guid jobId, [FromServices] IPublishEndpoint publishEndpoint)
{
    await publishEndpoint.FinalizeJob(jobId);

    return Ok();
}

New in MassTransit v8.3.0

By default, submitted jobs will run as soon as possible. Jobs can also be scheduled by specifying a start time, using the ScheduleJob method.

[HttpPost("{path}")]
public async Task<IActionResult> SubmitJob(string path, [FromServices] IRequestClient<SubmitJob<ConvertVideo>> client)
{
    // Capture the job identifier so it can be returned to the caller
    var jobId = await client.ScheduleJob(DateTimeOffset.Now.AddMinutes(15), new ConvertVideo
    {
        Path = path
    });

    return Ok(new
    {
        jobId,
        path
    });
}

New in MassTransit v8.3.0

MassTransit supports recurring jobs, which are useful when a consumer needs to run on a predefined schedule. Recurring jobs use regular job consumers and are scheduled using the transport’s built-in message scheduling and the job saga state machines.

Recurring jobs are configured using a cron expression. Cron expressions are a well-known way to define a schedule and can be built using any of the online cron expression builders. Cron expressions can be very expressive. For instance, 0 0,15,30,45 * * * 1,3,5 would mean at 0, 15, 30, and 45 minutes past the hour, only on Monday, Wednesday, and Friday.
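The example expression has six fields, which suggests a layout with a leading seconds field. The sketch below splits such an expression into named fields to make that reading explicit. It assumes the six-field order second, minute, hour, day-of-month, month, day-of-week; CronFields and Describe are hypothetical names for illustration and are not part of MassTransit.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper that labels the fields of a six-field cron expression.
public static class CronFields
{
    static readonly string[] Names =
        { "second", "minute", "hour", "day-of-month", "month", "day-of-week" };

    // Splits the expression on spaces and pairs each part with its field name.
    public static IReadOnlyDictionary<string, string> Describe(string expression)
    {
        var parts = expression.Split(' ', StringSplitOptions.RemoveEmptyEntries);
        if (parts.Length != Names.Length)
            throw new FormatException($"Expected {Names.Length} fields, found {parts.Length}.");

        var fields = new Dictionary<string, string>();
        for (var i = 0; i < Names.Length; i++)
            fields[Names[i]] = parts[i];

        return fields;
    }
}
```

For the expression above, Describe reports "0" for second, "0,15,30,45" for minute, "*" for hour, day-of-month, and month, and "1,3,5" for day-of-week, which matches the description of every quarter hour on Monday, Wednesday, and Friday.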

To schedule a recurring job, use the AddOrUpdateRecurringJob method as shown below.

public async Task ConfigureRecurringJobs(IPublishEndpoint endpoint)
{
    await endpoint.AddOrUpdateRecurringJob("RoutineMaintenance",
        new RoutineMaintenanceCommand(), "0 0,15,30,45 * * * 1,3,5");
}

public record RoutineMaintenanceCommand;

A simple expression builder is also available, which can be used to generate a cron expression.

public async Task ConfigureRecurringJobs(IPublishEndpoint endpoint)
{
    await endpoint.AddOrUpdateRecurringJob("RoutineMaintenance",
        new RoutineMaintenanceCommand(), x => x.Every(minutes: 15));
}

Recurring jobs can also be confined to a period of time, using start and end dates. Specifying a start date will apply the cron expression to run at the first opportunity after the specified start date. Specifying an end date will ensure the job is not run after that date.

public async Task ConfigureRecurringJobs(IPublishEndpoint endpoint)
{
    await endpoint.AddOrUpdateRecurringJob("RoutineMaintenance",
        new RoutineMaintenanceCommand(), x =>
        {
            // Only run between the start and end dates (UTC)
            x.Start = new DateTimeOffset(2024, 1, 1, 0, 0, 0, TimeSpan.Zero);
            x.End = new DateTimeOffset(2025, 1, 1, 0, 0, 0, TimeSpan.Zero);
            x.Every(minutes: 30);
        });
}

AddOrUpdateRecurringJob can also be called again to update the job message or the schedule. If the schedule is changed, the next run will be rescheduled using the newly specified cron expression.

To force a recurring job to run immediately, use the RunRecurringJob method.

public async Task RunJobNow(IPublishEndpoint publishEndpoint)
{
    await publishEndpoint.RunRecurringJob<RoutineMaintenanceCommand>("RoutineMaintenance");
}

Once the job has completed, its next run will be scheduled using the previously supplied cron expression.

New in MassTransit v8.3.0

To support more complex job consumer scenarios, MassTransit enables the use of a custom job distribution strategy. This strategy is employed by the job type saga to decide which job consumer bus instance should handle a particular job. By configuring JobTypeProperties and InstanceProperties within JobOptions<T>, you can control how jobs are assigned to specific consumer instances. For example, you might allocate jobs from premium customers to consumer instances running on premium hardware, ensuring that resource-intensive jobs are handled by more capable instances.
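The selection idea behind such a strategy can be sketched without any MassTransit types: filter the instances by a property, drop those already at the concurrency limit, and prefer the least busy one. WorkerInstance and InstanceSelector are hypothetical names used only for this illustration, and the real strategy interface works with different types.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified stand-in for a job consumer bus instance.
public record WorkerInstance(string Address, string MachineType, int ActiveJobs);

public static class InstanceSelector
{
    // Returns the least busy instance of the requested machine type that still
    // has a free job slot, or null when no instance qualifies.
    public static WorkerInstance? Select(
        IEnumerable<WorkerInstance> instances, string machineType, int concurrentJobLimit)
    {
        return instances
            .Where(i => i.MachineType == machineType && i.ActiveJobs < concurrentJobLimit)
            .OrderBy(i => i.ActiveJobs)
            .FirstOrDefault();
    }
}
```

Given two "S-Class" instances running 5 and 2 jobs with a limit of 10, Select returns the one running 2 jobs. If every matching instance is at the limit, or none has the requested machine type, it returns null, which corresponds to reporting that no job slot is available.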

To use a custom strategy, create a class that implements IJobDistributionStrategy.

public class MachineTypeJobDistributionStrategy :
    IJobDistributionStrategy
{
    public Task<ActiveJob?> IsJobSlotAvailable(ConsumeContext<AllocateJobSlot> context, JobTypeInfo jobTypeInfo)
    {
        // Use the custom strategy only when the job type opts in through its properties
        object? strategy = null;
        jobTypeInfo.Properties?.TryGetValue("DistributionStrategy", out strategy);

        return strategy switch
        {
            "MachineType" => MachineType(context, jobTypeInfo),
            _ => DefaultJobDistributionStrategy.Instance.IsJobSlotAvailable(context, jobTypeInfo)
        };
    }

    Task<ActiveJob?> MachineType(ConsumeContext<AllocateJobSlot> context, JobTypeInfo jobTypeInfo)
    {
        // Map the customer type to the machine type that should run the job
        var customerType = context.GetHeader("CustomerType");
        var machineType = customerType switch
        {
            "Premium" => "S-Class",
            _ => "E-Class"
        };

        // Instances of the required machine type with a free job slot, least busy first
        var instances = from i in jobTypeInfo.Instances
            join a in jobTypeInfo.ActiveJobs on i.Key equals a.InstanceAddress into ai
            where ai.Count() < jobTypeInfo.ConcurrentJobLimit
                && i.Value.Properties.TryGetValue("MachineType", out var mt) && mt is string mtext && mtext == machineType
            orderby ai.Count(), i.Value.Used
            select new
            {
                Instance = i.Value,
                InstanceAddress = i.Key,
                InstanceCount = ai.Count()
            };

        var firstInstance = instances.FirstOrDefault();
        if (firstInstance == null)
            return Task.FromResult<ActiveJob?>(null);

        return Task.FromResult<ActiveJob?>(new ActiveJob
        {
            JobId = context.Message.JobId,
            InstanceAddress = firstInstance.InstanceAddress
        });
    }
}

Then register the strategy using the TryAddJobDistributionStrategy method:

services.TryAddJobDistributionStrategy<MachineTypeJobDistributionStrategy>();

The strategy must be registered where the job saga state machines are registered and is not required on the job consumer bus instances.

The job distribution strategy is resolved from the container as a scoped service and any class dependencies will be resolved using the current scope of the job type saga state machine. This allows dependencies to be injected, including the current DbContext if using Entity Framework Core.

To support the use of job distribution strategies, new properties were added to JobOptions<TJob>. Following the example above, the MachineType property should be added at startup.

x.AddConsumer<ConvertVideoJobConsumer>(c =>
    c.Options<JobOptions<ConvertVideo>>(options => options
        .SetRetry(r => r.Interval(3, TimeSpan.FromSeconds(30)))
        .SetJobTimeout(TimeSpan.FromMinutes(10))
        .SetConcurrentJobLimit(10)
        .SetJobTypeProperties(p => p.Set("DistributionStrategy", "MachineType"))
        .SetInstanceProperties(p => p.Set("MachineType", "S-Class"))));

The properties should be set using environmental information, such as machine type, data center location, or whatever makes sense for the desired strategy. JobTypeProperties apply to the job type and InstanceProperties apply to the job consumer bus instance (the bus instance containing the job consumer).

See the job consumer options section for more information on configuring job consumers.