Saga State Machines
The ability to orchestrate a series of events is a powerful feature, and MassTransit makes this possible.
A saga is a long-lived transaction managed by a coordinator. Sagas are initiated by an event, sagas orchestrate events, and sagas maintain the state of the overall transaction. Sagas are designed to manage the complexity of a distributed transaction without locking and immediate consistency. They manage state and track any compensations required if a partial failure occurs.
We didn’t create the concept; we learned it from the original Princeton paper and from Arnon Rotem-Gal-Oz’s description.
Saga State Machines
MassTransit supports saga state machines, which provide a powerful syntax to create sagas. This is the preferred way to create sagas, as it is more expressive and easier to understand.
Consumer Sagas
MassTransit supports consumer sagas, which implement one or more interfaces to consume correlated saga events. This support is included so that it is easy to move applications from other saga implementations to MassTransit.
Saga Persistence
Sagas are stateful event-based message consumers: they retain state. Therefore, saving state between events is important. Without persistent state, a saga would consider each event a new event, and orchestration of subsequent events would be meaningless.
To store the saga state, you need to use one form of saga persistence. MassTransit supports several types of storage; those included in the main distribution are listed below. There is also an unreliable in-memory storage, which lets you store your saga state temporarily. It is useful for trying things out since it does not require any infrastructure.
Order State
An example state machine instance is shown below. This example will be used across every storage engine to show how each is configured.
public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }

    public DateTime? OrderDate { get; set; }
}
Container Integration
When using the AddMassTransit container extension, the repository should be specified at saga registration. The example below specifies the InMemory saga repository.
container.AddMassTransit(cfg =>
{
    cfg.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .InMemoryRepository();
});
The saga repository is always registered with a singleton container lifecycle.
If the container registration is not being used, the InMemory saga repository can be created manually and specified on the receive endpoint.
var orderStateMachine = new OrderStateMachine();
var repository = new InMemorySagaRepository<OrderState>();

var busControl = Bus.Factory.CreateUsingInMemory(x =>
{
    x.ReceiveEndpoint("order-state", e =>
    {
        e.StateMachineSaga(orderStateMachine, repository);
    });
});
There are two types of saga repository:
- Query repository
- Identity-only repository
Depending on the persistence mechanism, a repository implementation can be either identity-only or identity plus query.
When using an identity-only repository, such as Azure Service Bus message session or Redis, you can only correlate by identity. This means that every event the saga receives must carry the saga correlation id, and the correlation for each event can only be defined using the CorrelateById method.
A query repository supports identity correlation too, but it can also correlate on other properties of the received events and of the saga state. Such correlations are defined using the CorrelateBy method, and you can use any logical expression that involves the event data and saga state data to establish the correlation. Repository implementations such as Entity Framework, NHibernate, and Marten support correlation by query. The in-memory repository supports it as well.
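The two correlation styles described above can be sketched in a single state machine. This is a minimal illustration, not the documented OrderStateMachine: the message contracts, the `OrderNumber` property added to the saga state, and the state names are all assumptions introduced for the example. The `context.Saga` and `context.Message` property names are the MassTransit v8 ones; earlier versions use `context.Instance` and `context.Data`.

```csharp
using System;
using MassTransit;

// Hypothetical message contracts, introduced only for this sketch.
public record SubmitOrder(Guid OrderId, string OrderNumber);
public record OrderAccepted(string OrderNumber);

// The saga state from this page, extended with a hypothetical OrderNumber
// property so that there is something to correlate on besides the identity.
public class OrderState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public string OrderNumber { get; set; }
}

public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        InstanceState(x => x.CurrentState);

        // Identity correlation: the event carries the saga CorrelationId.
        // This works with both identity-only and query repositories.
        Event(() => SubmitOrder,
            x => x.CorrelateById(context => context.Message.OrderId));

        // Query correlation: matches saga state against event data.
        // This requires a repository that supports queries.
        Event(() => OrderAccepted,
            x => x.CorrelateBy((saga, context) => saga.OrderNumber == context.Message.OrderNumber));

        Initially(
            When(SubmitOrder)
                .Then(context => context.Saga.OrderNumber = context.Message.OrderNumber)
                .TransitionTo(Submitted));

        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));
    }

    public State Submitted { get; private set; }
    public State Accepted { get; private set; }

    public Event<SubmitOrder> SubmitOrder { get; private set; }
    public Event<OrderAccepted> OrderAccepted { get; private set; }
}
```

With an identity-only repository, the `OrderAccepted` event in this sketch would instead have to carry the `OrderId` and use `CorrelateById`.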
Saga persistence considerations
What follows is a set of guidelines related to sagas that was collected from Discord, Stack Overflow, and other sources to provide an easy way to link answers to commonly asked questions.
Identity
Saga instances are identified by a unique identifier (Guid), represented by the CorrelationId on the saga instance. Events are correlated to the saga instance using either the unique identifier, or alternatively using an expression that correlates properties on the saga instance to each event. If the CorrelationId is used, it’s always a one-to-one match: either the saga already exists or it’s a new saga instance. With a correlation expression, the expression might match more than one saga instance, so care should be used, because the event would be delivered to all matching instances.
Seriously, don’t send an event to all instances, unless you want to watch your message consumers lock your entire saga storage engine.
It is strongly advised to have CorrelationId as your table/document key. This will enable better concurrency handling and will make the saga state consistent.
Publishing and sending from sagas
Sagas are completely message-driven and therefore not only consume but also publish events and send commands. However, if your saga receives a lot of messages at roughly the same time and the endpoint is set to process multiple messages in parallel, this can lead to a conflict between message processing and saga persistence.
This means that more than one saga state update could be persisted at the same time. Depending on the saga repository type, this might fail for different reasons: a versioning issue, a row or table lock, or an eTag mismatch. All of those problems indicate a concurrency issue.
It is normal for the saga repository to throw an exception in such a case, but if your saga publishes messages, they will already have been published even though the saga state has not been updated. MassTransit will eventually apply the retry policy on the endpoint and more messages will be sent, potentially leading to a mess. Or, if no retry policy is configured, messages might be sent indicating that the process needs to continue, but the saga instance will still be in the old state and will not accept any further messages because they will arrive in the wrong state.
This issue is common and can be solved by postponing the message publish and send operations until all persistence work is done. All messages that should be published are collected in a buffer, which is called the Outbox. MassTransit implements this feature, and it can be configured by adding these lines to your endpoint configuration:
c.ReceiveEndpoint("queue", e =>
{
    e.UseInMemoryOutbox();
    // other endpoint configuration here
});
Relational database recommendations
While it’s nice if you are developing a green-field system and you can define your Saga Db Entity with CorrelationId as the Primary Key (Clustered), sometimes we have to work within existing db entities. If this is the case, please remember that to keep your sagas performing quickly (optimistic OR pessimistic, it doesn’t matter) you should follow the note below.
Optimistic vs. pessimistic concurrency
Most persistence mechanisms for sagas supported by MassTransit need some way to guarantee ACID when processing sagas. Because there can be multiple threads consuming multiple bus events meant for the same saga instance, they could end up overwriting each other (race condition).
Relational databases can easily handle this by setting the transaction type to serializable or (page/row) locking. This would be considered as pessimistic concurrency.
Another way to handle concurrency is to have some attribute like version or timestamp, which updates every time a saga is persisted. By doing that we can instruct the database only to update the record if this attribute matches between what we are trying to persist and what is stored in the database record we are trying to update.
This type of concurrency is called optimistic concurrency. It doesn’t guarantee that your unit of work with the database will succeed (you must retry after these exceptions), but it also doesn’t block anybody else from working within the same database page (it does not lock the table/page).
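The version check described above can be sketched without any database. The following is a minimal illustration only: `SagaRow` and `VersionedStore` are hypothetical names, and the `lock` merely stands in for the atomicity a database gives a conditional update. It does not show how any MassTransit repository is implemented.

```csharp
using System;
using System.Collections.Generic;

var store = new VersionedStore();
var id = Guid.NewGuid();
store.Insert(new SagaRow(id, "Submitted", 1));

// Two handlers read the same row before either one writes.
var first = store.Load(id);
var second = store.Load(id);

Console.WriteLine(store.TryUpdate(first, "Accepted"));  // True: version 1 still matches
Console.WriteLine(store.TryUpdate(second, "Canceled")); // False: the row is now at version 2

// Hypothetical saga row carrying a version attribute.
public record SagaRow(Guid CorrelationId, string CurrentState, int Version);

// Hypothetical in-memory store used only to illustrate the version check.
public class VersionedStore
{
    private readonly Dictionary<Guid, SagaRow> _rows = new();
    private readonly object _gate = new();

    public void Insert(SagaRow row)
    {
        lock (_gate) _rows[row.CorrelationId] = row;
    }

    public SagaRow Load(Guid id)
    {
        lock (_gate) return _rows[id];
    }

    // Writes only if the stored version still equals the version that was read,
    // then bumps the version so that any other stale writer is rejected.
    public bool TryUpdate(SagaRow read, string newState)
    {
        lock (_gate)
        {
            var stored = _rows[read.CorrelationId];
            if (stored.Version != read.Version)
                return false; // someone else persisted first; the caller must reload and retry

            _rows[read.CorrelationId] = stored with
            {
                CurrentState = newState,
                Version = stored.Version + 1
            };
            return true;
        }
    }
}
```

The second writer is not blocked while the first one works; it simply finds out at write time that its copy is stale, which is why a retry is needed.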
So, which one should I use?
For almost every scenario, optimistic concurrency is recommended, because most state machine logic should be fairly quick.
If the chosen persistence method supports optimistic concurrency, race conditions can be handled rather easily by specifying a retry policy for concurrency exceptions or by using a generic retry policy.
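A retry policy limited to concurrency exceptions might look like the sketch below. It assumes an Entity Framework Core repository, where a version mismatch surfaces as `DbUpdateConcurrencyException`; other repositories throw their own exception types, so the handled type would need to change. The extension method name `UseConcurrencyRetry` and the interval values are hypothetical.

```csharp
using System;
using MassTransit;
using Microsoft.EntityFrameworkCore;

public static class SagaEndpointRetryExtensions
{
    // Hypothetical helper: retries only when the repository reports a
    // concurrency conflict, and lets every other exception fault as usual.
    public static void UseConcurrencyRetry(this IReceiveEndpointConfigurator endpoint)
    {
        endpoint.UseMessageRetry(r =>
        {
            // Assumption: EF Core persistence. Replace with the exception
            // type thrown by the repository actually in use.
            r.Handle<DbUpdateConcurrencyException>();

            // Up to 5 retries, 100 ms apart (illustrative values).
            r.Interval(5, TimeSpan.FromMilliseconds(100));
        });
    }
}
```

A generic policy, such as `r.Interval(5, 1000)` with no `Handle` filter, retries on any exception, which is simpler but also repeats work for failures that a retry cannot fix.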
Concurrency optimizations
Saga concurrency issues happen, particularly when using optimistic concurrency. The most common reasons include:
- Simultaneous events correlating to the same instance, typically from multiple sources running in parallel
- Commands from the saga to consumers, where the consumer is quick and responds before the saga has finished processing the initiating event
There are certainly others, but anytime multiple events correlate to the same instance, concurrency issues are a concern. For that reason, the following baseline receive endpoint configuration is recommended as a starting point (tuning will depend upon the saga, repository, environment, etc.).
To configure the receive endpoint directly:
services.AddMassTransit(x =>
{
    x.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .MongoDbRepository(r =>
        {
            r.Connection = "mongodb://127.0.0.1";
            r.DatabaseName = "orderdb";
        });

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ReceiveEndpoint("saga-queue", e =>
        {
            const int ConcurrencyLimit = 20; // this can go up, depending upon the database capacity

            e.PrefetchCount = ConcurrencyLimit;

            e.UseMessageRetry(r => r.Interval(5, 1000));
            e.UseInMemoryOutbox();

            e.ConfigureSaga<OrderState>(context, s =>
            {
                var partition = s.CreatePartitioner(ConcurrencyLimit);

                s.Message<SubmitOrder>(x => x.UsePartitioner(partition, m => m.Message.OrderId));
                s.Message<OrderAccepted>(x => x.UsePartitioner(partition, m => m.Message.OrderId));
                s.Message<OrderCanceled>(x => x.UsePartitioner(partition, m => m.Message.OrderId));
            });
        });
    });
});
Alternatively, if you are using a saga definition:
public sealed class OrderStateSagaDefinition :
    SagaDefinition<OrderState>
{
    private const int ConcurrencyLimit = 20; // this can go up, depending upon the database capacity

    public OrderStateSagaDefinition()
    {
        // specify the message limit at the endpoint level, which influences
        // the endpoint prefetch count, if supported.
        Endpoint(e =>
        {
            e.Name = "saga-queue";
            e.PrefetchCount = ConcurrencyLimit;
        });
    }

    protected override void ConfigureSaga(IReceiveEndpointConfigurator endpointConfigurator, ISagaConfigurator<OrderState> sagaConfigurator)
    {
        endpointConfigurator.UseMessageRetry(r => r.Interval(5, 1000));
        endpointConfigurator.UseInMemoryOutbox();

        var partition = endpointConfigurator.CreatePartitioner(ConcurrencyLimit);

        sagaConfigurator.Message<SubmitOrder>(x => x.UsePartitioner(partition, m => m.Message.OrderId));
        sagaConfigurator.Message<OrderAccepted>(x => x.UsePartitioner(partition, m => m.Message.OrderId));
        sagaConfigurator.Message<OrderCanceled>(x => x.UsePartitioner(partition, m => m.Message.OrderId));
    }
}
This example uses message retry (because concurrency issues throw exceptions) and the InMemoryOutbox (to avoid duplicate messages in the event of a concurrency failure). It uses a partitioner to limit the receive endpoint to only one concurrent message for each OrderId (the partitioner uses hashing to meet the partition count).
The partitioner in this case applies only to this specific receive endpoint; multiple service instances (competing consumers) may still consume events for the same saga instance.
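The idea of hashing a key to meet a fixed partition count can be sketched in plain C#. This is an illustration of the general technique only: the source does not say which hash MassTransit uses, so FNV-1a is used here as a stand-in, and `PartitionFor` is a hypothetical helper.

```csharp
using System;

const int PartitionCount = 20; // mirrors ConcurrencyLimit in the examples above

var orderA = Guid.NewGuid();
var orderB = Guid.NewGuid();

// The same key always maps to the same slot, so two messages for one order
// are serialized behind each other on this endpoint.
Console.WriteLine(PartitionFor(orderA, PartitionCount) == PartitionFor(orderA, PartitionCount)); // True

// Different keys may land in different slots and can then run in parallel.
Console.WriteLine($"{PartitionFor(orderA, PartitionCount)} / {PartitionFor(orderB, PartitionCount)}");

// Hypothetical helper: FNV-1a over the Guid bytes, reduced modulo the partition count.
static int PartitionFor(Guid key, int partitionCount)
{
    uint hash = 2166136261;
    foreach (var b in key.ToByteArray())
    {
        hash ^= b;
        hash *= 16777619; // wraps on overflow in the default unchecked context
    }

    return (int)(hash % (uint)partitionCount);
}
```

Because the slot assignment lives inside one process, a second service instance computes its own slots independently, which is why partitioning alone does not remove the need for retry and the outbox.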