Actionable coding, data-governance, and architectural rules for building and operating Single Source of Truth (SSOT) back-ends and data platforms.
You're building data platforms that need to scale, stay consistent, and remain trustworthy across dozens of services and teams. The traditional "copy data everywhere and hope for the best" approach breaks down fast, leaving you debugging inconsistent states, chasing data quality issues, and explaining why the same metric shows different values in different dashboards.
Every growing data platform hits the same wall. You start with a few services, some shared databases, and everything works. Then you scale, and the same problems show up every time: duplicated data, drifting copies, and metrics that disagree depending on where you look.
The root cause? No clear ownership model. When everything owns data, nothing owns data.
These Cursor Rules implement a battle-tested approach that treats data ownership as a first-class architectural concern. Every piece of data has exactly one authoritative owner, and everything else is a projection.
Here's what changes:
```python
# Instead of this chaos:
class OrderService:
    def update_customer_credit(self, customer_id: str, amount: float):
        # Direct database mutation - violates SSOT
        customer_db.update(customer_id, {"credit": amount})
```

```python
# You get this clarity:
class OrderService:
    def handle_order_placed(self, event: OrderPlaced) -> None:
        # Emit an event for the customer service to handle
        self.publisher.publish(
            topic="orders.order.placed.v1",
            event=OrderPlaced(
                order_id=event.order_id,
                customer_id=event.customer_id,
                amount=event.amount,
            ),
        )
```
The Customer service owns customer data. The Order service publishes events. No cross-service mutations, no data inconsistency.
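On the owning side, a sketch of what the customer service might do with that event; the handler class, repository, `with_credit` helper, and `CustomerCreditUpdated` event are illustrative assumptions, not part of the rules:

```python
# Customer service — the only place customer data is mutated (names are illustrative)
class CustomerCreditHandler:
    def __init__(self, customer_repo, publisher):
        self.customer_repo = customer_repo
        self.publisher = publisher

    def handle_order_placed(self, event: OrderPlaced) -> None:
        # Apply the change inside the owning bounded context
        customer = self.customer_repo.get(event.customer_id)
        updated = customer.with_credit(customer.credit - event.amount)
        self.customer_repo.save(updated)

        # Publish the canonical result for downstream projections
        self.publisher.publish(
            topic="customers.customer.updated.v1",
            event=CustomerCreditUpdated(
                customer_id=updated.customer_id,
                credit=updated.credit,
            ),
        )
```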
- Eliminate data debugging sessions: With clear ownership and event-driven updates, you can trace any data issue back to its authoritative source in minutes, not hours.
- Reduce data quality incidents by 80%: Automated validation at ingestion points and immutable event streams prevent most corruption before it spreads.
- Scale teams without coordination overhead: New services can consume existing data streams without coordinating database changes with other teams.
- Pass compliance audits: Built-in lineage tracking, access controls, and audit trails mean your compliance story writes itself.
Before: You need customer data in your analytics pipeline, so you copy it from another team's database and hope it stays in sync.
After: With SSOT rules, your pipeline subscribes to the customers.customer.updated.v1 topic and maintains its own projection.

Before: Data quality issues surface in production:
```python
# Discover data issues during analytics
def generate_report():
    customers = query_customer_db()  # Might be stale
    orders = query_order_db()        # Different staleness
    # Report shows inconsistent totals, team scrambles to debug
```
After: Quality gates at every boundary:
```python
# Quality validation at ingestion
def handle_customer_event(raw_event: dict) -> None:
    if not validate_event_schema(raw_event):
        dead_letter_queue.send(raw_event)
        raise ValidationError("Invalid customer event schema")

    event = CustomerUpdated.parse_obj(raw_event)
    if not event.meets_quality_thresholds():
        alert_data_team(event, "Quality threshold breach")
        return

    process_customer_update(event)
```
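The meets_quality_thresholds check isn't defined in the snippet above; one way it could look as a method on the event model — the fields and checks here are assumptions for illustration:

```python
from pydantic import BaseModel

class CustomerUpdated(BaseModel):
    customer_id: str
    email: str
    total_orders: int = 0  # hypothetical field

    def meets_quality_thresholds(self) -> bool:
        # Illustrative checks: identifiers present, values in a sane range
        return bool(self.customer_id) and "@" in self.email and self.total_orders >= 0
```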
Before: Different services show different customer totals because they calculate them differently and sync at different times.
After: The customer service owns the calculation and publishes canonical events:
```python
# Customer service (authoritative)
class CustomerService:
    def update_customer_total(self, customer_id: str):
        total = self.calculate_authoritative_total(customer_id)
        self.publish_event(CustomerTotalUpdated(
            customer_id=customer_id,
            total=total,
            calculated_at=datetime.utcnow(),
        ))

# Analytics service (projection)
class AnalyticsService:
    def handle_customer_total_updated(self, event: CustomerTotalUpdated):
        self.analytics_db.upsert_customer_total(
            customer_id=event.customer_id,
            total=event.total,
        )
```
Create the project structure that enforces SSOT principles:
```bash
mkdir your-data-platform && cd your-data-platform

# Create the standard layout
mkdir -p src/{adapters,domain,services,sql}
mkdir -p tests/{unit,integration}
mkdir -p ADRs dbt
```
Start by mapping your business domains to data ownership:
```python
# src/domain/customer.py
from datetime import datetime

from pydantic import BaseModel

class Customer(BaseModel):
    """Customer domain model - owned by CustomerService"""
    customer_id: str
    email: str
    created_at: datetime

    class Config:
        frozen = True  # Immutable by default

# src/domain/events.py
class CustomerCreated(BaseModel):
    """Published when the customer service creates a customer"""
    customer_id: str
    email: str
    created_at: datetime
    version: int = 1
```
Replace direct database mutations with event publishing:
```python
# src/services/customer_service.py
from datetime import datetime
from typing import Protocol

from pydantic import BaseModel

from src.domain.customer import Customer
from src.domain.events import CustomerCreated

class EventPublisher(Protocol):
    def publish(self, topic: str, event: BaseModel) -> None: ...

def create_customer(
    email: str,
    publisher: EventPublisher,
    customer_repo: CustomerRepository,
) -> Customer:
    customer = Customer(
        customer_id=generate_id(),
        email=email,
        created_at=datetime.utcnow(),
    )

    # Store in the authoritative database
    customer_repo.save(customer)

    # Publish for other services
    publisher.publish(
        topic="customers.customer.created.v1",
        event=CustomerCreated.from_customer(customer),
    )
    return customer
```
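CustomerCreated.from_customer is used above but not shown; a minimal sketch of that factory, reusing the fields from the domain models defined earlier:

```python
# src/domain/events.py (extending the model shown earlier)
from datetime import datetime

from pydantic import BaseModel

from src.domain.customer import Customer

class CustomerCreated(BaseModel):
    customer_id: str
    email: str
    created_at: datetime
    version: int = 1

    @classmethod
    def from_customer(cls, customer: Customer) -> "CustomerCreated":
        # Map the authoritative domain model onto the integration event
        return cls(
            customer_id=customer.customer_id,
            email=customer.email,
            created_at=customer.created_at,
        )
```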
Implement validation at every data boundary:
```python
# src/adapters/kafka_consumer.py
def handle_raw_event(raw_event: dict, topic: str) -> None:
    try:
        # Validate schema first
        event = EVENT_REGISTRY[topic].parse_obj(raw_event)

        # Run quality checks
        if not passes_quality_gates(event):
            dead_letter_queue.send(raw_event, reason="Quality check failed")
            return

        # Process valid event
        EVENT_HANDLERS[topic](event)
    except ValidationError as e:
        dead_letter_queue.send(raw_event, reason=f"Invalid schema: {e}")
        metrics.increment("events.validation_failed", tags={"topic": topic})
```
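EVENT_REGISTRY and EVENT_HANDLERS are assumed to be module-level lookup tables; a sketch of how they might be wired — the module path and handler name are assumptions:

```python
# src/adapters/event_routing.py — illustrative wiring
from typing import Callable, Dict, Type

from pydantic import BaseModel

from src.domain.events import CustomerCreated

def handle_customer_created(event: CustomerCreated) -> None:
    # Project the validated event into this service's own store (left abstract here)
    ...

# Topic -> pydantic model used for schema validation
EVENT_REGISTRY: Dict[str, Type[BaseModel]] = {
    "customers.customer.created.v1": CustomerCreated,
}

# Topic -> function that applies the validated event to a local projection
EVENT_HANDLERS: Dict[str, Callable] = {
    "customers.customer.created.v1": handle_customer_created,
}
```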
Create monitoring that catches issues before they spread:
```python
# src/services/data_quality_monitor.py
def run_daily_audit() -> AuditReport:
    report = AuditReport()

    for domain in MONITORED_DOMAINS:
        # Check data freshness
        last_update = get_last_update_time(domain)
        if datetime.utcnow() - last_update > timedelta(hours=4):
            report.add_issue(f"{domain} data is stale")

        # Check completeness
        completeness = calculate_completeness(domain)
        if completeness < 0.95:
            report.add_issue(f"{domain} completeness: {completeness:.2%}")

    if report.has_issues():
        alert_data_team(report)
        create_jira_ticket(report)

    return report
```
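AuditReport is referenced but not defined above; a minimal sketch, assuming it only needs to collect issue descriptions:

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class AuditReport:
    issues: List[str] = field(default_factory=list)

    def add_issue(self, description: str) -> None:
        # Record a human-readable description of the failed check
        self.issues.append(description)

    def has_issues(self) -> bool:
        return bool(self.issues)
```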
Teams using these patterns typically see the gains described above: fewer data quality incidents, faster root-cause analysis, and far less cross-team coordination overhead.
Your data platform becomes a competitive advantage instead of a coordination bottleneck. Teams can move fast because they trust the data, and data engineers can focus on building new capabilities instead of fighting consistency issues.
The rules are comprehensive enough to handle complex enterprise scenarios while remaining practical for immediate implementation. Start with one domain, prove the approach works, then expand across your platform.
You are an expert in Python, SQL, Apache Kafka, AWS Glue, Redshift, Snowflake, and Domain-Driven Design for data platforms.
Key Principles
- Everything has one authoritative owner; every other copy is a projection. Never mutate projections.
- Embrace Domain-Driven Design (DDD): each bounded context owns its data model and publishes integration events.
- Data moves via immutable events (Kafka topics, DynamoDB Streams), never by direct writes to foreign stores.
- Prefer idempotent, append-only writes. Treat deletes as tombstone events (see the sketch after this list).
- Changes are version-controlled, peer-reviewed, and deployed through CI/CD; no manual hot-fixes on data stores.
- Automate data quality metrics (completeness, uniqueness, freshness, validity) and fail the pipeline if thresholds are breached.
- Security is non-negotiable: least-privilege IAM, column-level encryption, and audited access.
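A minimal sketch of the append-only/tombstone principle above; the event shape and the in-memory projection are assumptions for illustration:

```python
# Illustrative tombstone handling for a projection (not an authoritative store)
from datetime import datetime
from typing import Dict, Optional

from pydantic import BaseModel

class CustomerDeleted(BaseModel):
    """Tombstone event: signals deletion instead of issuing a DELETE downstream."""
    customer_id: str
    deleted_at: datetime
    version: int = 1

def apply_event(projection: Dict[str, Optional[BaseModel]], event: BaseModel) -> None:
    # Idempotent: applying the same event twice leaves the projection unchanged
    if isinstance(event, CustomerDeleted):
        projection[event.customer_id] = None  # keep the tombstone, never hard-delete
    else:
        projection[event.customer_id] = event  # upsert keyed by aggregate id
```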
Python
- Use type-annotated, side-effect-free functions for transformations; avoid class state.
- Standard project layout:
  src/
    adapters/    # Kafka, Glue, JDBC connectors
    domain/      # Pure domain models (pydantic BaseModel)
    services/    # Use-case orchestration (pure functions)
    cli.py       # Entrypoint (Typer)
    settings.py  # Environment & secrets via pydantic settings
- Naming conventions:
  - snake_case for functions/variables, PascalCase for pydantic models, UPPER_SNAKE for constants.
  - Topic names: <bounded-context>.<aggregate>.<event-name>.v<version>
- Never inline SQL strings; keep parametrised queries in *.sql files under src/sql/ (see the sketch after this list).
- Use mypy --strict + Ruff lint; CI fails on warnings.
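A sketch of the src/sql/ convention; the get_customer.sql file, the DB-API connection, and the parameter style are assumptions:

```python
# src/adapters/sql_loader.py — illustrative helper
from pathlib import Path

SQL_DIR = Path(__file__).resolve().parent.parent / "sql"

def load_query(name: str) -> str:
    # Read a parametrised query from src/sql/<name>.sql
    return (SQL_DIR / f"{name}.sql").read_text()

def fetch_customer(conn, customer_id: str):
    # Parameters are bound by the driver, never interpolated into the SQL text
    query = load_query("get_customer")  # e.g. SELECT ... WHERE customer_id = %(customer_id)s
    with conn.cursor() as cur:
        cur.execute(query, {"customer_id": customer_id})
        return cur.fetchone()
```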
Error Handling and Validation
- Validate inbound events with pydantic; reject (and dead-letter) invalid payloads immediately.
- Guard clauses first; happy path last. Example:
```python
def handle_event(evt: OrderPlaced) -> None:
    if evt.version != 2:
        raise UnsupportedVersionError(evt.version)
    if not evt.items:
        raise ValueError("Empty order")
    process_order(evt)  # happy path
```
- All exceptions subclass PlatformError and include a machine-readable code (see the sketch below).
- Retry only idempotent operations; otherwise, dead-letter.
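A minimal sketch of the PlatformError convention, assuming the machine-readable code is a string attribute on each subclass:

```python
class PlatformError(Exception):
    """Base class for all platform exceptions; carries a machine-readable code."""

    code: str = "platform.error"

    def __init__(self, message: str, *, code: str | None = None) -> None:
        super().__init__(message)
        if code is not None:
            self.code = code

class UnsupportedVersionError(PlatformError):
    code = "event.unsupported_version"

    def __init__(self, version: int) -> None:
        super().__init__(f"Unsupported event version: {version}")
```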
Framework-Specific Rules
Kafka
- One topic per event type; use log compaction for idempotent state streams.
- Schema Registry enforced; consumers refuse unknown versions.
- Exactly-once semantics enabled with transactional producers.
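A minimal sketch of a transactional producer with confluent-kafka; the broker address, transactional.id, and JSON serialisation (instead of Schema Registry serialisers) are assumptions:

```python
# Illustrative exactly-once publishing with confluent-kafka
import json

from confluent_kafka import KafkaException, Producer

producer = Producer({
    "bootstrap.servers": "localhost:9092",        # assumption
    "transactional.id": "customer-service-tx-1",  # assumption
    "enable.idempotence": True,
})
producer.init_transactions()

def publish_exactly_once(topic: str, key: str, payload: dict) -> None:
    producer.begin_transaction()
    try:
        producer.produce(topic, key=key, value=json.dumps(payload))
        producer.commit_transaction()
    except KafkaException:
        producer.abort_transaction()
        raise
```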
AWS Glue / Spark Jobs
- Use Glue 4.0 (Python 3.10) and Iceberg tables for ACID writes.
- Partition by business time (yyyy/mm/dd) not ingestion time.
- Jobs emit completion events to <ctx>.glue.job_completed.v1.
Redshift / Snowflake
- Treat as read-only marts fed by Glue jobs.
- Apply zero-copy cloning for back-fill tests.
- Use dbt for transformations; one model ↔ one business table; models locked to semantic version tags.
Testing
- Every pipeline has data contract tests: schema, distribution, and field-level expectations (Great Expectations); see the sketch after this list.
- Integration tests spin up LocalStack + Redpanda in Docker; run "make e2e" in CI.
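A sketch of a field-level contract test, written here with plain pytest/pandas assertions rather than a full Great Expectations suite; the customer_events fixture and thresholds are assumptions:

```python
# tests/integration/test_customer_contract.py — illustrative contract checks
import pandas as pd

def test_customer_events_meet_contract(customer_events: pd.DataFrame) -> None:
    # Schema: required columns exist
    assert {"customer_id", "email", "created_at"} <= set(customer_events.columns)

    # Completeness: no null identifiers
    assert customer_events["customer_id"].notna().all()

    # Uniqueness: one row per customer_id
    assert customer_events["customer_id"].is_unique

    # Validity: emails look plausible
    assert customer_events["email"].str.contains("@").all()
```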
Performance
- Batch windows ≤ 5 minutes; if the SLA is under 30 s, take the stream-enrichment path instead.
- Prefer partition pruning over table scans; enforce via CI lint on dbt plans.
Security
- All secrets live in AWS Secrets Manager and are loaded via settings.py (see the sketch after this list).
- IAM roles per microservice with strict allow-list to its own KMS key and S3 prefix.
- Column-level encryption for PII; key rotation every 90 days.
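A minimal sketch of loading secrets through settings.py; the secret name, field names, and the pydantic v1-style BaseSettings import are assumptions:

```python
# src/settings.py — illustrative only
import json

import boto3
from pydantic import BaseSettings  # with pydantic v2, import from pydantic_settings

def _load_secret(secret_id: str) -> dict:
    # Fetch a JSON secret from AWS Secrets Manager
    client = boto3.client("secretsmanager")
    return json.loads(client.get_secret_value(SecretId=secret_id)["SecretString"])

class Settings(BaseSettings):
    environment: str = "dev"
    kafka_bootstrap_servers: str = "localhost:9092"
    warehouse_dsn: str = ""

    class Config:
        env_prefix = "PLATFORM_"

# Hypothetical secret id; keys in the secret override the defaults above
settings = Settings(**_load_secret("data-platform/app"))
```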
Governance & Audits
- Data Catalog (e.g., AWS Glue Data Catalog or Google Data Catalog) is the SSOT for schema metadata.
- Automatic daily audit job verifies row counts, freshness, lineage; posts report to #data-quality Slack.
- Any quality metric failure opens a Jira ticket with owner = data steward.
Change Management
- All schema changes follow ADR process; must include backward-compatibility matrix.
- Migrations are forward-only; destructive DDL permitted only after 30 days deprecation window.
Folder Summary
.
├── ADRs/
├── src/
│   ├── adapters/
│   ├── domain/
│   ├── services/
│   ├── sql/
│   ├── cli.py
│   └── settings.py
├── tests/
│   ├── unit/
│   └── integration/
├── dbt/
└── Makefile