MongoDB Plugin
Async document database operations with flexible, schema-less data storage. Built on `motor`.
#Installation
pip install motor
#Quick Start
from daita import Agent
from daita.plugins import mongodb
# Create plugin
db = mongodb(
host="localhost",
database="mydb"
)
# Agent uses database tools autonomously
agent = Agent(
name="Data Agent",
prompt="You are a database analyst. Help users query and analyze MongoDB data.",
tools=[db]
)
await agent.start()
result = await agent.run("Show me all active users")
#Direct Usage
The plugin can be used directly without agents for programmatic access. For comprehensive MongoDB documentation, see the official MongoDB docs. The main value of this plugin is agent integration - enabling LLMs to autonomously query and analyze document data.
#Connection Parameters
mongodb(
host: str = "localhost",
port: int = 27017,
database: str = "",
username: Optional[str] = None,
password: Optional[str] = None,
connection_string: Optional[str] = None,
max_pool_size: int = 10,
min_pool_size: int = 1,
server_timeout: int = 30000,
**kwargs
)
#Parameters
- `host` (str): MongoDB host address (default: "localhost")
- `port` (int): MongoDB port (default: 27017)
- `database` (str): Database name to connect to
- `username` (str): Username for authentication (optional)
- `password` (str): Password for authentication (optional)
- `connection_string` (str): Full MongoDB connection string (overrides individual params)
- `max_pool_size` (int): Maximum connections in pool (default: 10)
- `min_pool_size` (int): Minimum connections in pool (default: 1)
- `server_timeout` (int): Server selection timeout in milliseconds (default: 30000)
- `**kwargs`: Additional motor configuration
#Connection Methods
# Local (no auth)
async with mongodb(host="localhost", database="appdb") as db:
documents = await db.find("users")
# With authentication
async with mongodb(
host="localhost",
database="appdb",
username="app_user",
password="secure_password"
) as db:
documents = await db.find("users")
# Connection string
async with mongodb(
connection_string="mongodb://user:pass@localhost:27017/appdb"
) as db:
documents = await db.find("users")
# MongoDB Atlas
async with mongodb(
connection_string="mongodb+srv://user:pass@cluster.mongodb.net/appdb"
) as db:
documents = await db.find("users")
# Advanced configuration
async with mongodb(
host="localhost",
database="mydb",
max_pool_size=50,
server_timeout=60000
) as db:
documents = await db.find("users")
#Finding Documents
async with mongodb(host="localhost", database="app") as db:
# Find all
all_users = await db.find("users")
# Find with filter
active_users = await db.find("users", {"status": "active"})
# Complex filter
results = await db.find("orders", {
"status": "completed",
"total": {"$gte": 100},
"created_at": {"$gte": "2024-01-01"}
})
# Pagination and sorting
page_2 = await db.find(
"products",
filter_doc={"category": "electronics"},
skip=20,
limit=10,
sort=[("price", -1)] # -1=descending, 1=ascending
)
#Inserting Documents
async with mongodb(host="localhost", database="app") as db:
# Insert single document
user_id = await db.insert("users", {
"name": "Jane Doe",
"email": "jane@example.com",
"status": "active"
})
# Insert multiple documents
users = [
{"name": "Alice", "email": "alice@example.com"},
{"name": "Bob", "email": "bob@example.com"}
]
ids = await db.insert_many("users", users)
#Updating, Deleting, Counting
async with mongodb(host="localhost", database="app") as db:
# Update documents
result = await db.update(
"users",
{"name": "Jane Doe"}, # Filter
{"$set": {"status": "inactive"}} # Update
)
# Update with multiple operations
result = await db.update(
"products",
{"category": "electronics"},
{"$set": {"on_sale": True}, "$inc": {"views": 1}}
)
# Upsert (insert if not exists)
result = await db.update(
"settings",
{"key": "theme"},
{"$set": {"value": "dark"}},
upsert=True
)
# Delete documents
deleted_count = await db.delete("users", {"status": "inactive"})
# Count documents
total = await db.count("users")
active = await db.count("users", {"status": "active"})
#Aggregation Pipeline
async with mongodb(host="localhost", database="app") as db:
# Complex aggregation
pipeline = [
{"$match": {"status": "active"}},
{"$group": {"_id": "$department", "count": {"$sum": 1}}},
{"$sort": {"count": -1}},
{"$limit": 10}
]
results = await db.aggregate("employees", pipeline)
# Join collections (lookup)
with_orders = await db.aggregate("users", [
{"$lookup": {
"from": "orders",
"localField": "_id",
"foreignField": "user_id",
"as": "user_orders"
}}
])
## Collection Management
```python
async with mongodb(host="localhost", database="app") as db:
collections = await db.collections()
# Returns: ["users", "orders", "products"]
```
#Using with Agents
#Direct Database Operations (Scripts)
For scripts that don't need agent capabilities:
from daita.plugins import mongodb
async with mongodb(
host="localhost",
database="analytics",
username="analyst",
password="password"
) as db:
# Direct queries
user = await db.find("users", {"user_id": user_id})
activity = await db.aggregate("events", [
{"$match": {"user_id": user_id}},
{"$group": {"_id": "$event_type", "count": {"$sum": 1}}}
])
print(f"User: {user[0] if user else None}")
print(f"Activity: {activity}")
#Tool-Based Integration (Recommended)
The MongoDB plugin exposes document database operations as tools that agents can use autonomously:
from daita import Agent
from daita.plugins import mongodb
import os
# Create MongoDB plugin
db = mongodb(
host="localhost",
database="catalog",
username=os.getenv("MONGO_USER"),
password=os.getenv("MONGO_PASSWORD")
)
# Pass plugin to agent - agent can now use MongoDB tools autonomously
agent = Agent(
name="Product Catalog Manager",
prompt="You are a product catalog manager. Help users query and manage product data.",
llm_provider="openai",
model="gpt-4",
tools=[db]
)
await agent.start()
# Agent autonomously uses MongoDB tools to answer questions
result = await agent.run("Find all active products in the electronics category")
# The agent will autonomously:
# 1. Use list_collections to explore database
# 2. Use find_documents to query the products collection
# 3. Analyze and present results in natural language
await agent.stop()
#Available Tools
The MongoDB plugin exposes these tools to LLM agents:
| Tool | Description | Parameters |
|---|---|---|
| find_documents | Find documents in collection | collection (required), filter (object), limit (int) |
| insert_document | Insert single document | collection (required), document (object) |
| update_documents | Update matching documents | collection, filter, update (all required) |
| delete_documents | Delete matching documents | collection (required), filter (required) |
| list_collections | List all collections | None |
Tool Categories: database
Tool Source: plugin
Query Format: MongoDB query syntax (e.g., {"status": "active"})
#Tool Usage Example
from daita import Agent
from daita.plugins import mongodb
# Setup MongoDB with tool integration
db = mongodb(host="localhost", database="orders")
agent = Agent(
name="Order Processor",
prompt="You are an order processor. Help users manage and process orders.",
llm_provider="openai",
model="gpt-4",
tools=[db]
)
await agent.start()
# Natural language command - agent uses tools autonomously
result = await agent.run("""
Process recent orders:
1. Find orders with status "pending"
2. Check inventory for each
3. Update orders to "processing" status
""")
# Agent orchestrates MongoDB tool calls to fulfill the request
print(result)
await agent.stop()
#ObjectId Handling
MongoDB _id fields are automatically converted to strings. To query by ObjectId:
from bson import ObjectId
async with mongodb(host="localhost", database="app") as db:
user_id = await db.insert("users", {"name": "John"})
# user_id is a string: "507f1f77bcf86cd799439011"
# Query by ObjectId
user = await db.find("users", {"_id": ObjectId(user_id)})
#Error Handling
try:
async with mongodb(host="localhost", database="mydb") as db:
results = await db.find("users", {"status": "active"})
except RuntimeError as e:
if "motor not installed" in str(e):
print("Install motor: pip install motor")
elif "connection" in str(e).lower():
print(f"Connection failed: {e}")
#Best Practices
Connection Management:
- Always use context managers (`async with`) for automatic cleanup
- Configure pool size for high-traffic applications
- Set appropriate timeouts to prevent hanging operations
Query Performance:
- Create indexes for frequently queried fields
- Use aggregation pipeline for complex queries
- Limit result sets to avoid memory issues
- Use projection to fetch only needed fields
Security:
- Store credentials in environment variables, never hardcode
- Use authentication in production
- Enable SSL/TLS for remote connections
- Grant minimal privileges to application users
Data Modeling:
- Embed related data accessed together frequently
- Use references for large or independent data
#SQL vs MongoDB
| Concept | SQL | MongoDB |
|---|---|---|
| Table | Table | Collection |
| Row | Row | Document |
| Column | Column | Field |
| Primary Key | Primary Key | _id field |
| JOIN | JOIN | $lookup / Embedding |
| WHERE | WHERE clause | Filter document |
| GROUP BY | GROUP BY | $group |
#Troubleshooting
| Issue | Solution |
|---|---|
| `motor not installed` | `pip install motor` |
| Connection failed | Check MongoDB is running, verify host/port/credentials |
| Authentication failed | Verify username/password, check user database access |
| Server selection timeout | Increase server_timeout, check network connectivity |
| Document too large | MongoDB limit is 16MB - split documents or use GridFS |
#Next Steps
- PostgreSQL Plugin - Relational database operations
- MySQL Plugin - MySQL database operations
- S3 Plugin - Object storage for large files
- Plugin Overview - All available plugins