# Redis Messaging Plugin

Distributed messaging and caching using Redis Streams and Pub/Sub. Built on `redis.asyncio`.
## Installation

```bash
pip install redis
```
## Quick Start

```python
from daita.plugins import redis_messaging
from daita.core.workflow import Workflow

# Create Redis messaging plugin
redis_msg = redis_messaging(url="redis://localhost:6379")

# Use with a workflow for distributed agent communication
workflow = Workflow("Distributed Pipeline", messaging_plugin=redis_msg)
workflow.connect("agent_a", "data_channel", "agent_b")
```
## Connection Parameters

```python
redis_messaging(
    url: str = "redis://localhost:6379",
    max_connections: int = 10,
    message_ttl: int = 86400,
    max_stream_length: int = 10000,
    connection_timeout: int = 30,
    **kwargs
)
```
### Parameters

- `url` (str): Redis connection URL (default: `"redis://localhost:6379"`)
- `max_connections` (int): Maximum connections in the pool (default: 10)
- `message_ttl` (int): Message TTL in seconds (default: 86400, i.e. 24 hours)
- `max_stream_length` (int): Maximum messages retained per stream (default: 10000)
- `connection_timeout` (int): Connection timeout in seconds (default: 30)
- `**kwargs`: Additional Redis parameters
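The extra keyword arguments are handed to the underlying Redis client. A minimal sketch, assuming `**kwargs` map onto `redis.asyncio` client options (`socket_keepalive` is a real redis client option; the pass-through behavior is inferred from the parameter list above):

```python
# Assumes **kwargs are forwarded to the underlying redis.asyncio client
redis_msg = redis_messaging(
    url="redis://localhost:6379",
    max_connections=10,
    socket_keepalive=True,  # redis.asyncio option, forwarded via **kwargs
)
```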
## Connection Methods

```python
from daita.plugins import redis_messaging

# Local Redis
redis_msg = redis_messaging(url="redis://localhost:6379")

# Remote Redis with authentication
redis_msg = redis_messaging(url="redis://:password@redis-server:6379")

# Redis Cluster
redis_msg = redis_messaging(
    url="redis://redis-cluster:6379",
    max_connections=20
)

# Production configuration
redis_msg = redis_messaging(
    url="redis://redis-prod:6379",
    max_connections=50,
    message_ttl=7 * 24 * 3600,  # 7 days
    max_stream_length=50000
)
```
## Publishing Messages

```python
import asyncio

from daita.plugins import redis_messaging

async def main():
    redis_msg = redis_messaging()
    await redis_msg.start()

    # Publish a message to a channel
    message_id = await redis_msg.publish(
        channel="data_pipeline",
        message={"user_id": 123, "action": "processed"},
        publisher="agent_a"
    )
    print(f"Published message: {message_id}")

    await redis_msg.stop()

asyncio.run(main())
```
## Subscribing to Messages

```python
import asyncio

from daita.plugins import redis_messaging

async def message_handler(message_data):
    print(f"Received: {message_data}")
    # Process message...

async def main():
    redis_msg = redis_messaging()
    await redis_msg.start()

    # Subscribe to a channel
    await redis_msg.subscribe("data_pipeline", message_handler)

    # Keep running to receive messages
    await asyncio.sleep(60)

    await redis_msg.stop()

asyncio.run(main())
```
## Retrieving Messages

```python
import asyncio

from daita.plugins import redis_messaging

async def main():
    redis_msg = redis_messaging()
    await redis_msg.start()

    # Get the latest message
    latest = await redis_msg.get_latest("data_pipeline", count=1)

    # Get the last 10 messages (newest first)
    recent = await redis_msg.get_latest("data_pipeline", count=10)
    for msg in recent:
        print(msg)

    await redis_msg.stop()

asyncio.run(main())
```
## Channel Management

```python
import asyncio

from daita.plugins import redis_messaging

async def main():
    redis_msg = redis_messaging()
    await redis_msg.start()

    # Clear all messages from a channel
    cleared = await redis_msg.clear_channel("data_pipeline")
    print(f"Channel cleared: {cleared}")

    await redis_msg.stop()

asyncio.run(main())
```
## Health Monitoring

```python
import asyncio

from daita.plugins import redis_messaging

async def main():
    redis_msg = redis_messaging()
    await redis_msg.start()

    # Check Redis connection health
    health = await redis_msg.health_check()
    print(f"Status: {health['status']}")
    print(f"Ping time: {health.get('ping_time_ms')}ms")
    print(f"Redis version: {health.get('redis_version')}")

    # Get statistics
    stats = await redis_msg.get_stats()
    print(f"Active channels: {stats['pubsub_channels']}")
    print(f"Memory used: {stats['used_memory_human']}")

    await redis_msg.stop()

asyncio.run(main())
```
## Using with Workflows

The Redis messaging plugin is designed for distributed agent communication in workflows, not for direct LLM tool use. It lets agents running in different processes, or on different machines, communicate via Redis Streams and Pub/Sub.
### Distributed Workflow Example

```python
import asyncio

from daita.core.workflow import Workflow
from daita.agents import SubstrateAgent
from daita.plugins import redis_messaging

# Create agents
agent_a = SubstrateAgent(name="Producer")
agent_b = SubstrateAgent(name="Consumer")

# Create a workflow with Redis messaging
redis_msg = redis_messaging(url="redis://localhost:6379")
workflow = Workflow("Distributed Pipeline", messaging_plugin=redis_msg)

# Connect agents via Redis channels
workflow.connect("Producer", "processed_data", "Consumer")

# Add agents to the workflow
workflow.add_agent(agent_a)
workflow.add_agent(agent_b)

# Run the workflow - messages are now distributed via Redis
asyncio.run(workflow.run())
```
## LLM Tool Integration

**Note**: The Redis messaging plugin does not expose LLM-usable tools. This plugin is specifically designed for:

- Inter-agent communication in distributed workflows
- Message persistence and delivery between agents
- Pub/sub messaging for real-time agent coordination

For LLM-accessible tools, use other plugins such as:

- Database plugins (PostgreSQL, MySQL, MongoDB) for data operations
- REST plugin for API calls
- S3 plugin for storage operations
- Slack plugin for notifications

The Redis plugin operates at the infrastructure layer, enabling agents to communicate across processes and machines, rather than providing tools for LLM agents to use directly.
## Context Manager Usage

```python
import asyncio

from daita.plugins import redis_messaging

async def main():
    async with redis_messaging(url="redis://localhost:6379") as redis_msg:
        # Publish messages
        await redis_msg.publish("events", {"type": "user_login"})

        # Get the latest messages
        messages = await redis_msg.get_latest("events", count=5)

    # Connection is automatically closed when the context exits

asyncio.run(main())
```
## Error Handling

```python
import asyncio

from daita.plugins import redis_messaging

async def main():
    try:
        async with redis_messaging(url="redis://localhost:6379") as redis_msg:
            await redis_msg.publish("channel", {"data": "test"})
    except ImportError as e:
        if "redis" in str(e):
            print("Install redis: pip install redis")
    except Exception as e:
        print(f"Redis error: {e}")

asyncio.run(main())
```
## Best Practices

**Connection Management:**

- Use context managers (`async with`) for automatic cleanup
- Configure pool size based on expected load
- Set an appropriate message TTL to prevent storage bloat
- Set a max stream length to limit memory usage
**Performance:**

- Use Pub/Sub for real-time messaging between running agents
- Use Streams for message persistence and recovery
- Batch publish operations when possible (see the sketch after this list)
- Monitor Redis memory usage regularly
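When a burst of messages targets the same channel, publishing them concurrently amortizes round trips to Redis. A minimal batching sketch built on the `publish()` method documented above; `publish_batch` and the payloads are illustrative, not part of the plugin API:

```python
import asyncio

from daita.plugins import redis_messaging

async def publish_batch(redis_msg, channel, payloads):
    # Fire all publishes concurrently and collect the message IDs
    return await asyncio.gather(
        *(redis_msg.publish(channel=channel, message=p) for p in payloads)
    )

async def main():
    async with redis_messaging() as redis_msg:
        payloads = [{"user_id": i, "action": "processed"} for i in range(100)]
        ids = await publish_batch(redis_msg, "data_pipeline", payloads)
        print(f"Published {len(ids)} messages")

asyncio.run(main())
```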
**Reliability:**

- Set message TTL to automatically clean up old messages
- Configure max stream length to prevent unbounded growth
- Use health checks to monitor Redis connectivity
- Handle subscriber callback errors gracefully (see the sketch after this list)
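A failing callback should not take down the whole subscription. One way to harden subscribers is to wrap the handler before passing it to `subscribe()`; `safe_handler` below is an illustrative helper, not part of the plugin API:

```python
import logging

logger = logging.getLogger(__name__)

def safe_handler(handler):
    # Wrap a subscriber callback so one bad message doesn't kill processing
    async def wrapped(message_data):
        try:
            await handler(message_data)
        except Exception:
            logger.exception("Handler failed for message: %r", message_data)
    return wrapped

async def process(message_data):
    ...  # real processing goes here

# Usage: await redis_msg.subscribe("data_pipeline", safe_handler(process))
```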
**Security:**

- Use Redis authentication in production (password in the URL)
- Enable TLS for remote connections (see the example after this list)
- Restrict Redis port access with firewall rules
- Use separate Redis instances for different environments
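`redis.asyncio` accepts `rediss://` URLs for TLS connections, so in many deployments enabling TLS is just a URL change (the hostname below is a placeholder):

```python
# TLS connection using a rediss:// URL (supported by redis.asyncio)
redis_msg = redis_messaging(url="rediss://:password@redis-prod.example.com:6379")
```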
## Features

**Redis Streams:**

- Persistent message storage with ordering guarantees
- Retrieve historical messages with `get_latest()`
- Automatic TTL-based cleanup
- Max stream length to prevent unbounded growth
**Redis Pub/Sub:**

- Real-time message delivery to active subscribers
- Low-latency notification system
- Supports multiple subscribers per channel
- Automatic reconnection handling
**Workflow Integration:**

- Drop-in replacement for the in-memory RelayManager
- Enables distributed agent communication
- Same API as local messaging
- Transparent to agent implementations (see the comparison after this list)
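Because the messaging API is the same, switching a workflow from in-memory relays to Redis is a one-argument change:

```python
# In-memory messaging (default RelayManager)
workflow = Workflow("Pipeline")

# Distributed messaging - same wiring, now backed by Redis
workflow = Workflow("Pipeline", messaging_plugin=redis_messaging())
```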
## Troubleshooting

| Issue | Solution |
|---|---|
| `redis` not installed | `pip install redis` |
| Connection refused | Check that Redis is running; verify host/port (see the retry sketch below) |
| Authentication failed | Verify the password in the connection URL |
| High memory usage | Reduce `message_ttl` or `max_stream_length` |
| Messages not delivered | Check subscriber callbacks; verify channel names |
| Slow performance | Increase `max_connections`; check Redis server load |
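If Redis comes up after your agents (common in containerized deployments), a small retry loop around `start()` avoids spurious connection-refused failures. A sketch, assuming `start()` raises on connection failure; `start_with_retry` is an illustrative helper:

```python
import asyncio

from daita.plugins import redis_messaging

async def start_with_retry(retries=5, delay=2.0):
    redis_msg = redis_messaging(url="redis://localhost:6379")
    for attempt in range(1, retries + 1):
        try:
            await redis_msg.start()
            return redis_msg
        except Exception as e:
            if attempt == retries:
                raise
            print(f"Redis not ready ({e}), retrying in {delay}s...")
            await asyncio.sleep(delay)
```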
## Common Patterns

**Distributed workflow:**

```python
# Enable distributed processing across multiple machines
redis_msg = redis_messaging(url="redis://redis-cluster:6379")
workflow = Workflow("Distributed", messaging_plugin=redis_msg)
workflow.connect("agent_a", "channel", "agent_b")
```
**Message replay:**

```python
# Retrieve and reprocess historical messages
async with redis_messaging() as redis_msg:
    old_messages = await redis_msg.get_latest("events", count=100)
    for msg in old_messages:
        await process_message(msg)
```
**Health monitoring:**

```python
# Monitor Redis health in production
async with redis_messaging() as redis_msg:
    health = await redis_msg.health_check()
    if health['status'] != 'healthy':
        alert_operations(health)
```
## Next Steps
- Plugin Overview - All available plugins
- PostgreSQL Plugin - Relational database operations
- MongoDB Plugin - Document database operations