Tutorial: Singer/Meltano ETL Pipelines

This tutorial shows how to use pg_tide with Singer taps and targets to build ETL pipelines. You'll extract data from a SaaS API (HubSpot), load it into PostgreSQL, transform it, and export results to a data warehouse.

What You'll Build

HubSpot API  →  tap-hubspot  →  pg_tide inbox  →  Transform  →  pg_tide outbox  →  target-snowflake

Prerequisites

  • PostgreSQL with pg_tide installed
  • Python 3.8+ (for Singer taps/targets)
  • A HubSpot account with API access (or substitute any Singer tap)

Step 1: Install Singer Tap

pip install tap-hubspot

Step 2: Configure Extraction into pg_tide

CREATE EXTENSION pg_tide;
SELECT tide.inbox_create('hubspot_contacts');

SELECT tide.relay_set_inbox(
    'hubspot-extraction',
    'hubspot_contacts',
    '{
        "source_type": "singer",
        "tap_command": "tap-hubspot",
        "tap_config": {
            "api_key": "${env:HUBSPOT_API_KEY}",
            "start_date": "2024-01-01T00:00:00Z"
        },
        "stream_filter": ["contacts", "companies", "deals"],
        "state_persistence": true
    }'::jsonb
);

Step 3: Start the Relay

export HUBSPOT_API_KEY="your-api-key"
pg-tide --postgres-url "postgres://user:pass@localhost/mydb"

The relay runs the tap, captures its output, and writes records into the inbox. STATE messages are persisted for incremental syncs.
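
For reference, Singer taps emit newline-delimited JSON messages (SCHEMA, RECORD, STATE) on stdout. Judging by the queries in Step 4, the relay folds each RECORD into a single jsonb payload together with its stream name; a minimal sketch of one captured payload (field values are illustrative, and real HubSpot records carry many more fields):

SELECT '{
    "stream": "contacts",
    "email": "jane@example.com",
    "company": "Acme Inc",
    "last_activity_date": "2024-03-15"
}'::jsonb AS example_payload;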

Step 4: Process Inbox Data

Query the inbox to see extracted records:

SELECT id, payload->>'email' as email, payload->>'company' as company
FROM tide.inbox_pending('hubspot_contacts')
WHERE payload->>'stream' = 'contacts'
LIMIT 10;
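
Because the stream_filter in Step 2 routes contacts, companies, and deals into the same inbox, a per-stream count of pending records is a quick sanity check:

SELECT payload->>'stream' AS stream, count(*) AS pending
FROM tide.inbox_pending('hubspot_contacts')
GROUP BY 1
ORDER BY 2 DESC;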

Process and transform:

-- Create a materialized view for analytics
CREATE MATERIALIZED VIEW contact_summary AS
SELECT 
    payload->>'company' as company,
    count(*) as contact_count,
    max((payload->>'last_activity_date')::date) as last_active
FROM tide.inbox_all('hubspot_contacts')
WHERE payload->>'stream' = 'contacts'
GROUP BY payload->>'company';

-- Mark processed
SELECT tide.inbox_mark_processed('hubspot_contacts', id)
FROM tide.inbox_pending('hubspot_contacts');
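
If the relay keeps writing while you process, running the refresh and the acknowledgement in one transaction ties them together: a failure between the two steps rolls both back. A sketch using only the functions shown above, assuming tide's functions are ordinary transactional SQL (REPEATABLE READ makes both statements see the same snapshot):

BEGIN ISOLATION LEVEL REPEATABLE READ;
REFRESH MATERIALIZED VIEW contact_summary;
SELECT tide.inbox_mark_processed('hubspot_contacts', id)
FROM tide.inbox_pending('hubspot_contacts');
COMMIT;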

Step 5: Export to Data Warehouse

Create an outbox for warehouse loading:

SELECT tide.outbox_create('warehouse_events');

-- Publish transformed data
SELECT tide.outbox_publish('warehouse_events', 'contacts', jsonb_build_object(
    'company', company,
    'contact_count', contact_count,
    'last_active', last_active
))
FROM contact_summary;
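
To make the export repeatable, you can bundle the refresh-and-publish steps into a helper function. A sketch; publish_contact_summary is our name, not part of pg_tide:

-- Rebuild the aggregate, then publish one event per company.
CREATE OR REPLACE FUNCTION publish_contact_summary() RETURNS void
LANGUAGE plpgsql AS $$
BEGIN
    REFRESH MATERIALIZED VIEW contact_summary;
    PERFORM tide.outbox_publish('warehouse_events', 'contacts',
        jsonb_build_object(
            'company', company,
            'contact_count', contact_count,
            'last_active', last_active))
    FROM contact_summary;
END;
$$;

-- Invoke manually or from a scheduler:
SELECT publish_contact_summary();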

Configure Singer target export:

SELECT tide.relay_set_outbox(
    'to-snowflake',
    'warehouse_events',
    '{
        "sink_type": "singer",
        "target_command": "target-snowflake",
        "target_config": {
            "account": "${env:SNOWFLAKE_ACCOUNT}",
            "user": "${env:SNOWFLAKE_USER}",
            "password": "${env:SNOWFLAKE_PASSWORD}",
            "database": "ANALYTICS",
            "schema": "RAW"
        }
    }'::jsonb
);
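
Before the relay runs the target, you can confirm that events are queued. This assumes tide.outbox_pending mirrors the inbox_pending function used earlier; the tutorial only demonstrates the inbox variant, so treat the name as an assumption:

-- Assumption: tide.outbox_pending is the outbox analog of tide.inbox_pending
SELECT count(*) AS queued
FROM tide.outbox_pending('warehouse_events');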

Step 6: Schedule Incremental Syncs

Since pg_tide persists Singer STATE, each run only extracts new/changed records. Schedule periodic syncs with cron or a workflow orchestrator:

# Run every hour - only extracts changes since last run
0 * * * * pg-tide --postgres-url "..." --run-once

Or keep the relay running continuously — it will re-run the tap at configurable intervals.
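
If you prefer in-database scheduling for the transform-and-publish half of the pipeline, pg_cron (a separate extension, assumed to be installed) can drive the helper function sketched in Step 5:

-- Quarter past each hour, shortly after the relay's hourly sync above.
SELECT cron.schedule(
    'publish-contact-summary',
    '15 * * * *',
    $$SELECT publish_contact_summary()$$
);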

Available Taps (Examples)

Tap                     Data Source        Install
tap-hubspot             HubSpot CRM        pip install tap-hubspot
tap-salesforce          Salesforce         pip install tap-salesforce
tap-stripe              Stripe payments    pip install tap-stripe
tap-github              GitHub repos       pip install tap-github
tap-postgres            PostgreSQL DB      pip install tap-postgres
tap-mysql               MySQL DB           pip install tap-mysql
tap-google-analytics    GA4                pip install tap-google-analytics
tap-shopify             Shopify            pip install tap-shopify

Browse 500+ taps at hub.meltano.com.
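
Any of these slots into the Step 2 configuration by swapping tap_command and tap_config. A sketch for tap-stripe; config keys differ per tap (client_secret and account_id follow tap-stripe's documented settings, but verify against the tap you install):

SELECT tide.inbox_create('stripe_events');

SELECT tide.relay_set_inbox(
    'stripe-extraction',
    'stripe_events',
    '{
        "source_type": "singer",
        "tap_command": "tap-stripe",
        "tap_config": {
            "client_secret": "${env:STRIPE_API_KEY}",
            "account_id": "${env:STRIPE_ACCOUNT_ID}",
            "start_date": "2024-01-01T00:00:00Z"
        },
        "state_persistence": true
    }'::jsonb
);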

Further Reading