首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Agent 应用中的 Human-in-the-Loop:让 AI 学会"等一下"

Agent 应用中的 Human-in-the-Loop:让 AI 学会"等一下"

作者头像
tunsuy
发布2026-04-09 11:27:49
发布2026-04-09 11:27:49
1700
举报

❝在 Agent 应用中,不是所有决策都应该由 AI 自主完成。本文以实际业务场景为主线,深入讲解如何在 Agent 执行流程中实现 Human-in-the-Loop(HIL)——让 AI 在关键节点暂停执行、等待人工介入、再携带人工决策恢复运行。❞

一、为什么 Agent 需要 Human-in-the-Loop

Agent 的核心能力是自主规划和执行任务。但在实际业务中,很多场景需要人类参与决策:

  • 「风控审批」:AI 客服准备给用户退款 5000 元,需要主管审批
  • 「敏感内容审核」:AI 生成的回复涉及法律/医疗建议,需要专业人员确认
  • 「运营干预」:运营人员发现 AI 正在处理一个异常请求,需要紧急叫停
  • 「工具调用确认」:Agent 准备调用一个有副作用的外部 API(如发邮件、转账),需要用户确认

这些场景的共同特点是:「执行流程需要在某个点暂停,等待人类输入,然后带着人类的决策继续执行」

这就是 Human-in-the-Loop(HIL)要解决的问题。

二、HIL 的两种模式

在 Agent Graph 执行框架中,HIL 有两种本质不同的实现模式:

外部中断(External Interrupt)

编程式中断(Programmatic Interrupt)

「触发方」

节点外部(用户/运营人员)

节点内部(代码主动触发)

「类比」

遥控器上的"暂停"按钮

程序弹出的"确认对话框"

「中断时机」

两个节点之间(当前节点跑完再暂停)

节点执行过程中

「是否需要人工输入」

不需要(暂停后直接恢复即可)

需要(必须携带人工决策才能恢复)

「恢复后行为」

从下一个节点继续

重新执行被中断的节点

「典型场景」

运营紧急叫停、用户取消

审批流程、工具调用确认

下面分别用具体业务场景来详解这两种模式。

三、场景一:运营人员的"暂停"按钮(外部中断)

3.1 业务场景

一个 AI 客服对话系统,后端有一个三步流水线:

代码语言:javascript
复制
[prepare] → [call_model] → [finalize]
 组装提示词      调用大模型       整理回复

运营人员在后台管理界面上有一个"暂停"按钮。当发现用户问的是敏感话题,运营需要暂停 AI 回复,由人工介入审核后再恢复。

3.2 核心设计

外部中断的设计思路很简单——「用一个 channel 传递中断信号」

代码语言:javascript
复制
// 创建带中断能力的 context
ctx, interrupt := graph.WithGraphInterrupt(context.Background())

WithGraphInterrupt 内部做了什么?

代码语言:javascript
复制
func WithGraphInterrupt(parent context.Context) (context.Context, func(...GraphInterruptOption)) {
    st := &graphInterruptState{
        done: make(chan struct{}),  // 中断信号 channel
    }
    ctx = context.WithValue(parent, graphInterruptKey{}, st)

    interrupt = func(opts ...GraphInterruptOption) {
        st.once.Do(func() {
            close(st.done)  // 关闭 channel = 发送中断信号
        })
    }
    return ctx, interrupt
}

就两件事:

  1. 创建一个 done channel,塞进 context
  2. 返回一个 interrupt 函数,调用它就关闭 channel

3.3 执行流程详解

第一步:启动执行
代码语言:javascript
复制
saver := inmemory.NewSaver()
exec, _ := graph.NewExecutor(g, graph.WithCheckpointSaver(saver))

st := graph.State{graph.CfgKeyLineageID: "session-12345"}
ch, _ := exec.Execute(ctx, st, inv)

Executor 启动时,从 context 中取出中断信号,创建一个 「watcher goroutine」 在后台监听:

