PostgreSQL Sink

Store structured logs in PostgreSQL with async batching, connection pooling, and JSONB storage.

Installation

pip install "fapilog[postgres]"

Quick start

from fapilog.plugins.sinks.contrib.postgres import PostgresSink, PostgresSinkConfig

sink = PostgresSink(
    PostgresSinkConfig(
        host="localhost",
        database="fapilog",
        user="fapilog",
        password="secret",
        table_name="logs",
    )
)

Environment-based setup:

export FAPILOG_CORE__SINKS='["postgres"]'
export FAPILOG_POSTGRES__HOST=localhost
export FAPILOG_POSTGRES__DATABASE=fapilog
export FAPILOG_POSTGRES__USER=fapilog
export FAPILOG_POSTGRES__PASSWORD=secret

Configuration highlights

  • Connection: dsn or host/port/database/user/password

  • Pooling: min_pool_size, max_pool_size, pool_acquire_timeout

  • Batching: batch_size, batch_timeout_seconds

  • Reliability: max_retries, retry_base_delay, circuit_breaker_enabled

  • Storage: use_jsonb, include_raw_json, extract_fields

  • Table management: schema_name, table_name, create_table

Auto Table Creation

Warning: By default, create_table=True causes the sink to execute DDL (CREATE TABLE, CREATE INDEX) at startup. In production environments with restricted database permissions or change management policies, set create_table=False and provision tables via migrations or infrastructure-as-code.

The production preset automatically sets create_table=False.

Setting

Default

Production Preset

create_table

True

False

For production deployments, either:

  1. Use preset="production" which disables auto-creation

  2. Explicitly set create_table=False in your configuration

  3. Create tables via migrations before deploying the application

Environment variable aliases:

Variable

Default

FAPILOG_POSTGRES__DSN

None

FAPILOG_POSTGRES__HOST

localhost

FAPILOG_POSTGRES__PORT

5432

FAPILOG_POSTGRES__DATABASE

fapilog

FAPILOG_POSTGRES__USER

fapilog

FAPILOG_POSTGRES__PASSWORD

None

FAPILOG_POSTGRES__TABLE_NAME

logs

FAPILOG_POSTGRES__BATCH_SIZE

100

FAPILOG_POSTGRES__CREATE_TABLE

true

FAPILOG_POSTGRES__USE_JSONB

true

Programmatic configuration:

from fapilog.plugins.sinks.contrib.postgres import PostgresSinkConfig

config = PostgresSinkConfig(
    dsn="postgresql://logger:secret@db/fapilog",
    table_name="application_logs",
    batch_size=200,
    use_jsonb=True,
    extract_fields=["timestamp", "level", "message", "correlation_id"],
)

Schema and indexes

Default schema (JSONB event column):

CREATE TABLE IF NOT EXISTS public.logs (
    id BIGSERIAL PRIMARY KEY,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    timestamp TIMESTAMPTZ,
    level VARCHAR(10),
    logger VARCHAR(255),
    correlation_id VARCHAR(64),
    message TEXT,
    event JSONB NOT NULL
);

The correlation_id column is extracted from entry["context"]["correlation_id"] (v1.1 schema). When no correlation context is active, the column is NULL.

Indexes:

  • timestamp DESC for time range queries

  • level for severity filters

  • correlation_id for tracing (partial index, excludes NULLs)

  • event GIN index when use_jsonb=True

Query examples

-- Recent errors
SELECT timestamp, message FROM logs WHERE level IN ('ERROR', 'CRITICAL') ORDER BY timestamp DESC LIMIT 50;

-- Requests by correlation id
SELECT * FROM logs WHERE correlation_id = 'req-123' ORDER BY timestamp;

-- JSONB fields
SELECT * FROM logs WHERE event->>'service' = 'api';
SELECT * FROM logs WHERE event->'metadata'->>'version' = '1.2.3';

Performance tuning

  • Increase batch_size for higher throughput (e.g., 500).

  • Adjust min_pool_size/max_pool_size based on concurrent writers.

  • Use pool_acquire_timeout to prevent stalls when the pool is exhausted.

  • Consider partitioning or TimescaleDB for very large datasets (see below).

High-volume scenarios

For applications generating millions of log events per day, consider these strategies:

Native partitioning

PostgreSQL 10+ supports declarative partitioning. Partition by timestamp for efficient time-range queries and simplified retention:

-- Create partitioned table (run manually, set create_table=False)
CREATE TABLE logs (
    id BIGSERIAL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    timestamp TIMESTAMPTZ,
    level VARCHAR(10),
    logger VARCHAR(255),
    correlation_id VARCHAR(64),
    message TEXT,
    event JSONB NOT NULL
) PARTITION BY RANGE (timestamp);

-- Create monthly partitions
CREATE TABLE logs_2024_01 PARTITION OF logs
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

TimescaleDB hypertables

TimescaleDB is a PostgreSQL extension optimized for time-series data. Convert your logs table to a hypertable for automatic partitioning and compression:

-- Install TimescaleDB extension
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- Convert to hypertable (after creating base table)
SELECT create_hypertable('logs', 'timestamp');

-- Enable compression for older data (optional)
ALTER TABLE logs SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'level'
);

-- Add compression policy (compress data older than 7 days)
SELECT add_compression_policy('logs', INTERVAL '7 days');

Benefits of TimescaleDB:

  • Automatic time-based partitioning (chunks)

  • Built-in compression (10-20x storage reduction)

  • Continuous aggregates for dashboards

  • Retention policies for automatic cleanup

Retention policies

For both native and TimescaleDB partitioning, implement retention to manage storage:

-- TimescaleDB: drop data older than 90 days
SELECT add_retention_policy('logs', INTERVAL '90 days');

-- Native partitioning: drop old partitions manually
DROP TABLE logs_2023_01;

Troubleshooting

  • Connection refused: verify PostgreSQL is reachable (psql -h ... -c "SELECT 1").

  • Pool exhaustion: increase max_pool_size or reduce the number of logger instances.

  • Slow inserts: increase batch_size, reduce index count, or add partitioning.