Snowflake Plugin

Async data warehouse operations with automatic connection management, built on snowflake-connector-python.

Installation

pip install snowflake-connector-python

Quick Start

Direct Usage (Scripts)

from daita.plugins import snowflake

# Direct usage in scripts
async with snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS_DB",
    user="analyst",
    password="secure_password"
) as db:
    results = await db.query("SELECT * FROM sales LIMIT 100")
    print(results)

With Agents

from daita import SubstrateAgent
from daita.plugins import snowflake

# Create plugin
db = snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS_DB",
    user="analyst",
    password="secure_password"
)

# Create agent
agent = SubstrateAgent(
    name="Data Warehouse Analyst",
    model="gpt-4o-mini",
    prompt="You are a data analyst. Help users query and analyze data warehouse data."
)

# Give agent access to Snowflake tools
agent.add_plugin(db)

await agent.start()
result = await agent.run("What were our top 10 products by revenue last quarter?")

Connection Parameters

snowflake(
    account: Optional[str] = None,
    warehouse: Optional[str] = None,
    database: Optional[str] = None,
    schema: str = "PUBLIC",
    user: Optional[str] = None,
    password: Optional[str] = None,
    role: Optional[str] = None,
    private_key_path: Optional[str] = None,
    private_key_passphrase: Optional[str] = None,
    authenticator: Optional[str] = None,
    session_parameters: Optional[Dict] = None
)

Parameters

  • account (str): Snowflake account identifier (e.g., "xy12345" or "xy12345.us-east-1")
  • warehouse (str): Virtual warehouse name for compute resources
  • database (str): Database name to connect to
  • schema (str): Schema name (default: "PUBLIC")
  • user (str): Username for authentication
  • password (str): Password for password authentication
  • role (str): Role to use after connecting (optional)
  • private_key_path (str): Path to RSA private key file for key-pair authentication
  • private_key_passphrase (str): Passphrase for encrypted private key (optional)
  • authenticator (str): External authenticator (e.g., "externalbrowser", "oauth")
  • session_parameters (dict): Additional session parameters

Environment Variables

All parameters support environment variable fallback:

  • SNOWFLAKE_ACCOUNT
  • SNOWFLAKE_WAREHOUSE
  • SNOWFLAKE_DATABASE
  • SNOWFLAKE_SCHEMA
  • SNOWFLAKE_USER
  • SNOWFLAKE_PASSWORD
  • SNOWFLAKE_ROLE
  • SNOWFLAKE_PRIVATE_KEY_PATH
  • SNOWFLAKE_PRIVATE_KEY_PASSPHRASE
  • SNOWFLAKE_AUTHENTICATOR
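
With these variables set, the plugin can be constructed with few or no explicit credential arguments. A minimal sketch, assuming explicit parameters take precedence over the environment:

import os

from daita.plugins import snowflake

# Fail fast if the expected variables are missing (names from the list above)
required = ["SNOWFLAKE_ACCOUNT", "SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD"]
missing = [name for name in required if not os.getenv(name)]
if missing:
    raise SystemExit(f"Missing Snowflake environment variables: {missing}")

# Credentials come from the environment; only compute and database are explicit
db = snowflake(warehouse="COMPUTE_WH", database="ANALYTICS")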

Authentication Methods

The Snowflake plugin supports three authentication methods. Choose based on your deployment environment and security requirements.

Authentication Method Comparison

Method             Use Case                     Environment         Security          Setup Complexity
Password           Quick testing, development   Local, staging      Basic             Low
Key-Pair           Production, Lambda, CI/CD    Production, cloud   High              Medium
External Browser   Enterprise SSO accounts      Local only          Managed by SSO    Low

1. Password Authentication

Best for: Local development, testing, personal Snowflake accounts

from daita.plugins import snowflake

# Explicit credentials
db = snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    user="analyst",
    password="secure_password"
)

# Or use environment variables (recommended)
import os
os.environ['SNOWFLAKE_ACCOUNT'] = 'xy12345'
os.environ['SNOWFLAKE_USER'] = 'analyst'
os.environ['SNOWFLAKE_PASSWORD'] = 'secure_password'

db = snowflake(
    warehouse="COMPUTE_WH",
    database="ANALYTICS"
)

Pros:

  • Simple setup
  • Works everywhere
  • Familiar authentication flow

Cons:

  • Less secure than key-pair
  • Passwords may need rotation
  • Not recommended for production

2. Key-Pair Authentication

Best for: Production deployments, Lambda functions, CI/CD pipelines, service accounts

Why use this:

  • No password to manage or rotate
  • Works in headless environments (Lambda, Docker, Kubernetes)
  • More secure than password authentication
  • Supports encrypted private keys for additional security

Setup:

  1. Generate RSA key pair:
# Generate private key (unencrypted)
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt

# Generate public key
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
  2. Assign public key to Snowflake user:
ALTER USER service_account SET RSA_PUBLIC_KEY='MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA...';
  3. Use in code:
from daita.plugins import snowflake
import os

# Unencrypted private key
db = snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    user="service_account",
    private_key_path="/path/to/rsa_key.p8"
)

# Or via environment variable
os.environ['SNOWFLAKE_PRIVATE_KEY_PATH'] = '/path/to/rsa_key.p8'

db = snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    user="service_account"
)

For encrypted keys:

db = snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    user="service_account",
    private_key_path="/path/to/encrypted_rsa_key.p8",
    private_key_passphrase="key_passphrase"
)
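
Before connecting, it can help to verify the key file and passphrase up front. Here is a small sanity check using the cryptography package (a dependency of snowflake-connector-python, so already installed); it is a sketch, not part of the plugin API:

from cryptography.hazmat.primitives import serialization

# Raises ValueError if the file is not valid PEM/PKCS#8 or the passphrase is
# wrong - the same conditions that would make the connection fail later
with open("/path/to/encrypted_rsa_key.p8", "rb") as f:
    serialization.load_pem_private_key(f.read(), password=b"key_passphrase")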

Pros:

  • Most secure authentication method
  • No password management needed
  • Works in all environments (including headless)
  • Industry standard for service accounts

Cons:

  • Slightly more complex setup
  • Need to manage private key securely

3. External Browser Authentication (SSO)

Best for: Enterprise developers with SSO-enabled Snowflake accounts, local development

Important: This method opens a browser window for authentication and only works locally. It cannot be used in Lambda, Docker, or any headless environment.

from daita.plugins import snowflake
import os

# Browser-based SSO (e.g., Okta, Azure AD)
db = snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    user="analyst@company.com",
    authenticator="externalbrowser"
)

# Or via environment variable
os.environ['SNOWFLAKE_AUTHENTICATOR'] = 'externalbrowser'

db = snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    user="analyst@company.com"
)

When you run this:

  1. A browser window opens automatically
  2. You authenticate via your company's SSO (Okta, Azure AD, etc.)
  3. After successful login, the connection is established

Pros:

  • Works with SSO-only Snowflake accounts
  • Leverages existing enterprise authentication
  • No password to manage locally
  • Supports MFA automatically

Cons:

  • Only works locally - cannot be used in production/Lambda
  • Requires browser access
  • Not suitable for automated scripts

Migration path for production: When deploying to production, switch to key-pair authentication:

  1. Local development: Use externalbrowser for easy SSO login
  2. Production/Lambda: Create service account with key-pair auth
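
One way to implement this split is to choose the authenticator from the environment. A sketch; the DEPLOY_ENV variable name is a hypothetical convention, not part of the plugin:

import os

from daita.plugins import snowflake

if os.getenv("DEPLOY_ENV") == "production":
    # Headless-safe: service account with key-pair authentication
    db = snowflake(
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        warehouse="COMPUTE_WH",
        database="ANALYTICS",
        user="service_account",
        private_key_path=os.environ["SNOWFLAKE_PRIVATE_KEY_PATH"]
    )
else:
    # Local development: SSO via browser
    db = snowflake(
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        warehouse="COMPUTE_WH",
        database="ANALYTICS",
        user="analyst@company.com",
        authenticator="externalbrowser"
    )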

OAuth Authentication

Best for: Advanced integrations with OAuth 2.0 providers

db = snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    user="analyst",
    authenticator="oauth",
    session_parameters={"token": "oauth_access_token"}
)

Query Operations

SELECT queries - use the query() method, which returns a list of dictionaries:

from daita.plugins import snowflake

async with snowflake(account="xy12345", warehouse="COMPUTE_WH", database="SALES") as db:
    # Simple query
    customers = await db.query("SELECT id, name, total_spent FROM customers")
    # Returns: [{"id": 1, "name": "Acme Corp", "total_spent": 50000.00}, ...]

    # Parameterized query (use %s, %(name)s placeholders)
    high_value = await db.query(
        "SELECT * FROM customers WHERE total_spent > %s AND status = %s",
        [10000, "active"]
    )

    # Named parameters
    recent = await db.query(
        "SELECT * FROM orders WHERE order_date >= %(start_date)s",
        {"start_date": "2024-01-01"}
    )

Parameter syntax: Snowflake uses %s for positional parameters and %(name)s for named parameters.

Data Modification

INSERT, UPDATE, DELETE - use the execute() method, which returns the affected row count:

from daita.plugins import snowflake

async with snowflake(account="xy12345", warehouse="LOAD_WH", database="SALES") as db:
    # Insert
    await db.execute(
        "INSERT INTO customers (name, email, status) VALUES (%s, %s, %s)",
        ["New Customer", "customer@example.com", "active"]
    )

    # Update
    customer_id = 42  # example id
    await db.execute(
        "UPDATE customers SET last_purchase = CURRENT_TIMESTAMP() WHERE id = %s",
        [customer_id]
    )

    # Delete
    await db.execute(
        "DELETE FROM customers WHERE status = %s AND days_inactive > %s",
        ["inactive", 365]
    )
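
For more than a handful of rows, stages are the efficient path (see Stage Operations below), but a simple loop over execute() works with the documented API. A minimal sketch:

from daita.plugins import snowflake

rows = [
    ("Customer A", "a@example.com", "active"),
    ("Customer B", "b@example.com", "active"),
]

async with snowflake(account="xy12345", warehouse="LOAD_WH", database="SALES") as db:
    for name, email, status in rows:
        await db.execute(
            "INSERT INTO customers (name, email, status) VALUES (%s, %s, %s)",
            [name, email, status]
        )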

Warehouse Management

Snowflake virtual warehouses control compute resources and costs:

from daita.plugins import snowflake

async with snowflake(account="xy12345", warehouse="COMPUTE_WH", database="ANALYTICS") as db:
    # List all warehouses
    warehouses = await db.list_warehouses()
    # Returns: [{"name": "COMPUTE_WH", "state": "STARTED", "size": "MEDIUM"}, ...]

    # Switch to different warehouse
    await db.switch_warehouse("ANALYTICS_WH")

    # Use larger warehouse for heavy queries
    await db.switch_warehouse("LARGE_WH")
    results = await db.query("SELECT * FROM huge_fact_table")

    # Switch back to save costs
    await db.switch_warehouse("SMALL_WH")

Stage Operations

Snowflake stages are used for loading and unloading data:

from daita.plugins import snowflake

async with snowflake(account="xy12345", warehouse="LOAD_WH", database="RAW_DATA") as db:
    # List available stages
    stages = await db.list_stages()
    # Returns: [{"name": "my_s3_stage", "url": "s3://bucket/path/", "type": "EXTERNAL"}, ...]

    # Upload file to stage
    await db.put_file(
        local_path="/data/sales.csv",
        stage_path="@my_stage/sales/",
        overwrite=True
    )

    # Download file from stage
    await db.get_file(
        stage_path="@my_stage/sales/sales.csv",
        local_path="/data/downloaded_sales.csv"
    )

    # Load data from stage into table
    result = await db.load_from_stage(
        table_name="sales",
        stage_path="@my_stage/sales/",
        file_format="CSV",
        pattern=r".*\.csv"
    )

    # Create new stage
    await db.create_stage(
        stage_name="my_new_stage",
        stage_type="internal"
    )

    # Create external stage (S3)
    await db.create_stage(
        stage_name="s3_stage",
        stage_type="external",
        url="s3://my-bucket/data/",
        credentials={
            "AWS_KEY_ID": "your_key",
            "AWS_SECRET_KEY": "your_secret"
        }
    )

Schema Information

from daita.plugins import snowflake

async with snowflake(account="xy12345", warehouse="COMPUTE_WH", database="ANALYTICS") as db:
    # List all tables
    tables = await db.tables()
    # Returns: ["customers", "orders", "products"]

    # List schemas
    schemas = await db.schemas()
    # Returns: ["PUBLIC", "STAGING", "ANALYTICS"]

    # Get table structure
    columns = await db.describe("customers")
    # Returns: [{"name": "id", "type": "NUMBER", "nullable": False}, ...]

Query History

from daita.plugins import snowflake

async with snowflake(account="xy12345", warehouse="COMPUTE_WH", database="ANALYTICS") as db:
    # Get recent query history
    history = await db.query_history(limit=10)
    # Returns query details including execution time, rows, bytes scanned

    # Filter by time range
    from datetime import datetime, timedelta
    start_time = datetime.now() - timedelta(hours=24)
    history = await db.query_history(
        start_time=start_time,
        limit=100
    )
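
History records can be filtered client-side, for example to flag long-running queries. A sketch; the execution-time field name and units are assumptions, so adjust to the actual shape query_history() returns:

from daita.plugins import snowflake

async with snowflake(account="xy12345", warehouse="COMPUTE_WH", database="ANALYTICS") as db:
    history = await db.query_history(limit=100)
    # "execution_time" in milliseconds is an assumed field name
    slow = [q for q in history if q.get("execution_time", 0) > 60_000]
    print(f"{len(slow)} of {len(history)} queries took longer than 60s")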

Using with Agents

Direct Database Operations (Scripts)

For scripts that don't need agent capabilities:

from daita.plugins import snowflake

async with snowflake(
    account="xy12345",
    warehouse="ANALYTICS_WH",
    database="SALES",
    user="analyst",
    password="password"
) as db:
    # Analyze sales data
    monthly_sales = await db.query("""
        SELECT
            DATE_TRUNC('month', order_date) as month,
            SUM(total_amount) as revenue
        FROM orders
        WHERE order_date >= DATEADD('year', -1, CURRENT_DATE())
        GROUP BY month
        ORDER BY month DESC
    """)

    print(f"Found {len(monthly_sales)} months of data")
    for row in monthly_sales:
        print(f"{row['month']}: ${row['revenue']:,.2f}")

Agent-Based Operations

The Snowflake plugin exposes data warehouse operations as tools that agents can use autonomously:

from daita import SubstrateAgent
from daita.plugins import snowflake
import os

# Create Snowflake plugin (credentials read from environment variables)
db = snowflake(
    account=os.getenv("SNOWFLAKE_ACCOUNT"),
    warehouse="ANALYTICS_WH",
    database="SALES",
    user=os.getenv("SNOWFLAKE_USER"),
    private_key_path=os.getenv("SNOWFLAKE_PRIVATE_KEY_PATH")
)

# Create agent
agent = SubstrateAgent(
    name="Data Warehouse Analyst",
    model="gpt-4o-mini",
    prompt="You are a data analyst specializing in Snowflake data warehouse analysis."
)

# Give agent access to Snowflake tools
agent.add_plugin(db)

await agent.start()

# Agent autonomously uses Snowflake tools to answer questions
result = await agent.run("Analyze our sales trends over the last 6 months")

# The agent will autonomously:
# 1. Use list_tables to explore available data
# 2. Use describe_table to understand schema
# 3. Use query_database to fetch sales data
# 4. Analyze trends and present insights

await db.disconnect()

Available Tools

The Snowflake plugin exposes these tools to LLM agents:

Tool                  Description                    Parameters
query_database        Execute SELECT queries         sql (required), params (optional array)
execute_sql           Run INSERT/UPDATE/DELETE       sql (required), params (optional array)
list_tables           List all tables in database    None
list_schemas          List all schemas               None
describe_table        Get table column info          table_name (required)
list_warehouses       List available warehouses      None
switch_warehouse      Change active warehouse        warehouse_name (required)
get_query_history     Retrieve query history         limit (int), start_time (datetime)
list_stages           List data stages               None
upload_to_stage       Upload file to stage           local_path, stage_path, overwrite (bool)
download_from_stage   Download file from stage       stage_path, local_path
load_from_stage       Load stage data into table     table_name, stage_path, file_format, pattern

Tool Categories: database, data_warehouse
Tool Source: plugin

Tool Usage Example

from daita import SubstrateAgent
from daita.plugins import snowflake

# Create Snowflake plugin
db = snowflake(
    account="xy12345",
    warehouse="ANALYTICS_WH",
    database="ECOMMERCE"
)

# Create agent
agent = SubstrateAgent(
    name="E-commerce Analyst",
    model="gpt-4o-mini",
    prompt="You are an e-commerce analyst. Help users analyze sales, inventory, and customer data."
)

# Give agent access to Snowflake tools
agent.add_plugin(db)

await agent.start()

# Natural language query - agent uses tools autonomously
result = await agent.run("""
    Analyze our e-commerce performance:
    1. What are the available data tables?
    2. Calculate total revenue by product category for Q1 2024
    3. Identify top 10 customers by purchase amount
    4. Show sales trend over the last 90 days
""")

# Agent orchestrates tool calls autonomously:
# - list_tables() to discover schema
# - describe_table("orders") to understand structure
# - query_database(...) to fetch aggregated data
# - Returns comprehensive analysis in natural language

print(result)
await db.disconnect()

Error Handling

from daita.plugins import snowflake

customer_id = 42  # example id

try:
    async with snowflake(
        account="xy12345",
        warehouse="COMPUTE_WH",
        database="SALES"
    ) as db:
        results = await db.query("SELECT * FROM customers WHERE id = %s", [customer_id])
except RuntimeError as e:
    if "snowflake-connector-python not installed" in str(e):
        print("Install Snowflake connector: pip install snowflake-connector-python")
    elif "authentication" in str(e).lower():
        print(f"Authentication failed: {e}")
    elif "warehouse" in str(e).lower():
        print(f"Warehouse issue: {e}")
    else:
        print(f"Error: {e}")

Best Practices

Authentication:

  • Local Development: Use password or externalbrowser (SSO) for convenience
  • Production/Lambda: Always use key-pair authentication - externalbrowser cannot run in headless environments, and passwords are weaker and harder to rotate safely
  • Service Accounts: Create dedicated Snowflake users with key-pair auth for automated systems
  • Credential Storage: Use environment variables, AWS Secrets Manager, or similar - never hardcode credentials
  • Key Management: Store private keys securely, use encrypted keys with passphrases in production

Connection Management:

  • Always use context managers (async with) for automatic cleanup when not using agents
  • When using agents, call await db.disconnect() at the end of execution
  • Set appropriate warehouse sizes for your workload
  • Use connection pooling for high-frequency operations

Warehouse Optimization:

  • Use smaller warehouses for light queries to reduce costs
  • Switch to larger warehouses only for heavy analytics
  • Suspend warehouses when not in use (Snowflake auto-suspend helps)
  • Monitor query performance and adjust warehouse size accordingly

Query Performance:

  • Use parameterized queries to prevent SQL injection
  • Leverage Snowflake's clustering keys for large tables
  • Take advantage of Snowflake's result cache: repeated identical queries return cached results without consuming compute
  • Limit result sets to avoid memory issues (see the pagination sketch below)
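
A pagination sketch for the last point, pulling a large table in fixed-size pages instead of one oversized result set (the table and handler names are illustrative):

page_size = 10_000
offset = 0
while True:
    page = await db.query(
        "SELECT * FROM big_table ORDER BY id LIMIT %s OFFSET %s",
        [page_size, offset]
    )
    if not page:
        break
    handle(page)  # hypothetical per-page handler
    offset += page_size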

Data Loading:

  • Use stages for bulk data loading (more efficient than individual inserts)
  • Prefer external stages (S3, Azure) for large datasets
  • Use file patterns to load multiple files at once
  • Leverage Snowflake's COPY INTO command via load_from_stage()

Security:

  • Use key-pair authentication for service accounts
  • Grant minimal required privileges (read-only when possible)
  • Use separate warehouses for different workloads
  • Enable multi-factor authentication for user accounts
  • Regularly rotate credentials

Cost Management:

  • Monitor warehouse usage and credit consumption
  • Use auto-suspend and auto-resume features
  • Right-size warehouses for your workload
  • Use query history to identify expensive queries

Troubleshooting

Issue                                        Solution
snowflake-connector-python not installed     pip install snowflake-connector-python
Authentication failed (password)             Verify account, username, and password are correct
Authentication failed (key-pair)             Check private key format (PEM/DER); verify the public key is assigned to the user in Snowflake
External browser authentication SAML error   Your Snowflake account may not have SSO configured; use password or key-pair auth instead
External browser not opening                 Ensure you're running locally (not in Lambda/Docker) and a browser is accessible
Authentication required error                Provide at least one auth method: password, private_key_path, or authenticator
Warehouse not found                          Check the warehouse name; verify it exists with list_warehouses()
Database not found                           Verify the database name; check access permissions
Insufficient privileges                      Grant the required privileges to the user/role
Query timeout                                Use a larger warehouse, optimize the query, check for locks
Network connection issues                    Check firewall rules, verify the account region, test network connectivity
Private key format error                     Ensure the key is in PEM or DER format (PKCS#8); check the passphrase
Lambda authentication fails                  Use key-pair auth; externalbrowser cannot open a browser in headless environments
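
When several of these issues are plausible at once, a quick smoke test built from the documented methods can localize the failure (a sketch, not part of the plugin):

import asyncio

from daita.plugins import snowflake

async def smoke_test():
    async with snowflake(account="xy12345", warehouse="COMPUTE_WH", database="ANALYTICS") as db:
        # Each step exercises a different failure mode from the table above
        print("connected:", await db.query("SELECT CURRENT_VERSION() AS v"))
        print("warehouses:", await db.list_warehouses())
        print("schemas:", await db.schemas())
        print("tables:", await db.tables())

# asyncio.run(smoke_test())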

Next Steps