代码语言:javascript
复制
func (e *Executor) executeGraph(ctx context.Context, ...) error {
    interruptState := graphInterruptFromContext(ctx)
    ctx, extInterrupt := newExternalInterruptWatcher(ctx, interruptState)
    defer extInterrupt.stop()
    // ...
}

watcher 的逻辑非常直接——阻塞等待 done channel:

代码语言:javascript
复制
func (w *externalInterruptWatcher) listen() {
    select {
    case <-w.stopCh:   // 正常退出
        return
    case <-w.state.doneCh():  // 收到中断信号!
    }
    // 如果没设超时,直接返回(Planned 模式)
    // 如果设了超时,等超时后 cancel context(Forced 模式)
}
第二步:prepare 节点正常执行

BSP 循环进入 Step 0,「在执行每一步之前都会检查中断信号」

代码语言:javascript
复制
func (e *Executor) runBspStep(..., extInterrupt *externalInterruptWatcher) {
    tasks, _ := e.planTasksForBspStep(...)  // 规划本步要执行的节点

    // 关键检查点:执行前检查中断信号
    if handled, err := e.maybeHandleExternalInterruptBeforeStep(
        ..., tasks, step, ..., extInterrupt,
    ); handled || err != nil {
        return  // 被中断了,不执行
    }

    // 没被中断,正常执行
    e.executeStepWithInterruptHandling(tasks)
}

此时还没人按暂停,prepare 正常执行完毕。

第三步:运营人员按下暂停

prepare 执行过程中,运营人员发现问题,按下暂停按钮:

代码语言:javascript
复制
interrupt()  // 内部执行 close(done)

但这里有个关键设计:「Planned 模式不会打断正在运行的节点」。watcher 检测到信号后:

代码语言:javascript
复制
timeout := w.state.timeoutOrNil()  // Planned 模式没设超时 → nil
if timeout == nil {
    return  // 直接返回,不 cancel context
}

所以 prepare 不受任何影响,正常跑完。

第四步:在下一步开始前拦截

prepare 完成后,BSP 循环进入 Step 1,准备执行 call_model。此时再次检查中断信号:

代码语言:javascript
复制
func (e *Executor) maybeHandleExternalInterruptBeforeStep(...) (bool, error) {
    if extInterrupt == nil || !extInterrupt.requested() {
        return false, nil  // 没有中断请求,继续
    }
    // 检测到中断请求!
    interrupt := newExternalInterruptError(extInterrupt.forced(ctx))
    interrupt.NextNodes = nextNodesFromTasks(tasks)  // ["call_model"]
    return true, e.handleInterrupt(...)
}

requested() 方法检查 done channel 是否已关闭:

代码语言:javascript
复制
func (s *graphInterruptState) requested() bool {
    select {
    case <-s.done:
        return true   // channel 已关闭 = 有人按了暂停
    default:
        return false
    }
}
第五步:保存中断 Checkpoint

handleInterrupt 将当前执行状态持久化为一个 「interrupt checkpoint」

代码语言:javascript
复制
func (e *Executor) handleInterrupt(..., interrupt *InterruptError, step int, ...) error {
    checkpoint := e.createCheckpointFromState(execCtx.State, step, execCtx)
    checkpoint.SetInterruptState(interrupt.NodeID, interrupt.TaskID, interrupt.Value, step, ...)
    checkpoint.NextNodes = nextNodes  // ["call_model"] — 恢复后要执行的节点

    e.checkpointSaver.PutFull(saveCtx, req)  // 持久化

    // 发送中断事件给前端
    agent.EmitEvent(eventCtx, invocation, execCtx.EventChan, interruptEvent)
    return interrupt  // 中断错误沿调用链返回
}

外部中断的 SkipRerun = true,因为被中断的节点(prepare)已经跑完了,恢复时应该从「下一个」节点(call_model)开始。

保存的 Checkpoint 数据结构长这样:

代码语言:javascript
复制
┌──────────────────────────────────────────────┐
│ Checkpoint (interrupt)                       │
│                                              │
│ state: {messages: [UserMessage("xxx")]}      │
│ nextNodes: ["call_model"]    ← 恢复后从这继续  │
│ interruptValue: {key: "external_interrupt"}  │
│ SkipRerun: true              ← 不重跑当前节点  │
└──────────────────────────────────────────────┘
第六步:恢复执行

