State Machine Requests
Request/response is easily the most commonly used integration pattern. A service sends a request to another service and continues after receiving the response. Most of the time, waiting for the response is a blocking operation – the requester waits for the response before it continues processing. In the early days of software development, blocking could limit overall system throughput. However, with modern async/await solutions and the .NET Task Parallel Library (TPL), the impact of waiting is mitigated.
In event-based application, the combination of a command followed by an event usually refines down to the same request/response pattern. In many cases, the event produced is only interesting to the command’s sender.
Saga state machines support request/response, both as a requester and a responder. Unlike the request client, however, support for requests is asynchronous at the message-level, eliminating the overhead of waiting for the response. After the request is produced, the saga state machine instance is persisted. When the response is received, the instance is loaded and the response is consumed by the state machine.
Configure a request
Section titled “Configure a request”Requests are declared as public properties on the saga state machine with the Request<TSaga, TRequest, TResponse> property type where TSaga is the saga
state machine instance type and both TRequest and TResponse are valid message types.
public record ValidateOrder(Guid OrderId);public record OrderValidated(Guid OrderId);
public class OrderStateMachine : MassTransitStateMachine<OrderState>{ public Request<OrderState, ValidateOrder, OrderValidated> ValidateOrder { get; private set; } = null!;
public OrderStateMachine() { Request(() => ValidateOrder, o => { o.Timeout = TimeSpan.FromMinutes(30); }); }}In the example above, ValidateOrder is a request to an order validation service that responds with OrderValidated. One of three possible outcomes will
happen after the request is produced.
| Event | Description |
|---|---|
ValidateOrder.Completed | The response was received |
ValidateOrder.TimeoutExpired | The request timed out |
ValidateOrder.Faulted | The order validation service faulted |
The request also includes a ValidateOrder.Pending state that can optionally be used while the request is pending.
Persist the RequestId
Section titled “Persist the RequestId”When defining a Request, an instance property should be specified to store the RequestId which is used to correlate responses to the state machine
instance. While the request is pending, the RequestId is stored in the property. When the request is completed, the property is cleared. If the request times
out or faults, the RequestId is retained to allow for later correlation if requests are ultimately completed (such as moving requests from the _error queue
back into the service queue).
This property optional, instead the instance’s CorrelationId can be used as the RequestId. This can simplify response correlation and also avoids the need
of a supplemental index on the saga repository. However, reusing the CorrelationId for the request might cause issues in highly complex systems. So consider
this when choosing which method to use.
Configure the request settings
Section titled “Configure the request settings”The request settings can be configured using the configuration callback. In the example above, the Timeout is set. The complete list of request settings
includes:
| Property | Type | Description |
|---|---|---|
ServiceAddress | Uri? | If specified, the endpoint address of the request service. If unspecified, the request is published. |
Timeout | TimeSpan | The request timeout (Defaults to 30 seconds). If set to TimeSpan.Zero, the request never times out. This is useful for requests that are guaranteed to complete or fault and reduces the load on the message scheduler. |
TimeToLive | TimeSpan? | The request message time-to-live, which is used by the transport to automatically delete the message after the time period elapses. If unspecified, and the Timeout is greater than TimeSpan.Zero, the timeout value is used. |
Response Types
Section titled “Response Types”Requests usually have a single response; however, up to two additional responses are supported. Additional response types are specified as generic parameters on the request property.
In the example below, the request includes an additional response type OrderNotValid.
public record ValidateOrder(Guid OrderId);public record OrderValidated(Guid OrderId);public record OrderNotValid(Guid OrderId);
public class OrderStateMachine : MassTransitStateMachine<OrderState>{ public Request<OrderState, ValidateOrder, OrderValidated, OrderNotValid> ValidateOrder { get; private set; } = null!;
public OrderStateMachine() { Request(() => ValidateOrder, o => { o.Timeout = TimeSpan.FromMinutes(30); }); }}Response Events
Section titled “Response Events”Additionally, each request event can be configured allowing complete control over how the response is correlated to the saga state machine instance.
| Property | Description |
|---|---|
| Completed | The first response event |
| Completed2 | The second response event, if specified |
| Completed3 | The third response event, if specified |
| Faulted | The Fault<TRequest> event |
| TimeoutExpired | The timeout event |
This can be useful to configure how an event is configured on the message broker. For example, to remove the response type bindings from the message broker,
the events can be configured with ConfigureConsumeTopology = false. Since responses are always sent to the ResponseAddress specified by the requester,
the bindings are not necessary and can be eliminated.
public record ValidateOrder(Guid OrderId);public record OrderValidated(Guid OrderId);
public class OrderStateMachine : MassTransitStateMachine<OrderState>{ public Request<OrderState, ValidateOrder, OrderValidated> ValidateOrder { get; private set; } = null!;
public OrderStateMachine() { Request(() => ValidateOrder, r => { r.Timeout = TimeSpan.FromMinutes(30);
r.Completed = e => e.ConfigureConsumeTopology = false; r.Faulted = e => e.ConfigureConsumeTopology = false; r.TimeoutExpired = e => e.ConfigureConsumeTopology = false; }); }}Sending Requests
Section titled “Sending Requests”To send a request, add a Request activity to an event behavior as shown in the example below.
public class OrderStateMachine : MassTransitStateMachine<OrderState>{ public Request<OrderState, ValidateOrder, OrderValidated> ValidateOrder { get; private set; } = null!;
public OrderStateMachine() { Initially( When(OrderSubmitted) .Request(ValidateOrder, x => new ValidateOrder(x.Saga.CorrelationId)) .TransitionTo(ValidateOrder.Pending) ); }}The request is published with the RequestId set the saga state machine instance CorrelationId (since no RequestId property was specified) and the
ResponseAddress set to the receive endpoint address of the saga state machine.
In this example, the ValidateOrder.Pending state is used while the request is pending. However, any state defined in the saga state machine can be used.
Handling Responses
Section titled “Handling Responses”When the response is received, the Completed event is triggered. If the order validation service threw an exception, the Faulted event is triggered
instead.
public class OrderStateMachine : MassTransitStateMachine<OrderState>{ public Request<OrderState, ValidateOrder, OrderValidated> ValidateOrder { get; private set; } = null!;
public OrderStateMachine() { During(ValidateOrder.Pending, // Handle the valid response When(ValidateOrder.Completed) .TransitionTo(Completed),
// Handle a validation fault When(ValidateOrder.Faulted) .TransitionTo(Failed) ); }}Request Overrides
Section titled “Request Overrides”There are many different Request method overrides that can be used depending on the features required. A few examples are shown below.
Service Address
Section titled “Service Address”Specify the service address for the request, optionally using the contents of the saga state machine instance or the event (via context.Message).
Useful when the instance stores data about which service should process the request.
.Request(ValidateOrder, serviceAddress, context => new ValidateOrder(context.Saga.CorrelationId))
.Request(ValidateOrder, context => context.Saga.ServiceAddress, context => new ValidateOrder(context.Saga.CorrelationId))Async Message Factory
Section titled “Async Message Factory”The request message can be created asynchronously if a message initializer is used or when the request message needs data returned by an asynchronous method.
.Request(ValidateOrder, async context => new ValidateOrder(context.Saga.CorrelationId))
.Request(ValidateOrder, context => context.Saga.ServiceAddress, async context => new ValidateOrder(context.Saga.CorrelationId))
.Request(ValidateOrder, async context =>{ await Task.Delay(1); // some async method return new ValidateOrder();});Once configured, the request activity can be added to a behavior.
public class OrderStateMachine : MassTransitStateMachine<OrderState>{ public OrderStateMachine() { During(Submitted, When(OrderAccepted) .Request(ProcessOrder, x => x.Init<ProcessOrder>(new { OrderId = x.Saga.CorrelationId})) .TransitionTo(ProcessOrder.Pending));
During(ProcessOrder.Pending, When(ProcessOrder.Completed) .Then(context => context.Saga.ProcessingId = context.Message.ProcessingId) .TransitionTo(Processed), When(ProcessOrder.Faulted) .TransitionTo(ProcessFaulted), When(ProcessOrder.TimeoutExpired) .TransitionTo(ProcessTimeoutExpired)); }
public State Processed { get; private set; } public State ProcessFaulted { get; private set; } public State ProcessTimeoutExpired { get; private set; }}The Request includes three events: Completed, Faulted, and TimeoutExpired. These events can be consumed during any state, however, the Request includes a Pending state which can be used to avoid declaring a separate pending state.
Configure a missing instance handler
Section titled “Configure a missing instance handler”If the saga instance has been finalized before the response, fault, or timeout have been received, it is possible to configure a missing instance handler, similar to a regular event.
Request(() => ProcessOrder, x => x.ProcessOrderRequestId, r =>{ r.Completed = m => m.OnMissingInstance(i => i.Discard()); r.Faulted = m => m.OnMissingInstance(i => i.Discard()); r.TimeoutExpired = m => m.OnMissingInstance(i => i.Discard());});