❝在 Agent 应用中,不是所有决策都应该由 AI 自主完成。本文以实际业务场景为主线,深入讲解如何在 Agent 执行流程中实现 Human-in-the-Loop(HIL)——让 AI 在关键节点暂停执行、等待人工介入、再携带人工决策恢复运行。❞
Agent 的核心能力是自主规划和执行任务。但在实际业务中,很多场景需要人类参与决策:
这些场景的共同特点是:「执行流程需要在某个点暂停,等待人类输入,然后带着人类的决策继续执行」。
这就是 Human-in-the-Loop(HIL)要解决的问题。
在 Agent Graph 执行框架中,HIL 有两种本质不同的实现模式:
外部中断(External Interrupt) | 编程式中断(Programmatic Interrupt) | |
|---|---|---|
「触发方」 | 节点外部(用户/运营人员) | 节点内部(代码主动触发) |
「类比」 | 遥控器上的"暂停"按钮 | 程序弹出的"确认对话框" |
「中断时机」 | 两个节点之间(当前节点跑完再暂停) | 节点执行过程中 |
「是否需要人工输入」 | 不需要(暂停后直接恢复即可) | 需要(必须携带人工决策才能恢复) |
「恢复后行为」 | 从下一个节点继续 | 重新执行被中断的节点 |
「典型场景」 | 运营紧急叫停、用户取消 | 审批流程、工具调用确认 |
下面分别用具体业务场景来详解这两种模式。
一个 AI 客服对话系统,后端有一个三步流水线:
[prepare] → [call_model] → [finalize]
组装提示词 调用大模型 整理回复
运营人员在后台管理界面上有一个"暂停"按钮。当发现用户问的是敏感话题,运营需要暂停 AI 回复,由人工介入审核后再恢复。
外部中断的设计思路很简单——「用一个 channel 传递中断信号」:
// 创建带中断能力的 context
ctx, interrupt := graph.WithGraphInterrupt(context.Background())
WithGraphInterrupt 内部做了什么?
func WithGraphInterrupt(parent context.Context) (context.Context, func(...GraphInterruptOption)) {
st := &graphInterruptState{
done: make(chan struct{}), // 中断信号 channel
}
ctx = context.WithValue(parent, graphInterruptKey{}, st)
interrupt = func(opts ...GraphInterruptOption) {
st.once.Do(func() {
close(st.done) // 关闭 channel = 发送中断信号
})
}
return ctx, interrupt
}
就两件事:
done channel,塞进 contextinterrupt 函数,调用它就关闭 channelsaver := inmemory.NewSaver()
exec, _ := graph.NewExecutor(g, graph.WithCheckpointSaver(saver))
st := graph.State{graph.CfgKeyLineageID: "session-12345"}
ch, _ := exec.Execute(ctx, st, inv)
Executor 启动时,从 context 中取出中断信号,创建一个 「watcher goroutine」 在后台监听:
func (e *Executor) executeGraph(ctx context.Context, ...) error {
interruptState := graphInterruptFromContext(ctx)
ctx, extInterrupt := newExternalInterruptWatcher(ctx, interruptState)
defer extInterrupt.stop()
// ...
}
watcher 的逻辑非常直接——阻塞等待 done channel:
func (w *externalInterruptWatcher) listen() {
select {
case <-w.stopCh: // 正常退出
return
case <-w.state.doneCh(): // 收到中断信号!
}
// 如果没设超时,直接返回(Planned 模式)
// 如果设了超时,等超时后 cancel context(Forced 模式)
}
BSP 循环进入 Step 0,「在执行每一步之前都会检查中断信号」:
func (e *Executor) runBspStep(..., extInterrupt *externalInterruptWatcher) {
tasks, _ := e.planTasksForBspStep(...) // 规划本步要执行的节点
// 关键检查点:执行前检查中断信号
if handled, err := e.maybeHandleExternalInterruptBeforeStep(
..., tasks, step, ..., extInterrupt,
); handled || err != nil {
return // 被中断了,不执行
}
// 没被中断,正常执行
e.executeStepWithInterruptHandling(tasks)
}
此时还没人按暂停,prepare 正常执行完毕。
prepare 执行过程中,运营人员发现问题,按下暂停按钮:
interrupt() // 内部执行 close(done)
但这里有个关键设计:「Planned 模式不会打断正在运行的节点」。watcher 检测到信号后:
timeout := w.state.timeoutOrNil() // Planned 模式没设超时 → nil
if timeout == nil {
return // 直接返回,不 cancel context
}
所以 prepare 不受任何影响,正常跑完。
prepare 完成后,BSP 循环进入 Step 1,准备执行 call_model。此时再次检查中断信号:
func (e *Executor) maybeHandleExternalInterruptBeforeStep(...) (bool, error) {
if extInterrupt == nil || !extInterrupt.requested() {
return false, nil // 没有中断请求,继续
}
// 检测到中断请求!
interrupt := newExternalInterruptError(extInterrupt.forced(ctx))
interrupt.NextNodes = nextNodesFromTasks(tasks) // ["call_model"]
return true, e.handleInterrupt(...)
}
requested() 方法检查 done channel 是否已关闭:
func (s *graphInterruptState) requested() bool {
select {
case <-s.done:
return true // channel 已关闭 = 有人按了暂停
default:
return false
}
}
handleInterrupt 将当前执行状态持久化为一个 「interrupt checkpoint」:
func (e *Executor) handleInterrupt(..., interrupt *InterruptError, step int, ...) error {
checkpoint := e.createCheckpointFromState(execCtx.State, step, execCtx)
checkpoint.SetInterruptState(interrupt.NodeID, interrupt.TaskID, interrupt.Value, step, ...)
checkpoint.NextNodes = nextNodes // ["call_model"] — 恢复后要执行的节点
e.checkpointSaver.PutFull(saveCtx, req) // 持久化
// 发送中断事件给前端
agent.EmitEvent(eventCtx, invocation, execCtx.EventChan, interruptEvent)
return interrupt // 中断错误沿调用链返回
}
外部中断的 SkipRerun = true,因为被中断的节点(prepare)已经跑完了,恢复时应该从「下一个」节点(call_model)开始。
保存的 Checkpoint 数据结构长这样:
┌──────────────────────────────────────────────┐
│ Checkpoint (interrupt) │
│ │
│ state: {messages: [UserMessage("xxx")]} │
│ nextNodes: ["call_model"] ← 恢复后从这继续 │
│ interruptValue: {key: "external_interrupt"} │
│ SkipRerun: true ← 不重跑当前节点 │
└──────────────────────────────────────────────┘
运营审核完毕后,前端调用恢复接口:
resumeState := graph.State{
graph.CfgKeyLineageID: "session-12345",
graph.CfgKeyCheckpointID: meta.CheckpointID, // 指定从哪个 checkpoint 恢复
}
ch2, _ := exec.Execute(context.Background(), resumeState, inv2)
Executor 加载 checkpoint,恢复 state 和 nextNodes,从 call_model 继续执行。
Planned(温和暂停) | Forced(强制暂停) | |
|---|---|---|
「触发方式」 | interrupt() | interrupt(WithGraphInterruptTimeout(50ms)) |
「对正在运行的节点」 | 不干预,让它跑完 | 超时后 cancel context |
「适用场景」 | 常规暂停 | 紧急叫停(节点可能耗时很长) |
Forced 模式下,watcher 会在超时后 cancel context:
func (w *externalInterruptWatcher) listen() {
// ...收到中断信号
timeout := w.state.timeoutOrNil() // Forced 模式有超时
timer := time.NewTimer(*timeout)
select {
case <-timer.C:
w.cancel(errGraphInterruptTimeout) // 强制取消!
}
}
节点代码需要配合检查 ctx.Done():
func slowNode(ctx context.Context, st graph.State) (any, error) {
select {
case <-ctx.Done():
return nil, ctx.Err() // 被强制中断
case <-time.After(longDuration):
return result, nil
}
}
一个电商订单系统,用户下单后需要经过审批:
[request_approval] → [process_order]
生成审批请求 处理订单
(等待审批人确认)
request_approval 节点需要「主动暂停执行」,把审批请求发给前端,等审批人点击"批准"或"拒绝"后,再带着审批结果恢复执行。
编程式中断通过 graph.Interrupt() 函数实现。这个函数的巧妙之处在于——「同一个函数,首次调用触发中断,恢复后再次调用返回恢复值」:
func Interrupt(ctx context.Context, state State, key string, prompt any) (any, error)
InterruptError(中断)func requestApprovalNode(ctx context.Context, st graph.State) (any, error) {
// 构造要展示给审批人的信息
prompt := map[string]any{
"message": "订单 #67890 金额 ¥5000,请审批",
"options": []string{"approve", "reject"},
}
// 调用 Interrupt —— 首次执行会在这里中断
resume, err := graph.Interrupt(ctx, st, "request_approval", prompt)
if err != nil {
returnnil, err // ← 首次执行走这里
}
// 恢复后走这里
decision := resume.(string) // "approve" 或 "reject"
return graph.State{"approval": decision}, nil
}
Interrupt 函数内部的判断逻辑:
func Interrupt(ctx context.Context, state State, key string, prompt any) (any, error) {
// 1. 检查是否已经用过这个 key 的恢复值(幂等保护)
usedMap, _ := state[StateKeyUsedInterrupts].(map[string]any)
if usedValue, exists := usedMap[key]; exists {
return usedValue, nil
}
// 2. 检查是否有直接的恢复值(Resume 通道)
if resumeValue, exists := state[ResumeChannel]; exists {
usedMap[key] = resumeValue
delete(state, ResumeChannel)
return resumeValue, nil
}
// 3. 检查是否有按 key 分发的恢复值(ResumeMap)
if resumeMap, exists := state[StateKeyResumeMap]; exists {
if resumeValue, exists := resumeMapTyped[key]; exists {
usedMap[key] = resumeValue
delete(resumeMapTyped, key)
return resumeValue, nil
}
}
// 4. 都没有 → 触发中断
interrupt := NewInterruptError(prompt)
interrupt.Key = key
returnnil, interrupt
}
首次执行时,state 中没有任何恢复值,走到第 4 步,返回 InterruptError。
错误沿调用链向上传播:
requestApprovalNode() → InterruptError
→ executeSingleAttempt → InterruptError
→ executeTaskWithRetry → evaluateRetryDecision
在 evaluateRetryDecision 中,InterruptError 被特殊处理——「不重试,直接返回」:
if IsInterruptError(retryCtx.err) {
if interrupt, ok := GetInterruptError(retryCtx.err); ok {
interrupt.NodeID = t.NodeID // 填充 "request_approval"
interrupt.Step = step // 填充 0
}
return false, retryCtx.err // 不重试
}
继续向上到 handleExecuteStepError,最终进入 handleInterrupt。
handleInterrupt 中有一段关键逻辑——决定恢复时是否重跑被中断的节点:
// SkipRerun 默认 false(编程式中断的核心特点)
if !interrupt.SkipRerun {
// 确保被中断的节点在 NextNodes 中
hasNode := false
for _, nodeID := range nextNodes {
if nodeID == interrupt.NodeID {
hasNode = true
}
}
if !hasNode && interrupt.NodeID != "" {
nextNodes = append([]string{interrupt.NodeID}, nextNodes...)
}
}
checkpoint.NextNodes = nextNodes // ["request_approval"]
「为什么编程式中断要重跑节点?」
因为节点函数执行到一半就返回了 error,「它的返回值还没有被写入 state」。如果跳过重跑,直接执行下一个节点,state 中就缺少了审批结果。
保存的 Checkpoint:
┌──────────────────────────────────────────────────────────┐
│ Checkpoint (interrupt) │
│ │
│ state: {} ← 节点没跑完,state 没有更新 │
│ nextNodes: ["request_approval"] ← 恢复后重跑这个节点 │
│ interruptValue: { │
│ "message": "订单 #67890 金额 ¥5000,请审批", │
│ "options": ["approve", "reject"] │
│ } │
│ SkipRerun: false ← 需要重跑 │
└──────────────────────────────────────────────────────────┘
meta, done, err := drainEvents(ch)
// meta.InterruptKey = "request_approval"
// meta.InterruptValue = {"message": "订单 #67890...", "options": [...]}
// meta.CheckpointID = "ckpt-001"
前端展示审批弹窗:"订单 #67890 金额 ¥5000,请审批 [批准] [拒绝]"
审批人点击"批准"。
resumeCmd := graph.NewResumeCommand().
AddResumeValue("request_approval", "approve")
resumeState := graph.State{
graph.CfgKeyLineageID: "order-67890",
graph.CfgKeyCheckpointID: "ckpt-001",
graph.StateKeyCommand: resumeCmd, // ← 携带恢复值
}
ch2, _ := exec.Execute(ctx, resumeState, inv2)
恢复流程内部,processResumeCommand 将恢复值注入 state:
func (e *Executor) processResumeCommand(execState, initialState State) State {
cmd := initialState[StateKeyCommand].(*ResumeCommand)
// cmd.ResumeMap = {"request_approval": "approve"}
execState[StateKeyResumeMap] = cmd.ResumeMap // 注入恢复值
delete(execState, StateKeyCommand)
return execState
}
BSP 循环从 checkpoint 恢复,nextNodes = ["request_approval"],于是再次执行 requestApprovalNode。
「同一个节点函数再次执行」,再次调用 graph.Interrupt():
resume, err := graph.Interrupt(ctx, st, "request_approval", prompt)
这次 state 中有 __resume_map__:
// Interrupt 内部
resumeMap := state["__resume_map__"] // {"request_approval": "approve"}
resumeValue := resumeMapTyped["request_approval"] // "approve"
usedMap["request_approval"] = "approve"
delete(resumeMapTyped, "request_approval")
return "approve", nil // ← 直接返回,不中断!
节点函数继续执行,正常返回审批结果:
decision := resume.(string) // "approve"
return graph.State{"approval": "approve"}, nil
之后 process_order 节点正常执行,整个流程完成。
首次执行:
节点函数 → Interrupt("request_approval", prompt)
→ state 中无恢复值
→ 返回 InterruptError ← 中断!
→ 保存 checkpoint (nextNodes: ["request_approval"])
恢复执行:
节点函数 → Interrupt("request_approval", prompt) ← 同一个函数重跑
→ state 中有恢复值 "approve"
→ 返回 ("approve", nil) ← 不中断!
→ 节点正常完成
理解两种模式 Checkpoint 的差异,是理解整个 HIL 机制的关键:
外部中断 Checkpoint: 编程式中断 Checkpoint:
┌─────────────────────────┐ ┌─────────────────────────┐
│ state: {已更新} │ │ state: {未更新} │
│ nextNodes: [下一个节点] │ │ nextNodes: [当前节点] │
│ SkipRerun: true │ │ SkipRerun: false │
│ interruptValue: │ │ interruptValue: │
│ {key: "external_..."} │ │ {审批请求内容} │
│ │ │ │
│ 恢复: 直接跑下一个节点 │ │ 恢复: 需要 ResumeCommand │
│ 不需要额外输入 │ │ 携带人工决策值 │
└─────────────────────────┘ └─────────────────────────┘
维度 | 外部中断 | 编程式中断 |
|---|---|---|
state 是否包含当前节点输出 | 是(节点跑完了) | 否(节点中途中断) |
nextNodes | 下一个待执行节点 | 被中断的当前节点 |
SkipRerun | true(跳过重跑) | false(必须重跑) |
interruptValue | 固定标识 | 业务数据(审批请求) |
恢复时是否需要额外数据 | 不需要 | 需要(ResumeCommand) |
在真实业务中,一个流程可能有多个需要人工介入的节点。Interrupt 的 key 参数和 ResumeCommand 的 ResumeMap 就是为此设计的:
// 节点 A:金额审批
func amountApproval(ctx context.Context, st graph.State) (any, error) {
resume, err := graph.Interrupt(ctx, st, "amount_check", map[string]any{
"message": "大额订单 ¥50000,是否批准?",
})
if err != nil {
returnnil, err
}
return graph.State{"amount_approved": resume.(bool)}, nil
}
// 节点 B:合规审批
func complianceApproval(ctx context.Context, st graph.State) (any, error) {
resume, err := graph.Interrupt(ctx, st, "compliance_check", map[string]any{
"message": "该订单涉及跨境交易,是否合规?",
})
if err != nil {
returnnil, err
}
return graph.State{"compliance_approved": resume.(bool)}, nil
}
恢复时按 key 分发恢复值:
resumeCmd := graph.NewResumeCommand().
AddResumeValue("amount_check", true).
AddResumeValue("compliance_check", true)
每个 Interrupt 调用根据自己的 key 从 ResumeMap 中取对应的值,互不干扰。
整个 HIL 机制建立在 Checkpoint 之上。没有 Checkpoint,中断后就无法恢复。在生产环境中,应使用持久化存储(数据库、Redis 等)而非内存存储。
中断发生时,框架通过事件通道将中断信息(包括 checkpointID、interruptKey、interruptValue)推送给前端。前端据此展示相应的 UI(审批弹窗、暂停提示等),用户操作后携带 checkpointID 和恢复值调用后端恢复接口。
后端 Executor 前端 UI
│ │
├── InterruptEvent ──────────────────→ │
│ {checkpointID, key, value} │
│ ├── 展示审批弹窗
│ │
│ ├── 用户点击"批准"
│ │
│ ←────── Resume Request ──────────────┤
│ {checkpointID, resumeValue} │
│ │
├── 恢复执行 ──────────────────────────→│
│ │
Interrupt 函数内部通过 usedMap 实现幂等保护——如果同一个 key 的恢复值已经被使用过,再次调用会直接返回之前的值,避免重复中断:
if usedValue, exists := usedMap[key]; exists {
return usedValue, nil // 幂等:返回之前用过的值
}
外部中断支持超时机制,适用于紧急场景。但编程式中断是"无限等待"的——Checkpoint 保存在持久化存储中,理论上可以等待任意长时间。在业务层面,你可能需要自己实现超时逻辑(比如审批超过 24 小时自动拒绝)。
Human-in-the-Loop 的核心是「可恢复的执行中断」。在 Agent Graph 执行框架中,它通过两种互补的机制实现:
两种模式共同依赖 「Checkpoint 持久化」机制,将执行状态保存下来,实现跨时间、跨进程的执行恢复。无论是紧急叫停还是审批流程,HIL 让 Agent 在自主和可控之间找到了平衡点。
在 Agent 应用越来越复杂的今天,HIL 不是可选项,而是生产级 Agent 系统的必备能力。让 AI 学会在关键时刻"等一下",是构建可信赖 Agent 系统的重要一步。