运营审核完毕后,前端调用恢复接口:

代码语言:javascript
复制
resumeState := graph.State{
    graph.CfgKeyLineageID:    "session-12345",
    graph.CfgKeyCheckpointID: meta.CheckpointID,  // 指定从哪个 checkpoint 恢复
}
ch2, _ := exec.Execute(context.Background(), resumeState, inv2)

Executor 加载 checkpoint,恢复 state 和 nextNodes,从 call_model 继续执行。

3.4 Planned vs Forced:两种外部中断策略

Planned(温和暂停)

Forced(强制暂停)

「触发方式」

interrupt()

interrupt(WithGraphInterruptTimeout(50ms))

「对正在运行的节点」

不干预,让它跑完

超时后 cancel context

「适用场景」

常规暂停

紧急叫停(节点可能耗时很长)

Forced 模式下,watcher 会在超时后 cancel context:

代码语言:javascript
复制
func (w *externalInterruptWatcher) listen() {
    // ...收到中断信号
    timeout := w.state.timeoutOrNil()  // Forced 模式有超时
    timer := time.NewTimer(*timeout)
    select {
    case <-timer.C:
        w.cancel(errGraphInterruptTimeout)  // 强制取消!
    }
}

节点代码需要配合检查 ctx.Done()

代码语言:javascript
复制
func slowNode(ctx context.Context, st graph.State) (any, error) {
    select {
    case <-ctx.Done():
        return nil, ctx.Err()  // 被强制中断
    case <-time.After(longDuration):
        return result, nil
    }
}

四、场景二:订单审批的"确认对话框"(编程式中断)

4.1 业务场景

一个电商订单系统,用户下单后需要经过审批:

代码语言:javascript
复制
[request_approval] → [process_order]
  生成审批请求            处理订单
  (等待审批人确认)

request_approval 节点需要「主动暂停执行」,把审批请求发给前端,等审批人点击"批准"或"拒绝"后,再带着审批结果恢复执行。

4.2 核心设计

编程式中断通过 graph.Interrupt() 函数实现。这个函数的巧妙之处在于——「同一个函数,首次调用触发中断,恢复后再次调用返回恢复值」

代码语言:javascript
复制
func Interrupt(ctx context.Context, state State, key string, prompt any) (any, error)
  • 「首次执行」:没有恢复值 → 返回 InterruptError(中断)
  • 「恢复执行」:检测到恢复值 → 直接返回该值(不中断)

4.3 执行流程详解

第一步:节点内主动中断
代码语言:javascript
复制
func requestApprovalNode(ctx context.Context, st graph.State) (any, error) {
    // 构造要展示给审批人的信息
    prompt := map[string]any{
        "message": "订单 #67890 金额 ¥5000,请审批",
        "options": []string{"approve", "reject"},
    }

    // 调用 Interrupt —— 首次执行会在这里中断
    resume, err := graph.Interrupt(ctx, st, "request_approval", prompt)
    if err != nil {
        returnnil, err  // ← 首次执行走这里
    }

    // 恢复后走这里
    decision := resume.(string)  // "approve" 或 "reject"
    return graph.State{"approval": decision}, nil
}

Interrupt 函数内部的判断逻辑:

代码语言:javascript
复制
func Interrupt(ctx context.Context, state State, key string, prompt any) (any, error) {
    // 1. 检查是否已经用过这个 key 的恢复值(幂等保护)
    usedMap, _ := state[StateKeyUsedInterrupts].(map[string]any)
    if usedValue, exists := usedMap[key]; exists {
        return usedValue, nil
    }

    // 2. 检查是否有直接的恢复值(Resume 通道)
    if resumeValue, exists := state[ResumeChannel]; exists {
        usedMap[key] = resumeValue
        delete(state, ResumeChannel)
        return resumeValue, nil
    }

    // 3. 检查是否有按 key 分发的恢复值(ResumeMap)
    if resumeMap, exists := state[StateKeyResumeMap]; exists {
        if resumeValue, exists := resumeMapTyped[key]; exists {
            usedMap[key] = resumeValue
            delete(resumeMapTyped, key)
            return resumeValue, nil
        }
    }

    // 4. 都没有 → 触发中断
    interrupt := NewInterruptError(prompt)
    interrupt.Key = key
    returnnil, interrupt
}

