Skip to content

4.1 Transport Protocol

本节你会学到

  • 构成 Transport 接口的全部 4 个方法
  • 每个方法的契约:哪个阻塞、哪个 fire-and-forget
  • NullTransport 的行为以及什么时候它就是合适的默认

Transport 是 Agent 运行时与宿主 UI/业务逻辑之间的唯一接口。理解它的四个方法,你就能在任何 UI 框架下集成 Agentao。

协议定义

python
@runtime_checkable
class Transport(Protocol):
    # 单向事件(fire-and-forget)
    def emit(self, event: AgentEvent) -> None: ...

    # 阻塞式请求-响应
    def confirm_tool(self, tool_name: str, description: str, args: dict) -> bool: ...
    def ask_user(self, question: str) -> str: ...
    def on_max_iterations(self, count: int, messages: list) -> dict: ...

    # 可选 fan-out(基类声明;并非所有实现都暴露)
    def subscribe(self, listener: Callable[[AgentEvent], None]) -> Callable[[], None]: ...

关键设计

  • Transport 是一个 Protocol(PEP 544)——你不必继承任何基类,实现这 4 个必填方法就算 Transport
  • @runtime_checkableisinstance(x, Transport) 可用(但不保证方法类型正确;类型检查应依赖静态工具)
  • 四个必填方法一分为二:1 个单向推送事件 + 3 个同步问答
  • subscribe(listener)可选的——Protocol 里有声明;NullTransportSdkTransport 通过组合 EventBroadcaster helper 提供;自定义实现可以不实现。访问前用 getattr(transport, "subscribe", None) 探测。

方法一:emit(event) — 推事件

python
def emit(self, event: AgentEvent) -> None:
    """接收运行时事件。不得抛异常;错误必须吞掉。"""

契约

  • Agent 在关键节点调用 emit(一轮开始、工具开始/输出/结束、LLM 流式文本、思考、错误…)
  • 实现禁止抛异常——抛了会被上游 try/except 吞掉,但可能破坏状态一致性
  • 实现应快速返回——这是同步调用,慢了会拖慢整个 Agent

典型实现

python
def emit(self, event: AgentEvent) -> None:
    try:
        self._queue.put_nowait(event)   # 扔到队列让别的线程处理
    except Exception:
        pass  # 永不抛

事件类型全表见 4.2 AgentEvent 事件清单

方法二:confirm_tool(name, desc, args) — 工具确认

python
def confirm_tool(self, tool_name: str, description: str, args: dict) -> bool:
    """询问是否允许这个工具执行。
    True  → 允许
    False → 取消(Agent 收到 "Tool execution cancelled by user" 字符串,继续推理)
    """

何时被调用

  • Agent 准备调用某个 requires_confirmation=True 的工具时
  • 默认触发者:write_filerun_shell_commandweb_fetchweb_search

阻塞语义:这是同步调用——在你返回 True/False 前,Agent 的执行线程会停在这里。如果你的宿主是异步 UI,需要在 Transport 实现内部做阻塞等待(见 4.5)。

返回 False 的后果

  • 工具不执行
  • Agent 收到一个"用户取消"的假结果
  • LLM 会基于这个结果继续推理(通常会换个思路或停下来汇报)

方法三:ask_user(question) — 向用户反问

python
def ask_user(self, question: str) -> str:
    """让 Agent 向用户反问一个开放问题,返回用户的回答。"""

何时被调用

  • Agent 主动调用内置工具 ask_user
  • 典型场景:信息不足、多选一决策、要求澄清歧义需求

默认兜底NullTransport 返回固定字符串 "[ask_user: not available in non-interactive mode]",Agent 收到后会据此决定继续/放弃。

方法四:on_max_iterations(count, messages) — 迭代上限兜底

python
def on_max_iterations(self, count: int, messages: list) -> dict:
    """Agent 达到 max_iterations(默认 100)时调用。
    返回 dict,key "action" 必填:
        "continue"        — 再给 N 轮继续跑
        "stop"            — 终止,返回当前结果
        "new_instruction" — 注入一条新 user 消息,需带 "message" 键
    """

经典用法

python
def on_max_iterations(self, count, messages):
    # 自动续一次
    if not hasattr(self, "_continued"):
        self._continued = True
        return {"action": "continue"}
    # 续过了,还卡着 → 让 LLM 总结并停下
    return {
        "action": "new_instruction",
        "message": "请基于目前的信息给出最终答复,不要再调用工具。",
    }

详细策略见 4.6 最大迭代数兜底

可选方法:subscribe(listener) —— 不重发地 fan-out

python
def subscribe(
    self, listener: Callable[[AgentEvent], None]
) -> Callable[[], None]:
    """注册一个额外 listener,每个被 emit 的事件都会回调它。
    返回一个幂等的 unsubscribe 函数。"""

