Adaptive Pipeline
The adaptive pipeline automatically scales workers, batch sizes, and filter levels based on real-time queue pressure. Instead of manually tuning these parameters for peak load, the pipeline self-adjusts as traffic changes. Queue capacity is fixed at startup — use with_queue_budget() or with_queue_size() to set your memory ceiling.
Quick start
from fapilog import get_logger
# One-line setup
logger = get_logger(preset="adaptive")
Or with the builder:
from fapilog import LoggerBuilder
logger = (
    LoggerBuilder()
    .with_preset("production")
    .with_adaptive(enabled=True)
    .with_circuit_breaker(enabled=True, fallback_sink="rotating_file")
    .build()
)
How it works
The adaptive pipeline has three components:
Pressure Monitor — Samples queue fill ratio at regular intervals (default: every 0.25s)
Escalation State Machine — Computes the current pressure level based on fill ratio thresholds with hysteresis
Actuators — Respond to pressure level changes by adjusting pipeline parameters
Pressure levels
| Level | Fill Ratio | Meaning |
|---|---|---|
| NORMAL | < 60% | Pipeline is healthy, no adjustments |
| ELEVATED | >= 60% | Queue filling up, begin scaling |
| HIGH | >= 80% | Significant pressure, aggressive scaling |
| CRITICAL | >= 92% | Near capacity, maximum response |
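The table above can be read as a simple threshold lookup. The following is a minimal sketch, not fapilog's implementation: the `PressureLevel` enum and the `classify` function are hypothetical names used only for illustration, and the thresholds are the escalation values from the table.

```python
from enum import Enum


class PressureLevel(Enum):
    # Hypothetical enum for illustration; not imported from fapilog.
    NORMAL = "normal"
    ELEVATED = "elevated"
    HIGH = "high"
    CRITICAL = "critical"


def classify(fill_ratio: float) -> PressureLevel:
    """Map a queue fill ratio (0.0 to 1.0) to a level, ignoring hysteresis."""
    if fill_ratio >= 0.92:
        return PressureLevel.CRITICAL
    if fill_ratio >= 0.80:
        return PressureLevel.HIGH
    if fill_ratio >= 0.60:
        return PressureLevel.ELEVATED
    return PressureLevel.NORMAL
```

This stateless mapping only describes escalation from a healthy pipeline. Once a level is active, the lower de-escalation thresholds apply.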
Hysteresis
To prevent oscillation between levels, de-escalation thresholds are lower than escalation thresholds:
| Transition | Escalate At | De-escalate At |
|---|---|---|
| NORMAL / ELEVATED | 60% | 40% |
| ELEVATED / HIGH | 80% | 60% |
| HIGH / CRITICAL | 92% | 75% |
This means the queue must drop well below the escalation threshold before the pipeline scales back down.
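To make the hysteresis concrete, here is a small sketch of a level transition function using the thresholds from the table. It is an illustration under assumptions, not the library's state machine: `next_level` is a hypothetical helper, it allows multi-level jumps in one step, it treats de-escalation as "strictly below the threshold", and it ignores the cooldown between transitions.

```python
LEVELS = ["NORMAL", "ELEVATED", "HIGH", "CRITICAL"]
ESCALATE_AT = [0.60, 0.80, 0.92]    # thresholds to move up from level 0, 1, 2
DEESCALATE_AT = [0.40, 0.60, 0.75]  # thresholds to move down to level 0, 1, 2


def next_level(current: int, fill_ratio: float) -> int:
    """Return the new level index for a fill ratio, given the current level."""
    level = current
    # Escalate while the ratio meets the threshold for the next level up.
    while level < len(LEVELS) - 1 and fill_ratio >= ESCALATE_AT[level]:
        level += 1
    if level != current:
        return level
    # Otherwise step down only once the ratio is below the de-escalation threshold.
    while level > 0 and fill_ratio < DEESCALATE_AT[level - 1]:
        level -= 1
    return level


level = 0
trace = []
for ratio in [0.65, 0.55, 0.45, 0.39]:
    level = next_level(level, ratio)
    trace.append(LEVELS[level])
```

In this trace the pipeline escalates to ELEVATED at 0.65 and stays there at 0.55 and 0.45, even though both are below the 60% escalation threshold. It returns to NORMAL only when the ratio falls under 40%.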
Actuators
| Actuator | What It Does | NORMAL | ELEVATED | HIGH | CRITICAL |
|---|---|---|---|---|---|
| Filter tightening | Raises effective log level | None | Soft | Medium | Aggressive |
| Worker scaling | Adds worker tasks | Initial (2) | +1 | +2 | Max (8) |
| Batch sizing | Adjusts batch sizes (opt-in) | Base (100) | 1.5x | 2x | 4x |
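One way to read the actuator table is as a per-level lookup of target values. The sketch below is an assumption-laden illustration: `ActuatorTargets` and `targets_for` are hypothetical names, "+1" and "+2" are interpreted as offsets from the initial worker count, and the batch multipliers are applied to the base batch size.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ActuatorTargets:
    # Hypothetical container for illustration; not a fapilog type.
    filter_mode: str
    workers: int
    batch_size: int


def targets_for(level: str, initial_workers: int = 2,
                max_workers: int = 8, base_batch: int = 100) -> ActuatorTargets:
    """Look up the actuator targets for a pressure level name."""
    table = {
        "NORMAL": ("none", initial_workers, 1.0),
        "ELEVATED": ("soft", initial_workers + 1, 1.5),
        "HIGH": ("medium", initial_workers + 2, 2.0),
        "CRITICAL": ("aggressive", max_workers, 4.0),
    }
    filter_mode, workers, multiplier = table[level]
    # Never exceed the configured worker ceiling.
    return ActuatorTargets(filter_mode, min(workers, max_workers),
                           int(base_batch * multiplier))
```

With the defaults shown in the table, `targets_for("HIGH")` yields 4 workers and a batch size of 200.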
Configuration reference
All settings live under the adaptive key:
| Setting | Default | Description |
|---|---|---|
| enabled | | Enable adaptive pipeline controller |
| check_interval_seconds | 0.25 | Seconds between queue pressure samples |
| cooldown_seconds | | Minimum seconds between pressure level transitions |
| escalate_to_elevated | 0.60 | Fill ratio to escalate NORMAL to ELEVATED |
| escalate_to_high | 0.80 | Fill ratio to escalate ELEVATED to HIGH |
| escalate_to_critical | 0.92 | Fill ratio to escalate HIGH to CRITICAL |
| deescalate_from_elevated | 0.40 | Fill ratio to de-escalate ELEVATED to NORMAL |
| deescalate_from_high | 0.60 | Fill ratio to de-escalate HIGH to ELEVATED |
| deescalate_from_critical | 0.75 | Fill ratio to de-escalate CRITICAL to HIGH |
| max_workers | 8 | Maximum workers when dynamic scaling is active |
| batch_sizing | False | Enable adaptive batch sizing |
| circuit_pressure_boost | 0.20 | Pressure boost per open circuit breaker |
| filter_tightening | | Enable adaptive filter tightening based on pressure level |
| worker_scaling | | Enable dynamic worker scaling based on pressure level |
Environment variables
FAPILOG_ADAPTIVE__ENABLED=true
FAPILOG_ADAPTIVE__CHECK_INTERVAL_SECONDS=0.25
FAPILOG_ADAPTIVE__COOLDOWN_SECONDS=2.0
FAPILOG_ADAPTIVE__ESCALATE_TO_ELEVATED=0.60
FAPILOG_ADAPTIVE__ESCALATE_TO_HIGH=0.80
FAPILOG_ADAPTIVE__ESCALATE_TO_CRITICAL=0.92
FAPILOG_ADAPTIVE__MAX_WORKERS=8
FAPILOG_ADAPTIVE__BATCH_SIZING=true
FAPILOG_ADAPTIVE__CIRCUIT_PRESSURE_BOOST=0.20
FAPILOG_ADAPTIVE__FILTER_TIGHTENING=true
FAPILOG_ADAPTIVE__WORKER_SCALING=false
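The variable names above follow a prefix-plus-double-underscore pattern: `FAPILOG_` selects the library and `__` separates the `adaptive` key from the setting name. The sketch below illustrates that naming convention with the standard library only. `nest_env` is a hypothetical helper, not part of fapilog, and how the library actually parses and type-converts these values is an assumption not covered here.

```python
def nest_env(environ: dict[str, str], prefix: str = "FAPILOG_",
             delimiter: str = "__") -> dict:
    """Group prefixed environment variables into a nested dict of raw strings."""
    nested: dict = {}
    for name, value in environ.items():
        if not name.startswith(prefix):
            continue  # unrelated variable
        path = name[len(prefix):].lower().split(delimiter)
        node = nested
        for key in path[:-1]:
            node = node.setdefault(key, {})
        node[path[-1]] = value  # values stay as strings in this sketch
    return nested


example = nest_env({
    "FAPILOG_ADAPTIVE__MAX_WORKERS": "8",
    "FAPILOG_ADAPTIVE__COOLDOWN_SECONDS": "2.0",
    "PATH": "/usr/bin",
})
```

Here `example` groups both settings under the `adaptive` key, mirroring the shape of the Settings-based configuration.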
Settings-based configuration
from fapilog import Settings
settings = Settings(adaptive={
    "enabled": True,
    "max_workers": 6,
    "batch_sizing": True,
    "check_interval_seconds": 0.5,
    "cooldown_seconds": 3.0,
})
Adaptive batch sizing
Adaptive batch sizing (batch_sizing=True) dynamically adjusts the worker drain batch size based on measured sink latency. It uses a proportional controller with EWMA smoothing — fast sinks get larger batches, slow sinks get smaller batches.
This is disabled by default because it operates globally across all sinks and only benefits sinks that accept batched writes.
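As a rough illustration of a proportional controller with EWMA smoothing, consider the sketch below. It is not fapilog's controller: the class name, the target latency, the smoothing factor, and the size bounds are all assumed values chosen for the example.

```python
class BatchSizeController:
    """Hypothetical sketch: scale the batch size by target/observed latency."""

    def __init__(self, base: int = 100, target_latency_s: float = 0.05,
                 alpha: float = 0.2, min_size: int = 10, max_size: int = 400):
        self.base = base
        self.target = target_latency_s
        self.alpha = alpha          # EWMA weight given to the newest sample
        self.min_size = min_size
        self.max_size = max_size
        self.ewma = None            # smoothed sink latency, seeded on first sample

    def observe(self, latency_s: float) -> int:
        """Feed one measured sink latency and return the new batch size."""
        if self.ewma is None:
            self.ewma = latency_s
        else:
            self.ewma = self.alpha * latency_s + (1 - self.alpha) * self.ewma
        # Proportional step: faster than target grows the batch, slower shrinks it.
        scaled = self.base * self.target / max(self.ewma, 1e-9)
        return round(min(max(scaled, self.min_size), self.max_size))


fast_size = BatchSizeController().observe(0.01)   # sink well under target latency
slow_size = BatchSizeController().observe(0.50)   # sink well over target latency
```

The smoothing keeps a single slow write from collapsing the batch size, while the clamp keeps the result within fixed bounds.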
When to enable batch sizing
Enable batch_sizing=True when your pipeline includes batch-aware sinks that benefit from larger payloads:
CloudWatch Logs — PutLogEvents accepts up to 10,000 events per call
Grafana Loki — Push API accepts multiple log streams per request
PostgreSQL — Bulk INSERT is significantly faster than individual rows
HTTP sinks — Remote endpoints that accept batched payloads
When to leave it disabled
Leave batch_sizing=False (the default) when you only use sinks that process events individually:
stdout — Writes one JSON line per event
Rotating file — Writes one line per event
For these sinks, growing the batch size just increases the time events sit in the worker buffer before being written, adding latency with no throughput benefit.
Enabling batch sizing
# Builder API
from fapilog import LoggerBuilder

logger = (
    LoggerBuilder()
    .with_preset("adaptive")
    .with_adaptive(batch_sizing=True)
    .add_cloudwatch("/myapp/prod")
    .build()
)

# Environment variable (set in the shell, not in Python)
FAPILOG_ADAPTIVE__BATCH_SIZING=true
Note: Adaptive batch sizing controls the worker-level drain batch size, not individual sink batch sizes. Cloud sinks have their own batch_size parameters (e.g., add_cloudwatch(batch_size=200)) that are configured independently.
Threshold validation
Escalation thresholds must be strictly ascending, and each de-escalation threshold must be strictly below its corresponding escalation threshold:
escalate_to_elevated < escalate_to_high < escalate_to_critical
deescalate_from_elevated < escalate_to_elevated
deescalate_from_high < escalate_to_high
deescalate_from_critical < escalate_to_critical
Invalid threshold ordering raises a ValidationError at configuration time.
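The ordering rules can be expressed as a short standalone check. This is a sketch of the constraints only: `validate_thresholds` is a hypothetical function, it raises a plain `ValueError` rather than the library's ValidationError, and its default arguments are the thresholds from the tables earlier in this page.

```python
def validate_thresholds(
    escalate_to_elevated: float = 0.60,
    escalate_to_high: float = 0.80,
    escalate_to_critical: float = 0.92,
    deescalate_from_elevated: float = 0.40,
    deescalate_from_high: float = 0.60,
    deescalate_from_critical: float = 0.75,
) -> None:
    """Raise ValueError if the thresholds violate the ordering rules."""
    if not (escalate_to_elevated < escalate_to_high < escalate_to_critical):
        raise ValueError("escalation thresholds must be strictly ascending")
    pairs = [
        ("elevated", deescalate_from_elevated, escalate_to_elevated),
        ("high", deescalate_from_high, escalate_to_high),
        ("critical", deescalate_from_critical, escalate_to_critical),
    ]
    for name, down, up in pairs:
        # Each de-escalation threshold must sit strictly below its escalation twin.
        if not down < up:
            raise ValueError(
                f"deescalate_from_{name} ({down}) must be below "
                f"escalate_to_{name} ({up})"
            )
```

For example, raising deescalate_from_high to 0.80 while escalate_to_high stays at 0.80 fails the check, because the two thresholds would leave no hysteresis gap.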
Circuit breaker integration
When a sink’s circuit breaker is open, the adaptive pipeline treats it as additional pressure. The circuit_pressure_boost setting (default: 0.20) is added to the effective fill ratio for each open circuit:
effective_fill_ratio = actual_fill_ratio + (open_circuits * circuit_pressure_boost)
This ensures the pipeline responds proactively when sinks are failing, even if the queue isn’t physically full yet.
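A short worked example of the formula, using the default boost of 0.20. The function below is a direct transcription of the formula above and is not taken from the library; whether fapilog caps the result at 1.0 is not stated here, so the sketch leaves it uncapped.

```python
def effective_fill_ratio(actual_fill_ratio: float, open_circuits: int,
                         circuit_pressure_boost: float = 0.20) -> float:
    """Add a fixed boost to the fill ratio for every open circuit breaker."""
    return actual_fill_ratio + open_circuits * circuit_pressure_boost


# A queue at 45% is NORMAL on its own (below the 60% escalation threshold).
no_failures = effective_fill_ratio(0.45, open_circuits=0)
# With one sink's circuit open, the same queue is treated as roughly 65% full.
one_open = effective_fill_ratio(0.45, open_circuits=1)
```

With one open circuit, the boosted ratio crosses the 60% threshold, so the pipeline would escalate to ELEVATED even though the queue itself is less than half full.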
Tuning guidelines
For latency-sensitive services:
builder.with_adaptive(
    max_workers=4,               # Cap worker scaling
    check_interval_seconds=0.1,  # Faster response
).with_queue_budget(main_mb=20, protected_mb=5)  # Fixed memory ceiling
For high-throughput batch processing:
builder.with_adaptive(
    max_workers=8,
    batch_sizing=True,
    cooldown_seconds=5.0,  # Slower transitions to avoid oscillation
).with_queue_budget(main_mb=100, protected_mb=20)  # Generous buffer
Conservative escalation (fewer false alarms):
Settings(adaptive={
    "enabled": True,
    "escalate_to_elevated": 0.70,
    "escalate_to_high": 0.85,
    "escalate_to_critical": 0.95,
    "cooldown_seconds": 5.0,
})
Inspecting adaptive behavior
After draining, DrainResult.adaptive contains a summary of what the adaptive system did during the logger’s lifetime. This is useful for monitoring, alerting, and tuning your configuration.
result = await logger.drain()
if result.adaptive is not None:
    summary = result.adaptive
    print(f"Peak pressure: {summary.peak_pressure_level.value}")
    print(f"Escalations: {summary.escalation_count}")
    print(f"De-escalations: {summary.deescalation_count}")
    print(f"Filters swapped: {summary.filters_swapped}")
    print(f"Workers scaled: {summary.workers_scaled} (peak: {summary.peak_workers})")
    print(f"Batch resizes: {summary.batch_resize_count}")
    # Time breakdown by pressure level
    for level, seconds in summary.time_at_level.items():
        print(f"  {level.value}: {seconds:.1f}s")
When the adaptive pipeline is not enabled, result.adaptive is None and existing DrainResult fields are unchanged.
See Lifecycle & Results for the full field reference.
The adaptive preset
The adaptive preset (get_logger(preset="adaptive")) enables the adaptive pipeline with sensible defaults:
Production base settings (2 workers, batch size 100)
Adaptive pipeline enabled (worker scaling, filter tightening)
Adaptive batch sizing disabled by default (enable with with_adaptive(batch_sizing=True) when using batch-aware sinks)
Circuit breaker with rotating file fallback
Protected levels: ERROR, CRITICAL, FATAL, AUDIT, SECURITY
Credential redaction enabled
See Presets for a comparison of all available presets.