Remote Execution
Execute your deployed agents and workflows programmatically from Python code or the CLI. The remote execution system lets you trigger executions from your applications, scripts, and automation pipelines.
Overview
Daita provides two ways to execute deployed agents remotely:
- Python SDK - use DaitaClient for programmatic execution from Python code
- CLI Commands - execute from the command line with daita run
Both methods use the same underlying API and support the following (previewed in the sketch after this list):
- Synchronous and asynchronous execution
- Real-time status monitoring
- Execution history and logs
- Error handling and retries
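As a quick preview of these capabilities with the Python SDK (each call is documented in detail below; the agent name and timeout are placeholders):
from daita import DaitaClient

client = DaitaClient(api_key="your_daita_api_key")

# Start an execution without blocking
result = client.execute_agent("data_processor", data={"input": "hello"})

# Monitor status in real time
status = client.get_execution(result.execution_id)
print(f"Current status: {status.status}")

# Block until completion, with an explicit timeout
final = client.wait_for_execution(result.execution_id, timeout=300)
print(f"Final status: {final.status}")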
Python SDK
Installation
The DaitaClient is included in the daita-agents package:
pip install daita-agents
Basic Usage
from daita import DaitaClient
# Initialize client with your API key
client = DaitaClient(api_key="your_daita_api_key")
# Execute an agent and wait for the result
result = client.execute_agent(
    "data_processor",
    data={"input": "process this data"},
    wait=True  # without this the call returns immediately (see Check Execution Status)
)
# Check result
if result.is_success:
print(f"Agent output: {result.result}")
else:
print(f"Error: {result.error}")
Execute and Wait
# Execute and wait for completion (synchronous)
result = client.execute_agent(
"sentiment_analyzer",
data={"text": "This product is amazing!"},
wait=True # Wait for completion
)
print(f"Status: {result.status}")
print(f"Result: {result.result}")
print(f"Duration: {result.duration_seconds}s")
Execute Workflows
# Execute a workflow
result = client.execute_workflow(
"data_pipeline",
data={"source": "s3://bucket/data.csv"},
wait=True
)
if result.is_success:
print(f"Pipeline completed: {result.result}")
Check Execution Status
# Start execution without waiting
result = client.execute_agent("long_running_agent", data={...})
# Check status later
status = client.get_execution(result.execution_id)
print(f"Current status: {status.status}")
# Wait for completion when ready
final_result = client.wait_for_execution(
result.execution_id,
timeout=600 # 10 minutes
)
List Recent Executions
# Get recent executions
executions = client.list_executions(limit=10)
for execution in executions:
print(f"{execution.target_name}: {execution.status}")
# Filter executions
completed = client.list_executions(
status="completed",
target_type="agent",
limit=20
)
Get Latest Execution
# Get most recent execution for an agent
latest = client.get_latest_execution(agent_name="my_agent")
if latest:
print(f"Latest execution: {latest.status}")
print(f"Result: {latest.result}")
Cancel Execution
# Cancel a running execution
success = client.cancel_execution(execution_id)
if success:
print("Execution cancelled")
Advanced Python Usage
Async/Await Pattern
import asyncio
from daita import DaitaClient
async def main():
# Use async context manager
async with DaitaClient(api_key="your_key") as client:
# Execute asynchronously
result = await client.execute_agent_async(
"my_agent",
data={"input": "data"},
wait=True
)
print(f"Result: {result.result}")
# Run async function
asyncio.run(main())
Concurrent Executions
import asyncio
from daita import DaitaClient
async def process_batch():
async with DaitaClient(api_key="your_key") as client:
# Execute multiple agents concurrently
tasks = [
client.execute_agent_async("agent1", data={"id": 1}),
client.execute_agent_async("agent2", data={"id": 2}),
client.execute_agent_async("agent3", data={"id": 3})
]
# Wait for all to complete
results = await asyncio.gather(*tasks)
for result in results:
print(f"{result.target_name}: {result.status}")
asyncio.run(process_batch())
Error Handling
from daita import DaitaClient, ExecutionError, AuthenticationError
client = DaitaClient(api_key="your_key")
try:
result = client.execute_agent("my_agent", data={...}, wait=True)
if result.is_success:
print(f"Success: {result.result}")
else:
print(f"Agent returned error: {result.error}")
except AuthenticationError:
print("Invalid API key")
except ExecutionError as e:
print(f"Execution failed: {e}")
Custom Configuration
from daita import DaitaClient
# Configure client with custom settings
client = DaitaClient(
api_key="your_key",
timeout=600, # Request timeout in seconds
max_retries=5, # Number of retries
retry_delay=2.0 # Base delay between retries
)
# Execute with custom environment
result = client.execute_agent(
"my_agent",
data={...},
environment="staging", # or "production"
wait=True
)
CLI Commands
Execute Agent
# Basic execution
daita run my_agent
# Execute with data from file
daita run my_agent --data input.json
# Execute with inline JSON data
daita run my_agent --data-json '{"input": "test data"}'
# Execute and follow progress
daita run my_agent --data input.json --follow
# Verbose output
daita run my_agent --data input.json --verbose
Execute Workflow
# Execute workflow
daita run data_pipeline --type workflow --data input.json
# With environment specification
daita run data_pipeline --type workflow --env production
Execute with Options
# Specify task for agent
daita run my_agent --task analyze --data input.json
# Set timeout
daita run long_agent --timeout 600 --data input.json
# Execute in staging
daita run my_agent --env staging --data input.json
View Execution History
# List recent executions
daita executions
# Limit number of results
daita executions --limit 20
# Filter by status
daita executions --status completed
# Filter by type
daita executions --type agent
# Filter by environment
daita executions --env production
View Execution Logs
# Get logs for specific execution
daita execution-logs exec_abc123
# Follow execution progress
daita execution-logs exec_abc123 --follow
ExecutionResult Object
The ExecutionResult object contains all information about an execution:
Properties
result = client.execute_agent("my_agent", data={...})
# Execution identifiers
result.execution_id # Unique execution ID
result.target_name # Agent/workflow name
result.target_type # "agent" or "workflow"
# Status and results
result.status # "queued", "running", "completed", "failed", "cancelled"
result.result # Execution output (dict)
result.error # Error message if failed
# Timing information
result.created_at # When execution was created
result.started_at # When execution started
result.completed_at # When execution completed
result.duration_ms # Duration in milliseconds
result.duration_seconds # Duration in seconds (property)
# Resource usage
result.memory_used_mb # Memory used in MB
result.cost_estimate # Estimated cost
# Monitoring
result.trace_id # Trace ID for debugging
result.dashboard_url # Link to dashboard (if available)
# Helper properties
result.is_complete # True if completed/failed/cancelled
result.is_success # True if completed successfully
result.is_running # True if queued/running
Example Usage
result = client.execute_agent("my_agent", data={...}, wait=True)
# Check status
if result.is_success:
    print("✅ Success!")
    print(f"Result: {result.result}")
    print(f"Duration: {result.duration_seconds:.2f}s")
elif result.is_running:
    print("⏳ Still running...")
else:
    print(f"❌ Failed: {result.error}")
# Access specific result fields
if result.result:
output = result.result.get('output')
metadata = result.result.get('metadata')
Use Cases
Automated Data Processing
from daita import DaitaClient
import schedule
import time
client = DaitaClient(api_key="your_key")
def process_daily_data():
"""Run daily data processing."""
result = client.execute_workflow(
"daily_pipeline",
data={"date": time.strftime("%Y-%m-%d")},
wait=True
)
if result.is_success:
print(f"Daily processing completed: {result.result}")
else:
print(f"Processing failed: {result.error}")
# Send alert
# Schedule daily at 2 AM
schedule.every().day.at("02:00").do(process_daily_data)
while True:
schedule.run_pending()
time.sleep(60)
API Integration
from fastapi import FastAPI
from daita import DaitaClient
app = FastAPI()
client = DaitaClient(api_key="your_key")
@app.post("/analyze")
async def analyze_text(text: str):
"""API endpoint that uses Daita agent."""
result = client.execute_agent(
"sentiment_analyzer",
data={"text": text},
wait=True
)
if result.is_success:
return result.result
else:
return {"error": result.error}
Event-Driven Processing
from daita import DaitaClient
import boto3
client = DaitaClient(api_key="your_key")
s3 = boto3.client('s3')
def process_s3_upload(bucket, key):
"""Process file when uploaded to S3."""
# Download file
s3.download_file(bucket, key, '/tmp/file')
# Execute agent to process
with open('/tmp/file', 'r') as f:
data = f.read()
result = client.execute_agent(
"file_processor",
data={"content": data, "filename": key},
wait=True
)
if result.is_success:
print(f"Processed {key}: {result.result}")
else:
print(f"Failed to process {key}: {result.error}")
Batch Processing
from daita import DaitaClient
import pandas as pd
client = DaitaClient(api_key="your_key")
def batch_process(csv_file):
"""Process batch of items."""
df = pd.read_csv(csv_file)
results = []
for _, row in df.iterrows():
result = client.execute_agent(
"item_processor",
data=row.to_dict(),
wait=True
)
results.append({
"id": row['id'],
"status": result.status,
"output": result.result
})
# Save results
results_df = pd.DataFrame(results)
results_df.to_csv('results.csv', index=False)
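Sequential wait=True calls are simple but slow for large batches; the same loop can be parallelized with the async client shown earlier. A sketch, with an arbitrary cap of 10 concurrent runs:
import asyncio
import pandas as pd
from daita import DaitaClient

async def batch_process_async(csv_file):
    """Process all rows concurrently, bounded by a semaphore."""
    df = pd.read_csv(csv_file)
    semaphore = asyncio.Semaphore(10)  # cap concurrent executions

    async with DaitaClient(api_key="your_key") as client:
        async def run_one(row):
            async with semaphore:
                return await client.execute_agent_async(
                    "item_processor",
                    data=row.to_dict(),
                    wait=True
                )

        return await asyncio.gather(*(run_one(row) for _, row in df.iterrows()))

results = asyncio.run(batch_process_async("items.csv"))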
Best Practices
1. Reuse Client Instances
# Good - reuse client
client = DaitaClient(api_key="your_key")
for item in items:
result = client.execute_agent("my_agent", data=item)
# Avoid - creating new client each time
for item in items:
client = DaitaClient(api_key="your_key") # Inefficient
result = client.execute_agent("my_agent", data=item)
2. Handle Errors Gracefully
from daita import DaitaClient, ExecutionError
import logging
client = DaitaClient(api_key="your_key")
try:
result = client.execute_agent("my_agent", data={...}, wait=True)
if not result.is_success:
logging.error(f"Agent failed: {result.error}")
# Handle failure
except ExecutionError as e:
logging.error(f"Execution error: {e}")
# Handle error
3. Use Appropriate Timeouts
# Simple agents: the client's default timeout is usually sufficient
result = client.execute_agent(
    "quick_agent",
    data={...},
    wait=True
)
# Long-running workflows: start without blocking, then wait with an explicit timeout
result = client.execute_workflow(
    "long_pipeline",
    data={...}
)
result = client.wait_for_execution(result.execution_id, timeout=1800)  # 30 minutes
4. Monitor Execution Status
# Start long-running execution
result = client.execute_agent("long_agent", data={...})
# Poll periodically until it finishes
import time
while not result.is_complete:
    time.sleep(10)
    result = client.get_execution(result.execution_id)
    print(f"Status: {result.status}")
5. Clean Up Resources
# Use the async context manager for automatic cleanup (inside an async function)
async with DaitaClient(api_key="your_key") as client:
result = await client.execute_agent_async("my_agent", data={...})
# Client automatically closed
# Or manually close
client = DaitaClient(api_key="your_key")
try:
result = client.execute_agent("my_agent", data={...})
finally:
client.close()
Environment Variables
The execution system uses these environment variables:
# Required for all operations
export DAITA_API_KEY="your_api_key"
# Optional - customize API endpoint
export DAITA_API_ENDPOINT="https://api.daita-tech.io"
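In Python you can pass the key through explicitly; a minimal sketch that does not assume the client reads the variable on its own:
import os
from daita import DaitaClient

client = DaitaClient(api_key=os.environ["DAITA_API_KEY"])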
Troubleshooting
Authentication Errors
# Error: Invalid API key
# Solution: Check DAITA_API_KEY environment variable
import os
print(os.getenv('DAITA_API_KEY')) # Should print your key
Agent Not Found
# Error: No deployment found for agent
# Solution: Ensure agent is deployed
daita status # Check deployed agents
daita push production # Deploy if needed
Execution Timeout
# Increase timeout for long-running executions
result = client.wait_for_execution(
execution_id,
timeout=1800 # 30 minutes
)
Connection Errors
# Configure retries for unreliable networks
client = DaitaClient(
api_key="your_key",
max_retries=5,
retry_delay=2.0
)
Related Documentation
- daita push - Deploy agents before executing them
- daita status - Check deployed agents
- daita logs - View execution logs
- Agents - Learn about creating agents
- Workflows - Learn about creating workflows