PostgreSQL Sink
Store structured logs in PostgreSQL with async batching, connection pooling, and JSONB storage.
Installation
pip install "fapilog[postgres]"
Quick start
from fapilog.plugins.sinks.contrib.postgres import PostgresSink, PostgresSinkConfig
sink = PostgresSink(
PostgresSinkConfig(
host="localhost",
database="fapilog",
user="fapilog",
password="secret",
table_name="logs",
)
)
Environment-based setup:
export FAPILOG_CORE__SINKS='["postgres"]'
export FAPILOG_POSTGRES__HOST=localhost
export FAPILOG_POSTGRES__DATABASE=fapilog
export FAPILOG_POSTGRES__USER=fapilog
export FAPILOG_POSTGRES__PASSWORD=secret
Configuration highlights
Connection:
dsnorhost/port/database/user/passwordPooling:
min_pool_size,max_pool_size,pool_acquire_timeoutBatching:
batch_size,batch_timeout_secondsReliability:
max_retries,retry_base_delay,circuit_breaker_enabledStorage:
use_jsonb,include_raw_json,extract_fieldsTable management:
schema_name,table_name,create_table
Auto Table Creation
Warning: By default,
create_table=Truecauses the sink to execute DDL (CREATE TABLE, CREATE INDEX) at startup. In production environments with restricted database permissions or change management policies, setcreate_table=Falseand provision tables via migrations or infrastructure-as-code.The
productionpreset automatically setscreate_table=False.
Setting |
Default |
Production Preset |
|---|---|---|
|
|
|
For production deployments, either:
Use
preset="production"which disables auto-creationExplicitly set
create_table=Falsein your configurationCreate tables via migrations before deploying the application
Environment variable aliases:
Variable |
Default |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Programmatic configuration:
from fapilog.plugins.sinks.contrib.postgres import PostgresSinkConfig
config = PostgresSinkConfig(
dsn="postgresql://logger:secret@db/fapilog",
table_name="application_logs",
batch_size=200,
use_jsonb=True,
extract_fields=["timestamp", "level", "message", "correlation_id"],
)
Schema and indexes
Default schema (JSONB event column):
CREATE TABLE IF NOT EXISTS public.logs (
id BIGSERIAL PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
timestamp TIMESTAMPTZ,
level VARCHAR(10),
logger VARCHAR(255),
correlation_id VARCHAR(64),
message TEXT,
event JSONB NOT NULL
);
The correlation_id column is extracted from entry["context"]["correlation_id"] (v1.1 schema). When no correlation context is active, the column is NULL.
Indexes:
timestamp DESCfor time range querieslevelfor severity filterscorrelation_idfor tracing (partial index, excludes NULLs)eventGIN index whenuse_jsonb=True
Query examples
-- Recent errors
SELECT timestamp, message FROM logs WHERE level IN ('ERROR', 'CRITICAL') ORDER BY timestamp DESC LIMIT 50;
-- Requests by correlation id
SELECT * FROM logs WHERE correlation_id = 'req-123' ORDER BY timestamp;
-- JSONB fields
SELECT * FROM logs WHERE event->>'service' = 'api';
SELECT * FROM logs WHERE event->'metadata'->>'version' = '1.2.3';
Performance tuning
Increase
batch_sizefor higher throughput (e.g., 500).Adjust
min_pool_size/max_pool_sizebased on concurrent writers.Use
pool_acquire_timeoutto prevent stalls when the pool is exhausted.Consider partitioning or TimescaleDB for very large datasets (see below).
High-volume scenarios
For applications generating millions of log events per day, consider these strategies:
Native partitioning
PostgreSQL 10+ supports declarative partitioning. Partition by timestamp for efficient time-range queries and simplified retention:
-- Create partitioned table (run manually, set create_table=False)
CREATE TABLE logs (
id BIGSERIAL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
timestamp TIMESTAMPTZ,
level VARCHAR(10),
logger VARCHAR(255),
correlation_id VARCHAR(64),
message TEXT,
event JSONB NOT NULL
) PARTITION BY RANGE (timestamp);
-- Create monthly partitions
CREATE TABLE logs_2024_01 PARTITION OF logs
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
TimescaleDB hypertables
TimescaleDB is a PostgreSQL extension optimized for time-series data. Convert your logs table to a hypertable for automatic partitioning and compression:
-- Install TimescaleDB extension
CREATE EXTENSION IF NOT EXISTS timescaledb;
-- Convert to hypertable (after creating base table)
SELECT create_hypertable('logs', 'timestamp');
-- Enable compression for older data (optional)
ALTER TABLE logs SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'level'
);
-- Add compression policy (compress data older than 7 days)
SELECT add_compression_policy('logs', INTERVAL '7 days');
Benefits of TimescaleDB:
Automatic time-based partitioning (chunks)
Built-in compression (10-20x storage reduction)
Continuous aggregates for dashboards
Retention policies for automatic cleanup
Retention policies
For both native and TimescaleDB partitioning, implement retention to manage storage:
-- TimescaleDB: drop data older than 90 days
SELECT add_retention_policy('logs', INTERVAL '90 days');
-- Native partitioning: drop old partitions manually
DROP TABLE logs_2023_01;
Troubleshooting
Connection refused: verify PostgreSQL is reachable (
psql -h ... -c "SELECT 1").Pool exhaustion: increase
max_pool_sizeor reduce the number of logger instances.Slow inserts: increase
batch_size, reduce index count, or add partitioning.