Filters
Plugins that drop or transform events before enrichment.
Contract
Implement BaseFilter methods:
async filter(self, event: dict) -> dict | None: required; return event to continue,Noneto drop.async start(self) -> None: optional initialization.async stop(self) -> None: optional teardown.
from fapilog.plugins.filters import BaseFilter
class MyFilter:
name = "my-filter"
async def start(self) -> None:
pass
async def stop(self) -> None:
pass
async def filter(self, event: dict) -> dict | None:
if event.get("level") == "DEBUG":
return None # Drop DEBUG events
return event
async def health_check(self) -> bool:
return True
Built-in Filters
Filter |
Name |
Description |
|---|---|---|
|
|
Drop events below configured minimum level |
|
|
Keep a random percentage of events |
|
|
Token bucket rate limiter (per-key optional) |
|
|
Dynamic sampling based on volume |
|
|
Sample based on trace/span context |
|
|
Track first occurrence of message patterns |
Configuration
Level Filter
Automatically wired when core.log_level is set above DEBUG:
export FAPILOG_CORE__LOG_LEVEL=INFO # Drops DEBUG events
Explicit configuration:
from fapilog import Settings
settings = Settings(
core__filters=["level"],
filter_config__level={"config": {"min_level": "WARNING"}},
)
Sampling Filter
Keep a percentage of events randomly:
settings = Settings(
core__filters=["sampling"],
filter_config__sampling={"config": {"sample_rate": 0.25}}, # Keep 25%
)
Environment variable:
export FAPILOG_CORE__FILTERS='["sampling"]'
Rate Limit Filter
Token bucket rate limiting:
settings = Settings(
core__filters=["rate_limit"],
filter_config__rate_limit={
"config": {
"rate": 100, # tokens per second
"burst": 200, # max burst capacity
"key_field": "user_id" # optional per-key limiting
}
},
)
Adaptive Sampling Filter
Dynamically adjusts sample rate based on event volume:
settings = Settings(
core__filters=["adaptive_sampling"],
filter_config__adaptive_sampling={
"config": {
"target_rate": 100, # target events/second
"window_seconds": 60, # measurement window
}
},
)
Trace Sampling Filter
Sample events based on trace context (for distributed tracing integration):
settings = Settings(
core__filters=["trace_sampling"],
filter_config__trace_sampling={
"config": {
"sample_rate": 0.1, # base sample rate
"honor_parent_decision": True, # respect upstream sampling
}
},
)
First Occurrence Filter
Track unique message patterns, optionally limiting repeats:
settings = Settings(
core__filters=["first_occurrence"],
filter_config__first_occurrence={
"config": {
"max_occurrences": 1, # only log first occurrence
"window_seconds": 300, # reset tracking after 5 minutes
}
},
)
Filter Order
Filters run in the order specified in core.filters. Earlier filters can drop events before later filters see them:
settings = Settings(
core__filters=["level", "rate_limit", "sampling"],
)
In this example:
leveldrops events below thresholdrate_limitapplies token bucket to remaining eventssamplingrandomly samples what’s left
Metrics
When core.enable_metrics=True, filter metrics are recorded:
Events filtered (dropped) count
Sample rate (for sampling filters)
Rate limit keys tracked (for per-key rate limiting)
Filters run after the log call but before enrichment, redaction, and sinks.