Multi-Database Refresh Broker — Design Document
Implementation Status: Design only — not yet implemented. This feature is planned for a future release after v1.0. Track progress at ROADMAP.md.
The design below is a stable reference for contributors and reviewers. The API described here is aspirational and subject to change before implementation begins.
Status: Design only (v0.31.0 SCAL-2). Implementation planned for a future release.
Problem
When pg_trickle is installed in multiple databases on the same PostgreSQL cluster, each
per-database scheduler independently scans its change buffers. For workloads where two
databases reference the same upstream source — commonly via postgres_fdw foreign tables
or logical replication — each scheduler pays the full scan cost independently:
DB A scheduler: SELECT * FROM pgtrickle_changes.changes_12345 (full scan)
DB B scheduler: SELECT * FROM pgtrickle_changes.changes_12345 (same scan, again)
With 100 stream tables across 10 databases and 5 shared sources, each shared source is scanned up to 10 times per tick instead of once: 10× the necessary I/O.
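The arithmetic behind the 10× figure can be sketched as follows. This is an illustration only: `scans_per_tick` is a hypothetical helper, and it assumes every database references every shared source, which is the worst case for the example above.

```rust
/// Count change-buffer scans per tick, assuming every database reads
/// every shared source (hypothetical helper, not part of pg_trickle).
fn scans_per_tick(databases: u64, shared_sources: u64) -> (u64, u64) {
    // Today: each per-database scheduler scans each shared source itself.
    let independent = databases * shared_sources;
    // Deduplicated: one scan per shared source, whatever the database count.
    let deduplicated = shared_sources;
    (independent, deduplicated)
}

fn main() {
    let (independent, deduplicated) = scans_per_tick(10, 5);
    assert_eq!(independent, 50);
    assert_eq!(deduplicated, 5);
    println!(
        "independent={independent} deduplicated={deduplicated} ratio={}x",
        independent / deduplicated
    );
}
```

The number of stream tables does not appear in the calculation, because the redundant cost is driven by how many databases read the same change buffer.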
Goal
Introduce a "refresh broker" — a singleton background worker that:
- De-duplicates change-buffer scans across databases in the same cluster.
- Distributes scan results to per-database schedulers via shared memory.
- Reduces total change-buffer I/O proportionally to the number of databases sharing a source.
Design
Components
┌─────────────────────────────────────────────────────────────┐
│                     PostgreSQL Cluster                      │
│                                                             │
│  ┌────────────────┐    shared memory        ┌─────────────┐ │
│  │ Refresh Broker │ ──── scan results ────► │ DB-A sched  │ │
│  │  (singleton)   │                         │ DB-B sched  │ │
│  │                │ ◄─── scan requests ──── │ DB-C sched  │ │
│  └────────────────┘                         └─────────────┘ │
│         │                                                   │
│         │  single scan per source OID per tick              │
│         ▼                                                   │
│  pgtrickle_changes.changes_{oid}                            │
└─────────────────────────────────────────────────────────────┘
Broker Protocol
- Registration: Each per-database scheduler registers its interest in a set of source OIDs with the broker via shared memory (PgLwLock<BrokerRegistry>).
- Tick coordination: At the start of each tick, the broker scans each registered source OID once. Results (row counts + LSN watermarks) are written to a shared memory segment indexed by (database_oid, source_oid).
- Result consumption: Per-database schedulers read the broker's results instead of issuing their own SPI queries. The broker's scan is authoritative for the tick.
- Fallback: If the broker is not running (e.g. max_worker_processes exhausted), per-database schedulers fall back to their current direct-scan behaviour.
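The four protocol steps can be modelled in-process as a minimal sketch. Everything here is an assumption made for illustration: `Broker`, `ScanResult`, `publish`, and `has_changes` are hypothetical names, `std::sync::RwLock` stands in for `PgLwLock`, and a `u32` alias stands in for `pg_sys::Oid`. A real implementation would live in PostgreSQL shared memory rather than on the heap.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::RwLock;

/// Stand-in for `pg_sys::Oid` so the sketch compiles without pgrx.
type Oid = u32;

#[derive(Clone, Copy, Debug, PartialEq)]
struct ScanResult {
    pending_rows: i64,
    max_lsn: u64,
}

/// Hypothetical in-process model of the broker's shared state.
#[derive(Default)]
struct Broker {
    registry: RwLock<HashSet<(Oid, Oid)>>,
    results: RwLock<HashMap<(Oid, Oid), ScanResult>>,
}

impl Broker {
    /// Registration: a scheduler declares the sources it cares about.
    fn register(&self, db: Oid, sources: &[Oid]) {
        let mut registry = self.registry.write().unwrap();
        for &source in sources {
            registry.insert((db, source));
        }
    }

    /// Tick coordination: store a result for a registered pair.
    /// Returns false (and stores nothing) for unregistered pairs.
    fn publish(&self, db: Oid, source: Oid, result: ScanResult) -> bool {
        if !self.registry.read().unwrap().contains(&(db, source)) {
            return false;
        }
        self.results.write().unwrap().insert((db, source), result);
        true
    }

    /// Result consumption: look up the result for (database, source).
    fn get_scan_result(&self, db: Oid, source: Oid) -> Option<ScanResult> {
        self.results.read().unwrap().get(&(db, source)).copied()
    }
}

/// Fallback: use the broker when it is available and has a result,
/// otherwise run the caller-supplied direct scan.
fn has_changes(
    broker: Option<&Broker>,
    db: Oid,
    source: Oid,
    direct_scan: impl FnOnce() -> i64,
) -> bool {
    match broker.and_then(|b| b.get_scan_result(db, source)) {
        Some(result) => result.pending_rows > 0,
        None => direct_scan() > 0,
    }
}

fn main() {
    let broker = Broker::default();
    broker.register(1, &[12345]);
    assert!(broker.publish(1, 12345, ScanResult { pending_rows: 7, max_lsn: 100 }));
    assert!(!broker.publish(2, 12345, ScanResult { pending_rows: 7, max_lsn: 100 }));

    // Broker result is used; the direct-scan closure is never called.
    assert!(has_changes(Some(&broker), 1, 12345, || -> i64 { unreachable!() }));
    // No broker running: the scheduler falls back to its own scan.
    assert!(!has_changes(None, 1, 12345, || 0));
}
```

Passing the direct scan as a closure keeps the fallback lazy, so the SPI query would only run when the broker has nothing to offer.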
Shared Memory Layout
/// BRK-1: Broker registry entry for one (database, source) pair.
struct BrokerEntry {
    db_oid: pg_sys::Oid,
    source_oid: pg_sys::Oid,
    /// Last scan result: row count in the change buffer.
    pending_rows: i64,
    /// Last scan result: maximum LSN seen in the change buffer.
    max_lsn_u64: u64,
    /// Monotone tick counter when this entry was last updated.
    last_updated_tick: u64,
}

/// Maximum number of (db, source) pairs the broker tracks.
const BROKER_CAPACITY: usize = 4096;
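Because shared memory must be sized up front, the entries would most likely sit in a fixed-capacity table rather than a growable collection. The following is a minimal sketch of such a table under stated assumptions: `BrokerTable`, `find`, and `upsert` are hypothetical, `Oid` is a `u32` stand-in for `pg_sys::Oid`, and the capacity is shrunk to 4 so that the full-table case is easy to exercise.

```rust
/// Stand-in for `pg_sys::Oid` so the sketch compiles without pgrx.
type Oid = u32;

#[derive(Clone, Copy, Debug, Default, PartialEq)]
struct BrokerEntry {
    db_oid: Oid,
    source_oid: Oid,
    pending_rows: i64,
    max_lsn_u64: u64,
    last_updated_tick: u64,
}

/// Small capacity for the demo; the design proposes 4096.
const CAPACITY: usize = 4;

/// Hypothetical fixed-size table: no heap allocation, so the same layout
/// could be placed in a pre-sized shared memory segment.
struct BrokerTable {
    entries: [BrokerEntry; CAPACITY],
    len: usize,
}

impl BrokerTable {
    fn new() -> Self {
        Self { entries: [BrokerEntry::default(); CAPACITY], len: 0 }
    }

    /// Linear search over the occupied prefix of the array.
    fn find(&self, db_oid: Oid, source_oid: Oid) -> Option<&BrokerEntry> {
        self.entries[..self.len]
            .iter()
            .find(|e| e.db_oid == db_oid && e.source_oid == source_oid)
    }

    /// Insert or overwrite; returns false when the table is full.
    fn upsert(&mut self, entry: BrokerEntry) -> bool {
        if let Some(slot) = self.entries[..self.len]
            .iter_mut()
            .find(|e| e.db_oid == entry.db_oid && e.source_oid == entry.source_oid)
        {
            *slot = entry;
            return true;
        }
        if self.len == CAPACITY {
            return false;
        }
        self.entries[self.len] = entry;
        self.len += 1;
        true
    }
}

fn main() {
    let mut table = BrokerTable::new();
    let entry = |db, rows, tick| BrokerEntry {
        db_oid: db,
        source_oid: 12345,
        pending_rows: rows,
        max_lsn_u64: 0,
        last_updated_tick: tick,
    };
    assert!(table.upsert(entry(1, 10, 1)));
    assert!(table.upsert(entry(1, 25, 2))); // overwrite, len stays 1
    assert_eq!(table.len, 1);
    assert_eq!(table.find(1, 12345).unwrap().pending_rows, 25);
    for db in 2..=4 {
        assert!(table.upsert(entry(db, 0, 2)));
    }
    assert!(!table.upsert(entry(5, 0, 2))); // table is full
}
```

A linear scan is adequate for a sketch; at the proposed capacity of 4096 a hashed or sorted layout may be worth considering, which the design does not yet specify.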
Broker Worker Loop
loop:
1. Sleep until next tick (shared scheduler_interval_ms).
2. Lock BrokerRegistry (read) to collect unique source OIDs.
3. For each unique source OID, run:
SELECT COUNT(*), MAX(lsn) FROM pgtrickle_changes.changes_{oid}
4. Write results to BrokerScanResults (lock-free CAS update).
5. Advance broker tick counter.
6. Per-DB schedulers wake and consume results.
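Steps 2 to 4 of the loop can be sketched as one function, shown here as a rough model rather than the planned worker. The names `run_tick` and `ScanResult` are hypothetical, the `scan` closure stands in for the `SELECT COUNT(*), MAX(lsn)` query, and an ordinary `HashMap` replaces the lock-free shared-memory update.

```rust
use std::collections::{BTreeSet, HashMap};

/// Stand-in for `pg_sys::Oid` so the sketch compiles without pgrx.
type Oid = u32;

#[derive(Clone, Copy, Debug, PartialEq)]
struct ScanResult {
    pending_rows: i64,
    max_lsn: u64,
    tick: u64,
}

/// One broker tick over a registry of (database, source) pairs.
/// `scan` returns (row count, max LSN) for a source OID.
fn run_tick<F>(registry: &[(Oid, Oid)], tick: u64, mut scan: F) -> HashMap<(Oid, Oid), ScanResult>
where
    F: FnMut(Oid) -> (i64, u64),
{
    // Step 2: collect the unique source OIDs across all databases.
    let unique: BTreeSet<Oid> = registry.iter().map(|&(_, source)| source).collect();

    // Step 3: scan each unique source exactly once.
    let scanned: HashMap<Oid, (i64, u64)> =
        unique.into_iter().map(|source| (source, scan(source))).collect();

    // Step 4: fan the shared result out to every registered pair.
    registry
        .iter()
        .map(|&(db, source)| {
            let (pending_rows, max_lsn) = scanned[&source];
            ((db, source), ScanResult { pending_rows, max_lsn, tick })
        })
        .collect()
}

fn main() {
    // Three databases share source 12345; database 3 also reads 777.
    let registry = [(1, 12345), (2, 12345), (3, 12345), (3, 777)];
    let mut scans = 0;
    let results = run_tick(&registry, 1, |source| {
        scans += 1;
        if source == 12345 { (42, 9000) } else { (0, 0) }
    });
    assert_eq!(scans, 2); // two unique sources, not four scans
    assert_eq!(results.len(), 4);
    assert_eq!(results[&(2, 12345)].pending_rows, 42);
    println!("scans issued: {scans}, entries written: {}", results.len());
}
```

In this model four registrations cost two scans, which is the de-duplication the broker is meant to provide.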
Integration with Per-DB Schedulers
In src/scheduler.rs, has_table_source_changes would gain a fast path:
fn has_table_source_changes(st: &StreamTableMeta) -> bool {
    // Fast path: try broker results first.
    if config::pg_trickle_adaptive_batch_coalescing() {
        if let Some(result) = broker::get_scan_result(source_oid) {
            return result.pending_rows > 0;
        }
    }
    // Fallback: direct SPI query (current behaviour).
    // ...
}
Open Questions
- Transaction isolation: The broker scans in its own transaction. Per-DB schedulers that read its results are using data from a different snapshot. Is this acceptable? (Short answer: yes, the existing behaviour already has a tick-window delay between when changes are written and when they are consumed.)
- Cross-database connectivity: The broker must connect to each database to read its change buffers. PostgreSQL background workers connect to a specific database. We may need a pool of broker workers, one per database, coordinated by a shared-memory rendezvous point.
- Authorization: The broker needs read access to pgtrickle_changes.* in each database. This is satisfied by shared_preload_libraries + SECURITY DEFINER wrapper functions.
- Failure isolation: If the broker crashes, per-DB schedulers must detect the absence of fresh results and fall back to direct scans within one tick.
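One possible way to meet the failure-isolation requirement is for each scheduler to remember the last broker tick it observed and to treat an unchanged counter as a stalled broker. The sketch below assumes a broker tick counter that schedulers can read and a shared tick interval. `BrokerWatch`, `ScanPath`, and `choose_path` are hypothetical names, not part of the design.

```rust
#[derive(Debug, PartialEq)]
enum ScanPath {
    /// Broker results are fresh: use them.
    Broker,
    /// Broker looks stalled or absent: run the direct scan.
    Direct,
}

/// Hypothetical per-scheduler state for detecting a stalled broker.
struct BrokerWatch {
    last_seen_tick: Option<u64>,
}

impl BrokerWatch {
    fn new() -> Self {
        Self { last_seen_tick: None }
    }

    /// Called once per scheduler tick with the broker's current tick counter.
    /// The broker counts as alive only if its counter advanced since the
    /// previous call (or is non-zero on the first call).
    fn choose_path(&mut self, broker_tick: u64) -> ScanPath {
        let advanced = match self.last_seen_tick {
            Some(previous) => broker_tick > previous,
            None => broker_tick > 0,
        };
        self.last_seen_tick = Some(broker_tick);
        if advanced { ScanPath::Broker } else { ScanPath::Direct }
    }
}

fn main() {
    let mut watch = BrokerWatch::new();
    // The broker ticks twice, stalls for two scheduler ticks, then recovers.
    let observed = [1, 2, 2, 2, 3];
    let paths: Vec<ScanPath> = observed.iter().map(|&t| watch.choose_path(t)).collect();
    assert_eq!(
        paths,
        vec![
            ScanPath::Broker,
            ScanPath::Broker,
            ScanPath::Direct,
            ScanPath::Direct,
            ScanPath::Broker,
        ]
    );
}
```

With this rule a scheduler switches to direct scans on the first tick where the counter fails to move, and returns to the broker as soon as it advances again. Whether a single missed tick should trigger the fallback, or a small tolerance is preferable, remains a design decision.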
Next Steps (v0.32.0+)
- Implement BrokerRegistry shared memory struct + init_shared_memory hook
- Implement broker background worker registration
- Add broker::get_scan_result fast path in has_table_source_changes
- Add pg_trickle.enable_refresh_broker GUC (default false until stable)
- Add E2E test: two databases sharing a source, broker reduces SPI queries to 1×
- Benchmark: 10 databases × 5 shared sources, measuring scan reduction
Filed under v0.31.0 (SCAL-2). ADR reference: see plans/adrs/ for architectural rationale on trigger-based vs broker-based CDC.