# Elasticsearch Plugin

Async full-text search, analytics, and log aggregation. Built on the official `elasticsearch` Python client.

## Installation

```bash
pip install elasticsearch
```

## Quick Start

```python
from daita import Agent
from daita.plugins import elasticsearch

# Create plugin
es = elasticsearch(hosts="localhost:9200")

# Agent uses Elasticsearch tools autonomously
agent = Agent(
    name="Search Agent",
    prompt="You are a search specialist. Help users query and analyze data in Elasticsearch.",
    tools=[es]
)

await agent.start()
result = await agent.run("Search for all ERROR level logs from the last hour")
```

## Direct Usage

The plugin can also be used directly, without agents, for programmatic access. For comprehensive query and API documentation, see the official Elasticsearch docs. The main value of this plugin is agent integration: enabling LLMs to autonomously search and analyze indexed data.
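A one-off script can, for example, open a connection and run a query with no agent in the loop (a minimal sketch of the `search` call documented in the sections below):

```python
from daita.plugins import elasticsearch

# Direct, agent-free access: open a connection, run a query, clean up
async with elasticsearch(hosts="localhost:9200") as es:
    results = await es.search("logs", {"match": {"level": "ERROR"}}, size=10)
    for hit in results["hits"]["hits"]:  # raw Elasticsearch response body
        print(hit["_source"])
```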

## Connection Parameters

```python
elasticsearch(
    hosts: Union[str, List[str]] = "localhost:9200",
    auth_method: str = "basic",
    username: Optional[str] = None,
    password: Optional[str] = None,
    api_key_id: Optional[str] = None,
    api_key: Optional[str] = None,
    ssl_fingerprint: Optional[str] = None,
    verify_certs: bool = True,
    ca_certs: Optional[str] = None,
    timeout: int = 30,
    max_retries: int = 3,
    **kwargs
)
```

### Parameters

  • hosts (str | List[str]): Elasticsearch host(s), either a single host string or a list of hosts
  • auth_method (str): Authentication method ("basic", "api_key", "ssl")
  • username (str): Username for basic authentication
  • password (str): Password for basic authentication
  • api_key_id (str): API key ID for API key authentication
  • api_key (str): API key for API key authentication
  • ssl_fingerprint (str): SSL certificate fingerprint
  • verify_certs (bool): Verify SSL certificates (default: True)
  • ca_certs (str): Path to CA certificates file
  • timeout (int): Request timeout in seconds (default: 30)
  • max_retries (int): Maximum retry attempts (default: 3)
  • **kwargs: Additional keyword arguments passed through to the underlying Elasticsearch client

## Connection Methods

```python
# Local (no auth)
async with elasticsearch(hosts="localhost:9200") as es:
    results = await es.search("logs", {"match_all": {}})

# Basic authentication
async with elasticsearch(
    hosts="https://elasticsearch.example.com:9200",
    auth_method="basic",
    username="elastic",
    password="your_password"
) as es:
    results = await es.search("logs", {"match_all": {}})

# API key authentication
async with elasticsearch(
    hosts="https://elasticsearch.example.com:9200",
    auth_method="api_key",
    api_key_id="your_api_key_id",
    api_key="your_api_key"
) as es:
    results = await es.search("logs", {"match_all": {}})

# Multiple hosts (high availability)
async with elasticsearch(
    hosts=["node1:9200", "node2:9200", "node3:9200"]
) as es:
    results = await es.search("logs", {"match_all": {}})
```
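The parameter list above also includes an `ssl_fingerprint` option (`auth_method="ssl"`) that is not shown here. A minimal sketch, assuming it pins the server certificate by fingerprint the way the official client's `ssl_assert_fingerprint` does; the fingerprint value is a placeholder:

```python
# Certificate-fingerprint pinning (sketch; fingerprint value is a placeholder)
async with elasticsearch(
    hosts="https://elasticsearch.example.com:9200",
    auth_method="ssl",
    ssl_fingerprint="AA:BB:CC:DD:...",  # SHA-256 fingerprint of the server certificate
    username="elastic",        # assumption: basic credentials may still be needed,
    password="your_password"   # as with the official client
) as es:
    results = await es.search("logs", {"match_all": {}})
```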

## Searching Documents

```python
async with elasticsearch(hosts="localhost:9200") as es:
    # Basic match query
    results = await es.search("logs", {"match": {"level": "ERROR"}})
    print(f"Found {results['hits']['total']['value']} documents")

    # Boolean query (multiple conditions)
    results = await es.search("logs", {
        "bool": {
            "must": [
                {"match": {"level": "ERROR"}},
                {"range": {"timestamp": {"gte": "now-1h"}}}
            ],
            "must_not": [{"term": {"service": "test"}}]
        }
    })

    # Multi-match across fields
    results = await es.search("logs", {
        "multi_match": {
            "query": "database error",
            "fields": ["message", "description"]
        }
    })

    # Focus system (filter returned fields)
    results = await es.search(
        "logs",
        {"match": {"level": "ERROR"}},
        focus=["timestamp", "level", "message"]
    )

    # Pagination
    results = await es.search(
        "logs",
        {"match_all": {}},
        size=20,
        from_=20  # Second page
    )

    # Sorting
    results = await es.search(
        "logs",
        {"match_all": {}},
        sort=[{"timestamp": {"order": "desc"}}]
    )
```

## Indexing Documents

```python
async with elasticsearch(hosts="localhost:9200") as es:
    # Index single document
    result = await es.index_document("logs", {
        "timestamp": "2024-01-15T10:30:00",
        "level": "INFO",
        "message": "Application started"
    })

    # Index with specific ID
    result = await es.index_document(
        "logs",
        {"level": "ERROR", "message": "Failed"},
        doc_id="custom_id_123"
    )

    # Bulk indexing
    documents = [
        {"timestamp": "2024-01-15T10:00:00", "level": "INFO", "message": "Log 1"},
        {"timestamp": "2024-01-15T10:01:00", "level": "ERROR", "message": "Log 2"}
    ]
    result = await es.bulk_index("logs", documents, batch_size=1000)
```

## Aggregations

```python
async with elasticsearch(hosts="localhost:9200") as es:
    # Count by field
    results = await es.search(
        "logs",
        {"match_all": {}},
        size=0,  # Only return aggregations
        aggregations={
            "levels": {"terms": {"field": "level.keyword"}}
        }
    )

    # Statistics
    stats = await es.search("metrics", {"match_all": {}}, size=0, aggregations={
        "response_time_stats": {"stats": {"field": "response_time_ms"}}
    })

    # Date histogram
    over_time = await es.search("logs", {"match_all": {}}, size=0, aggregations={
        "over_time": {
            "date_histogram": {
                "field": "timestamp",
                "calendar_interval": "1h"
            }
        }
    })

    # Nested aggregations
    by_service = await es.search("metrics", {"match_all": {}}, size=0, aggregations={
        "services": {
            "terms": {"field": "service.keyword"},
            "aggs": {"avg_duration": {"avg": {"field": "duration_ms"}}}
        }
    })
```
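Aggregation results come back under the `aggregations` key of the response (the same shape the script example later on this page indexes into), so the buckets from the nested query above can be walked directly:

```python
# Continuing from the nested aggregation above: one bucket per service,
# each carrying the sub-aggregation's average duration
for bucket in by_service["aggregations"]["services"]["buckets"]:
    avg_ms = bucket["avg_duration"]["value"]  # may be None if a bucket has no values
    print(f"{bucket['key']}: {bucket['doc_count']} docs, avg {avg_ms} ms")
```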

## Index Management

```python
async with elasticsearch(hosts="localhost:9200") as es:
    # Create index with mapping
    result = await es.create_index(
        "logs",
        mapping={
            "properties": {
                "timestamp": {"type": "date"},
                "level": {"type": "keyword"},
                "message": {"type": "text"}
            }
        }
    )

    # Get mapping
    mapping = await es.get_mapping("logs")

    # Delete document
    result = await es.delete_document("logs", "document_id_123")
```

## Agent-Specific Features

```python
async with elasticsearch(hosts="localhost:9200") as es:
    # Search agent logs
    logs = await es.search_agent_logs(
        "agent_logs",
        agent_id="data_processor",
        status="error",
        time_range={"gte": "now-24h", "lte": "now"}
    )

    # Index agent results
    result = await es.index_agent_results(
        "agent_outputs",
        agent_results={
            "status": "success",
            "records_processed": 1000,
            "duration_ms": 5432
        },
        agent_id="data_processor"
    )

    # Analyze performance
    analytics = await es.analyze_performance(
        "agent_metrics",
        metric_field="duration_ms",
        group_by="agent_id.keyword",
        time_range={"gte": "now-7d", "lte": "now"}
    )
```

## Using with Agents

### Direct Search Operations (Scripts)

For scripts that don't need agent capabilities:

```python
from daita.plugins import elasticsearch

async with elasticsearch(hosts="localhost:9200") as es:
    # Search for error logs
    error_logs = await es.search(
        "application_logs",
        {"bool": {
            "must": [
                {"match": {"level": "ERROR"}},
                {"range": {"timestamp": {"gte": "now-1h"}}}
            ]
        }},
        size=100
    )

    # Aggregate errors by service
    service_agg = await es.search(
        "application_logs",
        {"match": {"level": "ERROR"}},
        size=0,
        aggregations={"services": {"terms": {"field": "service.keyword"}}}
    )

    print(f"Total errors: {error_logs['hits']['total']['value']}")
    print(f"By service: {service_agg['aggregations']['services']['buckets']}")
```

### Agent Tool Integration

The Elasticsearch plugin exposes search and indexing operations as tools that agents can call autonomously:

```python
from daita import Agent
from daita.plugins import elasticsearch
import os

# Create Elasticsearch plugin
es = elasticsearch(
    hosts=["localhost:9200"],
    username=os.getenv("ES_USER"),
    password=os.getenv("ES_PASSWORD")
)

# Pass plugin to agent - agent can now use search tools autonomously
agent = Agent(
    name="Search Assistant",
    prompt="You are a search specialist. Help users query and analyze data in Elasticsearch.",
    llm_provider="openai",
    model="gpt-4",
    tools=[es]
)

await agent.start()

# Agent autonomously uses Elasticsearch tools to answer questions
result = await agent.run("Find ERROR level logs from the past hour")

# The agent will autonomously:
# 1. Use search_elasticsearch tool to query logs
# 2. Analyze the results
# 3. Present findings in natural language

await agent.stop()
```

### Available Tools

The Elasticsearch plugin exposes these tools to LLM agents:

| Tool | Description | Parameters |
|------|-------------|------------|
| `search_elasticsearch` | Search documents using query DSL | `index` (required), `query` (object), `size` (int) |
| `index_document` | Create or update a document | `index` (required), `document` (object), `doc_id` (string) |
| `get_index_mapping` | Get index mapping/schema | `index` (required) |

  • Tool Categories: search
  • Tool Source: plugin
  • Query Format: Elasticsearch query DSL (e.g., `{"match": {"field": "value"}}`)

### Tool Usage Example

```python
from daita import Agent
from daita.plugins import elasticsearch

# Setup Elasticsearch with tool integration
es = elasticsearch(hosts=["localhost:9200"])

agent = Agent(
    name="Log Assistant",
    prompt="You are a log analysis specialist. Help users query and analyze application logs.",
    llm_provider="openai",
    model="gpt-4",
    tools=[es]
)

await agent.start()

# Natural language query - agent uses tools autonomously
result = await agent.run("""
Analyze application logs:
1. Find all ERROR logs from the last 24 hours
2. Group errors by service name
3. Show the most frequent error messages
""")

# Agent orchestrates Elasticsearch tool calls to answer the query
print(result)
await agent.stop()
```

## Cluster Information

```python
async with elasticsearch(hosts="localhost:9200") as es:
    health = await es.get_cluster_health()
    print(f"Status: {health['status']}, Nodes: {health['number_of_nodes']}")
```

## Error Handling

```python
try:
    async with elasticsearch(hosts="localhost:9200") as es:
        results = await es.search("logs", {"match_all": {}})
except RuntimeError as e:
    if "elasticsearch not installed" in str(e):
        print("Install elasticsearch: pip install elasticsearch")
    elif "connection" in str(e).lower():
        print(f"Connection failed: {e}")
```

## Best Practices

Connection Management:

  • Always use context managers (async with) for automatic cleanup
  • Configure multiple hosts for high availability
  • Set appropriate timeouts and retries

Query Performance:

  • Use keyword fields (.keyword) for exact matches, not text fields (see the sketch after this list)
  • Limit result size to avoid memory issues
  • Use focus system to fetch only needed fields
  • Use aggregations instead of fetching all documents for counting
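Both points fit in a short sketch reusing the `search` call documented above: `term` against the un-analyzed `.keyword` sub-field gives exact matching, and `size=0` with an aggregation counts documents without fetching them (the service name is illustrative):

```python
# Exact match: term query on the un-analyzed keyword sub-field,
# not a match query on the analyzed text field
exact = await es.search("logs", {"term": {"service.keyword": "checkout"}})

# Counting: size=0 skips document fetching and returns only aggregations
counts = await es.search(
    "logs",
    {"match_all": {}},
    size=0,
    aggregations={"levels": {"terms": {"field": "level.keyword"}}}
)
```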

Indexing Performance:

  • Use bulk indexing for multiple documents (batch size 1000-5000; see the sketch after this list)
  • Avoid immediate refresh in production (impacts performance)
  • Use appropriate batch sizes based on document size
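Since `bulk_index` accepts a `batch_size` argument (shown in the Indexing Documents section), staying in the recommended range is a single parameter:

```python
# batch_size in the recommended 1000-5000 range; documents beyond one
# batch are assumed to be sent in successive batches of this size
result = await es.bulk_index("logs", documents, batch_size=2000)
```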

Security:

  • Store credentials in environment variables, never hardcode them (example after this list)
  • Use API keys instead of basic auth in production
  • Enable SSL/TLS for remote connections
  • Grant minimal permissions to application users
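Putting these together: credentials read from the environment, API-key auth instead of basic auth, and certificate verification left on (the environment variable names are illustrative):

```python
import os

from daita.plugins import elasticsearch

# API-key auth with credentials from the environment (variable names illustrative)
es = elasticsearch(
    hosts="https://elasticsearch.example.com:9200",
    auth_method="api_key",
    api_key_id=os.environ["ES_API_KEY_ID"],
    api_key=os.environ["ES_API_KEY"],
    verify_certs=True  # keep TLS verification on for remote clusters
)
```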

## Troubleshooting

| Issue | Solution |
|-------|----------|
| `elasticsearch not installed` | `pip install elasticsearch` |
| Connection failed | Check that Elasticsearch is running; verify host/port |
| Authentication failed | Verify username/password or API key; check permissions |
| Index not found | Create the index before searching; verify the index name |
| Cluster status red | Check that all nodes are running, verify shard allocation, check disk space |

## Next Steps