Building SDK-Based Sinks
SDK-based sinks (CloudWatch, GCP Logging, Azure Monitor, Kafka client APIs) share a few patterns:
Import SDKs lazily so core installs don’t pull in heavy dependencies.
Wrap blocking calls with asyncio.to_thread() to keep the event loop responsive.
Batch intentionally and enforce provider limits (event count, payload size).
Handle provider-specific errors with retries/backoff (e.g., sequence tokens, throttling).
Add circuit breakers to contain repeated failures.
Emit diagnostics with rate limits; never let sink errors crash the pipeline.
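As a rough illustration of the batching, threading, and retry points above, the sketch below uses only the standard library. The names chunk_events and send_with_retry, and the limits passed to them, are hypothetical and not part of fapilog. A real sink would catch the provider's own exception types (throttling, invalid sequence token) rather than a bare Exception.

```python
import asyncio
import json
import random


def chunk_events(events, max_count, max_bytes):
    """Split events into chunks that respect a count limit and a byte limit.

    Hypothetical helper: sizes are measured as UTF-8 JSON, which may differ
    from how a given provider counts payload bytes.
    """
    chunk, size = [], 0
    for event in events:
        event_size = len(json.dumps(event).encode("utf-8"))
        # Start a new chunk when adding this event would exceed either limit.
        if chunk and (len(chunk) >= max_count or size + event_size > max_bytes):
            yield chunk
            chunk, size = [], 0
        # Note: a single event larger than max_bytes still gets its own chunk.
        chunk.append(event)
        size += event_size
    if chunk:
        yield chunk


async def send_with_retry(send, chunk, attempts=3, base_delay=0.1):
    """Run a blocking send(chunk) in a worker thread, retrying with backoff."""
    for attempt in range(attempts):
        try:
            # to_thread keeps the blocking SDK call off the event loop.
            return await asyncio.to_thread(send, chunk)
        except Exception:  # assumption: narrow this to provider errors
            if attempt == attempts - 1:
                raise
            # Exponential backoff with jitter between attempts.
            jitter = 0.5 + random.random() / 2
            await asyncio.sleep(base_delay * (2 ** attempt) * jitter)
```

A caller would typically loop over chunk_events(batch, ...) and await send_with_retry(client_call, chunk) for each chunk, catching the final exception so that a failed chunk is reported through diagnostics instead of propagating into the pipeline.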
Reference implementations:
fapilog.plugins.sinks.contrib.cloudwatch (CloudWatch Logs, boto3)
tests/integration/test_cloudwatch_sink_localstack.py (LocalStack testing pattern)
examples/cloudwatch_logging (FastAPI + LocalStack)
Suggested scaffold:
from fapilog.core import diagnostics
from fapilog.plugins.sinks._batching import BatchingMixin


class MySdkSink(BatchingMixin):
    name = "my_sink"

    async def start(self):
        # Lazy import + client creation
        ...

    async def write(self, entry: dict):
        await self._enqueue_for_batch(self._to_event(entry))

    async def _send_batch(self, batch: list[dict]):
        # Chunk, sort, retry, and respect provider limits
        ...
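The scaffold leaves the lazy import and failure containment to the implementer. The following is a minimal standalone sketch of both, assuming nothing about fapilog's internals: CircuitBreaker and load_sdk are hypothetical names, and the threshold and cooldown values are arbitrary. The library may already provide its own helpers for this, in which case those should be preferred.

```python
import importlib
import time


class CircuitBreaker:
    """Open after `threshold` consecutive failures; retry after `cooldown` seconds."""

    def __init__(self, threshold=3, cooldown=30.0, clock=time.monotonic):
        self.threshold = threshold
        self.cooldown = cooldown
        self._clock = clock  # injectable so tests can control time
        self._failures = 0
        self._opened_at = None

    def allow(self):
        """Return True if a send attempt should be made right now."""
        if self._opened_at is None:
            return True
        # Once the cooldown has elapsed, attempts are allowed again;
        # the next recorded failure re-opens the circuit.
        return self._clock() - self._opened_at >= self.cooldown

    def record_success(self):
        self._failures = 0
        self._opened_at = None

    def record_failure(self):
        self._failures += 1
        if self._failures >= self.threshold:
            self._opened_at = self._clock()


def load_sdk(module_name):
    """Import an SDK module on demand, with a clearer error if it is missing."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"{module_name} is required for this sink but is not installed"
        ) from exc
```

In a sink shaped like the scaffold, start() could call load_sdk with the provider's module name, and _send_batch could check allow() before sending, then call record_success() or record_failure() depending on the outcome.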
Testing tips:
Use provider stubs/fakes (or SDK stubbers) to avoid network calls in unit tests.
Add optional integration tests gated on environment variables (LocalStack).
Enable FAPILOG_CORE__INTERNAL_LOGGING_ENABLED=true when capturing diagnostics in tests.
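The first two tips can be sketched with the standard unittest module. Everything named here is hypothetical: FakeLogsClient stands in for a real SDK client, safe_send is an illustrative wrapper rather than a fapilog function, and LOCALSTACK_ENDPOINT is an assumed environment variable name, not one the project is known to use.

```python
import os
import unittest


class FakeLogsClient:
    """Hypothetical stand-in for an SDK client; records calls, never uses the network."""

    def __init__(self, fail=False):
        self.fail = fail
        self.batches = []

    def put_events(self, events):
        if self.fail:
            raise RuntimeError("simulated provider error")
        self.batches.append(list(events))


def safe_send(client, events):
    """Send events and report success, so sink errors never reach the caller."""
    try:
        client.put_events(events)
        return True
    except Exception:
        return False


class SafeSendTest(unittest.TestCase):
    def test_success_records_batch(self):
        client = FakeLogsClient()
        self.assertTrue(safe_send(client, [{"message": "hello"}]))
        self.assertEqual(client.batches, [[{"message": "hello"}]])

    def test_failure_is_contained(self):
        client = FakeLogsClient(fail=True)
        self.assertFalse(safe_send(client, [{"message": "hello"}]))
        self.assertEqual(client.batches, [])


@unittest.skipUnless(
    os.environ.get("LOCALSTACK_ENDPOINT"),  # assumed variable name
    "set LOCALSTACK_ENDPOINT to run integration tests",
)
class LocalStackIntegrationTest(unittest.TestCase):
    def test_endpoint_is_configured(self):
        # A real test would build the SDK client against this endpoint.
        self.assertTrue(os.environ["LOCALSTACK_ENDPOINT"].startswith("http"))
```

Running python -m unittest on a file containing these classes executes the unit tests everywhere and skips the integration class unless the environment variable is set.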