Common Cloud Sink Patterns
Reusable patterns for building and operating cloud sinks.
Base class
examples/sinks/cloud_sink_base.py provides:
Batching with size/time limits
Retry with exponential backoff
Background flush loop
Health check hook
Extend it for custom sinks:
from examples.sinks.cloud_sink_base import CloudSinkBase, CloudSinkConfig

class MySink(CloudSinkBase[dict]):
    name = "mycloud"

    async def _initialize_client(self): ...   # set up the provider client
    async def _cleanup_client(self): ...      # release it on shutdown
    def _transform_entry(self, entry): ...    # map a log entry to the provider payload
    async def _send_batch(self, batch): ...   # deliver one batch
    async def health_check(self): ...         # report provider connectivity
Authentication
Prefer provider SDK default credentials (IAM roles, ADC) over inline secrets.
For API keys, load them from environment variables or a secret store, not from YAML.
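As a minimal sketch of the environment-variable approach, the helper below reads a key and fails fast when it is missing. The names load_api_key, MissingCredentialError, and MYCLOUD_API_KEY are hypothetical and not part of fapilog.

```python
import os


class MissingCredentialError(RuntimeError):
    """Raised when a required secret is absent from the environment (hypothetical)."""


def load_api_key(env_var: str = "MYCLOUD_API_KEY") -> str:
    """Return the API key stored in env_var, failing fast if it is unset or blank."""
    value = os.environ.get(env_var, "").strip()
    if not value:
        # Report the variable name only, never a value, so logs stay free of secrets.
        raise MissingCredentialError(f"environment variable {env_var} is not set")
    return value
```

Calling this once during client initialization keeps the secret out of configuration files and surfaces a missing credential at startup rather than on the first send.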
Batching and rate limits
Start with a batch_size of 50–100; adjust for latency vs cost.
Combine with the rate_limit filter to avoid ingestion throttling.
Align batch_timeout_seconds with provider quotas and desired flush latency.
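To illustrate how a size limit and a time limit interact, here is a small standalone batcher. It is a sketch only and not the CloudSinkBase implementation. The class SizeTimeBatcher and its methods are hypothetical; only the batch_size and batch_timeout_seconds names come from the text above.

```python
import time
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class SizeTimeBatcher(Generic[T]):
    """Buffers items and releases a batch when a size or age limit is reached."""

    def __init__(
        self,
        batch_size: int = 50,
        batch_timeout_seconds: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.batch_size = batch_size
        self.batch_timeout_seconds = batch_timeout_seconds
        self._clock = clock  # injectable so tests can control time
        self._items: List[T] = []
        self._first_added_at: Optional[float] = None

    def add(self, item: T) -> Optional[List[T]]:
        """Buffer one item; return a full batch once batch_size is reached."""
        if not self._items:
            # The age of a batch is measured from its oldest item.
            self._first_added_at = self._clock()
        self._items.append(item)
        if len(self._items) >= self.batch_size:
            return self.drain()
        return None

    def poll(self) -> Optional[List[T]]:
        """Return buffered items if the oldest one has waited past the timeout."""
        if self._items and self._first_added_at is not None:
            if self._clock() - self._first_added_at >= self.batch_timeout_seconds:
                return self.drain()
        return None

    def drain(self) -> List[T]:
        """Hand over everything buffered and reset the batch age."""
        batch, self._items = self._items, []
        self._first_added_at = None
        return batch
```

A background flush loop would call poll() periodically. A larger batch_size means fewer requests, while a shorter batch_timeout_seconds bounds how long an entry can wait during quiet periods.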
Retry strategy
Use exponential backoff (retry_base_delay, retry_max_delay).
Retry only on transient errors; surface permanent failures via diagnostics.
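The retry guidance above can be sketched as follows. This is an illustration under stated assumptions, not the base class's actual retry loop. TransientError, send_with_retry, backoff_delay, and the max_retries parameter are hypothetical; retry_base_delay and retry_max_delay mirror the setting names mentioned above.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for retryable failures (timeouts, throttling, 5xx)."""


def backoff_delay(attempt: int, retry_base_delay: float, retry_max_delay: float) -> float:
    """Delay before retry `attempt` (0-based): base * 2**attempt, capped at the max."""
    return min(retry_max_delay, retry_base_delay * (2 ** attempt))


async def send_with_retry(
    send: Callable[[], Awaitable[T]],
    *,
    max_retries: int = 3,
    retry_base_delay: float = 0.5,
    retry_max_delay: float = 30.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Call send(), retrying only on TransientError with exponential backoff."""
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    for attempt in range(max_retries + 1):
        try:
            return await send()
        except TransientError:
            if attempt == max_retries:
                raise  # retries exhausted; let the caller record the failure
            await sleep(backoff_delay(attempt, retry_base_delay, retry_max_delay))
        # Any other exception is treated as permanent and propagates immediately.
    raise RuntimeError("unreachable")
```

With the defaults, the waits are 0.5 s, 1 s, and 2 s. Adding random jitter to each delay is a common refinement when many processes retry against the same endpoint.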
Entry-point wiring
Register sinks via pyproject.toml:
[project.entry-points."fapilog.sinks"]
cloudwatch = "fapilog.plugins.sinks.contrib.cloudwatch:CloudWatchSink"
datadog = "myapp.sinks.datadog_sink:DatadogSink"
gcp_cloud_logging = "myapp.sinks.gcp_logging_sink:GCPCloudLoggingSink"
Then set core.sinks to the names; pass config via sink_config.*.
Troubleshooting
Verify credentials and network egress first.
Enable core.enable_metrics to track drops and backpressure.
Use health_check() in readiness probes for cloud connectivity.
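A readiness probe built on health_check() might look like the sketch below. It assumes health_check() is a coroutine that returns a truthy value when the provider is reachable, which the text above does not specify. The readiness function, the HealthCheckable protocol, and the timeout_seconds parameter are hypothetical.

```python
import asyncio
from typing import Dict, Mapping, Protocol


class HealthCheckable(Protocol):
    """Anything exposing an async health_check(), such as a cloud sink."""

    async def health_check(self) -> bool: ...


async def readiness(
    sinks: Mapping[str, HealthCheckable], timeout_seconds: float = 2.0
) -> Dict[str, bool]:
    """Run every sink's health check concurrently and report per-sink status."""

    async def check(sink: HealthCheckable) -> bool:
        try:
            # Bound each check so a hung connection cannot stall the probe.
            return bool(await asyncio.wait_for(sink.health_check(), timeout_seconds))
        except Exception:
            # Timeouts and client errors both count as "not ready".
            return False

    names = list(sinks)
    results = await asyncio.gather(*(check(sinks[name]) for name in names))
    return dict(zip(names, results))
```

A readiness endpoint could return a failing status when any value in the result is False, which keeps traffic away from an instance that cannot reach its logging backend.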