SQL Transport Functions

MassTransit uses PostgreSQL functions or SQL Server stored procedures for all transport operations. These functions are built to manage the constraints of the transport, such as message locking, redelivery, and subscription.

For completeness, the functions are detailed below.

create_queue(queue_name text,
auto_delete integer DEFAULT NULL)

Creates a queue with its associated error and dead-letter queues. The resulting three queues each have their own type.
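
As a minimal sketch, a queue could be created from a SQL client as shown below. The schema name transport and the queue name orders are assumptions made for illustration; substitute the schema your transport was actually installed into.

```sql
-- Assumption: the transport functions live in a schema named "transport".
-- "orders" is a hypothetical queue name.
-- auto_delete is omitted, so its default (NULL) applies.
SELECT * FROM transport.create_queue('orders');
```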

create_topic(topic_name text)

Creates a topic.

create_topic_subscription(source_topic_name text,
destination_topic_name text,
type integer,
routing_key text DEFAULT '',
filter jsonb DEFAULT '{}')

Creates a subscription from one topic to another topic.

create_queue_subscription(source_topic_name text,
destination_queue_name text,
type integer,
routing_key text DEFAULT '',
filter jsonb DEFAULT '{}')

Creates a subscription from a topic to a queue.
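
The topology functions above might be combined as in the following sketch, which binds a topic to a queue. The schema name transport, the topic name order-submitted, and the queue name orders are illustrative assumptions. The type value 1 corresponds to "All" in the type table on this page.

```sql
-- Assumption: functions are installed in a schema named "transport".
-- Topic and queue names are hypothetical.
SELECT * FROM transport.create_topic('order-submitted');
SELECT * FROM transport.create_queue('orders');

-- Arguments are positional: source_topic_name, destination_queue_name, type.
-- routing_key and filter are omitted, so their defaults apply.
SELECT * FROM transport.create_queue_subscription('order-submitted', 'orders', 1);
```

With this subscription in place, a message published to the topic should be delivered to the queue regardless of routing key.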

purge_queue(queue_name text)

Removes all messages from a queue, including messages in the error and dead-letter sub-queues.

fetch_messages(queue_name text,
fetch_consumer_id uuid,
fetch_lock_id uuid,
lock_duration interval,
fetch_count integer DEFAULT 1)

Fetches messages from the specified queue.
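
A fetch call could look like the sketch below. The schema name, the queue name, and both UUID values are placeholders chosen for illustration. A fixed lock id is used so that the same value can be passed to delete_message or renew_message_lock afterwards.

```sql
-- Assumption: schema "transport"; queue "orders" and both UUIDs are hypothetical.
SELECT *
FROM transport.fetch_messages(
    'orders',                                        -- queue_name
    '00000000-0000-0000-0000-0000000000c1'::uuid,    -- fetch_consumer_id
    '00000000-0000-0000-0000-0000000000a1'::uuid,    -- fetch_lock_id
    interval '1 minute',                             -- lock_duration
    10                                               -- fetch_count
);
```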

fetch_messages_partitioned(queue_name text,
fetch_consumer_id uuid,
fetch_lock_id uuid,
lock_duration interval,
fetch_count integer DEFAULT 1,
concurrent_count integer DEFAULT 1,
ordered integer DEFAULT 0)

Fetches messages from the specified queue using the partitioned receive mode.

delete_message(message_delivery_id bigint,
lock_id uuid)

Deletes a message that was previously fetched with the specified lock_id.

renew_message_lock(message_delivery_id bigint,
lock_id uuid,
duration interval)

Renews (extends) the lock on a message that was previously fetched with the specified lock_id.
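
The two lock-based calls above can be sketched as follows: the lock is extended while the message is still being processed, and the message is deleted once processing completes. The schema name, the message_delivery_id value 42, and the lock id are hypothetical; in practice both values come from the earlier fetch.

```sql
-- Assumption: schema "transport"; 42 and the UUID stand in for values
-- obtained from a previous fetch_messages call.

-- Extend the lock by one more minute while work is in progress.
SELECT * FROM transport.renew_message_lock(
    42,
    '00000000-0000-0000-0000-0000000000a1'::uuid,
    interval '1 minute'
);

-- Remove the message after it has been handled successfully.
SELECT * FROM transport.delete_message(
    42,
    '00000000-0000-0000-0000-0000000000a1'::uuid
);
```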

move_message(message_delivery_id bigint,
lock_id uuid,
queue_name text,
queue_type integer,
headers jsonb)

Moves a message that was previously fetched with the specified lock_id to the destination queue, adding the headers to the message delivery. Typically used by the receive endpoint to either dead-letter (skipped) or fault (error) a message.

unlock_message(message_delivery_id bigint,
lock_id uuid,
delay interval,
headers jsonb)

Unlocks a message that was previously fetched with the specified lock_id, adding the specified delay to the enqueue_time and any additional headers. Typically used when delayed message redelivery is used, or when faults are thrown back to the transport.
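
As a rough illustration, a message could be released for redelivery after a delay as shown below. The schema name, the identifiers, and the header name example-reason are all assumptions and are not header names defined by MassTransit.

```sql
-- Assumption: schema "transport"; 42 and the UUID come from a previous fetch.
-- "example-reason" is a hypothetical header used only for illustration.
SELECT * FROM transport.unlock_message(
    42,                                              -- message_delivery_id
    '00000000-0000-0000-0000-0000000000a1'::uuid,    -- lock_id
    interval '30 seconds',                           -- delay added to enqueue_time
    '{"example-reason": "retry later"}'::jsonb       -- headers
);
```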

send_message(entity_name text,
priority integer DEFAULT NULL,
transport_message_id uuid DEFAULT gen_random_uuid(),
body jsonb DEFAULT NULL,
binary_body bytea DEFAULT NULL,
content_type text DEFAULT NULL,
message_type text DEFAULT NULL,
message_id uuid DEFAULT NULL,
correlation_id uuid DEFAULT NULL,
conversation_id uuid DEFAULT NULL,
request_id uuid DEFAULT NULL,
initiator_id uuid DEFAULT NULL,
source_address text DEFAULT NULL,
destination_address text DEFAULT NULL,
response_address text DEFAULT NULL,
fault_address text DEFAULT NULL,
sent_time timestamptz DEFAULT NULL,
headers jsonb DEFAULT NULL,
host jsonb DEFAULT NULL,
partition_key text DEFAULT NULL,
routing_key text DEFAULT NULL,
delay interval DEFAULT INTERVAL '0 seconds',
scheduling_token_id uuid DEFAULT NULL,
max_delivery_count int DEFAULT 10)

Sends a message to the entity_name queue with all the specified properties.
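
Because most parameters have defaults, PostgreSQL named notation keeps a call short. The sketch below sets only a few of them. The schema name, queue name, and body are illustrative assumptions, and the body shape and content type a MassTransit consumer expects are not covered here.

```sql
-- Assumption: schema "transport"; queue name and payload are hypothetical.
-- Parameters not listed keep the defaults shown in the signature above.
SELECT * FROM transport.send_message(
    entity_name  => 'orders',
    body         => '{"orderId": "A-1001"}'::jsonb,
    content_type => 'application/json',
    message_id   => gen_random_uuid()
);
```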

publish_message(entity_name text,
priority integer DEFAULT NULL,
transport_message_id uuid DEFAULT gen_random_uuid(),
body jsonb DEFAULT NULL,
binary_body bytea DEFAULT NULL,
content_type text DEFAULT NULL,
message_type text DEFAULT NULL,
message_id uuid DEFAULT NULL,
correlation_id uuid DEFAULT NULL,
conversation_id uuid DEFAULT NULL,
request_id uuid DEFAULT NULL,
initiator_id uuid DEFAULT NULL,
source_address text DEFAULT NULL,
destination_address text DEFAULT NULL,
response_address text DEFAULT NULL,
fault_address text DEFAULT NULL,
sent_time timestamptz DEFAULT NULL,
headers jsonb DEFAULT NULL,
host jsonb DEFAULT NULL,
partition_key text DEFAULT NULL,
routing_key text DEFAULT NULL,
delay interval DEFAULT INTERVAL '0 seconds',
scheduling_token_id uuid DEFAULT NULL,
max_delivery_count int DEFAULT 10)

Publishes a message to the entity_name topic with all the specified properties. Queues with matching subscriptions to the topic will each receive an instance of the message.
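
A publish call has the same shape as a send, with the topic name as entity_name. In the sketch below, the schema name, topic name, payload, and routing key are assumptions for illustration only.

```sql
-- Assumption: schema "transport"; topic, payload, and routing key are hypothetical.
SELECT * FROM transport.publish_message(
    entity_name  => 'order-submitted',
    body         => '{"orderId": "A-1001"}'::jsonb,
    content_type => 'application/json',
    routing_key  => 'priority'
);
```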

delete_scheduled_message(token_id uuid)

Deletes a previously scheduled message using the token_id.
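
One plausible way to use this, sketched under the assumption that a delayed send with a scheduling_token_id is what creates a scheduled message, is to keep the token and pass it back later to cancel. The schema name, queue name, payload, and token value are hypothetical.

```sql
-- Assumption: schema "transport"; names and the token UUID are illustrative.

-- Send with a delay and a known scheduling token.
SELECT * FROM transport.send_message(
    entity_name         => 'orders',
    body                => '{"orderId": "A-1001"}'::jsonb,
    delay               => interval '10 minutes',
    scheduling_token_id => '00000000-0000-0000-0000-0000000000b1'::uuid
);

-- Cancel the message before the delay elapses.
SELECT * FROM transport.delete_scheduled_message(
    '00000000-0000-0000-0000-0000000000b1'::uuid
);
```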

touch_queue(queue_name text)

When a receive endpoint doesn’t receive any messages from a queue for a period of time, and that queue is an auto-delete queue, this function is called to add metrics for the queue so that it isn’t automatically deleted.

process_metrics(row_limit int DEFAULT 10000)

A background function used to process queue metrics, which includes messages consumed, faulted, and dead-lettered. Automatically called by idle receive endpoints. Metrics can be viewed using the queues view.

purge_topology()

A background function used to purge any auto-delete queues that have reached their idle threshold. Automatically called by idle receive endpoints.
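
Although idle receive endpoints call the background functions automatically, they could also be invoked by hand, for example while troubleshooting. The sketch below assumes the functions and the queues view live in a schema named transport.

```sql
-- Assumption: schema "transport".

-- Process pending queue metrics using the default row_limit.
SELECT * FROM transport.process_metrics();

-- Purge auto-delete queues that have passed their idle threshold.
SELECT * FROM transport.purge_topology();

-- Inspect the resulting metrics through the queues view.
SELECT * FROM transport.queues;
```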

requeue_messages(queue_name text,
source_queue_type int,
target_queue_type int,
message_count int,
delay interval DEFAULT INTERVAL '0 seconds',
redelivery_count int DEFAULT 10)

Use this function to move messages from the error or dead-letter queue back to the main queue. Up to message_count messages are moved. An optional delay can be added to the current time to delay the message redelivery. An additional redelivery_count can also be added to the message’s max_delivery_count to ensure the message can be consumed if the delivery count previously exceeded the delivery count limit.
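
A bulk requeue from the error queue could look like the sketch below. This page does not list the numeric queue type values, so the values used here (1 for the main queue, 2 for the error queue) are an assumption that should be checked against the installed schema. The schema name and queue name are also illustrative.

```sql
-- Assumption: schema "transport"; queue "orders" is hypothetical.
-- Assumption: queue type 2 is the error queue and 1 is the main queue.
SELECT * FROM transport.requeue_messages(
    queue_name        => 'orders',
    source_queue_type => 2,
    target_queue_type => 1,
    message_count     => 100,
    delay             => interval '5 seconds'
);
```

Here redelivery_count is omitted, so its default of 10 is added to each moved message's max_delivery_count.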

requeue_message(message_delivery_id bigint,
target_queue_type int,
delay interval DEFAULT INTERVAL '0 seconds',
redelivery_count int DEFAULT 10)

Use this function to move a message from the error or dead-letter queue back to the main queue. An optional delay can be added to the current time to delay the message redelivery. An additional redelivery_count can also be added to the message’s max_delivery_count to ensure the message can be consumed if the delivery count previously exceeded the delivery count limit.
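
For a single message, the call might look like the following sketch. The schema name and the message_delivery_id value 42 are hypothetical, and the target queue type value 1 for the main queue is an assumption, since this page does not list the queue type values.

```sql
-- Assumption: schema "transport"; 42 is a placeholder message_delivery_id.
-- Assumption: queue type 1 is the main queue.
SELECT * FROM transport.requeue_message(
    message_delivery_id => 42,
    target_queue_type   => 1,
    redelivery_count    => 5
);
```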

The queues view returns details about the queues and their metrics.

A second view returns details about all subscriptions and their settings.

The tables used by the SQL transport include:

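The type argument of create_topic_subscription and create_queue_subscription takes one of the following values:

Type  Description
1     All
2     Routing Key
3     Pattern (uses Routing Key)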