Elasticsearch Plugin

Async full-text search, analytics, and log aggregation. Built on the official elasticsearch Python client.

Installation

pip install elasticsearch

Quick Start

Direct Usage (Scripts)

from daita.plugins import elasticsearch

# Direct usage in scripts
async with elasticsearch(hosts="localhost:9200") as es:
    results = await es.search("logs", {"match": {"level": "ERROR"}})
    print(results)

With Agents

from daita import SubstrateAgent
from daita.plugins import elasticsearch

# Create plugin
es = elasticsearch(hosts="localhost:9200")

# Agent uses Elasticsearch tools autonomously
agent = SubstrateAgent(
    name="Search Agent",
    prompt="You are a search specialist. Help users query and analyze data in Elasticsearch.",
    tools=[es]
)

await agent.start()
result = await agent.run("Search for all ERROR level logs from the last hour")

Connection Parameters

elasticsearch(
    hosts: Union[str, List[str]] = "localhost:9200",
    auth_method: str = "basic",
    username: Optional[str] = None,
    password: Optional[str] = None,
    api_key_id: Optional[str] = None,
    api_key: Optional[str] = None,
    ssl_fingerprint: Optional[str] = None,
    verify_certs: bool = True,
    ca_certs: Optional[str] = None,
    timeout: int = 30,
    max_retries: int = 3,
    **kwargs
)

Parameters

  • hosts (str | List[str]): Elasticsearch host(s) - single host or list
  • auth_method (str): Authentication method ("basic", "api_key", "ssl")
  • username (str): Username for basic authentication
  • password (str): Password for basic authentication
  • api_key_id (str): API key ID for API key authentication
  • api_key (str): API key for API key authentication
  • ssl_fingerprint (str): SSL certificate fingerprint
  • verify_certs (bool): Verify SSL certificates (default: True)
  • ca_certs (str): Path to CA certificates file
  • timeout (int): Request timeout in seconds (default: 30)
  • max_retries (int): Maximum retry attempts (default: 3)
  • **kwargs: Additional Elasticsearch client parameters

Connection Methods

# Local (no auth)
async with elasticsearch(hosts="localhost:9200") as es:
    results = await es.search("logs", {"match_all": {}})

# Basic authentication
async with elasticsearch(
    hosts="https://elasticsearch.example.com:9200",
    auth_method="basic",
    username="elastic",
    password="your_password"
) as es:
    results = await es.search("logs", {"match_all": {}})

# API key authentication
async with elasticsearch(
    hosts="https://elasticsearch.example.com:9200",
    auth_method="api_key",
    api_key_id="your_api_key_id",
    api_key="your_api_key"
) as es:
    results = await es.search("logs", {"match_all": {}})

# Multiple hosts (high availability)
async with elasticsearch(
    hosts=["node1:9200", "node2:9200", "node3:9200"]
) as es:
    results = await es.search("logs", {"match_all": {}})

Searching Documents

async with elasticsearch(hosts="localhost:9200") as es:
    # Basic match query
    results = await es.search("logs", {"match": {"level": "ERROR"}})
    print(f"Found {results['hits']['total']['value']} documents")

    # Boolean query (multiple conditions)
    results = await es.search("logs", {
        "bool": {
            "must": [
                {"match": {"level": "ERROR"}},
                {"range": {"timestamp": {"gte": "now-1h"}}}
            ],
            "must_not": [{"term": {"service": "test"}}]
        }
    })

    # Multi-match across fields
    results = await es.search("logs", {
        "multi_match": {
            "query": "database error",
            "fields": ["message", "description"]
        }
    })

    # Focus system (filter returned fields)
    results = await es.search(
        "logs",
        {"match": {"level": "ERROR"}},
        focus=["timestamp", "level", "message"]
    )

    # Pagination
    results = await es.search(
        "logs",
        {"match_all": {}},
        size=20,
        from_=20  # Second page
    )

    # Sorting
    results = await es.search(
        "logs",
        {"match_all": {}},
        sort=[{"timestamp": {"order": "desc"}}]
    )

Indexing Documents

async with elasticsearch(hosts="localhost:9200") as es:
    # Index single document
    result = await es.index_document("logs", {
        "timestamp": "2024-01-15T10:30:00",
        "level": "INFO",
        "message": "Application started"
    })

    # Index with specific ID
    result = await es.index_document(
        "logs",
        {"level": "ERROR", "message": "Failed"},
        doc_id="custom_id_123"
    )

    # Bulk indexing
    documents = [
        {"timestamp": "2024-01-15T10:00:00", "level": "INFO", "message": "Log 1"},
        {"timestamp": "2024-01-15T10:01:00", "level": "ERROR", "message": "Log 2"}
    ]
    result = await es.bulk_index("logs", documents, batch_size=1000)

Aggregations

async with elasticsearch(hosts="localhost:9200") as es:
    # Count by field
    results = await es.search(
        "logs",
        {"match_all": {}},
        size=0,  # Only return aggregations
        aggregations={
            "levels": {"terms": {"field": "level.keyword"}}
        }
    )

    # Statistics
    stats = await es.search("metrics", {"match_all": {}}, size=0, aggregations={
        "response_time_stats": {"stats": {"field": "response_time_ms"}}
    })

    # Date histogram
    over_time = await es.search("logs", {"match_all": {}}, size=0, aggregations={
        "over_time": {
            "date_histogram": {
                "field": "timestamp",
                "calendar_interval": "1h"
            }
        }
    })

    # Nested aggregations
    by_service = await es.search("metrics", {"match_all": {}}, size=0, aggregations={
        "services": {
            "terms": {"field": "service.keyword"},
            "aggs": {"avg_duration": {"avg": {"field": "duration_ms"}}}
        }
    })

Index Management

async with elasticsearch(hosts="localhost:9200") as es:
    # Create index with mapping
    result = await es.create_index(
        "logs",
        mapping={
            "properties": {
                "timestamp": {"type": "date"},
                "level": {"type": "keyword"},
                "message": {"type": "text"}
            }
        }
    )

    # Get mapping
    mapping = await es.get_mapping("logs")

    # Delete document
    result = await es.delete_document("logs", "document_id_123")

Agent-Specific Features

async with elasticsearch(hosts="localhost:9200") as es:
    # Search agent logs
    logs = await es.search_agent_logs(
        "agent_logs",
        agent_id="data_processor",
        status="error",
        time_range={"gte": "now-24h", "lte": "now"}
    )

    # Index agent results
    result = await es.index_agent_results(
        "agent_outputs",
        agent_results={
            "status": "success",
            "records_processed": 1000,
            "duration_ms": 5432
        },
        agent_id="data_processor"
    )

    # Analyze performance
    analytics = await es.analyze_performance(
        "agent_metrics",
        metric_field="duration_ms",
        group_by="agent_id.keyword",
        time_range={"gte": "now-7d", "lte": "now"}
    )

Using with Agents

Direct Search Operations (Scripts)

For scripts that don't need agent capabilities:

from daita.plugins import elasticsearch

async with elasticsearch(hosts="localhost:9200") as es:
    # Search for error logs
    error_logs = await es.search(
        "application_logs",
        {"bool": {
            "must": [
                {"match": {"level": "ERROR"}},
                {"range": {"timestamp": {"gte": "now-1h"}}}
            ]
        }},
        size=100
    )

    # Aggregate errors by service
    service_agg = await es.search(
        "application_logs",
        {"match": {"level": "ERROR"}},
        size=0,
        aggregations={"services": {"terms": {"field": "service.keyword"}}}
    )

    print(f"Total errors: {error_logs['hits']['total']['value']}")
    print(f"By service: {service_agg['aggregations']['services']['buckets']}")

Agent Tool Integration

The Elasticsearch plugin exposes search and indexing operations as tools that agents can use autonomously:

from daita import SubstrateAgent
from daita.plugins import elasticsearch
import os

# Create Elasticsearch plugin
es = elasticsearch(
    hosts=["localhost:9200"],
    username=os.getenv("ES_USER"),
    password=os.getenv("ES_PASSWORD")
)

# Pass plugin to agent - agent can now use search tools autonomously
agent = SubstrateAgent(
    name="Search Assistant",
    prompt="You are a search specialist. Help users query and analyze data in Elasticsearch.",
    llm_provider="openai",
    model="gpt-4",
    tools=[es]
)

await agent.start()

# Agent autonomously uses Elasticsearch tools to answer questions
result = await agent.run("Find ERROR level logs from the past hour")

# The agent will autonomously:
# 1. Use search_elasticsearch tool to query logs
# 2. Analyze the results
# 3. Present findings in natural language

await agent.stop()

Available Tools

The Elasticsearch plugin exposes these tools to LLM agents:

| Tool | Description | Parameters |
| --- | --- | --- |
| search_elasticsearch | Search documents using query DSL | index (required), query (object), size (int) |
| index_document | Create or update a document | index (required), document (object), doc_id (string) |
| get_index_mapping | Get index mapping/schema | index (required) |

  • Tool Categories: search
  • Tool Source: plugin
  • Query Format: Elasticsearch query DSL (e.g., {"match": {"field": "value"}})

Tool Usage Example

from daita import SubstrateAgent
from daita.plugins import elasticsearch

# Setup Elasticsearch with tool integration
es = elasticsearch(hosts=["localhost:9200"])

agent = SubstrateAgent(
    name="Log Assistant",
    prompt="You are a log analysis specialist. Help users query and analyze application logs.",
    llm_provider="openai",
    model="gpt-4",
    tools=[es]
)

await agent.start()

# Natural language query - agent uses tools autonomously
result = await agent.run("""
Analyze application logs:
1. Find all ERROR logs from the last 24 hours
2. Group errors by service name
3. Show the most frequent error messages
""")

# Agent orchestrates Elasticsearch tool calls to answer the query
print(result)
await agent.stop()

Cluster Information

async with elasticsearch(hosts="localhost:9200") as es:
    health = await es.get_cluster_health()
    print(f"Status: {health['status']}, Nodes: {health['number_of_nodes']}")

Error Handling

try:
    async with elasticsearch(hosts="localhost:9200") as es:
        results = await es.search("logs", {"match_all": {}})
except RuntimeError as e:
    if "elasticsearch not installed" in str(e):
        print("Install elasticsearch: pip install elasticsearch")
    elif "connection" in str(e).lower():
        print(f"Connection failed: {e}")

Best Practices

Connection Management:

  • Always use context managers (async with) for automatic cleanup
  • Configure multiple hosts for high availability
  • Set appropriate timeouts and retries
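A minimal sketch tying these together, using only the connection parameters documented above:

from daita.plugins import elasticsearch

# Context manager guarantees cleanup; multiple hosts provide failover
async with elasticsearch(
    hosts=["node1:9200", "node2:9200", "node3:9200"],
    timeout=10,      # fail fast rather than hanging on a slow node
    max_retries=5    # retry transient failures before raising
) as es:
    health = await es.get_cluster_health()
    print(f"Cluster status: {health['status']}")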

Query Performance:

  • Use keyword fields (.keyword) for exact matches, not text fields
  • Limit result size to avoid memory issues
  • Use focus system to fetch only needed fields
  • Use aggregations instead of fetching all documents for counting
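A sketch applying all four guidelines at once, using the search parameters shown earlier (the index and field names are illustrative):

async with elasticsearch(hosts="localhost:9200") as es:
    # Exact match on the .keyword sub-field, bounded size, only needed fields
    errors = await es.search(
        "logs",
        {"term": {"service.keyword": "checkout"}},
        size=50,
        focus=["timestamp", "level", "message"]
    )

    # Count with an aggregation instead of fetching every document
    counts = await es.search(
        "logs",
        {"match_all": {}},
        size=0,
        aggregations={"levels": {"terms": {"field": "level.keyword"}}}
    )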

Indexing Performance:

  • Use bulk indexing for multiple documents (batch size 1000-5000)
  • Avoid immediate refresh in production (impacts performance)
  • Use appropriate batch sizes based on document size
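For example, a sketch of batched indexing with bulk_index (the document shape is illustrative):

async with elasticsearch(hosts="localhost:9200") as es:
    # Batch sizes of 1000-5000 are a reasonable start; use smaller batches
    # for large documents and larger ones for small documents
    documents = [
        {"timestamp": f"2024-01-15T10:{i:02d}:00", "level": "INFO", "message": f"Log {i}"}
        for i in range(60)
    ]
    result = await es.bulk_index("logs", documents, batch_size=2000)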

Security:

  • Store credentials in environment variables, never hardcode
  • Use API keys instead of basic auth in production
  • Enable SSL/TLS for remote connections
  • Grant minimal permissions to application users
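A sketch of a hardened production connection; the environment variable names and CA path are illustrative:

import os
from daita.plugins import elasticsearch

# API key auth over TLS, with credentials read from the environment
async with elasticsearch(
    hosts="https://elasticsearch.example.com:9200",
    auth_method="api_key",
    api_key_id=os.environ["ES_API_KEY_ID"],  # illustrative variable names
    api_key=os.environ["ES_API_KEY"],
    verify_certs=True,
    ca_certs="/etc/ssl/certs/es-ca.pem"      # illustrative path
) as es:
    results = await es.search("logs", {"match_all": {}})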

Troubleshooting

| Issue | Solution |
| --- | --- |
| elasticsearch not installed | pip install elasticsearch |
| Connection failed | Check that Elasticsearch is running, verify host/port |
| Authentication failed | Verify username/password or API key, check permissions |
| Index not found | Create the index before searching, verify the index name |
| Cluster status red | Check that all nodes are running, verify shard allocation, check disk space |
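For connection and cluster issues, a quick health probe (using get_cluster_health from the Cluster Information section) often narrows down the cause:

from daita.plugins import elasticsearch

try:
    async with elasticsearch(hosts="localhost:9200", timeout=5) as es:
        health = await es.get_cluster_health()
        print(f"Status: {health['status']}, Nodes: {health['number_of_nodes']}")
except RuntimeError as e:
    # A failure here points at connectivity or auth, not the query itself
    print(f"Cannot reach cluster: {e}")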

Next Steps