Sinks
Output plugins that deliver serialized log entries to destinations.
Contract
Implement BaseSink methods:
async start(self) -> None: optional initialization.async write(self, entry: dict) -> None: required; receives enriched/redacted envelope.async stop(self) -> None: optional teardown.
Errors should be contained; do not raise into the pipeline.
Optional methods
write_serialized
async def write_serialized(self, view: SerializedView) -> None
Fast path when Settings.core.serialize_in_flush=True. If present, fapilog pre-serializes entries once and calls this method instead of write() for sinks that consume bytes. If absent, fapilog automatically falls back to write().
SerializedView exposes:
@dataclass
class SerializedView:
data: memoryview
def __bytes__(self) -> bytes:
return bytes(self.data)
Built-in sinks
stdout_json: JSON lines to stdout.
stdout_pretty: human-readable console output with ANSI colors (TTY only).
rotating_file: size/time-based rotation with optional compression.
http: POST log entries to an HTTP endpoint.
webhook: POST log entries to a webhook with optional signing.
Convenience factories
rotating_file (fapilog.sinks)
from fapilog.sinks import rotating_file
sink = rotating_file(
"logs/app.log",
rotation="10 MB",
retention=7,
compression=True,
mode="json",
)
Parameters:
path: file path (directory is created if missing)rotation: size-based rotation (string or int bytes)retention: max rotated files to keepcompression: gzip rotated filesmode:jsonortext
Configuration (env)
Rotating file:
export FAPILOG_FILE__DIRECTORY=/var/log/myapp
export FAPILOG_FILE__MAX_BYTES="10 MB"
export FAPILOG_FILE__MAX_FILES=5
export FAPILOG_FILE__COMPRESS_ROTATED=true
export FAPILOG_FILE__INTERVAL_SECONDS="daily"
HTTP sink:
export FAPILOG_HTTP__ENDPOINT=https://logs.example.com/ingest
export FAPILOG_HTTP__TIMEOUT_SECONDS=5
export FAPILOG_HTTP__RETRY_MAX_ATTEMPTS=3
export FAPILOG_HTTP__BATCH_SIZE=100
export FAPILOG_HTTP__BATCH_TIMEOUT_SECONDS=5
export FAPILOG_HTTP__BATCH_FORMAT=array # array|ndjson|wrapped
export FAPILOG_HTTP__BATCH_WRAPPER_KEY=logs
Building Blocks
MemoryMappedPersistence
A low-level memory-mapped file writer for building custom zero-copy sinks.
Note: MemoryMappedPersistence is not a sink itself—it does not implement
the BaseSink protocol. Instead, it provides efficient byte-level append operations
that you can use to build performance-critical custom sinks.
from fapilog.plugins.sinks import BaseSink, MemoryMappedPersistence
import json
class MyMmapSink:
"""Example custom sink using MemoryMappedPersistence."""
name = "my_mmap"
def __init__(self, path: str):
self._mmap = MemoryMappedPersistence(path)
async def start(self) -> None:
await self._mmap.open()
async def write(self, entry: dict) -> None:
data = json.dumps(entry).encode()
await self._mmap.append_line(data)
async def stop(self) -> None:
await self._mmap.close()
async def health_check(self) -> bool:
return await self._mmap.health_check()
See the MemoryMappedPersistence class documentation for full API details including
configuration options for initial size, growth factor, and sync behavior.
CloudWatch Sink
AWS CloudWatch Logs sink with batching, retry, and circuit breaker support.
Requires: boto3>=1.26.0
pip install "fapilog[cloudwatch]"
Configuration Options
Option |
Type |
Default |
Description |
|---|---|---|---|
|
str |
“/fapilog/default” |
CloudWatch log group name |
|
str |
None (auto-generated) |
CloudWatch log stream name. If not set, defaults to |
|
str |
AWS_REGION env |
AWS region for CloudWatch |
|
bool |
True |
Auto-create log group if it doesn’t exist |
|
bool |
True |
Auto-create log stream if it doesn’t exist |
|
int |
100 |
Maximum events per batch |
|
float |
5.0 |
Maximum time to wait before flushing a batch |
|
int |
3 |
Number of retry attempts on failure |
|
float |
0.5 |
Base delay in seconds for exponential backoff |
|
str |
None |
Custom endpoint URL (for LocalStack or testing) |
|
bool |
True |
Enable circuit breaker for fault tolerance |
|
int |
5 |
Consecutive failures before circuit opens |
Environment Variables
Variable |
Maps To |
|---|---|
|
|
|
|
AWS credentials are resolved via the standard boto3 credential chain:
Environment variables (
AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY)Shared credentials file (
~/.aws/credentials)IAM role (for EC2/ECS/Lambda)
Usage Example
from fapilog import Settings
settings = Settings(
sinks=[
{
"type": "cloudwatch",
"log_group_name": "/myapp/production",
"log_stream_name": "api-server-1",
"region": "us-east-1",
"batch_size": 50,
}
]
)
Notes
Events exceeding 256 KB are dropped with a diagnostic warning
Batches are automatically chunked to respect CloudWatch limits (10,000 events, 1 MB per call)
Sequence token handling is automatic (recovers from
InvalidSequenceTokenException)
Loki Sink
Grafana Loki sink with batching, label management, and retry support.
Configuration Options
Option |
Type |
Default |
Description |
|---|---|---|---|
|
str |
“http://localhost:3100” |
Loki server URL |
|
str |
None |
Multi-tenant org ID (sets |
|
dict |
{“service”: “fapilog”} |
Static labels applied to all log streams |
|
list |
[“level”] |
Fields to extract as dynamic labels |
|
int |
100 |
Maximum events per batch |
|
float |
5.0 |
Maximum time to wait before flushing a batch |
|
float |
10.0 |
HTTP request timeout |
|
int |
3 |
Number of retry attempts on failure |
|
float |
0.5 |
Base delay in seconds for exponential backoff |
|
str |
None |
Basic authentication username |
|
str |
None |
Basic authentication password |
|
str |
None |
Bearer token for authentication |
|
bool |
True |
Enable circuit breaker for fault tolerance |
|
int |
5 |
Consecutive failures before circuit opens |
Environment Variables
Variable |
Maps To |
|---|---|
|
|
|
|
|
|
|
|
|
|
Authentication Options
Basic Authentication:
settings = Settings(
sinks=[
{
"type": "loki",
"url": "https://loki.example.com",
"auth_username": "user",
"auth_password": "secret",
}
]
)
Bearer Token:
settings = Settings(
sinks=[
{
"type": "loki",
"url": "https://loki.example.com",
"auth_token": "your-api-token",
}
]
)
Labels Configuration
Labels determine how logs are grouped into streams in Loki. Use static labels for fixed metadata and dynamic labels for fields extracted from log entries.
settings = Settings(
sinks=[
{
"type": "loki",
"labels": {
"service": "api-gateway",
"env": "production",
},
"label_keys": ["level", "logger"], # Extract from each log entry
}
]
)
Note: Label values are sanitized (non-alphanumeric characters replaced with _, max 128 chars).
PostgreSQL Sink
PostgreSQL sink with async connection pooling, batching, and automatic table creation.
Requires: asyncpg>=0.28.0
pip install "fapilog[postgres]"
Configuration Options
Option |
Type |
Default |
Description |
|---|---|---|---|
|
str |
None |
Full connection string (overrides individual connection options) |
|
str |
“localhost” |
Database host |
|
int |
5432 |
Database port |
|
str |
“fapilog” |
Database name |
|
str |
“fapilog” |
Database user |
|
str |
None |
Database password |
|
str |
“logs” |
Target table name |
|
str |
“public” |
Target schema name |
|
bool |
True |
Auto-create table and indexes if they don’t exist |
|
int |
2 |
Minimum connections in pool |
|
int |
10 |
Maximum connections in pool |
|
float |
10.0 |
Timeout in seconds to acquire a connection |
|
int |
100 |
Maximum events per batch |
|
float |
5.0 |
Maximum time to wait before flushing a batch |
|
int |
3 |
Number of retry attempts on failure |
|
float |
0.5 |
Base delay in seconds for exponential backoff |
|
bool |
True |
Enable circuit breaker for fault tolerance |
|
int |
5 |
Consecutive failures before circuit opens |
|
bool |
True |
Use JSONB type instead of JSON (enables indexing) |
|
bool |
True |
Include full event in the JSON/JSONB column |
|
list |
[“timestamp”, “level”, “logger”, “correlation_id”, “message”] |
Fields to extract as dedicated columns |
Environment Variables
Variable |
Maps To |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
Table Schema
When create_table=True, the sink auto-creates a table with:
CREATE TABLE IF NOT EXISTS "public"."logs" (
id BIGSERIAL PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
timestamp TIMESTAMPTZ,
level VARCHAR(10),
logger VARCHAR(255),
correlation_id VARCHAR(64),
message TEXT,
event JSONB NOT NULL
)
Auto-created indexes:
idx_logs_timestampontimestamp DESC- for time-range queriesidx_logs_levelonlevel- for filtering by severityidx_logs_correlation_idoncorrelation_id(partial, WHERE NOT NULL) - for tracingidx_logs_event_ginGIN index onevent- for JSONB queries (only whenuse_jsonb=True)
Usage Example
from fapilog import Settings
settings = Settings(
sinks=[
{
"type": "postgres",
"host": "db.example.com",
"database": "logs",
"user": "fapilog_writer",
"password": "secret",
"table_name": "app_logs",
"max_pool_size": 20,
}
]
)
Using DSN:
settings = Settings(
sinks=[
{
"type": "postgres",
"dsn": "postgresql://user:pass@host:5432/dbname",
}
]
)
Connection Pooling Notes
Connections are pooled using
asyncpg.create_pool()Set
min_pool_size> 1 for high-throughput applications to avoid connection acquisition latencymax_pool_sizeshould match your expected concurrency; too high can exhaust database connectionspool_acquire_timeoutprevents indefinite blocking when pool is exhausted