首次执行时,state 中没有任何恢复值,走到第 4 步,返回 InterruptError

第二步:InterruptError 向上传播

错误沿调用链向上传播:

代码语言:javascript
复制
requestApprovalNode() → InterruptError
  → executeSingleAttempt → InterruptError
    → executeTaskWithRetry → evaluateRetryDecision

evaluateRetryDecision 中,InterruptError 被特殊处理——「不重试,直接返回」

代码语言:javascript
复制
if IsInterruptError(retryCtx.err) {
    if interrupt, ok := GetInterruptError(retryCtx.err); ok {
        interrupt.NodeID = t.NodeID  // 填充 "request_approval"
        interrupt.Step = step        // 填充 0
    }
    return false, retryCtx.err      // 不重试
}

继续向上到 handleExecuteStepError,最终进入 handleInterrupt

第三步:保存中断 Checkpoint

handleInterrupt 中有一段关键逻辑——决定恢复时是否重跑被中断的节点:

代码语言:javascript
复制
// SkipRerun 默认 false(编程式中断的核心特点)
if !interrupt.SkipRerun {
    // 确保被中断的节点在 NextNodes 中
    hasNode := false
    for _, nodeID := range nextNodes {
        if nodeID == interrupt.NodeID {
            hasNode = true
        }
    }
    if !hasNode && interrupt.NodeID != "" {
        nextNodes = append([]string{interrupt.NodeID}, nextNodes...)
    }
}
checkpoint.NextNodes = nextNodes  // ["request_approval"]

「为什么编程式中断要重跑节点?」

因为节点函数执行到一半就返回了 error,「它的返回值还没有被写入 state」。如果跳过重跑,直接执行下一个节点,state 中就缺少了审批结果。

保存的 Checkpoint:

代码语言:javascript
复制
┌──────────────────────────────────────────────────────────┐
│ Checkpoint (interrupt)                                   │
│                                                          │
│ state: {}                  ← 节点没跑完,state 没有更新    │
│ nextNodes: ["request_approval"]  ← 恢复后重跑这个节点     │
│ interruptValue: {                                        │
│   "message": "订单 #67890 金额 ¥5000,请审批",            │
│   "options": ["approve", "reject"]                       │
│ }                                                        │
│ SkipRerun: false           ← 需要重跑                    │
└──────────────────────────────────────────────────────────┘
第四步:前端收到审批请求并展示
代码语言:javascript
复制
meta, done, err := drainEvents(ch)
// meta.InterruptKey = "request_approval"
// meta.InterruptValue = {"message": "订单 #67890...", "options": [...]}
// meta.CheckpointID = "ckpt-001"

前端展示审批弹窗:"订单 #67890 金额 ¥5000,请审批 [批准] [拒绝]"

审批人点击"批准"。

第五步:携带审批结果恢复执行
代码语言:javascript
复制
resumeCmd := graph.NewResumeCommand().
    AddResumeValue("request_approval", "approve")

resumeState := graph.State{
    graph.CfgKeyLineageID:    "order-67890",
    graph.CfgKeyCheckpointID: "ckpt-001",
    graph.StateKeyCommand:    resumeCmd,  // ← 携带恢复值
}
ch2, _ := exec.Execute(ctx, resumeState, inv2)

恢复流程内部,processResumeCommand 将恢复值注入 state:

代码语言:javascript
复制
func (e *Executor) processResumeCommand(execState, initialState State) State {
    cmd := initialState[StateKeyCommand].(*ResumeCommand)
    // cmd.ResumeMap = {"request_approval": "approve"}

    execState[StateKeyResumeMap] = cmd.ResumeMap  // 注入恢复值
    delete(execState, StateKeyCommand)
    return execState
}
第六步:节点重跑,Interrupt 返回恢复值

