S3 Storage Plugin
AWS S3 object storage with automatic format detection and pandas integration. Built on boto3.
Installation
pip install boto3 pandas  # pandas optional, for DataFrame support (pyarrow also needed for Parquet)
Quick Start
Direct Usage (Scripts)
from daita.plugins import s3
# Direct usage in scripts
async with s3(bucket="my-bucket", region="us-east-1") as storage:
data = await storage.get_object("data/users.csv")
print(data)
Agent Integration (Recommended)
from daita import SubstrateAgent
from daita.plugins import s3
# Create plugin
storage = s3(bucket="my-bucket", region="us-east-1")
# Agent uses S3 tools autonomously
agent = SubstrateAgent(
name="Storage Agent",
prompt="You are a cloud storage specialist. Help users interact with S3 storage.",
tools=[storage]
)
await agent.start()
result = await agent.run("Read the users.csv file from the data folder")
Connection Parameters
s3(
bucket: str,
region: str = "us-east-1",
aws_access_key_id: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
aws_session_token: Optional[str] = None,
endpoint_url: Optional[str] = None,
**kwargs
)
Parameters
- bucket (str): S3 bucket name (required)
- region (str): AWS region (default: "us-east-1")
- aws_access_key_id (str): AWS access key (optional, uses IAM/env if not provided)
- aws_secret_access_key (str): AWS secret key (optional, uses IAM/env if not provided)
- aws_session_token (str): Session token for temporary credentials (optional)
- endpoint_url (str): Custom S3 endpoint for S3-compatible services (optional)
- **kwargs: Additional boto3 configuration
Authentication
# IAM Roles (recommended for AWS) - no credentials needed
async with s3(bucket="my-bucket") as storage:
data = await storage.get_object("data/file.csv")
# Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
async with s3(bucket="my-bucket") as storage:
data = await storage.get_object("data/file.csv")
# Explicit credentials
async with s3(
bucket="my-bucket",
aws_access_key_id="your_key",
aws_secret_access_key="your_secret"
) as storage:
data = await storage.get_object("data/file.csv")
# S3-compatible services (MinIO, DigitalOcean Spaces)
async with s3(
bucket="my-bucket",
endpoint_url="https://nyc3.digitaloceanspaces.com",
aws_access_key_id="key",
aws_secret_access_key="secret"
) as storage:
data = await storage.get_object("data/file.csv")
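Temporary credentials - aws_session_token (from the parameter list above) covers STS/assumed-role sessions. A minimal sketch, assuming the temporary credentials are exported under the standard AWS environment variable names:
import os
from daita.plugins import s3
# Temporary (STS) credentials, e.g. from an assumed role
async with s3(
    bucket="my-bucket",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    aws_session_token=os.environ["AWS_SESSION_TOKEN"]
) as storage:
    data = await storage.get_object("data/file.csv")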
Reading Objects
Automatic format detection from file extension (.json, .csv, .parquet, .txt):
async with s3(bucket="my-bucket") as storage:
json_data = await storage.get_object("data/users.json") # Auto-parsed as JSON
csv_data = await storage.get_object("data/sales.csv") # Auto-parsed as CSV
df = await storage.get_object("data/analytics.parquet") # Auto-loaded as DataFrame
Explicit format - Override auto-detection:
async with s3(bucket="my-bucket") as storage:
df = await storage.get_object("data/report.csv", format="pandas")
raw = await storage.get_object("data/image.png", format="bytes")
text = await storage.get_object("data/log.txt", format="text")
Focus system - Filter columns when loading:
async with s3(bucket="my-bucket") as storage:
df = await storage.get_object(
"data/users.csv",
format="pandas",
focus=["user_id", "email", "created_at"]
)
Writing Objects
import pandas as pd
async with s3(bucket="my-bucket") as storage:
# Put object - automatic format handling
await storage.put_object("output/results.json", {"status": "success"})
await storage.put_object("logs/app.log", "Log message")
await storage.put_object("images/photo.png", image_bytes)
# Upload DataFrame
df = pd.DataFrame({"user_id": [1, 2], "name": ["Alice", "Bob"]})
await storage.upload_dataframe(df, "processed/users.csv", format="csv")
await storage.upload_dataframe(df, "processed/users.parquet", format="parquet")
# With metadata
await storage.put_object(
"reports/monthly.pdf",
pdf_bytes,
metadata={"department": "finance", "period": "2024-01"}
)
File Operations & Object Management
async with s3(bucket="my-bucket") as storage:
# Upload/download files
await storage.upload_file(local_path="/tmp/report.pdf", key="reports/2024/jan.pdf")
await storage.download_file(key="reports/jan.pdf", local_path="/tmp/report.pdf")
# List objects
objects = await storage.list_objects(prefix="logs/2024/", max_keys=100)
# Delete object
await storage.delete_object("temp/old_file.txt")
# Copy object
await storage.copy_object(
source_key="data/input.csv",
dest_key="backup/input_backup.csv"
)
Using with Agents
Direct Storage Operations (Scripts)
For scripts that don't need agent capabilities:
from daita.plugins import s3
import pandas as pd
async with s3(bucket="data-bucket") as storage:
# Load, process, save
df = await storage.get_object("input/data.csv", format="pandas")
processed = df.groupby("event_type").size().reset_index(name="count")
await storage.upload_dataframe(processed, "processed/output.parquet")
print(f"Processed {len(df)} rows")
Tool-Based Integration (Recommended)
The S3 plugin exposes storage operations as tools that agents can use autonomously:
from daita import SubstrateAgent
from daita.plugins import s3
import os
# Create S3 plugin
storage = s3(
bucket="data-lake",
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY")
)
# Pass plugin to agent - agent can now use S3 tools autonomously
agent = SubstrateAgent(
name="Data Manager",
prompt="You are a cloud storage manager. Help users manage and analyze data in S3.",
llm_provider="openai",
model="gpt-4",
tools=[storage]
)
await agent.start()
# Agent autonomously uses S3 tools to answer questions
result = await agent.run("List all CSV files in the reports/ folder")
# The agent will autonomously:
# 1. Use list_s3_objects tool to list files
# 2. Filter for CSV files
# 3. Present results in natural language
await agent.stop()
Available Tools
The S3 plugin exposes these tools to LLM agents:
| Tool | Description | Parameters |
|---|---|---|
| read_s3_file | Read and parse S3 file | key (required), format (string: auto/csv/json/pandas) |
| write_s3_file | Write data to S3 | key (required), data (object) |
| list_s3_objects | List objects in bucket | prefix (string), max_keys (int: 100) |
| delete_s3_file | Delete S3 object | key (required) |
Tool Categories: storage
Tool Source: plugin
Bucket: Configured at plugin initialization
Tool Usage Example
from daita import SubstrateAgent
from daita.plugins import s3
# Setup S3 with tool integration
storage = s3(bucket="analytics-data", region="us-west-2")
agent = SubstrateAgent(
name="Report Generator",
prompt="You are a data analyst. Help users analyze and manage data in S3.",
llm_provider="openai",
model="gpt-4",
tools=[storage]
)
await agent.start()
# Natural language command - agent uses tools autonomously
result = await agent.run("""
Generate monthly report:
1. Read sales data from data/sales_2024.csv
2. Calculate total revenue by category
3. Save summary to reports/monthly_summary.json
""")
# Agent orchestrates S3 tool calls to complete the task
print(result)
await agent.stop()
Error Handling
try:
async with s3(bucket="my-bucket") as storage:
data = await storage.get_object("data/file.csv")
except RuntimeError as e:
if "boto3 not installed" in str(e):
print("Install boto3: pip install boto3")
elif "does not exist" in str(e):
print("Bucket not found")
elif "Access denied" in str(e):
print("Permission denied - check IAM")
Best Practices
Performance:
- Use Parquet format for large datasets (more efficient than CSV)
- Use focus system to load only needed columns
- Batch related operations within a single context manager (see the sketch below)
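A minimal sketch combining these three points - column focus, Parquet output, and one context manager for the whole batch (keys and column names are illustrative):
from daita.plugins import s3
async with s3(bucket="my-bucket") as storage:
    # One connection for the whole batch of reads and writes
    df = await storage.get_object(
        "data/raw/events.csv",
        format="pandas",
        focus=["user_id", "event_type"]  # load only the columns you need
    )
    summary = df.groupby("event_type").size().reset_index(name="count")
    # Parquet is more compact and faster to read back than CSV
    await storage.upload_dataframe(summary, "processed/event_counts.parquet", format="parquet")
    await storage.put_object("processed/event_counts_meta.json", {"rows": len(df)})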
Security:
- Use IAM roles instead of hardcoded credentials (best)
- Store credentials in environment variables, never in code
- Set minimal IAM permissions (s3:GetObject, s3:PutObject, s3:ListBucket only)
Data Organization:
- Use a logical prefix structure (e.g. data/raw/2024/01/events.csv); see the sketch below
- Include timestamps in keys for versioning
- Separate raw and processed data with prefixes
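A short sketch of building keys in this layout; the prefix names are illustrative, and the raw object is assumed to exist:
from datetime import datetime, timezone
from daita.plugins import s3
now = datetime.now(timezone.utc)
# Builds keys like "data/raw/2024/01/events_20240115T120000Z.csv"
raw_key = f"data/raw/{now:%Y}/{now:%m}/events_{now:%Y%m%dT%H%M%SZ}.csv"
processed_key = raw_key.replace("data/raw/", "data/processed/").replace(".csv", ".parquet")
async with s3(bucket="my-bucket") as storage:
    df = await storage.get_object(raw_key, format="pandas")
    await storage.upload_dataframe(df, processed_key, format="parquet")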
Common Patterns
ETL Pipeline:
async def etl_handler(data, context, agent):
async with s3(bucket="data-pipeline") as storage:
raw_df = await storage.get_object("raw/events.csv", format="pandas")
transformed = raw_df.groupby("user_id").agg({"event_count": "count"})
await storage.upload_dataframe(transformed, "processed/summary.parquet")
return {"rows_processed": len(raw_df)}
Batch Processing:
async def batch_processor(data, context, agent):
    async with s3(bucket="data-bucket") as storage:
        objects = await storage.list_objects(prefix="inbox/")
        for obj in objects:
            df = await storage.get_object(obj["Key"], format="pandas")
            processed = process_data(df)  # process_data: your own transformation
            await storage.upload_dataframe(processed, obj["Key"].replace("inbox/", "processed/"))
Troubleshooting
| Issue | Solution |
|---|---|
| boto3 not installed | pip install boto3 |
| pandas not installed | pip install pandas pyarrow (pyarrow needed for Parquet) |
| Bucket not found | Check bucket name, region, and permissions |
| Access denied | Verify IAM permissions and AWS credentials |
| Credentials not found | Set environment variables or use IAM roles |
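To confirm that boto3 can resolve credentials at all, independent of the plugin, a quick check with boto3 itself can help:
import boto3
# Walks boto3's normal credential chain (env vars, shared config, IAM role)
creds = boto3.Session().get_credentials()
if creds is None:
    print("No AWS credentials found - set env vars, a profile, or an IAM role")
else:
    print("Credentials resolved via:", creds.method)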
Next Steps
- REST Plugin - API integrations
- PostgreSQL Plugin - Database operations
- Slack Plugin - Team notifications
- Plugin Overview - All available plugins