Lifecycle & Results
Runtime management helpers and the DrainResult structure returned when stopping loggers.
DrainResult {#drainresult}
@dataclass
class DrainResult:
    submitted: int
    processed: int
    dropped: int
    retried: int
    queue_depth_high_watermark: int
    flush_latency_seconds: float
    adaptive: AdaptiveDrainSummary | None = None
Returned by AsyncLoggerFacade.drain() and stop_and_drain(), and by the sync logger’s stop_and_drain(). The sync logger’s stop_and_drain() is awaitable, so run it with asyncio.run() when calling from synchronous code.
The adaptive field is None when the adaptive pipeline is not enabled. When using preset="adaptive" or with_adaptive(enabled=True), it contains an AdaptiveDrainSummary with metrics about what the adaptive system did during the logger’s lifetime.
Example (async)
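import asyncio
from fapilog import get_async_logger

async def main() -> None:
    logger = await get_async_logger("worker")
    await logger.info("shutting down")
    # drain() flushes pending events and returns a DrainResult
    result = await logger.drain()
    print(f"processed={result.processed} dropped={result.dropped}")

asyncio.run(main())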
Example (sync)
import asyncio
from fapilog import get_logger
logger = get_logger("cli")
logger.info("done")
result = asyncio.run(logger.stop_and_drain())
print(result.queue_depth_high_watermark)
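The counters on DrainResult lend themselves to a quick health check at shutdown. The following is a minimal sketch under stated assumptions: DrainStats is a local stand-in that mirrors the documented counter fields so the snippet runs without fapilog installed, and drop_ratio and summarize are hypothetical helpers, not part of the library.

```python
from dataclasses import dataclass


@dataclass
class DrainStats:
    """Local stand-in mirroring the documented DrainResult counters (not the fapilog class)."""

    submitted: int
    processed: int
    dropped: int
    retried: int
    queue_depth_high_watermark: int
    flush_latency_seconds: float


def drop_ratio(result: DrainStats) -> float:
    """Fraction of submitted events that were dropped; 0.0 when nothing was submitted."""
    if result.submitted == 0:
        return 0.0
    return result.dropped / result.submitted


def summarize(result: DrainStats) -> str:
    """Build a one-line shutdown summary from the counters."""
    return (
        f"submitted={result.submitted} processed={result.processed} "
        f"dropped={result.dropped} ({drop_ratio(result):.1%}) "
        f"flush={result.flush_latency_seconds:.3f}s"
    )


# Example values are made up for illustration.
stats = DrainStats(
    submitted=200, processed=190, dropped=10, retried=3,
    queue_depth_high_watermark=64, flush_latency_seconds=0.125,
)
print(summarize(stats))
```

Because the helpers only read attributes, passing a real DrainResult in place of the stand-in should work, assuming its fields match the definition shown above.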
AdaptiveDrainSummary {#adaptivedrainsummary}
@dataclass(frozen=True)
class AdaptiveDrainSummary:
    peak_pressure_level: PressureLevel
    escalation_count: int
    deescalation_count: int
    time_at_level: dict[PressureLevel, float]
    filters_swapped: int
    workers_scaled: int
    peak_workers: int
    batch_resize_count: int
Available on DrainResult.adaptive when the adaptive pipeline is enabled. Provides a post-drain summary of adaptive pipeline activity.
| Field | Description |
|---|---|
| peak_pressure_level | Highest pressure level reached during the session |
| escalation_count | Number of upward pressure transitions (e.g., NORMAL to ELEVATED) |
| deescalation_count | Number of downward pressure transitions |
| time_at_level | Wall-clock seconds spent at each pressure level |
| filters_swapped | Number of times filters were tightened or restored |
| workers_scaled | Number of worker scaling events |
| peak_workers | Maximum concurrent workers reached |
| batch_resize_count | Number of adaptive batch size changes |
Example
import asyncio
from fapilog import get_async_logger
from fapilog.core.pressure import PressureLevel

async def main() -> None:
    logger = await get_async_logger(preset="adaptive")
    # ... application work ...
    result = await logger.drain()
    # adaptive is None unless the adaptive pipeline is enabled
    if result.adaptive is not None:
        summary = result.adaptive
        print(f"Peak pressure: {summary.peak_pressure_level.value}")
        print(f"Escalations: {summary.escalation_count}")
        print(f"Time at NORMAL: {summary.time_at_level[PressureLevel.NORMAL]:.1f}s")
        print(f"Peak workers: {summary.peak_workers}")

asyncio.run(main())
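The time_at_level mapping is often easier to read as a share of total runtime than as raw seconds. The sketch below shows one way to do that conversion. Level is a stand-in enum with only the two level names mentioned above, its string values are assumptions, and time_share is a hypothetical helper rather than a fapilog function.

```python
from enum import Enum


class Level(Enum):
    """Stand-in for PressureLevel; the string values here are assumed."""

    NORMAL = "normal"
    ELEVATED = "elevated"


def time_share(time_at_level: dict[Level, float]) -> dict[Level, float]:
    """Convert seconds per level into a fraction of the total time.

    Levels absent from the input are absent from the output; an empty or
    all-zero input yields an empty dict instead of dividing by zero.
    """
    total = sum(time_at_level.values())
    if total <= 0:
        return {}
    return {level: seconds / total for level, seconds in time_at_level.items()}


# Example values are made up for illustration.
shares = time_share({Level.NORMAL: 45.0, Level.ELEVATED: 15.0})
for level, share in shares.items():
    print(f"{level.name}: {share:.0%}")
```

The helper iterates over whatever keys are present rather than indexing a specific level, which avoids a KeyError if a level was never entered and is therefore missing from the mapping.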
Context managers
Prefer runtime() / runtime_async() to manage startup and shutdown automatically:
from fapilog import runtime, runtime_async

with runtime() as logger:
    logger.info("sync work")

async def main():
    async with runtime_async() as logger:
        await logger.info("async work")
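To illustrate why a context manager is a convenient way to handle shutdown, here is a generic sketch of the pattern using only the standard library. It is not fapilog's implementation: FakeLogger and managed_logger are hypothetical stand-ins, and the assumption is simply that cleanup runs on exit from the with block, as it does for any context manager built this way.

```python
from contextlib import contextmanager


class FakeLogger:
    """Stand-in logger that buffers messages until drained (not a fapilog class)."""

    def __init__(self) -> None:
        self.buffer: list[str] = []
        self.flushed: list[str] = []

    def info(self, message: str) -> None:
        self.buffer.append(message)

    def drain(self) -> int:
        """Move buffered messages to the flushed list and return how many moved."""
        count = len(self.buffer)
        self.flushed.extend(self.buffer)
        self.buffer.clear()
        return count


@contextmanager
def managed_logger():
    """Yield a logger and drain it on exit, even when the body raises."""
    logger = FakeLogger()
    try:
        yield logger
    finally:
        logger.drain()


captured = None
try:
    with managed_logger() as log:
        captured = log
        log.info("before failure")
        raise RuntimeError("boom")
except RuntimeError:
    pass

# The message was flushed even though the block exited with an exception.
print(captured.flushed)
```

The try/finally inside the generator is what provides the guarantee: the drain step runs whether the block finishes normally or raises.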
Shutdown timeout
Settings.core.shutdown_timeout_seconds controls how long the shutdown path will wait for the background worker thread to complete. Configure via env var FAPILOG_CORE__SHUTDOWN_TIMEOUT_SECONDS.
Use the lifecycle helpers to ensure buffered logs are flushed before your app exits.
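As a small illustration of the environment-variable route, the sketch below sets the timeout from Python before any logger is created. It assumes settings are read from the environment when the logger is built. set_shutdown_timeout is a hypothetical helper, and rejecting non-positive values is this sketch's own choice rather than documented fapilog behaviour.

```python
import os

ENV_VAR = "FAPILOG_CORE__SHUTDOWN_TIMEOUT_SECONDS"


def set_shutdown_timeout(seconds: float) -> None:
    """Export the shutdown timeout (hypothetical helper, not part of fapilog)."""
    if seconds <= 0:
        # Validation is an assumption of this sketch, not a documented rule.
        raise ValueError("shutdown timeout must be positive")
    os.environ[ENV_VAR] = str(seconds)


# Call this before creating any logger so the value is visible at setup time.
set_shutdown_timeout(5.0)
print(os.environ[ENV_VAR])
```

In deployed services the same variable would more typically be set in the process environment (shell, container, or service manager) rather than in code.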