Agents are a classic "long-tail bug" workload: fine 90% of the time, inexplicable the other 10%. No observability = no diagnosis = no improvement.
```
┌─────────────────────────────────────────────────┐
│ 1. Structured logs: what happened?              │
│    agentao.log + your app logs                  │
├─────────────────────────────────────────────────┤
│ 2. Metrics: how much, how fast, how expensive?  │
│    calls / latency / tokens / failure rate      │
├─────────────────────────────────────────────────┤
│ 3. Event stream: per-session replay             │
│    AgentEvent archive                           │
├─────────────────────────────────────────────────┤
│ 4. Distributed tracing: end-to-end per request  │
│    OpenTelemetry                                │
└─────────────────────────────────────────────────┘
```
The SDK log defaults to `<working_directory>/agentao.log` and is very thorough. This is your most important debugging tool. In production, route it through a JSON handler to Loki / CloudWatch / ELK:

```python
import logging

from pythonjsonlogger import jsonlogger  # pip install python-json-logger

# Attach a JSON handler to agentao's logger so log shippers get structured records
agentao_logger = logging.getLogger("agentao")
handler = logging.StreamHandler()
handler.setFormatter(jsonlogger.JsonFormatter())
agentao_logger.addHandler(handler)
agentao_logger.setLevel(logging.INFO)
```
Inject business context via on_event:
```python
def on_event(ev):
    logger.info("agent_event", extra={
        "event_type": ev.type.value,
        "session_id": current_session_id(),
        "tenant_id": current_tenant_id(),
        "user_id": current_user_id(),
        **ev.data,
    })
```
session_id / tenant_id / user_id are the most-used filter fields when debugging.
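The `current_session_id()` / `current_tenant_id()` / `current_user_id()` helpers above are not something agentao provides; one common way to implement them is with `contextvars`, so the values survive thread and async hops. A minimal sketch:

```python
import contextvars

# One ContextVar per field; set at the top of each request, read by on_event.
_session_id = contextvars.ContextVar("session_id", default="unknown")
_tenant_id = contextvars.ContextVar("tenant_id", default="unknown")
_user_id = contextvars.ContextVar("user_id", default="unknown")

def bind_request_context(session_id: str, tenant_id: str, user_id: str) -> None:
    _session_id.set(session_id)
    _tenant_id.set(tenant_id)
    _user_id.set(user_id)

def current_session_id() -> str:
    return _session_id.get()

def current_tenant_id() -> str:
    return _tenant_id.get()

def current_user_id() -> str:
    return _user_id.get()
```

Call `bind_request_context()` at the top of each request handler, before the agent runs.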
| Metric | Type | Meaning |
|---|---|---|
| `agent.turn.count` | counter | Turns per `chat()` |
| `agent.turn.duration_ms` | histogram | Per-turn duration |
| `agent.tool.calls` | counter, by tool | Tool invocations |
| `agent.tool.failures` | counter, by tool | Tool failures |
| `agent.tool.duration_ms` | histogram, by tool | Tool latency |
| `agent.llm.tokens.prompt` | counter | Prompt tokens |
| `agent.llm.tokens.completion` | counter | Completion tokens |
| `agent.llm.tokens.cached` | counter | Prompt-cache hits |
| `agent.llm.errors` | counter, by error_type | LLM errors |
| `agent.confirm.requests` | counter, by outcome | Confirm request / allow / reject / timeout |
| `agent.max_iterations.hits` | counter | Bailouts triggered |
```python
import time

from prometheus_client import Counter, Histogram

turn_dur = Histogram(
    "agent_turn_duration_ms", "Turn duration",
    buckets=[100, 500, 1000, 3000, 10_000, 30_000],
)
tool_calls = Counter("agent_tool_calls", "Tool invocations", ["tool", "status"])

def on_event(ev):
    # EventType is the SDK's event enum, the same one the hooks above use
    if ev.type == EventType.TOOL_COMPLETE:
        tool_calls.labels(tool=ev.data["tool"], status=ev.data["status"]).inc()

start = time.time()
reply = agent.chat(msg)
turn_dur.observe((time.time() - start) * 1000)
```
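The token and error counters follow the same pattern. A sketch, with the caveat that the event-type and payload field names (`LLM_COMPLETE`, `prompt_tokens`, ...) are assumptions about what your transport emits, not confirmed agentao API:

```python
from prometheus_client import Counter

prompt_tokens = Counter("agent_llm_tokens_prompt", "Prompt tokens")
completion_tokens = Counter("agent_llm_tokens_completion", "Completion tokens")
llm_errors = Counter("agent_llm_errors", "LLM errors", ["error_type"])

def on_llm_event(ev):
    # Assumed event types / fields; adjust to the events you actually receive.
    if ev.type == EventType.LLM_COMPLETE:
        prompt_tokens.inc(ev.data.get("prompt_tokens", 0))
        completion_tokens.inc(ev.data.get("completion_tokens", 0))
    elif ev.type == EventType.LLM_ERROR:
        llm_errors.labels(error_type=ev.data.get("error_type", "unknown")).inc()
```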
| Alert condition | Likely cause |
|---|---|
| Tool failure rate > 10% | Tools broken or misconfigured |
| LLM 5xx rate > 2% | Vendor issue |
| max_iterations hits > 5% | Agent stuck pattern |
| Cache hit rate < 30% | System prompt is churning |
| Confirm timeout rate > 10% | UI issue or user drop-off |
Saving each session's AgentEvent stream enables per-session replay: reconstructing exactly what the agent saw, called, and returned after the fact. One event per line, as JSONL:
{"ts": 1704067200.1, "session": "sess-123", "type": "turn_start", "data": {}}
{"ts": 1704067200.3, "session": "sess-123", "type": "tool_start", "data": {"tool": "get_customer_orders", "args": {"customer_id": "c-42"}, "call_id": "..."}}
{"ts": 1704067200.8, "session": "sess-123", "type": "tool_complete", "data": {"tool": "get_customer_orders", "status": "ok", "duration_ms": 500, "call_id": "..."}}
```python
import json, time
from pathlib import Path

class EventArchiver:
    """Append every AgentEvent to a per-tenant JSONL file."""

    def __init__(self, path: Path, session_id: str, tenant_id: str):
        self.f = path.open("a", encoding="utf-8")
        self.session_id = session_id
        self.tenant_id = tenant_id

    def __call__(self, ev):
        self.f.write(json.dumps({
            "ts": time.time(),
            "session": self.session_id,
            "tenant": self.tenant_id,
            "type": ev.type.value,
            "data": ev.data,
        }) + "\n")
        self.f.flush()  # flush per event so a crash loses at most one line

    def close(self):
        self.f.close()
```
```python
archiver = EventArchiver(
    path=Path(f"/data/tenant-{tenant.id}/events.jsonl"),
    session_id=session_id, tenant_id=tenant.id,
)
transport = SdkTransport(on_event=archiver)
```
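Replay is then just reading the file back. A minimal sketch that assumes only the schema `EventArchiver` writes (the path is illustrative):

```python
import json
from pathlib import Path

def replay_session(path: Path, session_id: str) -> None:
    """Print a timeline of one session's events from the JSONL archive."""
    with path.open(encoding="utf-8") as f:
        for line in f:
            ev = json.loads(line)
            if ev["session"] == session_id:
                print(f'{ev["ts"]:.1f}  {ev["type"]:<16} {ev["data"]}')

replay_session(Path("/data/tenant-t-42/events.jsonl"), "sess-123")
```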
When the agent is embedded in your web service, one user request can span:
Browser → your API → Agent.chat() → LLM API → Agent → custom tool → database
OpenTelemetry stitches these into one trace:
```python
import asyncio

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

@app.post("/chat")
async def chat(req: ChatRequest):
    with tracer.start_as_current_span("user_chat") as span:
        span.set_attribute("user.id", req.user_id)
        span.set_attribute("session.id", req.session_id)
        with tracer.start_as_current_span("agent_chat"):
            # agent.chat is blocking; run it off the event loop
            reply = await asyncio.to_thread(agent.chat, req.message)
        return {"reply": reply}
```
To go deeper, wrap `LLMClient` / `Tool.execute` and instrument each call; a sketch follows the attribute list below.
For LLM spans, follow the OpenTelemetry GenAI semantic conventions:

- `gen_ai.system` = `"openai"`
- `gen_ai.request.model` = model name
- `gen_ai.usage.prompt_tokens` / `gen_ai.usage.completion_tokens`
- `gen_ai.response.finish_reason`
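A sketch of the tool half, assuming a tool object exposing `name` and `execute(args)` (those names are assumptions, not confirmed agentao API); an `LLMClient` wrapper would look the same, carrying the `gen_ai.*` attributes above:

```python
import functools

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

def trace_tool(tool):
    """Replace tool.execute with a span-wrapped version."""
    original = tool.execute  # assumed method name

    @functools.wraps(original)
    def wrapper(args):
        with tracer.start_as_current_span(f"tool.{tool.name}") as span:
            span.set_attribute("tool.name", tool.name)
            try:
                result = original(args)
                span.set_attribute("tool.status", "ok")
                return result
            except Exception as exc:
                span.set_attribute("tool.status", "error")
                span.record_exception(exc)
                raise

    tool.execute = wrapper
    return tool
```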
| Scenario | Trigger | Retention |
|---|---|---|
| User starts a session | Agent construction | 90–365 days |
| User approves dangerous tool | confirm_tool = True | 180–365 days |
| Permission rule denied | decide = DENY | 90 days |
| Agent modified user data | Business tool execution | Business-defined (usually 1–7 years) |
| User requests “forget me” | memory.clear_all | Indefinite (compliance evidence) |
Audit logs should not be scrubbed (you would lose the evidence they exist to provide), but they must be encrypted at rest with strict access control. Under compliance regimes, append-only (WORM) storage is required.
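One low-tech way to capture these records is a second `on_event` consumer that keeps only audit-relevant events. The event-type names in this sketch are assumptions; map them to whatever your transport actually emits:

```python
import json, time

AUDIT_TYPES = {"confirm_request", "confirm_result", "permission_denied"}  # assumed names

class AuditTrail:
    def __init__(self, path, session_id, tenant_id):
        # Append-only by convention here; WORM is enforced at the storage layer.
        self.f = open(path, "a", encoding="utf-8")
        self.session_id = session_id
        self.tenant_id = tenant_id

    def __call__(self, ev):
        if ev.type.value not in AUDIT_TYPES:
            return
        self.f.write(json.dumps({
            "ts": time.time(),
            "session": self.session_id,
            "tenant": self.tenant_id,
            "type": ev.type.value,
            "data": ev.data,
        }) + "\n")
        self.f.flush()
```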
On a budget:

- `agentao.log` → per-tenant files, daily rotation, 14-day retention
- `prometheus_client` → the 5 key metrics, plus a Grafana dashboard

Enough for 99% of small/mid SaaS. Add APM when you scale.
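The rotation half of that is standard library; a sketch (the per-tenant path layout is an assumption):

```python
import logging
from logging.handlers import TimedRotatingFileHandler

def tenant_log_handler(tenant_id: str) -> TimedRotatingFileHandler:
    # Rotate at midnight, keep 14 days of backups per tenant.
    handler = TimedRotatingFileHandler(
        f"/var/log/agentao/tenant-{tenant_id}/agentao.log",
        when="midnight",
        backupCount=14,
    )
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s"))
    return handler

logging.getLogger("agentao").addHandler(tenant_log_handler("t-42"))
```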