4.1 Transport Protocol
What you'll learn
- The four methods that make up the entire Transport interface
- The contract for each: what's blocking, what's fire-and-forget
- How NullTransport behaves and when it's the right default
Transport is the only interface between the agent runtime and your host UI / business logic. Master its four methods and you can integrate Agentao into any UI framework.
Definition
@runtime_checkable
class Transport(Protocol):
    # One-way events (fire-and-forget)
    def emit(self, event: AgentEvent) -> None: ...

    # Blocking request-response
    def confirm_tool(self, tool_name: str, description: str, args: dict) -> bool: ...
    def ask_user(self, question: str) -> str: ...
    def on_max_iterations(self, count: int, messages: list) -> dict: ...

    # Optional fan-out (declared on the base; not all implementations expose it)
    def subscribe(self, listener: Callable[[AgentEvent], None]) -> Callable[[], None]: ...

Key design:
- Transport is a Protocol (PEP 544): you do not inherit from any base class; implementing the four required methods is enough
- @runtime_checkable makes isinstance(x, Transport) available (but it doesn't verify method signatures; use a static type checker for that)
- The four required methods split 1 + 3: one one-way event push + three synchronous Q&A calls
- subscribe(listener) is optional: the Protocol declares it; NullTransport and SdkTransport provide it by composing the EventBroadcaster helper; bespoke implementations may omit it. Probe with getattr(transport, "subscribe", None).
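The isinstance caveat above can be seen with a small stand-in protocol. This is a minimal sketch using only the standard typing module; MiniTransport, Good, WrongSignature, and Missing are hypothetical names for illustration and are not part of Agentao.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class MiniTransport(Protocol):
    """Hypothetical two-method stand-in for a transport protocol."""

    def emit(self, event: object) -> None: ...
    def ask_user(self, question: str) -> str: ...


class Good:
    def emit(self, event: object) -> None:
        pass

    def ask_user(self, question: str) -> str:
        return "ok"


class WrongSignature:
    def emit(self) -> None:  # missing the event parameter
        pass

    def ask_user(self, question: str) -> str:
        return "ok"


class Missing:
    def emit(self, event: object) -> None:
        pass
    # no ask_user at all


# isinstance only checks that the named members exist
good_ok = isinstance(Good(), MiniTransport)
wrong_ok = isinstance(WrongSignature(), MiniTransport)
missing_ok = isinstance(Missing(), MiniTransport)
print(good_ok, wrong_ok, missing_ok)
```

WrongSignature passes the runtime check even though calling emit(event) on it would fail, which is why a static type checker is the better tool for verifying signatures.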
Method 1: emit(event) — push events
def emit(self, event: AgentEvent) -> None:
    """Receive runtime events. Must not raise; errors must be swallowed."""

Contract:
- The agent calls emit at key points (turn start, tool start/output/complete, LLM streamed text, thinking, errors…)
- Implementations must not raise: exceptions will be caught upstream, but may leave state inconsistent
- Implementations should return fast — this is synchronous; slow handlers block the agent loop
Typical implementation:
def emit(self, event: AgentEvent) -> None:
    try:
        self._queue.put_nowait(event)  # hand off to another thread
    except Exception:
        pass  # never raise

Full event catalog: 4.2 AgentEvent Reference.
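The hand-off pattern in the typical implementation can be fleshed out with a worker thread that drains the queue. This is a minimal sketch, assuming a bounded queue and a host-supplied handler; QueueingTransport, handler, dropped, and close are hypothetical names, and plain dicts stand in for AgentEvent objects.

```python
import queue
import threading


class QueueingTransport:
    """Hypothetical sketch: emit() only enqueues; a worker thread does the slow work."""

    def __init__(self, handler, maxsize=1000):
        self._queue = queue.Queue(maxsize=maxsize)
        self._handler = handler
        self.dropped = 0  # events discarded because the queue was full
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def emit(self, event):
        try:
            self._queue.put_nowait(event)  # returns immediately
        except queue.Full:
            self.dropped += 1  # never raise, never block the agent loop

    def _drain(self):
        while True:
            event = self._queue.get()
            if event is None:  # sentinel pushed by close()
                break
            try:
                self._handler(event)
            except Exception:
                pass  # a failing handler must not kill the worker

    def close(self):
        self._queue.put(None)
        self._worker.join()


received = []
transport = QueueingTransport(received.append)
for i in range(3):
    transport.emit({"type": "llm_text", "chunk": str(i)})
transport.close()
print([e["chunk"] for e in received], transport.dropped)
```

Dropping events when the queue is full is one possible policy; a host that cannot lose events might prefer an unbounded queue and accept the memory cost.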
Method 2: confirm_tool(name, desc, args) — tool approval
def confirm_tool(self, tool_name: str, description: str, args: dict) -> bool:
    """Ask whether the tool may execute.

    True → allow
    False → cancel (agent receives "Tool execution cancelled by user" and keeps reasoning)
    """

When called:
- Before invoking any tool with requires_confirmation=True
- Default triggers: write_file, run_shell_command, web_fetch, web_search
Blocking semantics: this is a synchronous call — until you return True/False, the agent's execution thread is stuck here. For async hosts, block internally (see 4.5).
When you return False:
- The tool does not execute
- The agent sees a "cancelled by user" synthetic result
- The LLM keeps reasoning on that (usually pivots or stops and reports)
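The approval contract above can be sketched as a small policy function. This is a minimal sketch, assuming a host that wants a static allow/deny list with a human prompt as the fallback; make_confirm_tool, auto_allow, auto_deny, and ask_human are hypothetical names, not part of Agentao.

```python
from typing import Callable, Iterable


def make_confirm_tool(
    auto_allow: Iterable[str],
    auto_deny: Iterable[str],
    ask_human: Callable[[str, str, dict], bool],
) -> Callable[[str, str, dict], bool]:
    """Build a confirm_tool-shaped callable from a simple policy (hypothetical helper)."""
    allow = frozenset(auto_allow)
    deny = frozenset(auto_deny)

    def confirm_tool(tool_name: str, description: str, args: dict) -> bool:
        if tool_name in deny:
            return False  # the agent would see a "cancelled by user" result
        if tool_name in allow:
            return True
        try:
            return bool(ask_human(tool_name, description, args))
        except Exception:
            return False  # design choice in this sketch: fail closed

    return confirm_tool


prompts = []


def fake_prompt(tool_name, description, args):
    # Stand-in for a real UI dialog; approves only write_file.
    prompts.append(tool_name)
    return tool_name == "write_file"


confirm = make_confirm_tool(
    auto_allow={"web_search"},
    auto_deny={"run_shell_command"},
    ask_human=fake_prompt,
)
decisions = {
    name: confirm(name, "", {})
    for name in ("web_search", "run_shell_command", "write_file", "web_fetch")
}
print(decisions)
```

Only the tools not covered by the static lists reach the human prompt, which keeps the blocking wait off the common path.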
Method 3: ask_user(question) — ask the user
def ask_user(self, question: str) -> str:
    """Agent asks the user an open question and gets a text answer."""

When called:
- The agent invokes the built-in ask_user tool
- Typical use cases: missing info, decision point, ambiguity clarification
Fallback: NullTransport returns the fixed string "[ask_user: not available in non-interactive mode]"; the agent handles it gracefully.
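For headless runs that still need a few answers, one option is to pre-script them and fall back to the same fixed string once they run out. This is a minimal sketch under that assumption; ScriptedAnswers and NOT_AVAILABLE are hypothetical names, and only the fallback text itself comes from the NullTransport behaviour described above.

```python
from collections import deque

# Fallback text quoted from the NullTransport description above.
NOT_AVAILABLE = "[ask_user: not available in non-interactive mode]"


class ScriptedAnswers:
    """Hypothetical ask_user provider that replays canned answers in order."""

    def __init__(self, answers):
        self._answers = deque(answers)
        self.questions = []  # record what the agent asked, for later inspection

    def ask_user(self, question: str) -> str:
        self.questions.append(question)
        if self._answers:
            return str(self._answers.popleft())
        return NOT_AVAILABLE  # always return a str, never None


scripted = ScriptedAnswers(["staging"])
first = scripted.ask_user("Which environment?")
second = scripted.ask_user("Which region?")
print(first, second)
```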
Method 4: on_max_iterations(count, messages) — iteration-cap fallback
def on_max_iterations(self, count: int, messages: list) -> dict:
    """Called when the agent reaches max_iterations (default 100).

    Return a dict with key "action":
      "continue": give it another N iterations
      "stop": stop, return current result
      "new_instruction": inject a new user message; requires "message" key
    """

Canonical use:
def on_max_iterations(self, count, messages):
    # Auto-extend once
    if not hasattr(self, "_continued"):
        self._continued = True
        return {"action": "continue"}
    # Already extended, still stuck → force summarization
    return {
        "action": "new_instruction",
        "message": "Based on what you have, give the final answer now; do not call any more tools.",
    }

Deep-dive: 4.6 Max-iterations strategies.
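The return contract described in the docstring is easy to get subtly wrong, so a host-side check in tests can help. This is a minimal sketch; validate_max_iterations_reply and check are hypothetical helpers, and the non-empty "message" requirement is this sketch's own strictness rather than documented runtime behaviour.

```python
VALID_ACTIONS = {"continue", "stop", "new_instruction"}


def validate_max_iterations_reply(reply):
    """Hypothetical check of an on_max_iterations return value."""
    if not isinstance(reply, dict):
        raise TypeError("on_max_iterations must return a dict")
    action = reply.get("action")
    if action not in VALID_ACTIONS:
        raise ValueError(f"unknown action: {action!r}")
    if action == "new_instruction":
        message = reply.get("message")
        # Assumption of this sketch: the message must be a non-empty string.
        if not isinstance(message, str) or not message.strip():
            raise ValueError('"new_instruction" requires a non-empty "message"')
    return reply


def check(reply):
    """Return True when the reply passes validation, False otherwise."""
    try:
        validate_max_iterations_reply(reply)
        return True
    except (TypeError, ValueError):
        return False


results = [
    check({"action": "continue"}),
    check({"action": "new_instruction"}),  # missing "message"
    check({"action": "new_instruction", "message": "Summarize now."}),
    check({"action": "retry"}),  # not a known action
    check(None),  # not a dict
]
print(results)
```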
Optional method: subscribe(listener) — fan-out without re-emit
def subscribe(
    self, listener: Callable[[AgentEvent], None]
) -> Callable[[], None]:
    """Register an extra listener that receives every emitted event.

    Returns an idempotent unsubscribe function."""

Why it exists: replay recorders, audit pipelines, and metrics collectors need to observe every event without becoming the primary transport. subscribe() lets multiple consumers attach to a single transport instance without manual fan-out.
When called:
- The host attaches a listener once (e.g. at agent construction); the listener stays registered until the returned callable is invoked
- Each notification iterates over a snapshot of the subscriber list taken at notify time, so subscribing or unsubscribing mid-emit is safe
- Errors raised by a listener are swallowed — they must not poison the runtime emit path
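The three guarantees listed above (idempotent unsubscribe, snapshot iteration, swallowed listener errors) can be illustrated with a tiny stand-in broadcaster. This is a hypothetical sketch of the behaviour described, not the actual agentao.transport.EventBroadcaster implementation; MiniBroadcaster and the listener names are made up for illustration.

```python
import threading


class MiniBroadcaster:
    """Hypothetical stand-in showing the documented subscribe/notify behaviour."""

    def __init__(self):
        self._listeners = {}  # token -> listener, insertion-ordered
        self._lock = threading.Lock()

    def subscribe(self, listener):
        token = object()  # unique per subscription
        with self._lock:
            self._listeners[token] = listener

        def unsubscribe():
            with self._lock:
                self._listeners.pop(token, None)  # idempotent: second call is a no-op

        return unsubscribe

    def notify(self, event):
        with self._lock:
            snapshot = list(self._listeners.values())  # copy, then release the lock
        for listener in snapshot:
            try:
                listener(event)
            except Exception:
                pass  # a broken listener must not affect the others


bus = MiniBroadcaster()
seen = []


def broken(event):
    raise RuntimeError("listener bug")


def once(event):
    seen.append(("once", event))
    stop_once()  # unsubscribes itself while notify() is still iterating


bus.subscribe(broken)
stop_once = bus.subscribe(once)
bus.subscribe(lambda event: seen.append(("always", event)))

bus.notify("a")
bus.notify("b")
stop_once()  # calling again is harmless
print(seen)
```

Keying subscriptions by a unique token rather than by the listener itself keeps each unsubscribe call tied to its own registration, even if the same callable is subscribed twice.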
Used internally by:
- The replay recorder (subscribes to TURN_BEGIN / TURN_END / tool / sub-agent events instead of being reached through agent state)
- The host event stream backing agent.events() (see 4.7)
Probe before calling — bespoke implementations from path C below may omit this method entirely:
sub = getattr(transport, "subscribe", None)
if sub is not None:
    unsubscribe = sub(my_listener)
    try:
        ...
    finally:
        unsubscribe()

NullTransport and SdkTransport get subscribe() by composing agentao.transport.EventBroadcaster. From-scratch transports (e.g. ACP) can opt in the same way:
from agentao.transport import EventBroadcaster

class MyTransport:
    def __init__(self):
        self._broadcast = EventBroadcaster()

    def emit(self, event):
        # ... your real send path ...
        self._broadcast.notify(event)

    def subscribe(self, listener):
        return self._broadcast.subscribe(listener)

Three implementation paths
| Path | When | Complexity |
|---|---|---|
| SdkTransport + callbacks | 90% of embeddings | Lowest |
| Subclass NullTransport, override some | You care about only a few events | Low |
| Implement Transport from scratch | Fully custom (e.g. ACP, message queue bridge) | Medium |
Path A · SdkTransport
See 4.3:
from agentao.transport import SdkTransport

transport = SdkTransport(
    on_event=handle,
    confirm_tool=approve,
    ask_user=prompt,
    on_max_iterations=bail_out,
)

Path B · Subclass NullTransport
When you only care about some events and want explicit control:
from agentao.transport import NullTransport, EventType

class MyTransport(NullTransport):
    def __init__(self, on_token):
        # Let NullTransport set up its own state first
        # (it composes EventBroadcaster to provide subscribe()).
        super().__init__()
        self.on_token = on_token

    def emit(self, event):
        if event.type == EventType.LLM_TEXT:
            self.on_token(event.data["chunk"])
        # all other events are ignored here

    def confirm_tool(self, name, desc, args):
        # Allow only read-like tools
        return name.startswith("read_") or name == "glob"

Path C · From scratch
The canonical real-world case: ACP server. It does not inherit from anything — each emit becomes a session/update notification, each confirm_tool becomes a session/request_permission request sent to the ACP client.
class MyCustomTransport:
    """Bridge agent events into your own message protocol."""

    def __init__(self, send_to_client):
        self.send = send_to_client

    def emit(self, event):
        self.send({"type": "agent_event",
                   "event": event.type.value,
                   "data": event.data})

    def confirm_tool(self, name, desc, args):
        return self.send({"type": "confirm", ...}, wait=True)

    def ask_user(self, q):
        return self.send({"type": "ask", "question": q}, wait=True)

    def on_max_iterations(self, count, msgs):
        return {"action": "stop"}

Threading / async notes
- All 4 methods are called synchronously from the agent's chat() thread
- If your host is asyncio:
  - emit can use asyncio.run_coroutine_threadsafe(...) to hop back to the main loop
  - confirm_tool / ask_user need a cross-thread blocking wait (see 4.5)
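The two asyncio notes above can be combined into one bridge class. This is a minimal sketch, assuming the agent runs in a worker thread while the host UI lives on an asyncio loop; AsyncBridgeTransport, on_event, approve, and the 30-second timeout are hypothetical choices, and a plain function stands in for the agent's chat() thread.

```python
import asyncio


class AsyncBridgeTransport:
    """Hypothetical bridge: called from the agent thread, does its work on the loop."""

    def __init__(self, loop, on_event, approve, timeout=30.0):
        self._loop = loop
        self._on_event = on_event  # coroutine function taking an event
        self._approve = approve    # coroutine function returning a bool
        self._timeout = timeout

    def emit(self, event):
        try:
            # Schedule on the loop and return immediately (fire-and-forget).
            asyncio.run_coroutine_threadsafe(self._on_event(event), self._loop)
        except Exception:
            pass  # e.g. loop already closed; emit must not raise

    def confirm_tool(self, tool_name, description, args):
        # Must be called from a non-loop thread, otherwise result() would deadlock.
        future = asyncio.run_coroutine_threadsafe(
            self._approve(tool_name, description, args), self._loop
        )
        try:
            return bool(future.result(timeout=self._timeout))  # block this thread only
        except Exception:
            future.cancel()
            return False  # design choice in this sketch: deny on timeout or error


async def main():
    loop = asyncio.get_running_loop()
    seen = []

    async def on_event(event):
        seen.append(event)

    async def approve(tool_name, description, args):
        await asyncio.sleep(0)  # stand-in for awaiting a UI dialog
        return tool_name != "run_shell_command"

    transport = AsyncBridgeTransport(loop, on_event, approve)

    def agent_thread():
        # Stand-in for the agent loop running outside the event loop thread.
        transport.emit({"type": "tool_start"})
        return (
            transport.confirm_tool("write_file", "", {}),
            transport.confirm_tool("run_shell_command", "", {}),
        )

    decisions = await loop.run_in_executor(None, agent_thread)
    return seen, decisions


seen, decisions = asyncio.run(main())
print(seen, decisions)
```

The same shape would apply to ask_user: schedule a coroutine on the loop and block the agent thread on the returned future.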
Testing your Transport
from agentao.transport import AgentEvent, EventType

def test_my_transport():
    t = MyTransport()
    # 1. emit must not raise
    t.emit(AgentEvent(EventType.LLM_TEXT, {"chunk": "hi"}))
    # 2. confirm_tool must return bool
    assert isinstance(t.confirm_tool("x", "", {}), bool)
    # 3. ask_user must return str
    assert isinstance(t.ask_user("q?"), str)
    # 4. on_max_iterations must return dict with "action"
    r = t.on_max_iterations(100, [])
    assert r["action"] in {"continue", "stop", "new_instruction"}

TL;DR
- Transport = 4 methods: emit (fire-and-forget), confirm_tool (blocking bool), ask_user (blocking str), on_max_iterations (blocking dict).
- emit exceptions are swallowed; the other three's exceptions propagate.
- NullTransport = silent + auto-approve; fine for tests and headless batch jobs.
- Implement all 4 if you build a custom transport; even a no-op stub keeps the agent loop honest.
→ Next: 4.2 AgentEvent Reference