Snowflake Plugin

Async data warehouse operations with automatic connection management built on `snowflake-connector-python`.

#Installation

bash
pip install snowflake-connector-python

#Quick Start

python
from daita import Agent
from daita.plugins import snowflake
 
# Create plugin
db = snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS_DB",
    user="analyst",
    password="secure_password"
)
 
# Create agent
agent = Agent(
    name="Data Warehouse Analyst",
    model="gpt-4o-mini",
    prompt="You are a data analyst. Help users query and analyze data warehouse data."
)
 
# Give agent access to Snowflake tools
agent.add_plugin(db)
 
await agent.start()
result = await agent.run("What were our top 10 products by revenue last quarter?")

#Direct Usage

The plugin can be used directly, without agents, for programmatic access. For comprehensive Snowflake documentation, see the official Snowflake docs. The main value of this plugin, however, is agent integration - enabling LLMs to autonomously query and analyze warehouse data.
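
A minimal sketch of direct usage (the table name is illustrative):

python
from daita.plugins import snowflake

# Programmatic access, no agent involved; the context manager
# connects on entry and disconnects on exit
async with snowflake(account="xy12345", warehouse="COMPUTE_WH", database="ANALYTICS_DB") as db:
    rows = await db.query("SELECT COUNT(*) AS n FROM customers")
    print(rows[0])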

#Connection Parameters

python
snowflake(
    account: Optional[str] = None,
    warehouse: Optional[str] = None,
    database: Optional[str] = None,
    schema: str = "PUBLIC",
    user: Optional[str] = None,
    password: Optional[str] = None,
    role: Optional[str] = None,
    private_key_path: Optional[str] = None,
    private_key_passphrase: Optional[str] = None,
    authenticator: Optional[str] = None,
    session_parameters: Optional[Dict] = None
)

#Parameters

  • account (str): Snowflake account identifier (e.g., "xy12345" or "xy12345.us-east-1")
  • warehouse (str): Virtual warehouse name for compute resources
  • database (str): Database name to connect to
  • schema (str): Schema name (default: "PUBLIC")
  • user (str): Username for authentication
  • password (str): Password for password authentication
  • role (str): Role to use after connecting (optional)
  • private_key_path (str): Path to RSA private key file for key-pair authentication
  • private_key_passphrase (str): Passphrase for encrypted private key (optional)
  • authenticator (str): External authenticator (e.g., "externalbrowser", "oauth")
  • session_parameters (dict): Additional session parameters
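
For example, role plus session_parameters can pin the session to a specific role and tag queries for cost attribution (QUERY_TAG is a standard Snowflake session parameter; the role name below is illustrative):

python
from daita.plugins import snowflake

db = snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    role="ANALYST_RO",  # illustrative read-only role
    session_parameters={"QUERY_TAG": "daita-agent"}  # tag queries for cost tracking
)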

#Environment Variables

All parameters support environment variable fallback:

  • SNOWFLAKE_ACCOUNT
  • SNOWFLAKE_WAREHOUSE
  • SNOWFLAKE_DATABASE
  • SNOWFLAKE_SCHEMA
  • SNOWFLAKE_USER
  • SNOWFLAKE_PASSWORD
  • SNOWFLAKE_ROLE
  • SNOWFLAKE_PRIVATE_KEY_PATH
  • SNOWFLAKE_PRIVATE_KEY_PASSPHRASE
  • SNOWFLAKE_AUTHENTICATOR
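
If every required variable is set, construction can be argument-free; a sketch:

python
from daita.plugins import snowflake

# Assumes SNOWFLAKE_ACCOUNT, SNOWFLAKE_WAREHOUSE, SNOWFLAKE_DATABASE,
# SNOWFLAKE_USER, and SNOWFLAKE_PASSWORD (or key-pair variables) are exported
db = snowflake()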

#Authentication Methods

The Snowflake plugin supports three primary authentication methods, plus OAuth for advanced integrations. Choose based on your deployment environment and security requirements.

#Authentication Method Comparison

| Method | Use Case | Environment | Security | Setup Complexity |
| --- | --- | --- | --- | --- |
| Password | Quick testing, development | Local, staging | Basic | Low |
| Key-Pair | Production, Lambda, CI/CD | Production, cloud | High | Medium |
| External Browser | Enterprise SSO accounts | Local only | Managed by SSO | Low |

#1. Password Authentication

Best for: Local development, testing, personal Snowflake accounts

python
from daita.plugins import snowflake
 
# Explicit credentials
db = snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    user="analyst",
    password="secure_password"
)
 
# Or use environment variables (recommended)
import os
os.environ['SNOWFLAKE_ACCOUNT'] = 'xy12345'
os.environ['SNOWFLAKE_USER'] = 'analyst'
os.environ['SNOWFLAKE_PASSWORD'] = 'secure_password'
 
db = snowflake(
    warehouse="COMPUTE_WH",
    database="ANALYTICS"
)

Pros:

  • Simple setup
  • Works everywhere
  • Familiar authentication flow

Cons:

  • Less secure than key-pair
  • Passwords may need rotation
  • Not recommended for production
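
Even in local development, prompting for the password keeps it out of source files; a sketch using the standard library:

python
import getpass

from daita.plugins import snowflake

db = snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    user="analyst",
    password=getpass.getpass("Snowflake password: ")  # never hardcode in scripts
)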

#2. Key-Pair Authentication

Best for: Production deployments, Lambda functions, CI/CD pipelines, service accounts

Why use this:

  • No password to manage or rotate
  • Works in headless environments (Lambda, Docker, Kubernetes)
  • More secure than password authentication
  • Supports encrypted private keys for additional security

Setup:

  1. Generate RSA key pair:
bash
# Generate private key (unencrypted)
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
 
# Generate public key
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
  2. Assign public key to Snowflake user:
sql
ALTER USER service_account SET RSA_PUBLIC_KEY='MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA...';
  3. Use in code:
python
from daita.plugins import snowflake
 
# Unencrypted private key
db = snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    user="service_account",
    private_key_path="/path/to/rsa_key.p8"
)
 
# Or via environment variable
os.environ['SNOWFLAKE_PRIVATE_KEY_PATH'] = '/path/to/rsa_key.p8'
 
db = snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    user="service_account"
)

For encrypted keys:

python
db = snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    user="service_account",
    private_key_path="/path/to/encrypted_rsa_key.p8",
    private_key_passphrase="key_passphrase"
)

Pros:

  • Most secure authentication method
  • No password management needed
  • Works in all environments (including headless)
  • Industry standard for service accounts

Cons:

  • Slightly more complex setup
  • Need to manage private key securely

#3. External Browser Authentication (SSO)

Best for: Enterprise developers with SSO-enabled Snowflake accounts, local development

Important: This method opens a browser window for authentication and only works locally. Cannot be used in Lambda, Docker, or any headless environment.

python
from daita.plugins import snowflake
 
# Browser-based SSO (e.g., Okta, Azure AD)
db = snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    user="analyst@company.com",
    authenticator="externalbrowser"
)
 
# Or via environment variable
os.environ['SNOWFLAKE_AUTHENTICATOR'] = 'externalbrowser'
 
db = snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    user="analyst@company.com"
)

When you run this:

  1. A browser window opens automatically
  2. You authenticate via your company's SSO (Okta, Azure AD, etc.)
  3. After successful login, the connection is established

Pros:

  • Works with SSO-only Snowflake accounts
  • Leverages existing enterprise authentication
  • No password to manage locally
  • Supports MFA automatically

Cons:

  • Only works locally - cannot be used in production/Lambda
  • Requires browser access
  • Not suitable for automated scripts

Migration path for production: When deploying to production, switch to key-pair authentication:

  1. Local development: Use externalbrowser for easy SSO login
  2. Production/Lambda: Create service account with key-pair auth
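
One way to keep a single code path across both environments is to branch on an environment variable; a sketch (DAITA_ENV is a hypothetical name - use your own config scheme):

python
import os

from daita.plugins import snowflake

if os.getenv("DAITA_ENV") == "production":
    # Headless-safe: service account with key-pair authentication
    db = snowflake(
        account="xy12345",
        warehouse="COMPUTE_WH",
        database="ANALYTICS",
        user="service_account",
        private_key_path=os.environ["SNOWFLAKE_PRIVATE_KEY_PATH"]
    )
else:
    # Local development: browser-based SSO
    db = snowflake(
        account="xy12345",
        warehouse="COMPUTE_WH",
        database="ANALYTICS",
        user="analyst@company.com",
        authenticator="externalbrowser"
    )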

#OAuth Authentication

Best for: Advanced integrations with OAuth 2.0 providers

python
db = snowflake(
    account="xy12345",
    warehouse="COMPUTE_WH",
    database="ANALYTICS",
    user="analyst",
    authenticator="oauth",
    session_parameters={"token": "oauth_access_token"}
)

#Query Operations

SELECT queries - Use query() method, returns list of dictionaries:

python
from daita.plugins import snowflake
 
async with snowflake(account="xy12345", warehouse="COMPUTE_WH", database="SALES") as db:
    # Simple query
    customers = await db.query("SELECT id, name, total_spent FROM customers")
    # Returns: [{"id": 1, "name": "Acme Corp", "total_spent": 50000.00}, ...]
 
    # Parameterized query (use %s, %(name)s placeholders)
    high_value = await db.query(
        "SELECT * FROM customers WHERE total_spent > %s AND status = %s",
        [10000, "active"]
    )
 
    # Named parameters
    recent = await db.query(
        "SELECT * FROM orders WHERE order_date >= %(start_date)s",
        {"start_date": "2024-01-01"}
    )

Parameter syntax: the Snowflake connector uses %s for positional parameters and %(name)s for named parameters.
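
For a variable-length IN list, placeholders can be generated so values stay bound rather than interpolated; a sketch:

python
from daita.plugins import snowflake

async with snowflake(account="xy12345", warehouse="COMPUTE_WH", database="SALES") as db:
    statuses = ["active", "trial", "past_due"]
    # One %s per value keeps the query parameterized
    placeholders = ", ".join(["%s"] * len(statuses))
    rows = await db.query(
        f"SELECT id, name FROM customers WHERE status IN ({placeholders})",
        statuses
    )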

#Data Modification

INSERT, UPDATE, DELETE - Use execute() method, returns row count:

python
from daita.plugins import snowflake
 
async with snowflake(account="xy12345", warehouse="LOAD_WH", database="SALES") as db:
    # Insert
    await db.execute(
        "INSERT INTO customers (name, email, status) VALUES (%s, %s, %s)",
        ["New Customer", "customer@example.com", "active"]
    )
 
    # Update
    await db.execute(
        "UPDATE customers SET last_purchase = CURRENT_TIMESTAMP() WHERE id = %s",
        [customer_id]
    )
 
    # Delete
    await db.execute(
        "DELETE FROM customers WHERE status = %s AND days_inactive > %s",
        ["inactive", 365]
    )

#Warehouse Management

Snowflake virtual warehouses control compute resources and costs:

python
from daita.plugins import snowflake
 
async with snowflake(account="xy12345", warehouse="COMPUTE_WH", database="ANALYTICS") as db:
    # List all warehouses
    warehouses = await db.list_warehouses()
    # Returns: [{"name": "COMPUTE_WH", "state": "STARTED", "size": "MEDIUM"}, ...]
 
    # Switch to different warehouse
    await db.switch_warehouse("ANALYTICS_WH")
 
    # Use larger warehouse for heavy queries
    await db.switch_warehouse("LARGE_WH")
    results = await db.query("SELECT * FROM huge_fact_table")
 
    # Switch back to save costs
    await db.switch_warehouse("SMALL_WH")

#Stage Operations

Snowflake stages are used for loading and unloading data:

python
from daita.plugins import snowflake
 
async with snowflake(account="xy12345", warehouse="LOAD_WH", database="RAW_DATA") as db:
    # List available stages
    stages = await db.list_stages()
    # Returns: [{"name": "my_s3_stage", "url": "s3://bucket/path/", "type": "EXTERNAL"}, ...]
 
    # Upload file to stage
    await db.put_file(
        local_path="/data/sales.csv",
        stage_path="@my_stage/sales/",
        overwrite=True
    )
 
    # Download file from stage
    await db.get_file(
        stage_path="@my_stage/sales/sales.csv",
        local_path="/data/downloaded_sales.csv"
    )
 
    # Load data from stage into table
    result = await db.load_from_stage(
        table_name="sales",
        stage_path="@my_stage/sales/",
        file_format="CSV",
        pattern=r".*\.csv"
    )
 
    # Create new stage
    await db.create_stage(
        stage_name="my_new_stage",
        stage_type="internal"
    )
 
    # Create external stage (S3)
    await db.create_stage(
        stage_name="s3_stage",
        stage_type="external",
        url="s3://my-bucket/data/",
        credentials={
            "AWS_KEY_ID": "your_key",
            "AWS_SECRET_KEY": "your_secret"
        }
    )

#Schema Information

python
from daita.plugins import snowflake
 
async with snowflake(account="xy12345", warehouse="COMPUTE_WH", database="ANALYTICS") as db:
    # List all tables
    tables = await db.tables()
    # Returns: ["customers", "orders", "products"]
 
    # List schemas
    schemas = await db.schemas()
    # Returns: ["PUBLIC", "STAGING", "ANALYTICS"]
 
    # Get table structure
    columns = await db.describe("customers")
    # Returns: [{"name": "id", "type": "NUMBER", "nullable": False}, ...]

#Query History

python
from daita.plugins import snowflake
 
async with snowflake(account="xy12345", warehouse="COMPUTE_WH", database="ANALYTICS") as db:
    # Get recent query history
    history = await db.query_history(limit=10)
    # Returns query details including execution time, rows, bytes scanned
 
    # Filter by time range
    from datetime import datetime, timedelta
    start_time = datetime.now() - timedelta(hours=24)
    history = await db.query_history(
        start_time=start_time,
        limit=100
    )
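
History is also useful for spotting expensive statements. The exact keys in each row depend on the plugin, so the field name below (execution_time) is an assumption - print a sample row first:

python
from daita.plugins import snowflake

async with snowflake(account="xy12345", warehouse="COMPUTE_WH", database="ANALYTICS") as db:
    history = await db.query_history(limit=100)
    print(history[0])  # confirm the actual keys before relying on them

    # 'execution_time' is an assumed key name - adjust to what the plugin returns
    slowest = sorted(history, key=lambda q: q.get("execution_time", 0), reverse=True)[:5]
    for q in slowest:
        print(q)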

#Usage Patterns

#Direct Database Operations (Scripts)

For scripts that don't need agent capabilities:

python
from daita.plugins import snowflake
 
async with snowflake(
    account="xy12345",
    warehouse="ANALYTICS_WH",
    database="SALES",
    user="analyst",
    password="password"
) as db:
    # Analyze sales data
    monthly_sales = await db.query("""
        SELECT
            DATE_TRUNC('month', order_date) as month,
            SUM(total_amount) as revenue
        FROM orders
        WHERE order_date >= DATEADD('year', -1, CURRENT_DATE())
        GROUP BY month
        ORDER BY month DESC
    """)
 
    print(f"Found {len(monthly_sales)} months of data")
    for row in monthly_sales:
        print(f"{row['month']}: ${row['revenue']:,.2f}")

#Agent Integration

The Snowflake plugin exposes data warehouse operations as tools that agents can use autonomously:

python
from daita import Agent
from daita.plugins import snowflake
import os
 
# Create Snowflake plugin (auto-loads from environment variables)
db = snowflake(
    account=os.getenv("SNOWFLAKE_ACCOUNT"),
    warehouse="ANALYTICS_WH",
    database="SALES",
    user=os.getenv("SNOWFLAKE_USER"),
    private_key_path=os.getenv("SNOWFLAKE_PRIVATE_KEY_PATH")
)
 
# Create agent
agent = Agent(
    name="Data Warehouse Analyst",
    model="gpt-4o-mini",
    prompt="You are a data analyst specializing in Snowflake data warehouse analysis."
)
 
# Give agent access to Snowflake tools
agent.add_plugin(db)
 
await agent.start()
 
# Agent autonomously uses Snowflake tools to answer questions
result = await agent.run("Analyze our sales trends over the last 6 months")
 
# The agent will autonomously:
# 1. Use list_tables to explore available data
# 2. Use describe_table to understand schema
# 3. Use query_database to fetch sales data
# 4. Analyze trends and present insights
 
await db.disconnect()

#Available Tools

The Snowflake plugin exposes these tools to LLM agents:

| Tool | Description | Parameters |
| --- | --- | --- |
| query_database | Execute SELECT queries | sql (required), params (optional array) |
| execute_sql | Run INSERT/UPDATE/DELETE | sql (required), params (optional array) |
| list_tables | List all tables in database | None |
| list_schemas | List all schemas | None |
| describe_table | Get table column info | table_name (required) |
| list_warehouses | List available warehouses | None |
| switch_warehouse | Change active warehouse | warehouse_name (required) |
| get_query_history | Retrieve query history | limit (int), start_time (datetime) |
| list_stages | List data stages | None |
| upload_to_stage | Upload file to stage | local_path, stage_path, overwrite (bool) |
| download_from_stage | Download file from stage | stage_path, local_path |
| load_from_stage | Load stage data into table | table_name, stage_path, file_format, pattern |

Tool Categories: database, data_warehouse
Tool Source: plugin

#Tool Usage Example

python
from daita import Agent
from daita.plugins import snowflake
 
# Create Snowflake plugin
db = snowflake(
    account="xy12345",
    warehouse="ANALYTICS_WH",
    database="ECOMMERCE"
)
 
# Create agent
agent = Agent(
    name="E-commerce Analyst",
    model="gpt-4o-mini",
    prompt="You are an e-commerce analyst. Help users analyze sales, inventory, and customer data."
)
 
# Give agent access to Snowflake tools
agent.add_plugin(db)
 
await agent.start()
 
# Natural language query - agent uses tools autonomously
result = await agent.run("""
Analyze our e-commerce performance:
1. What are the available data tables?
2. Calculate total revenue by product category for Q1 2024
3. Identify top 10 customers by purchase amount
4. Show sales trend over the last 90 days
""")
 
# Agent orchestrates tool calls autonomously:
# - list_tables() to discover schema
# - describe_table("orders") to understand structure
# - query_database(...) to fetch aggregated data
# - Returns comprehensive analysis in natural language
 
print(result)
await db.disconnect()

#Error Handling

python
from daita.plugins import snowflake
 
try:
    async with snowflake(
        account="xy12345",
        warehouse="COMPUTE_WH",
        database="SALES"
    ) as db:
        results = await db.query("SELECT * FROM customers WHERE id = %s", [customer_id])
except RuntimeError as e:
    if "snowflake-connector-python not installed" in str(e):
        print("Install Snowflake connector: pip install snowflake-connector-python")
    elif "authentication" in str(e).lower():
        print(f"Authentication failed: {e}")
    elif "warehouse" in str(e).lower():
        print(f"Warehouse issue: {e}")
    else:
        print(f"Error: {e}")

#Best Practices

Authentication:

  • Local Development: Use password or externalbrowser (SSO) for convenience
  • Production/Lambda: Always use key-pair authentication - password and externalbrowser won't work in headless environments
  • Service Accounts: Create dedicated Snowflake users with key-pair auth for automated systems
  • Credential Storage: Use environment variables, AWS Secrets Manager, or similar - never hardcode credentials
  • Key Management: Store private keys securely, use encrypted keys with passphrases in production

Connection Management:

  • Always use context managers (async with) for automatic cleanup when not using agents
  • When using agents, call await db.disconnect() at the end of execution
  • Set appropriate warehouse sizes for your workload
  • Use connection pooling for high-frequency operations

Warehouse Optimization:

  • Use smaller warehouses for light queries to reduce costs
  • Switch to larger warehouses only for heavy analytics
  • Suspend warehouses when not in use (Snowflake auto-suspend helps)
  • Monitor query performance and adjust warehouse size accordingly

Query Performance:

  • Use parameterized queries to prevent SQL injection
  • Leverage Snowflake's clustering keys for large tables
  • Take advantage of result caching - repeated identical queries are served from Snowflake's result cache without re-running on the warehouse
  • Limit result sets to avoid memory issues

Data Loading:

  • Use stages for bulk data loading - more efficient than individual inserts (see the sketch after this list)
  • Prefer external stages (S3, Azure) for large datasets
  • Use file patterns to load multiple files at once
  • Leverage Snowflake's COPY INTO command via load_from_stage()
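
A sketch of that bulk-load flow end to end, combining put_file() and load_from_stage() (stage, table, and paths are illustrative):

python
from daita.plugins import snowflake

async with snowflake(account="xy12345", warehouse="LOAD_WH", database="RAW_DATA") as db:
    # Upload local files to a stage
    await db.put_file(local_path="/data/sales.csv", stage_path="@my_stage/sales/", overwrite=True)

    # One bulk load picks up every staged file matching the pattern
    await db.load_from_stage(
        table_name="sales",
        stage_path="@my_stage/sales/",
        file_format="CSV",
        pattern=r".*\.csv"
    )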

Security:

  • Use key-pair authentication for service accounts
  • Grant minimal required privileges (read-only when possible)
  • Use separate warehouses for different workloads
  • Enable multi-factor authentication for user accounts
  • Regularly rotate credentials

Cost Management:

  • Monitor warehouse usage and credit consumption
  • Use auto-suspend and auto-resume features
  • Right-size warehouses for your workload
  • Use query history to identify expensive queries

#Troubleshooting

| Issue | Solution |
| --- | --- |
| snowflake-connector-python not installed | pip install snowflake-connector-python |
| Authentication failed (password) | Verify account, username, and password are correct |
| Authentication failed (key-pair) | Check private key format (PEM/DER); verify the public key is assigned to the user in Snowflake |
| External browser authentication SAML error | Your Snowflake account may not have SSO configured; use password or key-pair auth instead |
| External browser not opening | Ensure you're running locally (not in Lambda/Docker); check a browser is accessible |
| Authentication required error | Provide at least one auth method: password, private_key_path, or authenticator |
| Warehouse not found | Check the warehouse name; verify it exists with list_warehouses() |
| Database not found | Verify the database name; check access permissions |
| Insufficient privileges | Grant the required privileges to the user/role |
| Query timeout | Use a larger warehouse, optimize the query, check for locks |
| Network connection issues | Check firewall rules, verify the account region, test network connectivity |
| Private key format error | Ensure the key is in PEM or DER format (PKCS#8); check the passphrase |
| Lambda authentication fails | Use key-pair auth; password and externalbrowser don't work in headless environments |

#Next Steps