Create a Consumer Saga
A consumer saga is a class, identified by a CorrelationId, that defines the state persisted by a saga repository. Along with the state, interfaces may be
added to the saga class to define the events handled by the saga. This combination of state and behavior in a single class is a consumer saga. In the example
below, an order saga initiated by a SubmitOrder message is defined.
Configure the saga messages
Section titled “Configure the saga messages”The consumer saga class must implement the ISaga interface and at least one of the following interfaces to define the events handled by the saga.
InitiatedBy
Section titled “InitiatedBy”public record SubmitOrder : CorrelatedBy<Guid>{ public Guid CorrelationId { get; init; } public DateTime OrderDate { get; init; }}
public class OrderSaga : ISaga, InitiatedBy<SubmitOrder>{ public Guid CorrelationId { get; set; }
public DateTime? SubmitDate { get; set; } public DateTime? AcceptDate { get; set; }
public async Task Consume(ConsumeContext<SubmitOrder> context) { SubmitDate = context.Message.OrderDate; }}When a SubmitOrder message is received by the saga’s receive endpoint, the CorrelationId property is used to determine if an existing saga instance with
that CorrelationId exists. If an existing instance is not found, the repository creates a new saga instance and calls the Consume method on the new
instance. After the Consume method completes, the repository saves the newly created instance.
Orchestrates
Section titled “Orchestrates”To define an event orchestrated by an existing saga instance, such as OrderAccepted, an additional interface and method is specified.
public record OrderAccepted : CorrelatedBy<Guid>{ public Guid CorrelationId { get; init; } public DateTime Timestamp { get; init; }}
public class OrderSaga : ISaga, InitiatedBy<SubmitOrder>, Orchestrates<OrderAccepted>,{ public Guid CorrelationId { get; set; }
public DateTime? SubmitDate { get; set; } public DateTime? AcceptDate { get; set; }
public async Task Consume(ConsumeContext<SubmitOrder> context) {...}
public async Task Consume(ConsumeContext<OrderAccepted> context) { AcceptDate = context.Message.Timestamp; }}InitiatedByOrOrchestrates
Section titled “InitiatedByOrOrchestrates”To define an event that can initiate a new or orchestrate an existing saga instance, such as OrderInvoiced, an additional interface and method is specified.
public record OrderInvoiced : CorrelatedBy<Guid>{ public Guid CorrelationId { get; init; } public DateTime Timestamp { get; init; } public decimal Amount { get; init; }}
public class OrderPaymentSaga : ISaga, InitiatedByOrOrchestrates<OrderInvoiced>{ public Guid CorrelationId { get; set; }
public DateTime? InvoiceDate { get; set; } public decimal? Amount { get; set; }
public async Task Consume(ConsumeContext<OrderInvoiced> context) { InvoiceDate = context.Message.Timestamp; Amount = context.Message.Amount; }}Observes
Section titled “Observes”To define an event observed by an existing saga instance that does not implement the CorrelatedBy
public record OrderShipped{ public Guid OrderId { get; init; } public DateTime ShipDate { get; init; }}
public class OrderSaga : ISaga, InitiatedBy<SubmitOrder>, Orchestrates<OrderAccepted>, Observes<OrderShipped, OrderSaga>{ public Guid CorrelationId { get; set; }
public DateTime? SubmitDate { get; set; } public DateTime? AcceptDate { get; set; } public DateTime? ShipDate { get; set; }
public async Task Consume(ConsumeContext<SubmitOrder> context) {...} public async Task Consume(ConsumeContext<OrderAccepted> context) {...}
public async Task Consume(ConsumeContext<OrderShipped> context) { ShipDate = context.Message.ShipDate; }
public Expression<Func<OrderSaga, OrderShipped, bool>> CorrelationExpression => (saga,message) => saga.CorrelationId == message.OrderId;}Testing the consumer saga
Section titled “Testing the consumer saga”You can also test the consumer saga using the MassTransit test harness.
Next steps
Section titled “Next steps”Now that you’ve learned how to create a consumer saga, see the saga configuration documentation for more information on how to configure the saga and the saga repository.