State Machine Events
An event is something that happened which may result in a state change. An event can add or update instance data, as well as changing an instance’s current
state. The Event<T> is generic, where T must be a valid message type.
Declare an event
Section titled “Declare an event”In the example below, the SubmitOrder message is declared as an event including how to correlate the event to an instance.
Unless events implement
CorrelatedBy<Guid>, they must be declared with a correlation expression.
public interface SubmitOrder{ Guid OrderId { get; }}
public class OrderStateMachine : MassTransitStateMachine<OrderState>{ public OrderStateMachine() { Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId)); }
public Event<SubmitOrder> SubmitOrder { get; private set; }}As shown above, an event is a message that can be consumed by the state machine. Events can specify any valid message type, and each event may be configured. There are several event configuration methods available.
The built-in CorrelatedBy<Guid> interface can be used in a message contract to specify the event CorrelationId.
public interface OrderCanceled : CorrelatedBy<Guid>{}
public class OrderStateMachine : MassTransitStateMachine<OrderState>{ public OrderStateMachine() { Event(() => OrderCanceled); // not required, as it is the default convention }}While the event is declared explicitly above, it is not required. The default convention will automatically configure events that have a CorrelatedBy<Guid>
interface.
Correlate by a Guid property
Section titled “Correlate by a Guid property”While convenient, some consider the interface an intrusion of infrastructure to the message contract. MassTransit also supports a declarative approach to
specifying the CorrelationId for events. By configuring the global message topology, it is possible to specify a message property to use for correlation.
public interface SubmitOrder{ Guid OrderId { get; }}
public class OrderStateMachine : MassTransitStateMachine<OrderState>{ // this is shown here, but can be anywhere in the application as long as it executes // before the state machine instance is created. Startup, etc. is a good place for it. // It only needs to be called once per process. static OrderStateMachine() { GlobalTopology.Send.UseCorrelationId<SubmitOrder>(x => x.OrderId); }
public OrderStateMachine() { Event(() => SubmitOrder); }
public Event<SubmitOrder> SubmitOrder { get; private set; }}An alternative is to declare the event correlation, as shown below. This should be used when neither of the approaches above are used.
public interface SubmitOrder{ Guid OrderId { get; }}
public class OrderStateMachine : MassTransitStateMachine<OrderState>{ public OrderStateMachine() { Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId)); }
public Event<SubmitOrder> SubmitOrder { get; private set; }}Since OrderId is a Guid, it can be used for event correlation. When SubmitOrder is accepted in the Initial state, and because the OrderId is a Guid,
the CorrelationId on the new instance is automatically assigned the OrderId value.
Correlate by any property
Section titled “Correlate by any property”Events can also be correlated using a query expression, which is required when events are not correlated to the instance’s CorrelationId property. Queries are more expensive, and may match multiple instances, which should be considered when designing state machines and events.
Whenever possible, try to correlation using the CorrelationId. If a query is required, it may be necessary to create an index on the property so that database queries are optimized.
To correlate events using another type, additional configuration is required.
public interface ExternalOrderSubmitted{ string OrderNumber { get; }}
public class OrderStateMachine : MassTransitStateMachine<OrderState>{ public OrderStateMachine() { Event(() => ExternalOrderSubmitted, e => e .CorrelateBy(i => i.OrderNumber, x => x.Message.OrderNumber) .SelectId(x => NewId.NextGuid())); }
public Event<ExternalOrderSubmitted> ExternalOrderSubmitted { get; private set; }}Queries can also be written with two arguments, which are passed directly to the repository (and must be supported by the backing database).
public interface ExternalOrderSubmitted{ string OrderNumber { get; }}
public class OrderStateMachine : MassTransitStateMachine<OrderState>{ public OrderStateMachine() { Event(() => ExternalOrderSubmitted, e => e .CorrelateBy((instance,context) => instance.OrderNumber == context.Message.OrderNumber) .SelectId(x => NewId.NextGuid())); }
public Event<ExternalOrderSubmitted> ExternalOrderSubmitted { get; private set; }}When the event doesn’t have a Guid that uniquely correlates to an instance, the .SelectId expression must be configured. In the above
example, NewId is used to generate a sequential identifier which will be assigned to the instance CorrelationId. Any
property on the event can be used to initialize the CorrelationId.
The message headers are also available, for example, instead of always generating a new identifier, the CorrelationId header could be used if present.
.SelectId(x => x.CorrelationId ?? NewId.NextGuid());Missing Instance
Section titled “Missing Instance”If an event is not matching to an instance, the missing instance behavior can be configured.
public interface RequestOrderCancellation{ Guid OrderId { get; }}
public interface OrderNotFound{ Guid OrderId { get; }}
public class OrderStateMachine : MassTransitStateMachine<OrderState>{ public OrderStateMachine() { Event(() => OrderCancellationRequested, e => { e.CorrelateById(context => context.Message.OrderId);
e.OnMissingInstance(m => { return m.ExecuteAsync(x => x.RespondAsync<OrderNotFound>(new { x.OrderId })); }); }); }
public Event<RequestOrderCancellation> OrderCancellationRequested { get; private set; }}In this example, when a cancel order request is consumed without a matching instance, a response will be sent that the order was not found. Instead of
generating a Fault, the response is more explicit. Other missing instance options include Discard, Fault, and Execute (a synchronous version of
ExecuteAsync).
Initial Insert
Section titled “Initial Insert”To increase new instance performance, configuring an event to directly insert into a saga repository may reduce lock contention. To configure an event to insert, it should be in the Initially block, as well as have a saga factory specified.
public interface SubmitOrder{ Guid OrderId { get; }}
public class OrderStateMachine : MassTransitStateMachine<OrderState>{ public OrderStateMachine() { Event(() => SubmitOrder, e => { e.CorrelateById(context => context.Message.OrderId));
e.InsertOnInitial = true; e.SetSagaFactory(context => new OrderState { CorrelationId = context.Message.OrderId }) });
Initially( When(SubmitOrder) .TransitionTo(Submitted)); }
public Event<SubmitOrder> SubmitOrder { get; private set; }}When using InsertOnInitial, it is critical that the saga repository is able to detect duplicate keys (in this case, CorrelationId - which is initialized using OrderId). In this case, having a clustered primary key on CorrelationId would prevent duplicate instances from being inserted. If an event is correlated using a different property, make sure that the database enforces a unique constraint on the instance property and the saga factory initializes the instance property with the event property value.
public interface ExternalOrderSubmitted{ string OrderNumber { get; }}
public class OrderStateMachine : MassTransitStateMachine<OrderState>{ public OrderStateMachine() { Event(() => ExternalOrderSubmitted, e => { e.CorrelateBy(i => i.OrderNumber, x => x.Message.OrderNumber) e.SelectId(x => NewId.NextGuid());
e.InsertOnInitial = true; e.SetSagaFactory(context => new OrderState { CorrelationId = context.CorrelationId ?? NewId.NextGuid(), OrderNumber = context.Message.OrderNumber, }) });
Initially( When(SubmitOrder) .TransitionTo(Submitted)); }
public Event<ExternalOrderSubmitted> ExternalOrderSubmitted { get; private set; }}The database would use a unique constraint on the OrderNumber to prevent duplicates, which the saga repository would detect as an existing instance, which would then be loaded to consume the event.