Saga State Machine Behavior

Behavior is what happens when an event occurs during a state.

Below, the Initially block is used to define the behavior of the SubmitOrder event during the Initial state. When a SubmitOrder message is consumed and an instance with a CorrelationId matching the OrderId is not found, a new instance will be created in the Initial state. The TransitionTo activity transitions the instance to the Submitted state, after which the instance is persisted using the saga repository.

using MassTransit;

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .TransitionTo(Submitted));
    }
}

Subsequently, the OrderAccepted event could be handled by the behavior shown below.

using System;
using MassTransit;

public interface OrderAccepted
{
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        // Correlate the event to the instance using the OrderId in the message
        Event(() => OrderAccepted, x => x.CorrelateById(context => context.Message.OrderId));

        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));
    }

    public Event<OrderAccepted> OrderAccepted { get; private set; }
}

It may be necessary to ignore an event, either to avoid fault generation or to prevent messages from being moved to the _skipped queue. To ignore an event in a given state, use the Ignore method.

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .TransitionTo(Submitted),
            When(OrderAccepted)
                .TransitionTo(Accepted));

        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));

        // SubmitOrder is ignored once the order has been accepted
        During(Accepted,
            Ignore(SubmitOrder));
    }
}

By default, instances are not removed from the saga repository. To configure completed instance removal, specify the method used to determine if an instance has completed.

public interface OrderCompleted
{
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => OrderCompleted, x => x.CorrelateById(context => context.Message.OrderId));

        DuringAny(
            When(OrderCompleted)
                .Finalize());

        SetCompletedWhenFinalized();
    }

    public Event<OrderCompleted> OrderCompleted { get; private set; }
}

When the instance consumes the OrderCompleted event, it is finalized, which transitions it to the Final state. The SetCompletedWhenFinalized method defines an instance in the Final state as completed, and the saga repository uses that definition to remove the instance.

To use a different completed expression, such as one that checks if the instance is in a Completed state, use the SetCompleted method as shown below.

public interface OrderCompleted
{
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => OrderCompleted, x => x.CorrelateById(context => context.Message.OrderId));

        DuringAny(
            When(OrderCompleted)
                .TransitionTo(Completed));

        // The instance is considered completed once it reaches the Completed state
        SetCompleted(async instance =>
        {
            State<OrderState> currentState = await this.GetState(instance);
            return Completed.Equals(currentState);
        });
    }

    public State Completed { get; private set; }
    public Event<OrderCompleted> OrderCompleted { get; private set; }
}

Message brokers typically do not guarantee message order. Therefore, it is important to consider out-of-order messages in state machine design.

In the earlier examples, where SubmitOrder is handled only in the Initial state and OrderAccepted only in the Submitted state, receiving a SubmitOrder message after an OrderAccepted event could cause the SubmitOrder message to end up in the _error queue. If the OrderAccepted event is received first, it would be discarded since it isn't accepted in the Initial state. Below is an updated state machine that handles both of these scenarios.

using MassTransit;

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .TransitionTo(Submitted),
            When(OrderAccepted)
                .TransitionTo(Accepted));

        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));

        During(Accepted,
            Ignore(SubmitOrder));
    }
}

In the updated example, a SubmitOrder message received while in the Accepted state is ignored. However, data in the event may be useful. In that case, behavior can be added to copy the data to the instance. Below, data from the event is captured in both scenarios.

using System;
using MassTransit;

public interface SubmitOrder
{
    Guid OrderId { get; }
    DateTime OrderDate { get; }
}

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public DateTime? OrderDate { get; set; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .Then(x => x.Saga.OrderDate = x.Message.OrderDate)
                .TransitionTo(Submitted),
            When(OrderAccepted)
                .TransitionTo(Accepted));

        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));

        // SubmitOrder arrived after OrderAccepted: capture its data, stay in Accepted
        During(Accepted,
            When(SubmitOrder)
                .Then(x => x.Saga.OrderDate = x.Message.OrderDate));
    }
}

State machine behaviors are defined as a sequence of activities which are executed in response to an event. In addition to the activities included with Automatonymous, MassTransit includes activities to send, publish, and schedule messages, as well as initiate and respond to requests.

To publish an event, add a Publish activity.

public interface OrderSubmitted
{
    Guid OrderId { get; }
}

public class OrderSubmittedEvent :
    OrderSubmitted
{
    public OrderSubmittedEvent(Guid orderId)
    {
        OrderId = orderId;
    }

    public Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .Publish(context => (OrderSubmitted)new OrderSubmittedEvent(context.Saga.CorrelationId))
                .TransitionTo(Submitted));
    }
}

Alternatively, a message initializer can be used to eliminate the Event class.

public interface OrderSubmitted
{
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .PublishAsync(context => context.Init<OrderSubmitted>(new { OrderId = context.Saga.CorrelationId }))
                .TransitionTo(Submitted));
    }
}

To send a message, add a Send activity.

public interface UpdateAccountHistory
{
    Guid OrderId { get; }
}

public class UpdateAccountHistoryCommand :
    UpdateAccountHistory
{
    public UpdateAccountHistoryCommand(Guid orderId)
    {
        OrderId = orderId;
    }

    public Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine(OrderStateMachineSettings settings)
    {
        Initially(
            When(SubmitOrder)
                .Send(settings.AccountServiceAddress, context => new UpdateAccountHistoryCommand(context.Saga.CorrelationId))
                .TransitionTo(Submitted));
    }
}
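
The Send examples take an OrderStateMachineSettings argument that is not defined on this page. As a minimal sketch, assuming the settings type only needs to expose the destination address as a Uri, it might look like the following. The StaticOrderStateMachineSettings class is a hypothetical name introduced here for illustration; only AccountServiceAddress appears in the examples.

```csharp
using System;

// Assumed shape of the settings contract: the examples on this page
// only show that it exposes AccountServiceAddress.
public interface OrderStateMachineSettings
{
    Uri AccountServiceAddress { get; }
}

// Hypothetical implementation that holds a fixed address.
public class StaticOrderStateMachineSettings :
    OrderStateMachineSettings
{
    public StaticOrderStateMachineSettings(Uri accountServiceAddress)
    {
        AccountServiceAddress = accountServiceAddress
            ?? throw new ArgumentNullException(nameof(accountServiceAddress));
    }

    public Uri AccountServiceAddress { get; }
}
```

An instance could then be created with, for example, new StaticOrderStateMachineSettings(new Uri("queue:account-service")), where the queue name is a placeholder, and supplied to the state machine constructor through whatever dependency injection container the application uses.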

Alternatively, a message initializer can be used to eliminate the Command class.

public interface UpdateAccountHistory
{
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine(OrderStateMachineSettings settings)
    {
        Initially(
            When(SubmitOrder)
                .SendAsync(settings.AccountServiceAddress, context => context.Init<UpdateAccountHistory>(new { OrderId = context.Saga.CorrelationId }))
                .TransitionTo(Submitted));
    }
}

A state machine can respond to requests by configuring the request message type as an event, and using the Respond method. When configuring a request event, configuring a missing instance method is recommended, to provide a better response experience (either through a different response type, or a response that indicates an instance was not found).

public interface RequestOrderCancellation
{
    Guid OrderId { get; }
}

public interface OrderCanceled
{
    Guid OrderId { get; }
}

public interface OrderNotFound
{
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => OrderCancellationRequested, e =>
        {
            e.CorrelateById(context => context.Message.OrderId);

            // No matching instance: respond with OrderNotFound instead of faulting
            e.OnMissingInstance(m =>
            {
                return m.ExecuteAsync(x => x.RespondAsync<OrderNotFound>(new { x.Message.OrderId }));
            });
        });

        DuringAny(
            When(OrderCancellationRequested)
                .RespondAsync(context => context.Init<OrderCanceled>(new { OrderId = context.Saga.CorrelationId }))
                .TransitionTo(Canceled));
    }

    public State Canceled { get; private set; }
    public Event<RequestOrderCancellation> OrderCancellationRequested { get; private set; }
}

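In some scenarios, the original requester must wait for a response that the state machine can only send after further work has completed. In these scenarios, the information required to respond to the original request (here, the RequestId and ResponseAddress) should be stored on the saga instance.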

using System;
using System.Threading.Tasks;
using MassTransit;

public record CreateOrder(Guid CorrelationId) : CorrelatedBy<Guid>;
public record ProcessOrder(Guid OrderId, Guid ProcessingId);
public record OrderProcessed(Guid OrderId, Guid ProcessingId);
public record OrderCancelled(Guid OrderId, string Reason);

public class ProcessOrderConsumer : IConsumer<ProcessOrder>
{
    public async Task Consume(ConsumeContext<ProcessOrder> context)
    {
        await context.RespondAsync(new OrderProcessed(context.Message.OrderId, context.Message.ProcessingId));
    }
}

public class OrderState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public Guid? ProcessingId { get; set; }

    // Stored so the original request can be answered later
    public Guid? RequestId { get; set; }
    public Uri ResponseAddress { get; set; }

    public Guid OrderId { get; set; }
}

public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    public State Created { get; set; }
    public State Cancelled { get; set; }
    public Event<CreateOrder> OrderSubmitted { get; set; }
    public Request<OrderState, ProcessOrder, OrderProcessed> ProcessOrder { get; set; }

    public OrderStateMachine()
    {
        InstanceState(m => m.CurrentState);
        Event(() => OrderSubmitted);
        Request(() => ProcessOrder, order => order.ProcessingId, config => { config.Timeout = TimeSpan.Zero; });

        Initially(
            When(OrderSubmitted)
                .Then(context =>
                {
                    context.Saga.CorrelationId = context.Message.CorrelationId;
                    context.Saga.ProcessingId = Guid.NewGuid();
                    context.Saga.OrderId = Guid.NewGuid();

                    // Capture the details needed to respond to the original request
                    context.Saga.RequestId = context.RequestId;
                    context.Saga.ResponseAddress = context.ResponseAddress;
                })
                .Request(ProcessOrder, context => new ProcessOrder(context.Saga.OrderId, context.Saga.ProcessingId!.Value))
                .TransitionTo(ProcessOrder.Pending)
        );

        // Respond to the original requester using the stored address and request id
        During(ProcessOrder.Pending,
            When(ProcessOrder.Completed)
                .TransitionTo(Created)
                .Send(x => x.Saga.ResponseAddress, x => x.Saga, (context, x) => x.RequestId = context.Saga.RequestId),
            When(ProcessOrder.Faulted)
                .TransitionTo(Cancelled)
                .Send(x => x.Saga.ResponseAddress, x => new OrderCancelled(x.Saga.OrderId, "Faulted"), (context, x) => x.RequestId = context.Saga.RequestId),
            When(ProcessOrder.TimeoutExpired)
                .TransitionTo(Cancelled)
                .Send(x => x.Saga.ResponseAddress, x => new OrderCancelled(x.Saga.OrderId, "Timeout"), (context, x) => x.RequestId = context.Saga.RequestId)
        );
    }
}