Elasticsearch Plugin
Async full-text search, analytics, and log aggregation. Built on the official `elasticsearch` Python client.
# Installation

```bash
pip install elasticsearch
```

# Quick Start
```python
from daita import Agent
from daita.plugins import elasticsearch

# Create plugin
es = elasticsearch(hosts="localhost:9200")

# Agent uses Elasticsearch tools autonomously
agent = Agent(
    name="Search Agent",
    prompt="You are a search specialist. Help users query and analyze data in Elasticsearch.",
    tools=[es]
)

await agent.start()
result = await agent.run("Search for all ERROR level logs from the last hour")
```

# Direct Usage
The plugin can also be used directly, without agents, for programmatic access. For comprehensive Elasticsearch documentation, see the official Elasticsearch docs. The main value of this plugin is agent integration: enabling LLMs to autonomously search and analyze indexed data.
# Connection Parameters
```python
elasticsearch(
    hosts: Union[str, List[str]] = "localhost:9200",
    auth_method: str = "basic",
    username: Optional[str] = None,
    password: Optional[str] = None,
    api_key_id: Optional[str] = None,
    api_key: Optional[str] = None,
    ssl_fingerprint: Optional[str] = None,
    verify_certs: bool = True,
    ca_certs: Optional[str] = None,
    timeout: int = 30,
    max_retries: int = 3,
    **kwargs
)
```

# Parameters
- **hosts** (str | List[str]): Elasticsearch host(s) - single host or list
- **auth_method** (str): Authentication method ("basic", "api_key", "ssl")
- **username** (str): Username for basic authentication
- **password** (str): Password for basic authentication
- **api_key_id** (str): API key ID for API key authentication
- **api_key** (str): API key for API key authentication
- **ssl_fingerprint** (str): SSL certificate fingerprint
- **verify_certs** (bool): Verify SSL certificates (default: True)
- **ca_certs** (str): Path to CA certificates file
- **timeout** (int): Request timeout in seconds (default: 30)
- **max_retries** (int): Maximum retry attempts (default: 3)
- **kwargs**: Additional Elasticsearch client parameters
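The `"ssl"` auth method is listed above but not demonstrated under Connection Methods below. A minimal sketch, assuming `auth_method="ssl"` pairs with the `ssl_fingerprint` parameter as the list suggests (the environment variable name is illustrative):

```python
import os
from daita.plugins import elasticsearch

# SHA-256 fingerprint of the cluster's HTTP CA certificate, loaded from
# an illustrative environment variable (assumption, not a plugin requirement).
async with elasticsearch(
    hosts="https://elasticsearch.example.com:9200",
    auth_method="ssl",
    ssl_fingerprint=os.environ["ES_SSL_FINGERPRINT"],
    timeout=60,      # generous timeout for slower clusters
    max_retries=5    # retry transient failures
) as es:
    results = await es.search("logs", {"match_all": {}})
```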
# Connection Methods
```python
# Local (no auth)
async with elasticsearch(hosts="localhost:9200") as es:
    results = await es.search("logs", {"match_all": {}})

# Basic authentication
async with elasticsearch(
    hosts="https://elasticsearch.example.com:9200",
    auth_method="basic",
    username="elastic",
    password="your_password"
) as es:
    results = await es.search("logs", {"match_all": {}})

# API key authentication
async with elasticsearch(
    hosts="https://elasticsearch.example.com:9200",
    auth_method="api_key",
    api_key_id="your_api_key_id",
    api_key="your_api_key"
) as es:
    results = await es.search("logs", {"match_all": {}})

# Multiple hosts (high availability)
async with elasticsearch(
    hosts=["node1:9200", "node2:9200", "node3:9200"]
) as es:
    results = await es.search("logs", {"match_all": {}})
```

# Searching Documents
```python
async with elasticsearch(hosts="localhost:9200") as es:
    # Basic match query
    results = await es.search("logs", {"match": {"level": "ERROR"}})
    print(f"Found {results['hits']['total']['value']} documents")

    # Boolean query (multiple conditions)
    results = await es.search("logs", {
        "bool": {
            "must": [
                {"match": {"level": "ERROR"}},
                {"range": {"timestamp": {"gte": "now-1h"}}}
            ],
            "must_not": [{"term": {"service": "test"}}]
        }
    })

    # Multi-match across fields
    results = await es.search("logs", {
        "multi_match": {
            "query": "database error",
            "fields": ["message", "description"]
        }
    })

    # Focus system (filter returned fields)
    results = await es.search(
        "logs",
        {"match": {"level": "ERROR"}},
        focus=["timestamp", "level", "message"]
    )

    # Pagination
    results = await es.search(
        "logs",
        {"match_all": {}},
        size=20,
        from_=20  # Second page
    )

    # Sorting
    results = await es.search(
        "logs",
        {"match_all": {}},
        sort=[{"timestamp": {"order": "desc"}}]
    )
```

# Indexing Documents
```python
async with elasticsearch(hosts="localhost:9200") as es:
    # Index single document
    result = await es.index_document("logs", {
        "timestamp": "2024-01-15T10:30:00",
        "level": "INFO",
        "message": "Application started"
    })

    # Index with specific ID
    result = await es.index_document(
        "logs",
        {"level": "ERROR", "message": "Failed"},
        doc_id="custom_id_123"
    )

    # Bulk indexing
    documents = [
        {"timestamp": "2024-01-15T10:00:00", "level": "INFO", "message": "Log 1"},
        {"timestamp": "2024-01-15T10:01:00", "level": "ERROR", "message": "Log 2"}
    ]
    result = await es.bulk_index("logs", documents, batch_size=1000)
```

# Aggregations
```python
async with elasticsearch(hosts="localhost:9200") as es:
    # Count by field
    results = await es.search(
        "logs",
        {"match_all": {}},
        size=0,  # Only return aggregations
        aggregations={
            "levels": {"terms": {"field": "level.keyword"}}
        }
    )

    # Statistics
    stats = await es.search("metrics", {"match_all": {}}, size=0, aggregations={
        "response_time_stats": {"stats": {"field": "response_time_ms"}}
    })

    # Date histogram
    over_time = await es.search("logs", {"match_all": {}}, size=0, aggregations={
        "over_time": {
            "date_histogram": {
                "field": "timestamp",
                "calendar_interval": "1h"
            }
        }
    })

    # Nested aggregations
    by_service = await es.search("metrics", {"match_all": {}}, size=0, aggregations={
        "services": {
            "terms": {"field": "service.keyword"},
            "aggs": {"avg_duration": {"avg": {"field": "duration_ms"}}}
        }
    })
```

# Index Management
```python
async with elasticsearch(hosts="localhost:9200") as es:
    # Create index with mapping
    result = await es.create_index(
        "logs",
        mapping={
            "properties": {
                "timestamp": {"type": "date"},
                "level": {"type": "keyword"},
                "message": {"type": "text"}
            }
        }
    )

    # Get mapping
    mapping = await es.get_mapping("logs")

    # Delete document
    result = await es.delete_document("logs", "document_id_123")
```

# Agent-Specific Features
```python
async with elasticsearch(hosts="localhost:9200") as es:
    # Search agent logs
    logs = await es.search_agent_logs(
        "agent_logs",
        agent_id="data_processor",
        status="error",
        time_range={"gte": "now-24h", "lte": "now"}
    )

    # Index agent results
    result = await es.index_agent_results(
        "agent_outputs",
        agent_results={
            "status": "success",
            "records_processed": 1000,
            "duration_ms": 5432
        },
        agent_id="data_processor"
    )

    # Analyze performance
    analytics = await es.analyze_performance(
        "agent_metrics",
        metric_field="duration_ms",
        group_by="agent_id.keyword",
        time_range={"gte": "now-7d", "lte": "now"}
    )
```

# Using with Agents
# Direct Search Operations (Scripts)
For scripts that don't need agent capabilities:
```python
from daita.plugins import elasticsearch

async with elasticsearch(hosts="localhost:9200") as es:
    # Search for error logs
    error_logs = await es.search(
        "application_logs",
        {"bool": {
            "must": [
                {"match": {"level": "ERROR"}},
                {"range": {"timestamp": {"gte": "now-1h"}}}
            ]
        }},
        size=100
    )

    # Aggregate errors by service
    service_agg = await es.search(
        "application_logs",
        {"match": {"level": "ERROR"}},
        size=0,
        aggregations={"services": {"terms": {"field": "service.keyword"}}}
    )

    print(f"Total errors: {error_logs['hits']['total']['value']}")
    print(f"By service: {service_agg['aggregations']['services']['buckets']}")
```

# Tool-Based Integration (Recommended)
The Elasticsearch plugin exposes search and indexing operations as tools that agents can use autonomously:
```python
from daita import Agent
from daita.plugins import elasticsearch
import os

# Create Elasticsearch plugin
es = elasticsearch(
    hosts=["localhost:9200"],
    username=os.getenv("ES_USER"),
    password=os.getenv("ES_PASSWORD")
)

# Pass plugin to agent - agent can now use search tools autonomously
agent = Agent(
    name="Search Assistant",
    prompt="You are a search specialist. Help users query and analyze data in Elasticsearch.",
    llm_provider="openai",
    model="gpt-4",
    tools=[es]
)

await agent.start()

# Agent autonomously uses Elasticsearch tools to answer questions
result = await agent.run("Find ERROR level logs from the past hour")

# The agent will autonomously:
# 1. Use search_elasticsearch tool to query logs
# 2. Analyze the results
# 3. Present findings in natural language

await agent.stop()
```

# Available Tools
The Elasticsearch plugin exposes these tools to LLM agents:
| Tool | Description | Parameters |
|---|---|---|
| `search_elasticsearch` | Search documents using query DSL | `index` (required), `query` (object), `size` (int) |
| `index_document` | Create or update a document | `index` (required), `document` (object), `doc_id` (string) |
| `get_index_mapping` | Get index mapping/schema | `index` (required) |
**Tool Categories:** search
**Tool Source:** plugin
**Query Format:** Elasticsearch query DSL (e.g., `{"match": {"field": "value"}}`)
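For reference, the arguments an agent supplies to `search_elasticsearch` follow the parameter list in the table above. A hedged illustration of the payload shape (field names from the table, values hypothetical):

```python
# Hypothetical tool-call arguments for search_elasticsearch,
# mirroring the parameters listed in the table above.
tool_args = {
    "index": "application_logs",             # required index name
    "query": {"match": {"level": "ERROR"}},  # Elasticsearch query DSL
    "size": 10                               # maximum hits to return
}
```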
# Tool Usage Example
```python
from daita import Agent
from daita.plugins import elasticsearch

# Setup Elasticsearch with tool integration
es = elasticsearch(hosts=["localhost:9200"])

agent = Agent(
    name="Log Assistant",
    prompt="You are a log analysis specialist. Help users query and analyze application logs.",
    llm_provider="openai",
    model="gpt-4",
    tools=[es]
)

await agent.start()

# Natural language query - agent uses tools autonomously
result = await agent.run("""
Analyze application logs:
1. Find all ERROR logs from the last 24 hours
2. Group errors by service name
3. Show the most frequent error messages
""")

# Agent orchestrates Elasticsearch tool calls to answer the query
print(result)

await agent.stop()
```

# Cluster Information
```python
async with elasticsearch(hosts="localhost:9200") as es:
    health = await es.get_cluster_health()
    print(f"Status: {health['status']}, Nodes: {health['number_of_nodes']}")
```

# Error Handling
```python
try:
    async with elasticsearch(hosts="localhost:9200") as es:
        results = await es.search("logs", {"match_all": {}})
except RuntimeError as e:
    if "elasticsearch not installed" in str(e):
        print("Install elasticsearch: pip install elasticsearch")
    elif "connection" in str(e).lower():
        print(f"Connection failed: {e}")
```

# Best Practices
**Connection Management:**
- Always use context managers (`async with`) for automatic cleanup
- Configure multiple hosts for high availability
- Set appropriate timeouts and retries
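A minimal sketch combining these three practices (host names and tuning values are illustrative):

```python
from daita.plugins import elasticsearch

# Multiple hosts give failover; timeout and retry values are illustrative.
async with elasticsearch(
    hosts=["node1:9200", "node2:9200", "node3:9200"],
    timeout=30,
    max_retries=3
) as es:  # the context manager closes the connection automatically
    health = await es.get_cluster_health()
```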
**Query Performance:**
- Use keyword fields (`.keyword`) for exact matches, not text fields
- Limit result size to avoid memory issues
- Use the focus system to fetch only the fields you need
- Use aggregations instead of fetching all documents for counting
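A sketch applying these guidelines with the search parameters documented above (the service name is hypothetical):

```python
from daita.plugins import elasticsearch

async with elasticsearch(hosts="localhost:9200") as es:
    # Exact match on the keyword sub-field, with capped size and focused fields.
    results = await es.search(
        "logs",
        {"term": {"service.keyword": "checkout"}},  # hypothetical service name
        size=50,                                    # cap returned hits
        focus=["timestamp", "level", "message"]     # fetch only needed fields
    )

    # Count by level with an aggregation instead of fetching every document.
    counts = await es.search(
        "logs",
        {"match_all": {}},
        size=0,
        aggregations={"by_level": {"terms": {"field": "level.keyword"}}}
    )
```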
**Indexing Performance:**
- Use bulk indexing for multiple documents (batch size 1000-5000)
- Avoid immediate refresh in production (it impacts performance)
- Use appropriate batch sizes based on document size
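A sketch using `bulk_index` as shown earlier; the batch size is an illustrative choice for small log records (larger documents call for smaller batches):

```python
from daita.plugins import elasticsearch

# Hypothetical small log documents generated for illustration.
documents = [
    {"timestamp": f"2024-01-15T10:{i:02d}:00", "level": "INFO", "message": f"Log {i}"}
    for i in range(60)
]

async with elasticsearch(hosts="localhost:9200") as es:
    # Small records tolerate a larger batch; stay within the 1000-5000 guideline.
    result = await es.bulk_index("logs", documents, batch_size=2000)
```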
**Security:**
- Store credentials in environment variables, never hardcode them
- Use API keys instead of basic auth in production
- Enable SSL/TLS for remote connections
- Grant minimal permissions to application users
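A sketch of API key authentication with credentials loaded from the environment (the variable names are illustrative):

```python
import os
from daita.plugins import elasticsearch

# Credentials come from the environment; ES_API_KEY_ID and ES_API_KEY
# are illustrative variable names, not ones the plugin requires.
async with elasticsearch(
    hosts="https://elasticsearch.example.com:9200",
    auth_method="api_key",
    api_key_id=os.environ["ES_API_KEY_ID"],
    api_key=os.environ["ES_API_KEY"],
    verify_certs=True
) as es:
    results = await es.search("logs", {"match_all": {}})
```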
# Troubleshooting
| Issue | Solution |
|---|---|
| `elasticsearch` not installed | `pip install elasticsearch` |
| Connection failed | Check that Elasticsearch is running; verify host/port |
| Authentication failed | Verify username/password or API key; check permissions |
| Index not found | Create the index before searching; verify the index name |
| Cluster status red | Check that all nodes are running; verify shard allocation; check disk space |
# Next Steps
- MongoDB Plugin - Document database operations
- S3 Plugin - Object storage for large files
- REST Plugin - API integrations
- Plugin Overview - All available plugins