Skip to main content

MongoDB Plugin

Async document database operations with flexible, schema-less data storage. Built on motor.

Installation

pip install motor

Quick Start

Direct Usage (Scripts)

from daita.plugins import mongodb

# Direct usage in scripts
async with mongodb(
host="localhost",
database="mydb"
) as db:
users = await db.find("users", {"status": "active"})
print(users)

Agent Integration

from daita import SubstrateAgent
from daita.plugins import mongodb

# Create plugin
db = mongodb(
host="localhost",
database="mydb"
)

# Agent uses database tools autonomously
agent = SubstrateAgent(
name="Data Agent",
prompt="You are a database analyst. Help users query and analyze MongoDB data.",
tools=[db]
)

await agent.start()
result = await agent.run("Show me all active users")

Connection Parameters

mongodb(
host: str = "localhost",
port: int = 27017,
database: str = "",
username: Optional[str] = None,
password: Optional[str] = None,
connection_string: Optional[str] = None,
max_pool_size: int = 10,
min_pool_size: int = 1,
server_timeout: int = 30000,
**kwargs
)

Parameters

  • host (str): MongoDB host address (default: "localhost")
  • port (int): MongoDB port (default: 27017)
  • database (str): Database name to connect to
  • username (str): Username for authentication (optional)
  • password (str): Password for authentication (optional)
  • connection_string (str): Full MongoDB connection string (overrides individual params)
  • max_pool_size (int): Maximum connections in pool (default: 10)
  • min_pool_size (int): Minimum connections in pool (default: 1)
  • server_timeout (int): Server selection timeout in milliseconds (default: 30000)
  • **kwargs: Additional motor configuration

Connection Methods

# Local (no auth)
async with mongodb(host="localhost", database="appdb") as db:
documents = await db.find("users")

# With authentication
async with mongodb(
host="localhost",
database="appdb",
username="app_user",
password="secure_password"
) as db:
documents = await db.find("users")

# Connection string
async with mongodb(
connection_string="mongodb://user:pass@localhost:27017/appdb"
) as db:
documents = await db.find("users")

# MongoDB Atlas
async with mongodb(
connection_string="mongodb+srv://user:pass@cluster.mongodb.net/appdb"
) as db:
documents = await db.find("users")

# Advanced configuration
async with mongodb(
host="localhost",
database="mydb",
max_pool_size=50,
server_timeout=60000
) as db:
documents = await db.find("users")

Finding Documents

async with mongodb(host="localhost", database="app") as db:
# Find all
all_users = await db.find("users")

# Find with filter
active_users = await db.find("users", {"status": "active"})

# Complex filter
results = await db.find("orders", {
"status": "completed",
"total": {"$gte": 100},
"created_at": {"$gte": "2024-01-01"}
})

# Pagination and sorting
page_2 = await db.find(
"products",
filter_doc={"category": "electronics"},
skip=20,
limit=10,
sort=[("price", -1)] # -1=descending, 1=ascending
)

Inserting Documents

async with mongodb(host="localhost", database="app") as db:
# Insert single document
user_id = await db.insert("users", {
"name": "Jane Doe",
"email": "jane@example.com",
"status": "active"
})

# Insert multiple documents
users = [
{"name": "Alice", "email": "alice@example.com"},
{"name": "Bob", "email": "bob@example.com"}
]
ids = await db.insert_many("users", users)

Updating, Deleting, Counting

async with mongodb(host="localhost", database="app") as db:
# Update documents
result = await db.update(
"users",
{"name": "Jane Doe"}, # Filter
{"$set": {"status": "inactive"}} # Update
)

# Update with multiple operations
result = await db.update(
"products",
{"category": "electronics"},
{"$set": {"on_sale": True}, "$inc": {"views": 1}}
)

# Upsert (insert if not exists)
result = await db.update(
"settings",
{"key": "theme"},
{"$set": {"value": "dark"}},
upsert=True
)

# Delete documents
deleted_count = await db.delete("users", {"status": "inactive"})

# Count documents
total = await db.count("users")
active = await db.count("users", {"status": "active"})

Aggregation Pipeline

async with mongodb(host="localhost", database="app") as db:
# Complex aggregation
pipeline = [
{"$match": {"status": "active"}},
{"$group": {"_id": "$department", "count": {"$sum": 1}}},
{"$sort": {"count": -1}},
{"$limit": 10}
]
results = await db.aggregate("employees", pipeline)

# Join collections (lookup)
with_orders = await db.aggregate("users", [
{"$lookup": {
"from": "orders",
"localField": "_id",
"foreignField": "user_id",
"as": "user_orders"
}}
])

Collection Management

async with mongodb(host="localhost", database="app") as db:
collections = await db.collections()
# Returns: ["users", "orders", "products"]

Using with Agents

Direct Database Operations (Scripts)

For scripts that don't need agent capabilities:

from daita.plugins import mongodb

async with mongodb(
host="localhost",
database="analytics",
username="analyst",
password="password"
) as db:
# Direct queries
user = await db.find("users", {"user_id": user_id})
activity = await db.aggregate("events", [
{"$match": {"user_id": user_id}},
{"$group": {"_id": "$event_type", "count": {"$sum": 1}}}
])

print(f"User: {user[0] if user else None}")
print(f"Activity: {activity}")

For agent workflows, the MongoDB plugin exposes document database operations as tools that agents can use autonomously:

from daita import SubstrateAgent
from daita.plugins import mongodb
import os

# Create MongoDB plugin
db = mongodb(
host="localhost",
database="catalog",
username=os.getenv("MONGO_USER"),
password=os.getenv("MONGO_PASSWORD")
)

# Pass plugin to agent - agent can now use MongoDB tools autonomously
agent = SubstrateAgent(
name="Product Catalog Manager",
prompt="You are a product catalog manager. Help users query and manage product data.",
llm_provider="openai",
model="gpt-4",
tools=[db]
)

await agent.start()

# Agent autonomously uses MongoDB tools to answer questions
result = await agent.run("Find all active products in the electronics category")

# The agent will autonomously:
# 1. Use list_collections to explore database
# 2. Use find_documents to query the products collection
# 3. Analyze and present results in natural language

await agent.stop()

Available Tools

The MongoDB plugin exposes these tools to LLM agents:

| Tool | Description | Parameters |
| --- | --- | --- |
| find_documents | Find documents in collection | collection (required), filter (object), limit (int) |
| insert_document | Insert single document | collection (required), document (object) |
| update_documents | Update matching documents | collection, filter, update (all required) |
| delete_documents | Delete matching documents | collection (required), filter (required) |
| list_collections | List all collections | None |

  • Tool Categories: database
  • Tool Source: plugin
  • Query Format: MongoDB query syntax (e.g., {"status": "active"})

Tool Usage Example

from daita import SubstrateAgent
from daita.plugins import mongodb

# Setup MongoDB with tool integration
db = mongodb(host="localhost", database="orders")

agent = SubstrateAgent(
name="Order Processor",
prompt="You are an order processor. Help users manage and process orders.",
llm_provider="openai",
model="gpt-4",
tools=[db]
)

await agent.start()

# Natural language command - agent uses tools autonomously
result = await agent.run("""
Process recent orders:
1. Find orders with status "pending"
2. Check inventory for each
3. Update orders to "processing" status
""")

# Agent orchestrates MongoDB tool calls to fulfill the request
print(result)
await agent.stop()

ObjectId Handling

MongoDB _id fields are automatically converted to strings in returned results. To query by _id, convert the string back to an ObjectId first:

from bson import ObjectId

async with mongodb(host="localhost", database="app") as db:
user_id = await db.insert("users", {"name": "John"})
# user_id is a string: "507f1f77bcf86cd799439011"

# Query by ObjectId
user = await db.find("users", {"_id": ObjectId(user_id)})

Error Handling

try:
async with mongodb(host="localhost", database="mydb") as db:
results = await db.find("users", {"status": "active"})
except RuntimeError as e:
if "motor not installed" in str(e):
print("Install motor: pip install motor")
elif "connection" in str(e).lower():
print(f"Connection failed: {e}")

Best Practices

Connection Management:

  • Always use context managers (async with) for automatic cleanup
  • Configure pool size for high-traffic applications
  • Set appropriate timeouts to prevent hanging operations

Query Performance:

  • Create indexes for frequently queried fields
  • Use aggregation pipeline for complex queries
  • Limit result sets to avoid memory issues
  • Use projection to fetch only needed fields

Security:

  • Store credentials in environment variables, never hardcode
  • Use authentication in production
  • Enable SSL/TLS for remote connections
  • Grant minimal privileges to application users

Data Modeling:

  • Embed related data accessed together frequently
  • Use references for large or independent data

SQL vs MongoDB

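| Concept | SQL | MongoDB |
| --- | --- | --- |
| Table | Table | Collection |
| Row | Row | Document |
| Column | Column | Field |
| Primary Key | Primary Key | _id field |
| JOIN | JOIN | $lookup / Embedding |
| WHERE | WHERE clause | Filter document |
| GROUP BY | GROUP BY | $group |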

Troubleshooting

| Issue | Solution |
| --- | --- |
| motor not installed | pip install motor |
| Connection failed | Check MongoDB is running, verify host/port/credentials |
| Authentication failed | Verify username/password, check user database access |
| Server selection timeout | Increase server_timeout, check network connectivity |
| Document too large | MongoDB limit is 16MB - split documents or use GridFS |

Next Steps