Adaptive Pipeline

The adaptive pipeline automatically scales workers, batch sizes, and filter levels based on real-time queue pressure. Instead of manually tuning these parameters for peak load, the pipeline self-adjusts as traffic changes. Queue capacity is fixed at startup — use with_queue_budget() or with_queue_size() to set your memory ceiling.

Quick start

from fapilog import get_logger

# One-line setup
logger = get_logger(preset="adaptive")

Or with the builder:

from fapilog import LoggerBuilder

logger = (
    LoggerBuilder()
    .with_preset("production")
    .with_adaptive(enabled=True)
    .with_circuit_breaker(enabled=True, fallback_sink="rotating_file")
    .build()
)

How it works

The adaptive pipeline has three components:

  1. Pressure Monitor — Samples queue fill ratio at regular intervals (default: every 0.25s)

  2. Escalation State Machine — Computes the current pressure level based on fill ratio thresholds with hysteresis

  3. Actuators — Respond to pressure level changes by adjusting pipeline parameters

Pressure levels

Level

Fill Ratio

Meaning

NORMAL

< 60%

Pipeline is healthy, no adjustments

ELEVATED

>= 60%

Queue filling up, begin scaling

HIGH

>= 80%

Significant pressure, aggressive scaling

CRITICAL

>= 92%

Near capacity, maximum response

Hysteresis

To prevent oscillation between levels, de-escalation thresholds are lower than escalation thresholds:

Transition

Escalate At

De-escalate At

NORMAL / ELEVATED

60%

40%

ELEVATED / HIGH

80%

60%

HIGH / CRITICAL

92%

75%

This means the queue must drop well below the escalation threshold before the pipeline scales back down.

Actuators

Actuator

What It Does

NORMAL

ELEVATED

HIGH

CRITICAL

Filter tightening

Raises effective log level

None

Soft

Medium

Aggressive

Worker scaling

Adds worker tasks

Initial (2)

+1

+2

Max (8)

Batch sizing

Adjusts batch sizes (opt-in)

Base (100)

1.5x

2x

4x

Configuration reference

All settings live under the adaptive key:

Setting

Default

Description

enabled

false

Enable adaptive pipeline controller

check_interval_seconds

0.25

Seconds between queue pressure samples

cooldown_seconds

2.0

Minimum seconds between pressure level transitions

escalate_to_elevated

0.60

Fill ratio to escalate NORMAL to ELEVATED

escalate_to_high

0.80

Fill ratio to escalate ELEVATED to HIGH

escalate_to_critical

0.92

Fill ratio to escalate HIGH to CRITICAL

deescalate_from_elevated

0.40

Fill ratio to de-escalate ELEVATED to NORMAL

deescalate_from_high

0.60

Fill ratio to de-escalate HIGH to ELEVATED

deescalate_from_critical

0.75

Fill ratio to de-escalate CRITICAL to HIGH

max_workers

8

Maximum workers when dynamic scaling is active

batch_sizing

false

Enable adaptive batch sizing

circuit_pressure_boost

0.20

Pressure boost per open circuit breaker

filter_tightening

true

Enable adaptive filter tightening based on pressure level

worker_scaling

true

Enable dynamic worker scaling based on pressure level

Environment variables

FAPILOG_ADAPTIVE__ENABLED=true
FAPILOG_ADAPTIVE__CHECK_INTERVAL_SECONDS=0.25
FAPILOG_ADAPTIVE__COOLDOWN_SECONDS=2.0
FAPILOG_ADAPTIVE__ESCALATE_TO_ELEVATED=0.60
FAPILOG_ADAPTIVE__ESCALATE_TO_HIGH=0.80
FAPILOG_ADAPTIVE__ESCALATE_TO_CRITICAL=0.92
FAPILOG_ADAPTIVE__MAX_WORKERS=8
FAPILOG_ADAPTIVE__BATCH_SIZING=true
FAPILOG_ADAPTIVE__CIRCUIT_PRESSURE_BOOST=0.20
FAPILOG_ADAPTIVE__FILTER_TIGHTENING=true
FAPILOG_ADAPTIVE__WORKER_SCALING=false

Settings-based configuration

from fapilog import Settings

settings = Settings(adaptive={
    "enabled": True,
    "max_workers": 6,
    "batch_sizing": True,
    "check_interval_seconds": 0.5,
    "cooldown_seconds": 3.0,
})

Adaptive batch sizing

Adaptive batch sizing (batch_sizing=True) dynamically adjusts the worker drain batch size based on measured sink latency. It uses a proportional controller with EWMA smoothing — fast sinks get larger batches, slow sinks get smaller batches.

