
Watch

The @agent.watch() decorator monitors data sources on a schedule and fires async handlers when conditions activate — enabling reactive, event-driven agent behavior.

#Overview

@agent.watch() registers an async handler that runs whenever a condition activates on a data source. Polling-based watches check a SQL condition or async callable on a fixed interval and call the handler when the result crosses a threshold.

python
from daita import Agent
from daita.plugins import postgresql
from daita.core.watch import WatchEvent
 
agent = Agent(name="Order Monitor", prompt="You monitor order activity.")
db = postgresql(host="localhost", database="orders")
 
@agent.watch(
    source=db,
    condition="SELECT COUNT(*) FROM orders WHERE status = 'pending'",
    threshold=lambda v: v > 50,
    interval="1m"
)
async def on_pending_spike(event: WatchEvent):
    print(f"Pending orders spiked to {event.value}")
    await agent.run(f"We have {event.value} pending orders — triage them")
 
await agent.start()
# Watch starts automatically; handler fires whenever pending orders exceed 50

#Core Types

#WatchEvent

Every handler receives a WatchEvent when the condition fires:

python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Literal
 
@dataclass
class WatchEvent:
    value: Any              # current value returned by the condition
    triggered_at: datetime  # UTC timestamp of the trigger
    source_type: Literal["polling", "streaming"]  # how the event was detected
    resolved: bool = False  # True when the condition has cleared (on_resolve=True)
    previous_value: Any = None  # value from the previous poll cycle

Fields:

  • value: The raw value returned by the condition. For SELECT COUNT(*) queries, this is unwrapped to a scalar integer. For multi-column or multi-row queries, this is a list of dicts.
  • triggered_at: UTC datetime when the condition was evaluated and found to be active.
  • source_type: Always "polling" in the current release.
  • resolved: True when the condition has returned to inactive (only fires when on_resolve=True).
  • previous_value: The condition value from the previous poll cycle, useful for computing deltas.

#Decorator Parameters

python
@agent.watch(
    source,                  # plugin or WatchSource to poll
    *,
    condition=None,          # SQL string or async callable
    threshold=None,          # callable(value) -> bool; watch fires when True
    interval=None,           # poll period: timedelta or string like "30s"
    on_resolve=False,        # also fire when the condition clears
    on_error=None,           # called when the handler raises
    handler_timeout=None,    # max seconds for the handler to run
    relay_channel=None,      # relay channel to publish events to
    name=None,               # watch name (defaults to handler.__name__)
)

#Parameters

  • source: A plugin instance (PostgreSQL, SQLite, etc.) or any object implementing the WatchSource protocol. Required when condition is a SQL string; may be None when condition is an async callable.
  • condition: What to evaluate each poll cycle. Either a SQL string (executed via plugin.query()) or an async callable returning a value.
  • threshold: A callable (value) -> bool. The watch fires when this returns True. When omitted, the watch fires whenever the condition returns a truthy value.
  • interval: How often to poll. Accepts a timedelta or a string (see Interval Formats).
  • on_resolve: When True, the handler fires a second time (with event.resolved=True) once the condition stops being active. Useful for "spike cleared" notifications.
  • on_error: Async callable (exception) -> None. Called when the handler raises. If omitted, errors are logged and the watch loop continues.
  • handler_timeout: Maximum seconds the handler is allowed to run. If exceeded, the handler is cancelled and on_error is called.
  • relay_channel: Relay channel name to publish WatchEvent data after each trigger.
  • name: Override the watch name (used in logs and agent.watch_status()). Defaults to handler.__name__.

#Interval Formats

| String   | Duration                       |
| -------- | ------------------------------ |
| `"30s"`  | 30 seconds                     |
| `"5m"`   | 5 minutes                      |
| `"1h"`   | 1 hour                         |
| `"2d"`   | 2 days                         |
| `"0.5m"` | 30 seconds (floats supported)  |

Floats are accepted: "1.5h" = 90 minutes.
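As a mental model, the formats above can be parsed with a short helper. This is a hypothetical sketch mirroring the documented formats, not daita's actual parser:

```python
import re
from datetime import timedelta

# Hypothetical parser for the documented interval strings ("30s", "5m",
# "1.5h", "2d"); daita's internal parsing may differ.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}

def parse_interval(spec: str) -> timedelta:
    match = re.fullmatch(r"(\d+(?:\.\d+)?)([smhd])", spec)
    if match is None:
        raise ValueError(f"invalid interval: {spec!r}")
    value, unit = match.groups()
    return timedelta(seconds=float(value) * _UNIT_SECONDS[unit])

parse_interval("0.5m")  # timedelta(seconds=30)
parse_interval("1.5h")  # timedelta(seconds=5400), i.e. 90 minutes
```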


#Polling

#How Polling Works

  1. On agent.start() (or first run() call), all registered watches start as background tasks.
  2. Each watch calls plugin.query(condition) (or the async callable) once per interval.
  3. If a threshold is set, the watch fires when threshold(value) is True. If no threshold is set, the watch fires when the value is truthy.
  4. The handler is called with a WatchEvent.
  5. After the handler completes (or times out), the watch sleeps for interval and repeats.
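The cycle above can be sketched as a simple asyncio loop with a stubbed condition. The real framework loop is more involved (error handling, timeouts, backoff), but follows the same shape:

```python
import asyncio

# Simplified sketch of the documented poll cycle: evaluate, compare against
# the threshold (or truthiness), call the handler, sleep, repeat.
async def poll_loop(check, threshold, handler, interval, cycles):
    previous = None
    for _ in range(cycles):
        value = await check()                       # step 2: evaluate condition
        fired = threshold(value) if threshold else bool(value)
        if fired:
            await handler(value, previous)          # step 4: call the handler
        previous = value
        await asyncio.sleep(interval)               # step 5: sleep, then repeat

async def demo():
    readings = iter([10, 60, 80])
    fired = []

    async def check():
        return next(readings)

    async def handler(value, previous):
        fired.append((value, previous))

    await poll_loop(check, lambda v: v > 50, handler, interval=0, cycles=3)
    return fired

asyncio.run(demo())  # [(60, 10), (80, 60)] — 10 is below threshold
```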

#SQL Conditions

SQL conditions are executed via the plugin's query() method. Single-column, single-row results are automatically unwrapped to a scalar:

python
# Single scalar — returns an int
condition="SELECT COUNT(*) FROM alerts WHERE resolved = false"
 
# Multi-row — returns a list of dicts
condition="SELECT id, severity FROM alerts WHERE resolved = false"

Use scalar queries with threshold for simple numeric comparisons. Use multi-row queries when you need to inspect the data in the handler.
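A handler that accepts either shape can branch on the result type. A minimal sketch, assuming the scalar/list-of-dicts unwrapping described above:

```python
# Handle both documented result shapes: COUNT(*) queries arrive unwrapped
# as a scalar; multi-row queries arrive as a list of dicts.
def summarize(value):
    if isinstance(value, list):
        severities = sorted({row["severity"] for row in value})
        return f"{len(value)} unresolved alerts ({', '.join(severities)})"
    return f"{value} unresolved alerts"

summarize(3)
# "3 unresolved alerts"
summarize([{"id": 1, "severity": "high"}, {"id": 2, "severity": "low"}])
# "2 unresolved alerts (high, low)"
```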

#Async Callable Conditions

python
# Assumes a connected async Redis client (e.g. redis.asyncio.Redis)
# is available as redis_client
async def check_queue_depth():
    return await redis_client.llen("job_queue")
 
@agent.watch(
    source=None,          # no plugin needed — condition is an async callable
    condition=check_queue_depth,
    threshold=lambda v: v > 1000,
    interval="10s"
)
async def on_queue_overload(event: WatchEvent):
    print(f"Queue depth: {event.value}")

#Threshold and Resolve

#Threshold

python
# Fire when count exceeds 100
@agent.watch(
    source=db,
    condition="SELECT COUNT(*) FROM errors WHERE created_at > datetime('now', '-1 minute')",
    threshold=lambda v: v > 100,
    interval="30s"
)
async def on_error_surge(event: WatchEvent):
    ...

#on_resolve

When on_resolve=True, the handler also fires when the condition returns to inactive:

python
@agent.watch(
    source=db,
    condition="SELECT COUNT(*) FROM pending_orders",
    threshold=lambda v: v > 50,
    interval="1m",
    on_resolve=True
)
async def on_pending_change(event: WatchEvent):
    if event.resolved:
        print(f"Backlog cleared — now at {event.value}")
    else:
        print(f"Backlog spike: {event.value} pending orders")

The handler uses event.resolved to distinguish the two fire conditions.
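One plausible reading of these semantics is edge-triggered: the watch fires on the inactive-to-active transition, and with on_resolve=True it fires again on the active-to-inactive transition. A sketch under that assumption:

```python
# Edge-detection sketch of the on_resolve semantics described above.
def classify(prev_active: bool, now_active: bool, on_resolve: bool):
    if now_active and not prev_active:
        return "fire"                    # condition became active
    if prev_active and not now_active and on_resolve:
        return "fire (resolved)"         # condition cleared
    return None                          # no transition of interest

classify(False, True, on_resolve=True)   # "fire"
classify(True, False, on_resolve=True)   # "fire (resolved)"
classify(True, False, on_resolve=False)  # None
```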


#Error Handling

#on_error

python
async def handle_watch_error(exc: Exception):
    print(f"Watch error: {exc}")
    await notify_oncall(str(exc))
 
@agent.watch(
    source=db,
    condition="SELECT COUNT(*) FROM jobs WHERE status = 'failed'",
    threshold=lambda v: v > 0,
    interval="5m",
    on_error=handle_watch_error
)
async def on_job_failure(event: WatchEvent):
    ...

If on_error is not set, handler exceptions are logged at WARNING level and the watch continues.

#handler_timeout

python
@agent.watch(
    source=db,
    condition="SELECT COUNT(*) FROM queue",
    threshold=lambda v: v > 500,
    interval="30s",
    handler_timeout=60.0   # handler must complete within 60 seconds
)
async def on_queue_spike(event: WatchEvent):
    # Long-running handler — will be cancelled after 60s
    await agent.run("Process the backlog")

#Reconnection

The polling loop uses exponential backoff for transient failures. After 5 consecutive poll failures, the watch enters status="error" and stops:

  • Failure 1: retry after 2s
  • Failure 2: retry after 4s
  • Failure 3: retry after 8s
  • Failure 4: retry after 16s
  • Failure 5: raise and stop
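The schedule above matches delay = 2 ** failure_count seconds. A sketch assuming that formula, with the documented cap of 5 consecutive failures:

```python
# Assumed backoff formula matching the documented schedule: 2s, 4s, 8s, 16s,
# then the watch stops on the 5th consecutive failure.
MAX_POLL_FAILURES = 5

def backoff_delay(failure_count: int) -> float:
    """Seconds to wait after the Nth consecutive poll failure (1-based)."""
    if failure_count >= MAX_POLL_FAILURES:
        raise RuntimeError("watch entered status='error' and stopped")
    return float(2 ** failure_count)

[backoff_delay(n) for n in range(1, 5)]  # [2.0, 4.0, 8.0, 16.0]
```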

#Lifecycle

#When Watches Start

Watches start in one of two ways:

  1. Explicit start: When agent.start() is called.
  2. Lazy start: On the first agent.run() or agent.stream() call.

Important: Watches must be registered before start() or the first run(). Registering a watch after the agent has started raises RuntimeError.

python
agent = Agent(name="Monitor")
 
# Register watch BEFORE start()
@agent.watch(source=db, condition="...", interval="1m")
async def handler(event):
    ...
 
await agent.start()  # Watch starts here
 
# This would raise RuntimeError — too late:
# @agent.watch(source=db, condition="...", interval="1m")
# async def late_handler(event): ...

#Watch Status

Inspect the status of all registered watches:

python
status = agent.watch_status()
# Returns:
# {
#   "on_pending_spike": {"status": "running", "triggered": False, "last_error": None},
#   "on_job_failure":   {"status": "error",   "triggered": False, "last_error": "..."},
# }
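Working from the dict shape shown above (assumed stable across releases), errored watches can be surfaced with a comprehension:

```python
# Sample data in the shape returned by agent.watch_status() above.
status = {
    "on_pending_spike": {"status": "running", "triggered": False, "last_error": None},
    "on_job_failure":   {"status": "error",   "triggered": False, "last_error": "timeout"},
}

# Collect watches that have entered the error state, keyed by last error.
errored = {
    name: info["last_error"]
    for name, info in status.items()
    if info["status"] == "error"
}
# {'on_job_failure': 'timeout'}
```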

#Stopping

Watches are stopped when agent.stop() is called. Background tasks are cancelled cleanly.


#Best Practices

  • Choose intervals carefully: Short intervals (< 10s) create high database load. Use longer intervals for non-urgent monitoring.
  • Keep handlers fast: Long-running handlers block the next poll cycle. Use handler_timeout to protect against runaway handlers.
  • Use on_resolve for alerting workflows — know when the spike clears, not just when it starts.
  • Register all watches before start(): The framework validates this and raises RuntimeError if you try to register late.
  • Use previous_value in the handler to compute rate-of-change:
python
async def on_metric_change(event: WatchEvent):
    if event.previous_value is not None:
        delta = event.value - event.previous_value
        print(f"Delta: {delta:+d}")

#Next Steps