Skip to content

Create a Consumer Saga

A consumer saga is a class, identified by a CorrelationId, that defines the state persisted by a saga repository. Along with the state, interfaces may be added to the saga class to define the events handled by the saga. This combination of state and behavior in a single class is a consumer saga. In the example below, an order saga initiated by a SubmitOrder message is defined.

The consumer saga class must implement the ISaga interface and at least one of the following interfaces to define the events handled by the saga.

public record SubmitOrder :
CorrelatedBy<Guid>
{
public Guid CorrelationId { get; init; }
public DateTime OrderDate { get; init; }
}
public class OrderSaga :
ISaga,
InitiatedBy<SubmitOrder>
{
public Guid CorrelationId { get; set; }
public DateTime? SubmitDate { get; set; }
public DateTime? AcceptDate { get; set; }
public async Task Consume(ConsumeContext<SubmitOrder> context)
{
SubmitDate = context.Message.OrderDate;
}
}

When a SubmitOrder message is received by the saga’s receive endpoint, the CorrelationId property is used to determine if an existing saga instance with that CorrelationId exists. If an existing instance is not found, the repository creates a new saga instance and calls the Consume method on the new instance. After the Consume method completes, the repository saves the newly created instance.

To define an event orchestrated by an existing saga instance, such as OrderAccepted, an additional interface and method is specified.

public record OrderAccepted :
CorrelatedBy<Guid>
{
public Guid CorrelationId { get; init; }
public DateTime Timestamp { get; init; }
}
public class OrderSaga :
ISaga,
InitiatedBy<SubmitOrder>,
Orchestrates<OrderAccepted>,
{
public Guid CorrelationId { get; set; }
public DateTime? SubmitDate { get; set; }
public DateTime? AcceptDate { get; set; }
public async Task Consume(ConsumeContext<SubmitOrder> context) {...}
public async Task Consume(ConsumeContext<OrderAccepted> context)
{
AcceptDate = context.Message.Timestamp;
}
}

To define an event that can initiate a new or orchestrate an existing saga instance, such as OrderInvoiced, an additional interface and method is specified.

public record OrderInvoiced :
CorrelatedBy<Guid>
{
public Guid CorrelationId { get; init; }
public DateTime Timestamp { get; init; }
public decimal Amount { get; init; }
}
public class OrderPaymentSaga :
ISaga,
InitiatedByOrOrchestrates<OrderInvoiced>
{
public Guid CorrelationId { get; set; }
public DateTime? InvoiceDate { get; set; }
public decimal? Amount { get; set; }
public async Task Consume(ConsumeContext<OrderInvoiced> context)
{
InvoiceDate = context.Message.Timestamp;
Amount = context.Message.Amount;
}
}

To define an event observed by an existing saga instance that does not implement the CorrelatedBy interface, such as OrderShipped, an additional interface and method is specified.

public record OrderShipped
{
public Guid OrderId { get; init; }
public DateTime ShipDate { get; init; }
}
public class OrderSaga :
ISaga,
InitiatedBy<SubmitOrder>,
Orchestrates<OrderAccepted>,
Observes<OrderShipped, OrderSaga>
{
public Guid CorrelationId { get; set; }
public DateTime? SubmitDate { get; set; }
public DateTime? AcceptDate { get; set; }
public DateTime? ShipDate { get; set; }
public async Task Consume(ConsumeContext<SubmitOrder> context) {...}
public async Task Consume(ConsumeContext<OrderAccepted> context) {...}
public async Task Consume(ConsumeContext<OrderShipped> context)
{
ShipDate = context.Message.ShipDate;
}
public Expression<Func<OrderSaga, OrderShipped, bool>> CorrelationExpression =>
(saga,message) => saga.CorrelationId == message.OrderId;
}

You can also test the consumer saga using the MassTransit test harness.

Now that you’ve learned how to create a consumer saga, see the saga configuration documentation for more information on how to configure the saga and the saga repository.