# Redis Plugin

Redis data store plugin with key-value, hash, list, and set operations for agents, plus Redis Messaging for distributed workflows.
Daita provides two Redis plugins:

- **Redis Data Store** (`redis`) — general-purpose key-value, hash, list, and set operations exposed as LLM-callable tools.
- **Redis Messaging** (`redis_messaging`) — distributed agent communication via Redis Streams and Pub/Sub (infrastructure layer, no LLM tools).
## Redis Data Store

### Installation

```bash
pip install 'daita-agents[redis]'
```

### Quick Start
```python
from daita import Agent
from daita.plugins import redis

# Create plugin with namespace isolation
r = redis(url="redis://localhost:6379", key_prefix="myapp:")

# Agent can read/write Redis autonomously
agent = Agent(
    name="Cache Manager",
    model="gpt-4o-mini",
    prompt="You manage application state in Redis. Help users inspect and update cached data.",
    tools=[r]
)

await agent.start()
result = await agent.run("What keys exist under the 'user:*' pattern?")
```

### Direct Usage
```python
from daita.plugins import redis

async with redis(url="redis://localhost:6379", key_prefix="myapp:") as r:
    await r.set("user:1", '{"name": "Alice"}', ttl=3600)
    value = await r.get("user:1")

    await r.hset("config", {"theme": "dark", "lang": "en"})
    fields = await r.hgetall("config")
```

### Connection Parameters
```python
redis(
    url: Optional[str] = None,
    db: Optional[int] = None,
    password: Optional[str] = None,
    max_connections: int = 10,
    key_prefix: Optional[str] = None,
    read_only: bool = False
)
```

#### Parameters

- `url` (str): Redis connection URL (default: `"redis://localhost:6379"`). Falls back to `REDIS_URL`.
- `db` (int): Redis database number (default: 0). Falls back to `REDIS_DB`.
- `password` (str): Redis password. Falls back to `REDIS_PASSWORD`.
- `max_connections` (int): Connection pool size (default: 10).
- `key_prefix` (str): Prefix applied to all key operations for namespace isolation. Falls back to `REDIS_KEY_PREFIX`.
- `read_only` (bool): When `True`, write tools are not exposed to agents (default: `False`).
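The fallback order described above (explicit argument, then environment variable, then built-in default) can be sketched in plain Python. This is an illustration of the documented behavior, not the plugin's actual code:

```python
import os

def resolve_url(url=None):
    # Explicit argument wins; otherwise fall back to REDIS_URL,
    # then to the documented default.
    return url or os.getenv("REDIS_URL", "redis://localhost:6379")

resolve_url("redis://prod:6380")  # explicit argument takes precedence
resolve_url()                     # REDIS_URL if set, else the default
```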
### Environment Variables

| Variable | Maps to |
|---|---|
| `REDIS_URL` | `url` |
| `REDIS_DB` | `db` |
| `REDIS_PASSWORD` | `password` |
| `REDIS_KEY_PREFIX` | `key_prefix` |
### Core Methods

#### Key-Value

```python
await r.set("key", "value", ttl=3600)      # set with optional TTL (seconds)
value = await r.get("key")                 # get value (None if missing)
count = await r.delete("key1", "key2")     # delete keys, returns count deleted
count = await r.exists("key1", "key2")     # count how many exist
keys = await r.keys("user:*", count=100)   # scan keys matching glob pattern
```
#### TTL

```python
seconds = await r.ttl("key")     # remaining TTL (-1 = no TTL, -2 = missing)
ok = await r.expire("key", 600)  # set TTL on existing key
```
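The `-1`/`-2` return values follow Redis's standard TTL convention. A small helper (hypothetical, for illustration only) makes the meanings explicit:

```python
def describe_ttl(seconds: int) -> str:
    # Redis TTL convention: -2 = key does not exist,
    # -1 = key exists but has no expiry, >= 0 = seconds remaining.
    if seconds == -2:
        return "missing"
    if seconds == -1:
        return "no expiry"
    return f"expires in {seconds}s"

describe_ttl(-1)   # "no expiry"
describe_ttl(600)  # "expires in 600s"
```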
#### Hash

```python
await r.hset("config", {"theme": "dark", "lang": "en"})
fields = await r.hgetall("config")  # {"theme": "dark", "lang": "en"}
```
#### List

```python
await r.lpush("queue", "a", "b")         # push to head
await r.rpush("queue", "c")              # push to tail
items = await r.lrange("queue", 0, -1)   # get all elements
```
#### Set

```python
await r.sadd("tags", "python", "async")
members = await r.smembers("tags")  # ["python", "async"]
```
#### Info

```python
size = await r.dbsize()  # total keys in the database
```

### Available Tools
When used with an agent, the Redis plugin exposes these LLM-callable tools:
Read-only tools (always available):
| Tool | Description | Key Parameters |
|---|---|---|
| `redis_get` | Get the value of a key | `key` |
| `redis_keys` | Scan keys matching a glob pattern | `pattern`, `count` |
| `redis_exists` | Check if keys exist | `keys` |
| `redis_ttl` | Get remaining TTL of a key | `key` |
| `redis_hgetall` | Get all fields of a hash | `key` |
| `redis_lrange` | Get elements from a list | `key`, `start`, `stop` |
| `redis_smembers` | Get all members of a set | `key` |
| `redis_info` | Get database size and connection info | — |
Write tools (disabled in read-only mode):
| Tool | Description | Key Parameters |
|---|---|---|
| `redis_set` | Set a key-value pair with optional TTL | `key`, `value`, `ttl` |
| `redis_delete` | Delete keys | `keys` |
| `redis_hset` | Set fields on a hash | `key`, `mapping` |
| `redis_lpush` | Push to head of list | `key`, `values` |
| `redis_rpush` | Push to tail of list | `key`, `values` |
| `redis_sadd` | Add members to a set | `key`, `members` |
| `redis_expire` | Set TTL on a key | `key`, `seconds` |
### Read-Only Mode

```python
r = redis(url="redis://localhost:6379", read_only=True)
```

In read-only mode, write tools (`redis_set`, `redis_delete`, etc.) are not exposed to the agent.
### Key Prefixing

All operations are automatically scoped to the configured `key_prefix`:

```python
r = redis(key_prefix="myapp:")
await r.set("user:1", "Alice")   # actually stores "myapp:user:1"
keys = await r.keys("user:*")    # scans "myapp:user:*", returns ["user:1"]
```

This enables safe multi-tenant isolation on a shared Redis instance.
### Context Manager Usage

```python
from daita.plugins import redis

async with redis(url="redis://localhost:6379") as r:
    await r.set("key", "value")
    value = await r.get("key")
# Automatically disconnected
```

## Redis Messaging
The Redis Messaging plugin is designed for distributed agent communication in workflows, not for direct LLM tool use.
### Quick Start

```python
from daita.core.workflow import Workflow
from daita.agents import Agent
from daita.plugins import redis_messaging

redis_msg = redis_messaging(url="redis://localhost:6379")
workflow = Workflow("Distributed Pipeline", messaging_plugin=redis_msg)
workflow.connect("Producer", "processed_data", "Consumer")
```

### Connection Parameters
```python
redis_messaging(
    url: str = "redis://localhost:6379",
    max_connections: int = 10,
    message_ttl: int = 86400,
    max_stream_length: int = 10000,
    connection_timeout: int = 30
)
```

- `url` (str): Redis connection URL (default: `"redis://localhost:6379"`)
- `max_connections` (int): Maximum connections in pool (default: 10)
- `message_ttl` (int): Message TTL in seconds (default: 86400 / 24 hours)
- `max_stream_length` (int): Maximum messages per stream (default: 10000)
- `connection_timeout` (int): Connection timeout in seconds (default: 30)
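`max_stream_length` caps each stream the way a fixed-size buffer does: once the limit is reached, the oldest entries are discarded to make room for new ones (Redis Streams trim via `MAXLEN`). Conceptually, an illustration only:

```python
from collections import deque

# A stream capped at 3 entries: appending a 4th evicts the oldest,
# analogous to max_stream_length trimming old messages.
stream = deque(maxlen=3)
for msg_id in ["m1", "m2", "m3", "m4"]:
    stream.append(msg_id)

list(stream)  # ["m2", "m3", "m4"]
```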
### Publishing & Subscribing

```python
from daita.plugins import redis_messaging

async with redis_messaging() as redis_msg:
    # Publish
    message_id = await redis_msg.publish(
        channel="data_pipeline",
        message={"user_id": 123, "action": "processed"},
        publisher="agent_a"
    )

    # Subscribe
    await redis_msg.subscribe("data_pipeline", my_handler)

    # Retrieve recent messages
    recent = await redis_msg.get_latest("data_pipeline", count=10)
```
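The `subscribe` call above takes a handler callback whose exact signature is not shown here. The sketch below assumes an async callable that receives the message payload as a dict; verify the actual signature against your daita version:

```python
import asyncio

received = []

# Assumed handler shape (hypothetical): an async callable given the
# decoded message payload for each delivery on the channel.
async def my_handler(message: dict) -> None:
    received.append(message)
    print(f"got {message.get('action')} from user {message.get('user_id')}")

# Simulate a single delivery locally; no Redis needed for the sketch.
asyncio.run(my_handler({"user_id": 123, "action": "processed"}))
```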
### Distributed Workflow Example

```python
from daita.core.workflow import Workflow
from daita.agents import Agent
from daita.plugins import redis_messaging

agent_a = Agent(name="Producer")
agent_b = Agent(name="Consumer")

redis_msg = redis_messaging(url="redis://localhost:6379")
workflow = Workflow("Distributed Pipeline", messaging_plugin=redis_msg)
workflow.connect("Producer", "processed_data", "Consumer")
workflow.add_agent(agent_a)
workflow.add_agent(agent_b)

await workflow.run()
```

Note: The Redis Messaging plugin does not expose LLM-callable tools. It operates at the infrastructure layer, enabling agents to communicate across processes and machines.
### Error Handling

```python
from daita.plugins import redis

try:
    async with redis(url="redis://localhost:6379") as r:
        await r.set("key", "value")
except ImportError:
    print("Install redis: pip install 'daita-agents[redis]'")
except Exception as e:
    print(f"Redis error: {e}")
```

### Best Practices
**Connection Management:**

- Use context managers (`async with`) for automatic cleanup
- Configure pool size based on expected concurrent operations
- Use `key_prefix` for namespace isolation in shared Redis instances

**Performance:**

- Use `keys()` with specific patterns rather than `*`
- Keep values small — Redis is optimized for fast reads, not large blobs
- Use hashes for structured data instead of serialized JSON strings
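The last point matters because a JSON string must be fully parsed and rewritten to change one field, while a hash lets the server read or update fields individually (`hset`/`hgetall`). A plain-Python illustration of the difference in update cost:

```python
import json

# As a JSON string value: updating one field means
# parse -> mutate -> re-serialize the whole blob.
blob = json.dumps({"theme": "dark", "lang": "en"})
obj = json.loads(blob)
obj["theme"] = "light"
blob = json.dumps(obj)

# As a hash (modeled here as a dict): one field is touched directly,
# which corresponds to a single HSET on the server.
config = {"theme": "dark", "lang": "en"}
config["theme"] = "light"
```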
**Security:**

- Use Redis authentication in production (password in URL or parameter)
- Enable TLS for remote connections
- Use `read_only=True` when agents should only inspect data
## Troubleshooting

| Issue | Solution |
|---|---|
| `redis` not installed | `pip install 'daita-agents[redis]'` |
| Connection refused | Check Redis is running, verify host/port |
| Authentication failed | Verify password in URL or REDIS_PASSWORD env var |
| Keys not found | Check key_prefix — keys are prefixed automatically |
| Write tools missing | Plugin may be in read_only=True mode |
## Next Steps
- Plugin Overview — All available plugins
- PostgreSQL Plugin — Relational database operations
- MongoDB Plugin — Document database operations