Opinionated Rules for building, deploying, and governing Master Data Management (MDM) integration pipelines.
You know the pain. Customer records scattered across CRM, ERP, and legacy systems. Product catalogs with conflicting specifications. Critical business decisions delayed because nobody trusts the data. Your enterprise has world-class applications, but they're speaking different languages about the same entities.
Every day your organization operates without unified master data management, you lose time to manual reconciliation and trust in the numbers that drive decisions.
The old approach of point-to-point integrations does not scale: with N systems you end up managing on the order of N² integration points, and every new system multiplies the maintenance burden. That's not sustainable at enterprise scale.
These Cursor Rules deliver a battle-tested framework for building enterprise Master Data Management integration pipelines that actually work in production. Built from real-world experience managing multi-petabyte data estates, they solve the core challenges that make MDM projects fail:
- Golden Record Authority: Establish immutable master records that downstream systems consume but never mutate, eliminating data drift and conflicting updates.
- Pilot-to-Scale Methodology: Start with single-domain success (≤0.5% duplicate rate, ≥98% data completeness) before expanding, ensuring your foundation is solid.
- Infrastructure as Code: Version everything in Git, deploy via Terraform and Argo CD, and treat your data pipelines with the same rigor as your application code.
In code, the difference looks like this:

```python
# Before: hunting for customer data across systems
customer_crm = crm_api.get_customer(id)
customer_erp = erp_api.get_customer(id)
customer_billing = billing_api.get_customer(id)
# Manual reconciliation nightmare

# After: single source of truth
customer = mdm_api.get_golden_customer(id)
# Complete, validated, governed data
```
Golden records are modeled as immutable, validated types:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

from pydantic import EmailStr

@dataclass(frozen=True)
class Customer:
    customer_id: str
    email: EmailStr
    phone: Optional[str]
    registration_date: datetime

    def __post_init__(self):
        # Validation happens at the edge; ValidationError is the project's
        # custom exception (see the Error Handling rules below)
        if not self.email or not self.customer_id:
            raise ValidationError("Required fields missing")
```
Your rules automatically implement fuzzy matching, survivorship policies, and duplicate detection using configurable ML models. No more manual data cleansing sprints.
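As a rough illustration of what that matching and survivorship logic can look like, here is a minimal stdlib-only sketch; the field names, the 0.92 threshold, and the "most complete record wins" policy are illustrative assumptions, and a production setup would rely on the configurable ML matcher instead.

```python
from difflib import SequenceMatcher

# Illustrative threshold; real deployments tune this per domain.
MATCH_THRESHOLD = 0.92

def is_probable_duplicate(a: dict, b: dict) -> bool:
    """Fuzzy-compare names; treat identical normalized emails as a match."""
    email_a = a.get("email", "").strip().lower()
    email_b = b.get("email", "").strip().lower()
    if email_a and email_a == email_b:
        return True
    name_a = f"{a.get('first_name', '')} {a.get('last_name', '')}".lower()
    name_b = f"{b.get('first_name', '')} {b.get('last_name', '')}".lower()
    return SequenceMatcher(None, name_a, name_b).ratio() >= MATCH_THRESHOLD

def survive(candidates: list[dict]) -> dict:
    """Toy survivorship policy: the most complete record wins."""
    return max(candidates, key=lambda r: sum(1 for v in r.values() if v not in (None, "")))
```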
Matching and validation failures are handled explicitly at the task boundary rather than propagated downstream:

```python
from airflow.exceptions import AirflowFailException

try:
    merged_customer = merge_customer(raw_record)
except DuplicateRecordError as e:
    # Structured error to Kafka for governance review
    publish_error_event(entity="customer", error=e, severity="warn")
except ValidationError as e:
    # Fail fast, don't propagate bad data
    raise AirflowFailException(f"Data validation failed: {e}")
```
Before: Your team spends 3 days building custom deduplication logic for customer records from Salesforce, SAP, and HubSpot. Logic is duplicated across projects, error handling is inconsistent, and data quality issues surface in production.
After:
```python
with DAG("mdm_customer_ingestion", schedule_interval="@daily") as dag:

    @task
    def extract(ds=None):
        # Airflow injects the logical date (ds) into TaskFlow tasks
        return crm_api.get_customers(since=ds)

    @task
    def transform(raw):
        return [merge_customer(Customer(**r)) for r in raw]

    @task
    def load(clean):
        db.insert_customers(clean)

    # Passing TaskFlow outputs wires both dependencies and data
    load(transform(extract()))
```
Result: 30 minutes to deploy a production-ready pipeline with built-in deduplication, validation, and governance. Reusable patterns across all your data domains.
Before: Product specifications conflict between e-commerce, inventory, and pricing systems. Engineers write custom reconciliation logic that breaks when upstream schemas change.
After: Declarative data contracts with automatic schema evolution detection. When upstream systems change, your pipeline validates compatibility and alerts data stewards for approval.
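One way to wire that compatibility gate, sketched here with a Pydantic model as the declared contract; the field list, logger wiring, and steward-notification hook are assumptions, not part of the ruleset itself.

```python
import logging

from pydantic import BaseModel, EmailStr, ValidationError

logger = logging.getLogger(__name__)

class CustomerContractV1(BaseModel):
    """Declarative contract for inbound CRM customer payloads."""
    customer_id: str
    email: EmailStr
    first_name: str
    last_name: str

def is_compatible(sample_payload: dict) -> bool:
    """Return True if an upstream sample still satisfies the contract."""
    try:
        CustomerContractV1(**sample_payload)
        return True
    except ValidationError as exc:
        # Surface the drift for data-steward approval
        # (hypothetical alert_steward() hook not shown).
        logger.warning("schema drift detected: %s", exc.errors())
        return False
```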
```sql
WITH standardized_products AS (
    SELECT product_id,
           TRIM(UPPER(sku)) AS sku_normalized,
           COALESCE(list_price, catalog_price) AS price,
           CURRENT_TIMESTAMP AS ingest_ts
    FROM product_stg
    WHERE ingest_ts >= :last_run_ts
)
INSERT INTO product_mdm (product_id, sku, price, ingest_ts)
SELECT product_id, sku_normalized, price, ingest_ts
FROM standardized_products
ON CONFLICT (product_id) DO UPDATE SET
    sku = EXCLUDED.sku,
    price = EXCLUDED.price,
    ingest_ts = EXCLUDED.ingest_ts;
```
Clone the ruleset and configure your Cursor IDE:
```bash
# Add to your .cursorrules file
curl -o .cursorrules https://example.com/mdm-integration-rules
```
Start with your highest-value, lowest-complexity domain (typically customers or products):
```python
# domain/customer.py
from dataclasses import dataclass
from datetime import datetime

from pydantic import EmailStr

@dataclass(frozen=True)
class Customer:
    customer_id: str
    email: EmailStr
    first_name: str
    last_name: str
    created_at: datetime

    @classmethod
    def from_crm_record(cls, record: dict) -> "Customer":
        # Normalize at the edge so the golden record is already clean
        return cls(
            customer_id=record["id"],
            email=record["email"].lower().strip(),
            first_name=record["firstName"],
            last_name=record["lastName"],
            created_at=datetime.fromisoformat(record["createdAt"]),
        )
```
```hcl
# terraform/mdm-infrastructure.tf
module "mdm_database" {
  source                    = "./modules/postgresql"
  database_name             = "mdm_production"
  backup_retention          = 30
  enable_row_level_security = true
}

module "airflow_cluster" {
  source                     = "./modules/airflow"
  dag_folder                 = "../dags"
  enable_kubernetes_executor = true
}
```
```python
# tests/test_customer_quality.py
def test_customer_deduplication():
    duplicates = find_customer_duplicates(test_dataset)
    assert len(duplicates) < 0.005 * len(test_dataset)  # <0.5% threshold

def test_data_completeness():
    completeness = calculate_completeness(customer_mdm_table)
    assert completeness["email"] > 0.98  # 98% complete
```
- Development Velocity: Teams report 60% faster feature delivery when working with unified master data instead of managing multiple data sources.
- Data Pipeline Reliability: Built-in idempotency and error handling reduce production incidents by 75%. Rerunning failed DAGs never creates duplicates or data drift.
- Compliance Readiness: Automated lineage tracking and governance policies mean audit preparation time drops from weeks to hours.
- Infrastructure Costs: Cloud-native design with intelligent partitioning and caching reduces compute costs by 40% compared to traditional ETL approaches.
- Team Onboarding: New developers become productive in days, not weeks, thanks to clear data contracts and automated validation.
Organizations using these patterns typically reach the pilot KPIs described above (≤0.5% duplicate rate, ≥98% data completeness) within their first domain rollout.
Your enterprise data architecture deserves the same engineering rigor as your application stack. These Cursor Rules give you the framework to build MDM integration pipelines that scale with your business and evolve with your needs.
Ready to eliminate data fragmentation once and for all? Your next customer 360 view, product catalog consolidation, or regulatory compliance project starts here.
You are an expert in Python 3.11, SQL (PostgreSQL-style), Apache Spark 3.x (PySpark), Apache Airflow 2.x, and cloud-native MDM platforms (Azure Purview, Informatica MDM, SAP MDG).
Key Principles
- Start every MDM rollout with a single-domain pilot; move to multi-domain only after success KPIs (≤0.5% duplicate rate, ≥98% data completeness) are met.
- Design for an immutable "golden record"; downstream systems only subscribe—never mutate—integration outputs.
- Treat pipelines as code: version in Git, build in CI, deploy via IaC (Terraform) and CD (Argo CD).
- Prefer declarative data contracts (OpenAPI/GraphQL schemas, Pydantic models) over implicit interface assumptions.
- Enforce idempotency: rerunning yesterday’s DAG must not create duplicates or drift (see the sketch after this list).
- Use descriptive, business-oriented names (e.g. customer_master, product_dim) and ISO-8601 timestamps (UTC).
- Automate everything that can be automated—matching, survivorship, and policy enforcement—using ML/AI where supported.
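- Idempotency sketch (the `customer_id` natural key is an assumption): merging by key means a replayed batch converges to the same state instead of appending duplicates.
```python
def merge_batch(golden: dict[str, dict], batch: list[dict]) -> dict[str, dict]:
    """Upsert a batch into the golden store, keyed by natural key."""
    for record in batch:
        golden[record["customer_id"]] = record  # last-write-wins per key
    return golden

store: dict[str, dict] = {}
day1 = [{"customer_id": "C1", "email": "a@example.com"}]
merge_batch(store, day1)
snapshot = dict(store)
merge_batch(store, day1)   # rerun of the same batch
assert store == snapshot   # no duplicates, no drift
```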
Python
- Always use type hints and MyPy strict mode.
- Represent external records with @dataclass(frozen=True) or Pydantic BaseModel to guarantee immutability and validation.
- Split modules: adapters/ (IO), domain/ (matching, survivorship), governance/ (policies), dags/ (Airflow DAGs), tests/.
- Never store secrets in code; read from Airflow Connections or Azure Key Vault via environment variables.
- Use logging.getLogger(__name__) with JSON log format. Do not use print.
- Raise custom exceptions: DuplicateRecordError, ValidationError, GovernancePolicyError. Catch only at the DAG/task boundary (a minimal sketch follows this list).
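- Exception and logging sketch (the stdlib-only JSON formatter is an assumption; many teams use a JSON logging library instead):
```python
import json
import logging

class MDMError(Exception):
    """Base class for MDM pipeline errors."""

class DuplicateRecordError(MDMError): ...
class ValidationError(MDMError): ...
class GovernancePolicyError(MDMError): ...

class JsonFormatter(logging.Formatter):
    """Illustrative stdlib-only JSON formatter."""
    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "ts": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
```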
SQL
- Disallow SELECT *; list columns explicitly in source-controlled .sql files.
- Use CTEs for readability; one transformation step per CTE.
- Use snake_case for identifiers; suffix stage tables with _stg, mastered tables with _mdm, output views with _vw.
- Store DDL in Flyway/Liquibase migrations; never apply ad-hoc DDL in notebooks.
- Example
```sql
WITH cleaned AS (
    SELECT customer_id,
           TRIM(LOWER(email)) AS email_normalized,
           COALESCE(phone, alt_phone) AS phone,
           CURRENT_TIMESTAMP AS ingest_ts
    FROM customer_stg
    WHERE ingest_ts >= :last_run_ts
)
INSERT INTO customer_mdm (customer_id, email, phone, ingest_ts)
SELECT customer_id, email_normalized, phone, ingest_ts
FROM cleaned
ON CONFLICT (customer_id) DO UPDATE SET
    email = EXCLUDED.email,
    phone = EXCLUDED.phone,
    ingest_ts = EXCLUDED.ingest_ts;
```
Error Handling and Validation
- Validate source payloads at the pipeline edge with Pydantic; fail-fast on schema mismatch.
- Use Airflow task-level retries for transient errors (e.g., 429/503 HTTP). Mark data/logic errors as "non-retryable" to avoid loops.
- All DAGs push structured error events to the `mdm_errors` Kafka topic (JSON schema: id, entity, ts, severity, message, stack); a publisher sketch follows the example below.
- Fail-fast guard-clause example
```python
def merge_customer(rec: Customer) -> Customer:
    if rec.is_duplicate:
        raise DuplicateRecordError(rec.id)
    if not rec.email:
        raise ValidationError("email is required")
    # happy path – survivorship rules here
    return rec
```
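- Publisher sketch for `publish_error_event` (assumptions: kafka-python client and a placeholder broker address; the event fields follow the JSON schema above):
```python
import json
import traceback
import uuid
from datetime import datetime, timezone

from kafka import KafkaProducer  # assumption: kafka-python client

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",  # placeholder broker address
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def publish_error_event(entity: str, error: Exception, severity: str = "warn") -> None:
    """Emit a structured error event matching the mdm_errors JSON schema."""
    event = {
        "id": str(uuid.uuid4()),
        "entity": entity,
        "ts": datetime.now(timezone.utc).isoformat(),
        "severity": severity,
        "message": str(error),
        "stack": traceback.format_exc(),
    }
    producer.send("mdm_errors", event)
    producer.flush()
```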
Apache Airflow Rules
- One DAG per domain. DAG id format: "mdm_<domain>_ingestion".
- Use TaskFlow API (Python @task) for transform logic; avoid BashOperators.
- Configure `max_active_runs=1` to guarantee serial execution and consistency.
- All DAGs must publish lineage metadata to OpenLineage backend.
- Enable `dagrun_timeout` (default 2 h) so stale runs are failed instead of hanging and blocking SLA alerts.
- Example DAG skeleton
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id="mdm_customer_ingestion",
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={"retries": 3, "retry_delay": timedelta(minutes=10)},
) as dag:

    @task
    def extract(ds=None):
        # Airflow injects the logical date (ds) when declared as a kwarg
        return crm_api.get_customers(since=ds)

    @task
    def transform(raw):
        # validation + survivorship
        return [merge_customer(Customer(**r)) for r in raw]

    @task
    def load(clean):
        db.insert_customers(clean)

    # Passing TaskFlow outputs wires both dependencies and data
    load(transform(extract()))
```
PySpark Rules
- Never coalesce to 1 partition in production; preserve parallelism.
- Partition datasets by business date (dt) and domain_id for push-down reads.
- Cache only intermediate DataFrames reused ≥2 times.
- Use `spark.sql.adaptive.enabled=true` and `spark.databricks.delta.optimizeWrite.enabled=true` for Delta tables (a partitioned-write sketch follows below).
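- Example partitioned Delta write (illustrative sketch; the paths and the `domain_id` column are assumptions):
```python
from pyspark.sql import SparkSession, functions as F

spark = (
    SparkSession.builder
    .appName("mdm_product_mastering")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

raw = spark.read.format("delta").load("/mnt/mdm/product_stg")  # placeholder path

standardized = (
    raw.withColumn("sku_normalized", F.upper(F.trim(F.col("sku"))))
       .withColumn("dt", F.to_date(F.col("ingest_ts")))
)

# Cache only because the DataFrame is reused twice (metrics + write)
standardized.cache()
row_count = standardized.count()

(
    standardized.write
    .format("delta")
    .mode("append")
    .partitionBy("dt", "domain_id")   # push-down friendly layout
    .save("/mnt/mdm/product_mdm")     # placeholder path
)
```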
Testing
- Unit: pytest + factory_boy; aim for ≥90% mutation coverage.
- Data quality: great_expectations suites committed with the dataset.
- DAG validation: a pytest suite that loads the Airflow `DagBag` and asserts zero import errors (see the sketch after this list).
- Contract tests: mock CRM, ERP, and MDM API responses with Pact Python.
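- Example DAG integrity test (sketch; the `dags/` folder path is an assumption):
```python
# tests/test_dag_integrity.py
from airflow.models import DagBag

def test_no_dag_import_errors():
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    assert dag_bag.import_errors == {}, f"DAG import errors: {dag_bag.import_errors}"

def test_mdm_dag_naming_convention():
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    for dag_id in dag_bag.dag_ids:
        assert dag_id.startswith("mdm_"), f"{dag_id} violates mdm_<domain>_ingestion naming"
```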
Performance
- Keep task runtime <15 min; break long transforms into chained tasks.
- Use incremental extraction (`updated_at` > last_run_ts) to minimize load volume; a watermark sketch follows this list.
- Monitor KPIs: deduplication accuracy, match rate, throughput (rows/s), cost per run.
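- Example incremental-extraction helper (sketch; the column list and psycopg2-style bind parameters are assumptions):
```python
from datetime import datetime, timezone

def build_incremental_query(table: str, last_run_ts: datetime) -> tuple[str, dict]:
    """Build a parameterized incremental query; table names come from trusted config."""
    sql = (
        f"SELECT customer_id, email, phone, updated_at "
        f"FROM {table} "
        f"WHERE updated_at > %(last_run_ts)s"
    )
    return sql, {"last_run_ts": last_run_ts}

query, params = build_incremental_query(
    "customer_stg",
    last_run_ts=datetime(2024, 1, 1, tzinfo=timezone.utc),
)
# cursor.execute(query, params)  # psycopg2-style named parameters
```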
Security & Compliance
- Classify data (PII, PHI, Confidential) in Azure Purview; enforce column-level masking via dynamic views.
- Transport: TLS 1.2+ for all APIs; Storage: AES-256 at rest.
- Implement row-level access in mastered tables using RLS policies (PostgreSQL) or Unity Catalog (Databricks).
- Rotate credentials every 90 days; use key-vault-backed variables only.
Data Governance
- Data steward per domain must approve schema changes via pull request.
- Version data contracts with semantic versioning (vMAJOR.MINOR.PATCH); breaking changes require a 30-day deprecation window.
- Maintain business glossary in Purview with linkage to table and column IDs.
Migration & Rollout
- Pilot → Phase 1 (single domain) → Phase 2 (multi-domain) → Enterprise adoption.
- Exit criteria for pilot: ≥95 % lineage captured, ≤2 % data error rate, stakeholder sign-off.
Common Pitfalls & Remedies
- Pitfall: Bulk upserts causing lock contention.
  Remedy: Batch into chunks of ~5,000 records and upsert with Postgres `INSERT … ON CONFLICT`; acquire any explicit locks with `NOWAIT` so contended batches fail fast (a batching sketch follows this list).
- Pitfall: Duplicate detection drifting over time.
Remedy: Schedule weekly re-training of ML match model; compare F1 score to baseline.
- Pitfall: Governance policies bypassed in ad-hoc queries.
Remedy: Block direct write access; all changes via governed pipelines.
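- Example batched upsert (sketch of the remedy above using psycopg2's `execute_values`; chunk size and table/column names are assumptions):
```python
from psycopg2.extras import execute_values

UPSERT_SQL = """
    INSERT INTO customer_mdm (customer_id, email, phone, ingest_ts)
    VALUES %s
    ON CONFLICT (customer_id) DO UPDATE SET
        email = EXCLUDED.email,
        phone = EXCLUDED.phone,
        ingest_ts = EXCLUDED.ingest_ts
"""

def upsert_in_batches(conn, rows: list[tuple], batch_size: int = 5_000) -> None:
    """Upsert rows in fixed-size batches to keep lock scope and hold time small."""
    with conn.cursor() as cur:
        for start in range(0, len(rows), batch_size):
            execute_values(cur, UPSERT_SQL, rows[start:start + batch_size])
            conn.commit()  # commit per batch so locks are released promptly
```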