# S3 Storage Plugin

AWS S3 object storage with automatic format detection and pandas integration. Built on `boto3`.

## Installation

```bash
pip install boto3 pandas  # pandas optional, for DataFrame support
```

## Quick Start

```python
from daita import Agent
from daita.plugins import s3

# Create plugin
storage = s3(bucket="my-bucket", region="us-east-1")

# Agent uses S3 tools autonomously
agent = Agent(
    name="Storage Agent",
    prompt="You are a cloud storage specialist. Help users interact with S3 storage.",
    tools=[storage]
)

await agent.start()
result = await agent.run("Read the users.csv file from the data folder")
```

## Direct Usage

The plugin can also be used directly, without agents, for programmatic access. For comprehensive S3 documentation, see the AWS S3 docs. The main value of this plugin, however, is agent integration: it lets LLMs autonomously manage cloud storage operations.

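A minimal sketch of direct programmatic use, relying on the `list_objects` and `get_object` methods shown later on this page (the bucket and keys are illustrative):

```python
from daita.plugins import s3

async def latest_report():
    # One context manager scopes the connection; list, then read objects directly
    async with s3(bucket="my-bucket") as storage:
        objects = await storage.list_objects(prefix="reports/")
        for obj in objects:
            print(obj["Key"])
        return await storage.get_object("reports/latest.csv")
```
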
## Connection Parameters

```python
s3(
    bucket: str,
    region: str = "us-east-1",
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
    aws_session_token: Optional[str] = None,
    endpoint_url: Optional[str] = None,
    **kwargs
)
```

### Parameters

  • bucket (str): S3 bucket name (required)
  • region (str): AWS region (default: "us-east-1")
  • aws_access_key_id (str): AWS access key (optional, uses IAM/env if not provided)
  • aws_secret_access_key (str): AWS secret key (optional, uses IAM/env if not provided)
  • aws_session_token (str): Session token for temporary credentials (optional)
  • endpoint_url (str): Custom S3 endpoint for S3-compatible services (optional)
  • `**kwargs`: Additional boto3 configuration (see the sketch below)

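For example, extra keyword arguments can carry boto3 client settings. A sketch, assuming the plugin forwards `**kwargs` to the underlying boto3 client (the retry and timeout values are illustrative):

```python
from botocore.config import Config
from daita.plugins import s3

# Assumes **kwargs are passed through to the boto3 S3 client
storage = s3(
    bucket="my-bucket",
    region="us-east-1",
    config=Config(retries={"max_attempts": 5, "mode": "adaptive"}, connect_timeout=10),
)
```
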
## Authentication

```python
# IAM roles (recommended for AWS) - no credentials needed
async with s3(bucket="my-bucket") as storage:
    data = await storage.get_object("data/file.csv")

# Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
async with s3(bucket="my-bucket") as storage:
    data = await storage.get_object("data/file.csv")

# Explicit credentials
async with s3(
    bucket="my-bucket",
    aws_access_key_id="your_key",
    aws_secret_access_key="your_secret"
) as storage:
    data = await storage.get_object("data/file.csv")

# S3-compatible services (MinIO, DigitalOcean Spaces)
async with s3(
    bucket="my-bucket",
    endpoint_url="https://nyc3.digitaloceanspaces.com",
    aws_access_key_id="key",
    aws_secret_access_key="secret"
) as storage:
    data = await storage.get_object("data/file.csv")
```

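The `aws_session_token` parameter supports temporary credentials. A sketch, assuming you obtain them from STS with boto3 (the role ARN and session name are placeholders):

```python
import boto3
from daita.plugins import s3

# Obtain temporary credentials via STS
sts = boto3.client("sts")
creds = sts.assume_role(
    RoleArn="arn:aws:iam::123456789012:role/data-reader",
    RoleSessionName="daita-s3-session",
)["Credentials"]

# Pass the temporary credentials, including the session token, to the plugin
async with s3(
    bucket="my-bucket",
    aws_access_key_id=creds["AccessKeyId"],
    aws_secret_access_key=creds["SecretAccessKey"],
    aws_session_token=creds["SessionToken"],
) as storage:
    data = await storage.get_object("data/file.csv")
```
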
## Direct Storage Operations (Scripts)

For scripts that don't need agent capabilities, use the plugin directly:

```python
from daita.plugins import s3
import pandas as pd

async with s3(bucket="data-bucket") as storage:
    # Load, process, save
    df = await storage.get_object("input/data.csv", format="pandas")
    processed = df.groupby("event_type").size().reset_index(name="count")
    await storage.upload_dataframe(processed, "processed/output.parquet")

    print(f"Processed {len(df)} rows")
```

## Using with Agents

The S3 plugin exposes storage operations as tools that agents can use autonomously:

```python
from daita import Agent
from daita.plugins import s3
import os

# Create S3 plugin
storage = s3(
    bucket="data-lake",
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY")
)

# Pass plugin to agent - agent can now use S3 tools autonomously
agent = Agent(
    name="Data Manager",
    prompt="You are a cloud storage manager. Help users manage and analyze data in S3.",
    llm_provider="openai",
    model="gpt-4",
    tools=[storage]
)

await agent.start()

# Agent autonomously uses S3 tools to answer questions
result = await agent.run("List all CSV files in the reports/ folder")

# The agent will autonomously:
# 1. Use the list_s3_objects tool to list files
# 2. Filter for CSV files
# 3. Present results in natural language

await agent.stop()
```

## Available Tools

The S3 plugin exposes these tools to LLM agents:

| Tool | Description | Parameters |
| --- | --- | --- |
| `read_s3_file` | Read and parse an S3 file | `key` (required), `format` (string: auto/csv/json/pandas) |
| `write_s3_file` | Write data to S3 | `key` (required), `data` (object) |
| `list_s3_objects` | List objects in the bucket | `prefix` (string), `max_keys` (int, default 100) |
| `delete_s3_file` | Delete an S3 object | `key` (required) |

  • Tool category: storage
  • Tool source: plugin
  • Bucket: configured at plugin initialization

### Tool Usage Example

```python
from daita import Agent
from daita.plugins import s3

# Setup S3 with tool integration
storage = s3(bucket="analytics-data", region="us-west-2")

agent = Agent(
    name="Report Generator",
    prompt="You are a data analyst. Help users analyze and manage data in S3.",
    llm_provider="openai",
    model="gpt-4",
    tools=[storage]
)

await agent.start()

# Natural language command - agent uses tools autonomously
result = await agent.run("""
Generate monthly report:
1. Read sales data from data/sales_2024.csv
2. Calculate total revenue by category
3. Save summary to reports/monthly_summary.json
""")

# Agent orchestrates S3 tool calls to complete the task
print(result)
await agent.stop()
```

## Error Handling

```python
try:
    async with s3(bucket="my-bucket") as storage:
        data = await storage.get_object("data/file.csv")
except RuntimeError as e:
    if "boto3 not installed" in str(e):
        print("Install boto3: pip install boto3")
    elif "does not exist" in str(e):
        print("Bucket not found")
    elif "Access denied" in str(e):
        print("Permission denied - check IAM")
    else:
        raise  # re-raise errors this handler does not recognize
```

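Transient failures (timeouts, throttling) can be wrapped in a small retry helper. A minimal sketch, assuming `get_object` raises `RuntimeError` on failure as above (the backoff values are illustrative):

```python
import asyncio

async def get_object_with_retry(storage, key, attempts=3, base_delay=1.0, **kwargs):
    # Retry get_object with a simple linear backoff, re-raising on the final attempt
    for attempt in range(1, attempts + 1):
        try:
            return await storage.get_object(key, **kwargs)
        except RuntimeError:
            if attempt == attempts:
                raise
            await asyncio.sleep(base_delay * attempt)
```
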
## Best Practices

Performance:

  • Use Parquet format for large datasets (more efficient than CSV)
  • Use focus system to load only needed columns
  • Batch operations within a single context manager (see the sketch after this list)

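A sketch of the batching and Parquet advice, reusing one plugin context for several operations (the bucket and keys are illustrative):

```python
from daita.plugins import s3

async with s3(bucket="data-bucket") as storage:
    # One context manager, several operations: read two inputs, write one Parquet output
    orders = await storage.get_object("raw/orders.csv", format="pandas")
    users = await storage.get_object("raw/users.csv", format="pandas")
    joined = orders.merge(users, on="user_id", how="left")
    await storage.upload_dataframe(joined, "processed/orders_enriched.parquet")
```
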
Security:

  • Use IAM roles instead of hardcoded credentials (best)
  • Store credentials in environment variables, never in code
  • Set minimal IAM permissions (s3:GetObject, s3:PutObject, s3:ListBucket only)

Data Organization:

  • Use logical prefix structure (data/raw/2024/01/events.csv)
  • Include timestamps in keys for versioning (see the sketch below)
  • Separate raw and processed data with prefixes

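A small sketch of one such layout, building keys from prefixes and a UTC timestamp (the paths are illustrative):

```python
from datetime import datetime, timezone

now = datetime.now(timezone.utc)
# Raw data partitioned by year/month; processed output carries a timestamp for versioning
raw_key = f"data/raw/{now:%Y/%m}/events.csv"
processed_key = f"data/processed/{now:%Y/%m}/summary_{now:%Y%m%dT%H%M%S}.parquet"
```
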
## Common Patterns

ETL Pipeline:

```python
async def etl_handler(data, context, agent):
    async with s3(bucket="data-pipeline") as storage:
        raw_df = await storage.get_object("raw/events.csv", format="pandas")
        transformed = raw_df.groupby("user_id").agg({"event_count": "count"})
        await storage.upload_dataframe(transformed, "processed/summary.parquet")
        return {"rows_processed": len(raw_df)}
```

Batch Processing:

```python
async def batch_processor(data, context, agent):
    async with s3(bucket="data-bucket") as storage:
        objects = await storage.list_objects(prefix="inbox/")
        for obj in objects:
            df = await storage.get_object(obj["Key"], format="pandas")
            processed = process_data(df)
            await storage.upload_dataframe(processed, obj["Key"].replace("inbox/", "processed/"))
```

## Troubleshooting

| Issue | Solution |
| --- | --- |
| boto3 not installed | `pip install boto3` |
| pandas not installed | `pip install pandas pyarrow` (pyarrow is needed for Parquet) |
| Bucket not found | Check the bucket name, region, and permissions |
| Access denied | Verify IAM permissions and AWS credentials |
| Credentials not found | Set environment variables or use IAM roles |

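To separate plugin issues from AWS-side issues, it can help to check bucket access with boto3 directly; `head_bucket` raises a `ClientError` if the bucket is unreachable or access is denied (the bucket name and region are illustrative):

```python
import boto3
from botocore.exceptions import ClientError

try:
    # Succeeds only if credentials resolve and the bucket is accessible
    boto3.client("s3", region_name="us-east-1").head_bucket(Bucket="my-bucket")
    print("Credentials and bucket access look fine")
except ClientError as e:
    print(f"S3 access problem: {e}")
```
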
## Next Steps