Elasticsearch Plugin
Async full-text search, analytics, and log aggregation. Built on the official elasticsearch Python client.
Installation
pip install elasticsearch
Quick Start
Direct Usage (Scripts)
from daita.plugins import elasticsearch
# Direct usage in scripts
async with elasticsearch(hosts="localhost:9200") as es:
    results = await es.search("logs", {"match": {"level": "ERROR"}})
    print(results)
Agent Integration (Recommended)
from daita import SubstrateAgent
from daita.plugins import elasticsearch
# Create plugin
es = elasticsearch(hosts="localhost:9200")
# Agent uses Elasticsearch tools autonomously
agent = SubstrateAgent(
name="Search Agent",
prompt="You are a search specialist. Help users query and analyze data in Elasticsearch.",
tools=[es]
)
await agent.start()
result = await agent.run("Search for all ERROR level logs from the last hour")
Connection Parameters
elasticsearch(
    hosts: Union[str, List[str]] = "localhost:9200",
    auth_method: str = "basic",
    username: Optional[str] = None,
    password: Optional[str] = None,
    api_key_id: Optional[str] = None,
    api_key: Optional[str] = None,
    ssl_fingerprint: Optional[str] = None,
    verify_certs: bool = True,
    ca_certs: Optional[str] = None,
    timeout: int = 30,
    max_retries: int = 3,
    **kwargs
)
Parameters
- hosts (str | List[str]): Elasticsearch host(s), a single host or a list
- auth_method (str): Authentication method ("basic", "api_key", "ssl")
- username (str): Username for basic authentication
- password (str): Password for basic authentication
- api_key_id (str): API key ID for API key authentication
- api_key (str): API key for API key authentication
- ssl_fingerprint (str): SSL certificate fingerprint
- verify_certs (bool): Verify SSL certificates (default: True)
- ca_certs (str): Path to CA certificates file
- timeout (int): Request timeout in seconds (default: 30)
- max_retries (int): Maximum retry attempts (default: 3)
- **kwargs: Additional Elasticsearch client parameters
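A hedged sketch combining several of these parameters; only the signature above is assumed, and the values (host, credentials, CA path) are illustrative:
# Sketch: tuned connection settings (values are illustrative)
async with elasticsearch(
    hosts="https://elasticsearch.example.com:9200",
    auth_method="basic",
    username="elastic",
    password="your_password",
    verify_certs=True,
    ca_certs="/etc/ssl/certs/es-ca.pem",  # path to your CA bundle
    timeout=60,       # longer timeout for heavy queries
    max_retries=5     # retry transient failures more aggressively
) as es:
    results = await es.search("logs", {"match_all": {}})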
Connection Methods
# Local (no auth)
async with elasticsearch(hosts="localhost:9200") as es:
    results = await es.search("logs", {"match_all": {}})

# Basic authentication
async with elasticsearch(
    hosts="https://elasticsearch.example.com:9200",
    auth_method="basic",
    username="elastic",
    password="your_password"
) as es:
    results = await es.search("logs", {"match_all": {}})

# API key authentication
async with elasticsearch(
    hosts="https://elasticsearch.example.com:9200",
    auth_method="api_key",
    api_key_id="your_api_key_id",
    api_key="your_api_key"
) as es:
    results = await es.search("logs", {"match_all": {}})

# Multiple hosts (high availability)
async with elasticsearch(
    hosts=["node1:9200", "node2:9200", "node3:9200"]
) as es:
    results = await es.search("logs", {"match_all": {}})
Searching Documents
async with elasticsearch(hosts="localhost:9200") as es:
    # Basic match query
    results = await es.search("logs", {"match": {"level": "ERROR"}})
    print(f"Found {results['hits']['total']['value']} documents")

    # Boolean query (multiple conditions)
    results = await es.search("logs", {
        "bool": {
            "must": [
                {"match": {"level": "ERROR"}},
                {"range": {"timestamp": {"gte": "now-1h"}}}
            ],
            "must_not": [{"term": {"service": "test"}}]
        }
    })

    # Multi-match across fields
    results = await es.search("logs", {
        "multi_match": {
            "query": "database error",
            "fields": ["message", "description"]
        }
    })

    # Focus system (filter returned fields)
    results = await es.search(
        "logs",
        {"match": {"level": "ERROR"}},
        focus=["timestamp", "level", "message"]
    )

    # Pagination
    results = await es.search(
        "logs",
        {"match_all": {}},
        size=20,
        from_=20  # Second page
    )

    # Sorting
    results = await es.search(
        "logs",
        {"match_all": {}},
        sort=[{"timestamp": {"order": "desc"}}]
    )
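The examples above read hit counts from the raw response; the matched documents themselves live under hits.hits. A minimal sketch, assuming the plugin returns the standard Elasticsearch response body as the count examples suggest:
# Sketch: iterating matched documents (standard ES response shape assumed)
async with elasticsearch(hosts="localhost:9200") as es:
    results = await es.search("logs", {"match": {"level": "ERROR"}}, size=50)
    for hit in results["hits"]["hits"]:
        doc = hit["_source"]  # the original document body
        print(hit["_id"], doc.get("timestamp"), doc.get("message"))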
Indexing Documents
async with elasticsearch(hosts="localhost:9200") as es:
    # Index single document
    result = await es.index_document("logs", {
        "timestamp": "2024-01-15T10:30:00",
        "level": "INFO",
        "message": "Application started"
    })

    # Index with specific ID
    result = await es.index_document(
        "logs",
        {"level": "ERROR", "message": "Failed"},
        doc_id="custom_id_123"
    )

    # Bulk indexing
    documents = [
        {"timestamp": "2024-01-15T10:00:00", "level": "INFO", "message": "Log 1"},
        {"timestamp": "2024-01-15T10:01:00", "level": "ERROR", "message": "Log 2"}
    ]
    result = await es.bulk_index("logs", documents, batch_size=1000)
Aggregations
async with elasticsearch(hosts="localhost:9200") as es:
    # Count by field
    results = await es.search(
        "logs",
        {"match_all": {}},
        size=0,  # Only return aggregations
        aggregations={
            "levels": {"terms": {"field": "level.keyword"}}
        }
    )

    # Statistics
    stats = await es.search("metrics", {"match_all": {}}, size=0, aggregations={
        "response_time_stats": {"stats": {"field": "response_time_ms"}}
    })

    # Date histogram
    over_time = await es.search("logs", {"match_all": {}}, size=0, aggregations={
        "over_time": {
            "date_histogram": {
                "field": "timestamp",
                "calendar_interval": "1h"
            }
        }
    })

    # Nested aggregations
    by_service = await es.search("metrics", {"match_all": {}}, size=0, aggregations={
        "services": {
            "terms": {"field": "service.keyword"},
            "aggs": {"avg_duration": {"avg": {"field": "duration_ms"}}}
        }
    })
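Aggregation results come back under the aggregations key, as the scripting example later on this page also shows. A minimal sketch of reading terms buckets, assuming the standard Elasticsearch response shape:
# Sketch: reading terms-aggregation buckets (standard ES response shape assumed)
async with elasticsearch(hosts="localhost:9200") as es:
    results = await es.search(
        "logs",
        {"match_all": {}},
        size=0,
        aggregations={"levels": {"terms": {"field": "level.keyword"}}}
    )
    # Each terms bucket carries a key and a doc_count
    for bucket in results["aggregations"]["levels"]["buckets"]:
        print(f"{bucket['key']}: {bucket['doc_count']} documents")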
Index Management
async with elasticsearch(hosts="localhost:9200") as es:
    # Create index with mapping
    result = await es.create_index(
        "logs",
        mapping={
            "properties": {
                "timestamp": {"type": "date"},
                "level": {"type": "keyword"},
                "message": {"type": "text"}
            }
        }
    )

    # Get mapping
    mapping = await es.get_mapping("logs")

    # Delete document
    result = await es.delete_document("logs", "document_id_123")
Agent-Specific Features
async with elasticsearch(hosts="localhost:9200") as es:
    # Search agent logs
    logs = await es.search_agent_logs(
        "agent_logs",
        agent_id="data_processor",
        status="error",
        time_range={"gte": "now-24h", "lte": "now"}
    )

    # Index agent results
    result = await es.index_agent_results(
        "agent_outputs",
        agent_results={
            "status": "success",
            "records_processed": 1000,
            "duration_ms": 5432
        },
        agent_id="data_processor"
    )

    # Analyze performance
    analytics = await es.analyze_performance(
        "agent_metrics",
        metric_field="duration_ms",
        group_by="agent_id.keyword",
        time_range={"gte": "now-7d", "lte": "now"}
    )
Using with Agents
Direct Search Operations (Scripts)
For scripts that don't need agent capabilities:
from daita.plugins import elasticsearch
async with elasticsearch(hosts="localhost:9200") as es:
    # Search for error logs
    error_logs = await es.search(
        "application_logs",
        {"bool": {
            "must": [
                {"match": {"level": "ERROR"}},
                {"range": {"timestamp": {"gte": "now-1h"}}}
            ]
        }},
        size=100
    )

    # Aggregate errors by service
    service_agg = await es.search(
        "application_logs",
        {"match": {"level": "ERROR"}},
        size=0,
        aggregations={"services": {"terms": {"field": "service.keyword"}}}
    )

    print(f"Total errors: {error_logs['hits']['total']['value']}")
    print(f"By service: {service_agg['aggregations']['services']['buckets']}")
Tool-Based Integration (Recommended)
The Elasticsearch plugin exposes search and indexing operations as tools that agents can use autonomously:
from daita import SubstrateAgent
from daita.plugins import elasticsearch
import os
# Create Elasticsearch plugin
es = elasticsearch(
hosts=["localhost:9200"],
username=os.getenv("ES_USER"),
password=os.getenv("ES_PASSWORD")
)
# Pass plugin to agent - agent can now use search tools autonomously
agent = SubstrateAgent(
name="Search Assistant",
prompt="You are a search specialist. Help users query and analyze data in Elasticsearch.",
llm_provider="openai",
model="gpt-4",
tools=[es]
)
await agent.start()
# Agent autonomously uses Elasticsearch tools to answer questions
result = await agent.run("Find ERROR level logs from the past hour")
# The agent will autonomously:
# 1. Use search_elasticsearch tool to query logs
# 2. Analyze the results
# 3. Present findings in natural language
await agent.stop()
Available Tools
The Elasticsearch plugin exposes these tools to LLM agents:
| Tool | Description | Parameters |
|---|---|---|
| search_elasticsearch | Search documents using query DSL | index (required), query (object), size (int) |
| index_document | Create or update a document | index (required), document (object), doc_id (string) |
| get_index_mapping | Get index mapping/schema | index (required) |
Tool Categories: search
Tool Source: plugin
Query Format: Elasticsearch query DSL (e.g., {"match": {"field": "value"}})
Tool Usage Example
from daita import SubstrateAgent
from daita.plugins import elasticsearch
# Setup Elasticsearch with tool integration
es = elasticsearch(hosts=["localhost:9200"])
agent = SubstrateAgent(
name="Log Assistant",
prompt="You are a log analysis specialist. Help users query and analyze application logs.",
llm_provider="openai",
model="gpt-4",
tools=[es]
)
await agent.start()
# Natural language query - agent uses tools autonomously
result = await agent.run("""
Analyze application logs:
1. Find all ERROR logs from the last 24 hours
2. Group errors by service name
3. Show the most frequent error messages
""")
# Agent orchestrates Elasticsearch tool calls to answer the query
print(result)
await agent.stop()
Cluster Information
async with elasticsearch(hosts="localhost:9200") as es:
    health = await es.get_cluster_health()
    print(f"Status: {health['status']}, Nodes: {health['number_of_nodes']}")
Error Handling
try:
    async with elasticsearch(hosts="localhost:9200") as es:
        results = await es.search("logs", {"match_all": {}})
except RuntimeError as e:
    if "elasticsearch not installed" in str(e):
        print("Install elasticsearch: pip install elasticsearch")
    elif "connection" in str(e).lower():
        print(f"Connection failed: {e}")
Best Practices
Connection Management:
- Always use context managers (async with) for automatic cleanup
- Configure multiple hosts for high availability
- Set appropriate timeouts and retries, as in the sketch below
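A minimal sketch of these settings together, assuming only the constructor signature documented above:
# Sketch: resilient connection (constructor parameters from the signature above)
async with elasticsearch(
    hosts=["node1:9200", "node2:9200", "node3:9200"],  # multiple hosts for failover
    timeout=30,
    max_retries=3
) as es:
    health = await es.get_cluster_health()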
Query Performance:
- Use keyword fields (.keyword) for exact matches, not text fields
- Limit result size to avoid memory issues
- Use the focus system to fetch only needed fields
- Use aggregations instead of fetching all documents for counting (see the sketch after this list)
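A hedged sketch combining these tips; it mirrors the Searching Documents examples above, and the service name "checkout" is illustrative:
# Sketch: efficient querying - keyword field, capped size, focused fields
async with elasticsearch(hosts="localhost:9200") as es:
    results = await es.search(
        "logs",
        {"term": {"service.keyword": "checkout"}},  # exact match on a keyword field
        size=50,                                    # cap the number of returned hits
        focus=["timestamp", "level", "message"]     # fetch only needed fields
    )
    # Count with an aggregation instead of pulling every document
    counts = await es.search(
        "logs",
        {"match_all": {}},
        size=0,
        aggregations={"levels": {"terms": {"field": "level.keyword"}}}
    )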
Indexing Performance:
- Use bulk indexing for multiple documents (batch size 1000-5000)
- Avoid immediate refresh in production (impacts performance)
- Use appropriate batch sizes based on document size (see the sketch below)
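A minimal sketch of the batching advice, using bulk_index from the indexing section; the generated documents and batch size are illustrative:
# Sketch: bulk indexing in moderate batches (values illustrative)
async with elasticsearch(hosts="localhost:9200") as es:
    documents = [
        {"level": "INFO", "message": f"Log {i}"}
        for i in range(10_000)
    ]
    # A batch_size in the 1000-5000 range balances throughput and memory
    result = await es.bulk_index("logs", documents, batch_size=2000)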
Security:
- Store credentials in environment variables, never hardcode (see the sketch after this list)
- Use API keys instead of basic auth in production
- Enable SSL/TLS for remote connections
- Grant minimal permissions to application users
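A sketch of the credentials advice, following the os.getenv pattern used in the tool-integration example above; the environment variable names are illustrative:
import os

# Sketch: API key auth from environment variables (variable names illustrative)
async with elasticsearch(
    hosts="https://elasticsearch.example.com:9200",
    auth_method="api_key",
    api_key_id=os.environ["ES_API_KEY_ID"],
    api_key=os.environ["ES_API_KEY"]
) as es:
    results = await es.search("logs", {"match_all": {}})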
Troubleshooting
| Issue | Solution |
|---|---|
| elasticsearch not installed | pip install elasticsearch |
| Connection failed | Check Elasticsearch is running, verify host/port |
| Authentication failed | Verify username/password or API key, check permissions |
| Index not found | Create index before searching, verify index name |
| Cluster status red | Check all nodes running, verify shard allocation, check disk space |
Next Steps
- MongoDB Plugin - Document database operations
- S3 Plugin - Object storage for large files
- REST Plugin - API integrations
- Plugin Overview - All available plugins