Skip to content

Job Consumer Configuration

Job consumers are a specialized consumer type created to handle long-running tasks, usually referred to as jobs. To learn more about job consumers, see the Job Consumers section.

Job consumers are configured within the AddMassTransit configuration block similar to a regular consumer.

To configure a job consumer, and set the job options, use the AddConsumer method.

services.AddMassTransit(x =>
{
x.AddConsumer<ConvertVideoJobConsumer>(cfg =>
{
cfg.Options<JobOptions<ConvertVideo>>(options => options
.SetJobTimeout(TimeSpan.FromMinutes(15))
.SetConcurrentJobLimit(10));
});
});

While sensible defaults are provided, it’s best to configure the job options to match the requirements of your application. Job options are configured on the job consumer using the Options<T> method. The job options can be set using method chaining, providing a convenient fluent interface. Properties can be set individually or by using the .Set...() methods.`

OptionDescription
ConcurrentJobLimitThe number of concurrent jobs allowed to run on a given instance
JobTimeoutHow long a job consumer is allowed to run before the job is canceled (via the CancellationToken)
JobCancellationTimeoutHow long after a job consumer is canceled to wait before considering the job canceled regardless of whether it has completed
JobTypeNameOverride the default job type name (optional, not really recommended)
RetryPolicyThe retry policy applied when a job faults
ProgressBuffer.TimeLimitHow often any progress updates should be reported to the job saga
ProgressBuffer.UpdateLimitHow many progress updates should be reported before updating the job saga
JobTypePropertiesProperties associated with the job type (should be the same on every job consumer bus instance)
InstancePropertiesProperties associated with the currently running job consumer bus instance

The concurrent job limit is the maximum number of jobs that can be run concurrently on a given job consumer instance.

options.SetConcurrentJobLimit(10);

In cases where a particular job consumer instance may have a different limit than the ConcurrentJobLimit above, the ConcurrentInstanceJobLimit can be used to override the default limit.

options.SetInstanceConcurrentJobLimit(10);

The global concurrent job limit is the maximum number of jobs that can be run concurrently across all job consumer instances.

options.SetGlobalConcurrentJobLimit(100);

The job timeout is the maximum amount of time a job is allowed to run before being canceled.

options.SetJobTimeout(TimeSpan.FromMinutes(15));

The job cancellation timeout is the amount of time after a job is canceled that the job service will wait before considering the job canceled.

options.SetJobCancellationTimeout(TimeSpan.FromMinutes(2));

The job type name is the name of the job type that is associated with the job consumer. By default, the job type name is the name of the job consumer. This may be useful if the name displayed in the job saga is too long or confusing to override the default.

options.SetJobTypeName("ConvertVideo");

The retry policy is used when a job faults to determine how long a job should be retried after a fault. By default, jobs are not retried.

options.SetRetryPolicy(r => r.Interval(5, TimeSpan.FromMinutes(2)));

See the retry policy section for more information on configuring a retry policy.

When a job consumer is running, it can report progress updates to the job saga. The progress buffer is used to temporarily store progress updates to avoid spamming the job saga with updates. The buffer limits can be configured to suit your requirements (the defaults are probably fine, however).

options.SetProgressBuffer(100, TimeSpan.FromSeconds(10));

Job type properties are used to associate additional properties with the job type. These properties are used by the job distribution strategy to determine which job consumer instance should run the job. These properties should be the same across all job consumer instances.

options.SetJobTypeProperties(p => p.Set("AssignmentMode", "Region"));

Instance properties are used to associate additional properties with the currently running job consumer instance. These properties are used by the job distribution strategy to determine which job consumer instance should run the job. These properties may be different for each job consumer instance.

options.SetInstanceProperties(p => p.Set("Region", "East"));

Once a job consumer has been configured, the job consumer instance will be automatically configured by calling ConfigureEndpoints.

services.AddMassTransit(x =>
{
x.AddConsumer<ConvertVideoJobConsumer>(cfg =>
{
cfg.Options<JobOptions<ConvertVideo>>(options => options
.SetJobTimeout(TimeSpan.FromMinutes(15))
.SetConcurrentJobLimit(10));
});
x.AddDelayedMessageScheduler();
x.SetKebabCaseEndpointNameFormatter();
// in this case, just use the in-memory saga repository,
// but an actual database should be used
x.SetInMemorySagaRepositoryProvider();
x.AddJobSagaStateMachines();
x.UsingRabbitMq((context, cfg) =>
{
cfg.UseDelayedMessageScheduler();
cfg.ConfigureEndpoints(context);
});
});

The job consumer options can be configured by calling SetJobConsumerOptions within the AddMassTransit method.

x.SetJobConsumerOptions(options => options.SetHeartbeatInterval(TimeSpan.FromSeconds(10));
OptionDescription
HeartbeatIntervalHow often to report job consumer instance health to the job saga state machines
RejectedJobDelayThe delay before retrying a job that was ready to start when a job consumer instance was stopping/stopped.

The job consumer instance endpoint (the temporary queue created to receive job requests) can also be configured using the Endpoint method. For instance, you may want to use RabbitMQ’s quorum queues (for good reason). To configure the temporary endpoint to use a quorum queue, use the SetQuorumQueue method in the endpoint configurator.

x.SetJobConsumerOptions(options =>
{
options.HeartbeatInterval = TimeSpan.FromSeconds(10);
})
.Endpoint(e =>
{
e.AddConfigureEndpointCallback(cfg =>
{
if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
rmq.SetQuorumQueue();
});
});

Job consumers use three saga state machines to orchestrate jobs and keep track of available job consumer instances.

SagaDescription
JobSagaOrchestrates each job, including scheduling, retry, and failure handling
JobAttemptSagaOrchestrates each job attempt, communicating directly with the job consumer instances
JobTypeSagaKeep track of available job instances and allocates job slots to waiting jobs

The job saga state machines are configured using the AddJobSagaStateMachines method.

x.AddJobSagaStateMachines();

While this is the bare minimum required to configure the job saga state machines, it’s recommended to configure the JobSagaOptions as well.

x.AddJobSagaStateMachines(options =>
{
options.FinalizeCompleted = true;
options.ConcurrentMessageLimit = 32;
options.HeartbeatTimeout = TimeSpan.FromMinutes(5);
options.SlotWaitTime = TimeSpan.FromSeconds(30);
options.SuspectJobRetryCount = 2;
options.SuspectJobRetryDelay = TimeSpan.FromMinutes(1);
});
OptionDescription
FinalizeCompletedIf true (the default), completed job sagas are automatically removed from the saga repository
ConcurrentMessageLimitThe current message limit for the job sagas (does not affect job consumer concurrency limits)
HeartbeatTimeoutPeriod of time after which a job consumer instance is removed from the active list if no heartbeat has been received. This should typically be 2-3x the heartbeat interval set in the JobConsumerOptions
SlotWaitTimeWhen a job slot is not available to run a job, how long to wait before retrying the request
SuspectJobRetryCountHow many times to retry a job that becomes suspect (not faulted, but the job consumer instance stops responding)
SuspectJobRetryDelayHow long a job should wait before retrying when the job became suspect and SuspectJobRetryCount is > 0

The job saga state machines are configured on their own endpoints, using the configured endpoint name formatter. These endpoints are required on at least one bus instance. It is not necessary to configure them on every bus instance. In the example above, the job saga state machines endpoints are configured. A standard ConfigureEndpoints call will suffice to host the job consumers without the job saga state machines.

The job saga endpoints can be configured using the AddJobSagaStateMachines method by calling the Endpoints method (or the specific endpoint method) on the configurator returned by AddJobSagaStateMachines.

For instance, the SetPartitionedReceiveMode method (used by the SQL transport) calls the Endpoints method to configure the endpoint.

x.AddJobSagaStateMachines()
.Endpoints(e =>
{
e.AddConfigureEndpointCallback(cfg =>
{
if (cfg is ISqlReceiveEndpointConfigurator sql)
sql.SetReceiveMode(SqlReceiveMode.Partitioned);
});
});

Other transports may have different endpoint configuration options, but the general approach is the same.

A persistent saga repository should be used with job consumers and should be configured when adding the job saga state machines. In the example below, Entity Framework Core is configured, along with the Postgres lock statement provider (required when using PostgreSQL).

x.AddJobSagaStateMachines()
.EntityFrameworkRepository(r =>
{
r.ExistingDbContext<JobServiceSagaDbContext>();
r.UsePostgres();
});

Refer to the specific saga repository documentation for more information on configuring the saga repository for use with job consumers.

For a more detailed example of configuring the job saga state machine endpoints, including persistent storage, see the sample mentioned in the box above.

A job can be in one of the following states:

  1. Initial
  2. Final
  3. Submitted
  4. Waiting to Start
  5. Waiting for Slot
  6. Started
  7. Completed
  8. Faulted
  9. Canceled
  10. Starting Job Attempt
  11. Allocating Job Slot
  12. Waiting to Retry
  13. Cancellation Pending

A job attempt can be in one of the following states:

  1. Initial
  2. Final
  3. Starting
  4. Running
  5. Faulted
  6. Checking Status
  7. Suspect

A job type can be in one of the following states:

  1. Initial
  2. Final
  3. Active
  4. Idle