Monitoring and Observability

OpenTelemetry is an open-source standard for distributed tracing, which allows you to collect and analyze data about the performance of your systems. MassTransit can be configured to use OpenTelemetry to instrument message handling, so that you can collect telemetry data about messages as they flow through your system.

By using OpenTelemetry with MassTransit, you can gain insights into the performance of your systems, which can help you to identify and troubleshoot issues, and to improve the overall performance of your application.

The opentelemetry-dotnet repository has a good set of examples showing how OpenTelemetry can be used in different cases.

For tracing in an ASP.NET Core application, this example uses the following packages:

  • OpenTelemetry.Extensions.Hosting
  • OpenTelemetry.Exporter.Console

using MassTransit.Logging;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;

var builder = WebApplication.CreateBuilder(args);

void ConfigureResource(ResourceBuilder r)
{
    r.AddService("Service Name",
        serviceVersion: "Version",
        serviceInstanceId: Environment.MachineName);
}

builder.Services.AddOpenTelemetry()
    .ConfigureResource(ConfigureResource)
    .WithTracing(b => b
        .AddSource(DiagnosticHeaders.DefaultListenerName) // MassTransit ActivitySource
        .AddConsoleExporter() // Any supported OpenTelemetry exporter can be used here
    );

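For tracing without the hosting extensions (for example, in a console application), this example uses the following packages:

  • OpenTelemetry
  • OpenTelemetry.Exporter.Console

using MassTransit.Logging;
using OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;

void ConfigureResource(ResourceBuilder r)
{
    r.AddService("Service Name",
        serviceVersion: "Version",
        serviceInstanceId: Environment.MachineName);
}

// Keep the provider alive for the lifetime of the application
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .ConfigureResource(ConfigureResource)
    .AddSource(DiagnosticHeaders.DefaultListenerName) // MassTransit ActivitySource
    .AddConsoleExporter() // Any supported OpenTelemetry exporter can be used here
    .Build();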

That's it. Your application will now export MassTransit-related traces.
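The console exporter is mostly useful for local debugging. As a rough sketch of swapping in a different exporter, the following sends the same MassTransit traces over OTLP. It assumes the OpenTelemetry.Exporter.OpenTelemetryProtocol package is referenced and that a collector is listening at http://localhost:4317; neither the package choice nor the endpoint comes from this page, so adjust both to your environment.

```csharp
using MassTransit.Logging;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddOpenTelemetry()
    .ConfigureResource(r => r.AddService("Service Name"))
    .WithTracing(b => b
        .AddSource(DiagnosticHeaders.DefaultListenerName) // MassTransit ActivitySource
        .AddOtlpExporter(o =>
        {
            // Assumed local collector endpoint; replace with your own
            o.Endpoint = new Uri("http://localhost:4317");
        }));

var app = builder.Build();
app.Run();
```

Only the exporter call changes; the MassTransit source registration stays the same regardless of where the traces are sent.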

For metrics in an ASP.NET Core application, this example uses the following packages:

  • OpenTelemetry.Extensions.Hosting
  • OpenTelemetry.Exporter.Console

using MassTransit.Monitoring;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;

var builder = WebApplication.CreateBuilder(args);

void ConfigureResource(ResourceBuilder r)
{
    r.AddService("Service Name",
        serviceVersion: "Version",
        serviceInstanceId: Environment.MachineName);
}

builder.Services.AddOpenTelemetry()
    .ConfigureResource(ConfigureResource)
    .WithMetrics(b => b
        .AddMeter(InstrumentationOptions.MeterName) // MassTransit Meter
        .AddConsoleExporter() // Any supported OpenTelemetry exporter can be used here
    );

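For metrics without the hosting extensions (for example, in a console application), this example uses the following packages:

  • OpenTelemetry
  • OpenTelemetry.Exporter.Console

using MassTransit.Monitoring;
using OpenTelemetry;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;

void ConfigureResource(ResourceBuilder r)
{
    r.AddService("Service Name",
        serviceVersion: "Version",
        serviceInstanceId: Environment.MachineName);
}

// Metrics require a meter provider, not a tracer provider
using var meterProvider = Sdk.CreateMeterProviderBuilder()
    .ConfigureResource(ConfigureResource)
    .AddMeter(InstrumentationOptions.MeterName) // MassTransit Meter
    .AddConsoleExporter() // Any supported OpenTelemetry exporter can be used here
    .Build();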

The OpenTelemetry metrics captured by MassTransit:

Counters

| Name | Description |
|------|-------------|
| messaging.masstransit.receive | Number of messages received |
| messaging.masstransit.receive.errors | Number of message receive faults |
| messaging.masstransit.consume | Number of messages consumed |
| messaging.masstransit.consume.errors | Number of message consume faults |
| messaging.masstransit.saga | Number of messages processed by saga |
| messaging.masstransit.saga.errors | Number of message faults by saga |
| messaging.masstransit.consume.retries | Number of message consume retries |
| messaging.masstransit.handler | Number of messages handled |
| messaging.masstransit.handler.errors | Number of message handler faults |
| messaging.masstransit.outbox.delivery | Number of messages delivered by outbox |
| messaging.masstransit.outbox.delivery.errors | Number of message delivery faults by outbox |
| messaging.masstransit.send | Number of messages sent |
| messaging.masstransit.send.errors | Number of message send faults |
| messaging.masstransit.outbox.send | Number of messages sent to outbox |
| messaging.masstransit.outbox.send.errors | Number of message send faults to outbox |
| messaging.masstransit.execute | Number of activities executed |
| messaging.masstransit.execute.errors | Number of activity execution faults |
| messaging.masstransit.compensate | Number of activities compensated |
| messaging.masstransit.compensate.errors | Number of activity compensation failures |

Gauges

| Name | Description |
|------|-------------|
| messaging.masstransit.receive.active | Number of messages being received |
| messaging.masstransit.consume.active | Number of consumers in progress |
| messaging.masstransit.execute.active | Number of activity executions in progress |
| messaging.masstransit.compensate.active | Number of activity compensations in progress |
| messaging.masstransit.handler.active | Number of handlers in progress |
| messaging.masstransit.saga.active | Number of sagas in progress |

Histograms

| Name | Description |
|------|-------------|
| messaging.masstransit.receive.duration | Elapsed time spent receiving a message, in milliseconds |
| messaging.masstransit.consume.duration | Elapsed time spent consuming a message, in milliseconds |
| messaging.masstransit.saga.duration | Elapsed time spent by a saga processing a message, in milliseconds |
| messaging.masstransit.handler.duration | Elapsed time spent by a handler processing a message, in milliseconds |
| messaging.masstransit.delivery.durations | Elapsed time between when the message was sent and when it was consumed, in milliseconds |
| messaging.masstransit.execute.duration | Elapsed time spent executing an activity, in milliseconds |
| messaging.masstransit.compensate.duration | Elapsed time spent compensating an activity, in milliseconds |

Labels

| Name | Description |
|------|-------------|
| messaging.masstransit.service | The service name specified at bus configuration |
| messaging.masstransit.destination | The endpoint address |
| messaging.masstransit.message_type | The message type for the metric |
| messaging.masstransit.consumer_type | The consumer, saga, or activity type for the metric |
| messaging.masstransit.activity_type | The activity name |
| messaging.masstransit.argument_type | The activity execute argument type |
| messaging.masstransit.log_type | The activity compensate log type |
| messaging.masstransit.exception_type | The exception type for a fault metric |
| messaging.masstransit.bus | The bus instance |
| messaging.masstransit.endpoint | The receive endpoint |

Metric names and labels can be configured through InstrumentationOptions:

services.Configure<InstrumentationOptions>(options =>
{
    // Configure
});

Azure Monitor integrates directly with OpenTelemetry:

For an ASP.NET Core application, this example uses the following packages:

  • OpenTelemetry.Extensions.Hosting
  • Azure.Monitor.OpenTelemetry.Exporter

using Azure.Monitor.OpenTelemetry.Exporter;
using MassTransit.Logging;
using MassTransit.Monitoring;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;

var builder = WebApplication.CreateBuilder(args);

void ConfigureResource(ResourceBuilder r)
{
    r.AddService("Service Name",
        serviceVersion: "Version",
        serviceInstanceId: Environment.MachineName);
}

builder.Services.AddOpenTelemetry()
    .ConfigureResource(ConfigureResource)
    .WithTracing(b => b
        .AddSource(DiagnosticHeaders.DefaultListenerName) // MassTransit ActivitySource
        .AddAzureMonitorTraceExporter(o =>
        {
            o.ConnectionString = "<Your Connection String>";
        }))
    .WithMetrics(b => b
        .AddMeter(InstrumentationOptions.MeterName) // MassTransit Meter
        .AddAzureMonitorMetricExporter(o =>
        {
            o.ConnectionString = "<Your Connection String>";
        }));

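Without the hosting extensions (for example, in a console application), this example uses the following packages:

  • OpenTelemetry
  • Azure.Monitor.OpenTelemetry.Exporter

using Azure.Monitor.OpenTelemetry.Exporter;
using MassTransit.Logging;
using MassTransit.Monitoring;
using OpenTelemetry;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;

void ConfigureResource(ResourceBuilder r)
{
    r.AddService("Service Name",
        serviceVersion: "Version",
        serviceInstanceId: Environment.MachineName);
}

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .ConfigureResource(ConfigureResource)
    .AddSource(DiagnosticHeaders.DefaultListenerName) // MassTransit ActivitySource
    .AddAzureMonitorTraceExporter(o =>
    {
        o.ConnectionString = "<Your Connection String>";
    })
    .Build();

// Metrics require a meter provider, not a tracer provider
using var meterProvider = Sdk.CreateMeterProviderBuilder()
    .ConfigureResource(ConfigureResource)
    .AddMeter(InstrumentationOptions.MeterName) // MassTransit Meter
    .AddAzureMonitorMetricExporter(o =>
    {
        o.ConnectionString = "<Your Connection String>";
    })
    .Build();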

You can also refer to the sample: Sample-ApplicationInsights

OpenTelemetry is the preferred way to integrate with Prometheus.

For Prometheus metrics in an ASP.NET Core application, this example uses the following packages:

  • OpenTelemetry.Extensions.Hosting
  • OpenTelemetry.Exporter.Prometheus
  • OpenTelemetry.Exporter.Prometheus.AspNetCore

using MassTransit.Monitoring;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;

var builder = WebApplication.CreateBuilder(args);

void ConfigureResource(ResourceBuilder r)
{
    r.AddService("Service Name",
        serviceVersion: "Version",
        serviceInstanceId: Environment.MachineName);
}

builder.Services.AddOpenTelemetry()
    .ConfigureResource(ConfigureResource)
    .WithMetrics(b => b
        .AddMeter(InstrumentationOptions.MeterName) // MassTransit Meter
        .AddPrometheusExporter()
    );

var app = builder.Build();

app.UseOpenTelemetryPrometheusScrapingEndpoint(); // Map the Prometheus metrics endpoint

To migrate from the direct integration to OpenTelemetry while keeping the previous metric names, configure them through InstrumentationOptions:

builder.Services.Configure<InstrumentationOptions>(options =>
{
    options.ReceiveTotal = "mt.receive.total";
    // Configure other names using the same approach
});

MassTransit supports several message observers allowing received, consumed, sent, and published messages to be monitored. There is a bus observer as well, so that the bus life cycle can be monitored.

To observe bus life cycle events, create a class which implements IBusObserver. To configure a bus observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation.

services.AddBusObserver<BusObserver>();
services.AddBusObserver(provider => new BusObserver());

public class BusObserver :
    IBusObserver
{
    /// <summary>
    /// Called before the bus is started, allowing observers to prepare
    /// </summary>
    public Task PreStart(IBus bus)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called after the bus has been started successfully
    /// </summary>
    public Task PostStart(IBus bus, BusStartedContext context)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called before the bus is stopped, allowing observers to complete any pending operations
    /// </summary>
    public Task PreStop(IBus bus)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called after the bus has been stopped
    /// </summary>
    public Task PostStop(IBus bus, BusStoppedContext context)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called when an unhandled exception occurs in the bus
    /// </summary>
    public Task BusFault(IBus bus, BusFaultContext context)
    {
        return Task.CompletedTask;
    }
}

To configure a receive endpoint observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation.

services.AddReceiveEndpointObserver<ReceiveEndpointObserver>();
services.AddReceiveEndpointObserver(provider => new ReceiveEndpointObserver());

public class ReceiveEndpointObserver :
    IReceiveEndpointObserver
{
    /// <summary>
    /// Called when the receive endpoint is ready to start receiving messages
    /// </summary>
    public Task Ready(ReceiveEndpointReadyContext context)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called when a message consumer throws an exception - note this is a consume fault specific to the endpoint
    /// </summary>
    public Task ConsumeFault<T>(ConsumeFaultContext<T> context)
        where T : class
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called when an exception occurs during message reception (e.g., deserialization failures)
    /// </summary>
    public Task ReceiveFault(ReceiveFaultContext context)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called after the receive endpoint has stopped
    /// </summary>
    public Task PostStop(ReceiveEndpointStoppedContext context)
    {
        return Task.CompletedTask;
    }
}

To observe messages as they are received by the transport, create a class that implements the IReceiveObserver interface, and connect it to the bus as shown below.

To configure a receive observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation. When a container is not being used, the ConnectReceiveObserver bus method can be used instead.

services.AddReceiveObserver<ReceiveObserver>();
services.AddReceiveObserver(provider => new ReceiveObserver());

public class ReceiveObserver :
    IReceiveObserver
{
    /// <summary>
    /// Called immediately after the message is received from the transport, before any processing.
    /// This is the earliest point in the pipeline - message may still fail deserialization.
    /// </summary>
    public Task PreReceive(ReceiveContext context)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called after the message has been received and processed by the transport layer.
    /// </summary>
    public Task PostReceive(ReceiveContext context)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called when an exception occurs during message reception, such as deserialization failures.
    /// This occurs early in the pipeline before any consumer is invoked.
    /// </summary>
    public Task ReceiveFault(ReceiveContext context, Exception exception)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called when a consumer throws an exception while processing the message.
    /// Called once per consumer that receives the message.
    /// </summary>
    public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception)
        where T : class
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called after the message has been consumed successfully.
    /// Called once per consumer that processes the message.
    /// </summary>
    public Task PostConsume<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType)
        where T : class
    {
        return Task.CompletedTask;
    }
}
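When no container is involved, the observer is attached to the bus instance directly. A minimal sketch, assuming the ReceiveObserver class shown above and a bus created with Bus.Factory.CreateUsingInMemory (the in-memory transport is only an illustrative choice, not something this page prescribes):

```csharp
using MassTransit;

// Create a bus without a container; the in-memory transport is used purely for illustration
IBusControl busControl = Bus.Factory.CreateUsingInMemory(cfg => { });

// Connect the observer; the returned handle can be used to detach it later
ConnectHandle handle = busControl.ConnectReceiveObserver(new ReceiveObserver());

await busControl.StartAsync();
try
{
    // ... send, publish, and consume messages here ...
}
finally
{
    await busControl.StopAsync();
    handle.Disconnect();
}
```

The same pattern applies to the ConnectConsumeObserver, ConnectSendObserver, and ConnectPublishObserver methods.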

If the ReceiveContext isn’t fascinating enough for you, perhaps the actual consumption of messages might float your boat. A consume observer implements the IConsumeObserver interface, as shown below.

To configure a consume observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation. When a container is not being used, the ConnectConsumeObserver bus method can be used instead.

services.AddConsumeObserver<ConsumeObserver>();
services.AddConsumeObserver(provider => new ConsumeObserver());

public class ConsumeObserver :
    IConsumeObserver
{
    /// <summary>
    /// Called before the consumer's Consume method is invoked. This is after the message
    /// has been deserialized and is ready to be processed. Consumer filters have already run.
    /// Note: If using retry, this is called before each retry attempt.
    /// </summary>
    public Task PreConsume<T>(ConsumeContext<T> context)
        where T : class
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called after the consumer's Consume method completes successfully.
    /// If an exception is thrown, ConsumeFault is called instead.
    /// </summary>
    public Task PostConsume<T>(ConsumeContext<T> context)
        where T : class
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called when the consumer's Consume method throws an exception.
    /// This is called after all retry attempts have been exhausted.
    /// The message may be moved to an error queue depending on configuration.
    /// </summary>
    public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception)
        where T : class
    {
        return Task.CompletedTask;
    }
}

Okay, so it’s obvious that if you’ve read this far you want a more specific observer, one that is only called when a specific message type is consumed. We have you covered there too: implement the IConsumeMessageObserver<T> interface for that message type.

To connect the observer, use the ConnectConsumeMessageObserver method before starting the bus.

The IConsumeMessageObserver<T> interface may be deprecated at some point; it’s sort of a legacy observer that isn’t recommended.
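A sketch of what such a message-specific observer might look like. The OrderSubmitted message type and the OrderSubmittedObserver class are hypothetical names made up for this illustration, and the counters exist only to make the callbacks visible; the interface is assumed to expose PreConsume, PostConsume, and ConsumeFault for the single message type.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message type, used only for this illustration
public record OrderSubmitted(Guid OrderId);

public class OrderSubmittedObserver :
    IConsumeMessageObserver<OrderSubmitted>
{
    int _started;
    int _completed;
    int _faulted;

    public int Started => _started;
    public int Completed => _completed;
    public int Faulted => _faulted;

    // Called before an OrderSubmitted message is delivered to a consumer
    public Task PreConsume(ConsumeContext<OrderSubmitted> context)
    {
        Interlocked.Increment(ref _started);
        return Task.CompletedTask;
    }

    // Called after an OrderSubmitted message was consumed successfully
    public Task PostConsume(ConsumeContext<OrderSubmitted> context)
    {
        Interlocked.Increment(ref _completed);
        return Task.CompletedTask;
    }

    // Called when consuming an OrderSubmitted message threw an exception
    public Task ConsumeFault(ConsumeContext<OrderSubmitted> context, Exception exception)
    {
        Interlocked.Increment(ref _faulted);
        return Task.CompletedTask;
    }
}
```

Connecting it would then be a matter of calling busControl.ConnectConsumeMessageObserver(new OrderSubmittedObserver()) on the bus instance before it is started.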

Okay, so, incoming messages are not your thing. We get it, you’re all about what goes out. It’s cool. It’s better to send than to receive. Or is that give? Anyway, a send observer is also available.

To configure a send observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation. When a container is not being used, the ConnectSendObserver bus method can be used instead.

services.AddSendObserver<SendObserver>();
services.AddSendObserver(provider => new SendObserver());

public class SendObserver :
    ISendObserver
{
    /// <summary>
    /// Called just before a message is sent to the transport.
    /// All headers are already configured, message is serialized, and the destination is resolved.
    /// Note: This is called for point-to-point sends (ISendEndpoint), not for publish operations.
    /// </summary>
    public Task PreSend<T>(SendContext<T> context)
        where T : class
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called just after the message is sent to the transport and acknowledged.
    /// The message may still be in transit to consumers.
    /// </summary>
    public Task PostSend<T>(SendContext<T> context)
        where T : class
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called if an exception occurs while sending the message.
    /// This may include routing failures, serialization errors, or transport errors.
    /// </summary>
    public Task SendFault<T>(SendContext<T> context, Exception exception)
        where T : class
    {
        return Task.CompletedTask;
    }
}

In addition to send, publish is also observable. Because the semantics matter, absolutely. Use the MessageId to link them up, as it’s unique for each message. Remember that Publish and Send are two distinct operations, so if you want to observe all messages that are leaving your service, you have to connect both publish and send observers.

To configure a publish observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation. When a container is not being used, the ConnectPublishObserver bus method can be used instead.

services.AddPublishObserver<PublishObserver>();
services.AddPublishObserver(provider => new PublishObserver());

public class PublishObserver :
    IPublishObserver
{
    /// <summary>
    /// Called right before the message is published to the transport (exchange/topic).
    /// All headers are configured and the message is serialized.
    /// Note: This is for publish operations (IPublishEndpoint), not for sends.
    /// Use both PublishObserver and SendObserver to observe all outbound messages.
    /// </summary>
    public Task PrePublish<T>(PublishContext<T> context)
        where T : class
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called after the message is published and acknowledged by the broker.
    /// The message is now available to all subscribers.
    /// </summary>
    public Task PostPublish<T>(PublishContext<T> context)
        where T : class
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called if an exception occurs while publishing the message.
    /// This includes serialization errors, routing failures, or transport errors.
    /// </summary>
    public Task PublishFault<T>(PublishContext<T> context, Exception exception)
        where T : class
    {
        return Task.CompletedTask;
    }
}
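To illustrate observing everything that leaves a service, here is a sketch of a single class that implements both ISendObserver and IPublishObserver and writes the MessageId of each outbound message. The OutboundMessageObserver name, the Describe helper, and the console output format are assumptions made for this example, not part of MassTransit.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

public class OutboundMessageObserver :
    ISendObserver,
    IPublishObserver
{
    // Builds one log line; MessageId is nullable because it may not have been assigned
    public static string Describe(string operation, Guid? messageId, Uri destination)
    {
        return $"{operation} MessageId={messageId} Destination={destination}";
    }

    public Task PreSend<T>(SendContext<T> context) where T : class => Task.CompletedTask;

    public Task PostSend<T>(SendContext<T> context) where T : class
    {
        Console.WriteLine(Describe("Send", context.MessageId, context.DestinationAddress));
        return Task.CompletedTask;
    }

    public Task SendFault<T>(SendContext<T> context, Exception exception) where T : class
    {
        Console.WriteLine(Describe("SendFault", context.MessageId, context.DestinationAddress));
        return Task.CompletedTask;
    }

    public Task PrePublish<T>(PublishContext<T> context) where T : class => Task.CompletedTask;

    public Task PostPublish<T>(PublishContext<T> context) where T : class
    {
        Console.WriteLine(Describe("Publish", context.MessageId, context.DestinationAddress));
        return Task.CompletedTask;
    }

    public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
    {
        Console.WriteLine(Describe("PublishFault", context.MessageId, context.DestinationAddress));
        return Task.CompletedTask;
    }
}
```

Such a class would need to be registered twice, once with AddSendObserver and once with AddPublishObserver, since the two operations are observed separately.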

To observe events consumed by a saga state machine, use an IEventObserver<T> where T is the saga instance type.

To configure an event observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation.

services.AddEventObserver<T, EventObserver<T>>();
services.AddEventObserver<T>(provider => new EventObserver<T>());

public class EventObserver<T> :
    IEventObserver<T>
    where T : class, ISaga
{
    /// <summary>
    /// Called before the event is executed. The event has been received but the state machine
    /// hasn't yet processed it. Use this to inspect the event.
    /// </summary>
    public Task PreExecute(EventContext<T> context)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called immediately before a state transition occurs. Note that a transition does not
    /// mean that the instance has been persisted to the saga repository.
    /// </summary>
    public Task PreTransition(EventContext<T> context)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called immediately after a state transition occurs. Note that a transition does not
    /// mean that the instance has been persisted to the saga repository.
    /// </summary>
    public Task PostTransition(EventContext<T> context)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Called when an exception occurs during event processing.
    /// This includes exceptions from activities, state transitions, or persistence.
    /// </summary>
    public Task Fault<TException>(EventExceptionContext<T, TException> context)
        where TException : Exception
    {
        return Task.CompletedTask;
    }
}

To observe state changes that happen in a saga state machine, use an IStateObserver<T> where T is the saga instance type.

To configure a state observer, add it to the container using one of the methods shown below. The factory method version allows customization of the observer creation.

services.AddStateObserver<T, StateObserver<T>>();
services.AddStateObserver<T>(provider => new StateObserver<T>());

public class StateObserver<T> :
    IStateObserver<T>
    where T : class, ISaga
{
    /// <summary>
    /// Called when the saga state changes. This provides visibility into state machine transitions.
    /// Use this to track saga lifecycle, log state changes, or trigger external actions.
    /// </summary>
    public Task StateChanged(StateChangeContext<T> context)
    {
        return Task.CompletedTask;
    }
}