Saga State Machine Behavior

Behavior is what happens when an event occurs during a state. Saga state machine behaviors are built by chaining activities off an event. These activities are executed in order when the event is consumed. If any activity throws an exception, the behavior stops and the message fault handling applies (for example, moving the message to the error queue).

Below, the Initially block is used to define the behavior of the SubmitOrder event during the Initial state. When a SubmitOrder message is consumed and an instance with a CorrelationId matching the OrderId is not found, a new instance will be created in the Initial state. The TransitionTo activity transitions the instance to the Submitted state, after which the instance is persisted using the saga repository.

using MassTransit;
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Initially(
When(SubmitOrder)
.TransitionTo(Submitted));
}
}
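
The snippet above leaves out the declarations it relies on. The following is a fuller sketch of the same state machine, assuming a SubmitOrder contract with an OrderId property and an OrderState instance that stores its state as a string (both are defined later on this page in a similar form).

```csharp
using System;
using MassTransit;

// Assumed message contract; only OrderId is needed for correlation here.
public interface SubmitOrder
{
    Guid OrderId { get; }
}

// Saga instance persisted by the saga repository.
public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        // Store the current state in the CurrentState property.
        InstanceState(x => x.CurrentState);

        // Correlate SubmitOrder to an instance using the message's OrderId.
        Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId));

        Initially(
            When(SubmitOrder)
                .TransitionTo(Submitted));
    }

    public State Submitted { get; private set; }
    public Event<SubmitOrder> SubmitOrder { get; private set; }
}
```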

Subsequently, the OrderAccepted event could be handled by the behavior shown below.

using System;
using MassTransit;
public interface OrderAccepted
{
Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Event(() => OrderAccepted, x => x.CorrelateById(context => context.Message.OrderId));
During(Submitted,
When(OrderAccepted)
.TransitionTo(Accepted));
}
public Event<OrderAccepted> OrderAccepted { get; private set; }
}

It may be necessary to ignore an event, either to avoid fault generation or to prevent messages from being moved to the _skipped queue. To ignore an event in a given state, use the Ignore method.

public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Initially(
When(SubmitOrder)
.TransitionTo(Submitted),
When(OrderAccepted)
.TransitionTo(Accepted));
During(Submitted,
When(OrderAccepted)
.TransitionTo(Accepted));
During(Accepted,
Ignore(SubmitOrder));
}
}

By default, instances are not removed from the saga repository. To configure completed instance removal, specify the method used to determine if an instance has completed.

public interface OrderCompleted
{
Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Event(() => OrderCompleted, x => x.CorrelateById(context => context.Message.OrderId));
DuringAny(
When(OrderCompleted)
.Finalize());
SetCompletedWhenFinalized();
}
public Event<OrderCompleted> OrderCompleted { get; private set; }
}

When the instance consumes the OrderCompleted event, the instance is finalized (which transitions the instance to the Final state). The SetCompletedWhenFinalized method defines an instance in the Final state as completed – which is then used by the saga repository to remove the instance.

To use a different completed expression, such as one that checks if the instance is in a Completed state, use the SetCompleted method as shown below.

public interface OrderCompleted
{
Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Event(() => OrderCompleted, x => x.CorrelateById(context => context.Message.OrderId));
DuringAny(
When(OrderCompleted)
.TransitionTo(Completed));
SetCompleted(async instance =>
{
State<OrderState> currentState = await this.GetState(instance);
return Completed.Equals(currentState);
});
}
public State Completed { get; private set; }
public Event<OrderCompleted> OrderCompleted { get; private set; }
}

Message brokers typically do not guarantee message order. Therefore, it is important to consider out-of-order messages in state machine design.

In the example above, receiving a SubmitOrder message after an OrderAccepted event could cause the SubmitOrder message to end up in the _error queue. If the OrderAccepted event is received first, it would be discarded since it isn’t accepted in the Initial state. Below is an updated state machine that handles both of these scenarios.

using MassTransit;
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Initially(
When(SubmitOrder)
.TransitionTo(Submitted),
When(OrderAccepted)
.TransitionTo(Accepted));
During(Submitted,
When(OrderAccepted)
.TransitionTo(Accepted));
During(Accepted,
Ignore(SubmitOrder));
}
}

In the updated example, a SubmitOrder message received while in the Accepted state is ignored. However, data in the event may be useful. In that case, behavior can be added to copy the data to the instance. Below, data from the event is captured in both scenarios.

using System;
using MassTransit;
public interface SubmitOrder
{
Guid OrderId { get; }
DateTime OrderDate { get; }
}
public class OrderState :
SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public DateTime? OrderDate { get; set; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Initially(
When(SubmitOrder)
.Then(x => x.Saga.OrderDate = x.Message.OrderDate)
.TransitionTo(Submitted),
When(OrderAccepted)
.TransitionTo(Accepted));
During(Submitted,
When(OrderAccepted)
.TransitionTo(Accepted));
During(Accepted,
When(SubmitOrder)
.Then(x => x.Saga.OrderDate = x.Message.OrderDate));
}
}

State machine behaviors are defined as a sequence of activities which are executed in response to an event.

Then is used to execute synchronous code as part of the behavior. It is typically used to update saga instance state, copy data from the message, or perform lightweight logic.

Initially(
When(SubmitOrder)
.Then(context =>
{
context.Saga.OrderId = context.Message.OrderId;
context.Saga.OrderDate = context.Message.OrderDate;
context.Saga.CustomerId = context.Message.CustomerId;
})
.TransitionTo(Submitted));

Guidance:

  • Keep Then fast and non-blocking (in-memory work only).
  • Use it for instance updates, simple calculations, and methods that do not require await.

ThenAsync is the asynchronous version of Then. It allows you to await asynchronous operations, such as database calls, external services, or asynchronous method calls.

Initially(
When(SubmitOrder)
.ThenAsync(async context =>
{
context.Saga.OrderId = context.Message.OrderId;
// Example: async lookup
context.Saga.CustomerSegment = await _customerService.GetSegment(context.Message.CustomerId);
})
.TransitionTo(Submitted));

Guidance:

  • Use ThenAsync when you must call async APIs.
  • Avoid long-running operations; the saga instance is locked while the behavior runs.
  • Do not block on async calls (avoid .Result / .Wait()).
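
The ThenAsync example above uses a _customerService field that is not shown. One way to supply it is through the state machine constructor, as sketched below. ICustomerService, its GetSegment signature, the CustomerId message property, and the CustomerSegment instance property are assumptions made for illustration.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical service; the name and method signature are assumptions.
public interface ICustomerService
{
    Task<string> GetSegment(Guid customerId);
}

public interface SubmitOrder
{
    Guid OrderId { get; }
    Guid CustomerId { get; }
}

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public Guid OrderId { get; set; }
    public string CustomerSegment { get; set; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    readonly ICustomerService _customerService;

    // The dependency is passed in by the container when the state machine is created.
    public OrderStateMachine(ICustomerService customerService)
    {
        _customerService = customerService;

        InstanceState(x => x.CurrentState);
        Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId));

        Initially(
            When(SubmitOrder)
                .ThenAsync(async context =>
                {
                    context.Saga.OrderId = context.Message.OrderId;

                    // Awaited, never blocked on with .Result or .Wait().
                    context.Saga.CustomerSegment =
                        await _customerService.GetSegment(context.Message.CustomerId);
                })
                .TransitionTo(Submitted));
    }

    public State Submitted { get; private set; }
    public Event<SubmitOrder> SubmitOrder { get; private set; }
}
```

Because a state machine is typically created once and shared, a dependency captured this way should be safe to use concurrently.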

If and IfAsync add conditional behavior based on the current instance and/or message content.

During(Submitted,
When(PaymentCompleted)
.If(context => context.Message.Amount >= context.Saga.OrderTotal,
then => then
.Then(x => x.Saga.Paid = true)
.TransitionTo(Paid)));

  • If(predicate, then => ...)
  • IfAsync(asyncPredicate, then => ...)

With IfAsync, the predicate is asynchronous:

During(Submitted,
When(PaymentCompleted)
.IfAsync(async context => await _fraud.CheckAsync(context.Message),
then => then.TransitionTo(UnderReview)));

Guidance:

  • Use If for in-memory checks (flags, totals, dates).
  • Use IfAsync when the decision requires external data or async I/O.
  • The then callback receives its own EventActivityBinder, so you can chain any activities (Then, Publish, Send, TransitionTo, etc.).

IfElse and IfElseAsync are similar to If, but allow you to specify a fallback behavior when the predicate is false.

During(Submitted,
When(PaymentCompleted)
.IfElse(context => context.Message.IsRetry,
then => then.Then(x => x.Saga.RetryCount++),
@else => @else.Then(x => x.Saga.RetryCount = 0))
.TransitionTo(Paid));

Async variant:

During(Submitted,
When(PaymentCompleted)
.IfElseAsync(async context => await _fraud.IsHighRiskAsync(context),
then => then.TransitionTo(UnderReview),
@else => @else.TransitionTo(Paid)));

TransitionTo changes the saga instance’s current state.

Initially(
When(SubmitOrder)
.Then(x => x.Saga.SubmittedAt = InVar.Now)
.TransitionTo(Submitted));

Key details:

  • The state does not change until the previous activities in the behavior succeed.
  • Only one state transition is typically used in a single behavior.
  • TransitionTo can be used in any state (not just Initially), including DuringAny.
  • TransitionTo does not automatically persist the saga instance, nor does it act as a checkpoint in the state machine.

Finalize transitions the instance to the built-in Final state. Combined with SetCompletedWhenFinalized, this marks the instance as completed and allows the saga repository to remove it.

DuringAny(
When(OrderCompleted)
.Finalize());
SetCompletedWhenFinalized();

Use Finalize when:

  • The saga is truly done and no future events should be correlated.
  • You want the underlying repository to delete the instance automatically.

If you prefer an explicit Completed state instead of Final, use TransitionTo(Completed) and configure SetCompleted(...) (as shown earlier on this page) instead of Finalize.

To publish an event, add a Publish activity.

public interface OrderSubmitted
{
Guid OrderId { get; }
}
public class OrderSubmittedEvent :
OrderSubmitted
{
public OrderSubmittedEvent(Guid orderId)
{
OrderId = orderId;
}
public Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Initially(
When(SubmitOrder)
.Publish(context => (OrderSubmitted)new OrderSubmittedEvent(context.Saga.CorrelationId))
.TransitionTo(Submitted));
}
}

Alternatively, a message initializer can be used to eliminate the Event class.

public interface OrderSubmitted
{
Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Initially(
When(SubmitOrder)
.PublishAsync(context => context.Init<OrderSubmitted>(new { OrderId = context.Saga.CorrelationId }))
.TransitionTo(Submitted));
}
}

To send a message, add a Send activity.

public interface UpdateAccountHistory
{
Guid OrderId { get; }
}
public class UpdateAccountHistoryCommand :
UpdateAccountHistory
{
public UpdateAccountHistoryCommand(Guid orderId)
{
OrderId = orderId;
}
public Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine(OrderStateMachineSettings settings)
{
Initially(
When(SubmitOrder)
.Send(settings.AccountServiceAddress, context => new UpdateAccountHistoryCommand(context.Saga.CorrelationId))
.TransitionTo(Submitted));
}
}

Alternatively, a message initializer can be used to eliminate the Command class.

public interface UpdateAccountHistory
{
Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine(OrderStateMachineSettings settings)
{
Initially(
When(SubmitOrder)
.SendAsync(settings.AccountServiceAddress, context => context.Init<UpdateAccountHistory>(new { OrderId = context.Saga.CorrelationId }))
.TransitionTo(Submitted));
}
}

A state machine can respond to requests by configuring the request message type as an event, and using the Respond method. When configuring a request event, configuring a missing instance method is recommended, to provide a better response experience (either through a different response type, or a response that indicates an instance was not found).

public interface RequestOrderCancellation
{
Guid OrderId { get; }
}
public interface OrderCanceled
{
Guid OrderId { get; }
}
public interface OrderNotFound
{
Guid OrderId { get; }
}
public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
Event(() => OrderCancellationRequested, e =>
{
e.CorrelateById(context => context.Message.OrderId);
e.OnMissingInstance(m =>
{
return m.ExecuteAsync(x => x.RespondAsync<OrderNotFound>(new { x.Message.OrderId }));
});
});
DuringAny(
When(OrderCancellationRequested)
.RespondAsync(context => context.Init<OrderCanceled>(new { OrderId = context.Saga.CorrelationId }))
.TransitionTo(Canceled));
}
public State Canceled { get; private set; }
public Event<RequestOrderCancellation> OrderCancellationRequested { get; private set; }
}
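
On the requesting side, the two possible responses can be handled with a request client. The sketch below assumes an IRequestClient for RequestOrderCancellation has been registered with the container. The OrderCancellationClient class is a hypothetical wrapper, and the contracts are repeated from the example above so the sketch stands alone.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Contracts repeated from the state machine example above.
public interface RequestOrderCancellation
{
    Guid OrderId { get; }
}

public interface OrderCanceled
{
    Guid OrderId { get; }
}

public interface OrderNotFound
{
    Guid OrderId { get; }
}

// Hypothetical wrapper around the request client.
public class OrderCancellationClient
{
    readonly IRequestClient<RequestOrderCancellation> _client;

    public OrderCancellationClient(IRequestClient<RequestOrderCancellation> client)
    {
        _client = client;
    }

    // Returns true when the saga responded with OrderCanceled,
    // false when the missing instance handler responded with OrderNotFound.
    public async Task<bool> Cancel(Guid orderId)
    {
        Response<OrderCanceled, OrderNotFound> response =
            await _client.GetResponse<OrderCanceled, OrderNotFound>(new { OrderId = orderId });

        return response.Is(out Response<OrderCanceled> canceled);
    }
}
```

Accepting both response types means a missing instance produces a normal response rather than a request timeout.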

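In some scenarios, the response to a request cannot be sent until the state machine has completed further work, such as a request of its own. In these scenarios, the information required to respond to the original request (the RequestId and ResponseAddress) should be stored on the saga instance.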

public record CreateOrder(Guid CorrelationId) : CorrelatedBy<Guid>;
public record ProcessOrder(Guid OrderId, Guid ProcessingId);
public record OrderProcessed(Guid OrderId, Guid ProcessingId);
public record OrderCancelled(Guid OrderId, string Reason);
public class ProcessOrderConsumer : IConsumer<ProcessOrder>
{
public async Task Consume(ConsumeContext<ProcessOrder> context)
{
await context.RespondAsync(new OrderProcessed(context.Message.OrderId, context.Message.ProcessingId));
}
}
public class OrderState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public Guid? ProcessingId { get; set; }
public Guid? RequestId { get; set; }
public Uri ResponseAddress { get; set; }
public Guid OrderId { get; set; }
}
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
public State Created { get; set; }
public State Cancelled { get; set; }
public Event<CreateOrder> OrderSubmitted { get; set; }
public Request<OrderState, ProcessOrder, OrderProcessed> ProcessOrder { get; set; }
public OrderStateMachine()
{
InstanceState(m => m.CurrentState);
Event(() => OrderSubmitted);
Request(() => ProcessOrder, order => order.ProcessingId, config => { config.Timeout = TimeSpan.Zero; });
Initially(
When(OrderSubmitted)
.Then(context =>
{
context.Saga.CorrelationId = context.Message.CorrelationId;
context.Saga.ProcessingId = Guid.NewGuid();
context.Saga.OrderId = Guid.NewGuid();
context.Saga.RequestId = context.RequestId;
context.Saga.ResponseAddress = context.ResponseAddress;
})
.Request(ProcessOrder, context => new ProcessOrder(context.Saga.OrderId, context.Saga.ProcessingId!.Value))
.TransitionTo(ProcessOrder.Pending)
);
During(ProcessOrder.Pending,
When(ProcessOrder.Completed)
.TransitionTo(Created)
.Send(x => x.Saga.ResponseAddress, x => x.Saga, (context, x) => x.RequestId = context.Saga.RequestId),
When(ProcessOrder.Faulted)
.TransitionTo(Cancelled)
.Send(x => x.Saga.ResponseAddress, x => new OrderCancelled(x.Saga.OrderId, "Faulted"), (context, x) => x.RequestId = context.Saga.RequestId),
When(ProcessOrder.TimeoutExpired)
.TransitionTo(Cancelled)
.Send(x => x.Saga.ResponseAddress, x => new OrderCancelled(x.Saga.OrderId, "Timeout"), (context, x) => x.RequestId = context.Saga.RequestId)
);
}
}

Schedule is an activity used with a configured Schedule<,> on the state machine to send a scheduled message in the future.

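The schedule is declared as a property and configured in the constructor, after which the Schedule activity can be used in a behavior: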

public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public Schedule<OrderState, OrderExpired> OrderExpiration { get; private set; }
public OrderStateMachine()
{
Schedule(() => OrderExpiration, x => x.ExpirationId, x =>
{
x.Delay = TimeSpan.FromMinutes(30);
x.Received = e => e.CorrelateById(context => context.Message.OrderId);
});
Initially(
When(SubmitOrder)
.Then(x =>
{
x.Saga.OrderId = x.Message.OrderId;
x.Saga.Created = InVar.Now;
})
.Schedule(OrderExpiration, context => new OrderExpired(context.Saga.OrderId))
.TransitionTo(Submitted));
}
}

  • Schedule(schedule, messageFactory) schedules the message using the configured delay.

Guidance:

  • Use a separate scheduled message type per distinct timeout purpose.
  • Store the schedule token (ExpirationId in the example) on the saga instance if you plan to cancel or replace it later.
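
The example above schedules the message but does not show what happens when it arrives. A minimal sketch follows, assuming OrderExpired is a record carrying the OrderId and that the instance should be finalized when the order expires. The choice of Finalize is an assumption for illustration.

```csharp
using System;
using MassTransit;

public interface SubmitOrder
{
    Guid OrderId { get; }
}

// Assumed shape of the scheduled message.
public record OrderExpired(Guid OrderId);

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public Guid OrderId { get; set; }

    // Schedule token, needed to cancel or replace the scheduled message later.
    public Guid? ExpirationId { get; set; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        InstanceState(x => x.CurrentState);
        Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId));

        Schedule(() => OrderExpiration, x => x.ExpirationId, s =>
        {
            s.Delay = TimeSpan.FromMinutes(30);
            s.Received = e => e.CorrelateById(context => context.Message.OrderId);
        });

        Initially(
            When(SubmitOrder)
                .Then(x => x.Saga.OrderId = x.Message.OrderId)
                .Schedule(OrderExpiration, context => new OrderExpired(context.Saga.OrderId))
                .TransitionTo(Submitted));

        // The scheduled message is consumed through the schedule's Received event.
        During(Submitted,
            When(OrderExpiration.Received)
                .Finalize());

        SetCompletedWhenFinalized();
    }

    public State Submitted { get; private set; }
    public Event<SubmitOrder> SubmitOrder { get; private set; }
    public Schedule<OrderState, OrderExpired> OrderExpiration { get; private set; }
}
```

Scheduling also requires a message scheduler to be configured on the bus, which is outside the scope of this sketch.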

Unschedule cancels a previously scheduled message for a given Schedule<,>.

During(Submitted,
When(OrderCanceled)
.Unschedule(OrderExpiration)
.TransitionTo(Canceled));

When the OrderCanceled event is handled, the existing scheduled OrderExpired message will be cancelled (if one exists), preventing the timeout from firing later.

Guidance:

  • Always pair Schedule with Unschedule when a timeout or reminder should be cleared because the saga reached a terminal or alternate state.
  • If the scheduled message has already been consumed, Unschedule is a no-op.

Produce is an activity used to send messages from a saga state machine to riders such as Kafka or Azure Event Hubs.

Instead of going through the bus Publish/Send pipeline, Produce uses a configured producer (for example, a Kafka topic producer) and writes the message directly to the configured topic/stream.

public class OrderStateMachine :
MassTransitStateMachine<OrderState>
{
public State Submitted { get; private set; } = null!;
public State Accepted { get; private set; } = null!;
public Event<OrderAccepted> OrderAccepted { get; private set; } = null!;
public OrderStateMachine()
{
Initially(
When(OrderAccepted)
.Then(x =>
{
x.Saga.OrderId = x.Message.OrderId;
x.Saga.AcceptedAt = InVar.Now;
})
.Produce(context => new OrderAcceptedEvent
{
OrderId = context.Saga.OrderId,
CustomerId = context.Saga.CustomerId,
AcceptedAt = context.Saga.AcceptedAt
})
.TransitionTo(Accepted));
}
}

To use Produce, register a producer for the message type with the rider:

// Kafka example
rider.AddProducer<OrderAcceptedEvent>("order-accepted");

At runtime, Produce:

  1. Resolves the appropriate producer for that message type from the current scope (the Kafka/Event Hub rider).
  2. Produces the message to the configured topic/event hub, propagating correlation and diagnostic headers from the saga context.

When to use Produce:

  • You are using a rider (Kafka, Event Hub, etc.) and want saga events to appear directly on that transport.
  • You already have AddProducer<T> configured and want the state machine to reuse the same producer configuration (topic name, serialization, partitioning, etc.).
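
For context, the producer registration might sit inside a rider configuration like the one sketched below. It assumes the MassTransit Kafka package is referenced and a broker is reachable at localhost:9092. The AddOrderMessaging method name, the in-memory bus transport, and the OrderAcceptedEvent property types are illustrative assumptions. Saga registration is omitted to keep the sketch short.

```csharp
using System;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Assumed message shape, matching the properties used in the Produce example.
public class OrderAcceptedEvent
{
    public Guid OrderId { get; set; }
    public Guid CustomerId { get; set; }
    public DateTime AcceptedAt { get; set; }
}

public static class RiderConfiguration
{
    // Hypothetical extension method name.
    public static IServiceCollection AddOrderMessaging(this IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            // The bus transport is independent of the rider; in-memory is used for brevity.
            x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));

            x.AddRider(rider =>
            {
                // Messages of this type are produced to the "order-accepted" topic.
                rider.AddProducer<OrderAcceptedEvent>("order-accepted");

                rider.UsingKafka((context, k) =>
                {
                    // Assumed broker address.
                    k.Host("localhost:9092");
                });
            });
        });

        return services;
    }
}
```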

MassTransit provides activities to deal with exceptions in a saga state machine directly, without using the default fault pipeline. This allows you to define exception handling behavior for each event and react to exceptions in a specific way.

Catch lets you attach exception handling behavior to a state machine event. If an activity in the behavior chain throws a matching exception type, the Catch block runs instead of letting the exception bubble out to the default fault pipeline.

Initially(
When(SubmitOrder)
.Then(x =>
{
// normal behavior – might throw
x.Saga.OrderTotal = x.Message.Lines.Sum(l => l.Quantity * l.Price);
})
.Catch<Exception>(ex => ex
.Then(x =>
{
// x.Exception is the exception that was thrown
x.Saga.LastError = x.Exception.Message;
x.Saga.FaultedAt = InVar.Now;
})
.PublishAsync(x => x.Init<OrderFaulted>(new
{
x.Saga.CorrelationId,
x.Saga.LastError
}))
.TransitionTo(Faulted)));

Key points:

  • Catch<TException> is scoped to the event’s behavior pipeline.
  • Only exceptions of type TException (or derived types) are handled by that Catch.
  • You can use any of the usual activities (Then, ThenAsync, Publish, Send, TransitionTo, Finalize, etc.) to react to the failure.
  • You can define multiple Catch blocks for different exception types – more specific types should come first.
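
As a sketch of the last point, the behavior below chains two Catch blocks with the more specific exception type first. The Quantity property, the Rejected state, and the validation rule are assumptions made for illustration.

```csharp
using System;
using MassTransit;

// Assumed contract; Quantity is a hypothetical property used to trigger a validation error.
public interface SubmitOrder
{
    Guid OrderId { get; }
    int Quantity { get; }
}

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public string LastError { get; set; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        InstanceState(x => x.CurrentState);
        Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId));

        Initially(
            When(SubmitOrder)
                .Then(x =>
                {
                    if (x.Message.Quantity <= 0)
                        throw new ArgumentException("Quantity must be positive");
                })
                .TransitionTo(Submitted)
                // Most specific exception type first.
                .Catch<ArgumentException>(ex => ex
                    .Then(x => x.Saga.LastError = "Rejected: " + x.Exception.Message)
                    .TransitionTo(Rejected))
                // Fallback for any other exception type.
                .Catch<Exception>(ex => ex
                    .Then(x => x.Saga.LastError = x.Exception.Message)
                    .TransitionTo(Faulted)));
    }

    public State Submitted { get; private set; }
    public State Rejected { get; private set; }
    public State Faulted { get; private set; }
    public Event<SubmitOrder> SubmitOrder { get; private set; }
}
```

With this ordering, an ArgumentException should move the instance to Rejected, while any other exception moves it to Faulted.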

Typical use cases:

  • Mark the saga as faulted and transition to a Faulted state.
  • Publish a domain-specific “saga faulted” event.
  • Record diagnostic information on the saga instance before finalizing it.

Retry attaches a retry policy to the activities in the behavior. If an activity throws an exception that matches the retry filter, MassTransit will automatically retry the behavior according to the configured policy before treating the message as a failure.

Initially(
When(SubmitOrder)
.Retry(r =>
{
// Example: 3 attempts, 1 second apart
r.Interval(3, TimeSpan.FromSeconds(1));
// Optional: ignore some exception types – they will not be retried
r.Ignore<ArgumentException>();
}, rx => rx
.ThenAsync(async x =>
{
// Transient operation – for example, calling a remote service
x.Saga.ReservationId = await _inventory.ReserveAsync(x.Message.OrderId, x.CancellationToken);
}))
.TransitionTo(Reserved));

How it works:

  • The retry policy applies only to the activities configured inside the Retry callback.

  • When a matching exception is thrown, the activities are re-executed according to the policy (immediate, interval, incremental, exponential, etc.).

  • When retries are exhausted, the exception is treated as a normal failure:

    • Any configured Catch handlers for that event can run.
    • If unhandled, the message follows the normal MassTransit fault / error flow.

Guidance:

  • Use Retry for transient failures – network calls, database timeouts, rate limiting, etc.

  • Keep the retried section small and focused (for example, the external call and related instance updates).

  • Avoid combining aggressive state-machine retries with aggressive endpoint-level or consumer-level retries for the same event – otherwise you can multiply the number of attempts.

  • Pair Retry with Catch when you want:

    • Retrying for transient issues, but
    • A clear, explicit behavior (state transition, event publish, logging) when all retries fail.
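
A sketch of that pairing is shown below: the remote call is retried, and a Catch block records the failure once the retries are exhausted. IInventoryService and its ReserveAsync signature are hypothetical, as are the ReservationId and LastError instance properties.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical inventory client; the name and signature are assumptions.
public interface IInventoryService
{
    Task<Guid> ReserveAsync(Guid orderId, CancellationToken cancellationToken);
}

public interface SubmitOrder
{
    Guid OrderId { get; }
}

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public Guid? ReservationId { get; set; }
    public string LastError { get; set; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    readonly IInventoryService _inventory;

    public OrderStateMachine(IInventoryService inventory)
    {
        _inventory = inventory;

        InstanceState(x => x.CurrentState);
        Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId));

        Initially(
            When(SubmitOrder)
                // Only the remote call is retried: 3 attempts, 1 second apart.
                .Retry(r => r.Interval(3, TimeSpan.FromSeconds(1)),
                    rx => rx.ThenAsync(async x =>
                    {
                        x.Saga.ReservationId =
                            await _inventory.ReserveAsync(x.Message.OrderId, x.CancellationToken);
                    }))
                .TransitionTo(Reserved)
                // Runs only if the exception is still unhandled after the retries.
                .Catch<Exception>(ex => ex
                    .Then(x => x.Saga.LastError = x.Exception.Message)
                    .TransitionTo(Faulted)));
    }

    public State Reserved { get; private set; }
    public State Faulted { get; private set; }
    public Event<SubmitOrder> SubmitOrder { get; private set; }
}
```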

Together, Catch and Retry let you model robust, explicit failure behavior directly in your saga state machine: first retry when it makes sense, then catch and handle failures gracefully when it doesn’t.

Most real-world behaviors mix these activities:

Initially(
When(SubmitOrder)
.Then(x =>
{
x.Saga.OrderId = x.Message.OrderId;
x.Saga.Created = InVar.Now;
})
.If(x => x.Message.IsPriority,
then => then.Then(x => x.Saga.IsPriority = true))
.Schedule(OrderExpiration, x => new OrderExpired(x.Saga.OrderId))
.TransitionTo(Submitted));