Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Consumer Groups API

Functions for managing consumer groups. All live in the tide schema.


tide.create_consumer_group

Create a named consumer group for an outbox.

SELECT tide.create_consumer_group(
  p_name               TEXT,
  p_outbox             TEXT,
  p_auto_offset_reset  TEXT     DEFAULT 'earliest',
  p_if_not_exists      BOOLEAN  DEFAULT false
);
ParameterTypeDefaultDescription
p_nameTEXT(required)Unique group name
p_outboxTEXT(required)Outbox this group consumes from
p_auto_offset_resetTEXT'earliest'earliest, latest, or none
p_if_not_existsBOOLEANfalseSuppress error if already exists

Errors:

  • Outbox must exist
  • Group name must be unique (unless p_if_not_exists = true)
  • p_auto_offset_reset must be one of: earliest, latest, none

tide.drop_consumer_group

Drop a consumer group and all its offset/lease records.

SELECT tide.drop_consumer_group(
  p_name       TEXT,
  p_if_exists  BOOLEAN DEFAULT false
);

tide.commit_offset

Commit a consumer's processing position.

SELECT tide.commit_offset(
  p_group       TEXT,
  p_consumer    TEXT,
  p_last_offset BIGINT
);
ParameterTypeDescription
p_groupTEXTConsumer group name
p_consumerTEXTConsumer identifier (e.g., relay instance ID)
p_last_offsetBIGINTLast successfully processed message ID

Behavior:

  • Upserts into tide.tide_consumer_offsets
  • Updates last_heartbeat to now()
  • Releases any visibility lease held by this consumer

tide.consumer_heartbeat

Update the heartbeat timestamp for a consumer.

SELECT tide.consumer_heartbeat(
  p_group     TEXT,
  p_consumer  TEXT
);

Call this periodically (e.g., every 10 seconds) while processing to signal liveness.


Views

tide.consumer_lag

Per-consumer lag relative to the latest outbox message:

SELECT * FROM tide.consumer_lag;
ColumnTypeDescription
group_nameTEXTConsumer group
outbox_nameTEXTSource outbox
consumer_idTEXTConsumer identifier
committed_offsetBIGINTLast committed position
lagBIGINTMessages behind (max_id - committed_offset)
last_heartbeatTIMESTAMPTZLast heartbeat time