Skip to content

4.3 SdkTransport 快速桥接

本节你会学到

  • SdkTransport 的 4 个可选回调及缺省 fallback
  • 惯用模式:dispatcher、fan-out、用类来组织共享状态
  • 常见坑:挂死、异常、与 legacy 回调混用

SdkTransport 是 Agentao 官方提供的通用 Transport 实现——四个回调,覆盖 90% 嵌入场景。

构造器

python
class SdkTransport:
    def __init__(
        self,
        on_event:          Optional[Callable[[AgentEvent], None]]       = None,
        confirm_tool:      Optional[Callable[[str, str, dict], bool]]    = None,
        ask_user:          Optional[Callable[[str], str]]                = None,
        on_max_iterations: Optional[Callable[[int, list], dict]]         = None,
    ) -> None: ...

所有参数都可选。未提供的方法回退到 NullTransport 默认行为:

未设回退行为
on_event丢弃事件(静默)
confirm_tool自动批准所有工具(返回 True
ask_user返回固定字符串 "[ask_user: not available in non-interactive mode]"
on_max_iterations{"action": "stop"}

最简用法

python
from agentao.transport import SdkTransport

transport = SdkTransport(
    on_event=lambda ev: print(ev.type.value, ev.data),
)

这已经够用来打印整个事件流——你的回调每收到一个事件就打印一行。

四个回调的典型实现

1) on_event — 事件分发器

python
from agentao.transport import EventType

def on_event(event):
    match event.type:
        case EventType.LLM_TEXT:
            render_chunk(event.data["chunk"])
        case EventType.TOOL_START:
            open_tool_card(event.data)
        case EventType.TOOL_OUTPUT:
            append_tool_output(event.data)
        case EventType.TOOL_COMPLETE:
            close_tool_card(event.data)
        case EventType.ERROR:
            show_error(event.data)
        # 其他事件忽略

Python 3.10+ 可用 match;老版本用 if/elif

2) confirm_tool — 确认弹窗

python
def confirm_tool(tool_name: str, description: str, args: dict) -> bool:
    # 自动批准只读/安全工具
    if tool_name in {"read_file", "glob", "grep"}:
        return True

    # 其他全部弹窗
    return user_confirm_dialog(
        title=f"Allow {tool_name}?",
        details=f"{description}\n\n{json.dumps(args, indent=2)}",
    )

⚠️ 这是同步阻塞调用。如果你的 UI 是异步(如 Flask 异步/Electron),需要把弹窗调用转到 UI 线程再同步等待结果——详见 4.5

3) ask_user — 文本反问

python
def ask_user(question: str) -> str:
    return user_text_input_dialog(question) or ""

同样是阻塞调用。用户如果关窗不答,返回空字符串让 Agent 优雅处理。

4) on_max_iterations — 兜底

python
def on_max_iterations(count: int, messages: list) -> dict:
    # 默认做法:问用户要不要继续
    answer = user_confirm_dialog(
        f"Agent reached {count} iterations. Continue?"
    )
    if answer:
        return {"action": "continue"}
    return {"action": "stop"}

把所有回调装在一个类里

当回调之间共享状态(比如同一个 UI 对象、同一个会话 ID)时,用类封装更干净:

python
class ChatSession:
    def __init__(self, ui, session_id: str):
        self.ui = ui
        self.session_id = session_id
        self._events = []

    def on_event(self, event):
        self._events.append(event)
        self.ui.push_event(self.session_id, event)

    def confirm_tool(self, name, desc, args):
        return self.ui.ask_approval(self.session_id, name, desc, args)

    def ask_user(self, q):
        return self.ui.ask_text(self.session_id, q)

    def on_max_iterations(self, count, msgs):
        return self.ui.ask_continue(self.session_id, count)

# 用法
session = ChatSession(ui, "sess-123")
transport = SdkTransport(
    on_event=session.on_event,
    confirm_tool=session.confirm_tool,
    ask_user=session.ask_user,
    on_max_iterations=session.on_max_iterations,
)
agent = Agentao(transport=transport, working_directory=Path("/tmp/sess-123"))

多个订阅者:扇出事件流

一个 on_event 可以扇出到多个消费者:

python
class EventFanout:
    def __init__(self):
        self.subscribers = []

    def subscribe(self, callback):
        self.subscribers.append(callback)

    def __call__(self, event):
        for cb in self.subscribers:
            try:
                cb(event)
            except Exception as e:
                logger.warning(f"Subscriber failed: {e}")

fanout = EventFanout()
fanout.subscribe(write_to_database)
fanout.subscribe(push_to_websocket)
fanout.subscribe(update_ui_state)

transport = SdkTransport(on_event=fanout)

向后兼容的 8 回调 API

