SQL Transport Functions
MassTransit uses PostgreSQL functions or SQL Server stored procedures for all transport operations. These functions are built to manage the constraints of the transport, such as message locking, redelivery, and subscription.
For completeness, the functions are detailed below.
Create Queue
Section titled “Create Queue”create_queue(queue_name text, auto_delete integer DEFAULT NULL)Creates a queue with its associated error and dead-letter queues. The resulting three queues each have their own type.
The current transport migration also includes a versioned queue creation function/procedure that accepts additional settings:
create_queue_v3(queue_name text, auto_delete integer DEFAULT NULL, max_delivery_count integer DEFAULT NULL, enable_message_archive boolean DEFAULT FALSE)When enable_message_archive is true, an archive queue record (queue type 4) is also provisioned.
Create Topic
Section titled “Create Topic”create_topic(topic_name text)Creates a topic.
Create Topic Subscription
Section titled “Create Topic Subscription”create_topic_subscription(source_topic_name text, destination_topic_name text, type integer, routing_key text DEFAULT '', filter jsonb DEFAULT '{{}}')Creates a subscription from one topic to another topic.
Create Queue Subscription
Section titled “Create Queue Subscription”create_queue_subscription(source_topic_name text, destination_queue_name text, type integer, routing_key text DEFAULT '', filter jsonb DEFAULT '{{}}')Creates a subscription from a topic to a queue.
Purge Queue
Section titled “Purge Queue”purge_queue(queue_name text)Removes all messages from a queue, including messages in the error and dead-letter sub-queues.
Fetch Messages
Section titled “Fetch Messages”fetch_messages(queue_name text, fetch_consumer_id uuid, fetch_lock_id uuid, lock_duration interval, fetch_count integer DEFAULT 1)Fetches messages from the specified queue.
Fetch Messages Partitioned
Section titled “Fetch Messages Partitioned”fetch_messages_partitioned(queue_name text, fetch_consumer_id uuid, fetch_lock_id uuid, lock_duration interval, fetch_count integer DEFAULT 1, concurrent_count integer DEFAULT 1, ordered integer DEFAULT 0)Fetches messages from the specified queue using the partitioned receive mode.
Receive Endpoint Started
Section titled “Receive Endpoint Started”receive_endpoint_started(p_queue_id bigint, p_notify_throttle interval, p_touch_throttle interval DEFAULT NULL)Registers per-queue startup settings for PostgreSQL receive endpoints, including notification throttle and queue-touch throttle intervals.
Delete Message
Section titled “Delete Message”delete_message(message_delivery_id bigint, lock_id uuid)Deletes a message that was previously fetched with the specified lock_id.
Archive Message
Section titled “Archive Message”archive_message(message_delivery_id bigint, lock_id uuid)Moves a consumed message delivery into archive storage (message_archive and message_delivery_archive) and removes it from active transport tables.
Renew Message Lock
Section titled “Renew Message Lock”renew_message_lock(message_delivery_id bigint, lock_id uuid, duration interval)Renews (extends) the lock on a message that was previously fetched with the specified lock_id.
Move Message
Section titled “Move Message”move_message(message_delivery_id bigint, lock_id uuid, queue_name text, queue_type integer, headers jsonb)Moves a message that was previously fetched with the specified lock_id to the destination queue, adding the headers to the message delivery. Typically used by the receive endpoint to either dead-letter (skipped) or fault (error) a message.
Unlock Message
Section titled “Unlock Message”unlock_message(message_delivery_id bigint, lock_id uuid, delay interval, headers jsonb)Unlocks a message that was previously fetched with the specified lock_id, adding the specified delay to the enqueue_time and any additional headers. Typically used when delayed message redelivery is used, or when faults are thrown back to the transport.
Send Message
Section titled “Send Message”send_message(entity_name text, priority integer DEFAULT NULL, transport_message_id uuid DEFAULT gen_random_uuid(), body jsonb DEFAULT NULL, binary_body bytea DEFAULT NULL, content_type text DEFAULT NULL, message_type text DEFAULT NULL, message_id uuid DEFAULT NULL, correlation_id uuid DEFAULT NULL, conversation_id uuid DEFAULT NULL, request_id uuid DEFAULT NULL, initiator_id uuid DEFAULT NULL, source_address text DEFAULT NULL, destination_address text DEFAULT NULL, response_address text DEFAULT NULL, fault_address text DEFAULT NULL, sent_time timestamptz DEFAULT NULL, headers jsonb DEFAULT NULL, host jsonb DEFAULT NULL, partition_key text DEFAULT NULL, routing_key text DEFAULT NULL, delay interval DEFAULT INTERVAL '0 seconds', scheduling_token_id uuid DEFAULT NULL, max_delivery_count int DEFAULT 10)Sends a message to the entity_name queue with all the specified properties.
Publish Message
Section titled “Publish Message”publish_message(entity_name text, priority integer DEFAULT NULL, transport_message_id uuid DEFAULT gen_random_uuid(), body jsonb DEFAULT NULL, binary_body bytea DEFAULT NULL, content_type text DEFAULT NULL, message_type text DEFAULT NULL, message_id uuid DEFAULT NULL, correlation_id uuid DEFAULT NULL, conversation_id uuid DEFAULT NULL, request_id uuid DEFAULT NULL, initiator_id uuid DEFAULT NULL, source_address text DEFAULT NULL, destination_address text DEFAULT NULL, response_address text DEFAULT NULL, fault_address text DEFAULT NULL, sent_time timestamptz DEFAULT NULL, headers jsonb DEFAULT NULL, host jsonb DEFAULT NULL, partition_key text DEFAULT NULL, routing_key text DEFAULT NULL, delay interval DEFAULT INTERVAL '0 seconds', scheduling_token_id uuid DEFAULT NULL, max_delivery_count int DEFAULT 10)Publishes a message to the entity_name topic with all the specified properties. Queues with matching subscriptions to the topic will each receive an instance of the message.
Delete Scheduled Message
Section titled “Delete Scheduled Message”delete_scheduled_message(token_id uuid)Deletes a previously scheduled message using the token_id.
Touch Queue
Section titled “Touch Queue”touch_queue(queue_name text)When a receive endpoint doesn’t receive any messages from a queue for a period of time, and that queue is an auto-delete queue, this function is called to add metrics for the queue so that it isn’t automatically deleted.
Process Metrics
Section titled “Process Metrics”process_metrics(row_limit int DEFAULT 10000)A background function used to process queue metrics, which includes messages consumed, faulted, and dead-lettered. Automatically called by idle receive
endpoints. Metrics can be viewed using the queues view.
Purge Topology
Section titled “Purge Topology”purge_topology()A background function used to purge any auto-delete queues that have reached their idle threshold. Automatically called by idle receive endpoints.
Ensure Archive Partitions
Section titled “Ensure Archive Partitions”ensure_archive_partitions(partition_interval_unit int, partition_interval_count int, precreate_partitions int, retention_intervals int, remove_expired boolean)Creates archive partitions ahead of time and optionally removes expired partitions based on retention configuration. Called by background maintenance.
End-User Functions
Section titled “End-User Functions”Requeue Messages
Section titled “Requeue Messages”requeue_messages(queue_name text, source_queue_type int, target_queue_type int, message_count int, delay interval DEFAULT INTERVAL '0 seconds', redelivery_count int DEFAULT 10)Use this function to move messages from the error or dead-letter queue back to the main queue. Up to message_count messages are moved. An optional delay can be added to the current time to delay the message redelivery. An additional redelivery_count can also be added to the message’s max_delivery_count to ensure the message can be consumed if the delivery count previously exceeded the delivery count limit.
Requeue Message
Section titled “Requeue Message”requeue_message(message_delivery_id bigint, target_queue_type int, delay interval DEFAULT INTERVAL '0 seconds', redelivery_count int DEFAULT 10)Use this function to move a message from the error or dead-letter queue back to the main queue. An optional delay can be added to the current time to delay the message redelivery. An additional redelivery_count can also be added to the message’s max_delivery_count to ensure the message can be consumed if the delivery count previously exceeded the delivery count limit.
Queues
Section titled “Queues”Returns details about the queues and their metrics.
Subscriptions
Section titled “Subscriptions”Returns details about all subscriptions and their settings.
Messages
Section titled “Messages”Returns active message deliveries and message content joined with queue metadata.
Archive Messages
Section titled “Archive Messages”Returns archived message deliveries and archived message content joined with queue metadata.
Tables
Section titled “Tables”The tables used by the SQL transport include:
Queue metadata for all queue types. Queue types include:
| Type | Description |
|---|---|
| 1 | Main queue |
| 2 | Error queue |
| 3 | Dead-letter queue |
| 4 | Archive queue (when message archive is enabled) |
Topic metadata used for publish/subscribe routing.
Topic Subscription
Section titled “Topic Subscription”Subscription Type
Section titled “Subscription Type”| Type | Description |
|---|---|
| 1 | All |
| 2 | Routing Key |
| 3 | Pattern (uses Routing Key) |
Queue Subscription
Section titled “Queue Subscription”Topic-to-queue subscription bindings and routing settings.
Message
Section titled “Message”Active transport message payload and envelope metadata.
Message Delivery
Section titled “Message Delivery”Active message delivery records, including queue assignment, lock state, and delivery counters.
Message Archive
Section titled “Message Archive”Archived transport message payload and envelope metadata.
Message Delivery Archive
Section titled “Message Delivery Archive”Archived delivery records (queue, priority, partition/routing keys, delivery counts, and archive timestamps).
Notify Throttle (PostgreSQL)
Section titled “Notify Throttle (PostgreSQL)”Per-queue throttle interval and last-notified timestamp used to limit pg_notify frequency.
Touch Throttle (PostgreSQL)
Section titled “Touch Throttle (PostgreSQL)”Per-queue throttle interval and last-touched timestamp used to limit queue metric touch frequency for auto-delete queue activity tracking.