Skip to content

SQL Transport Functions

MassTransit uses PostgreSQL functions or SQL Server stored procedures for all transport operations. These functions are built to manage the constraints of the transport, such as message locking, redelivery, and subscription.

For completeness, the functions are detailed below.

create_queue(queue_name text,
auto_delete integer DEFAULT NULL)

Creates a queue with its associated error and dead-letter queues. The resulting three queues each have their own type.

The current transport migration also includes a versioned queue creation function/procedure that accepts additional settings:

create_queue_v3(queue_name text,
auto_delete integer DEFAULT NULL,
max_delivery_count integer DEFAULT NULL,
enable_message_archive boolean DEFAULT FALSE)

When enable_message_archive is true, an archive queue record (queue type 4) is also provisioned.

create_topic(topic_name text)

Creates a topic.

create_topic_subscription(source_topic_name text,
destination_topic_name text,
type integer,
routing_key text DEFAULT '',
filter jsonb DEFAULT '{{}}')

Creates a subscription from one topic to another topic.

create_queue_subscription(source_topic_name text,
destination_queue_name text,
type integer,
routing_key text DEFAULT '',
filter jsonb DEFAULT '{{}}')

Creates a subscription from a topic to a queue.

purge_queue(queue_name text)

Removes all messages from a queue, including messages in the error and dead-letter sub-queues.

fetch_messages(queue_name text,
fetch_consumer_id uuid,
fetch_lock_id uuid,
lock_duration interval,
fetch_count integer DEFAULT 1)

Fetches messages from the specified queue.

fetch_messages_partitioned(queue_name text,
fetch_consumer_id uuid,
fetch_lock_id uuid,
lock_duration interval,
fetch_count integer DEFAULT 1,
concurrent_count integer DEFAULT 1,
ordered integer DEFAULT 0)

Fetches messages from the specified queue using the partitioned receive mode.

receive_endpoint_started(p_queue_id bigint,
p_notify_throttle interval,
p_touch_throttle interval DEFAULT NULL)

Registers per-queue startup settings for PostgreSQL receive endpoints, including notification throttle and queue-touch throttle intervals.

delete_message(message_delivery_id bigint,
lock_id uuid)

Deletes a message that was previously fetched with the specified lock_id.

archive_message(message_delivery_id bigint,
lock_id uuid)

Moves a consumed message delivery into archive storage (message_archive and message_delivery_archive) and removes it from active transport tables.

renew_message_lock(message_delivery_id bigint,
lock_id uuid,
duration interval)

Renews (extends) the lock on a message that was previously fetched with the specified lock_id.

move_message(message_delivery_id bigint,
lock_id uuid,
queue_name text,
queue_type integer,
headers jsonb)

Moves a message that was previously fetched with the specified lock_id to the destination queue, adding the headers to the message delivery. Typically used by the receive endpoint to either dead-letter (skipped) or fault (error) a message.

unlock_message(message_delivery_id bigint,
lock_id uuid,
delay interval,
headers jsonb)

Unlocks a message that was previously fetched with the specified lock_id, adding the specified delay to the enqueue_time and any additional headers. Typically used when delayed message redelivery is used, or when faults are thrown back to the transport.

send_message(entity_name text,
priority integer DEFAULT NULL,
transport_message_id uuid DEFAULT gen_random_uuid(),
body jsonb DEFAULT NULL,
binary_body bytea DEFAULT NULL,
content_type text DEFAULT NULL,
message_type text DEFAULT NULL,
message_id uuid DEFAULT NULL,
correlation_id uuid DEFAULT NULL,
conversation_id uuid DEFAULT NULL,
request_id uuid DEFAULT NULL,
initiator_id uuid DEFAULT NULL,
source_address text DEFAULT NULL,
destination_address text DEFAULT NULL,
response_address text DEFAULT NULL,
fault_address text DEFAULT NULL,
sent_time timestamptz DEFAULT NULL,
headers jsonb DEFAULT NULL,
host jsonb DEFAULT NULL,
partition_key text DEFAULT NULL,
routing_key text DEFAULT NULL,
delay interval DEFAULT INTERVAL '0 seconds',
scheduling_token_id uuid DEFAULT NULL,
max_delivery_count int DEFAULT 10)

