❝当你把一个 Agent 拆成多个子智能体协作时,"多轮对话"变成了一个全新的问题——每个子智能体都有自己的记忆,用户的一句追问可能依赖三个智能体之前的回答。本文将系统讲解如何在分布式多智能体架构下,优雅地管理多轮对话上下文。❞
单体 Agent 的多轮对话很简单——所有消息都在一个 messages[] 列表里,LLM 天然能理解上下文。但当架构变成这样时:
用户 ←→ 主智能体(路由/编排)←→ 子智能体 A(天气查询)
←→ 子智能体 B(活动推荐)
←→ 子智能体 C(旅行规划)
问题就出现了:
「核心挑战」:用户的对话是连续的,但子智能体的视角是割裂的。跨智能体的信息如何流动?
在我们讨论的架构中,有一个关键前提:「每个子智能体都是独立部署的服务,拥有自己完整的 session 管理和对话历史记录。」
这意味着:
user/assistant 消息列表,LLM 理解起来最自然这个前提决定了我们的方案核心:「增量注入,而非全量重写」。
主智能体的职责:
1. 记录所有轮次的完整对话(全局视角)
2. 每次调用子智能体时,只传"当前查询 + 需要注入的跨智能体上下文"
3. 让 LLM 自主判断哪些历史信息需要注入
子智能体的职责:
1. 维护自己的 session(只包含跟自己相关的对话)
2. 接收并理解注入的跨智能体上下文
3. 正常多轮对话
package context
import (
"github.com/cloudwego/eino/schema"
)
// ================== 主智能体侧 ==================
// ConversationEntry 主智能体的对话历史条目
type ConversationEntry struct {
IsUserInput bool `json:"is_user_input"`// 是否为用户输入
AgentName string `json:"agent_name"` // 来源智能体名称
Round int `json:"round"` // 对话轮次
Message *schema.Message `json:"message"` // 消息内容
}
// ConversationStore 主智能体的全局会话存储
type ConversationStore struct {
SessionID string `json:"session_id"`
Entries []*ConversationEntry `json:"entries"`
CurrentRound int `json:"current_round"`
}
// ================== A2A 协议层 ==================
// A2ARequest 发送给子智能体的请求(增量模式)
type A2ARequest struct {
// SessionID 主会话 ID
// 子智能体用 "主SessionID_自身名称" 构建自己的 session key
SessionID string`json:"session_id"`
// CurrentQuery 当前轮次的用户查询
CurrentQuery string`json:"current_query"`
// ContextInjections 需要注入的跨智能体上下文
// 只有当前轮需要其他智能体的信息时才会有值
ContextInjections []*ContextInjection `json:"context_injections,omitempty"`
// Round 当前对话轮次
Round int`json:"round"`
}
// ContextInjection 跨智能体的上下文注入项
type ContextInjection struct {
SourceAgent string`json:"source_agent"`// 信息来源智能体
Round int `json:"round"` // 信息所属的原始轮次
Content string`json:"content"` // 信息内容
}
// A2AResponse 子智能体的响应
type A2AResponse struct {
SessionID string `json:"session_id"`
Message *schema.Message `json:"message"`
}
// ================== 子智能体侧 ==================
// AgentSession 子智能体自己的 session
type AgentSession struct {
SessionID string `json:"session_id"`// 格式: "{主SessionID}_{agentName}"
AgentName string `json:"agent_name"`
Messages []*schema.Message `json:"messages"`
}
为什么选择增量注入而不是全量重写?对比如下:
维度 | 全量重写 | 增量注入 |
|---|---|---|
「传输方式」 | 每轮传完整重写后的消息列表 | 只传当前查询 + 需要注入的摘要 |
「网络开销」 | 随轮次线性增长 | 基本恒定 |
「子智能体对话连贯性」 | 每次是"新对话",靠重写模拟 | 「原生连贯」,user/assistant 交替自然 |
「无关轮次的处理」 | 也会收到无关轮次的消息 | 「不会收到无关信息」 |
「主智能体复杂度」 | 需要完整的历史重写逻辑 | 只需判断注入哪些信息 |
「容错性」 | 子智能体重启无影响 | 需要 session 持久化 |
让我们用一个具体例子,逐轮展示每个智能体的 session 状态变化。
「用户」:"帮我查询北京今天的天气"
「主智能体」判断:天气问题 → 路由到 Agent A。第 1 轮无历史,无需注入上下文。
// 发给 Agent A 的请求
a2aReq := &A2ARequest{
SessionID: "session-001",
CurrentQuery: "帮我查询北京今天的天气",
ContextInjections: nil, // 无需注入
Round: 1,
}
「Agent A」 收到后:
"session-001_agent_a"「第 1 轮结束后的状态快照:」
┌─── 主智能体 ConversationStore ───┐
│ R1: user → "查北京天气" │
│ R1: agent_a → "晴,25-32℃,良" │
└──────────────────────────────────┘
┌─── Agent A Session ──────────────┐
│ user: "查北京天气" │
│ assistant: "晴,25-32℃,良" │
└──────────────────────────────────┘
┌─── Agent B Session ──────────────┐
│ (不存在,从未被调用) │
└──────────────────────────────────┘
「用户」:"推荐一个适合今天天气的户外活动"
「主智能体」判断:活动推荐 → 路由到 Agent B。关键决策——"推荐活动"依赖"天气信息",需要注入 Agent A 的回复。
a2aReq := &A2ARequest{
SessionID: "session-001",
CurrentQuery: "推荐一个适合今天天气的户外活动",
Round: 2,
ContextInjections: []*ContextInjection{
{
SourceAgent: "agent_a",
Round: 1,
Content: "北京今天晴,25-32℃,空气质量良",
},
},
}
「Agent B」 收到后:
"session-001_agent_b"「第 2 轮结束后的状态快照:」
┌─── 主智能体 ConversationStore ──────────────────────┐
│ R1: user → "查北京天气" │
│ R1: agent_a → "晴,25-32℃,良" │
│ R2: user → "推荐适合今天天气的户外活动" │
│ R2: agent_b → "推荐:1.奥森跑步 2.颐和园划船 3.香山徒步" │
└─────────────────────────────────────────────────────┘
┌─── Agent A Session(未变化)──────┐
│ user: "查北京天气" │
│ assistant: "晴,25-32℃,良" │
└──────────────────────────────────┘
┌─── Agent B Session ──────────────────────────────────┐
│ user: "[ctx] agent_a: 晴,25-32℃,良" │
│ user: "推荐适合今天天气的户外活动" │
│ assistant: "推荐:1.奥森跑步 2.颐和园划船 3.香山徒步" │
└──────────────────────────────────────────────────────┘
「用户」:"如果去颐和园划船,明天天气怎么样,适合吗"
「主智能体」判断:天气查询 → 路由回 Agent A。用户提到"颐和园划船"来自 Agent B 的推荐,需要注入。
a2aReq := &A2ARequest{
SessionID: "session-001",
CurrentQuery: "如果去颐和园划船,明天天气怎么样,适合吗",
Round: 3,
ContextInjections: []*ContextInjection{
{
SourceAgent: "agent_b",
Round: 2,
Content: "推荐:1.奥森跑步 2.颐和园划船 3.香山徒步",
},
},
}
「Agent A」 收到后:在已有 session 基础上追加(不是新建 session!),LLM 既能看到第 1 轮自己查过今天天气,也能看到 Agent B 推荐了颐和园划船。
「第 3 轮结束后的状态快照:」
┌─── Agent A Session(关键!完整连贯) ─────────────────────────────────┐
│ user: "查北京天气" ← 第1轮,自己的历史 │
│ assistant: "晴,25-32℃,良" ← 第1轮,自己的回复 │
│ user: "[ctx] agent_b: 奥森跑步/颐和园划船/香山徒步" ← 跨智能体注入│
│ user: "颐和园划船,明天天气怎么样" ← 第3轮,当前问题 │
│ assistant: "明天多云转阴,22-28℃,建议上午去" ← 第3轮,自己的回复 │
└──────────────────────────────────────────────────────────────────────┘
注意:Agent A 的 session 中没有第 2 轮用户问的"推荐活动"
因为那个问题跟 Agent A 无关,只注入了 Agent B 的结果作为参考
上面的推演中,"判断注入哪些跨智能体信息"这个决策,有两种实现方式。推荐「让 LLM 自主决策」——把子智能体封装为 Function Calling 工具,让主智能体的 LLM 自己判断。
用户 Query
↓
主智能体 LLM(System Prompt 告知规则)
↓ Function Calling
├── 选择调用哪个智能体工具
├── 决定传入哪些历史上下文
└── 生成 tool call arguments
↓
子智能体工具 → 解析参数 → 组装 A2A 请求 → 调用远程智能体
↓
返回结果 → 主智能体 LLM 整合输出
const mainAgentInstruction = `你是一个智能路由助手,负责将用户的问题分发给合适的子智能体处理。
## 可用工具
- call_weather_agent: 天气查询智能体,处理天气相关问题
- call_activity_agent: 活动推荐智能体,处理活动和出行推荐
- call_travel_agent: 旅行规划智能体,处理旅行行程规划
## 工作规则
1. 根据用户意图,判断应该调用哪个智能体工具
2. 判断之前的对话历史中,哪些子智能体的回复对当前问题有参考价值
3. 如果有相关的历史信息,将这些信息按时间顺序放入工具的 context_messages 参数
4. 每条 context_message 需要标注来源智能体名称和原始内容
## 注意事项
- 只传递与当前问题相关的历史信息,不要传无关内容
- context_messages 按时间顺序排列
- 用户的原始问题通过 query 参数传递,不要混入 context_messages`
// 子智能体工具的参数结构
type CallAgentArgs struct {
// Query 当前用户的问题
Query string`json:"query"`
// ContextMessages 相关的历史上下文
// 由主智能体 LLM 判断后选择性传入
ContextMessages []ContextMessage `json:"context_messages,omitempty"`
}
type ContextMessage struct {
SourceAgent string`json:"source_agent"`// 来源智能体
Content string`json:"content"` // 信息内容
}
// 基于 A2A 协议的工具定义
var callWeatherAgentTool = &schema.ToolInfo{
Name: "call_weather_agent",
Desc: "调用天气查询智能体,查询天气相关信息",
ParamsOneOf: schema.NewParamsOneOfByParams(
map[string]*schema.ParameterInfo{
"query": {
Type: "string",
Desc: "用户关于天气的具体问题",
Required: true,
},
"context_messages": {
Type: "array",
Desc: "与当前问题相关的其他智能体历史回复," +
"按时间顺序排列。每条消息包含 source_agent 和 content",
Required: false,
},
},
),
}
「第 2 轮」,主智能体 LLM 看到的完整消息历史:
[system] 你是一个智能路由助手...
[user] 帮我查询北京今天的天气
[assistant] (tool_call: call_weather_agent, {query: "帮我查询北京今天的天气"})
[tool] 北京今天晴,25-32℃,空气质量良
[assistant] 北京今天晴,气温25-32℃,空气质量良好。
[user] 推荐一个适合今天天气的户外活动
LLM 自然会产出:
{
"name": "call_activity_agent",
"arguments": {
"query": "推荐一个适合今天天气的户外活动",
"context_messages": [
{
"source_agent": "weather_agent",
"content": "北京今天晴,25-32℃,空气质量良"
}
]
}
}
❝LLM 判断:"推荐活动"需要参考天气信息,自动注入了 weather_agent 的回复 ✅ ❞
这个方案的优势在于:主智能体的 session 是标准的 ReAct 对话格式(user → assistant/tool_call → tool → assistant),LLM 天然看得到所有轮次的 tool 调用记录,有足够信息做出判断。
// RemoteAgentTool 远程子智能体工具
type RemoteAgentTool struct {
agentName string
client A2AClient
}
// InvokableRun 工具执行入口
func (t *RemoteAgentTool) InvokableRun(
ctx context.Context,
argumentsInJSON string,
opts ...tool.Option,
) (string, error) {
// 1. 解析 LLM 传入的参数
var args CallAgentArgs
if err := json.Unmarshal([]byte(argumentsInJSON), &args); err != nil {
return"", fmt.Errorf("parse args: %w", err)
}
// 2. 转换为 A2A 上下文注入
var injections []*ContextInjection
for _, cm := range args.ContextMessages {
injections = append(injections, &ContextInjection{
SourceAgent: cm.SourceAgent,
Content: cm.Content,
})
}
// 3. 构建 A2A 请求
sessionID := getSessionID(ctx)
a2aReq := &A2ARequest{
SessionID: sessionID,
CurrentQuery: args.Query,
ContextInjections: injections,
}
// 4. 调用远程子智能体
resp, err := t.client.Send(ctx, a2aReq)
if err != nil {
return"", fmt.Errorf("call %s: %w", t.agentName, err)
}
return resp.Message.Content, nil
}
A2A 协议原生支持 SSE(Server-Sent Events)流式输出。协议定义了两种交互模式:
tasks/send):请求-响应,一次性返回tasks/sendSubscribe):通过 SSE 逐步推送流式模式下,主要有两类 SSE 事件:
event: TaskStatusUpdateEvent // 任务状态变更(working → completed)
event: TaskArtifactUpdateEvent // 产出物的增量推送
一个典型的 SSE 流:
event: TaskStatusUpdateEvent
data: {"id":"task-001","status":{"state":"working"},"final":false}
event: TaskArtifactUpdateEvent
data: {"id":"task-001","artifact":{"parts":[{"type":"text","text":"北京今天"}],"index":0,"append":true}}
event: TaskArtifactUpdateEvent
data: {"id":"task-001","artifact":{"parts":[{"type":"text","text":"晴,25-32℃"}],"index":0,"append":true}}
event: TaskArtifactUpdateEvent
data: {"id":"task-001","artifact":{"parts":[{"type":"text","text":",空气质量良"}],"index":0,"append":true,"lastChunk":true}}
event: TaskStatusUpdateEvent
data: {"id":"task-001","status":{"state":"completed"},"final":true}
子智能体的回复是逐 chunk 到达的,主智能体需要同时做两件事:
用户 ←──SSE──── 主智能体 ←──SSE──── 子智能体
chunk1 ←─ 转发+累积 ←── chunk1
chunk2 ←─ 转发+累积 ←── chunk2
chunk3 ←─ 转发+累积 ←── chunk3
│
流结束后:
buffer.FullContent()
→ 保存到 ConversationStore
// StreamBuffer 流式输出的累积缓冲器
type StreamBuffer struct {
mu sync.Mutex
chunks []string
complete bool
}
// Append 追加一个 chunk
func (b *StreamBuffer) Append(text string) {
b.mu.Lock()
defer b.mu.Unlock()
b.chunks = append(b.chunks, text)
}
// Complete 标记流结束
func (b *StreamBuffer) Complete() {
b.mu.Lock()
defer b.mu.Unlock()
b.complete = true
}
// FullContent 获取完整内容
func (b *StreamBuffer) FullContent() string {
b.mu.Lock()
defer b.mu.Unlock()
return strings.Join(b.chunks, "")
}
// HandleUserQueryStream 流式处理用户查询
func (o *Orchestrator) HandleUserQueryStream(
ctx context.Context,
sessionID string,
userQuery string,
userStream chan<- string, // 给用户的 SSE 输出通道
) error {
store := o.getOrCreateSession(sessionID)
round := store.CurrentRound + 1
store.CurrentRound = round
// 1. 记录用户输入
store.AddEntry(&ConversationEntry{
IsUserInput: true,
AgentName: "user",
Round: round,
Message: schema.UserMessage(userQuery),
})
// 2. 意图路由
targetAgent := o.routeIntent(ctx, store, userQuery)
// 3. 构建 A2A 请求
injections := store.BuildContextInjections(ctx, targetAgent, userQuery)
a2aReq := &A2ARequest{
SessionID: sessionID,
CurrentQuery: userQuery,
ContextInjections: injections,
Round: round,
}
// 4. 流式调用子智能体:转发 + 累积
buffer := &StreamBuffer{}
err := o.streamCallAgent(ctx, targetAgent, a2aReq, func(event SSEEvent) {
switch e := event.(type) {
case *TaskArtifactUpdateEvent:
for _, part := range e.Artifact.Parts {
if part.Type == "text" {
userStream <- part.Text // 实时转发给用户
buffer.Append(part.Text) // 同时累积
}
}
case *TaskStatusUpdateEvent:
if e.Status.State == "completed" {
buffer.Complete()
}
}
})
if err != nil {
return fmt.Errorf("stream call agent %s: %w", targetAgent, err)
}
// 5. 流结束后保存完整内容
fullContent := buffer.FullContent()
store.AddEntry(&ConversationEntry{
IsUserInput: false,
AgentName: targetAgent,
Round: round,
Message: schema.AssistantMessage(fullContent),
})
close(userStream)
returnnil
}
// A2AStreamClient SSE 流式 A2A 客户端
type A2AStreamClient struct {
endpoint string
httpClient *http.Client
}
// SendSubscribe 发起流式 A2A 调用
func (c *A2AStreamClient) SendSubscribe(
ctx context.Context,
req *A2ARequest,
callback func(SSEEvent),
) error {
// 1. 构建 JSON-RPC 请求
jsonRPCReq := map[string]interface{}{
"jsonrpc": "2.0",
"method": "tasks/sendSubscribe",
"params": req,
"id": uuid.New().String(),
}
body, err := json.Marshal(jsonRPCReq)
if err != nil {
return fmt.Errorf("marshal request: %w", err)
}
// 2. 发起 HTTP 请求,Accept: text/event-stream
httpReq, err := http.NewRequestWithContext(
ctx, http.MethodPost, c.endpoint, bytes.NewReader(body),
)
if err != nil {
return fmt.Errorf("create request: %w", err)
}
httpReq.Header.Set("Content-Type", "application/json")
httpReq.Header.Set("Accept", "text/event-stream")
resp, err := c.httpClient.Do(httpReq)
if err != nil {
return fmt.Errorf("send request: %w", err)
}
defer resp.Body.Close()
// 3. 逐行解析 SSE 流
return c.parseSSEStream(ctx, resp.Body, callback)
}
// parseSSEStream 解析 SSE 事件流
func (c *A2AStreamClient) parseSSEStream(
ctx context.Context,
reader io.Reader,
callback func(SSEEvent),
) error {
scanner := bufio.NewScanner(reader)
var eventType string
var dataLines []string
for scanner.Scan() {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
line := scanner.Text()
switch {
case strings.HasPrefix(line, "event:"):
eventType = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
case strings.HasPrefix(line, "data:"):
dataLines = append(dataLines, strings.TrimPrefix(line, "data:"))
case line == "": // 空行 = 事件边界
if eventType != "" && len(dataLines) > 0 {
data := strings.Join(dataLines, "\n")
if event, err := parseEvent(eventType, data); err == nil {
callback(event)
}
}
eventType = ""
dataLines = nil
}
}
return scanner.Err()
}
无论同步还是流式模式,子智能体处理请求的核心逻辑是一致的:
// SubAgentHandler 子智能体的 A2A 请求处理器
type SubAgentHandler struct {
agentName string
sessions map[string]*AgentSession // 实际应持久化到 Redis/DB
mu sync.RWMutex
}
// HandleA2ARequest 处理来自主智能体的请求
func (h *SubAgentHandler) HandleA2ARequest(
ctx context.Context,
req *A2ARequest,
) (*A2AResponse, error) {
subSessionID := fmt.Sprintf("%s_%s", req.SessionID, h.agentName)
// 1. 获取或创建 session
h.mu.Lock()
session, exists := h.sessions[subSessionID]
if !exists {
session = &AgentSession{
SessionID: subSessionID,
AgentName: h.agentName,
Messages: make([]*schema.Message, 0),
}
h.sessions[subSessionID] = session
}
h.mu.Unlock()
// 2. 注入跨智能体上下文(增量)
for _, injection := range req.ContextInjections {
contextMsg := schema.UserMessage(
fmt.Sprintf("For context: [%s] said: %s",
injection.SourceAgent, injection.Content),
)
session.Messages = append(session.Messages, contextMsg)
}
// 3. 追加当前用户查询
session.Messages = append(session.Messages,
schema.UserMessage(req.CurrentQuery))
// 4. 调用 LLM(session.Messages 包含完整历史)
resp, err := h.llm.Generate(ctx, session.Messages)
if err != nil {
returnnil, fmt.Errorf("agent %s generate: %w", h.agentName, err)
}
// 5. 追加回复到自己的 session
session.Messages = append(session.Messages, resp)
return &A2AResponse{
SessionID: subSessionID,
Message: resp,
}, nil
}
┌──────────────────────────────────────────────────────────────────┐
│ 主智能体 │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ ConversationStore(全局视角,记录所有轮次 + 来源) │ │
│ │ R1: user→"查天气" agent_a→"晴,25-32℃" │ │
│ │ R2: user→"推荐活动" agent_b→"跑步/划船/徒步" │ │
│ │ R3: user→"颐和园明天天气" agent_a→"多云转阴,建议上午" │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ 主智能体 LLM 的 session 是标准 ReAct 格式: │
│ user → tool_call(call_xxx_agent) → tool_result → assistant │
│ │
│ 职责: │
│ • 意图路由:判断调用哪个子智能体 │
│ • 上下文决策:LLM 自主判断需要注入哪些跨智能体信息 │
│ • 传输:只传 CurrentQuery + ContextInjections(增量) │
│ • SSE:转发 chunk 给用户 + 累积 buffer + 流结束后保存 │
└────────────┬───────────────────────────────┬─────────────────────┘
│ A2A (SSE/同步) │ A2A (SSE/同步)
┌────────▼────────┐ ┌────────▼────────┐
│ Agent A │ │ Agent B │
│ (天气查询) │ │ (活动推荐) │
│ │ │ │
│ 独立 Session: │ │ 独立 Session: │
│ u: 查天气 │ │ u: [ctx]agent_a │
│ a: 晴,25-32℃ │ │ u: 推荐活动 │
│ u: [ctx]agent_b │ │ a: 跑步/划船/徒步│
│ u: 颐和园明天? │ │ │
│ a: 多云,建议上午 │ │ │
│ │ │ │
│ 特点: │ │ 特点: │
│ ✓ 只有相关的对话 │ │ ✓ 天气信息被注入 │
│ ✓ 没有第2轮无关QA │ │ ✓ 对话原生连贯 │
│ ✓ 跨智能体信息以 │ │ │
│ 参考形式存在 │ │ │
└─────────────────┘ └─────────────────┘
主智能体的 session 包含所有轮次的 tool_call 记录,随着对话进行会持续增长。解决方案:
子智能体的 session 需要持久化(Redis/数据库),否则服务重启后历史对话会丢失。建议:
注入的跨智能体信息以 user 角色消息追加,并带有明确的前缀标记:
For context: [weather_agent] said: 北京今天晴,25-32℃
这样做的好处:
user/assistant 交替的对话结构❝「流式是"传输层"的事,上下文保存是"流结束后"的事。用 buffer 把两者解耦。」 ❞
StreamBuffer 实时转发 + 累积,流结束后一次性保存维度 | 规则引擎 | LLM 自主决策(推荐) |
|---|---|---|
「决策方式」 | 代码硬编码 | LLM 通过 Prompt 自主判断 |
「灵活性」 | 需预设规则 | 自动适应新场景 |
「准确性」 | 简单场景准确 | 语义理解更智能 |
「实现成本」 | 需写规则逻辑 | 只需好 Prompt + Tool 定义 |
「推荐场景」 | 子智能体少、规则明确 | 子智能体多、场景复杂 |
多智能体架构下的上下文管理,核心就是三件事:
每个子智能体的 session 里只有跟自己相关的对话,user/assistant 交替是原生连贯的,LLM 理解起来最自然。跨智能体信息通过标记前缀的 user 消息增量注入,既不污染对话结构,又提供了必要的上下文。
流式场景只是在传输层增加了 SSE 的处理——用缓冲器解耦"实时转发"和"完整保存",不改变上下文管理的本质逻辑。