BSP 循环从 checkpoint 恢复,nextNodes = ["request_approval"],于是再次执行 requestApprovalNode

「同一个节点函数再次执行」,再次调用 graph.Interrupt()

代码语言:javascript
复制
resume, err := graph.Interrupt(ctx, st, "request_approval", prompt)

这次 state 中有 __resume_map__

代码语言:javascript
复制
// Interrupt 内部
resumeMap := state["__resume_map__"]  // {"request_approval": "approve"}
resumeValue := resumeMapTyped["request_approval"]  // "approve"
usedMap["request_approval"] = "approve"
delete(resumeMapTyped, "request_approval")
return "approve", nil  // ← 直接返回,不中断!

节点函数继续执行,正常返回审批结果:

代码语言:javascript
复制
decision := resume.(string)  // "approve"
return graph.State{"approval": "approve"}, nil

之后 process_order 节点正常执行,整个流程完成。

4.4 编程式中断的核心机制图

代码语言:javascript
复制
首次执行:
  节点函数 → Interrupt("request_approval", prompt)
           → state 中无恢复值
           → 返回 InterruptError  ← 中断!
           → 保存 checkpoint (nextNodes: ["request_approval"])

恢复执行:
  节点函数 → Interrupt("request_approval", prompt)  ← 同一个函数重跑
           → state 中有恢复值 "approve"
           → 返回 ("approve", nil)  ← 不中断!
           → 节点正常完成

五、两种模式的 Checkpoint 对比

理解两种模式 Checkpoint 的差异,是理解整个 HIL 机制的关键:

代码语言:javascript
复制
外部中断 Checkpoint:                   编程式中断 Checkpoint:
┌─────────────────────────┐           ┌─────────────────────────┐
│ state: {已更新}          │           │ state: {未更新}          │
│ nextNodes: [下一个节点]   │           │ nextNodes: [当前节点]    │
│ SkipRerun: true          │           │ SkipRerun: false         │
│ interruptValue:          │           │ interruptValue:          │
│   {key: "external_..."}  │           │   {审批请求内容}          │
│                          │           │                          │
│ 恢复: 直接跑下一个节点    │           │ 恢复: 需要 ResumeCommand │
│       不需要额外输入      │           │       携带人工决策值      │
└─────────────────────────┘           └─────────────────────────┘

维度

外部中断

编程式中断

state 是否包含当前节点输出

是(节点跑完了)

否(节点中途中断)

nextNodes

下一个待执行节点

被中断的当前节点

SkipRerun

true(跳过重跑)

false(必须重跑)

interruptValue

固定标识

业务数据(审批请求)

恢复时是否需要额外数据

不需要

需要(ResumeCommand)

六、实战:多节点审批链

在真实业务中,一个流程可能有多个需要人工介入的节点。Interruptkey 参数和 ResumeCommandResumeMap 就是为此设计的:

代码语言:javascript
复制
// 节点 A:金额审批
func amountApproval(ctx context.Context, st graph.State) (any, error) {
    resume, err := graph.Interrupt(ctx, st, "amount_check", map[string]any{
        "message": "大额订单 ¥50000,是否批准?",
    })
    if err != nil {
        returnnil, err
    }
    return graph.State{"amount_approved": resume.(bool)}, nil
}

// 节点 B:合规审批
func complianceApproval(ctx context.Context, st graph.State) (any, error) {
    resume, err := graph.Interrupt(ctx, st, "compliance_check", map[string]any{
        "message": "该订单涉及跨境交易,是否合规?",
    })
    if err != nil {
        returnnil, err
    }
    return graph.State{"compliance_approved": resume.(bool)}, nil
}

恢复时按 key 分发恢复值:

代码语言:javascript
复制
resumeCmd := graph.NewResumeCommand().
    AddResumeValue("amount_check", true).
    AddResumeValue("compliance_check", true)

每个 Interrupt 调用根据自己的 key 从 ResumeMap 中取对应的值,互不干扰。

七、HIL 的架构设计要点

