Kafka Configuration
Kafka is supported as a Rider, and enables consuming and producing messages from/to Kafka topics. The Confluent .NET client is used, and has been tested with the community edition (running in Docker).
Configure the Kafka rider
Section titled “Configure the Kafka rider”To consume a Kafka topic, configure a Rider within the bus configuration as shown.
namespace KafkaConsumer;
using System.Threading.Tasks;using MassTransit;using Microsoft.Extensions.DependencyInjection;
public class Program{ public static async Task Main() { var services = new ServiceCollection();
services.AddMassTransit(x => { x.UsingInMemory();
x.AddRider(rider => { rider.AddConsumer<KafkaMessageConsumer>();
rider.UsingKafka((context, k) => { k.Host("localhost:9092");
k.TopicEndpoint<KafkaMessage>("topic-name", "consumer-group-name", e => { e.ConfigureConsumer<KafkaMessageConsumer>(context); }); }); }); }); }
class KafkaMessageConsumer : IConsumer<KafkaMessage> { public Task Consume(ConsumeContext<KafkaMessage> context) { return Task.CompletedTask; } }
public record KafkaMessage { public string Text { get; init; } }}Configure a topic endpoint
Section titled “Configure a topic endpoint”A topic endpoint connects a Kafka Consumer to a topic, using the specified topic name. The consumer group specified should be unique to the application, and shared by a cluster of service instances for load balancing. Consumers and sagas can be configured on the topic endpoint, which should be registered in the rider configuration. While the configuration for topic endpoints is the same as a Receive endpoint, there is no implicit binding of consumer message types to Kafka topics. The message type is specified on the TopicEndpoint as a generic argument.
Wildcard support
Section titled “Wildcard support”Kafka allows to subscribe to multiple topics by using Regex (also called wildcard) which matches multiple topics:
k.TopicEndpoint<KafkaMessage>("^topic-[0-9]*", "consumer-group-name", e =>{ e.ConfigureConsumer<KafkaMessageConsumer>(context);});Configuration
Section titled “Configuration”The configuration includes through Confluent client configs or using configurator to override it with style.
Configure checkpoint settings
Section titled “Configure checkpoint settings”Rider implementation is taking full responsibility of Checkpointing, there is no ability to change it. The checkpoint settings can be configured on topic bases through next properties:
| Name | Description | Default |
|---|---|---|
| CheckpointInterval | Checkpoint frequency based on time | 1 min |
| CheckpointMessageCount | Checkpoint every X messages | 5000 |
| MessageLimit | Checkpoint buffer size without blocking consumption | 10000 |
Please note, MassTransit tracks checkpoints for each partition within a topic separately and these settings are applied to each partition.
During graceful shutdown, MassTransit will try to “checkpoint” messages that have already been consumed. Force shutdown should be avoided to prevent duplicate message consumption due to lost checkpoints.
Configure scalability settings
Section titled “Configure scalability settings”Riders are designed with performance in mind, handling each topic partition within separate threadpool. As well, allowing to scale-up consumption within same partition by using Key, as long as keys are different they will be processed concurrently and all this without sacrificing ordering.
| Name | Description | Default |
|---|---|---|
| ConcurrentConsumerLimit | Number of Confluent Consumer instances within same endpoint | 1 |
| ConcurrentDeliveryLimit | Number of Messages delivered concurrently within same partition + key. Increasing this value will break ordering, helpful for topics where ordering is not required | 1 |
| ConcurrentMessageLimit | Number of Messages processed concurrently witin different keys (preserving ordering). When keys are the same for entire partition ConcurrentDeliveryLimit will be used instead | 1 |
| PrefetchCount | Number of Messages to prefetch from kafka topic into memory | 1000 |
Configure Kafka topics
Section titled “Configure Kafka topics”In most systems, creating Kafka topics at startup is not the typical. MassTransit can be configured to create topics if they do not exist, but genreally topics should be created with correct number of partitions and replicas beforehand.
Configure topics producers
Section titled “Configure topics producers”Producing messages to Kafka topics requires the producer to be registered. The producer can then be used to produce messages to the specified Kafka topic. In the example below, messages are produced to the Kafka topic as they are entered by the user.
namespace KafkaProducer;
using System;using System.Threading;using System.Threading.Tasks;using MassTransit;using Microsoft.Extensions.DependencyInjection;
public class Program{ public static async Task Main() { var services = new ServiceCollection();
services.AddMassTransit(x => { x.UsingInMemory();
x.AddRider(rider => { rider.AddProducer<KafkaMessage>("topic-name");
rider.UsingKafka((context, k) => { k.Host("localhost:9092"); }); }); });
var provider = services.BuildServiceProvider();
var busControl = provider.GetRequiredService<IBusControl>();
await busControl.StartAsync(new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token); try { var producer = provider.GetRequiredService<ITopicProducer<KafkaMessage>>(); do { string value = await Task.Run(() => { Console.WriteLine("Enter text (or quit to exit)"); Console.Write("> "); return Console.ReadLine(); });
if ("quit".Equals(value, StringComparison.OrdinalIgnoreCase)) break;
await producer.Produce(new { Text = value }); } while (true); } finally { await busControl.StopAsync(); } }
public record KafkaMessage { public string Text { get; init; } }}Create topic producers dynamically
Section titled “Create topic producers dynamically”A topic producer can be created dynamically using ITopicProducerProvider (registered as Scoped).
First, given a configured Kafka rider as shown below:
services.AddMassTransit(x =>{ x.UsingInMemory();
x.AddRider(rider => { rider.UsingKafka((context, k) => { k.Host("localhost:9092"); }); });});Resolve the ITopicProducerProvider interface and use it create the topic producer for the specified topic name.
var producerProvider = provider.GetRequiredService<ITopicProducerProvider>();
var producer = producerProvider.GetProducer<KafkaMessage>(new Uri($"topic:topic-name"));
await producer.Produce(new{ TopicName = value});Produce a tombstone message
Section titled “Produce a tombstone message”A record with the same key from the record we want to delete is produced to the same topic and partition with a null payload. These records are called tombstones. This could be done by setting custom value serializer when calling produce:
var producer = provider.GetRequiredService<ITopicProducer<string, KafkaMessage>>();await producer.Produce("key", new { }, Pipe.Execute<KafkaSendContext<string, KafkaMessage>>(context =>{ context.ValueSerializer = new TombstoneSerializer<KafkaMessage>();}));The tombstone serializer is not included, but can be created as a simple class in your code:
class TombstoneSerializer<T> : IAsyncSerializer<T>{ public Task<byte[]> SerializeAsync(T data, SerializationContext context) { return Task.FromResult<byte[]>(null); }}Producing and Consuming Multiple Message Types on a Single Topic
Section titled “Producing and Consuming Multiple Message Types on a Single Topic”There are situations where you might want to produce / consume events of different types on the same Kafka topic. A common use case is to use a single topic to
log ordered meaningful state change events like SomethingRequested, SomethingStarted, SomethingFinished.
Confluent have some documentation about how this can be implemented on the Schema Registry side:
- Confluent Docs - Multiple Event Types in the Same Topic
- Confluent Docs - Multiple Event Types in the Same Topic with Avro
- Confluent Blog - Multiple Event Types in the Same Topic
Unfortunately, it is not yet widely supported in client tools and products and there is limited documentation about how to support this in your own applications.
However, it is possible… The following demo uses the MassTransit Kafka Rider with custom Avro serializer / deserializer implementations and the Schema Registry to support multiple event types on a single topic:
The custom serializers / deserializer implementations leverage the wire format used by the standard Confluent schema-based serializers, which includes the schema id in the data stored for each message. This is also good news for interoperability with non-MassTransit applications.
Warning: It’s a little hacky and only supports the Avro format, but there’s enough there to get you started.