MongoDB Plugin

Async document database operations with flexible, schema-less data storage. Built on `motor`.

#Installation

bash
pip install motor

#Quick Start

python
from daita import Agent
from daita.plugins import mongodb
 
# Create plugin
db = mongodb(
    host="localhost",
    database="mydb"
)
 
# Agent uses database tools autonomously
agent = Agent(
    name="Data Agent",
    prompt="You are a database analyst. Help users query and analyze MongoDB data.",
    tools=[db]
)
 
await agent.start()
result = await agent.run("Show me all active users")

#Direct Usage

The plugin can also be used directly, without agents, for programmatic access. For comprehensive MongoDB documentation, see the official MongoDB docs. The main value of this plugin is agent integration — enabling LLMs to autonomously query and analyze document data.

#Connection Parameters

python
mongodb(
    host: str = "localhost",
    port: int = 27017,
    database: str = "",
    username: Optional[str] = None,
    password: Optional[str] = None,
    connection_string: Optional[str] = None,
    max_pool_size: int = 10,
    min_pool_size: int = 1,
    server_timeout: int = 30000,
    **kwargs
)

#Parameters

  • host (str): MongoDB host address (default: "localhost")
  • port (int): MongoDB port (default: 27017)
  • database (str): Database name to connect to
  • username (str): Username for authentication (optional)
  • password (str): Password for authentication (optional)
  • connection_string (str): Full MongoDB connection string (overrides individual params)
  • max_pool_size (int): Maximum connections in pool (default: 10)
  • min_pool_size (int): Minimum connections in pool (default: 1)
  • server_timeout (int): Server selection timeout in milliseconds (default: 30000)
  • **kwargs: Additional motor configuration options

#Connection Methods

python
# Local (no auth)
async with mongodb(host="localhost", database="appdb") as db:
    documents = await db.find("users")
 
# With authentication
async with mongodb(
    host="localhost",
    database="appdb",
    username="app_user",
    password="secure_password"
) as db:
    documents = await db.find("users")
 
# Connection string
async with mongodb(
    connection_string="mongodb://user:pass@localhost:27017/appdb"
) as db:
    documents = await db.find("users")
 
# MongoDB Atlas
async with mongodb(
    connection_string="mongodb+srv://user:pass@cluster.mongodb.net/appdb"
) as db:
    documents = await db.find("users")
 
# Advanced configuration
async with mongodb(
    host="localhost",
    database="mydb",
    max_pool_size=50,
    server_timeout=60000
) as db:
    documents = await db.find("users")

#Finding Documents

python
async with mongodb(host="localhost", database="app") as db:
    # Find all
    all_users = await db.find("users")
 
    # Find with filter
    active_users = await db.find("users", {"status": "active"})
 
    # Complex filter
    results = await db.find("orders", {
        "status": "completed",
        "total": {"$gte": 100},
        "created_at": {"$gte": "2024-01-01"}
    })
 
    # Pagination and sorting
    page_2 = await db.find(
        "products",
        filter_doc={"category": "electronics"},
        skip=20,
        limit=10,
        sort=[("price", -1)]  # -1=descending, 1=ascending
    )

#Inserting Documents

python
async with mongodb(host="localhost", database="app") as db:
    # Insert single document
    user_id = await db.insert("users", {
        "name": "Jane Doe",
        "email": "jane@example.com",
        "status": "active"
    })
 
    # Insert multiple documents
    users = [
        {"name": "Alice", "email": "alice@example.com"},
        {"name": "Bob", "email": "bob@example.com"}
    ]
    ids = await db.insert_many("users", users)

#Updating, Deleting, Counting

python
async with mongodb(host="localhost", database="app") as db:
    # Update documents
    result = await db.update(
        "users",
        {"name": "Jane Doe"},           # Filter
        {"$set": {"status": "inactive"}} # Update
    )
 
    # Update with multiple operations
    result = await db.update(
        "products",
        {"category": "electronics"},
        {"$set": {"on_sale": True}, "$inc": {"views": 1}}
    )
 
    # Upsert (insert if not exists)
    result = await db.update(
        "settings",
        {"key": "theme"},
        {"$set": {"value": "dark"}},
        upsert=True
    )
 
    # Delete documents
    deleted_count = await db.delete("users", {"status": "inactive"})
 
    # Count documents
    total = await db.count("users")
    active = await db.count("users", {"status": "active"})

#Aggregation Pipeline

python
async with mongodb(host="localhost", database="app") as db:
    # Complex aggregation
    pipeline = [
        {"$match": {"status": "active"}},
        {"$group": {"_id": "$department", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}},
        {"$limit": 10}
    ]
    results = await db.aggregate("employees", pipeline)
 
    # Join collections (lookup)
    with_orders = await db.aggregate("users", [
        {"$lookup": {
            "from": "orders",
            "localField": "_id",
            "foreignField": "user_id",
            "as": "user_orders"
        }}
    ])
 
#Collection Management

python
async with mongodb(host="localhost", database="app") as db:
    collections = await db.collections()
    # Returns: ["users", "orders", "products"]

#Using with Agents

#Direct Database Operations (Scripts)

For scripts that don't need agent capabilities:

python
from daita.plugins import mongodb
 
async with mongodb(
    host="localhost",
    database="analytics",
    username="analyst",
    password="password"
) as db:
    # Direct queries
    user = await db.find("users", {"user_id": user_id})
    activity = await db.aggregate("events", [
        {"$match": {"user_id": user_id}},
        {"$group": {"_id": "$event_type", "count": {"$sum": 1}}}
    ])
 
    print(f"User: {user[0] if user else None}")
    print(f"Activity: {activity}")

When used with agents, the MongoDB plugin exposes document database operations as tools that agents can use autonomously:

python
from daita import Agent
from daita.plugins import mongodb
import os
 
# Create MongoDB plugin
db = mongodb(
    host="localhost",
    database="catalog",
    username=os.getenv("MONGO_USER"),
    password=os.getenv("MONGO_PASSWORD")
)
 
# Pass plugin to agent - agent can now use MongoDB tools autonomously
agent = Agent(
    name="Product Catalog Manager",
    prompt="You are a product catalog manager. Help users query and manage product data.",
    llm_provider="openai",
    model="gpt-4",
    tools=[db]
)
 
await agent.start()
 
# Agent autonomously uses MongoDB tools to answer questions
result = await agent.run("Find all active products in the electronics category")
 
# The agent will autonomously:
# 1. Use list_collections to explore database
# 2. Use find_documents to query the products collection
# 3. Analyze and present results in natural language
 
await agent.stop()

#Available Tools

The MongoDB plugin exposes these tools to LLM agents:

| Tool | Description | Parameters |
| --- | --- | --- |
| find_documents | Find documents in collection | collection (required), filter (object), limit (int) |
| insert_document | Insert single document | collection (required), document (object) |
| update_documents | Update matching documents | collection, filter, update (all required) |
| delete_documents | Delete matching documents | collection (required), filter (required) |
| list_collections | List all collections | None |

  • Tool Categories: database
  • Tool Source: plugin
  • Query Format: MongoDB query syntax (e.g., {"status": "active"})

#Tool Usage Example

python
from daita import Agent
from daita.plugins import mongodb
 
# Setup MongoDB with tool integration
db = mongodb(host="localhost", database="orders")
 
agent = Agent(
    name="Order Processor",
    prompt="You are an order processor. Help users manage and process orders.",
    llm_provider="openai",
    model="gpt-4",
    tools=[db]
)
 
await agent.start()
 
# Natural language command - agent uses tools autonomously
result = await agent.run("""
Process recent orders:
1. Find orders with status "pending"
2. Check inventory for each
3. Update orders to "processing" status
""")
 
# Agent orchestrates MongoDB tool calls to fulfill the request
print(result)
await agent.stop()

#ObjectId Handling

MongoDB _id fields are automatically converted to strings. To query by ObjectId:

python
from bson import ObjectId
 
async with mongodb(host="localhost", database="app") as db:
    user_id = await db.insert("users", {"name": "John"})
    # user_id is a string: "507f1f77bcf86cd799439011"
 
    # Query by ObjectId
    user = await db.find("users", {"_id": ObjectId(user_id)})

#Error Handling

python
try:
    async with mongodb(host="localhost", database="mydb") as db:
        results = await db.find("users", {"status": "active"})
except RuntimeError as e:
    if "motor not installed" in str(e):
        print("Install motor: pip install motor")
    elif "connection" in str(e).lower():
        print(f"Connection failed: {e}")

#Best Practices

Connection Management:

  • Always use context managers (async with) for automatic cleanup
  • Configure pool size for high-traffic applications
  • Set appropriate timeouts to prevent hanging operations

Query Performance:

  • Create indexes for frequently queried fields
  • Use aggregation pipeline for complex queries
  • Limit result sets to avoid memory issues
  • Use projection to fetch only needed fields

Security:

  • Store credentials in environment variables, never hardcode
  • Use authentication in production
  • Enable SSL/TLS for remote connections
  • Grant minimal privileges to application users

Data Modeling:

  • Embed related data accessed together frequently
  • Use references for large or independent data

#SQL vs MongoDB

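| Concept | SQL | MongoDB |
| --- | --- | --- |
| Table | Table | Collection |
| Row | Row | Document |
| Column | Column | Field |
| Primary Key | Primary Key | _id field |
| JOIN | JOIN | $lookup / Embedding |
| WHERE | WHERE clause | Filter document |
| GROUP BY | GROUP BY | $group |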

#Troubleshooting

| Issue | Solution |
| --- | --- |
| motor not installed | pip install motor |
| Connection failed | Check MongoDB is running, verify host/port/credentials |
| Authentication failed | Verify username/password, check user database access |
| Server selection timeout | Increase server_timeout, check network connectivity |
| Document too large | MongoDB limit is 16MB - split documents or use GridFS |

#Next Steps