Sinks
Output destinations for serialized log entries. Implement BaseSink.
Implementing a sink
from fapilog.plugins import BaseSink


class MySink(BaseSink):
    name = "my_sink"

    async def start(self) -> None:
        ...

    async def write(self, entry: dict) -> bool | None:
        # entry is a dict log envelope; emit to your target
        # Return None/True for success, False for failure
        ...

    async def stop(self) -> None:
        ...
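To make the start/write/stop lifecycle concrete, here is a minimal sketch of a sink that appends each entry to a file as one JSON line. It is duck-typed so that it runs without fapilog installed; a real plugin would subclass BaseSink as in the skeleton above. The class name JsonLinesFileSink, the path argument, and the choice to return False on OSError are assumptions made for this example, and the blocking file I/O is kept only for brevity.

```python
import asyncio
import json


class JsonLinesFileSink:  # in a real plugin: class JsonLinesFileSink(BaseSink)
    """Hypothetical sink that appends one JSON object per line to a file."""

    name = "jsonl_file"

    def __init__(self, path: str) -> None:
        self._path = path
        self._fh = None

    async def start(self) -> None:
        # Acquire resources once, before any write() call.
        self._fh = open(self._path, "a", encoding="utf-8")

    async def write(self, entry: dict):
        try:
            # default=str keeps non-JSON values (e.g. datetimes) from raising.
            self._fh.write(json.dumps(entry, default=str) + "\n")
            self._fh.flush()
        except OSError:
            return False  # explicit failure signal
        return None  # None (or True) signals success

    async def stop(self) -> None:
        # Release resources; safe to call even if start() never ran.
        if self._fh is not None:
            self._fh.close()
            self._fh = None


async def demo(path: str) -> list:
    """Run one full lifecycle and return the write() results."""
    sink = JsonLinesFileSink(path)
    await sink.start()
    results = [await sink.write({"level": "INFO", "message": "hello"})]
    await sink.stop()
    return results
```

Calling asyncio.run(demo("out.jsonl")) writes one line to out.jsonl and returns [None], the success signal.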
Return value semantics
| Return | Meaning | Core action |
|---|---|---|
| None | Success | None |
| True | Success | None |
| False | Failure | Trigger fallback, increment circuit breaker |
| Raises SinkWriteError | Failure | Trigger fallback, increment circuit breaker |
For detailed error handling patterns, see Plugin Error Handling.
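The semantics above can be sketched from the caller's side. The helper below is illustrative only and is not fapilog's core: write_succeeded and the three toy sinks are hypothetical names, and treating any raised exception as a failure is an assumption based on the SinkWriteError guidance later on this page.

```python
import asyncio


async def write_succeeded(sink, entry: dict) -> bool:
    """Illustrative only: collapse a sink's write() outcome into success/failure."""
    try:
        result = await sink.write(entry)
    except Exception:
        # Assumption: a raised error counts as a failure, like an explicit False.
        return False
    # Only an explicit False is a failure; None and True both mean success.
    return result is not False


class OkSink:
    async def write(self, entry: dict) -> None:
        return None


class FailingSink:
    async def write(self, entry: dict) -> bool:
        return False


class RaisingSink:
    async def write(self, entry: dict) -> None:
        raise RuntimeError("target unavailable")


print(asyncio.run(write_succeeded(OkSink(), {})))       # True
print(asyncio.run(write_succeeded(FailingSink(), {})))  # False
print(asyncio.run(write_succeeded(RaisingSink(), {})))  # False
```

Checking `result is not False` rather than truthiness matters here: a sink that returns None must not be mistaken for one that failed.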
Registering a sink
Declare an entry point under fapilog.sinks in pyproject.toml.
Add a PLUGIN_METADATA dict with plugin_type: "sink" and an API version compatible with fapilog.plugins.versioning.PLUGIN_API_VERSION.
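As a rough sketch of the two registration steps: the entry point table uses the standard pyproject.toml syntax, shown here as comments, and the metadata dict lives in the plugin module. Only plugin_type: "sink" comes from the text above. The package path my_package.sinks, the name and api_version keys, and the "1.0" version format are assumptions, so check them against the versioning module before relying on them.

```python
# pyproject.toml (shown as comments; package and module names are hypothetical):
#
#   [project.entry-points."fapilog.sinks"]
#   my_sink = "my_package.sinks:MySink"

# Module-level metadata in the plugin module. Only plugin_type is taken from
# the documentation; the other keys and the version format are assumptions.
PLUGIN_METADATA = {
    "name": "my_sink",
    "plugin_type": "sink",
    "api_version": "1.0",  # assumed format; must be compatible with PLUGIN_API_VERSION
}
```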
Built-in sinks (code-supported)
stdout_json (structured JSON)
stdout_pretty (human-readable console output)
rotating_file (size/time rotation)
http (HTTP POST)
webhook (JSON webhook with optional batching)
cloudwatch (AWS CloudWatch Logs; optional fapilog[aws])
mmap_persistence (experimental; local persistence)
HTTP sink batching
HttpSink now supports sink-level batching to reduce request volume:
batch_size (default: 1 for backward compatibility; set >1 to enable batching)
batch_timeout_seconds (default: 5.0): flush partial batches on timeout
batch_format: array (JSON array), ndjson (newline-delimited), wrapped ({"logs": [...]})
batch_wrapper_key: wrapper key when batch_format="wrapped" (default: logs)
Examples:
export FAPILOG_HTTP__ENDPOINT=https://logs.example.com/ingest
export FAPILOG_HTTP__BATCH_SIZE=100
export FAPILOG_HTTP__BATCH_TIMEOUT_SECONDS=2.0
export FAPILOG_HTTP__BATCH_FORMAT=ndjson

from fapilog.plugins.sinks.http_client import HttpSink, HttpSinkConfig, BatchFormat

sink = HttpSink(
    HttpSinkConfig(
        endpoint="https://logs.example.com/ingest",
        batch_size=100,
        batch_timeout_seconds=2.0,
        batch_format=BatchFormat.NDJSON,
    )
)
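To show what the three batch_format values look like on the wire, here is a small standalone encoder. It is a sketch of the shapes described above, not HttpSink's actual serializer: encode_batch is a hypothetical helper, and details such as whitespace or a trailing newline in ndjson output may differ in fapilog.

```python
import json


def encode_batch(entries: list, batch_format: str = "array", wrapper_key: str = "logs") -> bytes:
    """Illustrative encoder for the three batch_format values."""
    if batch_format == "array":
        # One JSON array containing every entry.
        return json.dumps(entries).encode("utf-8")
    if batch_format == "ndjson":
        # One JSON object per line, separated by newlines.
        return "\n".join(json.dumps(e) for e in entries).encode("utf-8")
    if batch_format == "wrapped":
        # A single object whose wrapper key holds the array of entries.
        return json.dumps({wrapper_key: entries}).encode("utf-8")
    raise ValueError(f"unknown batch_format: {batch_format!r}")


batch = [{"message": "a"}, {"message": "b"}]
print(encode_batch(batch, "array").decode())    # [{"message": "a"}, {"message": "b"}]
print(encode_batch(batch, "ndjson").decode())   # two lines, one object per line
print(encode_batch(batch, "wrapped").decode())  # {"logs": [{"message": "a"}, {"message": "b"}]}
```

The receiving endpoint usually dictates the choice: ndjson suits line-oriented ingest APIs, while wrapped suits endpoints that expect a single top-level object.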
Webhook sink batching
WebhookSink supports the same batch_size and batch_timeout_seconds fields to batch webhook POSTs (default batch_size=1 for compatibility).
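The interaction between batch_size and batch_timeout_seconds amounts to "flush when the batch is full or when the timeout expires, whichever comes first". The class below is a minimal sketch of that policy under stated assumptions; Batcher and its send callback are hypothetical and do not reflect how HttpSink or WebhookSink are implemented internally.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional


class Batcher:
    """Hypothetical size-or-timeout batcher; not fapilog's implementation."""

    def __init__(
        self,
        send: Callable[[List[dict]], Awaitable[None]],
        batch_size: int = 1,
        batch_timeout_seconds: float = 5.0,
    ) -> None:
        self._send = send
        self._batch_size = batch_size
        self._timeout = batch_timeout_seconds
        self._buffer: List[dict] = []
        self._timer: Optional[asyncio.Task] = None

    async def add(self, entry: dict) -> None:
        self._buffer.append(entry)
        if len(self._buffer) >= self._batch_size:
            await self.flush()  # full batch: send immediately
        elif self._timer is None:
            # First entry of a partial batch starts the timeout clock.
            self._timer = asyncio.create_task(self._flush_after_timeout())

    async def _flush_after_timeout(self) -> None:
        await asyncio.sleep(self._timeout)
        self._timer = None  # this task is the timer; do not cancel ourselves
        await self.flush()

    async def flush(self) -> None:
        # Stop any pending timer, then send whatever has accumulated.
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        if self._buffer:
            batch, self._buffer = self._buffer, []
            await self._send(batch)
```

With batch_size=1 every add() flushes immediately, which matches the unbatched default described above. A real sink would also call flush() from stop() so that a partial batch is not lost on shutdown.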
Usage
Sinks are discovered via entry points when plugin discovery is enabled. You can also wire custom sinks programmatically by passing them into the container/settings before creating a logger.
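For readers unfamiliar with entry points, the sketch below lists whatever is registered under the fapilog.sinks group using only the standard library. It illustrates the mechanism and is not fapilog's own loader; discover_sink_entry_points is a hypothetical helper name.

```python
from importlib.metadata import entry_points


def discover_sink_entry_points(group: str = "fapilog.sinks") -> dict:
    """Return {entry point name: "module:attr" target} for one group."""
    eps = entry_points()
    if hasattr(eps, "select"):  # Python 3.10+
        selected = eps.select(group=group)
    else:  # Python 3.8/3.9: a plain dict mapping group name to entry points
        selected = eps.get(group, [])
    return {ep.name: ep.value for ep in selected}


found = discover_sink_entry_points()
print(found)  # contents depend on which packages are installed
```

Calling ep.load() on an entry point would import the target class; that step is left out here so the sketch has no side effects.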
Optional: write_serialized fast path
For sinks that operate on bytes (files, sockets, HTTP), implement write_serialized() to accept a pre-serialized payload and avoid redundant JSON encoding when Settings.core.serialize_in_flush=True:
import json

from fapilog.core.serialization import SerializedView


class MyFastSink:
    name = "my_fast_sink"

    async def write(self, entry: dict) -> None:
        # Fallback path: serialize yourself
        data = json.dumps(entry).encode()
        await self._send(data)

    async def write_serialized(self, view: SerializedView) -> None:
        # Fast path: fapilog already serialized; avoid extra work
        await self._send(bytes(view.data))

    async def _send(self, data: bytes) -> None:
        # Transport-specific delivery (file, socket, HTTP) goes here
        ...
When to implement:
You already need serialized bytes
You do not need to inspect/modify the dict entry
Performance or allocation reduction is important
If write_serialized is absent, fapilog automatically calls write() instead. The SerializedView wrapper exposes a memoryview via data and __bytes__ for convenience; treat it as read-only.
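The fallback rule can be pictured as a simple capability check. The sketch below uses stand-ins so that it runs without fapilog: FakeSerializedView mimics only the data and __bytes__ members mentioned above, and deliver is a hypothetical dispatcher that ignores the serialize_in_flush setting. It shows the idea, not fapilog's actual flush path.

```python
import asyncio
import json


class FakeSerializedView:
    """Stand-in for SerializedView: a memoryview in .data plus __bytes__."""

    def __init__(self, payload: bytes) -> None:
        self.data = memoryview(payload)

    def __bytes__(self) -> bytes:
        return bytes(self.data)


async def deliver(sink, entry: dict) -> str:
    """Illustrative dispatch: prefer write_serialized() when the sink has it."""
    if hasattr(sink, "write_serialized"):
        view = FakeSerializedView(json.dumps(entry).encode("utf-8"))
        await sink.write_serialized(view)
        return "write_serialized"
    await sink.write(entry)
    return "write"


class DictOnlySink:
    def __init__(self) -> None:
        self.seen = []

    async def write(self, entry: dict) -> None:
        self.seen.append(entry)


class BytesSink(DictOnlySink):
    async def write_serialized(self, view) -> None:
        # Copy out of the read-only view rather than mutating it.
        self.seen.append(bytes(view))


print(asyncio.run(deliver(DictOnlySink(), {"m": 1})))  # write
print(asyncio.run(deliver(BytesSink(), {"m": 1})))     # write_serialized
```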
Error handling in write_serialized
Important: write_serialized must handle deserialization errors correctly to avoid silent data loss. Never replace invalid data with placeholder values like {"message": "fallback"}.
Required pattern:
import json

from fapilog.core.diagnostics import warn
from fapilog.core.errors import SinkWriteError
from fapilog.core.serialization import SerializedView


async def write_serialized(self, view: SerializedView) -> None:
    """Fast path for pre-serialized payloads."""
    try:
        data = json.loads(bytes(view.data))
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        warn(
            f"{self.name}-sink",
            "write_serialized deserialization failed",
            error=str(exc),
            data_size=len(view.data),
            _rate_limit_key=f"{self.name}-sink-deserialize",
        )
        raise SinkWriteError(
            f"Failed to deserialize payload in {self.name}.write_serialized",
            sink_name=self.name,
            cause=exc,
        ) from exc
    await self.write(data)
Key requirements:
Catch specific exceptions - Use json.JSONDecodeError and UnicodeDecodeError, not bare except Exception:
Emit diagnostics - Call diagnostics.warn() with context (sink name, error, data size)
Raise SinkWriteError - Signal failure to the core for fallback/circuit breaker handling
Chain the cause - Use from exc to preserve the original exception
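The effect of these requirements can be checked with a small standalone experiment. The sketch below replaces fapilog's SinkWriteError and SerializedView with local stand-ins and omits the diagnostics call, so it demonstrates only the catch, raise, and chaining behaviour; StrictSink and try_payload are hypothetical names.

```python
import asyncio
import json


class SinkWriteError(Exception):
    """Stand-in for fapilog.core.errors.SinkWriteError so the sketch runs alone."""


class View:
    """Stand-in for SerializedView."""

    def __init__(self, payload: bytes) -> None:
        self.data = memoryview(payload)


class StrictSink:
    name = "strict"

    def __init__(self) -> None:
        self.entries = []

    async def write(self, entry: dict) -> None:
        self.entries.append(entry)

    async def write_serialized(self, view) -> None:
        try:
            entry = json.loads(bytes(view.data))
        except (json.JSONDecodeError, UnicodeDecodeError) as exc:
            # No placeholder entry: fail loudly and keep the original cause.
            raise SinkWriteError(f"bad payload in {self.name}") from exc
        await self.write(entry)


async def try_payload(payload: bytes):
    """Return (name of the chained cause or None, entries actually written)."""
    sink = StrictSink()
    try:
        await sink.write_serialized(View(payload))
    except SinkWriteError as err:
        return type(err.__cause__).__name__, sink.entries
    return None, sink.entries


print(asyncio.run(try_payload(b'{"message": "ok"}')))  # (None, [{'message': 'ok'}])
print(asyncio.run(try_payload(b"{not json")))          # ('JSONDecodeError', [])
print(asyncio.run(try_payload(b"\xff")))               # ('UnicodeDecodeError', [])
```

In both failure cases nothing is written and the original exception remains reachable through __cause__, which is what keeps a bad payload from turning into silent data loss.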
See Plugin Error Handling for more details on SinkWriteError and failure signaling.