MessageData Configuration
Message brokers are built to be fast, and when it comes to messages, size matters. In this case, however, bigger is not better — large messages negatively impact broker performance.
MassTransit offers a built-in solution that stores large data (a string, byte array, or even an object) in a separate repository and replaces it with a reference in the message body. When the message is consumed, the reference is replaced with the original data, which is loaded from the repository.
Add MessageData to your message contract
To use message data, add a MessageData<T> property to a message. The property can appear anywhere in the message: at the top level, nested within other message properties, or inside collections such as arrays, lists, or dictionaries. The generic argument T can be string, byte[], Stream, or a serializable reference type (one that meets the same requirements as a message type).

    public interface IndexDocumentContent
    {
        Guid DocumentId { get; }
        MessageData<byte[]> Document { get; }
    }

The bus must be configured to use message data by specifying the message data repository. The example below uses the in-memory repository.

    services.AddMassTransit(x =>
    {
        x.UsingInMemory((context, cfg) =>
        {
            cfg.UseMessageData(md => md.InMemory());

            cfg.ConfigureEndpoints(context);
        });
    });

Use MessageData in your consumer
Configuring the message data middleware (via UseMessageData) adds a transform that replaces any deserialized message data reference with an object that loads the message data asynchronously. Because the middleware handles loading, the consumer doesn't need to use the message data repository. The consumer simply awaits the property's Value to access the message data. If the message data was not loaded, an exception is thrown. The HasValue property is true if message data is present.
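Since awaiting Value throws when the data was not loaded, a consumer may want to check HasValue first. The following is a minimal sketch of that guard, not an official pattern: the class name GuardedIndexDocumentConsumer is hypothetical, the contract is repeated only so the block compiles on its own, and the namespace layout assumes MassTransit v8.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Contract from the section above, repeated so the sketch is self-contained
public interface IndexDocumentContent
{
    Guid DocumentId { get; }
    MessageData<byte[]> Document { get; }
}

// Hypothetical consumer name, used only for this illustration
public class GuardedIndexDocumentConsumer :
    IConsumer<IndexDocumentContent>
{
    public async Task Consume(ConsumeContext<IndexDocumentContent> context)
    {
        MessageData<byte[]> messageData = context.Message.Document;

        // Skip the message when no data is present instead of awaiting Value
        if (messageData is null || !messageData.HasValue)
            return;

        // The load from the repository happens when Value is awaited
        byte[] document = await messageData.Value;
    }
}
```

Whether to skip, log, or fault on missing data is an application decision; returning early is just the simplest choice for the sketch.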
A consumer reads the data by awaiting the Value property:

    public class IndexDocumentConsumer :
        IConsumer<IndexDocumentContent>
    {
        public async Task Consume(ConsumeContext<IndexDocumentContent> context)
        {
            // Awaiting Value loads the message data
            byte[] document = await context.Message.Document.Value;
        }
    }

Initialize MessageData in your producer
When a message initializer is used to initialize a message contract with one or more MessageData<T> properties, the byte[], string, or Stream value can be specified directly and the initializer stores the data to the repository. If the message has the TimeToLive header property specified, the same value is used for the message data in the repository.

    Guid documentId = NewId.NextGuid();
    byte[] document = new byte[100000]; // get byte array, or a big string

    await endpoint.Send<IndexDocumentContent>(new
    {
        DocumentId = documentId,
        Document = document
    });

If using a message class, or not using a message initializer, the data must be stored to the repository explicitly.

    class IndexDocumentContentMessage :
        IndexDocumentContent
    {
        public Guid DocumentId { get; set; }
        public MessageData<byte[]> Document { get; set; }
    }

    Guid documentId = NewId.NextGuid();
    byte[] document = new byte[100000]; // get byte array, or a big string

    await endpoint.Send<IndexDocumentContent>(new IndexDocumentContentMessage
    {
        DocumentId = documentId,
        Document = await repository.PutBytes(document, TimeSpan.FromDays(1))
    });

The message data is stored, and the reference is added to the outbound message.
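The same explicit approach should work for string data. The sketch below assumes a PutString extension method that mirrors PutBytes, and assumes the MassTransit v8 namespace layout. The ArchiveReport contract, the ArchiveReportMessage class, and the ReportSender helper are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical contract with a string payload
public interface ArchiveReport
{
    Guid ReportId { get; }
    MessageData<string> Body { get; }
}

// Hypothetical message class implementing the contract
public class ArchiveReportMessage :
    ArchiveReport
{
    public Guid ReportId { get; set; }
    public MessageData<string> Body { get; set; }
}

public static class ReportSender
{
    public static async Task SendReport(ISendEndpoint endpoint, IMessageDataRepository repository, string body)
    {
        // Assumption: PutString stores the value and returns a MessageData<string> reference
        MessageData<string> stored = await repository.PutString(body, TimeSpan.FromDays(1));

        await endpoint.Send<ArchiveReport>(new ArchiveReportMessage
        {
            ReportId = NewId.NextGuid(),
            Body = stored
        });
    }
}
```

Passing the repository in as a parameter keeps the helper independent of how the repository was configured on the bus.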
Configure the MessageData defaults
Several configuration settings are available to adjust message data behavior.
Time To Live
By default, message data has no time-to-live. To specify a default time-to-live, set the default as shown.

    MessageDataDefaults.TimeToLive = TimeSpan.FromDays(2);

This setting only specifies the default value passed when calling the repository; it is up to the repository to apply any time-to-live value to the actual message data.

If the SendContext has specified a time-to-live value, that value is applied to the message data automatically (when using message initializers). To account for system latency or clock differences, extra time can be added to it.

    MessageDataDefaults.ExtraTimeToLive = TimeSpan.FromMinutes(5);

Inline Threshold
A newer feature is the ability to specify a threshold for message data so that smaller values are included directly in the message body. This eliminates the need to read the data from storage, which improves performance. Message data can also be configured so that values under the threshold are not written to the repository at all. By default (for now), data is always written to the repository to support services that have not yet upgraded to the latest MassTransit.
If you know your systems are upgraded, you can change the default so that data under the threshold is not written to the repository.
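The interaction between the threshold and the write-to-repository default can be modeled in a few lines. This is an illustrative sketch of the behavior described above, not MassTransit's implementation; InlineThresholdSketch and Decide are hypothetical names, and treating "under the threshold" as strictly smaller is an assumption.

```csharp
using System;

// Illustrative model only; this is not MassTransit code
public static class InlineThresholdSketch
{
    // Returns whether the value travels inline in the message body
    // and whether it is also written to the repository
    public static (bool Inline, bool Stored) Decide(int sizeInBytes, int threshold, bool alwaysWriteToRepository)
    {
        // Assumption: "under the threshold" means strictly smaller than the threshold
        bool inline = sizeInBytes < threshold;

        // Values at or above the threshold always go to the repository;
        // smaller values are stored only when alwaysWriteToRepository is true
        bool stored = !inline || alwaysWriteToRepository;

        return (inline, stored);
    }
}
```

Under this model, a 2,000-byte value with a threshold of 8192 and alwaysWriteToRepository set to false is carried inline and never stored, while a 100,000-byte value is stored and sent as a reference. With alwaysWriteToRepository left at true, the 2,000-byte value is carried inline and also stored.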
To configure the threshold, and optionally turn off storage of data under the threshold:

    // the default value is 4096
    MessageDataDefaults.Threshold = 8192;

    // to disable writing to the repository for sizes under the threshold
    // defaults to true, which may change to false in a future release
    MessageDataDefaults.AlwaysWriteToRepository = false;

Configure the MessageData repository
MassTransit includes several message data repositories.
| Name | Description |
|---|---|
| InMemoryMessageDataRepository | Entirely in memory, meant for unit testing |
| FileSystemMessageDataRepository | Writes message data to the file system, which may be a network drive or other shared storage |
| MongoDbMessageDataRepository | Stores message data using MongoDB’s GridFS |
| AzureStorageMessageDataRepository | Stores message data using Azure Blob Storage |
| EncryptedMessageDataRepository | Adds encryption to any other message data repository |
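A repository from the table can also be created directly and passed to UseMessageData as an instance. The sketch below does this with the in-memory repository. It assumes the MassTransit v8 namespace layout and a parameterless InMemoryMessageDataRepository constructor; MessageDataSetup and AddBusWithMessageData are hypothetical names.

```csharp
using MassTransit;
using MassTransit.MessageData; // assumed namespace of InMemoryMessageDataRepository
using Microsoft.Extensions.DependencyInjection;

public static class MessageDataSetup
{
    // Hypothetical helper that wires one repository instance into the bus and the container
    public static IServiceCollection AddBusWithMessageData(this IServiceCollection services)
    {
        IMessageDataRepository repository = new InMemoryMessageDataRepository();

        // Register the same instance so producers that store data explicitly can resolve it
        services.AddSingleton(repository);

        services.AddMassTransit(x =>
        {
            x.UsingInMemory((context, cfg) =>
            {
                cfg.UseMessageData(repository);

                cfg.ConfigureEndpoints(context);
            });
        });

        return services;
    }
}
```

Sharing one instance keeps producers that call the repository explicitly and the bus middleware pointed at the same storage.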
In-Memory
To configure the in-memory message data repository, use the InMemory() extension method.

    services.AddMassTransit(x =>
    {
        x.UsingInMemory((context, cfg) =>
        {
            cfg.UseMessageData(md => md.InMemory());

            cfg.ConfigureEndpoints(context);
        });
    });

File System
To configure the file system message data repository, use the FileSystem(string path) extension method, where path is the root directory for the message data files.

    services.AddMassTransit(x =>
    {
        x.UsingInMemory((context, cfg) =>
        {
            cfg.UseMessageData(md => md.FileSystem("/message-data/"));

            cfg.ConfigureEndpoints(context);
        });
    });

Encrypted
To configure an encrypted message data repository, use the Encrypted() extension method, passing a crypto stream provider and then specifying the actual message data repository to use.

    ISymmetricKeyProvider keyProvider = new MySymmetricKeyProvider();
    var csp = new AesCryptoStreamProvider(keyProvider, "default");

    services.AddMassTransit(x =>
    {
        x.UsingInMemory((context, cfg) =>
        {
            // the inner selector chooses the repository that holds the encrypted data
            cfg.UseMessageData(md => md.Encrypted(csp, inner => inner.FileSystem("/message-data/")));

            cfg.ConfigureEndpoints(context);
        });
    });

MongoDB
To configure the MongoDB GridFS message data repository, follow the example shown below.

    services.AddMassTransit(x =>
    {
        x.UsingInMemory((context, cfg) =>
        {
            cfg.UseMessageData(md => md.MongoDb(connectionString, databaseName));

            cfg.ConfigureEndpoints(context);
        });
    });

Amazon S3
To configure the Amazon S3 message data repository, follow the example shown below.

    services.AddMassTransit(x =>
    {
        x.UsingInMemory((context, cfg) =>
        {
            cfg.UseMessageData(md => md.AmazonS3("bucket-name"));

            cfg.ConfigureEndpoints(context);
        });
    });

Azure Storage
An Azure Cloud Storage account can be used to store message data. To configure Azure storage, first create the BlobServiceClient object using your connection string, then use the extension method to create the repository as shown below. You can replace message-data with the desired container name. Set the compress flag to true to compress your blob files as .gz files, which helps reduce cloud storage costs. The consumer side automatically detects and decompresses the .gz files.

    var client = new BlobServiceClient("<storage account connection string>");
    _repository = client.CreateMessageDataRepository("message-data", compress: true);

Prior to version 7.1.8 of MassTransit, this was done by creating a CloudStorageAccount object from your connection string, as follows.

    var account = CloudStorageAccount.Parse("<storage account connection string>");
    _repository = account.CreateMessageDataRepository("message-data");

In either case, pass the repository to UseMessageData when configuring the bus.

    services.AddMassTransit(x =>
    {
        x.UsingInMemory((context, cfg) =>
        {
            cfg.UseMessageData(_repository);

            cfg.ConfigureEndpoints(context);
        });
    });