Agentao 0.2.10 之前用 8 个独立回调(confirmation_callback, step_callback, thinking_callback, …)。这种写法仍被接受,但内部会通过 build_compat_transport() 自动转成一个 SdkTransport

python
# 老代码(仍能跑)
agent = Agentao(
    confirmation_callback=lambda n, d, a: True,
    llm_text_callback=lambda chunk: print(chunk, end=""),
    step_callback=lambda name, args: print(f"[{name}]"),
)

# 新代码(推荐)
def on_event(ev):
    if ev.type == EventType.LLM_TEXT:
        print(ev.data["chunk"], end="")
    elif ev.type == EventType.TOOL_START:
        print(f"[{ev.data['tool']}]")

agent = Agentao(transport=SdkTransport(
    on_event=on_event,
    confirm_tool=lambda n, d, a: True,
))

参见 2.2 构造器参数表 · 已废弃的 8 个回调

⚠️ 常见陷阱

上线前先确认这几条

  • on_event 里抛异常 —— emit 会替你吞掉,但下游副作用可能只做了一半
  • confirm_tool 里长时间卡死 —— Agent 循环跟着你一起挂
  • 同时传 transport= 和 legacy 回调 —— legacy 那些会被静默忽略

下面每一条都附完整修法。

❌ 在 on_event 里抛异常

python
def on_event(ev):
    if ev.type == EventType.LLM_TEXT:
        ui.append(ev.data["chunk"])   # 如果 ui 崩了?

SdkTransport.emit 会吞掉所有异常保护 Agent,但下游副作用可能只做了一半。写 on_event 时把每个分支的失败当作独立风险管理:

python
def on_event(ev):
    try:
        dispatch(ev)
    except Exception as e:
        logger.warning("event dispatch failed", exc_info=e)

❌ 在 confirm_tool 里长时间卡死

如果你的确认弹窗 bug 导致永不返回,Agent 会永远挂在那。务必给同步等待加超时(见 4.5)。

❌ 混用 transport 和 legacy callbacks

python
# 都传了——legacy 会被忽略
agent = Agentao(
    transport=my_transport,
    confirmation_callback=my_callback,  # 不会被调用!
)

二选一。transport 优先级最高。

最小 "什么都处理" 模板

python
from agentao import Agentao
from agentao.transport import SdkTransport, EventType
from pathlib import Path

class AgentBridge:
    def on_event(self, ev):
        handlers = {
            EventType.TURN_START: self._turn,
            EventType.LLM_TEXT: self._text,
            EventType.THINKING: self._thinking,
            EventType.TOOL_START: self._tool_start,
            EventType.TOOL_OUTPUT: self._tool_out,
            EventType.TOOL_COMPLETE: self._tool_done,
            EventType.ERROR: self._error,
            EventType.AGENT_START: self._sub_start,
            EventType.AGENT_END: self._sub_end,
        }
        h = handlers.get(ev.type)
        if h: h(ev.data)

    def _turn(self, d): pass
    def _text(self, d): print(d["chunk"], end="", flush=True)
    def _thinking(self, d): print(f"\n[💭 {d['text']}]", flush=True)
    def _tool_start(self, d): print(f"\n[🔧 {d['tool']}]")
    def _tool_out(self, d): pass
    def _tool_done(self, d): print(f" ✓ ({d['duration_ms']}ms)")
    def _error(self, d): print(f"\n[❌ {d['message']}]")
    def _sub_start(self, d): print(f"\n[🧭 sub: {d['agent']}]")
    def _sub_end(self, d): print(f" ✓ {d['turns']} turns")

    def confirm_tool(self, name, desc, args):
        return input(f"Allow {name}? [y/N] ").lower() == "y"

    def ask_user(self, q):
        return input(f"Agent asks: {q}\n> ")

    def on_max_iterations(self, count, msgs):
        return {"action": "stop"}


bridge = AgentBridge()
transport = SdkTransport(
    on_event=bridge.on_event,
    confirm_tool=bridge.confirm_tool,
    ask_user=bridge.ask_user,
    on_max_iterations=bridge.on_max_iterations,
)
agent = Agentao(transport=transport, working_directory=Path.cwd())
print(agent.chat("hello"))
agent.close()

TL;DR

  • 4 个回调全部可选;缺哪个就 fallback 到 NullTransport 的对应行为(静默 / 自动批准 / 非交互字符串 / {"action": "stop"})。
  • 当多个回调共享 UI 状态或会话 id 时,用类把它们组织起来 —— 闭包 + per-session self 是最干净的写法。
  • 多个消费者同时关心事件(DB log + WebSocket + UI)时,加一个简单的 dispatcher 做 fan-out。
  • 永远不要在 on_event 里 raise —— 每个分支都加 try/except,否则 SdkTransport.emit 会替你吞下异常但下游副作用可能只做了一半。

→ 下一节:4.4 构建流式 UI