# Watch
The `@agent.watch()` decorator monitors data sources on a schedule and fires async handlers when conditions activate — enabling reactive, event-driven agent behavior.

## Overview

`@agent.watch()` registers an async handler that runs whenever a condition activates on a data source. Polling-based watches check a SQL condition or async callable on a fixed interval and call the handler when the result crosses a threshold.

```python
from daita import Agent
from daita.plugins import postgresql
from daita.core.watch import WatchEvent

agent = Agent(name="Order Monitor", prompt="You monitor order activity.")
db = postgresql(host="localhost", database="orders")

@agent.watch(
    source=db,
    condition="SELECT COUNT(*) FROM orders WHERE status = 'pending'",
    threshold=lambda v: v > 50,
    interval="1m"
)
async def on_pending_spike(event: WatchEvent):
    print(f"Pending orders spiked to {event.value}")
    await agent.run(f"We have {event.value} pending orders — triage them")

await agent.start()
# Watch starts automatically; handler fires whenever pending orders exceed 50
```

## Core Types
### WatchEvent

Every handler receives a `WatchEvent` when the condition fires:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Literal

@dataclass
class WatchEvent:
    value: Any                 # current value returned by the condition
    triggered_at: datetime     # UTC timestamp of the trigger
    source_type: Literal["polling", "streaming"]  # how the event was detected
    resolved: bool = False     # True when the condition has cleared (on_resolve=True)
    previous_value: Any = None # value from the previous poll cycle
```

Fields:

- `value`: The raw value returned by the condition. For `SELECT COUNT(*)` queries, this is unwrapped to a scalar integer. For multi-column or multi-row queries, this is a list of dicts.
- `triggered_at`: UTC datetime when the condition was evaluated and found to be active.
- `source_type`: Always `"polling"` in the current release.
- `resolved`: `True` when the condition has returned to inactive (only fires when `on_resolve=True`).
- `previous_value`: The condition value from the previous poll cycle, useful for computing deltas.
### Decorator Parameters

```python
@agent.watch(
    source,                # plugin or WatchSource to poll
    *,
    condition=None,        # SQL string or async callable
    threshold=None,        # callable(value) -> bool; watch fires when True
    interval=None,         # poll period: timedelta or string like "30s"
    on_resolve=False,      # also fire when the condition clears
    on_error=None,         # called when the handler raises
    handler_timeout=None,  # max seconds for the handler to run
    relay_channel=None,    # relay channel to publish events to
    name=None,             # watch name (defaults to handler.__name__)
)
```

#### Parameters

- `source`: A plugin instance (PostgreSQL, SQLite, etc.) or any object implementing the `WatchSource` protocol. Required for polling watches.
- `condition`: What to evaluate each poll cycle. Either a SQL string (executed via `plugin.query()`) or an async callable returning a value.
- `threshold`: A `callable(value) -> bool`. The watch fires when this returns `True`. When omitted, the watch fires whenever the condition returns a truthy value.
- `interval`: How often to poll. Accepts a `timedelta` or a string (see Interval Formats).
- `on_resolve`: When `True`, the handler fires a second time (with `event.resolved=True`) once the condition stops being active. Useful for "spike cleared" notifications.
- `on_error`: Async `callable(exception) -> None`. Called when the handler raises. If omitted, errors are logged and the watch loop continues.
- `handler_timeout`: Maximum seconds the handler is allowed to run. If exceeded, the handler is cancelled and `on_error` is called.
- `relay_channel`: Relay channel name to publish `WatchEvent` data after each trigger.
- `name`: Override the watch name (used in logs and `agent.watch_status()`). Defaults to `handler.__name__`.
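The `threshold` firing rule can be sketched in plain Python. This is an illustration of the documented semantics only; `should_fire` is a hypothetical helper, not part of the daita API:

```python
from typing import Any, Callable, Optional

def should_fire(value: Any, threshold: Optional[Callable[[Any], bool]] = None) -> bool:
    # With a threshold, fire when it returns True for the current value;
    # without one, fire whenever the value itself is truthy.
    if threshold is not None:
        return bool(threshold(value))
    return bool(value)
```

So a scalar count of `0` never fires a threshold-less watch, while any nonzero count does.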
### Interval Formats

| String   | Duration                     |
|----------|------------------------------|
| `"30s"`  | 30 seconds                   |
| `"5m"`   | 5 minutes                    |
| `"1h"`   | 1 hour                       |
| `"2d"`   | 2 days                       |
| `"0.5m"` | 30 seconds (float supported) |

Floats are accepted: `"1.5h"` = 90 minutes.
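One way such strings could map to a `timedelta`, assuming the unit is always the final character (`parse_interval` is an illustrative sketch, not the library's parser):

```python
from datetime import timedelta

_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}

def parse_interval(spec: str) -> timedelta:
    # A float magnitude followed by a one-letter unit, e.g. "30s", "1.5h".
    unit = spec[-1]
    if unit not in _UNITS:
        raise ValueError(f"unknown interval unit: {unit!r}")
    return timedelta(**{_UNITS[unit]: float(spec[:-1])})
```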
## Polling

### How Polling Works

1. On `agent.start()` (or the first `run()` call), all registered watches start as background tasks.
2. Each watch calls `plugin.query(condition)` (or the async callable) once per interval.
3. If a `threshold` is set, the watch fires when `threshold(value)` is `True`. If no threshold is set, the watch fires when the value is truthy.
4. The handler is called with a `WatchEvent`.
5. After the handler completes (or times out), the watch sleeps for `interval` and repeats.
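The cycle above can be modeled in a few lines. This is a simplified sketch of the documented behavior, not daita's implementation; `poll_once` and the fake condition are hypothetical:

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

async def poll_once(
    condition: Callable[[], Awaitable[Any]],
    threshold: Optional[Callable[[Any], bool]] = None,
) -> Optional[Any]:
    # One poll cycle: evaluate the condition, return the value when the
    # watch should fire, otherwise None.
    value = await condition()
    fired = threshold(value) if threshold is not None else bool(value)
    return value if fired else None

async def demo() -> list:
    readings = iter([10, 60, 40])  # simulated COUNT(*) results per cycle

    async def fake_condition():
        return next(readings)

    fires = []
    for _ in range(3):
        hit = await poll_once(fake_condition, threshold=lambda v: v > 50)
        if hit is not None:
            fires.append(hit)  # a real watch would call the handler here
        # a real loop would now sleep for `interval` before the next cycle
    return fires
```

With a threshold of `v > 50`, only the `60` reading fires.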
### SQL Conditions

SQL conditions are executed via the plugin's `query()` method. Single-column, single-row results are automatically unwrapped to a scalar:

```python
# Single scalar — returns an int
condition="SELECT COUNT(*) FROM alerts WHERE resolved = false"

# Multi-row — returns a list of dicts
condition="SELECT id, severity FROM alerts WHERE resolved = false"
```

Use scalar queries with `threshold` for simple numeric comparisons. Use multi-row queries when you need to inspect the data in the handler.
### Async Callable Conditions

```python
async def check_queue_depth():
    return await redis_client.llen("job_queue")

@agent.watch(
    source=None,  # no plugin source needed: the callable supplies the value
    condition=check_queue_depth,
    threshold=lambda v: v > 1000,
    interval="10s"
)
async def on_queue_overload(event: WatchEvent):
    print(f"Queue depth: {event.value}")
```

## Threshold and Resolve
### Threshold

```python
# Fire when count exceeds 100
@agent.watch(
    source=db,
    condition="SELECT COUNT(*) FROM errors WHERE created_at > datetime('now', '-1 minute')",
    threshold=lambda v: v > 100,
    interval="30s"
)
async def on_error_surge(event: WatchEvent):
    ...
```

### on_resolve
When `on_resolve=True`, the handler also fires when the condition returns to inactive:

```python
@agent.watch(
    source=db,
    condition="SELECT COUNT(*) FROM pending_orders",
    threshold=lambda v: v > 50,
    interval="1m",
    on_resolve=True
)
async def on_pending_change(event: WatchEvent):
    if event.resolved:
        print(f"Backlog cleared — now at {event.value}")
    else:
        print(f"Backlog spike: {event.value} pending orders")
```

The handler uses `event.resolved` to distinguish the two fire conditions.
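Assuming edge-triggered semantics (the handler fires once when the condition becomes active and once more when it clears, as the "fires a second time" wording suggests), the state tracking can be sketched as:

```python
from typing import Any, Callable, Iterable

def resolve_events(
    values: Iterable[Any],
    threshold: Callable[[Any], bool],
) -> list[tuple[Any, bool]]:
    # Emit (value, resolved=False) when the condition becomes active and
    # (value, resolved=True) when it clears again; steady states emit nothing.
    events, was_active = [], False
    for value in values:
        is_active = threshold(value)
        if is_active and not was_active:
            events.append((value, False))  # spike started
        elif was_active and not is_active:
            events.append((value, True))   # spike cleared
        was_active = is_active
    return events
```

For poll values `[10, 60, 70, 30]` and a `v > 50` threshold, this yields one spike event at 60 and one resolve event at 30.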
## Error Handling

### on_error

```python
async def handle_watch_error(exc: Exception):
    print(f"Watch error: {exc}")
    await notify_oncall(str(exc))

@agent.watch(
    source=db,
    condition="SELECT COUNT(*) FROM jobs WHERE status = 'failed'",
    threshold=lambda v: v > 0,
    interval="5m",
    on_error=handle_watch_error
)
async def on_job_failure(event: WatchEvent):
    ...
```

If `on_error` is not set, handler exceptions are logged at WARNING level and the watch continues.
### handler_timeout

```python
@agent.watch(
    source=db,
    condition="SELECT COUNT(*) FROM queue",
    threshold=lambda v: v > 500,
    interval="30s",
    handler_timeout=60.0  # handler must complete within 60 seconds
)
async def on_queue_spike(event: WatchEvent):
    # Long-running handler — will be cancelled after 60s
    await agent.run("Process the backlog")
```

### Reconnection
The polling loop uses exponential backoff for transient failures. After 5 consecutive poll failures, the watch enters `status="error"` and stops:
- Failure 1: retry after 2s
- Failure 2: retry after 4s
- Failure 3: retry after 8s
- Failure 4: retry after 16s
- Failure 5: raise and stop
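The schedule above amounts to a doubling delay per failure. A hypothetical helper that reproduces it (not daita's code; the final failure gets no delay because the watch stops instead of retrying):

```python
def backoff_delays(max_failures: int = 5, base: float = 2.0) -> list[float]:
    # Delays before retries 1..(max_failures - 1): 2s, 4s, 8s, 16s.
    return [base ** n for n in range(1, max_failures)]
```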
## Lifecycle

### When Watches Start

Watches start in one of two ways:

- Explicit start: when `agent.start()` is called.
- Lazy start: on the first `agent.run()` or `agent.stream()` call.

Important: watches must be registered before `start()` or the first `run()`. Registering a watch after the agent has started raises `RuntimeError`.

```python
agent = Agent(name="Monitor")

# Register watch BEFORE start()
@agent.watch(source=db, condition="...", interval="1m")
async def handler(event):
    ...

await agent.start()  # Watch starts here

# This would raise RuntimeError — too late:
# @agent.watch(source=db, condition="...", interval="1m")
# async def late_handler(event): ...
```

### Watch Status
Inspect the status of all registered watches:

```python
status = agent.watch_status()
# Returns:
# {
#   "on_pending_spike": {"status": "running", "triggered": False, "last_error": None},
#   "on_job_failure": {"status": "error", "triggered": False, "last_error": "..."},
# }
```

### Stopping

Watches are stopped when `agent.stop()` is called. Background tasks are cancelled cleanly.
## Best Practices

- Choose intervals carefully: short intervals (< 10s) create high database load. Use longer intervals for non-urgent monitoring.
- Keep handlers fast: long-running handlers block the next poll cycle. Use `handler_timeout` to protect against runaway handlers.
- Use `on_resolve` for alerting workflows: know when the spike clears, not just when it starts.
- Register all watches before `start()`: the framework validates this and raises `RuntimeError` if you try to register late.
- Use `previous_value` in the handler to compute rate-of-change:

```python
async def on_metric_change(event: WatchEvent):
    if event.previous_value is not None:
        delta = event.value - event.previous_value
        print(f"Delta: {delta:+d}")
```

## Next Steps
- `Agent.from_db()` — Build a database analyst agent in one call
- Scheduling — Run agent tasks on a fixed schedule
- Agent — Full agent configuration reference
- PostgreSQL Plugin — Database source for watch conditions