Actionable coding, deployment, and operational rules for building high-performance, highly-available search and analytics platforms with Elasticsearch/OpenSearch and Python.
You've been there—building yet another search feature that starts simple but quickly spirals into a complex mess of performance bottlenecks, relevance tuning nightmares, and operational headaches. While your team debates whether to roll their own solution or piece together a fragile stack, your users are getting frustrated with slow, irrelevant search results.
Building production search isn't just about getting basic queries working. You're facing performance bottlenecks, relevance tuning, scaling decisions, and operational complexity all at once.
The typical response? Months of trial-and-error, reading dense documentation, and rebuilding the same infrastructure patterns that every search team eventually discovers.
These Cursor Rules give you battle-tested patterns for building high-performance search and analytics platforms with Elasticsearch and Python. Instead of learning through painful production incidents, you get proven architectural decisions, operational practices, and code patterns used by teams handling billions of documents.
You'll implement modern search patterns including vector search for AI applications, robust data ingestion pipelines, and production-grade cluster management—all while avoiding the common pitfalls that derail search projects.
Skip the research phase. Get explicit mappings, proper error handling, and async Python patterns that work under load from day one.
```python
import asyncio
import random

from elasticsearch import ApiError, AsyncElasticsearch, NotFoundError

es = AsyncElasticsearch("http://localhost:9200")  # shared client instance

# Not this amateur approach that breaks under load:
# no timeout, no error handling, no retry
def search_products_naive(query):
    results = es.search(index="products", q=query)
    return results["hits"]["hits"]

# This production-ready pattern with proper error handling
async def search_products(q: str, size: int = 20, attempts: int = 5) -> list[Product]:
    try:
        resp = await es.search(
            index='products',
            size=size,
            query={'match': {'title': q}},
            timeout='10s'
        )
    except NotFoundError:
        return []
    except ApiError as e:
        if e.meta.status in (429, 503) and attempts > 1:
            # Retry transient errors with jitter, bounded to avoid endless recursion
            await asyncio.sleep(random.uniform(0.1, 0.5))
            return await search_products(q, size, attempts - 1)
        raise SearchUnavailableError(f"Search failed: {e}")
    return [Product(
        id=h['_id'],
        title=h['_source']['title'],
        score=h['_score']
    ) for h in resp['hits']['hits']]
```
Stop guessing at cluster topology. Get specific node configurations, sharding strategies, and resource allocation formulas.
**Before:** Generic 3-node clusters that waste resources and create bottlenecks

**After:** Purpose-built architectures with dedicated master, data, ingest, and ML nodes sized correctly for your workload
Implement semantic search patterns that combine traditional text matching with AI-powered vector similarity—the approach powering today's intelligent search experiences.
```python
# Hybrid search combining BM25 and vector similarity
query = {
    "bool": {
        "should": [
            {"match": {"title": user_query}},
            {
                "script_score": {
                    "query": {"match_all": {}},
                    "script": {
                        "source": "cosineSimilarity(params.query_vector, 'title_embedding') + 1.0",
                        "params": {"query_vector": generate_embedding(user_query)},
                    },
                }
            },
        ]
    }
}
```
Get Kafka integration patterns with exactly-once semantics, proper bulk indexing strategies, and ILM policies that automatically manage your data lifecycle.
Impact: Handle 10x more data with the same infrastructure by implementing proper hot-warm-cold storage tiers and automated index management.
Challenge: Product catalog search that needs to handle fuzzy matching, faceted navigation, and personalized ranking.
Implementation:
```python
# Index structure with proper mappings
PRODUCT_MAPPING = {
    "properties": {
        "title": {"type": "text", "analyzer": "standard"},
        "category": {"type": "keyword", "doc_values": True},
        "price": {"type": "scaled_float", "scaling_factor": 100},
        "title_embedding": {
            "type": "dense_vector",
            "dims": 384,
            "index": True,
            "similarity": "cosine",
        },
    }
}

# Multi-faceted search with aggregations
async def search_products_with_facets(
    query: str,
    category_filters: list[str] | None = None,
    price_range: tuple[float, float] | None = None,
) -> ProductSearchResult:
    # Build complex query with filters and aggregations
    # Handle vector search for semantic matching
    # Return structured results with facet counts
    ...
```
Result: Sub-100ms search responses with relevant results, faceted navigation, and semantic understanding of user intent.
Challenge: Process millions of log entries daily with real-time alerting and historical analysis.
Implementation:
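One way to sketch the ingestion side (the `logs-raw` index name and the partition/offset fields are illustrative — adapt them to your Kafka topic layout):

```python
# Shape raw log entries into _bulk actions; create-only writes plus
# deterministic _ids make Kafka replays idempotent (effectively exactly-once)
def to_bulk_actions(log_lines: list[dict], index: str = "logs-raw") -> list[dict]:
    return [
        {
            "_op_type": "create",  # fail instead of overwrite on replay
            "_index": index,
            "_id": f"{line['partition']}-{line['offset']}",  # deterministic id
            "_source": line,
        }
        for line in log_lines
    ]

# Feed the actions to elasticsearch.helpers.async_bulk(es, actions, chunk_size=1000),
# which chunks requests and backs off on 429 responses.
```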
Result: 95% reduction in storage costs through intelligent data tiering, with query performance maintained across petabytes of historical data.
Challenge: Build semantic similarity search for content recommendations using vector embeddings.
Implementation:
```python
# Vector similarity with filtering
async def find_similar_content(
    content_id: str,
    user_preferences: dict,
    exclude_seen: list[str],
) -> list[Content]:
    content_embedding = await get_content_embedding(content_id)
    query = {
        "knn": {
            "field": "content_embedding",
            "query_vector": content_embedding,
            "k": 50,
            "filter": {
                "bool": {
                    "must": [{"terms": {"tags": user_preferences["interests"]}}],
                    "must_not": [{"terms": {"id": exclude_seen}}],
                }
            },
        }
    }
    # Execute and return typed results
    ...
```
Result: Personalized recommendations with 40% higher engagement rates through semantic understanding of content similarity.
```bash
# Install with proper async support (quote the version spec so the
# shell does not treat >= as a redirect)
pip install "elasticsearch>=8.10" pydantic fastapi uvicorn

# Docker development cluster
docker-compose up -d  # Uses provided production-like configuration
```
```python
from elasticsearch import AsyncElasticsearch
from pydantic import BaseSettings  # in Pydantic v2 this lives in the pydantic-settings package

class ElasticsearchSettings(BaseSettings):
    es_host: str = "http://localhost:9200"
    es_api_key: str = ""
    es_timeout: int = 10

    class Config:
        env_file = ".env"

settings = ElasticsearchSettings()
es = AsyncElasticsearch(
    hosts=[settings.es_host],
    api_key=settings.es_api_key,
    request_timeout=settings.es_timeout,
)
```
```bash
# Kubernetes StatefulSet with proper resource allocation
# Includes anti-affinity rules, persistent storage, and monitoring
kubectl apply -f elasticsearch-cluster.yaml

# Prometheus metrics for cluster health
# Grafana dashboards for operational visibility
# Automated alerts for cluster state changes
```
Your search features become a competitive advantage instead of a maintenance burden. Users get fast, relevant results while your team focuses on business logic instead of infrastructure complexity.
Ready to transform your search development experience? These rules provide everything you need to build production-grade search platforms that scale with your business—no trial and error required.
### Technology Stack Declaration
- Search Engine: Elasticsearch 8.x / OpenSearch 2.x (API compatible)
- Programming Language: Python 3.11 using the official async client (elastic-transport + elasticsearch >= 8.10)
- Orchestration: Docker & Kubernetes (Elastic Cloud on Kubernetes or OpenSearch Operator)
- Data Pipeline: Kafka + Kafka Connect (Elasticsearch Sink Connector)
- Visualization & Monitoring: Kibana / OpenSearch Dashboards, Elastic APM, Prometheus & Grafana
- Machine Learning: TensorFlow, scikit-learn, SentenceTransformers for vector embeddings
### Key Principles
- Design for resilience: multi-node, multi-AZ clusters with dedicated master, data, ingest, ml, and coordinating nodes
- Everything as code: manage indices, templates, ILM policies, and users with version-controlled JSON/YAML manifests
- Prefer explicit mappings: avoid dynamic fields in production to control index bloat and improve relevance
- Keep read & write paths isolated: separate hot–warm–cold tiers using ILM
- Treat indices as immutable; use reindex-from-remote for schema migrations
- Use bulk operations for ingestion; never write one document at a time in production
- Monitor, measure, iterate; back every change with metrics
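The everything-as-code principle applies directly to index templates. A sketch of a version-controlled manifest (index pattern, field names, and limits are illustrative), applied at deploy time via `es.indices.put_index_template`:

```python
# Version-controlled index template manifest following the naming
# convention <env>-<domain>-<yyyyMM>; names and limits are illustrative
ORDERS_TEMPLATE = {
    "index_patterns": ["prod-orders-*"],
    "template": {
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 1,
            "index.mapping.total_fields.limit": 200,  # guard against mapping explosion
        },
        "mappings": {
            "dynamic": "strict",  # explicit mappings only, no dynamic fields
            "properties": {
                "order_id": {"type": "keyword"},
                "total": {"type": "scaled_float", "scaling_factor": 100},
                "created_at": {"type": "date"},
            },
        },
    },
}
```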
### Python
- Follow PEP 8 and enforce black formatting
- Always use type hints (PEP 484) and mypy strict mode
- Prefer async Elasticsearch client to avoid thread blocking in FastAPI apps
- Wrap raw client calls in domain services returning Pydantic models
- Use environment variables or typed settings classes (pydantic.BaseSettings) for connection info
- Keep index names, mappings, and queries in dedicated modules (e.g., search/indexes.py, search/queries.py)
```python
from elasticsearch import AsyncElasticsearch, NotFoundError
from pydantic import BaseModel
import os

class Product(BaseModel):
    id: str
    title: str
    score: float

es = AsyncElasticsearch(os.getenv('ES_HOST'))

async def search_products(q: str, size: int = 20) -> list[Product]:
    try:
        resp = await es.search(index='products', size=size, query={'match': {'title': q}})
    except NotFoundError:
        return []
    return [Product(id=h['_id'], title=h['_source']['title'], score=h['_score']) for h in resp['hits']['hits']]
```
### Error Handling and Validation
- Catch `elasticsearch.ApiError` and inspect `e.meta.status` & `e.body` for actionable messages (transport-layer failures raise `elastic_transport.TransportError`, which carries no HTTP status)
- Retry transient 429/503 errors with exponential back-off and jitter (max 5 attempts)
- Validate mappings and ILM policies with the simulate API before applying
- Enforce timeouts: 1 s connect, 10 s request, 60 s socket
- Return domain-level errors (e.g., `SearchUnavailableError`) instead of propagating raw client exceptions
- Guard against mapping explosion via `index.mapping.total_fields.limit` and `index.mapping.field_name_length_limit`
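The back-off rule can be sketched as a small generic wrapper (`with_retries` and its arguments are illustrative names; pair it with a predicate such as `lambda e: isinstance(e, ApiError) and e.meta.status in (429, 503)`):

```python
import asyncio
import random

async def with_retries(call, is_transient, max_attempts: int = 5):
    # Exponential back-off with full jitter, capped at max_attempts
    for attempt in range(1, max_attempts + 1):
        try:
            return await call()
        except Exception as exc:
            if attempt == max_attempts or not is_transient(exc):
                raise  # exhausted or non-transient: surface to the caller
            # sleep in [0, 0.1 * 2^attempt], capped at 5 s
            await asyncio.sleep(random.uniform(0, min(2 ** attempt * 0.1, 5.0)))
```

The transient-error predicate stays at the call site, so the same wrapper covers searches, bulk writes, and index-management calls before they are mapped onto domain errors like `SearchUnavailableError`.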
### Elasticsearch
- Cluster
- Minimum 3 master-eligible nodes
- Disable swapping (`bootstrap.memory_lock=true`); JVM heap = 50 % RAM, max 32 GB
- Tune disk-based shard allocation watermarks (enabled by default), e.g. `cluster.routing.allocation.disk.watermark.high=85%`
- Indices & Mappings
- Naming: `<env>-<domain>-<yyyyMM>` e.g., `prod-orders-202401`
- One primary shard per ~50 GB; start with 1 replica in prod
- Use `keyword` for exact matches, `text` with BM25 + `dense_vector` for semantic search
- Set `index.query.default_field` for multi-match convenience
- ILM
- hot: rollover at 30 GB or 7 d
- warm: shrink to 1 shard, set `index.priority=50`
- cold: move to low-cost nodes, set `index.blocks.write=true`
- delete: after 365 d
- Vector Search
- Store embeddings in `dense_vector` with `index:true`, `similarity:cosine`
- Use kNN search API or script_score combining BM25 and cosine
- Keep dimension ≤ 768; tune HNSW `ef_construction=256`, `m=16`
- Ingest
- Use ingest pipelines for enrichment (geoip, ML inference)
- Prefer Kafka Connect sink with exactly-once semantics
- Bulk size 5–15 MB; concurrency ≈ 1.5 × data nodes
- Snapshots
- Daily snapshots to S3/GCS (`repository-s3` plugin)
- Use `include_global_state=false` unless restoring security objects
- Upgrades
- Run Upgrade Assistant and `_migration/deprecations` before every major bump
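The ILM rules in this section translate into a policy manifest like the following sketch (the 30 GB threshold is read here as primary shard size, and the cold `min_age` of 30 d is an assumption; apply with `es.ilm.put_lifecycle`):

```python
# ILM policy mirroring the hot/warm/cold/delete rules; the readonly
# action sets index.blocks.write=true, and cold min_age is illustrative
LIFECYCLE_POLICY = {
    "phases": {
        "hot": {
            "actions": {
                "rollover": {"max_primary_shard_size": "30gb", "max_age": "7d"}
            }
        },
        "warm": {
            "min_age": "7d",
            "actions": {
                "shrink": {"number_of_shards": 1},
                "set_priority": {"priority": 50},
            },
        },
        "cold": {
            "min_age": "30d",
            "actions": {"readonly": {}},
        },
        "delete": {"min_age": "365d", "actions": {"delete": {}}},
    }
}
```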
### Testing
- Unit: mock Elasticsearch with `respx` or `pytest-httpx`
- Integration: use Testcontainers-python to spin up a disposable single-node cluster
- Provide seed data via `_bulk` API during test setup
- Smoke: run functional search scenarios in CI against staging cluster
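Seeding via `_bulk` boils down to building newline-delimited JSON with alternating action and document lines. A test-setup sketch (the index name is illustrative):

```python
import json

def bulk_seed_payload(docs: list[dict], index: str = "products-test") -> str:
    # The _bulk API expects an action line followed by the document,
    # one JSON object per line, with a trailing newline at the end
    lines = []
    for doc in docs:
        lines.append(json.dumps({"index": {"_index": index, "_id": doc["id"]}}))
        lines.append(json.dumps(doc))
    return "\n".join(lines) + "\n"
```

POST the result to `/_bulk` with `Content-Type: application/x-ndjson` during fixture setup, then refresh the index before running assertions.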
### Performance
- Set `index.refresh_interval` to `-1` during bulk backfills; restore to `1s` afterwards
- Profile queries with `_profile` and Kibana Dev Tools; optimize those > 100 ms
- Set `doc_values: false` on high-cardinality `keyword` fields you never sort or aggregate on (`text` fields have no doc values; disable `norms` there instead)
- Avoid per-hit `script_score`; pre-compute when possible
### Security
- Enforce TLS for node-to-node and client-to-node traffic
- Use API keys with least privilege for service-to-service auth
- Rotate built-in `elastic` superuser password regularly
- Enable audit logging to a dedicated audit index
### Deployment
- Prefer Elastic Cloud or managed OpenSearch when possible
- For on-prem K8s:
- Use StatefulSets with pod anti-affinity per AZ
- PersistentVolumeClaims using XFS with `noatime`
- Add readinessProbe on `/_cluster/health?wait_for_status=yellow`
### Observability
- Install `elasticsearch_exporter` and Filebeat for logs
- Track key SLIs: search latency P95 < 200 ms, indexing latency P95 < 50 ms, JVM heap < 75 %, CPU < 70 %
- Alert on unassigned shards, red cluster state, disk watermark high
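The alert conditions map directly onto fields of the `_cluster/health` response plus a disk metric; a sketch (the function and threshold are illustrative):

```python
def cluster_alerts(health: dict, disk_used_pct: float) -> list[str]:
    # Evaluate a _cluster/health response against the alert rules above;
    # `status` and `unassigned_shards` are real fields of that API
    alerts = []
    if health.get("status") == "red":
        alerts.append("cluster status red")
    if health.get("unassigned_shards", 0) > 0:
        alerts.append(f"{health['unassigned_shards']} unassigned shards")
    if disk_used_pct >= 85.0:  # mirrors the high disk watermark
        alerts.append("disk high watermark breached")
    return alerts
```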
### Common Pitfalls
- Oversharding small indices wastes memory
- Disabling `_source` prevents reindex & update; only disable with full understanding
- Ignoring mapping explosions; always monitor `number_of_fields`