Sends a message to the entity_name queue with all the specified properties.

publish_message(entity_name text,
priority integer DEFAULT NULL,
transport_message_id uuid DEFAULT gen_random_uuid(),
body jsonb DEFAULT NULL,
binary_body bytea DEFAULT NULL,
content_type text DEFAULT NULL,
message_type text DEFAULT NULL,
message_id uuid DEFAULT NULL,
correlation_id uuid DEFAULT NULL,
conversation_id uuid DEFAULT NULL,
request_id uuid DEFAULT NULL,
initiator_id uuid DEFAULT NULL,
source_address text DEFAULT NULL,
destination_address text DEFAULT NULL,
response_address text DEFAULT NULL,
fault_address text DEFAULT NULL,
sent_time timestamptz DEFAULT NULL,
headers jsonb DEFAULT NULL,
host jsonb DEFAULT NULL,
partition_key text DEFAULT NULL,
routing_key text DEFAULT NULL,
delay interval DEFAULT INTERVAL '0 seconds',
scheduling_token_id uuid DEFAULT NULL,
max_delivery_count int DEFAULT 10)

Publishes a message to the entity_name topic with all the specified properties. Queues with matching subscriptions to the topic will each receive an instance of the message.

delete_scheduled_message(token_id uuid)

Deletes a previously scheduled message using the token_id.

touch_queue(queue_name text)

When a receive endpoint doesn’t receive any messages from a queue for a period of time, and that queue is an auto-delete queue, this function is called to add metrics for the queue so that it isn’t automatically deleted.

process_metrics(row_limit int DEFAULT 10000)

A background function used to process queue metrics, which includes messages consumed, faulted, and dead-lettered. Automatically called by idle receive endpoints. Metrics can be viewed using the queues view.

purge_topology()

A background function used to purge any auto-delete queues that have reached their idle threshold. Automatically called by idle receive endpoints.

ensure_archive_partitions(partition_interval_unit int,
partition_interval_count int,
precreate_partitions int,
retention_intervals int,
remove_expired boolean)

Creates archive partitions ahead of time and optionally removes expired partitions based on retention configuration. Called by background maintenance.

requeue_messages(queue_name text,
source_queue_type int,
target_queue_type int,
message_count int,
delay interval DEFAULT INTERVAL '0 seconds',
redelivery_count int DEFAULT 10)

Use this function to move messages from the error or dead-letter queue back to the main queue. Up to message_count messages are moved. An optional delay can be added to the current time to delay the message redelivery. An additional redelivery_count can also be added to the message’s max_delivery_count to ensure the message can be consumed if the delivery count previously exceeded the delivery count limit.

requeue_message(message_delivery_id bigint,
target_queue_type int,
delay interval DEFAULT INTERVAL '0 seconds',
redelivery_count int DEFAULT 10)

Use this function to move a message from the error or dead-letter queue back to the main queue. An optional delay can be added to the current time to delay the message redelivery. An additional redelivery_count can also be added to the message’s max_delivery_count to ensure the message can be consumed if the delivery count previously exceeded the delivery count limit.

Returns details about the queues and their metrics.

Returns details about all subscriptions and their settings.

Returns active message deliveries and message content joined with queue metadata.

Returns archived message deliveries and archived message content joined with queue metadata.

The tables used by the SQL transport include:

Queue metadata for all queue types. Queue types include:

TypeDescription
1Main queue
2Error queue
3Dead-letter queue
4Archive queue (when message archive is enabled)

Topic metadata used for publish/subscribe routing.

TypeDescription
1All
2Routing Key
3Pattern (uses Routing Key)

Topic-to-queue subscription bindings and routing settings.

Active transport message payload and envelope metadata.

Active message delivery records, including queue assignment, lock state, and delivery counters.

Archived transport message payload and envelope metadata.

Archived delivery records (queue, priority, partition/routing keys, delivery counts, and archive timestamps).

Per-queue throttle interval and last-notified timestamp used to limit pg_notify frequency.

Per-queue throttle interval and last-touched timestamp used to limit queue metric touch frequency for auto-delete queue activity tracking.