4.1 Transport Protocol
本节你会学到
- 构成 Transport 接口的全部 4 个方法
- 每个方法的契约:哪个阻塞、哪个 fire-and-forget
NullTransport的行为以及什么时候它就是合适的默认
Transport 是 Agent 运行时与宿主 UI/业务逻辑之间的唯一接口。理解它的四个方法,你就能在任何 UI 框架下集成 Agentao。
协议定义
@runtime_checkable
class Transport(Protocol):
# 单向事件(fire-and-forget)
def emit(self, event: AgentEvent) -> None: ...
# 阻塞式请求-响应
def confirm_tool(self, tool_name: str, description: str, args: dict) -> bool: ...
def ask_user(self, question: str) -> str: ...
def on_max_iterations(self, count: int, messages: list) -> dict: ...
# 可选 fan-out(基类声明;并非所有实现都暴露)
def subscribe(self, listener: Callable[[AgentEvent], None]) -> Callable[[], None]: ...关键设计:
Transport是一个Protocol(PEP 544)——你不必继承任何基类,实现这 4 个必填方法就算 Transport@runtime_checkable让isinstance(x, Transport)可用(但不保证方法类型正确;类型检查应依赖静态工具)- 四个必填方法一分为二:1 个单向推送事件 + 3 个同步问答
subscribe(listener)是可选的——Protocol 里有声明;NullTransport和SdkTransport通过组合EventBroadcasterhelper 提供;自定义实现可以不实现。访问前用getattr(transport, "subscribe", None)探测。
方法一:emit(event) — 推事件
def emit(self, event: AgentEvent) -> None:
"""接收运行时事件。不得抛异常;错误必须吞掉。"""契约:
- Agent 在关键节点调用
emit(一轮开始、工具开始/输出/结束、LLM 流式文本、思考、错误…) - 实现禁止抛异常——抛了会被上游 try/except 吞掉,但可能破坏状态一致性
- 实现应快速返回——这是同步调用,慢了会拖慢整个 Agent
典型实现:
def emit(self, event: AgentEvent) -> None:
try:
self._queue.put_nowait(event) # 扔到队列让别的线程处理
except Exception:
pass # 永不抛事件类型全表见 4.2 AgentEvent 事件清单。
方法二:confirm_tool(name, desc, args) — 工具确认
def confirm_tool(self, tool_name: str, description: str, args: dict) -> bool:
"""询问是否允许这个工具执行。
True → 允许
False → 取消(Agent 收到 "Tool execution cancelled by user" 字符串,继续推理)
"""何时被调用:
- Agent 准备调用某个
requires_confirmation=True的工具时 - 默认触发者:
write_file、run_shell_command、web_fetch、web_search
阻塞语义:这是同步调用——在你返回 True/False 前,Agent 的执行线程会停在这里。如果你的宿主是异步 UI,需要在 Transport 实现内部做阻塞等待(见 4.5)。
返回 False 的后果:
- 工具不执行
- Agent 收到一个"用户取消"的假结果
- LLM 会基于这个结果继续推理(通常会换个思路或停下来汇报)
方法三:ask_user(question) — 向用户反问
def ask_user(self, question: str) -> str:
"""让 Agent 向用户反问一个开放问题,返回用户的回答。"""何时被调用:
- Agent 主动调用内置工具
ask_user时 - 典型场景:信息不足、多选一决策、要求澄清歧义需求
默认兜底:NullTransport 返回固定字符串 "[ask_user: not available in non-interactive mode]",Agent 收到后会据此决定继续/放弃。
方法四:on_max_iterations(count, messages) — 迭代上限兜底
def on_max_iterations(self, count: int, messages: list) -> dict:
"""Agent 达到 max_iterations(默认 100)时调用。
返回 dict,key "action" 必填:
"continue" — 再给 N 轮继续跑
"stop" — 终止,返回当前结果
"new_instruction" — 注入一条新 user 消息,需带 "message" 键
"""经典用法:
def on_max_iterations(self, count, messages):
# 自动续一次
if not hasattr(self, "_continued"):
self._continued = True
return {"action": "continue"}
# 续过了,还卡着 → 让 LLM 总结并停下
return {
"action": "new_instruction",
"message": "请基于目前的信息给出最终答复,不要再调用工具。",
}详细策略见 4.6 最大迭代数兜底。
可选方法:subscribe(listener) —— 不重发地 fan-out
def subscribe(
self, listener: Callable[[AgentEvent], None]
) -> Callable[[], None]:
"""注册一个额外 listener,每个被 emit 的事件都会回调它。
返回一个幂等的 unsubscribe 函数。"""为什么需要它 —— replay 录制器、审计流水线、指标采集等都需要观察所有事件,但不应取代主 Transport。subscribe() 让多个消费者挂到同一个 transport 实例上,省去手动 fan-out。
何时调用:
- 宿主在构造期挂一个 listener,直到调用返回的 unsubscribe 才解除注册
- 通知时以快照方式遍历 listeners,所以 emit 过程中订阅/取消订阅是安全的
- listener 抛出的异常会被吞掉——绝不能污染运行时的 emit 路径
框架内置使用方:
- replay 录制器(订阅
TURN_BEGIN/TURN_END/ 工具 / 子 agent 事件,替代过去从 agent 状态直接调 replay adapter 的路径) agent.events()背后的宿主事件流(见 4.7)
调用前先探测 —— 路径 C 的自定义实现可能完全不实现该方法:
sub = getattr(transport, "subscribe", None)
if sub is not None:
unsubscribe = sub(my_listener)
try:
...
finally:
unsubscribe()NullTransport 和 SdkTransport 通过组合 agentao.transport.EventBroadcaster 拿到 subscribe()。从零实现的 transport(如 ACP)想要 fan-out,可以照搬同样的写法:
from agentao.transport import EventBroadcaster
class MyTransport:
def __init__(self):
self._broadcast = EventBroadcaster()
def emit(self, event):
# ... 你真正的发送路径 ...
self._broadcast.notify(event)
def subscribe(self, listener):
return self._broadcast.subscribe(listener)三种实现路径
| 路径 | 适合场景 | 复杂度 |
|---|---|---|
用 SdkTransport + 回调 | 90% 嵌入场景 | 最低 |
继承 NullTransport 覆盖部分方法 | 只关心某几个事件 | 低 |
从零实现 Transport Protocol | 完全自定义(如 ACP、消息队列桥接) | 中 |
路径 A · SdkTransport
参见 4.3:
from agentao.transport import SdkTransport
transport = SdkTransport(
on_event=handle,
confirm_tool=approve,
ask_user=prompt,
on_max_iterations=bail_out,
)路径 B · 继承 NullTransport
当你只关心部分事件且想显式控制每个方法时:
from agentao.transport import NullTransport, EventType
class MyTransport(NullTransport):
def __init__(self, on_token):
self.on_token = on_token
def emit(self, event):
if event.type == EventType.LLM_TEXT:
self.on_token(event.data["chunk"])
# 其他事件继续走 NullTransport 默认(即 pass)
def confirm_tool(self, name, desc, args):
# 只允许读类工具
return name.startswith("read_") or name == "glob"路径 C · 从零实现
最典型的真实例子:ACP 服务端。它不继承任何基类,而是把每次 emit 转成 session/update 通知、把 confirm_tool 转成 session/request_permission 请求发给 ACP Client。
class MyCustomTransport:
"""把 Agent 事件桥到你自己的消息协议。"""
def __init__(self, send_to_client):
self.send = send_to_client
def emit(self, event):
self.send({"type": "agent_event",
"event": event.type.value,
"data": event.data})
def confirm_tool(self, name, desc, args):
# 发请求给客户端,同步等响应
return self.send({"type": "confirm", ...}, wait=True)
def ask_user(self, q):
return self.send({"type": "ask", "question": q}, wait=True)
def on_max_iterations(self, count, msgs):
return {"action": "stop"}线程与异步注意事项
- 同步 Agent 线程里调用 Transport:所有 4 个方法在 Agent 的 chat() 循环里被同步调用
- 如果你的宿主是 asyncio:
emit可以asyncio.run_coroutine_threadsafe(...)回主循环confirm_tool/ask_user需要跨线程同步等待(见 4.5 实现模式)
测试你的 Transport
from agentao.transport import AgentEvent, EventType
def test_my_transport():
t = MyTransport()
# 1. emit 不得抛
t.emit(AgentEvent(EventType.LLM_TEXT, {"chunk": "hi"}))
# 2. confirm_tool 必须返回 bool
assert isinstance(t.confirm_tool("x", "", {}), bool)
# 3. ask_user 必须返回 str
assert isinstance(t.ask_user("q?"), str)
# 4. on_max_iterations 必须返回带 action 的 dict
r = t.on_max_iterations(100, [])
assert r["action"] in {"continue", "stop", "new_instruction"}TL;DR
- Transport = 4 个方法:
emit(fire-and-forget)·confirm_tool(阻塞 bool)·ask_user(阻塞 str)·on_max_iterations(阻塞 dict)。 emit抛出的异常会被吞;其他三个的异常会向上传播。NullTransport= 静默 + 自动批准——适合测试和无人值守批处理。- 自己实现 Transport 时 4 个方法都要给(哪怕是 no-op stub),让 Agent 循环在所有路径上都安全。
→ 下一节:4.2 AgentEvent 事件清单