Partitioner Configuration
To limit concurrent message consumption by partition key on a single bus instance, the partitioner filter can be used. For each message type, a partition key provider must be specified.
The partitioner ensures that messages with the same partition key are processed sequentially on a single bus instance. This is useful when you need to ensure message ordering for specific keys.
Configure a partitioner
Single message type
To add a partitioner to a receive endpoint:

    cfg.ReceiveEndpoint("input-queue", e =>
    {
        // 16 partitions, keyed by the customer that submitted the order
        e.UsePartitioner<SubmitOrder>(16, p => p.Message.CustomerId);

        e.ConfigureConsumer<SubmitOrderConsumer>(context);
    });

Multiple message types
A single partitioner can be used for multiple message types. This is common in saga state machines where you want to prevent messages correlated to the same saga instance from being processed concurrently.

    var partition = new Partitioner(16, new Murmur3UnsafeHashGenerator());

    cfg.ReceiveEndpoint("job_saga", e =>
    {
        e.UsePartitioner<JobSubmitted>(partition, p => p.Message.JobId);
        e.UsePartitioner<JobSlotAllocated>(partition, p => p.Message.JobId);
        e.UsePartitioner<JobSlotUnavailable>(partition, p => p.Message.JobId);
        // Fault<T> wraps the original message, so the key is read from p.Message.Message
        e.UsePartitioner<Fault<AllocateJobSlot>>(partition, p => p.Message.Message.JobId);
        e.UsePartitioner<JobAttemptCreated>(partition, p => p.Message.JobId);
        e.UsePartitioner<Fault<StartJobAttempt>>(partition, p => p.Message.Message.JobId);

        e.ConfigureSagaStateMachine<JobSaga>(context);
    });

In this example, all job-related messages share the same partitioner and use JobId as the partition key. This ensures that only one message for a specific job is processed at a time, reducing race conditions.
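To see why the message types share one partitioner instance, the following is a minimal sketch of the idea, not MassTransit's implementation. `KeyedSerializer` and `SharedPartitionerDemo` are hypothetical names introduced only for this illustration, and `Guid.GetHashCode` stands in for a real hash generator. Each partition is modeled as a lock that admits one handler at a time.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a partitioner; NOT MassTransit's Partitioner class.
public sealed class KeyedSerializer
{
    private readonly SemaphoreSlim[] _partitions;

    public KeyedSerializer(int partitionCount)
    {
        _partitions = new SemaphoreSlim[partitionCount];
        for (var i = 0; i < partitionCount; i++)
            _partitions[i] = new SemaphoreSlim(1, 1); // one handler at a time per partition
    }

    // Assumption: Guid.GetHashCode is used here in place of a real hash generator.
    public int PartitionFor(Guid key) =>
        (key.GetHashCode() & int.MaxValue) % _partitions.Length;

    // Runs the handler while holding the partition that the key maps to.
    public async Task RunAsync(Guid key, Func<Task> handler)
    {
        var partition = _partitions[PartitionFor(key)];
        await partition.WaitAsync();
        try
        {
            await handler();
        }
        finally
        {
            partition.Release();
        }
    }
}

public static class SharedPartitionerDemo
{
    // Simulates two different message types for the same job arriving together
    // and returns the highest number of handlers that ran at the same time.
    public static async Task<int> MaxConcurrencyAsync(KeyedSerializer forTypeA, KeyedSerializer forTypeB)
    {
        var jobId = Guid.NewGuid();
        var gate = new object();
        int running = 0, max = 0;

        async Task Handler()
        {
            lock (gate) { running++; max = Math.Max(max, running); }
            await Task.Delay(50); // simulated work
            lock (gate) { running--; }
        }

        await Task.WhenAll(
            forTypeA.RunAsync(jobId, Handler),
            forTypeB.RunAsync(jobId, Handler));

        return max;
    }
}
```

Passing the same `KeyedSerializer` for both message types yields a maximum concurrency of 1 for the job, while passing two separate instances lets both handlers run at once (a maximum of 2), because each instance holds its own set of partitions.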
Partitioner options
| Option | Description |
|---|---|
| Partitioner | The partitioner instance that controls how messages are distributed across partitions |
| PartitionKeyProvider | A function that extracts the partition key from the message |
Creating a partitioner
The Partitioner class requires two parameters:
- Partition Count: The number of partitions to use
- Hash Generator: A hash generator to distribute keys across partitions

    var partition = new Partitioner(16, new Murmur3UnsafeHashGenerator());

The Murmur3UnsafeHashGenerator is the recommended hash generator for distributing partition keys.
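The relationship between the two parameters can be sketched as follows: the key is hashed, and the hash is reduced to an index below the partition count. This is a hedged illustration only. `PartitionIndexSketch` is a hypothetical helper, and FNV-1a is swapped in as a simple deterministic hash; it is not the Murmur3 algorithm, and this is not how MassTransit's Partitioner is necessarily implemented internally.

```csharp
using System;
using System.Text;

// Hypothetical illustration; NOT MassTransit's Partitioner or Murmur3UnsafeHashGenerator.
public static class PartitionIndexSketch
{
    // FNV-1a (32-bit), used here only as a simple, deterministic stand-in hash.
    public static uint Fnv1a(byte[] data)
    {
        const uint offsetBasis = 2166136261;
        const uint prime = 16777619;

        var hash = offsetBasis;
        foreach (var b in data)
        {
            hash ^= b;
            hash = unchecked(hash * prime); // wrap around on overflow
        }

        return hash;
    }

    // Reduces the hash of the key to a partition index in the range [0, partitionCount).
    public static int PartitionFor(string key, int partitionCount)
    {
        if (partitionCount <= 0)
            throw new ArgumentOutOfRangeException(nameof(partitionCount));

        return (int)(Fnv1a(Encoding.UTF8.GetBytes(key)) % (uint)partitionCount);
    }
}
```

Because the hash is deterministic, `PartitionIndexSketch.PartitionFor("customer-42", 16)` returns the same index on every call, so all messages for that key land in the same partition. Different keys may map to the same index; that reduces concurrency for those keys but does not affect ordering.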
Use Cases
The partitioner is commonly used for:
- Ensuring messages with the same correlation ID are processed in order
- Distributing load across multiple consumers while maintaining order for specific keys
- Saga state machines where message ordering is critical for a specific saga instance
- Job consumers where each job should be processed sequentially