Building SDK-Based Sinks

SDK-based sinks (CloudWatch, GCP Logging, Azure Monitor, Kafka client APIs) share a few patterns:

  1. Import SDKs lazily so core installs don’t pull in heavy dependencies.

  2. Wrap blocking calls with asyncio.to_thread() to keep the event loop responsive.

  3. Batch deliberately and enforce provider limits (events per request, payload size in bytes).

  4. Handle provider-specific errors (e.g., invalid sequence tokens, throttling) with retries and backoff.

  5. Add circuit breakers to contain repeated failures.

  6. Emit rate-limited diagnostics; never let sink errors crash the pipeline.
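
Patterns 1 to 3 can be sketched with the standard library alone. This is a minimal illustration, not fapilog's implementation: load_sdk, chunk_events, send_all, the put_events client method, and both limit constants are hypothetical names and values chosen for the example. Real limits come from the provider's documentation.

```python
import asyncio
import importlib
import json
from typing import Any, Iterator

# Hypothetical limits for illustration only; check the provider's documentation.
MAX_EVENTS_PER_BATCH = 500
MAX_BATCH_BYTES = 256 * 1024


def load_sdk(module_name: str) -> Any:
    """Import the SDK on first use so the core package installs without it."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"{module_name!r} is required for this sink; install the optional extra"
        ) from exc


def chunk_events(
    events: list[dict],
    max_events: int = MAX_EVENTS_PER_BATCH,
    max_bytes: int = MAX_BATCH_BYTES,
) -> Iterator[list[dict]]:
    """Yield chunks that respect both the event-count and the byte-size limit."""
    chunk: list[dict] = []
    size = 0
    for event in events:
        event_size = len(json.dumps(event).encode("utf-8"))
        # Close the current chunk before it would exceed either limit.
        if chunk and (len(chunk) >= max_events or size + event_size > max_bytes):
            yield chunk
            chunk, size = [], 0
        # A single oversized event still goes out alone rather than being dropped.
        chunk.append(event)
        size += event_size
    if chunk:
        yield chunk


async def send_all(client: Any, events: list[dict]) -> int:
    """Send each chunk through a blocking client call moved off the event loop."""
    sent = 0
    for chunk in chunk_events(events):
        # client.put_events stands in for a blocking SDK method (hypothetical).
        await asyncio.to_thread(client.put_events, chunk)
        sent += len(chunk)
    return sent
```

Calling load_sdk from the sink's start() method, rather than at module import time, is what keeps the SDK optional for users who never enable the sink.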

Reference implementations:

  • fapilog.plugins.sinks.contrib.cloudwatch (CloudWatch Logs, boto3)

  • tests/integration/test_cloudwatch_sink_localstack.py (LocalStack testing pattern)

  • examples/cloudwatch_logging (FastAPI + LocalStack)

Suggested scaffold:

from fapilog.core import diagnostics
from fapilog.plugins.sinks._batching import BatchingMixin

class MySdkSink(BatchingMixin):
    name = "my_sink"

    async def start(self):
        # Lazy import + client creation
        ...

    async def write(self, entry: dict):
        await self._enqueue_for_batch(self._to_event(entry))

    async def _send_batch(self, batch: list[dict]):
        # Chunk, sort, retry, and respect provider limits
        ...
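
The retry step inside _send_batch, together with the circuit breaker from pattern 5, might look like the sketch below. It is an assumption-laden illustration rather than the reference implementation: ThrottledError, CircuitBreaker, and send_with_retry are hypothetical names, and a real sink would catch the provider SDK's own exception types and emit a diagnostic when a batch is dropped.

```python
import asyncio
import random
import time
from typing import Awaitable, Callable, Optional


class ThrottledError(Exception):
    """Hypothetical stand-in for a provider's retryable throttling error."""


class CircuitBreaker:
    """Open after `threshold` consecutive failed batches; retry after `cooldown` seconds."""

    def __init__(self, threshold: int = 5, cooldown: float = 30.0) -> None:
        self.threshold = threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at: Optional[float] = None

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        # Once the cooldown has elapsed, let attempts through again.
        return time.monotonic() - self.opened_at >= self.cooldown

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = time.monotonic()


async def send_with_retry(
    send: Callable[[], Awaitable[None]],
    breaker: CircuitBreaker,
    attempts: int = 3,
    base_delay: float = 0.1,
) -> bool:
    """Return True if the batch was sent, False if it was dropped."""
    if not breaker.allow():
        return False  # Circuit open: skip the provider call entirely.
    for attempt in range(attempts):
        try:
            await send()
        except ThrottledError:
            if attempt < attempts - 1:
                # Exponential backoff with full jitter before the next attempt.
                await asyncio.sleep(random.uniform(0, base_delay * 2**attempt))
        else:
            breaker.record_success()
            return True
    # All attempts failed: count one failure and drop the batch without raising.
    breaker.record_failure()
    return False
```

Returning a boolean instead of raising keeps a failing provider from propagating errors into the logging pipeline; the caller decides whether to emit a diagnostic for the dropped batch.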

Testing tips:

  • Use provider stubs/fakes (or SDK stubbers) to avoid network calls in unit tests.

  • Add optional integration tests gated on environment variables (e.g., a LocalStack endpoint).

  • Set FAPILOG_CORE__INTERNAL_LOGGING_ENABLED=true when capturing diagnostics in tests.
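
The first two tips can be sketched with the standard library's unittest. Everything named here is hypothetical and exists only to show the shape of the tests: FakeClient, its put_events method, TinySink, and the MY_SINK_ENDPOINT variable are not part of fapilog or any provider SDK.

```python
import asyncio
import os
import unittest


class FakeClient:
    """In-memory stand-in for an SDK client; records calls instead of using the network."""

    def __init__(self) -> None:
        self.batches: list[list[dict]] = []

    def put_events(self, events: list[dict]) -> None:  # hypothetical SDK method
        self.batches.append(list(events))


class TinySink:
    """Toy sink used only to demonstrate the test shape; not fapilog's API."""

    def __init__(self, client) -> None:
        self._client = client

    async def send(self, events: list[dict]) -> None:
        # Blocking client call moved off the event loop.
        await asyncio.to_thread(self._client.put_events, events)


class SinkUnitTest(unittest.TestCase):
    def test_send_records_batch(self) -> None:
        fake = FakeClient()
        asyncio.run(TinySink(fake).send([{"message": "hello"}]))
        self.assertEqual(fake.batches, [[{"message": "hello"}]])


# Skipped unless the (hypothetical) endpoint variable is set, so CI without
# an emulator still passes.
@unittest.skipUnless(os.environ.get("MY_SINK_ENDPOINT"), "set MY_SINK_ENDPOINT to run")
class SinkIntegrationTest(unittest.TestCase):
    def test_endpoint_is_configured(self) -> None:
        # A real test would build an SDK client against this endpoint.
        self.assertTrue(os.environ["MY_SINK_ENDPOINT"])


if __name__ == "__main__":
    unittest.main()
```

With pytest, pytest.mark.skipif provides the same environment-variable gating.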