# Snowflake Plugin
Async data warehouse operations with automatic connection management built on `snowflake-connector-python`.
## Installation
```bash
pip install snowflake-connector-python
```

## Quick Start
```python
from daita import Agent
from daita.plugins import snowflake
# Create plugin
db = snowflake(
account="xy12345",
warehouse="COMPUTE_WH",
database="ANALYTICS_DB",
user="analyst",
password="secure_password"
)
# Create agent
agent = Agent(
name="Data Warehouse Analyst",
model="gpt-4o-mini",
prompt="You are a data analyst. Help users query and analyze data warehouse data."
)
# Give agent access to Snowflake tools
agent.add_plugin(db)
await agent.start()
result = await agent.run("What were our top 10 products by revenue last quarter?")
```

## Direct Usage
The plugin can also be used directly, without an agent, for programmatic access. For comprehensive Snowflake documentation, see the official Snowflake docs. The main value of this plugin is agent integration: enabling LLMs to autonomously query and analyze data warehouse data.
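For quick scripts, here is a minimal direct-usage sketch. It assumes credentials are supplied via the environment variables described below; `query()` and the context-manager form are documented in detail in the sections that follow.

```python
import asyncio

from daita.plugins import snowflake

async def preview_customers():
    # Connection details not passed here fall back to SNOWFLAKE_* environment variables.
    async with snowflake(
        account="xy12345",
        warehouse="COMPUTE_WH",
        database="ANALYTICS_DB",
    ) as db:
        rows = await db.query("SELECT id, name FROM customers LIMIT 5")
        for row in rows:
            print(row)

asyncio.run(preview_customers())
```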
## Connection Parameters
```python
snowflake(
account: Optional[str] = None,
warehouse: Optional[str] = None,
database: Optional[str] = None,
schema: str = "PUBLIC",
user: Optional[str] = None,
password: Optional[str] = None,
role: Optional[str] = None,
private_key_path: Optional[str] = None,
private_key_passphrase: Optional[str] = None,
authenticator: Optional[str] = None,
session_parameters: Optional[Dict] = None
)
```

## Parameters
- `account` (str): Snowflake account identifier (e.g., "xy12345" or "xy12345.us-east-1")
- `warehouse` (str): Virtual warehouse name for compute resources
- `database` (str): Database name to connect to
- `schema` (str): Schema name (default: "PUBLIC")
- `user` (str): Username for authentication
- `password` (str): Password for password authentication
- `role` (str): Role to use after connecting (optional)
- `private_key_path` (str): Path to RSA private key file for key-pair authentication
- `private_key_passphrase` (str): Passphrase for encrypted private key (optional)
- `authenticator` (str): External authenticator (e.g., "externalbrowser", "oauth")
- `session_parameters` (dict): Additional session parameters
## Environment Variables
All parameters support environment variable fallback:
- `SNOWFLAKE_ACCOUNT`
- `SNOWFLAKE_WAREHOUSE`
- `SNOWFLAKE_DATABASE`
- `SNOWFLAKE_SCHEMA`
- `SNOWFLAKE_USER`
- `SNOWFLAKE_PASSWORD`
- `SNOWFLAKE_ROLE`
- `SNOWFLAKE_PRIVATE_KEY_PATH`
- `SNOWFLAKE_PRIVATE_KEY_PASSPHRASE`
- `SNOWFLAKE_AUTHENTICATOR`
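If every variable above is exported, the plugin can be constructed with no arguments at all. A minimal sketch (the values in the comments are placeholders):

```python
# Assumes the environment provides, for example:
#   export SNOWFLAKE_ACCOUNT=xy12345
#   export SNOWFLAKE_WAREHOUSE=COMPUTE_WH
#   export SNOWFLAKE_DATABASE=ANALYTICS_DB
#   export SNOWFLAKE_USER=analyst
#   export SNOWFLAKE_PASSWORD=...
from daita.plugins import snowflake

db = snowflake()  # every parameter falls back to its SNOWFLAKE_* variable
```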
## Authentication Methods
The Snowflake plugin supports three authentication methods. Choose based on your deployment environment and security requirements.
## Authentication Method Comparison
| Method | Use Case | Environment | Security | Setup Complexity |
|---|---|---|---|---|
| Password | Quick testing, development | Local, staging | Basic | Low |
| Key-Pair | Production, Lambda, CI/CD | Production, cloud | High | Medium |
| External Browser | Enterprise SSO accounts | Local only | Managed by SSO | Low |
## 1. Password Authentication
Best for: Local development, testing, personal Snowflake accounts
```python
from daita.plugins import snowflake
# Explicit credentials
db = snowflake(
account="xy12345",
warehouse="COMPUTE_WH",
database="ANALYTICS",
user="analyst",
password="secure_password"
)
# Or use environment variables (recommended)
import os
os.environ['SNOWFLAKE_ACCOUNT'] = 'xy12345'
os.environ['SNOWFLAKE_USER'] = 'analyst'
os.environ['SNOWFLAKE_PASSWORD'] = 'secure_password'
db = snowflake(
warehouse="COMPUTE_WH",
database="ANALYTICS"
)
```

Pros:
- Simple setup
- Works everywhere
- Familiar authentication flow
Cons:
- Less secure than key-pair
- Passwords may need rotation
- Not recommended for production
## 2. Key-Pair Authentication (Recommended for Production)
Best for: Production deployments, Lambda functions, CI/CD pipelines, service accounts
Why use this:
- No password to manage or rotate
- Works in headless environments (Lambda, Docker, Kubernetes)
- More secure than password authentication
- Supports encrypted private keys for additional security
Setup:
- Generate RSA key pair:
```bash
# Generate private key (unencrypted)
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
# Generate public key
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
```

- Assign public key to Snowflake user:
```sql
ALTER USER service_account SET RSA_PUBLIC_KEY='MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA...';
```

- Use in code:
```python
import os

from daita.plugins import snowflake
# Unencrypted private key
db = snowflake(
account="xy12345",
warehouse="COMPUTE_WH",
database="ANALYTICS",
user="service_account",
private_key_path="/path/to/rsa_key.p8"
)
# Or via environment variable
os.environ['SNOWFLAKE_PRIVATE_KEY_PATH'] = '/path/to/rsa_key.p8'
db = snowflake(
account="xy12345",
warehouse="COMPUTE_WH",
database="ANALYTICS",
user="service_account"
)
```

For encrypted keys:
```python
db = snowflake(
account="xy12345",
warehouse="COMPUTE_WH",
database="ANALYTICS",
user="service_account",
private_key_path="/path/to/encrypted_rsa_key.p8",
private_key_passphrase="key_passphrase"
)
```

Pros:
- Most secure authentication method
- No password management needed
- Works in all environments (including headless)
- Industry standard for service accounts
Cons:
- Slightly more complex setup
- Need to manage private key securely
## 3. External Browser Authentication (SSO)
Best for: Enterprise developers with SSO-enabled Snowflake accounts, local development
Important: This method opens a browser window for authentication and only works locally. Cannot be used in Lambda, Docker, or any headless environment.
```python
import os

from daita.plugins import snowflake
# Browser-based SSO (e.g., Okta, Azure AD)
db = snowflake(
account="xy12345",
warehouse="COMPUTE_WH",
database="ANALYTICS",
user="analyst@company.com",
authenticator="externalbrowser"
)
# Or via environment variable
os.environ['SNOWFLAKE_AUTHENTICATOR'] = 'externalbrowser'
db = snowflake(
account="xy12345",
warehouse="COMPUTE_WH",
database="ANALYTICS",
user="analyst@company.com"
)
```

When you run this:
- A browser window opens automatically
- You authenticate via your company's SSO (Okta, Azure AD, etc.)
- After successful login, the connection is established
Pros:
- Works with SSO-only Snowflake accounts
- Leverages existing enterprise authentication
- No password to manage locally
- Supports MFA automatically
Cons:
- Only works locally - cannot be used in production/Lambda
- Requires browser access
- Not suitable for automated scripts
Migration path: when deploying to production, switch to key-pair authentication:
- Local development: Use `externalbrowser` for easy SSO login
- Production/Lambda: Create a service account with key-pair authentication (see the sketch below)
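One hedged way to handle this split is to choose the auth parameters from the deployment environment, so the same code uses SSO locally and a key file in production. The `APP_ENV` variable and the specific values below are assumptions for illustration, not part of the plugin:

```python
import os

from daita.plugins import snowflake

# APP_ENV is a hypothetical deployment flag used only for this sketch.
if os.getenv("APP_ENV") == "production":
    # Headless-safe: key-pair authentication for a service account
    auth_kwargs = {
        "user": "service_account",
        "private_key_path": os.getenv("SNOWFLAKE_PRIVATE_KEY_PATH"),
    }
else:
    # Local only: browser-based SSO
    auth_kwargs = {
        "user": "analyst@company.com",
        "authenticator": "externalbrowser",
    }

db = snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    **auth_kwargs,
)
```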
## OAuth Authentication
Best for: Advanced integrations with OAuth 2.0 providers
```python
db = snowflake(
account="xy12345",
warehouse="COMPUTE_WH",
database="ANALYTICS",
user="analyst",
authenticator="oauth",
session_parameters={"token": "oauth_access_token"}
)
```

## Query Operations
SELECT queries - use the `query()` method, which returns a list of dictionaries:
```python
from daita.plugins import snowflake

async with snowflake(account="xy12345", warehouse="COMPUTE_WH", database="SALES") as db:
    # Simple query
    customers = await db.query("SELECT id, name, total_spent FROM customers")
    # Returns: [{"id": 1, "name": "Acme Corp", "total_spent": 50000.00}, ...]

    # Parameterized query (use %s, %(name)s placeholders)
    high_value = await db.query(
        "SELECT * FROM customers WHERE total_spent > %s AND status = %s",
        [10000, "active"]
    )

    # Named parameters
    recent = await db.query(
        "SELECT * FROM orders WHERE order_date >= %(start_date)s",
        {"start_date": "2024-01-01"}
    )
```

Parameter syntax: Snowflake uses `%s` for positional parameters and `%(name)s` for named parameters.
## Data Modification
INSERT, UPDATE, DELETE - use the `execute()` method, which returns the affected row count:
```python
from daita.plugins import snowflake

async with snowflake(account="xy12345", warehouse="LOAD_WH", database="SALES") as db:
    # Insert
    await db.execute(
        "INSERT INTO customers (name, email, status) VALUES (%s, %s, %s)",
        ["New Customer", "customer@example.com", "active"]
    )

    # Update
    await db.execute(
        "UPDATE customers SET last_purchase = CURRENT_TIMESTAMP() WHERE id = %s",
        [customer_id]
    )

    # Delete
    await db.execute(
        "DELETE FROM customers WHERE status = %s AND days_inactive > %s",
        ["inactive", 365]
    )
```

## Warehouse Management
Snowflake virtual warehouses control compute resources and costs:
```python
from daita.plugins import snowflake

async with snowflake(account="xy12345", warehouse="COMPUTE_WH", database="ANALYTICS") as db:
    # List all warehouses
    warehouses = await db.list_warehouses()
    # Returns: [{"name": "COMPUTE_WH", "state": "STARTED", "size": "MEDIUM"}, ...]

    # Switch to different warehouse
    await db.switch_warehouse("ANALYTICS_WH")

    # Use larger warehouse for heavy queries
    await db.switch_warehouse("LARGE_WH")
    results = await db.query("SELECT * FROM huge_fact_table")

    # Switch back to save costs
    await db.switch_warehouse("SMALL_WH")
```

## Stage Operations
Snowflake stages are used for loading and unloading data:
```python
from daita.plugins import snowflake

async with snowflake(account="xy12345", warehouse="LOAD_WH", database="RAW_DATA") as db:
    # List available stages
    stages = await db.list_stages()
    # Returns: [{"name": "my_s3_stage", "url": "s3://bucket/path/", "type": "EXTERNAL"}, ...]

    # Upload file to stage
    await db.put_file(
        local_path="/data/sales.csv",
        stage_path="@my_stage/sales/",
        overwrite=True
    )

    # Download file from stage
    await db.get_file(
        stage_path="@my_stage/sales/sales.csv",
        local_path="/data/downloaded_sales.csv"
    )

    # Load data from stage into table
    result = await db.load_from_stage(
        table_name="sales",
        stage_path="@my_stage/sales/",
        file_format="CSV",
        pattern=r".*\.csv"
    )

    # Create new stage
    await db.create_stage(
        stage_name="my_new_stage",
        stage_type="internal"
    )

    # Create external stage (S3)
    await db.create_stage(
        stage_name="s3_stage",
        stage_type="external",
        url="s3://my-bucket/data/",
        credentials={
            "AWS_KEY_ID": "your_key",
            "AWS_SECRET_KEY": "your_secret"
        }
    )
```

## Schema Information
```python
from daita.plugins import snowflake

async with snowflake(account="xy12345", warehouse="COMPUTE_WH", database="ANALYTICS") as db:
    # List all tables
    tables = await db.tables()
    # Returns: ["customers", "orders", "products"]

    # List schemas
    schemas = await db.schemas()
    # Returns: ["PUBLIC", "STAGING", "ANALYTICS"]

    # Get table structure
    columns = await db.describe("customers")
    # Returns: [{"name": "id", "type": "NUMBER", "nullable": False}, ...]
```

## Query History
```python
from datetime import datetime, timedelta

from daita.plugins import snowflake

async with snowflake(account="xy12345", warehouse="COMPUTE_WH", database="ANALYTICS") as db:
    # Get recent query history
    history = await db.query_history(limit=10)
    # Returns query details including execution time, rows, bytes scanned

    # Filter by time range
    start_time = datetime.now() - timedelta(hours=24)
    history = await db.query_history(
        start_time=start_time,
        limit=100
    )
```

## Using with Agents
## Direct Database Operations (Scripts)
For scripts that don't need agent capabilities:
```python
from daita.plugins import snowflake

async with snowflake(
    account="xy12345",
    warehouse="ANALYTICS_WH",
    database="SALES",
    user="analyst",
    password="password"
) as db:
    # Analyze sales data
    monthly_sales = await db.query("""
        SELECT
            DATE_TRUNC('month', order_date) as month,
            SUM(total_amount) as revenue
        FROM orders
        WHERE order_date >= DATEADD('year', -1, CURRENT_DATE())
        GROUP BY month
        ORDER BY month DESC
    """)

    print(f"Found {len(monthly_sales)} months of data")
    for row in monthly_sales:
        print(f"{row['month']}: ${row['revenue']:,.2f}")
```

## Tool-Based Integration (Recommended)
Snowflake plugin exposes data warehouse operations as tools that agents can use autonomously:
```python
from daita import Agent
from daita.plugins import snowflake
import os
# Create Snowflake plugin (auto-loads from environment variables)
db = snowflake(
account=os.getenv("SNOWFLAKE_ACCOUNT"),
warehouse="ANALYTICS_WH",
database="SALES",
user=os.getenv("SNOWFLAKE_USER"),
private_key_path=os.getenv("SNOWFLAKE_PRIVATE_KEY_PATH")
)
# Create agent
agent = Agent(
name="Data Warehouse Analyst",
model="gpt-4o-mini",
prompt="You are a data analyst specializing in Snowflake data warehouse analysis."
)
# Give agent access to Snowflake tools
agent.add_plugin(db)
await agent.start()
# Agent autonomously uses Snowflake tools to answer questions
result = await agent.run("Analyze our sales trends over the last 6 months")
# The agent will autonomously:
# 1. Use list_tables to explore available data
# 2. Use describe_table to understand schema
# 3. Use query_database to fetch sales data
# 4. Analyze trends and present insights
await db.disconnect()
```

## Available Tools
The Snowflake plugin exposes these tools to LLM agents:
| Tool | Description | Parameters |
|---|---|---|
| query_database | Execute SELECT queries | sql (required), params (optional array) |
| execute_sql | Run INSERT/UPDATE/DELETE | sql (required), params (optional array) |
| list_tables | List all tables in database | None |
| list_schemas | List all schemas | None |
| describe_table | Get table column info | table_name (required) |
| list_warehouses | List available warehouses | None |
| switch_warehouse | Change active warehouse | warehouse_name (required) |
| get_query_history | Retrieve query history | limit (int), start_time (datetime) |
| list_stages | List data stages | None |
| upload_to_stage | Upload file to stage | local_path, stage_path, overwrite (bool) |
| download_from_stage | Download file from stage | stage_path, local_path |
| load_from_stage | Load stage data into table | table_name, stage_path, file_format, pattern |
Tool Categories: database, data_warehouse
Tool Source: plugin
## Tool Usage Example
```python
from daita import Agent
from daita.plugins import snowflake
# Create Snowflake plugin
db = snowflake(
account="xy12345",
warehouse="ANALYTICS_WH",
database="ECOMMERCE"
)
# Create agent
agent = Agent(
name="E-commerce Analyst",
model="gpt-4o-mini",
prompt="You are an e-commerce analyst. Help users analyze sales, inventory, and customer data."
)
# Give agent access to Snowflake tools
agent.add_plugin(db)
await agent.start()
# Natural language query - agent uses tools autonomously
result = await agent.run("""
Analyze our e-commerce performance:
1. What are the available data tables?
2. Calculate total revenue by product category for Q1 2024
3. Identify top 10 customers by purchase amount
4. Show sales trend over the last 90 days
""")
# Agent orchestrates tool calls autonomously:
# - list_tables() to discover schema
# - describe_table("orders") to understand structure
# - query_database(...) to fetch aggregated data
# - Returns comprehensive analysis in natural language
print(result)
await db.disconnect()
```

## Error Handling
```python
from daita.plugins import snowflake

try:
    async with snowflake(
        account="xy12345",
        warehouse="COMPUTE_WH",
        database="SALES"
    ) as db:
        results = await db.query("SELECT * FROM customers WHERE id = %s", [customer_id])
except RuntimeError as e:
    if "snowflake-connector-python not installed" in str(e):
        print("Install Snowflake connector: pip install snowflake-connector-python")
    elif "authentication" in str(e).lower():
        print(f"Authentication failed: {e}")
    elif "warehouse" in str(e).lower():
        print(f"Warehouse issue: {e}")
    else:
        print(f"Error: {e}")
```

## Best Practices
Authentication:
- Local Development: Use `externalbrowser` (SSO) for convenience
- Production/Lambda: Always use key-pair authentication; password and `externalbrowser` won't work in headless environments
- Service Accounts: Create dedicated Snowflake users with key-pair auth for automated systems
- Credential Storage: Use environment variables, AWS Secrets Manager, or similar; never hardcode credentials (see the sketch after this list)
- Key Management: Store private keys securely, and use encrypted keys with passphrases in production
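As one way to follow the credential-storage guidance, here is a hedged sketch that reads the user and password from AWS Secrets Manager with `boto3` before constructing the plugin. The secret name and JSON layout are assumptions for this example, not something the plugin requires:

```python
import json

import boto3

from daita.plugins import snowflake

# Hypothetical secret layout: {"user": "...", "password": "..."}
secret = boto3.client("secretsmanager").get_secret_value(
    SecretId="prod/snowflake/analytics"  # assumed secret name
)
creds = json.loads(secret["SecretString"])

db = snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    user=creds["user"],
    password=creds["password"],
)
```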
Connection Management:
- Always use context managers (`async with`) for automatic cleanup when not using agents
- When using agents, call `await db.disconnect()` at the end of execution
- Set appropriate warehouse sizes for your workload
- Use connection pooling for high-frequency operations
Warehouse Optimization:
- Use smaller warehouses for light queries to reduce costs
- Switch to larger warehouses only for heavy analytics
- Suspend warehouses when not in use (Snowflake auto-suspend helps; see the sketch after this list)
- Monitor query performance and adjust warehouse size accordingly
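For the auto-suspend point, here is a hedged sketch that applies auto-suspend/auto-resume settings by sending DDL through `execute()`. The ALTER WAREHOUSE statement is standard Snowflake SQL; whether the plugin's `execute()` accepts DDL as well as DML is an assumption here:

```python
from daita.plugins import snowflake

async with snowflake(account="xy12345", warehouse="COMPUTE_WH", database="ANALYTICS") as db:
    # Suspend COMPUTE_WH after 60 seconds of inactivity and resume automatically on demand.
    await db.execute("ALTER WAREHOUSE COMPUTE_WH SET AUTO_SUSPEND = 60 AUTO_RESUME = TRUE")
```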
Query Performance:
- Use parameterized queries to prevent SQL injection
- Leverage Snowflake's clustering keys for large tables
- Take advantage of Snowflake's result cache: re-running an identical query is served from cache without consuming warehouse compute
- Limit result sets to avoid memory issues
Data Loading:
- Use stages for bulk data loading (more efficient than individual inserts)
- Prefer external stages (S3, Azure) for large datasets
- Use file patterns to load multiple files at once
- Leverage Snowflake's COPY INTO command via `load_from_stage()`
Security:
- Use key-pair authentication for service accounts
- Grant minimal required privileges (read-only when possible)
- Use separate warehouses for different workloads
- Enable multi-factor authentication for user accounts
- Regularly rotate credentials
Cost Management:
- Monitor warehouse usage and credit consumption
- Use auto-suspend and auto-resume features
- Right-size warehouses for your workload
- Use query history to identify expensive queries (see the sketch after this list)
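For the last point, here is a sketch that uses `query_history()` to surface recent long-running queries. The field names on each history record, such as `execution_time` and `query_text`, are assumptions for illustration; inspect the dictionaries returned in your environment:

```python
from daita.plugins import snowflake

async with snowflake(account="xy12345", warehouse="COMPUTE_WH", database="ANALYTICS") as db:
    history = await db.query_history(limit=100)

    # Sort by an assumed execution-time field and print the ten slowest queries.
    slowest = sorted(history, key=lambda q: q.get("execution_time", 0), reverse=True)
    for q in slowest[:10]:
        print(q.get("execution_time"), str(q.get("query_text", ""))[:80])
```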
## Troubleshooting
| Issue | Solution |
|---|---|
| `snowflake-connector-python` not installed | `pip install snowflake-connector-python` |
| Authentication failed (password) | Verify account, username, password are correct |
| Authentication failed (key-pair) | Check private key format (PEM/DER), verify public key assigned to user in Snowflake |
| External browser authentication SAML error | Your Snowflake account may not have SSO configured; use password or key-pair auth instead |
| External browser not opening | Ensure you're running locally (not in Lambda/Docker), check browser is accessible |
| Authentication required error | Provide at least one auth method: password, private_key_path, or authenticator |
| Warehouse not found | Check warehouse name, verify it exists with list_warehouses() |
| Database not found | Verify database name, check access permissions |
| Insufficient privileges | Grant required privileges to user/role |
| Query timeout | Use larger warehouse, optimize query, check for locks |
| Network connection issues | Check firewall, verify account region, test network connectivity |
| Private key format error | Ensure key is in PEM or DER format (PKCS#8), check passphrase |
| Lambda authentication fails | Use key-pair auth - password and externalbrowser don't work in headless environments |
## Next Steps
- PostgreSQL Plugin - For PostgreSQL databases
- MySQL Plugin - For MySQL databases
- Elasticsearch Plugin - For search and analytics
- S3 Plugin - For cloud storage integration
- Workflows - Use Snowflake in multi-agent workflows
- Plugin Overview - Learn about other plugins