Pipeline Architecture

How messages flow through fapilog’s high-performance pipeline.

Overview

fapilog uses a pipeline architecture that processes log messages through several stages, each responsible for a specific task:

┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ Application │───▶│   Context   │───▶│   Filters   │───▶│ Enrichers   │
└─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘
                                                                  │
┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│    Sinks    │◀───│    Queue    │◀───│ Processors  │◀───│  Redactors  │
└─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘

Pipeline Stages

1. Application Layer

Your application code calls logging methods:

from fapilog import get_logger

logger = get_logger()
logger.info("User action", user_id="123")

What happens:

  • Message is created with current context

  • Extra fields are merged with context

  • Timestamp and level are added

  • Message is queued for processing

2. Context Binding

Context is automatically attached to messages:

# Context is automatically included
logger.info("Request processed", status_code=200)

Context includes:

  • Request ID

  • User ID

  • Correlation ID

  • Service name

  • Environment
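
As an illustration of how context can follow a request without being passed explicitly, here is a minimal sketch built on the standard-library contextvars module. The names bind_context and build_event are hypothetical helpers for this example, not fapilog functions, and fapilog's internals may differ.

```python
import contextvars

# Holds the fields bound for the current task or thread.
# The default dict is never mutated; bind_context always builds a new one.
_log_context: contextvars.ContextVar = contextvars.ContextVar(
    "log_context", default={}
)


def bind_context(**fields) -> None:
    """Merge new fields into the context of the current execution flow."""
    _log_context.set({**_log_context.get(), **fields})


def build_event(message: str, **extra) -> dict:
    """Combine bound context, per-call extra fields, and the message."""
    # Per-call fields override bound context on key collisions.
    return {**_log_context.get(), **extra, "message": message}


bind_context(request_id="req-abc123", service="api")
event = build_event("Request processed", status_code=200)
```

Because each asyncio task runs in its own copy of the context, fields bound while handling one request do not leak into another.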

3. Filters

Filters run first and can drop or modify events before any enrichment or redaction. Returning None drops the event; returning a dict (the original or a modified one) continues processing.

Built-in filters:

  • Level - Drop events below a configured level (wired from core.log_level).

  • Sampling - Keep a percentage of events.

  • Rate Limit - Token bucket limiter (optionally per key).
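
The None-or-dict contract described above can be sketched with plain functions. This is a simplified illustration, not fapilog's plugin interface: the factory names, the level table, and the run_filters helper are assumptions made for the example.

```python
import random
from typing import Callable, Optional

Event = dict
Filter = Callable[[Event], Optional[Event]]

# Numeric ranks for comparison (values chosen for this sketch)
LEVELS = {"DEBUG": 10, "INFO": 20, "WARNING": 30, "ERROR": 40}


def make_level_filter(min_level: str) -> Filter:
    threshold = LEVELS[min_level]

    def level_filter(event: Event) -> Optional[Event]:
        # Missing or unknown levels are treated as INFO in this sketch
        rank = LEVELS.get(event.get("level", "INFO"), LEVELS["INFO"])
        return event if rank >= threshold else None

    return level_filter


def make_sampling_filter(rate: float, rng: random.Random) -> Filter:
    def sampling_filter(event: Event) -> Optional[Event]:
        # Keep roughly `rate` of all events (0.0 keeps none, 1.0 keeps all)
        return event if rng.random() < rate else None

    return sampling_filter


def run_filters(event: Event, filters: list) -> Optional[Event]:
    """Apply filters in order; stop as soon as one drops the event."""
    current: Optional[Event] = event
    for apply_filter in filters:
        current = apply_filter(current)
        if current is None:
            return None
    return current
```

Short-circuiting on the first None means later, more expensive stages never see events that were already dropped.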

4. Enrichers

Enrichers add additional metadata to messages:

Built-in enrichers:

  • Runtime Info - Python version, process ID, memory usage

  • Context Variables - Request context, user context

  • Custom Enrichers - Business-specific metadata

Example:

# Message before enrichment
{"message": "User logged in", "user_id": "123"}

# Message after enrichment
{
  "message": "User logged in",
  "user_id": "123",
  "timestamp": "2024-01-15T10:30:00.123Z",
  "process_id": 12345,
  "python_version": "3.11.0",
  "request_id": "req-abc123"
}
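
A runtime-info enricher along these lines could be sketched as follows. The function name and the exact fields are assumptions for illustration; the real enricher may collect different data and need not copy the event.

```python
import os
import platform
from datetime import datetime, timezone


def runtime_info_enricher(event: dict) -> dict:
    """Return a copy of the event with runtime metadata added."""
    enriched = dict(event)
    # setdefault keeps any value the caller already supplied
    enriched.setdefault("timestamp", datetime.now(timezone.utc).isoformat())
    enriched.setdefault("process_id", os.getpid())
    enriched.setdefault("python_version", platform.python_version())
    return enriched


before = {"message": "User logged in", "user_id": "123"}
after = runtime_info_enricher(before)
```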

5. Redactors

Redactors remove or mask sensitive information:

Built-in redactors:

  • Field Mask - Mask specific field names

  • Regex Mask - Mask patterns (passwords, API keys)

  • URL Credentials - Remove credentials from URLs

Example:

# Before redaction
{
  "message": "User credentials",
  "username": "john_doe",
  "password": "secret123",
  "api_key": "sk-1234567890abcdef"
}

# After redaction
{
  "message": "User credentials",
  "username": "john_doe",
  "password": "***",
  "api_key": "***"
}
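
The three redaction styles can be approximated with the standard library. This is a rough sketch under stated assumptions: the sensitive field names, the "sk-" key pattern, and the function names are made up for the example and do not reflect fapilog's configuration.

```python
import re
from urllib.parse import urlsplit, urlunsplit

MASK = "***"
SENSITIVE_FIELDS = {"password", "api_key"}  # assumed field names
API_KEY_PATTERN = re.compile(r"sk-[A-Za-z0-9]+")  # assumed key format


def mask_fields(event: dict) -> dict:
    """Field mask: replace the value of known sensitive keys."""
    return {k: (MASK if k in SENSITIVE_FIELDS else v) for k, v in event.items()}


def mask_patterns(event: dict) -> dict:
    """Regex mask: replace matching substrings inside string values."""
    return {
        k: (API_KEY_PATTERN.sub(MASK, v) if isinstance(v, str) else v)
        for k, v in event.items()
    }


def strip_url_credentials(url: str) -> str:
    """URL credentials: drop user:password@ from a URL, keep the rest."""
    parts = urlsplit(url)
    if parts.username is None and parts.password is None:
        return url
    host = parts.hostname or ""
    if parts.port is not None:
        host = f"{host}:{parts.port}"
    return urlunsplit((parts.scheme, host, parts.path, parts.query, parts.fragment))
```

Running the field mask before the regex mask is cheaper, since a dictionary lookup settles the known keys and the regex only has to catch secrets embedded in free text.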

6. Processors

Processors transform and optimize messages:

Built-in processors:

  • Zero-Copy - Efficient message handling

  • Batch Processing - Group messages for efficiency

  • Compression - Reduce storage requirements

7. Queue

The queue buffers messages between processing and output:

Features:

  • Lock-free design - Maximum concurrency

  • Configurable capacity - Prevent memory issues

  • Backpressure handling - Drop or wait under load

  • Zero-copy operations - Minimal memory allocation

8. Sinks

Sinks are the final destination for messages:

Built-in sinks:

  • Stdout JSON - Development and containers

  • Rotating File - Production and compliance

  • HTTP Client - Remote systems and APIs

  • MMAP Persistence - High-performance local storage

Performance Characteristics

Async Processing

  • Non-blocking - Logging calls do not wait on sink I/O (when the queue is full they may wait briefly or drop, depending on configuration)

  • Concurrent - Multiple messages processed simultaneously

  • Efficient - Minimal CPU and memory overhead

  • Configurable worker count - Increase core.worker_count to run more worker tasks for I/O-bound enrichers and sinks; keep it at 1 for CPU-bound workloads.
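
To show why extra workers help with I/O-bound sinks, here is a small asyncio sketch in which several worker tasks drain one queue. It is not fapilog's worker implementation; example_sink, worker, and drain are names invented for the example.

```python
import asyncio


async def example_sink(event: dict) -> int:
    await asyncio.sleep(0.01)  # stands in for network or disk I/O
    return event["n"]


async def worker(queue: asyncio.Queue, sink, delivered: list) -> None:
    while True:
        event = await queue.get()
        try:
            delivered.append(await sink(event))
        finally:
            queue.task_done()


async def drain(events: list, sink, worker_count: int) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    for event in events:
        queue.put_nowait(event)
    delivered: list = []
    workers = [
        asyncio.create_task(worker(queue, sink, delivered))
        for _ in range(worker_count)
    ]
    await queue.join()  # wait until every queued event is handled
    for task in workers:
        task.cancel()  # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return delivered
```

While one worker awaits the sink, the others keep pulling events, so total time shrinks roughly in proportion to the worker count. CPU-bound work gains nothing from this, because the tasks share a single thread.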

Zero-Copy Operations

  • Memory efficient - Messages flow without copying

  • Reduced GC pressure - Fewer temporary objects

  • Better performance - Especially under high load

Batching

  • Configurable batch sizes - Balance latency vs throughput

  • Automatic batching - Based on volume and time

  • Batch compression - Reduce storage and network usage
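
The size-and-time trade-off can be sketched with a small accumulator. The Batcher class below is a hypothetical illustration, not fapilog code. It only checks the age limit when an item is added, whereas a real pipeline would also flush on a timer.

```python
import time
from typing import Any, Callable, List, Optional


class Batcher:
    """Collect items and release them when a size or age limit is hit."""

    def __init__(
        self,
        max_size: int,
        max_wait_s: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._max_size = max_size
        self._max_wait_s = max_wait_s
        self._clock = clock
        self._items: List[Any] = []
        self._first_at = 0.0  # arrival time of the oldest pending item

    def add(self, item: Any) -> Optional[List[Any]]:
        """Add an item; return a batch if a limit was reached, else None."""
        now = self._clock()
        if not self._items:
            self._first_at = now
        self._items.append(item)
        too_big = len(self._items) >= self._max_size
        too_old = now - self._first_at >= self._max_wait_s
        return self.flush() if (too_big or too_old) else None

    def flush(self) -> List[Any]:
        """Return all pending items and start a new, empty batch."""
        batch, self._items = self._items, []
        return batch
```

A larger max_size improves throughput per sink call, while a smaller max_wait_s caps how long a single event can sit before being written.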

Guarantees

1. Async Operations

Logging calls enqueue work without blocking on sinks:

# Sync
logger.info("Processing started")

# Async
await async_logger.info("Processing started")

2. Bounded Memory

Memory usage is bounded and configurable:

# Set maximum queue size
export FAPILOG_CORE__MAX_QUEUE_SIZE=8192

# Set maximum batch size
export FAPILOG_CORE__BATCH_MAX_SIZE=100

3. Backpressure Handling

When the queue is full, the system either waits briefly or drops new events, depending on configuration:

# Configure backpressure behavior
export FAPILOG_CORE__DROP_ON_FULL=true
export FAPILOG_CORE__BACKPRESSURE_WAIT_MS=100
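
One plausible reading of how these two settings combine is: wait up to the configured time for space, then drop if dropping is enabled, otherwise keep waiting. The sketch below implements that reading with asyncio.Queue. The enqueue function and its exact semantics are assumptions for illustration and may not match fapilog's behavior.

```python
import asyncio


async def enqueue(
    queue: asyncio.Queue, event: dict, *, drop_on_full: bool, wait_ms: int
) -> bool:
    """Return True if the event was queued, False if it was dropped."""
    try:
        queue.put_nowait(event)  # fast path: space is available
        return True
    except asyncio.QueueFull:
        pass
    try:
        # Bounded wait for a consumer to free a slot
        await asyncio.wait_for(queue.put(event), timeout=wait_ms / 1000)
        return True
    except asyncio.TimeoutError:
        if drop_on_full:
            return False
    # Dropping is disabled: wait for as long as it takes
    await queue.put(event)
    return True


async def demo() -> tuple:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    first = await enqueue(queue, {"n": 1}, drop_on_full=True, wait_ms=10)
    second = await enqueue(queue, {"n": 2}, drop_on_full=True, wait_ms=10)
    return first, second, queue.qsize()
```

Because the queue has a fixed maximum size, memory stays bounded in both modes. The choice is only between losing events and slowing the caller.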

4. Deduplication

Repeated error messages within a configurable time window are deduplicated automatically:

# Configure deduplication window
export FAPILOG_CORE__ERROR_DEDUPE_WINDOW_SECONDS=60
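
A window-based deduper can be sketched in a few lines. ErrorDeduper is a hypothetical class keyed on the exact message text, and the library's notion of "similar" may be broader. The sketch also never evicts old keys, which a production version would need to do.

```python
import time
from typing import Callable, Dict


class ErrorDeduper:
    """Suppress repeats of the same message within a time window."""

    def __init__(
        self, window_seconds: float, clock: Callable[[], float] = time.monotonic
    ) -> None:
        self._window = window_seconds
        self._clock = clock
        self._last_emitted: Dict[str, float] = {}

    def should_emit(self, message: str) -> bool:
        now = self._clock()
        last = self._last_emitted.get(message)
        if last is not None and now - last < self._window:
            return False  # duplicate inside the window: suppress
        # The window is measured from the last emitted occurrence,
        # so suppressed repeats do not extend it.
        self._last_emitted[message] = now
        return True
```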

Configuration

Pipeline Configuration

from fapilog import Settings

settings = Settings(
    # Queue configuration
    core__max_queue_size=16384,
    core__batch_max_size=200,
    core__drop_on_full=False,
    core__backpressure_wait_ms=100,
    core__error_dedupe_window_seconds=60,
    # Optional HTTP sink
    http__endpoint="https://logs.example.com/ingest",
)

Environment Variables

# Pipeline performance
export FAPILOG_CORE__MAX_QUEUE_SIZE=16384
export FAPILOG_CORE__BATCH_MAX_SIZE=200

# Deduplication
export FAPILOG_CORE__ERROR_DEDUPE_WINDOW_SECONDS=60

# Backpressure
export FAPILOG_CORE__DROP_ON_FULL=false
export FAPILOG_CORE__BACKPRESSURE_WAIT_MS=100
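
The variable names above follow a prefix plus double-underscore nesting pattern: FAPILOG_CORE__MAX_QUEUE_SIZE corresponds to the max_queue_size field under core. The sketch below illustrates only that naming convention, which settings libraries such as pydantic-settings support through a nested delimiter. The parse_prefixed_env helper is hypothetical, and values are left as strings rather than converted to their final types.

```python
from typing import Dict, Mapping


def parse_prefixed_env(
    environ: Mapping[str, str], prefix: str = "FAPILOG_", delimiter: str = "__"
) -> Dict[str, dict]:
    """Group PREFIX_SECTION__FIELD variables into nested dictionaries."""
    nested: Dict[str, dict] = {}
    for name, value in environ.items():
        if not name.startswith(prefix):
            continue  # unrelated variable
        path = name[len(prefix):].lower().split(delimiter)
        node = nested
        for part in path[:-1]:
            node = node.setdefault(part, {})
        node[path[-1]] = value
    return nested


example_env = {
    "FAPILOG_CORE__MAX_QUEUE_SIZE": "16384",
    "FAPILOG_CORE__DROP_ON_FULL": "false",
    "PATH": "/usr/bin",
}
parsed = parse_prefixed_env(example_env)
```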

Monitoring

Enabling Metrics

Enable internal metrics collection via settings:

from fapilog import get_logger, Settings

settings = Settings(core__enable_metrics=True)
logger = get_logger(settings=settings)

Or via environment variable:

export FAPILOG_CORE__ENABLE_METRICS=true

When enabled, fapilog records:

  • Queue high-watermark

  • Events submitted, processed, and dropped

  • Flush latency

  • Sink errors

  • Plugin timing

Metrics can be exported to Prometheus when fapilog[metrics] is installed. See Metrics for details.

Health Checks

Sinks and plugins implement optional health_check() methods:

# Check sink health directly
is_healthy = await my_sink.health_check()

For FastAPI applications, consider exposing health via a dedicated endpoint that checks your logging infrastructure.
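
The aggregation step behind such an endpoint might look like the sketch below, which polls several components concurrently and treats timeouts and exceptions as unhealthy. The two sink classes and the check_all helper are hypothetical stand-ins. Only the awaitable health_check() method comes from the text above.

```python
import asyncio
from typing import Dict


class HealthySink:  # hypothetical stand-in for a working sink
    async def health_check(self) -> bool:
        return True


class UnreachableSink:  # hypothetical stand-in for a failing sink
    async def health_check(self) -> bool:
        raise ConnectionError("endpoint unreachable")


async def check_all(sinks: Dict[str, object], timeout_s: float = 1.0) -> Dict[str, bool]:
    """Run every health check concurrently and report a status per name."""

    async def check_one(sink) -> bool:
        try:
            return bool(await asyncio.wait_for(sink.health_check(), timeout=timeout_s))
        except Exception:
            # A timeout or any error counts as unhealthy
            return False

    results = await asyncio.gather(*(check_one(s) for s in sinks.values()))
    return dict(zip(sinks.keys(), results))
```

An endpoint handler could await check_all(...) and return a failing status code if any value is False.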

The pipeline architecture ensures high performance and reliability while maintaining simplicity for developers.