MongoDB Plugin
Async document database operations with flexible, schema-less data storage. Built on motor.
Installation
pip install motor
Quick Start
Direct Usage (Scripts)
from daita.plugins import mongodb
# Direct usage in scripts
async with mongodb(
host="localhost",
database="mydb"
) as db:
users = await db.find("users", {"status": "active"})
print(users)
Agent Integration (Recommended)
from daita import SubstrateAgent
from daita.plugins import mongodb
# Create plugin
db = mongodb(
host="localhost",
database="mydb"
)
# Agent uses database tools autonomously
agent = SubstrateAgent(
name="Data Agent",
prompt="You are a database analyst. Help users query and analyze MongoDB data.",
tools=[db]
)
await agent.start()
result = await agent.run("Show me all active users")
Connection Parameters
mongodb(
host: str = "localhost",
port: int = 27017,
database: str = "",
username: Optional[str] = None,
password: Optional[str] = None,
connection_string: Optional[str] = None,
max_pool_size: int = 10,
min_pool_size: int = 1,
server_timeout: int = 30000,
**kwargs
)
Parameters
host(str): MongoDB host address (default: "localhost")port(int): MongoDB port (default: 27017)database(str): Database name to connect tousername(str): Username for authentication (optional)password(str): Password for authentication (optional)connection_string(str): Full MongoDB connection string (overrides individual params)max_pool_size(int): Maximum connections in pool (default: 10)min_pool_size(int): Minimum connections in pool (default: 1)server_timeout(int): Server selection timeout in milliseconds (default: 30000)**kwargs: Additional motor configuration
Connection Methods
# Local (no auth)
async with mongodb(host="localhost", database="appdb") as db:
documents = await db.find("users")
# With authentication
async with mongodb(
host="localhost",
database="appdb",
username="app_user",
password="secure_password"
) as db:
documents = await db.find("users")
# Connection string
async with mongodb(
connection_string="mongodb://user:pass@localhost:27017/appdb"
) as db:
documents = await db.find("users")
# MongoDB Atlas
async with mongodb(
connection_string="mongodb+srv://user:pass@cluster.mongodb.net/appdb"
) as db:
documents = await db.find("users")
# Advanced configuration
async with mongodb(
host="localhost",
database="mydb",
max_pool_size=50,
server_timeout=60000
) as db:
documents = await db.find("users")
Finding Documents
async with mongodb(host="localhost", database="app") as db:
# Find all
all_users = await db.find("users")
# Find with filter
active_users = await db.find("users", {"status": "active"})
# Complex filter
results = await db.find("orders", {
"status": "completed",
"total": {"$gte": 100},
"created_at": {"$gte": "2024-01-01"}
})
# Pagination and sorting
page_2 = await db.find(
"products",
filter_doc={"category": "electronics"},
skip=20,
limit=10,
sort=[("price", -1)] # -1=descending, 1=ascending
)
Inserting Documents
async with mongodb(host="localhost", database="app") as db:
# Insert single document
user_id = await db.insert("users", {
"name": "Jane Doe",
"email": "jane@example.com",
"status": "active"
})
# Insert multiple documents
users = [
{"name": "Alice", "email": "alice@example.com"},
{"name": "Bob", "email": "bob@example.com"}
]
ids = await db.insert_many("users", users)
Updating, Deleting, Counting
async with mongodb(host="localhost", database="app") as db:
# Update documents
result = await db.update(
"users",
{"name": "Jane Doe"}, # Filter
{"$set": {"status": "inactive"}} # Update
)
# Update with multiple operations
result = await db.update(
"products",
{"category": "electronics"},
{"$set": {"on_sale": True}, "$inc": {"views": 1}}
)
# Upsert (insert if not exists)
result = await db.update(
"settings",
{"key": "theme"},
{"$set": {"value": "dark"}},
upsert=True
)
# Delete documents
deleted_count = await db.delete("users", {"status": "inactive"})
# Count documents
total = await db.count("users")
active = await db.count("users", {"status": "active"})
Aggregation Pipeline
async with mongodb(host="localhost", database="app") as db:
# Complex aggregation
pipeline = [
{"$match": {"status": "active"}},
{"$group": {"_id": "$department", "count": {"$sum": 1}}},
{"$sort": {"count": -1}},
{"$limit": 10}
]
results = await db.aggregate("employees", pipeline)
# Join collections (lookup)
with_orders = await db.aggregate("users", [
{"$lookup": {
"from": "orders",
"localField": "_id",
"foreignField": "user_id",
"as": "user_orders"
}}
])
## Collection Management
```python
async with mongodb(host="localhost", database="app") as db:
collections = await db.collections()
# Returns: ["users", "orders", "products"]
Using with Agents
Direct Database Operations (Scripts)
For scripts that don't need agent capabilities:
from daita.plugins import mongodb
async with mongodb(
host="localhost",
database="analytics",
username="analyst",
password="password"
) as db:
# Direct queries
user = await db.find("users", {"user_id": user_id})
activity = await db.aggregate("events", [
{"$match": {"user_id": user_id}},
{"$group": {"_id": "$event_type", "count": {"$sum": 1}}}
])
print(f"User: {user[0] if user else None}")
print(f"Activity: {activity}")