Skip to main content

Redis Messaging Plugin

Distributed messaging and caching using Redis Streams and Pub/Sub. Built on redis.asyncio.

Installation

pip install redis

Quick Start

from daita.plugins import redis_messaging
from daita.core.workflow import Workflow

# Create Redis messaging plugin
redis_msg = redis_messaging(url="redis://localhost:6379")

# Use with workflow for distributed agent communication
workflow = Workflow("Distributed Pipeline", messaging_plugin=redis_msg)
workflow.connect("agent_a", "data_channel", "agent_b")

Connection Parameters

redis_messaging(
url: str = "redis://localhost:6379",
max_connections: int = 10,
message_ttl: int = 86400,
max_stream_length: int = 10000,
connection_timeout: int = 30,
**kwargs
)

Parameters

  • url (str): Redis connection URL (default: "redis://localhost:6379")
  • max_connections (int): Maximum connections in pool (default: 10)
  • message_ttl (int): Message TTL in seconds (default: 86400 / 24 hours)
  • max_stream_length (int): Maximum messages per stream (default: 10000)
  • connection_timeout (int): Connection timeout in seconds (default: 30)
  • **kwargs: Additional Redis parameters

Connection Methods

# Local Redis
redis_msg = redis_messaging(url="redis://localhost:6379")

# Remote Redis with authentication
redis_msg = redis_messaging(url="redis://:password@redis-server:6379")

# Redis Cluster
redis_msg = redis_messaging(
url="redis://redis-cluster:6379",
max_connections=20
)

# Production configuration
redis_msg = redis_messaging(
url="redis://redis-prod:6379",
max_connections=50,
message_ttl=7*24*3600, # 7 days
max_stream_length=50000
)

Publishing Messages

from daita.plugins import redis_messaging

async def main():
redis_msg = redis_messaging()
await redis_msg.start()

# Publish message to channel
message_id = await redis_msg.publish(
channel="data_pipeline",
message={"user_id": 123, "action": "processed"},
publisher="agent_a"
)

print(f"Published message: {message_id}")

await redis_msg.stop()

Subscribing to Messages

from daita.plugins import redis_messaging

async def message_handler(message_data):
print(f"Received: {message_data}")
# Process message...

async def main():
redis_msg = redis_messaging()
await redis_msg.start()

# Subscribe to channel
await redis_msg.subscribe("data_pipeline", message_handler)

# Keep running to receive messages
await asyncio.sleep(60)

await redis_msg.stop()

Retrieving Messages

from daita.plugins import redis_messaging

async def main():
redis_msg = redis_messaging()
await redis_msg.start()

# Get latest message
latest = await redis_msg.get_latest("data_pipeline", count=1)

# Get last 10 messages (newest first)
recent = await redis_msg.get_latest("data_pipeline", count=10)

for msg in recent:
print(msg)

await redis_msg.stop()

Channel Management

from daita.plugins import redis_messaging

async def main():
redis_msg = redis_messaging()
await redis_msg.start()

# Clear all messages from channel
cleared = await redis_msg.clear_channel("data_pipeline")
print(f"Channel cleared: {cleared}")

await redis_msg.stop()

Health Monitoring

from daita.plugins import redis_messaging

async def main():
redis_msg = redis_messaging()
await redis_msg.start()

# Check Redis connection health
health = await redis_msg.health_check()
print(f"Status: {health['status']}")
print(f"Ping time: {health.get('ping_time_ms')}ms")
print(f"Redis version: {health.get('redis_version')}")

# Get statistics
stats = await redis_msg.get_stats()
print(f"Active channels: {stats['pubsub_channels']}")
print(f"Memory used: {stats['used_memory_human']}")

await redis_msg.stop()

Using with Workflows

The Redis messaging plugin is designed for distributed agent communication in workflows, not for direct LLM tool use. It enables agents running in different processes or machines to communicate via Redis Streams and Pub/Sub.

Distributed Workflow Example

from daita.core.workflow import Workflow
from daita.agents import SubstrateAgent
from daita.plugins import redis_messaging

# Create agents
agent_a = SubstrateAgent(name="Producer")
agent_b = SubstrateAgent(name="Consumer")

