Plugin Error Handling
Guidance for containing errors in sinks, enrichers, redactors, and processors without breaking the logging pipeline.
Core Principle: Contain Errors (With Sink Exception)
Most plugins must not leak exceptions into the core pipeline from enrich(), redact(), or process(). Handle failures locally, emit diagnostics, and return a safe fallback so other plugins keep running.
Sinks are different: As of v0.4, sinks should signal failures to enable fallback and circuit breaker behavior. Raise SinkWriteError or return False from write(). The core catches these signals safely—no exceptions propagate to user code.
When Raising Is Acceptable
__init__: Reject invalid configuration or missing dependencies.start(): Fail fast if required resources cannot be acquired (or contain and mark unhealthy).write()(sinks only): RaiseSinkWriteErroror returnFalseto signal failure. The core triggers fallback and increments circuit breaker counters.All other methods (enrichers, redactors, processors): contain errors; do not re-raise into the pipeline.
Diagnostics API (rate-limited)
Use fapilog.core.diagnostics.warn for structured, rate-limited warnings:
from fapilog.core.diagnostics import warn
warn("my-sink", "failed to send log", error=str(exc), attempt=3)
# Optional rate limit grouping to avoid floods
warn(
"my-sink",
"repeated failure",
error=str(exc),
_rate_limit_key="send-error",
)
Best practices:
Component names should be specific (e.g.,
"my-sink","my-enricher").Include actionable context, never secrets or PII.
Prefer
_rate_limit_keyfor hot paths.
Patterns by Plugin Type
Sinks
As of v0.4, sinks should signal failures to enable fallback and circuit breaker behavior:
from fapilog.core.errors import SinkWriteError
class MySink:
name = "my_sink"
async def write(self, entry: dict) -> bool | None:
try:
await self._client.send(entry)
return None # Success (or return True)
except Exception as exc:
# Signal failure to the core - triggers fallback and circuit breaker
raise SinkWriteError(
f"Failed to write to {self.name}",
sink_name=self.name,
cause=exc,
) from exc
Return value semantics:
Return |
Meaning |
Core action |
|---|---|---|
|
Success |
None |
|
Success |
None |
|
Failure |
Trigger fallback, increment circuit breaker |
|
Failure |
Trigger fallback, increment circuit breaker |
For sinks where failure cannot be detected immediately (e.g., fire-and-forget UDP, async batched delivery), emit diagnostics and return False:
class FireAndForgetSink:
name = "udp-sink"
async def write(self, entry: dict) -> bool | None:
try:
self._socket.sendto(data, self._addr)
return None # Best-effort success
except Exception as exc:
from fapilog.core.diagnostics import warn
warn("udp-sink", "send failed", error=str(exc))
return False # Signal failure
Enrichers
Return an empty dict on failure so the event continues:
class MyEnricher:
name = "my_enricher"
async def enrich(self, event: dict) -> dict:
try:
info = await self._lookup(event.get("user_id"))
return {"user_email": info.email}
except Exception as exc:
from fapilog.core.diagnostics import warn
warn("my-enricher", "enrichment failed", error=str(exc))
return {}
Redactors
Be conservative to avoid leaking sensitive data:
class MyRedactor:
name = "my_redactor"
async def redact(self, event: dict) -> dict:
try:
return self._apply_rules(event)
except Exception as exc:
from fapilog.core.diagnostics import warn
warn("my-redactor", "redaction failed; using fallback", error=str(exc))
return {"level": event.get("level"), "message": "[REDACTION_ERROR]"}
Processors
Processors should mirror sink behavior: contain errors, emit diagnostics, and return the original or partially processed payload rather than raising.
What Fapilog Does If You Raise
Fapilog isolates plugin failures:
Sinks:
SinkWriteError(orFalsereturn) triggers the fallback handler (stderr by default) and increments circuit breaker counters. Other sinks still execute. This is the expected behavior for sink failures.Enrichers/redactors/processors: Exceptions are caught; diagnostics are emitted; the pipeline continues. These plugins should still contain their own errors for clearer diagnostics.
Health checks/metrics: Failures may mark the plugin unhealthy or record errors.
No plugin failure propagates to user code.
Health Checks Reflecting Error State
Sinks can track failures internally while still signaling them to the core:
import time
from fapilog.core.errors import SinkWriteError
class MySink:
name = "my_sink"
def __init__(self) -> None:
self._failures = 0
self._last_success = 0.0
async def write(self, entry: dict) -> bool | None:
try:
await self._send(entry)
self._failures = 0
self._last_success = time.time()
return None
except Exception as exc:
self._failures += 1
raise SinkWriteError(
f"Failed to write to {self.name}",
sink_name=self.name,
cause=exc,
) from exc
async def health_check(self) -> bool:
if self._failures >= 5:
return False
if self._last_success and (time.time() - self._last_success) > 60:
return False
return True
Retry for Transient Failures
Sinks can retry internally before signaling failure:
from fapilog.core.errors import SinkWriteError
from fapilog.core.retry import AsyncRetrier, RetryConfig
class MySink:
name = "my_sink"
def __init__(self) -> None:
self._retrier = AsyncRetrier(
RetryConfig(max_attempts=3, base_delay=1.0, max_delay=10.0)
)
async def write(self, entry: dict) -> bool | None:
try:
await self._retrier.retry(lambda: self._send(entry))
return None
except Exception as exc:
# Retries exhausted - signal failure to the core
raise SinkWriteError(
f"Failed to write to {self.name} after retries",
sink_name=self.name,
cause=exc,
) from exc
Quick Reference
Scenario |
Action |
|---|---|
Config invalid in |
Raise immediately |
|
Raise or mark unhealthy |
Failure in sink |
Raise |
Failure in |
Contain, emit diagnostics, return safe fallback |
Transient errors (sinks) |
Retry with backoff; raise |
Transient errors (other plugins) |
Retry with backoff; contain after retries |
Repeated failures |
Update health checks to report unhealthy |