Actionable coding rules and architecture guidelines for automating data governance, regulatory compliance, and data-quality enforcement in large enterprises.
Your compliance team is drowning in manual processes while your data scales exponentially. Every new dataset creates governance debt. Every regulatory audit becomes a fire drill. Meanwhile, your development teams are blocked by slow approval processes and unclear data policies.
You need automated data governance that scales with your enterprise.
Large enterprises face a perfect storm of compliance challenges:
- Manual governance processes don't scale.
- Regulatory complexity is accelerating.
- Development velocity and compliance requirements pull in opposite directions.
These Cursor Rules transform data governance from a manual bottleneck into automated infrastructure. Instead of fighting compliance requirements, you embed them directly into your development workflows.
Privacy-by-Design as Infrastructure: Every dataset is encrypted, access-controlled, and properly classified by default. Your pipelines automatically enforce data contracts, capture lineage, and generate audit trails with no manual intervention required.
Real-Time Policy Enforcement: Policy violations surface within five minutes through streaming validation instead of turning up in quarterly audits. Your governance rules execute as code alongside your data transformations.
Automated Compliance Workflows: DPIA generation, DSAR handling, and consent management become pipeline steps, not manual processes. Your compliance team focuses on policy decisions while automation handles execution.
The difference shows up directly in pipeline code. Before:

```python
# Manual classification after deployment
def process_customer_data():
    df = spark.read.table("raw_customer_data")
    # No schema validation
    # No PII detection
    # No policy enforcement
    df.write.mode("overwrite").saveAsTable("processed_customers")
    # Manual steward notification via email
```

After, with governance embedded:

```python
@governance_required(["gdpr", "pii"])
@contract_enforced("customer_data_v2")
def process_customer_data():
    df = extract_with_validation("raw_customer_data")
    df_clean = transform_with_contracts(df)
    # Automatic governance checks
    governance.check_policies(df_clean, dataset="customer_profile")
    load_with_lineage(df_clean, "processed_customers")
    # Automatic steward notification + audit trail
```
```python
# Stream processing with embedded governance
kafka_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .load()
    .transform(validate_pii_inline)   # < 200 ms validation
    .transform(enforce_geo_fencing)   # Block sovereign-data violations
    .writeStream
    .foreachBatch(update_governance_metrics)
    .start()
)
```
The payoff shows up in three areas:
- Development acceleration
- Operational excellence
- Risk reduction
To get started, install the governance client libraries and plan the governed infrastructure with Terraform:

```bash
# Install governance dependencies
pip install apache-atlas-client collibra-api informatica-rest-client
pip install great-expectations pandas-profiling

# Configure Terraform for governed resources
terraform init data-governance/
terraform plan -var="compliance_tags={gdpr=true,pci=true}"
```
```python
# contracts.py - Define your data contracts
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass(frozen=True)
class CustomerDataContract:
    version: int = 2
    schema: Dict[str, str] = field(default_factory=lambda: {
        "customer_id": "BIGINT NOT NULL",
        "email": "STRING CLASSIFIED pii.email",
        "country_code": "STRING CHECK (country_code IN ('US','EU','CA'))",
        "consent_status": "STRING NOT NULL",
    })
    retention_days: int = 2555  # 7 years
    geo_restrictions: List[str] = field(default_factory=lambda: ["EU"])
```
```python
# dg_customer_ingest.py
import pandas as pd
from airflow import DAG
from airflow.decorators import task, task_group

from contracts import CustomerDataContract  # data contract defined above

with DAG("dg_customer_ingest", tags=["gdpr", "pii"]) as dag:

    @task
    def validate_contracts(df: pd.DataFrame) -> pd.DataFrame:
        contract = CustomerDataContract()
        validator = ContractValidator(contract)  # project validation helper
        return validator.validate_or_raise(df)

    @task_group
    def governance_checks():
        # Governance task helpers defined elsewhere in the project
        return [
            check_pii_classification(),
            validate_geo_fencing(),
            update_lineage_metadata(),
            generate_audit_events(),
        ]

    raw_df = extract()
    validated_df = validate_contracts(raw_df)
    validated_df >> governance_checks() >> load()
```
```python
# streaming_governance.py - Real-time policy enforcement
def create_governance_stream():
    return (
        spark.readStream
        .table("data_events")
        .filter("event_type = 'schema_change'")
        .writeStream
        .foreachBatch(lambda batch, batch_id: [
            validate_schema_compatibility(batch),
            check_breaking_changes(batch),
            notify_stakeholders_if_violations(batch),
        ])
        .trigger(processingTime='5 minutes')
        .start()
    )
```
```python
# compliance_automation.py
class GDPRAutomation:
    def process_dsar_request(self, request_id: str):
        """Automated GDPR Subject Access Request processing"""
        lineage = self.atlas_client.get_downstream_tables(request_id)
        data_export = self.collect_subject_data(lineage)
        return self.generate_compliant_response(data_export)

    @scheduled("@daily")  # illustrative scheduler decorator (wire to Airflow or cron)
    def audit_data_retention(self):
        """Automatic data deletion per retention policies"""
        expired_data = self.find_expired_datasets()
        self.secure_delete_with_audit_trail(expired_data)
```
Old process: 2-3 weeks of manual reviews, Excel-based risk assessments, and email chains. New process: roughly two hours, fully automated; defining the contract triggers the governance pipeline.
```python
# Developer defines contract
customer_contract = DataContract(
    classification=["pii", "gdpr"],
    retention_policy="7_years",
    geo_scope=["EU"],
)

# Automated pipeline handles the rest
onboard_dataset("customer_data", customer_contract)
# → Atlas registration, Collibra sync, policy enforcement setup
```
Old process: manual impact analysis, downstream team notifications, and rollback procedures. New process: automated compatibility checks prevent breaking changes from shipping.
```python
# Contract versioning prevents breaking changes
@contract_version("v3", backward_compatible=True)
def evolve_customer_schema():
    # New optional fields only - enforced at build time
    add_column("preferences", "JSON", nullable=True)
    # Breaking changes blocked by CI/CD
```
Old process: weeks of manual data gathering, lineage tracing, and documentation assembly. New process: minutes of automated evidence collection.
```python
# Instant audit readiness
audit_report = generate_compliance_report(
    regulation="gdpr",
    dataset="customer_data",
    time_range="2023-01-01 to 2024-01-01",
)
# Returns: lineage, access logs, policy violations, remediation status
```
Your enterprise data governance transforms from a manual cost center into an automated competitive advantage. Development teams ship compliant data products faster while your governance team focuses on strategic policy decisions instead of operational firefighting.
The choice is clear: automate governance now or fall further behind the compliance curve while your competitors ship data products at scale.
You are an expert in Python, SQL, Apache Atlas, Collibra, Informatica Axon, Informatica Cloud Data Governance & Catalog, Airflow, Spark, Terraform, AWS, Azure, GCP, Docker and Kubernetes.
Key Principles
- Privacy-by-Design first: treat every dataset as potentially sensitive; default to encryption, masking, and access-controls.
- Automate everything: cataloging, lineage capture, DPIA generation, DSAR workflows, consent management, data-quality checks, and policy enforcement must be executed by code or pipeline steps.
- Immutable audit trail: every metadata mutation, schema change, or policy override must generate a signed, append-only event stored for ≥7 years.
- Real-time governance: policy violations must surface within 5 minutes via streaming validation, not batch scans.
- Risk-based prioritisation: allocate resources to high-risk systems (PII, PCI, PHI) first; tag data with risk_level to drive controls programmatically.
- Contract-driven pipelines: every dataset moves only through pipelines that enforce a versioned data-contract (schema + constraints + policy references) at build time.
- Focus on metadata maturity: enforce mandatory technical, business, and stewardship metadata before any dataset is published.
- Transparency & Ethics: publish AI model cards and governance decisions; log model features & training datasets to lineage tools.
Python
- Follow PEP-8; enable mypy and Ruff in CI. Require 100% type hints for production code.
- Use dataclasses for immutable configuration objects; freeze once loaded (see the sketch after this list).
- Organise code: src/<domain>/<pipeline>/ with modules: contracts.py, policies.py, tasks.py, dag.py, tests/.
- Variables: snake_case; booleans start with is_/has_/should_.
- Functions ≤40 lines; each does one thing. Extract reusable validators to utils/validation.py.
- Use pathlib, not os.path.
- Never hard-code secrets; pull via environment or AWS/GCP/Azure secret managers.
- Prefer pandas-style type annotations (pd.DataFrame) only inside thin adapters; convert rapidly to PySpark or arrow tables for scale.
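A minimal sketch of how these Python conventions fit together; the PipelineConfig class, its fields, and the WAREHOUSE_TOKEN environment variable are illustrative assumptions, not part of any framework:

```python
# config.py - illustrative only: class name, fields, and env var are assumptions.
import os
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class PipelineConfig:
    """Immutable pipeline configuration, frozen once loaded."""
    dataset_id: str
    is_pii: bool
    contract_path: Path
    warehouse_token: str  # injected at runtime, never hard-coded


def load_config() -> PipelineConfig:
    # Secrets come from the environment or a cloud secret-manager wrapper.
    return PipelineConfig(
        dataset_id="crm_customer_profile_raw",
        is_pii=True,
        contract_path=Path("contracts") / "customer_data_v2.yaml",
        warehouse_token=os.environ["WAREHOUSE_TOKEN"],
    )
```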
SQL
- Adopt ANSI SQL (SparkSQL compatible). Upper-case keywords, snake_case identifiers.
- All CREATE TABLE statements must include COMMENT, data_classification, retention_days, and owner (see the sketch after this list).
- Reference columns explicitly; SELECT * is prohibited in persistent objects.
- Enforce CHECK constraints for domain values (e.g., country_code IN …) and data_contract versions.
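A sketch of a governed CREATE TABLE issued through Spark SQL, assuming a Delta Lake table for NOT NULL and CHECK support; the TBLPROPERTIES keys are an assumption about where your catalog expects this metadata:

```python
# Hypothetical governed table DDL, assuming Delta Lake.
spark.sql("""
    CREATE TABLE IF NOT EXISTS crm_customer_profile_raw (
        customer_id           BIGINT NOT NULL COMMENT 'Surrogate key',
        email                 STRING          COMMENT 'Classified pii.email',
        country_code          STRING          COMMENT 'ISO country code',
        data_contract_version INT    NOT NULL COMMENT 'Contract enforced at write time'
    )
    USING DELTA
    COMMENT 'Raw CRM customer profiles'
    TBLPROPERTIES (
        'data_classification' = 'pii,gdpr',
        'retention_days'      = '2555',
        'owner'               = 'customer-data-squad'
    )
""")

# Domain-value CHECK constraint (Delta Lake syntax).
spark.sql("""
    ALTER TABLE crm_customer_profile_raw
    ADD CONSTRAINT valid_country CHECK (country_code IN ('US', 'EU', 'CA'))
""")
```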
Terraform
- Use one module per governed domain (e.g., customer_data). Output resources with clear prefixes (dg_<env>_<name>).
- Tag every resource with {"owner":"<squad>","data-risk":"<low|medium|high>"}.
- Deny public S3/Blob access via aws_s3_bucket_public_access_block.
Error Handling and Validation
- Validate inputs at pipeline entry: schema, nullability, PII flag, geo_fence region.
- Raise custom exceptions: GovernanceViolation, ContractMismatch, DLPViolation.
- Always enrich exceptions with dataset_id, run_id, and policy_id for downstream alerting (see the sketch after this list).
- Implement @retry(max_attempts=3, jitter=True) only around idempotent I/O operations.
- Log with structured JSON (dataset, severity, regulation, function, message). Route to Splunk/CloudWatch/Stackdriver.
- For Airflow tasks: use trigger_rule="all_done" on cleanup/notification tasks to ensure alerts fire on failures.
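A sketch of the exception-enrichment and structured-logging pattern described above; the GovernanceViolation fields and the log schema follow this list, while the module name and the report_violation helper are assumptions:

```python
# policies.py - illustrative only: exception enrichment plus structured JSON logging.
import json
import logging

logger = logging.getLogger("governance")


class GovernanceViolation(Exception):
    """Raised when a dataset fails a policy check."""

    def __init__(self, message: str, *, dataset_id: str, run_id: str, policy_id: str):
        super().__init__(message)
        self.dataset_id = dataset_id
        self.run_id = run_id
        self.policy_id = policy_id


def report_violation(exc: GovernanceViolation, regulation: str) -> None:
    # Structured JSON payload; the log handler routes it to Splunk/CloudWatch/Stackdriver.
    logger.error(json.dumps({
        "dataset": exc.dataset_id,
        "run_id": exc.run_id,
        "policy_id": exc.policy_id,
        "severity": "HIGH",
        "regulation": regulation,
        "function": "report_violation",
        "message": str(exc),
    }))
```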
Apache Atlas
- Every new table must create a TypeDefinition if classification is novel.
- Lineage: register Spark lineage via the Spark Atlas Connector; disallow direct JDBC writes that skip lineage registration.
- Use tags: gdpr, hipaa, pci, pii, phi, sovereign_<country>.
- Block promotion to PROD if atlas_client.has_unclassified_columns(table) returns True.
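A sketch of that promotion gate; atlas_client is assumed to be your own thin wrapper around the Atlas REST API (it is not an official SDK call), and GovernanceViolation comes from the policies.py sketch in the error-handling section:

```python
# Hypothetical promotion gate; atlas_client and promote_to_prod are project-level names.
from policies import GovernanceViolation


def promote_to_prod(table: str, atlas_client) -> None:
    if atlas_client.has_unclassified_columns(table):
        raise GovernanceViolation(
            f"{table} has unclassified columns; promotion to PROD blocked",
            dataset_id=table,
            run_id="promotion-gate",
            policy_id="atlas.classification.required",
        )
    # ...continue with deployment once every column carries a classification tag
```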
Collibra / Informatica Axon
- Sync metadata nightly via REST; diff scan ➔ open stewardship tasks automatically (see the sketch after this list).
- Business glossary terms require steward + data_owner before status="Approved".
- DPIA records link to dataset assets; enforce completeness (purpose, lawful_basis, retention) via API rule.
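A hedged sketch of the nightly diff; the endpoint path, payload, and client wiring are placeholders, not the actual Collibra or Axon REST contract:

```python
# Hypothetical sketch: open stewardship tasks for assets missing from the business catalog.
import requests

COLLIBRA_URL = "https://collibra.example.com/rest/2.0"  # placeholder base URL


def open_stewardship_tasks(catalog_assets: set[str], collibra_assets: set[str],
                           session: requests.Session) -> None:
    # Assets present in the technical catalog but missing from Collibra need stewardship.
    for missing in sorted(catalog_assets - collibra_assets):
        session.post(
            f"{COLLIBRA_URL}/tasks",  # placeholder endpoint
            json={"title": f"Register and steward {missing}", "type": "stewardship"},
            timeout=30,
        )
```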
Airflow
- One DAG per data domain. DAG ids: dg_<domain>_<verb> (e.g., dg_customer_ingest).
- Enable DAG serialization & DAG integrity checks (airflow configs) for tamper detection.
- Use TaskFlow API; tasks return pandas, not write files directly.
- Add governance.check_policies() task group after each transformation group.
- Emit OpenLineage events for every task.
Testing & Compliance Automation
- Unit tests ≥90 % coverage; mock cloud SDKs.
- Contract tests: every pull request runs great_expectations checkpoint on sample data.
- Regulatory regression suite: map dataset tags → pytest markers (gdpr, ccpa); a failing test blocks the merge (see the sketch after this list).
- Continuous DPIA: pipeline scaffolder adds DPIA yaml; validations triggered by git hooks.
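A sketch of a tag-driven regulatory test; the marker must also be registered in your pytest configuration, the assertions reuse the CustomerDataContract defined earlier, and the test name is illustrative:

```python
# test_gdpr_contracts.py - run in CI with `pytest -m gdpr`.
import pytest

from contracts import CustomerDataContract  # defined in contracts.py above


@pytest.mark.gdpr
def test_customer_contract_meets_gdpr_retention_rules():
    contract = CustomerDataContract()
    assert contract.retention_days <= 2555     # 7-year ceiling for this domain
    assert "EU" in contract.geo_restrictions   # sovereign data stays in the EU
```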
Security & Privacy Controls
- Encrypt data in transit (TLS1.3) and at rest (AES-256-GCM).
- Geo-fencing: evaluate request.region against dataset.sovereignty; raise GovernanceViolation on mismatch (see the sketch after this list).
- Centralised DLP using regex + ML detectors (e.g., AWS Macie); inline block on detectExposure=="public".
- IAM: least-privilege via IaC; no broad wildcards; use short-lived tokens.
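A minimal geo-fencing sketch; Dataset is a stand-in type, check_geo_fence is an illustrative name, and GovernanceViolation comes from the policies.py sketch above:

```python
# Illustrative geo-fencing check; types and names are assumptions.
from dataclasses import dataclass

from policies import GovernanceViolation


@dataclass(frozen=True)
class Dataset:
    dataset_id: str
    sovereignty: str  # e.g. "EU"


def check_geo_fence(request_region: str, dataset: Dataset) -> None:
    if request_region != dataset.sovereignty:
        raise GovernanceViolation(
            f"Request from {request_region} blocked for {dataset.sovereignty}-sovereign data",
            dataset_id=dataset.dataset_id,
            run_id="inline",
            policy_id="geo.fencing",
        )
```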
Performance & Observability
- Real-time rule engine within 200 ms per message; benchmark with locust.
- Streaming validation via Kafka Streams; window=5 minutes sliding.
- Metrics: policy_violations_total, dpias_pending, metadata_completeness_percent, lineage_missing_rate (see the sketch after this list).
- Alert SRE on SLO breach; the SLO is lineage_missing_rate below 5% over a rolling 24 h window.
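A sketch of the governance metrics using prometheus_client; the metric names follow the list above, while the label sets are assumptions:

```python
# Illustrative metric definitions; label names are assumptions.
from prometheus_client import Counter, Gauge

policy_violations_total = Counter(
    "policy_violations_total", "Policy violations detected", ["dataset", "regulation"]
)
dpias_pending = Gauge("dpias_pending", "DPIAs awaiting review")
metadata_completeness_percent = Gauge(
    "metadata_completeness_percent", "Share of mandatory metadata filled in", ["dataset"]
)
lineage_missing_rate = Gauge("lineage_missing_rate", "Share of assets without lineage")

# Example: increment on a detected violation.
policy_violations_total.labels(dataset="crm_customer_profile_raw", regulation="gdpr").inc()
```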
Naming & Metadata Conventions
- Repositories: data-<domain> (e.g., data-customer).
- Data assets: <system>_<domain>_<entity>_<stage> (crm_customer_profile_raw); see the validation sketch after this list.
- AI models: ai_<domain>_<purpose>_<v#> (ai_risk_scoring_v3).
- Add data_contract_version column to every governed table.
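An illustrative validator for these naming conventions; the stage vocabulary beyond "raw" is an assumption for the example:

```python
# Hypothetical naming validators; adjust the stage list to your own standards.
import re

ASSET_NAME = re.compile(r"^[a-z0-9]+_[a-z0-9]+_[a-z0-9_]+_(raw|curated|serving)$")
MODEL_NAME = re.compile(r"^ai_[a-z0-9_]+_[a-z0-9_]+_v\d+$")


def is_valid_asset_name(name: str) -> bool:
    return bool(ASSET_NAME.match(name))


def is_valid_model_name(name: str) -> bool:
    return bool(MODEL_NAME.match(name))


assert is_valid_asset_name("crm_customer_profile_raw")
assert is_valid_model_name("ai_risk_scoring_v3")
```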
CI/CD & Deployment
- Use GitHub Actions; required checks: lint, test, contract-validate, terraform-plan, dpia-plan.
- Environments: dev → qa → prod; promotion requires green compliance gate.
- Docker images scanned with Trivy; block images containing CVEs with severity above 7.
Common Pitfalls & Remedies
- Pitfall: ad-hoc data patching outside pipelines → Remedy: disable write access in prod; require pull request.
- Pitfall: "shadow" datasets not cataloged → Remedy: daily cloud inventory vs Atlas diff; auto-open Jira.
- Pitfall: schema drift breaks downstream → Remedy: enforce contract version bump + backward compat tests.
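A sketch of the daily shadow-dataset sweep; the inventory and catalog sets and the ticketing client are placeholders for your own cloud-inventory, Atlas, and Jira integrations:

```python
# Hypothetical reconciliation job: anything in the cloud but not in Atlas gets a ticket.
def detect_shadow_datasets(cloud_inventory: set[str], atlas_assets: set[str],
                           ticketing) -> list[str]:
    shadow = sorted(cloud_inventory - atlas_assets)
    for table in shadow:
        ticketing.create_issue(          # placeholder client call
            project="DG",
            summary=f"Uncataloged dataset detected: {table}",
            labels=["shadow-dataset", "governance"],
        )
    return shadow
```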
Examples
1. Registering a dataset with Atlas
```python
from atlas import Client, PiiTag
client = Client()
asset = client.register_table(
    name="crm_customer_profile_raw",
    schema="id BIGINT, email STRING, country_code STRING, data_contract_version INT",
    classification=[PiiTag.EMAIL, "gdpr", "sovereign_eu"],
    owner="customer-data-squad",
)
```
2. Airflow DAG snippet with governance checks
```python
from datetime import datetime

from airflow import DAG

with DAG(
    "dg_customer_ingest",
    schedule_interval="@hourly",
    start_date=datetime(2023, 1, 1),
    tags=["gdpr", "pii"],
) as dag:
    df = extract()  # returns DataFrame
    df_clean = transform(df)
    validate = governance.check_policies(df_clean, dataset="crm_customer_profile_raw")
    load = load_to_warehouse(df_clean)
    validate >> load  # block the load until policy checks pass
```