# Create workflow with Redis messaging
redis_msg = redis_messaging(url="redis://localhost:6379")
workflow = Workflow("Distributed Pipeline", messaging_plugin=redis_msg)

# Connect agents via Redis channels
workflow.connect("Producer", "processed_data", "Consumer")

# Add agents to workflow
workflow.add_agent(agent_a)
workflow.add_agent(agent_b)

# Run workflow - messages now distributed via Redis
await workflow.run()

LLM Tool Integration

Note: The Redis messaging plugin does not expose LLM-usable tools. This plugin is specifically designed for:

  • Inter-agent communication in distributed workflows
  • Message persistence and delivery between agents
  • Pub/sub messaging for real-time agent coordination

For LLM-accessible tools, use other plugins like:

  • Database plugins (PostgreSQL, MySQL, MongoDB) for data operations
  • REST plugin for API calls
  • S3 plugin for storage operations
  • Slack plugin for notifications

The Redis plugin operates at the infrastructure layer, enabling agents to communicate across processes and machines, rather than providing tools for LLM agents to use directly.

Context Manager Usage

from daita.plugins import redis_messaging

async def main():
async with redis_messaging(url="redis://localhost:6379") as redis_msg:
# Publish messages
await redis_msg.publish("events", {"type": "user_login"})

# Get latest
messages = await redis_msg.get_latest("events", count=5)

# Automatically closed after context

Error Handling

from daita.plugins import redis_messaging

try:
async with redis_messaging(url="redis://localhost:6379") as redis_msg:
await redis_msg.publish("channel", {"data": "test"})
except ImportError as e:
if "redis" in str(e):
print("Install redis: pip install redis")
except Exception as e:
print(f"Redis error: {e}")

Best Practices

Connection Management:

  • Use context managers (async with) for automatic cleanup
  • Configure pool size based on expected load
  • Set appropriate message TTL to prevent storage bloat
  • Set max stream length to limit memory usage

Performance:

  • Use Pub/Sub for real-time messaging between running agents
  • Use Streams for message persistence and recovery
  • Batch publish operations when possible
  • Monitor Redis memory usage regularly

Reliability:

  • Set message TTL to automatically clean up old messages
  • Configure max stream length to prevent unbounded growth
  • Use health checks to monitor Redis connectivity
  • Handle subscriber callback errors gracefully

Security:

  • Use Redis authentication in production (password in URL)
  • Enable TLS for remote connections
  • Restrict Redis port access with firewall rules
  • Use separate Redis instances for different environments

Features

Redis Streams:

  • Persistent message storage with ordering guarantees
  • Retrieve historical messages with get_latest()
  • Automatic TTL-based cleanup
  • Max stream length to prevent unbounded growth

Redis Pub/Sub:

  • Real-time message delivery to active subscribers
  • Low latency notification system
  • Supports multiple subscribers per channel
  • Automatic reconnection handling

Workflow Integration:

  • Drop-in replacement for in-memory RelayManager
  • Enables distributed agent communication
  • Same API as local messaging
  • Transparent to agent implementations

Troubleshooting

IssueSolution
redis not installedpip install redis
Connection refusedCheck Redis is running, verify host/port
Authentication failedVerify password in connection URL
High memory usageReduce message_ttl or max_stream_length
Messages not deliveredCheck subscriber callbacks, verify channel names
Slow performanceIncrease max_connections, check Redis server load

Common Patterns

Distributed workflow:

# Enable distributed processing across multiple machines
redis_msg = redis_messaging(url="redis://redis-cluster:6379")
workflow = Workflow("Distributed", messaging_plugin=redis_msg)
workflow.connect("agent_a", "channel", "agent_b")

Message replay:

# Retrieve and reprocess historical messages
async with redis_messaging() as redis_msg:
old_messages = await redis_msg.get_latest("events", count=100)
for msg in old_messages:
await process_message(msg)

Health monitoring:

# Monitor Redis health in production
async with redis_messaging() as redis_msg:
health = await redis_msg.health_check()
if health['status'] != 'healthy':
alert_operations(health)

Next Steps