7.1 Checkpoint 持久化是基础

整个 HIL 机制建立在 Checkpoint 之上。没有 Checkpoint,中断后就无法恢复。在生产环境中,应使用持久化存储(数据库、Redis 等)而非内存存储。

7.2 事件驱动的前后端协作

中断发生时,框架通过事件通道将中断信息(包括 checkpointIDinterruptKeyinterruptValue)推送给前端。前端据此展示相应的 UI(审批弹窗、暂停提示等),用户操作后携带 checkpointID 和恢复值调用后端恢复接口。

代码语言:javascript
复制
后端 Executor                          前端 UI
    │                                      │
    ├── InterruptEvent ──────────────────→ │
    │   {checkpointID, key, value}         │
    │                                      ├── 展示审批弹窗
    │                                      │
    │                                      ├── 用户点击"批准"
    │                                      │
    │ ←────── Resume Request ──────────────┤
    │   {checkpointID, resumeValue}        │
    │                                      │
    ├── 恢复执行 ──────────────────────────→│
    │                                      │

7.3 幂等性保护

Interrupt 函数内部通过 usedMap 实现幂等保护——如果同一个 key 的恢复值已经被使用过,再次调用会直接返回之前的值,避免重复中断:

代码语言:javascript
复制
if usedValue, exists := usedMap[key]; exists {
    return usedValue, nil  // 幂等:返回之前用过的值
}

7.4 超时与降级

外部中断支持超时机制,适用于紧急场景。但编程式中断是"无限等待"的——Checkpoint 保存在持久化存储中,理论上可以等待任意长时间。在业务层面,你可能需要自己实现超时逻辑(比如审批超过 24 小时自动拒绝)。

八、总结

Human-in-the-Loop 的核心是「可恢复的执行中断」。在 Agent Graph 执行框架中,它通过两种互补的机制实现:

  1. 「外部中断」:从外部发送停止信号,让正在运行的流水线在安全点暂停。就像给 Agent 装了一个遥控器上的暂停键。
  2. 「编程式中断」:在节点代码中主动触发暂停,等待人工输入后重跑节点。就像程序弹出了一个确认对话框,必须用户点击后才能继续。

两种模式共同依赖 「Checkpoint 持久化」机制,将执行状态保存下来,实现跨时间、跨进程的执行恢复。无论是紧急叫停还是审批流程,HIL 让 Agent 在自主和可控之间找到了平衡点。

在 Agent 应用越来越复杂的今天,HIL 不是可选项,而是生产级 Agent 系统的必备能力。让 AI 学会在关键时刻"等一下",是构建可信赖 Agent 系统的重要一步。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2026-03-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 有文化的技术人 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、为什么 Agent 需要 Human-in-the-Loop
  • 二、HIL 的两种模式
  • 三、场景一:运营人员的"暂停"按钮(外部中断)
    • 3.1 业务场景
    • 3.2 核心设计
    • 3.3 执行流程详解
      • 第一步:启动执行
      • 第二步:prepare 节点正常执行
      • 第三步:运营人员按下暂停
      • 第四步:在下一步开始前拦截
      • 第五步:保存中断 Checkpoint
      • 第六步:恢复执行
    • 3.4 Planned vs Forced:两种外部中断策略
  • 四、场景二:订单审批的"确认对话框"(编程式中断)
    • 4.1 业务场景
    • 4.2 核心设计
    • 4.3 执行流程详解
      • 第一步:节点内主动中断
      • 第二步:InterruptError 向上传播
      • 第三步:保存中断 Checkpoint
      • 第四步:前端收到审批请求并展示
      • 第五步:携带审批结果恢复执行
      • 第六步:节点重跑,Interrupt 返回恢复值
    • 4.4 编程式中断的核心机制图
  • 五、两种模式的 Checkpoint 对比
  • 六、实战:多节点审批链
  • 七、HIL 的架构设计要点
    • 7.1 Checkpoint 持久化是基础
    • 7.2 事件驱动的前后端协作
    • 7.3 幂等性保护
    • 7.4 超时与降级
  • 八、总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档