This is disabled by default because it operates globally across all sinks and only benefits sinks that accept batched writes.

When to enable batch sizing

Enable batch_sizing=True when your pipeline includes batch-aware sinks that benefit from larger payloads:

  • CloudWatch Logs — PutLogEvents accepts up to 10,000 events per call

  • Grafana Loki — Push API accepts multiple log streams per request

  • PostgreSQL — Bulk INSERT is significantly faster than individual rows

  • HTTP sinks — Remote endpoints that accept batched payloads

When to leave it disabled

Leave batch_sizing=False (the default) when you only use sinks that process events individually:

  • stdout — Writes one JSON line per event

  • Rotating file — Writes one line per event

For these sinks, growing the batch size just increases the time events sit in the worker buffer before being written, adding latency with no throughput benefit.

Enabling batch sizing

# Builder API
logger = (
    LoggerBuilder()
    .with_preset("adaptive")
    .with_adaptive(batch_sizing=True)
    .add_cloudwatch("/myapp/prod")
    .build()
)
# Environment variable
FAPILOG_ADAPTIVE__BATCH_SIZING=true

Note: Adaptive batch sizing controls the worker-level drain batch size, not individual sink batch sizes. Cloud sinks have their own batch_size parameters (e.g., add_cloudwatch(batch_size=200)) that are configured independently.

Threshold validation

Escalation thresholds must be strictly ascending, and each de-escalation threshold must be strictly below its corresponding escalation threshold:

escalate_to_elevated < escalate_to_high < escalate_to_critical
deescalate_from_elevated < escalate_to_elevated
deescalate_from_high < escalate_to_high
deescalate_from_critical < escalate_to_critical

Invalid threshold ordering raises a ValidationError at configuration time.

Circuit breaker integration

When a sink’s circuit breaker is open, the adaptive pipeline treats it as additional pressure. The circuit_pressure_boost setting (default: 0.20) is added to the effective fill ratio for each open circuit:

effective_fill_ratio = actual_fill_ratio + (open_circuits * circuit_pressure_boost)

This ensures the pipeline responds proactively when sinks are failing, even if the queue isn’t physically full yet.

Tuning guidelines

For latency-sensitive services:

builder.with_adaptive(
    max_workers=4,                # Cap worker scaling
    check_interval_seconds=0.1,   # Faster response
).with_queue_budget(main_mb=20, protected_mb=5)  # Fixed memory ceiling

For high-throughput batch processing:

builder.with_adaptive(
    max_workers=8,
    batch_sizing=True,
    cooldown_seconds=5.0,   # Slower transitions to avoid oscillation
).with_queue_budget(main_mb=100, protected_mb=20)  # Generous buffer

Conservative escalation (fewer false alarms):

Settings(adaptive={
    "enabled": True,
    "escalate_to_elevated": 0.70,
    "escalate_to_high": 0.85,
    "escalate_to_critical": 0.95,
    "cooldown_seconds": 5.0,
})

Inspecting adaptive behavior

After draining, DrainResult.adaptive contains a summary of what the adaptive system did during the logger’s lifetime. This is useful for monitoring, alerting, and tuning your configuration.

result = await logger.drain()

if result.adaptive is not None:
    summary = result.adaptive
    print(f"Peak pressure: {summary.peak_pressure_level.value}")
    print(f"Escalations: {summary.escalation_count}")
    print(f"De-escalations: {summary.deescalation_count}")
    print(f"Filters swapped: {summary.filters_swapped}")
    print(f"Workers scaled: {summary.workers_scaled} (peak: {summary.peak_workers})")
    print(f"Batch resizes: {summary.batch_resize_count}")

    # Time breakdown by pressure level
    for level, seconds in summary.time_at_level.items():
        print(f"  {level.value}: {seconds:.1f}s")

When the adaptive pipeline is not enabled, result.adaptive is None and existing DrainResult fields are unchanged.

See Lifecycle & Results for the full field reference.

The adaptive preset

The adaptive preset (get_logger(preset="adaptive")) enables all adaptive features with sensible defaults:

  • Production base settings (2 workers, batch size 100)

  • Adaptive pipeline enabled (worker scaling, filter tightening)

  • Adaptive batch sizing disabled by default (enable with with_adaptive(batch_sizing=True) when using batch-aware sinks)

  • Circuit breaker with rotating file fallback

  • Protected levels: ERROR, CRITICAL, FATAL, AUDIT, SECURITY

  • Credential redaction enabled

See Presets for a comparison of all available presets.