Actionable coding and architectural guidelines for building fault-tolerant, low-latency data-replication pipelines with Python, Kafka, and Debezium.
Transform your data engineering workflow from fragile batch ETL jobs to resilient, real-time replication pipelines that handle failure gracefully and deliver data with sub-5-second latency.
You know the drill. Your analytics team reports stale dashboards. Your microservices are polling databases instead of reacting to events. Your weekend gets ruined by yet another failed batch job that nobody knows how to debug.
Traditional batch ETL approaches create exactly these pain points.
These Cursor Rules implement a battle-tested replication architecture that eliminates common failure modes through:
Event-Driven Change Data Capture: Instead of polling tables, capture changes directly from database WAL/binlog streams with Debezium, ensuring you never miss an update.
Fault-Tolerant Stream Processing: Python async patterns with automatic retry logic, dead letter queues, and graceful degradation when downstream services fail.
Schema Evolution Management: Automatic schema registry integration that validates changes before they break your pipeline, with rollback capabilities.
Observable Pipeline Health: Built-in metrics, tracing, and alerting that shows you exactly where bottlenecks occur and when lag exceeds SLA thresholds.
Before: Weeks of custom ETL code, brittle CRON jobs, manual schema mapping
```python
# Typical brittle approach
def sync_orders():
    last_sync = get_last_sync_time()  # What if this fails?
    orders = db.query(f"SELECT * FROM orders WHERE updated > {last_sync}")  # Full table scans
    for order in orders:  # No error handling
        target_db.insert(transform_order(order))  # Blocking I/O
```
With These Rules: Deploy a new Debezium connector in 5 minutes
```python
# Resilient async pattern from the rules
from typing import Any, AsyncIterator, Mapping
import json

import aiokafka


async def consume_order_changes(topic: str) -> AsyncIterator[Mapping[str, Any]]:
    consumer = aiokafka.AIOKafkaConsumer(
        topic,
        bootstrap_servers=settings.KAFKA_BROKERS,  # settings: pydantic BaseSettings instance
        group_id="order-replicator",
        enable_auto_commit=False,
    )
    await consumer.start()
    try:
        async for msg in consumer:
            yield json.loads(msg.value)
            await consumer.commit()  # Manual commit after processing; pair with dedup for exactly-once
    finally:
        await consumer.stop()
```
Deploy the connector with declarative YAML:
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: pg-orders-cdc
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  config:
    snapshot.mode: initial_only
    heartbeat.interval.ms: 1000
    transforms: unwrap
    transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
```
Before: Pipeline breaks when someone adds a column; manual fixes required.
With These Rules: Schema registry validates changes automatically; the pipeline continues running with backwards compatibility.
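A minimal sketch of what such a pre-flight check can look like, assuming a Confluent Schema Registry reachable over HTTP (the registry URL and subject name below are placeholders):

```python
# Hypothetical pre-flight check against Confluent Schema Registry's compatibility API.
# REGISTRY_URL and the subject name are placeholders for your environment.
import json

import requests

REGISTRY_URL = "http://schema-registry:8081"


def is_compatible(subject: str, new_schema: dict) -> bool:
    """True if `new_schema` is compatible with the latest registered version of `subject`."""
    resp = requests.post(
        f"{REGISTRY_URL}/compatibility/subjects/{subject}/versions/latest",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        json={"schema": json.dumps(new_schema)},
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json().get("is_compatible", False)
```

Wire a check like this into CI so an incompatible schema fails the build before any connector config is updated.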
Before: "Why is the dashboard showing yesterday's data?" With These Rules: Real-time lag metrics trigger PagerDuty alerts when replication falls behind SLA
```python
# Built-in observability patterns (metrics/logger come from your observability module)
import time
from typing import Any, Dict


@metrics.histogram("replication_lag_seconds")
async def process_event(event: Dict[str, Any]) -> None:
    start_time = time.time()
    try:
        await transform_and_sink(event)
        metrics.counter("events_processed").inc()
    except Exception as e:
        metrics.counter("events_failed").inc()
        logger.error("Processing failed", extra={"event_id": event["id"], "error": str(e)})
        raise
    finally:
        metrics.histogram("processing_duration").observe(time.time() - start_time)
```
```bash
git clone your-replication-project
cd replication-pipeline/
docker-compose -f docker/local-repl.yml up -d  # PG + Kafka + Debezium locally
```
Copy the rules into your `.cursorrules` file to enable intelligent code completion for replication patterns.
```python
# processors/order_processor.py - Following the rules' structure
import logging

from src.connectors.kafka_consumer import consume
from src.sinks.snowflake_writer import write_batch

logger = logging.getLogger(__name__)


async def process_orders() -> None:
    async for order_event in consume("orders-cdc"):
        # Guard clause pattern from rules
        if not validate_order_schema(order_event):  # project-local validator
            logger.warning("Invalid order schema", extra={"event": order_event})
            continue
        # Idempotent transformation
        transformed = transform_order(order_event)  # project-local transform
        await write_batch([transformed])
```
```hcl
# terraform/kafka-connect.tf
resource "kubernetes_deployment" "debezium_connect" {
  metadata {
    name = "debezium-connect"
  }

  spec {
    template {
      spec {
        container {
          name  = "connect"
          image = "debezium/connect:2.4"

          resources {
            requests = {
              cpu    = "500m"
              memory = "1Gi"
            }
          }
        }
      }
    }
  }
}
```
Set up Grafana dashboards using the built-in metrics patterns:
- `replication_lag_seconds` by source/topic
- `event_throughput_msgs` to track pipeline health
- `dead_letter_queue_size` for error monitoring

Your data engineers stop being on-call heroes fixing broken batch jobs and start building features that matter. Your analytics team gets fresh data without asking when the next ETL run completes. Your platform scales from handling thousands to millions of events per second using the same patterns.
Ready to eliminate replication lag? These rules give you the exact patterns used by teams processing billions of events daily. No more fragile ETL jobs, no more weekend outages, no more stale dashboards.
The transformation starts with applying these rules to your next data pipeline project.
You are an expert in Python, Kafka, Debezium, PostgreSQL, AWS DMS, Airbyte, Terraform, Docker, and Kubernetes.
Technology Stack Declaration
- Source DBs: PostgreSQL ≥13, MySQL ≥8, Oracle 19c.
- Messaging Backbone: Apache Kafka 3.x with Confluent Schema Registry.
- CDC Layer: Debezium 2.x (Kafka Connect) and/or AWS DMS for heterogeneous targets.
- Transformation & Orchestration: Python 3.11, Faust (stream processing), Airbyte for managed ELT, Terraform for IaC.
- Storage Targets: Snowflake, S3 (Parquet/Avro), and Elasticsearch for search workloads.
- CI/CD: GitHub Actions, Docker-Compose for local, Helm charts for K8s deployments.
Key Principles
- Start by defining clear RTO/RPO and maximum acceptable lag (≤5 s for tier-1, ≤60 s for analytics).
- Embrace idempotency: every replication step must be repeatable without side effects.
- Prefer asynchronous, event-driven pipelines – eventual consistency is acceptable if business SLA permits.
- Single source of truth = the WAL/binlog. Never read application tables directly.
- All configs are code (Terraform/Helm); never click-ops in prod.
- Observability first: emit structured logs, metrics, and traces from day-0.
- Fail fast & auto-heal: unhealthy connectors are restarted by the platform within 30 s.
Python
- Use type hints everywhere; enable `mypy --strict` in CI.
- Structure modules by responsibility: `connectors/`, `processors/`, `sinks/`, `schemas/`, `tests/`.
- Functions only; avoid classes unless stateful coordination is required.
- Enforce PEP 8 + Black (`black --line-length 100`).
- Use `async def` + `await` in stream consumers; leverage `aiokafka` for non-blocking I/O.
- Environment variables via pydantic `BaseSettings`; forbid hard-coded secrets (see the settings sketch after the example below).
- Example pattern:
```python
from typing import Any, AsyncIterator, Mapping
import json
import logging

import aiokafka

logger = logging.getLogger(__name__)


async def consume(topic: str) -> AsyncIterator[Mapping[str, Any]]:
    consumer = aiokafka.AIOKafkaConsumer(
        topic,
        bootstrap_servers=settings.KAFKA_BROKERS,  # settings: pydantic BaseSettings instance
        group_id="replicator",
        enable_auto_commit=False,
    )
    await consumer.start()
    try:
        async for msg in consumer:
            yield json.loads(msg.value)
            await consumer.commit()
    finally:
        await consumer.stop()
```
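A minimal settings sketch to back the `settings` reference above, assuming pydantic v2 with the `pydantic-settings` package (field names and defaults are illustrative):

```python
# Illustrative settings module; field names and defaults are assumptions.
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env")

    KAFKA_BROKERS: str = "localhost:9092"
    SCHEMA_REGISTRY_URL: str = "http://localhost:8081"


settings = Settings()
```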
Error Handling and Validation
- Guard clauses first: reject malformed events before any transformation.
- Wrap all external I/O (DB, S3) in exponential-back-off retry (max 7 tries, jittered).
- Deduplicate using `event_id` + source LSN/SCN to guarantee at-least-once → exactly-once.
- Maintain a persistent offset table keyed by `{source, partition}`.
- Validate schema evolution: break the build if an incompatible Avro/JSON-Schema is detected.
- Record consistency checkpoints every 5 000 messages; compare row counts between source and target nightly.
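A minimal sketch of the jittered back-off and `event_id` + LSN dedup described above; the in-memory `_seen` set stands in for the persistent offset table, and the event field names are illustrative:

```python
# Sketch of jittered exponential back-off plus event_id + LSN dedup.
# The in-memory `_seen` set stands in for the persistent offset/dedup table.
import asyncio
import random
from typing import Any, Awaitable, Callable, Mapping

MAX_TRIES = 7
_seen: set[tuple[str, str]] = set()


async def with_retry(op: Callable[[], Awaitable[Any]]) -> Any:
    """Retry external I/O with exponential back-off and jitter (max 7 tries)."""
    for attempt in range(1, MAX_TRIES + 1):
        try:
            return await op()
        except Exception:
            if attempt == MAX_TRIES:
                raise
            await asyncio.sleep(min(60, 2 ** attempt) + random.uniform(0, 1))


async def process_once(
    event: Mapping[str, Any],
    sink: Callable[[Mapping[str, Any]], Awaitable[None]],
) -> None:
    """Turn at-least-once delivery into effectively exactly-once via dedup."""
    key = (event["event_id"], event["source_lsn"])  # field names are illustrative
    if key in _seen:
        return
    await with_retry(lambda: sink(event))
    _seen.add(key)
```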
Debezium / Kafka Connect
- One connector per source DB to simplify offset management.
- Enable `snapshot.mode=initial_only` for greenfield loads; switch to signal-triggered incremental snapshots afterwards.
- Set `heartbeat.interval.ms=1000` to monitor liveness.
- Use `transforms.unwrap` SMT with `drop.tombstones=false` so deletes replicate as tombstones.
- Partitions: `min(6, #CPUs*2)`; tasks auto-scaled via K8s HPA.
- Always deploy connectors via declarative YAML (see example below):
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: pg-orders-cdc
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 4
  config:
    database.hostname: pg-master
    database.port: 5432
    ...
```
AWS DMS
- Use CDC + Full-Load; enable `TargetMetadata` with `ParallelLoadThreads=8` for large tables.
- CloudWatch alarms: lag > 30 s triggers PagerDuty.
- KMS-encrypted endpoints only; IAM roles with least privilege.
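For teams scripting DMS rather than driving it through Terraform, a hedged boto3 sketch of a Full-Load + CDC task with `ParallelLoadThreads=8`; all ARNs and table mappings are placeholders:

```python
# Illustrative boto3 call; all ARNs and table mappings are placeholders.
import json

import boto3

dms = boto3.client("dms")

task_settings = {
    "TargetMetadata": {"ParallelLoadThreads": 8},  # parallel load for large tables
    "Logging": {"EnableLogging": True},
}

dms.create_replication_task(
    ReplicationTaskIdentifier="orders-full-load-cdc",
    SourceEndpointArn="arn:aws:dms:...:endpoint/source",    # placeholder
    TargetEndpointArn="arn:aws:dms:...:endpoint/target",    # placeholder
    ReplicationInstanceArn="arn:aws:dms:...:rep/instance",  # placeholder
    MigrationType="full-load-and-cdc",                      # Full-Load + CDC
    TableMappings=json.dumps({"rules": []}),                # placeholder mapping
    ReplicationTaskSettings=json.dumps(task_settings),
)
```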
Testing
- Unit: pytest with factory-boy generating fake WAL events.
- Contract: CDC event → Avro schema validation enforced in CI.
- Integration: `docker-compose -f docker/local-repl.yml up --abort-on-container-exit` spins up PG, Kafka, Debezium, sink.
- Chaos: inject `SIGSTOP` to Kafka brokers → pipeline must auto-recover within SLA.
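A sketch of a contract-style test, assuming `fastavro` for Avro validation; the schema and fake event are illustrative:

```python
# Sketch of a contract test: a fake CDC event must satisfy the Avro schema
# kept under schemas/. Schema and event below are illustrative.
from fastavro.validation import validate

ORDER_VALUE_SCHEMA = {
    "type": "record",
    "name": "OrderValue",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "status", "type": "string"},
        {"name": "updated_at", "type": "long"},
    ],
}


def test_fake_cdc_event_matches_contract() -> None:
    fake_event = {"id": 42, "status": "PAID", "updated_at": 1_700_000_000_000}
    assert validate(fake_event, ORDER_VALUE_SCHEMA)
```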
Performance
- Measure p99 end-to-end latency; target ≤ 2 × average network RTT.
- Enable compression: Kafka `compression.type=zstd`, S3 `gzip` Parquet.
- Batch commits: 10 000 messages or 30 MiB, whichever comes first.
- Pipelining: set `fetch.min.bytes=1MB` on consumers and `linger.ms=50` on producers (see the sketch below).
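A hedged sketch of these knobs with `aiokafka`; the broker address, topic, and sizes are placeholders:

```python
# Illustrative aiokafka tuning; broker address, topic, and sizes are placeholders.
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

producer = AIOKafkaProducer(
    bootstrap_servers="kafka:9092",
    compression_type="zstd",   # requires the `zstandard` package
    linger_ms=50,              # allow a short batching delay
    max_batch_size=32 * 1024,  # bytes per partition batch
)

consumer = AIOKafkaConsumer(
    "orders-cdc",
    bootstrap_servers="kafka:9092",
    fetch_min_bytes=1024 * 1024,  # wait for ~1 MiB before returning a fetch
    fetch_max_wait_ms=500,
    enable_auto_commit=False,
)
```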
Security
- TLS 1.2 mutual auth between all pipeline hops.
- Rotate credentials every 90 days via Vault dynamic secrets.
- Hash-based message authentication (`HMAC-SHA256`) on all payloads crossing trust boundaries.
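A minimal HMAC-SHA256 sketch using only the standard library; key distribution and rotation (Vault) are out of scope here:

```python
# Minimal HMAC-SHA256 signing/verification for payloads crossing trust boundaries.
# Key distribution and rotation (Vault) are out of scope for this sketch.
import hashlib
import hmac


def sign(payload: bytes, key: bytes) -> str:
    return hmac.new(key, payload, hashlib.sha256).hexdigest()


def verify(payload: bytes, signature: str, key: bytes) -> bool:
    # compare_digest avoids timing side channels
    return hmac.compare_digest(sign(payload, key), signature)
```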
Observability
- Metrics: `replication_lag_seconds`, `event_throughput_msgs`, `dead_letter_queue_size`.
- Expose Prometheus endpoints on `/metrics`; use Grafana pre-built replication dashboard.
- Trace every message via OpenTelemetry `trace_id` propagated in headers.
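A sketch of the three core metrics with `prometheus_client`; the label names and scrape port are assumptions:

```python
# Sketch of the three core metrics with prometheus_client; label names and the
# scrape port are assumptions.
from prometheus_client import Counter, Gauge, start_http_server

REPLICATION_LAG = Gauge(
    "replication_lag_seconds", "End-to-end replication lag", ["source", "topic"]
)
EVENT_THROUGHPUT = Counter(
    "event_throughput_msgs", "Events processed", ["topic"]
)
DLQ_SIZE = Gauge(
    "dead_letter_queue_size", "Messages currently parked in the DLQ", ["topic"]
)

start_http_server(8000)  # exposes /metrics for Prometheus scraping
```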
Deployment
- Docker images pinned to digest (not tags) to guarantee immutability.
- Helm chart values must include `resources.requests.cpu` and `memory` – no defaults.
- Blue/green releases: cut over consumers first, producers second.
Common Pitfalls & How to Avoid Them
- Skipped WAL segments → configure sufficient replication slots; monitor `pg_replication_slots`.
- Debezium snapshots blocking autovacuum → set `snapshot.fetch.size=10000`.
- JSONB columns bloating payloads → project only required fields with SMT `ExtractNewRecordState` + `add.fields`.
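A sketch of the slot-monitoring query referenced in the first pitfall, assuming `psycopg2`; the connection string is a placeholder:

```python
# Sketch: monitor WAL retained by replication slots so segments are never skipped.
# The connection string is a placeholder; requires PostgreSQL >= 10 and psycopg2.
import psycopg2

QUERY = """
SELECT slot_name,
       active,
       pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_wal_bytes
FROM pg_replication_slots;
"""

with psycopg2.connect("postgresql://replicator@pg-master:5432/orders") as conn:
    with conn.cursor() as cur:
        cur.execute(QUERY)
        for slot_name, active, retained_wal_bytes in cur.fetchall():
            print(slot_name, active, retained_wal_bytes)
```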
Example Directory Layout
```
replicator/
  connectors/    # Kafka Connect JSON/YAML
  processors/    # Python stream logic
  sinks/         # Target writers (S3, Snowflake, ES)
  schemas/       # Avro/JSON-Schema definitions
  terraform/     # IaC modules (network, RDS, MSK)
  helm-chart/    # K8s deployment
  tests/
    unit/
    integration/
  docs/
```
Follow these rules to build replication pipelines that are fault-tolerant, observable, and meet strict RTO/RPO objectives while remaining maintainable over time.