为什么需要它 —— replay 录制器、审计流水线、指标采集等都需要观察所有事件,但不应取代主 Transport。subscribe() 让多个消费者挂到同一个 transport 实例上,省去手动 fan-out。

何时调用

  • 宿主在构造期挂一个 listener,直到调用返回的 unsubscribe 才解除注册
  • 通知时以快照方式遍历 listeners,所以 emit 过程中订阅/取消订阅是安全的
  • listener 抛出的异常会被吞掉——绝不能污染运行时的 emit 路径

框架内置使用方

  • replay 录制器(订阅 TURN_BEGIN / TURN_END / 工具 / 子 agent 事件,替代过去从 agent 状态直接调 replay adapter 的路径)
  • agent.events() 背后的宿主事件流(见 4.7

调用前先探测 —— 路径 C 的自定义实现可能完全不实现该方法:

python
sub = getattr(transport, "subscribe", None)
if sub is not None:
    unsubscribe = sub(my_listener)
    try:
        ...
    finally:
        unsubscribe()

NullTransportSdkTransport 通过组合 agentao.transport.EventBroadcaster 拿到 subscribe()。从零实现的 transport(如 ACP)想要 fan-out,可以照搬同样的写法:

python
from agentao.transport import EventBroadcaster

class MyTransport:
    def __init__(self):
        self._broadcast = EventBroadcaster()

    def emit(self, event):
        # ... 你真正的发送路径 ...
        self._broadcast.notify(event)

    def subscribe(self, listener):
        return self._broadcast.subscribe(listener)

三种实现路径

路径适合场景复杂度
SdkTransport + 回调90% 嵌入场景最低
继承 NullTransport 覆盖部分方法只关心某几个事件
从零实现 Transport Protocol完全自定义(如 ACP、消息队列桥接)

路径 A · SdkTransport

参见 4.3

python
from agentao.transport import SdkTransport

transport = SdkTransport(
    on_event=handle,
    confirm_tool=approve,
    ask_user=prompt,
    on_max_iterations=bail_out,
)

路径 B · 继承 NullTransport

当你只关心部分事件且想显式控制每个方法时:

python
from agentao.transport import NullTransport, EventType

class MyTransport(NullTransport):
    def __init__(self, on_token):
        self.on_token = on_token

    def emit(self, event):
        if event.type == EventType.LLM_TEXT:
            self.on_token(event.data["chunk"])
        # 其他事件继续走 NullTransport 默认(即 pass)

    def confirm_tool(self, name, desc, args):
        # 只允许读类工具
        return name.startswith("read_") or name == "glob"

路径 C · 从零实现

最典型的真实例子:ACP 服务端。它不继承任何基类,而是把每次 emit 转成 session/update 通知、把 confirm_tool 转成 session/request_permission 请求发给 ACP Client。

python
class MyCustomTransport:
    """把 Agent 事件桥到你自己的消息协议。"""
    def __init__(self, send_to_client):
        self.send = send_to_client

    def emit(self, event):
        self.send({"type": "agent_event",
                   "event": event.type.value,
                   "data": event.data})

    def confirm_tool(self, name, desc, args):
        # 发请求给客户端,同步等响应
        return self.send({"type": "confirm", ...}, wait=True)

    def ask_user(self, q):
        return self.send({"type": "ask", "question": q}, wait=True)

    def on_max_iterations(self, count, msgs):
        return {"action": "stop"}

线程与异步注意事项

  • 同步 Agent 线程里调用 Transport:所有 4 个方法在 Agent 的 chat() 循环里被同步调用
  • 如果你的宿主是 asyncio:
    • emit 可以 asyncio.run_coroutine_threadsafe(...) 回主循环
    • confirm_tool / ask_user 需要跨线程同步等待(见 4.5 实现模式)

测试你的 Transport

python
from agentao.transport import AgentEvent, EventType

def test_my_transport():
    t = MyTransport()
    # 1. emit 不得抛
    t.emit(AgentEvent(EventType.LLM_TEXT, {"chunk": "hi"}))
    # 2. confirm_tool 必须返回 bool
    assert isinstance(t.confirm_tool("x", "", {}), bool)
    # 3. ask_user 必须返回 str
    assert isinstance(t.ask_user("q?"), str)
    # 4. on_max_iterations 必须返回带 action 的 dict
    r = t.on_max_iterations(100, [])
    assert r["action"] in {"continue", "stop", "new_instruction"}

TL;DR

  • Transport = 4 个方法emit(fire-and-forget)·confirm_tool(阻塞 bool)·ask_user(阻塞 str)·on_max_iterations(阻塞 dict)。
  • emit 抛出的异常会被吞;其他三个的异常会向上传播。
  • NullTransport = 静默 + 自动批准——适合测试和无人值守批处理。
  • 自己实现 Transport 时 4 个方法都要给(哪怕是 no-op stub),让 Agent 循环在所有路径上都安全。

→ 下一节:4.2 AgentEvent 事件清单