End-to-end coding, operational, and architectural rules for building, deploying, and maintaining data, container, and AI orchestration pipelines.
Stop fighting with brittle orchestration setups and pipeline failures that break your production schedule. These advanced orchestration rules eliminate the complexity of managing multi-platform data, container, and AI workflows while delivering the reliability your team needs.
You're dealing with orchestration systems that break in production, task failures that cascade through your entire pipeline, and monitoring blind spots that leave you debugging at 2 AM. Your current setup probably involves fragile shell scripts chained by cron, manual kubectl commands, and hard-coded LLM API calls with no retry logic or token tracking.
These rules provide battle-tested patterns for Apache Airflow, Dagster, Prefect, Kubernetes, and LLM orchestration that eliminate common failure points while maintaining development velocity. You get production-ready workflows with built-in observability, automated error recovery, and consistent patterns across your entire stack.
Core Framework Advantages:
Built-in observability with structured logging, metrics export to Prometheus, and automatic alert routing means you know about issues before users do. No more mystery failures or manual log diving.
Incremental automation strategy starts with single-purpose tasks and expands only after stability metrics stay green for multiple releases. Your pipelines become more reliable as they grow.
Declarative YAML definitions with Git-based workflows ensure your orchestration setup is reproducible across environments. No more "works on my machine" deployment issues.
Proper resource limits, horizontal pod autoscaling, and worker pool separation handle production load increases automatically. Your orchestration layer grows with your business needs.
Before: Manual task coordination with shell scripts and cron jobs
# Fragile bash orchestration
./extract_data.sh && ./transform_data.py && ./load_to_warehouse.sh
After: Type-safe, observable Dagster assets with automatic lineage tracking
@asset(group_name="analytics")
def processed_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    """Transform raw orders with validation and metrics."""
    if raw_orders.empty:
        raise ValueError("raw_orders cannot be empty")
    processed = raw_orders.pipe(clean_data).pipe(enrich_data)
    emit_metric("orders_processed", len(processed))
    return processed
Before: Manual kubectl commands and inconsistent manifest files
kubectl apply -f deployment.yaml # Hope it works
kubectl get pods | grep myapp # Manual checking
After: Standardized Kubernetes manifests with health checks and resource management
apiVersion: apps/v1
kind: Deployment
metadata:
  name: data-processor
  labels:
    team: data-platform
    env: prod
spec:
  replicas: 3
  selector:            # required: must match the pod template labels
    matchLabels:
      app: data-processor
  template:
    metadata:
      labels:
        app: data-processor
    spec:
      containers:
        - name: processor
          image: registry.example.com/data-processor:1.0.0  # illustrative; pin a tag, never :latest
          resources:
            requests:
              cpu: 100m
              memory: 256Mi
            limits:
              cpu: 500m
              memory: 512Mi
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
          readinessProbe:        # mandated by the Kubernetes rules below; path is illustrative
            httpGet:
              path: /ready
              port: 8080
Before: Hard-coded API calls with no token tracking or error handling
# Brittle LLM integration
response = openai.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}]
)
After: Structured LangChain workflows with observability and resource limits
# Module-level semaphore: a Semaphore created inside the function would be new
# on every call and limit nothing — it must be shared across concurrent runs.
LLM_SEMAPHORE = asyncio.Semaphore(5)

@task(retries=3, retry_delay_seconds=60)
async def process_with_llm(documents: List[str]) -> List[str]:
    """Process documents with rate limiting and token tracking."""
    async with LLM_SEMAPHORE:  # limit concurrent requests
        chain = create_analysis_chain(
            max_tokens=1000,
            temperature=0.1,
            callbacks=[TokenUsageCallback()],
        )
        return await chain.arun(documents)
src/
├── orchestration/
│   ├── dags/        # Airflow DAGs
│   ├── flows/       # Prefect flows
│   ├── assets/      # Dagster assets
│   ├── operators/   # Custom operators
│   ├── sensors/     # Event sensors
│   └── tests/       # Orchestration tests
└── k8s/
    ├── deployment-*.yaml
    ├── service-*.yaml
    └── hpa-*.yaml
# Semantic commits for pipeline changes
git commit -m "feat(dag): add SLA monitoring to order processing"
git commit -m "fix(k8s): increase memory limits for transformer pod"
git commit -m "chore(sensor): remove unused file watcher"
# Standard metrics in every task
def process_data(context):
    start_time = time.time()
    try:
        result = do_processing()
        emit_metric("task_success", 1, tags={"dag_id": context["dag"].dag_id})
        return result
    except Exception as e:
        emit_metric("task_failure", 1, tags={"error": str(e)})
        send_alert_to_slack(context, e)
        raise
    finally:
        duration = time.time() - start_time
        emit_metric("task_duration", duration)
def test_dag_loads_without_errors():
    """Ensure all DAGs load successfully."""
    dag_bag = DagBag(include_examples=False)
    assert len(dag_bag.import_errors) == 0

@pytest.mark.integration
def test_end_to_end_workflow():
    """Test complete pipeline with test data."""
    with kind_cluster():
        result = trigger_workflow(test_payload)
        assert result.status == "success"
        assert_data_quality_checks_pass()
These orchestration rules transform your pipeline management from a source of production anxiety into a reliable foundation for scaling your data and container workloads. The comprehensive approach covers everything from local development patterns to production monitoring, giving you the confidence to ship orchestration changes without fear.
Start with the project structure and Git workflow integration—you'll see immediate improvements in code organization and deployment consistency. Then layer in the observability and testing patterns to build the reliability your production systems demand.
You are an expert in Python, Apache Airflow, Dagster, Prefect, Kubernetes, LangChain, Semantic Kernel, Git, Prometheus/Grafana, and CI/CD tooling.
Key Principles
- Automate incrementally: start with single-purpose tasks, expand only after stability metrics are green for ≥2 releases.
- All code and YAML live in Git. Every pipeline/DAG/manifest change requires a PR, review, and semantic commit message (e.g., chore(dag): add SLA & alerts).
- Observability first: emit logs, metrics, and traces at each orchestration layer; dashboards must turn green before promoting to prod.
- Prefer declarative definitions (YAML/DSL) over imperative scripting; treat workflows as code.
- Fail fast & idempotent: design every task/operator to be safely re-run; keep side effects behind checkpoints.
- Choose control style deliberately: use centralized orchestration (Airflow, Dagster, Prefect) for strict ordering; choose choreography (event bus, Kubernetes operators, LangChain callbacks) for loose coupling and resilience.
- Review orchestration design quarterly; retire unused DAGs, remove dead sensors, and upgrade libraries to LTS versions.
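The fail-fast/idempotent principle above can be sketched with a simple checkpoint guard. This is a minimal stdlib sketch, not part of any framework: `run_step` and the JSON checkpoint layout are illustrative names, and real orchestrators checkpoint via their own result backends.

```python
import json
from pathlib import Path

def run_step(step_name: str, payload: dict, checkpoint_dir: Path) -> dict:
    """Run a side-effecting step at most once; re-runs replay the recorded result."""
    checkpoint_dir.mkdir(parents=True, exist_ok=True)
    checkpoint = checkpoint_dir / f"{step_name}.json"
    if checkpoint.exists():
        # Safe re-run: the side effect already happened, return its recorded result.
        return json.loads(checkpoint.read_text())
    result = {"rows_loaded": len(payload.get("rows", []))}  # the real side effect goes here
    checkpoint.write_text(json.dumps(result))
    return result
```

Because the side effect sits behind the checkpoint, a retried or backfilled run is a no-op rather than a duplicate insert.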
Python
- Follow PEP-8 + Black auto-format; line length ≤ 100.
- Mandatory type hints on all public functions and task callables.
- Use f-strings, never "%" or .format.
- Prefer pure functions; side effects should be wrapped in context managers (with ...).
- Package layout
  src/
  └── <domain>/
      ├── dags/|flows/|assets/
      ├── operators/
      ├── sensors/
      ├── utils/
      └── tests/
- Never import DAG/Flow modules from "utils" to avoid circular DAG loads.
Error Handling & Validation
- Guard clauses at top:
  def fetch(...):
      if not source_url:
          raise ValueError("source_url required")
- Retries: max 3, exponential back-off (2×) with ±10% jitter.
- Always implement on_failure_callback to push context to Slack + PagerDuty.
- Use message queues (Pub/Sub, SQS) for hand-off in event-driven designs; enable dead-letter queues.
- Mark tasks idempotent via @provide_session and database locks to avoid duplicate inserts.
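The retry policy above can be expressed as a small helper. A stdlib sketch: `with_retries` is a hypothetical name, and in practice Airflow/Prefect provide this behavior via task arguments rather than hand-rolled loops.

```python
import random
import time

def with_retries(fn, max_attempts: int = 3, base_delay: float = 1.0):
    """Call fn with 2x exponential back-off and ±10% jitter between attempts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: fail fast, let the orchestrator alert
            delay = base_delay * (2 ** (attempt - 1))  # 1x, 2x, 4x, ...
            delay *= random.uniform(0.9, 1.1)          # ±10% jitter to avoid thundering herds
            time.sleep(delay)
```

The jitter matters: without it, a fleet of failed tasks retries in lock-step and can re-overload the downstream service.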
Apache Airflow Rules
- Define DAGs via @dag decorator; schedule with cron or "@daily", never with seconds-level intervals in prod.
- Version every DAG file; include dag_version = "vX.Y.Z" variable.
- Operators:
• Use deferrable operators (Async) whenever possible to release worker slots.
• Side-effects must live in dedicated operator classes inside operators/.
- Task naming: <verb>_<object> (e.g., load_orders).
- Tags: team:<squad> env:<dev|prod> app:<domain> to enhance filtering.
- Enable task-level SLA and set email_on_failure = False (alerts handled centrally).
Dagster Rules
- Use software-defined assets (SDA) instead of plain ops when data lineage matters.
- Type every input/output using Dagster’s typing system; type-check failures raise DagsterTypeCheckDidNotPass.
- Sensors should emit SkipReason when conditions not met; never return None.
- Schedule definitions live in schedules.py; one file per repository for discoverability.
Prefect 2 Rules
- Model flows with @flow; tasks must be @task(retries=3, retry_delay_seconds=60).
- Persist flow runs to s3://<bucket>/prefect-results/<flow-name>/<run-id>/.
- Use deployment YAML; parameterize environment via InfraOverrides not hard-coded values.
Kubernetes Rules
- All manifests are YAML in k8s/ folder. File naming: <kind>-<name>.yaml.
- Mandatory fields: metadata.labels, resources.requests + limits, livenessProbe, readinessProbe.
- Patterns:
• Sidecar: envoy or log-agent for cross-cutting concerns.
• Init container: migrations/permission-checks.
• Co-located: GPU job + exporter.
- Enable HPA at CPU ≥ 70%; set a 5m scale-down stabilization window (behavior.scaleDown.stabilizationWindowSeconds).
- RBAC: least privilege; separate ServiceAccount per workflow controller.
LLM Orchestration (LangChain / Semantic Kernel)
- Chains must declare an explicit memory + cache provider.
- Set max_tokens and temperature at the prompt-template layer—not inside chain code.
- Use async chains with a single shared asyncio.Semaphore(5) to limit open sockets.
- Log token usage via callbacks; export metrics to Prometheus.
Testing
- Unit: pytest with fixtures for Airflow DagBag, Dagster build_job, Prefect test client.
- Integration: kind + Helm to spin up ephemeral clusters in CI; run end-to-end workflow.
- Property-based tests: use Hypothesis to fuzz task inputs.
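The property-based idea can be sketched with a plain random loop; Hypothesis's `@given` would replace the loop and shrink failing inputs automatically. `clean_data` here is a toy transform invented for illustration.

```python
import random

def clean_data(rows: list[dict]) -> list[dict]:
    """Toy transform under test: drop rows with a null id."""
    return [r for r in rows if r.get("id") is not None]

def test_clean_data_properties():
    rng = random.Random(0)  # seeded so failures are reproducible
    for _ in range(200):
        rows = [{"id": rng.choice([None, rng.randint(0, 9)])}
                for _ in range(rng.randint(0, 20))]
        out = clean_data(rows)
        assert len(out) <= len(rows)                   # never invents rows
        assert all(r["id"] is not None for r in out)   # invariant holds
        assert clean_data(out) == out                  # transform is idempotent
```

Properties like "never invents rows" and "idempotent" catch whole classes of bugs that a handful of example-based cases miss.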
Performance & Scalability
- Place NGINX ingress as reverse proxy; enable gzip & HTTP/2.
- Separate metadata DB from worker node pools; scale Celery/Worker pods horizontally.
- Tune Airflow parallelism = min(cpu_total×2, 128).
Security
- Secrets in HashiCorp Vault or AWS Secrets Manager; mount via env vars, never hard-code.
- Enable image scanning in CI (Trivy) + admission controller (Kyverno) to block high CVEs.
- Use NetworkPolicy to restrict egress from worker pods.
Observability & Alerts
- Metrics: Airflow statsd_exporter, Dagster /graphile-exporter, Prefect Orion metrics.
- Logs: stdout to Loki; label with dag_id, run_id, task_id.
- Traces: OpenTelemetry instrumented in Python client.
- Alert routing via Alertmanager -> Slack #on-call + PagerDuty escalation.
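A minimal sketch of structured logs carrying the labels above, using only the stdlib; a real deployment would ship these JSON lines to Loki via a log agent rather than stderr.

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Emit one JSON object per line so Loki can index dag_id/run_id/task_id."""
    def format(self, record):
        payload = {
            "msg": record.getMessage(),
            "level": record.levelname,
            "dag_id": getattr(record, "dag_id", None),
            "run_id": getattr(record, "run_id", None),
            "task_id": getattr(record, "task_id", None),
        }
        return json.dumps(payload)

logger = logging.getLogger("pipeline")
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Labels travel via `extra` and land as top-level JSON keys.
logger.info("row count ok", extra={"dag_id": "orders", "run_id": "r1", "task_id": "load_orders"})
```

Keeping the labels as first-class JSON keys (not interpolated into the message) is what makes `dag_id`-scoped queries cheap at the log backend.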
CI/CD
- GitHub Actions: lint → unit tests → build Docker → helmfile diff → deploy-to-stage → e2e → manual-approval → prod.
- Tag releases semver; propagate TAG env var to DAG/Docker images.
Common Pitfalls
- Dynamic task mapping without concurrency limit causes scheduler overload—always set max_active_tasks.
- Breaking downstream schema: version assets and emit deprecation warnings before removal.
- Long-running sensors block slots—convert to deferrable or external triggers.
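The bounded fan-out behind `max_active_tasks` can be sketched outside any scheduler; here a ThreadPoolExecutor stands in for the worker pool, and `mapped_tasks` is an illustrative name.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List

def mapped_tasks(items: Iterable, worker: Callable, max_active_tasks: int = 8) -> List:
    """Fan out over items with a hard cap on concurrent workers, results in input order."""
    with ThreadPoolExecutor(max_workers=max_active_tasks) as pool:
        return list(pool.map(worker, items))
```

Unbounded dynamic mapping is the scheduler-overload failure mode; the cap turns a 10,000-way fan-out into a steady, predictable load.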
Checklist Before Merge
- [ ] All new tasks/operators have docstring + examples.
- [ ] Alert coverage for every new critical path.
- [ ] Unit + integration tests green.
- [ ] Updated CHANGELOG.md.
- [ ] Reviewed by at least 2 peers.