Skip to content

State Machine Activities

In a saga state machine, behaviors are defined using the When method followed by activities such as Then, ThenAsync, Publish, etc. In cases where more complex logic is needed, a custom state machine activity can be created. A custom activity can have its own dependencies that are resolved from the current message scope, such as an active database connection (or DbContext) or other components. Since the saga state machine itself is a singleton, custom activities enable scoped behavior when an event is consumed.

To create a state machine activity, add a new class to your project that implements IStateMachineActivity<TInstance, TMessage> as shown. This class has full access to the container (or IServiceProvider / IServiceScope), so constructor dependencies will be injected from the currently active scope.

The class below calls the OnOrderClosed method when the OrderClosed event is consumed, using the injected ISomeService.

public class OrderClosedActivity :
IStateMachineActivity<OrderState, OrderClosed>
{
readonly ISomeService _service;
public OrderClosedActivity(ISomeService service)
{
_service = service;
}
public async Task Execute(
BehaviorContext<OrderState, OrderClosed> context,
IBehavior<OrderState, SubmitOrder> next)
{
await _service.OnOrderClosed(context.Saga.CorrelationId);
// always call the next activity in the behavior
await next.Execute(context);
}
public Task Faulted<TException>(
BehaviorExceptionContext<OrderState, OrderClosed, TException> context,
IBehavior<OrderState, OrderClosed> next)
where TException : Exception
{
// always call the next activity in the behavior
return next.Faulted(context);
}
public void Probe(ProbeContext context)
{
context.CreateScope("publish-order-closed");
}
public void Accept(StateMachineVisitor visitor)
{
visitor.Visit(this);
}
}

The injected service publishes an event when the OnOrderClosed method is called. Since the state machine activity and its dependencies are scoped, the ISomeService instance is different for each message consumed, but within the same scope as the state machine activity. This ensures that the IPublishEndpoint injected is from the ConsumeContext of the message that is being consumed by the state machine instance.

public interface ISomeService
{
Task OnOrderClosed(Guid correlationId);
}
public class SomeService :
ISomeService
{
IPublishEndpoint _publishEndpoint;
public SomeService(IPublishEndpoint publishEndpoint)
{
_publishEndpoint = publishEndpoint;
}
public Task OnOrderClosed(Guid correlationId)
{
return _publishEndpoint.Publish<OrderUpdated>(new {CorrelationId = correlationId});
}
}

In the above example, the event type was known in advance. If a state machine activity capable of handling any event type is needed, it can be created without specifying the event type.

public class OrderClosedActivity :
IStateMachineActivity<OrderState>
{
readonly ISomeService _service;
public OrderClosedActivity(ISomeService service)
{
_service = service;
}
public async Task Execute(BehaviorContext<OrderState> context, IBehavior<OrderState> next)
{
await _service.OnOrderClosed(context.Saga.CorrelationId);
await next.Execute(context);
}
public async Task Execute<T>(BehaviorContext<OrderState, T> context, IBehavior<OrderState, T> next)
{
await _service.OnOrderClosed(context.Saga.CorrelationId);
await next.Execute(context);
}
public Task Faulted<TException>(BehaviorExceptionContext<OrderState, TException> context, IBehavior<OrderState> next)
where TException : Exception
{
return next.Faulted(context);
}
public Task Faulted<T, TException>(BehaviorExceptionContext<OrderState, T, TException> context, IBehavior<OrderState, T> next)
where TException : Exception
{
return next.Faulted(context);
}
public void Probe(ProbeContext context)
{
context.CreateScope("publish-order-closed");
}
public void Accept(StateMachineVisitor visitor)
{
visitor.Visit(this);
}
}

In this scenario, the OrderClosedActivity can be used to handle any event type, resulting in either Execute method being called.

The state machine activity can be added to a behavior using the Activity method. For a typed state machine activity, the OfType method is used as shown below.

public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public State Submitted { get; private set; } = null!;
public Event<OrderClosed> OrderClosed { get; private set; } = null!;
public OrderStateMachine()
{
// Tell the saga where to store the current state
InstanceState(x => x.CurrentState);
Initially(
When(OrderClosed)
.Activity(x => x.OfType<OrderClosedActivity>())
.TransitionTo(Submitted)
);
}
}

If the state machine activity is generic (the second example above), the Activity method is used with the OfInstanceType method.

public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public State Submitted { get; private set; } = null!;
public Event<OrderClosed> OrderClosed { get; private set; } = null!;
public OrderStateMachine()
{
// Tell the saga where to store the current state
InstanceState(x => x.CurrentState);
Initially(
When(OrderClosed)
.Activity(x => x.OfInstanceType<OrderClosedActivity>())
.TransitionTo(Submitted)
);
}
}