Tutorial: Kafka + Flink Stream Processing

This tutorial shows how to build a real-time analytics pipeline with pg_tide and Apache Flink. You'll publish order events from PostgreSQL to Kafka via pg_tide, process them with Flink SQL to compute per-region revenue over one-minute windows, and write the results back to PostgreSQL.

What You'll Build

PostgreSQL (orders)  →  pg_tide  →  Kafka  →  Flink SQL  →  Kafka (results)  →  pg_tide  →  PostgreSQL (analytics)

Prerequisites

  • PostgreSQL 14+ with pg_tide extension installed
  • pg-tide relay binary
  • Apache Kafka (or Redpanda)
  • Apache Flink 1.17+ with SQL Gateway

Step 1: Create the Outbox

-- Create an outbox for order events
SELECT tide.outbox_create('order_events');

-- Insert some test events
SELECT tide.outbox_publish('order_events', 'orders', jsonb_build_object(
    'order_id', 'ORD-001',
    'customer_id', 'CUST-42',
    'total', 149.99,
    'region', 'us-east',
    'created_at', now()
));
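
The windowed aggregation in Step 4 groups by region, so it helps to have events in more than one region. A quick sketch using the same call (the order values here are made up):

-- Publish more test events across regions (illustrative data)
SELECT tide.outbox_publish('order_events', 'orders', jsonb_build_object(
    'order_id', 'ORD-002',
    'customer_id', 'CUST-17',
    'total', 89.50,
    'region', 'us-east',
    'created_at', now()
));

SELECT tide.outbox_publish('order_events', 'orders', jsonb_build_object(
    'order_id', 'ORD-003',
    'customer_id', 'CUST-88',
    'total', 310.00,
    'region', 'eu-west',
    'created_at', now()
));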

Step 2: Configure the Pipeline

SELECT tide.relay_set_outbox(
    'orders-to-kafka',
    'order_events',
    '{
        "sink_type": "kafka",
        "brokers": "kafka:9092",
        "topic": "order-events",
        "wire_format": "debezium",
        "wire_config": {
            "server_name": "production"
        }
    }'::jsonb
);
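
With "wire_format": "debezium", each outbox event is wrapped in a Debezium-style change-event envelope before it reaches Kafka. The exact fields pg_tide emits are not documented here, but a standard Debezium create event looks roughly like this, with server_name surfacing under source.name (values illustrative):

{
  "before": null,
  "after": {
    "order_id": "ORD-001",
    "customer_id": "CUST-42",
    "total": 149.99,
    "region": "us-east",
    "created_at": "2024-01-15T10:00:00Z"
  },
  "source": { "name": "production" },
  "op": "c",
  "ts_ms": 1705312800000
}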

Step 3: Start the Relay

pg-tide --postgres-url "postgres://user:pass@localhost/mydb"
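
Before wiring up Flink, it's worth confirming events are actually landing on the topic. The stock Kafka console consumer works for this (adjust the broker address to your environment):

kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic order-events \
  --from-beginning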

Step 4: Process with Flink SQL

Connect to the Flink SQL Gateway and define a source table reading from Kafka:

CREATE TABLE order_events (
    order_id STRING,
    customer_id STRING,
    total DECIMAL(10, 2),
    region STRING,
    created_at TIMESTAMP(3),
    WATERMARK FOR created_at AS created_at - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'order-events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-analytics',
    'format' = 'debezium-json',
    'scan.startup.mode' = 'earliest-offset'
);
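
Flink's debezium-json format unwraps the change-event envelope and reads column values from its after field, so the columns above map directly to the payload keys published in Step 1.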

Define the output table:

CREATE TABLE order_analytics (
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    region STRING,
    order_count BIGINT,
    total_revenue DECIMAL(12, 2)
) WITH (
    'connector' = 'kafka',
    'topic' = 'order-analytics',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
);
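
Plain 'json' output is enough here because a tumbling-window aggregation emits each window's result exactly once, after the watermark passes window_end. An unwindowed GROUP BY would emit updates and retractions and would need the upsert-kafka connector instead.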

Run the aggregation:

INSERT INTO order_analytics
SELECT 
    window_start,
    window_end,
    region,
    COUNT(*) as order_count,
    SUM(total) as total_revenue
FROM TABLE(
    TUMBLE(TABLE order_events, DESCRIPTOR(created_at), INTERVAL '1' MINUTE)
)
GROUP BY window_start, window_end, region;
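
Each record Flink writes to order-analytics is a flat JSON object matching the sink schema, so a result row looks something like this (values illustrative):

{
  "window_start": "2024-01-15 10:00:00.000",
  "window_end": "2024-01-15 10:01:00.000",
  "region": "us-east",
  "order_count": 2,
  "total_revenue": 239.49
}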

Step 5: Ingest Results Back to PostgreSQL

Create an inbox and configure a reverse pipeline to consume Flink's output:

-- Create inbox for analytics results
SELECT tide.inbox_create('analytics_results');

-- Configure pipeline to consume from the results topic
SELECT tide.relay_set_inbox(
    'analytics-from-flink',
    'analytics_results',
    '{
        "source_type": "kafka",
        "brokers": "kafka:9092",
        "topic": "order-analytics",
        "consumer_group": "pg-tide-analytics",
        "auto_offset_reset": "earliest"
    }'::jsonb
);

Step 6: Query Results

SELECT payload->>'region' as region,
       (payload->>'total_revenue')::decimal as revenue,
       (payload->>'order_count')::int as orders,
       (payload->>'window_end')::timestamptz as period
FROM tide.inbox_pending('analytics_results')
ORDER BY period DESC;
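
To persist results into a regular table instead of querying the inbox each time, you can copy pending rows into one. A minimal sketch, assuming a hypothetical regional_revenue table you create yourself (how pg_tide marks inbox rows as consumed is not covered here):

-- Hypothetical target table; pg_tide does not create this for you
CREATE TABLE IF NOT EXISTS regional_revenue (
    region        text,
    window_end    timestamptz,
    order_count   int,
    total_revenue numeric
);

INSERT INTO regional_revenue (region, window_end, order_count, total_revenue)
SELECT payload->>'region',
       (payload->>'window_end')::timestamptz,
       (payload->>'order_count')::int,
       (payload->>'total_revenue')::numeric
FROM tide.inbox_pending('analytics_results');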

Key Takeaways

  • pg_tide's Debezium wire format integrates seamlessly with Flink's debezium-json format
  • The bidirectional flow (outbox → Kafka → Flink → Kafka → inbox) keeps PostgreSQL as the source of truth
  • Flink provides windowed aggregations that would be expensive to compute in PostgreSQL

Further Reading