S3 Storage Plugin

AWS S3 object storage with automatic format detection and pandas integration. Built on boto3.

Installation

pip install boto3 pandas  # pandas optional (DataFrame support); add pyarrow for Parquet files

Quick Start

Direct Usage (Scripts)

from daita.plugins import s3

# Direct usage in scripts
async with s3(bucket="my-bucket", region="us-east-1") as storage:
    data = await storage.get_object("data/users.csv")
    print(data)

Agent Usage (Autonomous Tools)

from daita import SubstrateAgent
from daita.plugins import s3

# Create plugin
storage = s3(bucket="my-bucket", region="us-east-1")

# Agent uses S3 tools autonomously
agent = SubstrateAgent(
    name="Storage Agent",
    prompt="You are a cloud storage specialist. Help users interact with S3 storage.",
    tools=[storage]
)

await agent.start()
result = await agent.run("Read the users.csv file from the data folder")

Connection Parameters

s3(
    bucket: str,
    region: str = "us-east-1",
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    aws_session_token: Optional[str] = None,
    endpoint_url: Optional[str] = None,
    **kwargs
)

Parameters

  • bucket (str): S3 bucket name (required)
  • region (str): AWS region (default: "us-east-1")
  • aws_access_key_id (str): AWS access key (optional, uses IAM/env if not provided)
  • aws_secret_access_key (str): AWS secret key (optional, uses IAM/env if not provided)
  • aws_session_token (str): Session token for temporary credentials (optional)
  • endpoint_url (str): Custom S3 endpoint for S3-compatible services (optional)
  • **kwargs: Additional boto3 configuration
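
One way to wire in temporary STS credentials (for example, from an assumed role) is to pass them explicitly along with the session token; this is a sketch, and the AWS_* environment-variable names below follow standard AWS conventions rather than anything specific to the plugin.

import os

from daita.plugins import s3

# Temporary credentials (e.g. from sts:AssumeRole) passed explicitly;
# the AWS_* names are standard AWS environment variables.
async with s3(
    bucket="my-bucket",
    region="eu-west-1",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    aws_session_token=os.environ["AWS_SESSION_TOKEN"],
) as storage:
    data = await storage.get_object("data/file.csv")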

Authentication

# IAM Roles (recommended for AWS) - no credentials needed
async with s3(bucket="my-bucket") as storage:
    data = await storage.get_object("data/file.csv")

# Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
async with s3(bucket="my-bucket") as storage:
    data = await storage.get_object("data/file.csv")

# Explicit credentials
async with s3(
    bucket="my-bucket",
    aws_access_key_id="your_key",
    aws_secret_access_key="your_secret"
) as storage:
    data = await storage.get_object("data/file.csv")

# S3-compatible services (MinIO, DigitalOcean Spaces)
async with s3(
    bucket="my-bucket",
    endpoint_url="https://nyc3.digitaloceanspaces.com",
    aws_access_key_id="key",
    aws_secret_access_key="secret"
) as storage:
    data = await storage.get_object("data/file.csv")

Reading Objects

Automatic format detection from file extension (.json, .csv, .parquet, .txt):

async with s3(bucket="my-bucket") as storage:
    json_data = await storage.get_object("data/users.json")     # Auto-parsed as JSON
    csv_data = await storage.get_object("data/sales.csv")       # Auto-parsed as CSV
    df = await storage.get_object("data/analytics.parquet")     # Auto-loaded as DataFrame

Explicit format - Override auto-detection:

async with s3(bucket="my-bucket") as storage:
    df = await storage.get_object("data/report.csv", format="pandas")
    raw = await storage.get_object("data/image.png", format="bytes")
    text = await storage.get_object("data/log.txt", format="text")

Focus system - Filter columns when loading:

async with s3(bucket="my-bucket") as storage:
    df = await storage.get_object(
        "data/users.csv",
        format="pandas",
        focus=["user_id", "email", "created_at"]
    )

Writing Objects

import pandas as pd

async with s3(bucket="my-bucket") as storage:
    # Put object - automatic format handling
    await storage.put_object("output/results.json", {"status": "success"})
    await storage.put_object("logs/app.log", "Log message")
    await storage.put_object("images/photo.png", image_bytes)

    # Upload DataFrame
    df = pd.DataFrame({"user_id": [1, 2], "name": ["Alice", "Bob"]})
    await storage.upload_dataframe(df, "processed/users.csv", format="csv")
    await storage.upload_dataframe(df, "processed/users.parquet", format="parquet")

    # With metadata
    await storage.put_object(
        "reports/monthly.pdf",
        pdf_bytes,
        metadata={"department": "finance", "period": "2024-01"}
    )

File Operations & Object Management

async with s3(bucket="my-bucket") as storage:
    # Upload/download files
    await storage.upload_file(local_path="/tmp/report.pdf", key="reports/2024/jan.pdf")
    await storage.download_file(key="reports/jan.pdf", local_path="/tmp/report.pdf")

    # List objects
    objects = await storage.list_objects(prefix="logs/2024/", max_keys=100)

    # Delete object
    await storage.delete_object("temp/old_file.txt")

    # Copy object
    await storage.copy_object(
        source_key="data/input.csv",
        dest_key="backup/input_backup.csv"
    )

Using with Agents

Direct Storage Operations (Scripts)

For scripts that don't need agent capabilities:

from daita.plugins import s3
import pandas as pd

async with s3(bucket="data-bucket") as storage:
    # Load, process, save
    df = await storage.get_object("input/data.csv", format="pandas")
    processed = df.groupby("event_type").size().reset_index(name="count")
    await storage.upload_dataframe(processed, "processed/output.parquet")

print(f"Processed {len(df)} rows")

Autonomous Tool Use (Agents)

The S3 plugin exposes storage operations as tools that agents can use autonomously:

from daita import SubstrateAgent
from daita.plugins import s3
import os

# Create S3 plugin
storage = s3(
    bucket="data-lake",
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY")
)

# Pass plugin to agent - agent can now use S3 tools autonomously
agent = SubstrateAgent(
    name="Data Manager",
    prompt="You are a cloud storage manager. Help users manage and analyze data in S3.",
    llm_provider="openai",
    model="gpt-4",
    tools=[storage]
)

await agent.start()

# Agent autonomously uses S3 tools to answer questions
result = await agent.run("List all CSV files in the reports/ folder")

# The agent will autonomously:
# 1. Use list_s3_objects tool to list files
# 2. Filter for CSV files
# 3. Present results in natural language

await agent.stop()

Available Tools

The S3 plugin exposes these tools to LLM agents:

Tool               Description                  Parameters
read_s3_file       Read and parse an S3 file    key (required), format (string: auto/csv/json/pandas)
write_s3_file      Write data to S3             key (required), data (object)
list_s3_objects    List objects in the bucket   prefix (string), max_keys (int, default 100)
delete_s3_file     Delete an S3 object          key (required)

  • Tool category: storage
  • Tool source: plugin
  • Bucket: configured at plugin initialization

Tool Usage Example

from daita import SubstrateAgent
from daita.plugins import s3

# Setup S3 with tool integration
storage = s3(bucket="analytics-data", region="us-west-2")

agent = SubstrateAgent(
    name="Report Generator",
    prompt="You are a data analyst. Help users analyze and manage data in S3.",
    llm_provider="openai",
    model="gpt-4",
    tools=[storage]
)

await agent.start()

# Natural language command - agent uses tools autonomously
result = await agent.run("""
Generate monthly report:
1. Read sales data from data/sales_2024.csv
2. Calculate total revenue by category
3. Save summary to reports/monthly_summary.json
""")

# Agent orchestrates S3 tool calls to complete the task
print(result)
await agent.stop()

Error Handling

try:
    async with s3(bucket="my-bucket") as storage:
        data = await storage.get_object("data/file.csv")
except RuntimeError as e:
    if "boto3 not installed" in str(e):
        print("Install boto3: pip install boto3")
    elif "does not exist" in str(e):
        print("Bucket not found")
    elif "Access denied" in str(e):
        print("Permission denied - check IAM")

Best Practices

Performance:

  • Use the Parquet format for large datasets (more efficient than CSV)
  • Use the focus system to load only the columns you need
  • Batch operations within a single context manager (see the sketch below)
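
For example, one context manager can reuse a single connection for several reads and writes in a row; the bucket, keys, and column names below are purely illustrative.

from daita.plugins import s3

# One context manager, several operations - keys and columns are illustrative.
async with s3(bucket="my-bucket") as storage:
    users = await storage.get_object(
        "raw/users.csv", format="pandas", focus=["user_id", "country"]
    )
    events = await storage.get_object("raw/events.parquet", format="pandas")

    summary = (
        events.merge(users, on="user_id")
        .groupby("country")
        .size()
        .reset_index(name="events")
    )
    await storage.upload_dataframe(summary, "processed/events_by_country.parquet", format="parquet")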

Security:

  • Prefer IAM roles over hardcoded credentials
  • Store credentials in environment variables, never in code
  • Grant minimal IAM permissions (s3:GetObject, s3:PutObject, s3:ListBucket only); a policy sketch follows below
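
As a rough sketch, a least-privilege policy for the operations shown in this guide might look like the following, written here as a Python dict mirroring the IAM JSON document; the bucket name is a placeholder.

# Minimal IAM policy covering the read/write/list operations above,
# expressed as a Python dict that mirrors the IAM JSON document.
minimal_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            # Add s3:DeleteObject here if you also use the delete operations.
            "Action": ["s3:GetObject", "s3:PutObject"],
            "Resource": "arn:aws:s3:::my-bucket/*",
        },
        {
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": "arn:aws:s3:::my-bucket",
        },
    ],
}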

Data Organization:

  • Use a logical prefix structure (e.g. data/raw/2024/01/events.csv)
  • Include timestamps in keys for versioning (see the sketch below)
  • Separate raw and processed data with prefixes
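
A date-partitioned, timestamped key can be built from the current time before writing; the prefix layout below is just one possibility, and csv_text stands in for content you have already prepared.

from datetime import datetime, timezone

from daita.plugins import s3

# Build a date-partitioned, timestamped key - the layout is illustrative.
now = datetime.now(timezone.utc)
key = f"data/raw/{now:%Y/%m/%d}/events_{now:%Y%m%dT%H%M%SZ}.csv"

async with s3(bucket="my-bucket") as storage:
    await storage.put_object(key, csv_text)  # csv_text: CSV content prepared earlier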

Common Patterns

ETL Pipeline:

async def etl_handler(data, context, agent):
    async with s3(bucket="data-pipeline") as storage:
        raw_df = await storage.get_object("raw/events.csv", format="pandas")
        transformed = raw_df.groupby("user_id").agg({"event_count": "count"})
        await storage.upload_dataframe(transformed, "processed/summary.parquet")
        return {"rows_processed": len(raw_df)}

Batch Processing:

async def batch_processor(data, context, agent):
    async with s3(bucket="data-bucket") as storage:
        objects = await storage.list_objects(prefix="inbox/")
        for obj in objects:
            df = await storage.get_object(obj["Key"], format="pandas")
            processed = process_data(df)  # your transformation logic
            await storage.upload_dataframe(processed, obj["Key"].replace("inbox/", "processed/"))

Troubleshooting

Issue                     Solution
boto3 not installed       pip install boto3
pandas not installed      pip install pandas pyarrow (pyarrow is needed for Parquet)
Bucket not found          Check the bucket name, region, and permissions
Access denied             Verify IAM permissions and AWS credentials
Credentials not found     Set environment variables or use IAM roles

Next Steps