Job Consumer Configuration
Job consumers are a specialized consumer type created to handle long-running tasks, usually referred to as jobs. To learn more about job consumers, see the Job Consumers section.
Job consumers are configured within the AddMassTransit configuration block, in the same way as a regular consumer.
To configure a job consumer and set its job options, use the AddConsumer method.
services.AddMassTransit(x =>
{
    x.AddConsumer<ConvertVideoJobConsumer>(cfg =>
    {
        cfg.Options<JobOptions<ConvertVideo>>(options => options
            .SetJobTimeout(TimeSpan.FromMinutes(15))
            .SetConcurrentJobLimit(10));
    });
});
Configure the job options
While sensible defaults are provided, it’s best to configure the job options to match the requirements of your application. Job options are configured on the job consumer using the Options<T> method. The job options can be set using method chaining, providing a convenient fluent interface. Properties can be set individually or by using the .Set...() methods.
| Option | Description |
|---|---|
| ConcurrentJobLimit | The number of concurrent jobs allowed to run on a given instance |
| JobTimeout | How long a job consumer is allowed to run before the job is canceled (via the CancellationToken) |
| JobCancellationTimeout | How long after a job consumer is canceled to wait before considering the job canceled regardless of whether it has completed |
| JobTypeName | Override the default job type name (optional, not really recommended) |
| RetryPolicy | The retry policy applied when a job faults |
| ProgressBuffer.TimeLimit | How often any progress updates should be reported to the job saga |
| ProgressBuffer.UpdateLimit | How many progress updates should be reported before updating the job saga |
| JobTypeProperties | Properties associated with the job type (should be the same on every job consumer bus instance) |
| InstanceProperties | Properties associated with the currently running job consumer bus instance |
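
To illustrate the two styles mentioned above, the following sketch sets the same options first as properties and then with the fluent methods. It assumes the option names in the table are settable properties with the types shown; the JobOptionsSketch class and its method names are illustrative and not part of MassTransit.

```csharp
using System;
using MassTransit;

public static class JobOptionsSketch
{
    // Property style: assumes the table's option names are settable properties
    public static void UseProperties<TJob>(JobOptions<TJob> options)
        where TJob : class
    {
        options.ConcurrentJobLimit = 10;
        options.JobTimeout = TimeSpan.FromMinutes(15);
        options.JobCancellationTimeout = TimeSpan.FromMinutes(2);
    }

    // Fluent style: the same settings using the Set...() methods
    public static void UseFluent<TJob>(JobOptions<TJob> options)
        where TJob : class
    {
        options.SetConcurrentJobLimit(10);
        options.SetJobTimeout(TimeSpan.FromMinutes(15));
        options.SetJobCancellationTimeout(TimeSpan.FromMinutes(2));
    }
}
```

Either helper could be passed to the Options<T> method, for example cfg.Options<JobOptions<ConvertVideo>>(options => JobOptionsSketch.UseFluent(options)).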
Concurrent Job Limit
The concurrent job limit is the maximum number of jobs that can be run concurrently on a given job consumer instance.
options.SetConcurrentJobLimit(10);
Instance Concurrent Job Limit
When a particular job consumer instance needs a different limit than the ConcurrentJobLimit above, SetInstanceConcurrentJobLimit can be used to override the default limit for that instance.
options.SetInstanceConcurrentJobLimit(10);
Global Concurrent Job Limit
The global concurrent job limit is the maximum number of jobs that can be run concurrently across all job consumer instances.
options.SetGlobalConcurrentJobLimit(100);
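The three limits can be combined. The sketch below is one possible arrangement: a shared default per instance, a cap across all instances, and an optional per-instance override. The JOB_INSTANCE_LIMIT environment variable and the JobLimitSketch helper are hypothetical names used only for illustration.

```csharp
using System;
using MassTransit;

public static class JobLimitSketch
{
    public static void ConfigureLimits<TJob>(JobOptions<TJob> options)
        where TJob : class
    {
        // Default limit for each job consumer instance
        options.SetConcurrentJobLimit(10);

        // Upper bound across all job consumer instances
        options.SetGlobalConcurrentJobLimit(100);

        // Hypothetical override: a smaller machine lowers its own limit via an environment variable
        var value = Environment.GetEnvironmentVariable("JOB_INSTANCE_LIMIT");
        if (int.TryParse(value, out var instanceLimit))
            options.SetInstanceConcurrentJobLimit(instanceLimit);
    }
}
```

With these example values, an instance without the override runs at most 10 jobs at once, and the 100 job global limit would only come into play once more than ten such instances are busy.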
Job Timeout
The job timeout is the maximum amount of time a job is allowed to run before being canceled.
options.SetJobTimeout(TimeSpan.FromMinutes(15));
Job Cancellation Timeout
The job cancellation timeout is the amount of time after a job is canceled that the job service will wait before considering the job canceled, regardless of whether the job consumer has completed.
options.SetJobCancellationTimeout(TimeSpan.FromMinutes(2));
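Both timeouts rely on the job consumer observing its CancellationToken. The following is a minimal sketch of a job consumer that does so; the ConvertVideo contract shown here (including its Path property) and the simulated work loop are assumptions made for illustration.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical job contract; the real message type may carry different properties
public record ConvertVideo
{
    public string Path { get; init; } = "";
}

public class ConvertVideoJobConsumer :
    IJobConsumer<ConvertVideo>
{
    public async Task Run(JobContext<ConvertVideo> context)
    {
        // Stand-in for real work, performed in small steps
        for (var step = 0; step < 100; step++)
        {
            // Stop promptly once the job timeout elapses or the job is canceled
            context.CancellationToken.ThrowIfCancellationRequested();

            // Pass the token to any awaited operation so it can be interrupted
            await Task.Delay(TimeSpan.FromSeconds(1), context.CancellationToken);
        }
    }
}
```

A consumer that ignores the token keeps running after the job timeout, in which case the job cancellation timeout determines how long the job service waits before treating the job as canceled anyway.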
Job Type Name
The job type name is the name of the job type that is associated with the job consumer. By default, the job type name is the name of the job consumer. Overriding the default may be useful if the name displayed in the job saga is too long or confusing.
options.SetJobTypeName("ConvertVideo");
Retry Policy
The retry policy is used when a job faults to determine whether the job should be retried and how long to wait before each retry. By default, jobs are not retried.
options.SetRetryPolicy(r => r.Interval(5, TimeSpan.FromMinutes(2)));
See the retry policy section for more information on configuring a retry policy.
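As a further illustration, the sketch below uses increasing delays and skips retries for one exception type. It assumes SetRetryPolicy accepts the standard MassTransit retry configurator, so that Ignore and Intervals are available; the JobRetrySketch helper and the choice of ArgumentException are illustrative.

```csharp
using System;
using MassTransit;

public static class JobRetrySketch
{
    public static void ConfigureRetry<TJob>(JobOptions<TJob> options)
        where TJob : class
    {
        options.SetRetryPolicy(r =>
        {
            // Do not retry faults caused by invalid input
            r.Ignore<ArgumentException>();

            // Retry up to three times, waiting longer before each attempt
            r.Intervals(
                TimeSpan.FromMinutes(1),
                TimeSpan.FromMinutes(5),
                TimeSpan.FromMinutes(15));
        });
    }
}
```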
Progress Buffer
When a job consumer is running, it can report progress updates to the job saga. The progress buffer is used to temporarily store progress updates to avoid spamming the job saga with updates. The buffer limits can be configured to suit your requirements (the defaults are probably fine, however).
options.SetProgressBuffer(100, TimeSpan.FromSeconds(10));
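For context, this is roughly how a job consumer might report the progress that the buffer collects. The sketch assumes the job context exposes a SetJobProgress(value, limit) method, which may depend on the MassTransit version in use; the ImportBatch contract and its consumer are hypothetical.

```csharp
using System.Threading.Tasks;
using MassTransit;

// Hypothetical job contract: a batch with a known number of items
public record ImportBatch
{
    public int ItemCount { get; init; }
}

public class ImportBatchJobConsumer :
    IJobConsumer<ImportBatch>
{
    public async Task Run(JobContext<ImportBatch> context)
    {
        var total = context.Job.ItemCount;

        for (var index = 0; index < total; index++)
        {
            context.CancellationToken.ThrowIfCancellationRequested();

            // ... process the item at this index ...

            // Assumed API: report progress; the progress buffer decides when the saga is updated
            await context.SetJobProgress(index + 1, total);
        }
    }
}
```

In the SetProgressBuffer call above, the two arguments presumably correspond to ProgressBuffer.UpdateLimit and ProgressBuffer.TimeLimit from the options table, so a consumer like this one can report after every item without updating the job saga each time.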
Job Type Properties
Job type properties are used to associate additional properties with the job type. These properties are used by the job distribution strategy to determine which job consumer instance should run the job. These properties should be the same across all job consumer instances.
options.SetJobTypeProperties(p => p.Set("AssignmentMode", "Region"));
Instance Properties
Instance properties are used to associate additional properties with the currently running job consumer instance. These properties are used by the job distribution strategy to determine which job consumer instance should run the job. These properties may be different for each job consumer instance.
options.SetInstanceProperties(p => p.Set("Region", "East"));
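Since job type properties are shared while instance properties vary, the two are typically set together. In the sketch below the instance value is read from a hypothetical REGION environment variable; the JobPropertySketch helper is also illustrative, and how the values are interpreted depends on the job distribution strategy in use.

```csharp
using System;
using MassTransit;

public static class JobPropertySketch
{
    public static void ConfigureProperties<TJob>(JobOptions<TJob> options)
        where TJob : class
    {
        // Same value on every job consumer bus instance
        options.SetJobTypeProperties(p => p.Set("AssignmentMode", "Region"));

        // Differs per instance; REGION is a hypothetical environment variable
        var region = Environment.GetEnvironmentVariable("REGION") ?? "East";
        options.SetInstanceProperties(p => p.Set("Region", region));
    }
}
```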
Configure a job consumer
Once a job consumer has been configured, the job consumer instance endpoint is configured automatically when ConfigureEndpoints is called.
services.AddMassTransit(x =>
{
    x.AddConsumer<ConvertVideoJobConsumer>(cfg =>
    {
        cfg.Options<JobOptions<ConvertVideo>>(options => options
            .SetJobTimeout(TimeSpan.FromMinutes(15))
            .SetConcurrentJobLimit(10));
    });

    x.AddDelayedMessageScheduler();

    x.SetKebabCaseEndpointNameFormatter();

    // in this case, just use the in-memory saga repository,
    // but an actual database should be used
    x.SetInMemorySagaRepositoryProvider();

    x.AddJobSagaStateMachines();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.UseDelayedMessageScheduler();

        cfg.ConfigureEndpoints(context);
    });
});
Configure the job consumer options
The job consumer options can be configured by calling SetJobConsumerOptions within the AddMassTransit method.
x.SetJobConsumerOptions(options => options.SetHeartbeatInterval(TimeSpan.FromSeconds(10)));
| Option | Description |
|---|---|
| HeartbeatInterval | How often to report job consumer instance health to the job saga state machines |
| RejectedJobDelay | The delay before retrying a job that was ready to start when a job consumer instance was stopping/stopped. |
The job consumer instance endpoint (the temporary queue created to receive job requests) can also be configured using the Endpoint method.
For instance, you may want to use RabbitMQ’s quorum queues (for good reason). To configure the temporary endpoint to use a quorum queue, use the
SetQuorumQueue method in the endpoint configurator.
x.SetJobConsumerOptions(options =>
    {
        options.HeartbeatInterval = TimeSpan.FromSeconds(10);
    })
    .Endpoint(e =>
    {
        e.AddConfigureEndpointCallback(cfg =>
        {
            if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
                rmq.SetQuorumQueue();
        });
    });
Configure job saga state machines
Job consumers use three saga state machines to orchestrate jobs and keep track of available job consumer instances.
| Saga | Description |
|---|---|
| JobSaga | Orchestrates each job, including scheduling, retry, and failure handling |
| JobAttemptSaga | Orchestrates each job attempt, communicating directly with the job consumer instances |
| JobTypeSaga | Keeps track of available job consumer instances and allocates job slots to waiting jobs |
The job saga state machines are configured using the AddJobSagaStateMachines method.
x.AddJobSagaStateMachines();
While this is the bare minimum required to configure the job saga state machines, it’s recommended to configure the JobSagaOptions as well.
x.AddJobSagaStateMachines(options =>
{
    options.FinalizeCompleted = true;
    options.ConcurrentMessageLimit = 32;
    options.HeartbeatTimeout = TimeSpan.FromMinutes(5);
    options.SlotWaitTime = TimeSpan.FromSeconds(30);
    options.SuspectJobRetryCount = 2;
    options.SuspectJobRetryDelay = TimeSpan.FromMinutes(1);
});
| Option | Description |
|---|---|
| FinalizeCompleted | If true (the default), completed job sagas are automatically removed from the saga repository |
| ConcurrentMessageLimit | The concurrent message limit for the job sagas (does not affect job consumer concurrency limits) |
| HeartbeatTimeout | Period of time after which a job consumer instance is removed from the active list if no heartbeat has been received. This should typically be 2-3x the heartbeat interval set in the JobConsumerOptions |
| SlotWaitTime | When a job slot is not available to run a job, how long to wait before retrying the request |
| SuspectJobRetryCount | How many times to retry a job that becomes suspect (not faulted, but the job consumer instance stops responding) |
| SuspectJobRetryDelay | How long a job should wait before retrying when the job became suspect and SuspectJobRetryCount is > 0 |
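
The suggested relationship between HeartbeatTimeout and the heartbeat interval can be kept consistent by deriving one from the other. The sketch below does this when both settings live in the same process; the JobHeartbeatSketch class, the extension method name, and the factor of three are illustrative choices.

```csharp
using System;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public static class JobHeartbeatSketch
{
    public static IServiceCollection AddJobHeartbeatSettings(this IServiceCollection services)
    {
        var heartbeatInterval = TimeSpan.FromSeconds(10);

        services.AddMassTransit(x =>
        {
            // How often job consumer instances report their health
            x.SetJobConsumerOptions(options => options.HeartbeatInterval = heartbeatInterval);

            x.AddJobSagaStateMachines(options =>
            {
                // Three missed heartbeats before an instance is removed from the active list
                options.HeartbeatTimeout = TimeSpan.FromTicks(heartbeatInterval.Ticks * 3);
            });

            // Consumers, saga repository, and transport configuration omitted for brevity
        });

        return services;
    }
}
```

When the job consumers and the job sagas run in separate services, the two values are configured in different places, so a shared constant or configuration setting would serve the same purpose.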
Configure the job saga endpoints
The job saga state machines are configured on their own endpoints, using the configured endpoint name formatter. These endpoints are required on at least one bus instance; it is not necessary to configure them on every bus instance. In the example above, the job saga state machine endpoints are configured. On a bus instance that only hosts job consumers, a standard ConfigureEndpoints call is sufficient, without the job saga state machines.
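As a rough sketch of that second case, a bus instance that only hosts job consumers might be configured as follows. The JobWorkerSketch class and the AddJobWorker method are hypothetical, and whether such an instance needs anything else (such as a message scheduler) is not covered here.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public static class JobWorkerSketch
{
    public static IServiceCollection AddJobWorker<TJobConsumer>(this IServiceCollection services)
        where TJobConsumer : class, IConsumer
    {
        services.AddMassTransit(x =>
        {
            x.AddConsumer<TJobConsumer>();

            x.SetKebabCaseEndpointNameFormatter();

            // No AddJobSagaStateMachines call: another bus instance is assumed to host the job sagas

            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.ConfigureEndpoints(context);
            });
        });

        return services;
    }
}
```

It could be called as services.AddJobWorker<ConvertVideoJobConsumer>(), with the endpoint name formatter matching the one used by the instance that hosts the job sagas.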
The job saga endpoints can be configured by calling the Endpoints method (or the specific endpoint method) on the configurator returned by AddJobSagaStateMachines.
For instance, the SetPartitionedReceiveMode method (used by the SQL transport) calls the Endpoints method to configure the endpoint.
x.AddJobSagaStateMachines()
    .Endpoints(e =>
    {
        e.AddConfigureEndpointCallback(cfg =>
        {
            if (cfg is ISqlReceiveEndpointConfigurator sql)
                sql.SetReceiveMode(SqlReceiveMode.Partitioned);
        });
    });
Other transports may have different endpoint configuration options, but the general approach is the same.
Configure the job saga repository
A persistent saga repository should be used with job consumers and should be configured when adding the job saga state machines. In the example below, Entity Framework Core is configured, along with the Postgres lock statement provider (required when using PostgreSQL).
x.AddJobSagaStateMachines()
    .EntityFrameworkRepository(r =>
    {
        r.ExistingDbContext<JobServiceSagaDbContext>();
        r.UsePostgres();
    });
Refer to the specific saga repository documentation for more information on configuring the saga repository for use with job consumers.
For a more detailed example of configuring the job saga state machine endpoints, including persistent storage, see the sample mentioned in the box above.
Job Saga States
A job can be in one of the following states:
- Initial
- Final
- Submitted
- Waiting to Start
- Waiting for Slot
- Started
- Completed
- Faulted
- Canceled
- Starting Job Attempt
- Allocating Job Slot
- Waiting to Retry
- Cancellation Pending
Job Attempt Saga States
A job attempt can be in one of the following states:
- Initial
- Final
- Starting
- Running
- Faulted
- Checking Status
- Suspect
Job Type Saga States
A job type can be in one of the following states:
- Initial
- Final
- Active
- Idle