在前五篇文章中,我们的 Agent 已经能接入多渠道、自主思考行动、动态加载技能,甚至能让多个子 Agent 并行协作。但有一个致命的弱点:每次新会话,它都像失忆一样从零开始。用户需要反复说明偏好、重复上下文,这显然不是企业级产品该有的体验。OpenClaw 通过创新的双源记忆系统解决了这个问题,而我们将在本章实现一个融合短期会话窗口与长期向量记忆的记忆管理架构,并集成到已有的 Agent 运行时中。
系列文章:
手把手构建企业级 Agent 框架:从 OpenClaw 架构到自主实现
手把手构建企业级 Agent 框架(二):Gateway 网关与多渠道接入
手把手构建企业级 Agent 框架(三):Pi Agent 运行时与 ReAct 循环
手把手构建企业级 Agent 框架(四):Skill 系统——知识注入与能力扩展
手把手构建企业级 Agent 框架(五):多智能体并行与任务委派,突破单 Agent 的性能与上下文瓶颈
一、记忆 ≠ 上下文窗口:双源模型的真谛
许多框架将“记忆”简单等同于把历史消息全部塞进提示词。当对话超过几千 token,不仅成本飙升,LLM 也容易丢失重点。OpenClaw 的做法截然不同:
源一:短期记忆(Working Memory)
—— 当前会话最近 N 轮对话,以原始形式保留在上下文窗口内,保证连贯性。
源二:长期记忆(Long-term Memory)
—— 历史会话的摘要、关键事实和用户偏好,通过向量检索召回,按需注入少量最相关的信息。
这种双源设计优雅地平衡了连贯性与成本:近期对话保证流畅,长期回顾提供个性化。下面这张架构图展示了记忆系统如何融入整体框架:
流程解析:
用户消息到达,Agent 从短期存储读取最近对话(如最近 5 轮),并调用检索器从长期记忆中搜索与当前问题相关的历史事实。
将这两部分记忆与技能、工具描述一起注入系统提示词。
经过本轮交互后,将用户和助手消息追加到短期存储,并检查是否超过窗口大小。若超过,触发自动摘要器:调用 LLM 将最旧的若干轮对话压缩为一段摘要,存入长期记忆,并从短期窗口中移除。
二、接口与数据结构设计
清晰的接口是扩展性的保证。我们定义MemoryManager抽象,统一对外暴露。
2.1 核心数据结构
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
import time
classMemoryType(Enum):
SHORT_TERM ="short_term"
LONG_TERM ="long_term"
@dataclass
classMemoryItem:
role:str# user, assistant, system
content:str
timestamp:float= field(default_factory=time.time)
metadata: Dict = field(default_factory=dict)
@dataclass
classLongTermFact:
"""存入向量库的长期记忆单元"""
id:str
text:str# 可检索的文本
embedding: Optional[List[float]]=None
metadata: Dict = field(default_factory=dict)# session_id, timestamp, importance 等
2.2 MemoryManager 接口
classMemoryManager:
asyncdefadd_short_term(self, session_id:str, role:str, content:str)->None:...
asyncdefget_short_term(self, session_id:str, last_n:int=10)-> List[MemoryItem]:...
asyncdefadd_long_term(self, fact: LongTermFact)->None:...
asyncdefretrieve_long_term(self, query:str, top_k:int=3)-> List[LongTermFact]:...
asyncdefcompress_and_store(self, session_id:str)->None:...# 触发摘要
三、代码实战:实现基于 Redis + ChromaDB 的双源记忆
我们将用Redis管理短期列表,ChromaDB作为长期向量库,并实现自动摘要逻辑。
3.1 环境准备
pip install redis chromadb sentence-transformers
# 确保本地 Redis 运行在 6379 端口,或修改连接字符串
3.2 项目结构
eclaw-memory/
├── memory_manager.py # 核心 MemoryManager
├── agent.py # 集成记忆的 AgentRuntime(基于之前版本修改)
├── tools.py
├── skill.py
└── test_memory.py # 测试脚本
3.3 MemoryManager 完整实现
import json
import asyncio
import time
from typing import List, Optional, Dict
import redis.asyncio as aioredis
import chromadb
from chromadb.utils import embedding_functions
# 使用轻量级嵌入模型(也可换用 OpenAI embeddings)
EMBEDDING_MODEL ="all-MiniLM-L6-v2"
sentence_transformer_ef = embedding_functions.SentenceTransformerEmbeddingFunction(
model_name=EMBEDDING_MODEL
)
classMemoryManager:
def__init__(self, redis_url="redis://localhost:6379", short_term_limit=10):
self.redis = aioredis.from_url(redis_url, decode_responses=True)
self.short_term_limit = short_term_limit
# 初始化 ChromaDB 客户端
self.chroma_client = chromadb.Client()
# 获取或创建集合,并指定嵌入函数
self.long_term_collection = self.chroma_client.get_or_create_collection(
name="agent_long_term_memory",
embedding_function=sentence_transformer_ef
)
# 为摘要调用准备一个 LLM 函数(外部注入,这里简单定义一个调用接口)
self._summarize_fn =None# 将在集成 Agent 时设置
# ---- 短期记忆 ----
asyncdefadd_short_term(self, session_id:str, role:str, content:str):
key =f"memory:short:{session_id}"
item = json.dumps({"role": role,"content": content,"timestamp": time.time()})
await self.redis.rpush(key, item)
# 检查是否超出限制,若超出则触发压缩
current_len =await self.redis.llen(key)
if current_len > self.short_term_limit *2:# 每个轮次包含 user+assistant,所以乘以2
await self.compress_and_store(session_id)
asyncdefget_short_term(self, session_id:str, last_n:int=None)-> List[dict]:
key =f"memory:short:{session_id}"
if last_n isNone:
last_n = self.short_term_limit *2
items =await self.redis.lrange(key,-last_n,-1)
return[json.loads(i)for i in items]
# ---- 长期记忆 ----
asyncdefadd_long_term(self, fact_text:str, metadata: Dict =None):
"""向向量库添加一条长期记忆"""
fact_id =f"fact_{int(time.time()*1000)}_{hash(fact_text)%10000}"
self.long_term_collection.add(
documents=[fact_text],
metadatas=[metadata or{}],
ids=[fact_id]
)
asyncdefretrieve_long_term(self, query:str, top_k:int=3)-> List[dict]:
"""根据查询文本检索最相关的长期记忆"""
results = self.long_term_collection.query(
query_texts=[query],
n_results=top_k
)
retrieved =[]
if results['documents']and results['documents'][0]:
for doc, meta, dist inzip(results['documents'][0],
results['metadatas'][0],
results['distances'][0]):
retrieved.append({"text": doc,"metadata": meta,"distance": dist})
return retrieved
# ---- 自动摘要与压缩 ----
asyncdefcompress_and_store(self, session_id:str):
"""
将最旧的若干轮对话压缩成摘要,存入长期记忆,并从短期列表中移除。
"""
key =f"memory:short:{session_id}"
# 取最早的 4 条消息(约2轮对话)进行压缩
old_items =await self.redis.lrange(key,0,3)
ifnot old_items:
return
# 删除这4条
await self.redis.ltrim(key,4,-1)
# 调用摘要函数(需要注入 LLM 生成摘要)
if self._summarize_fn:
conversation_text ="\n".join([f"{json.loads(i)['role']}: {json.loads(i)['content']}"for i in old_items])
summary =await self._summarize_fn(conversation_text)
if summary:
await self.add_long_term(summary,{"session_id": session_id,"type":"summary"})
else:
# 如果没有摘要函数,简单拼接存档
raw_concat =" ".join([json.loads(i)['content']for i in old_items])
await self.add_long_term(raw_concat[:500],{"session_id": session_id,"type":"raw_truncated"})
3.4 集成到 Agent 运行时
我们修改agent.py,在run()方法中注入记忆检索步骤,并在结束后保存。
# agent.py 中的关键改动
classAgentRuntime:
def__init__(self,...):
# ... 其他初始化
self.memory = MemoryManager()
# 注入摘要函数(使用 Agent 自己的 LLM 调用能力)
self.memory._summarize_fn = self._generate_summary
asyncdef_generate_summary(self, conversation_text:str)->str:
"""调用 LLM 生成对话摘要"""
# 简化示例,直接调用 mock(实际调真实模型)
# 这里可直接复用 _call_llm_with_retry 但需要构造特殊提示词
prompt =f"请将以下对话压缩为一段简短摘要,包含关键信息和用户偏好:\n{conversation_text}"
# 调用模拟 LLM(实际替换为真实调用)
returnf"摘要:{conversation_text[:50]}..."
asyncdefrun(self, session_id:str, user_input:str, ctx=None):
# 1. 获取短期记忆
short_history =await self.memory.get_short_term(session_id, last_n=10)
# 2. 检索长期记忆
long_facts =await self.memory.retrieve_long_term(user_input, top_k=3)
# 3. 构建提示词(融入记忆)
memory_context =""
if long_facts:
memory_context ="【你可能回忆起以下历史信息】\n"+"\n".join([f"- {f['text']}"for f in long_facts])
# ... 后续构造 messages 时,将 memory_context 和 short_history 拼接进去
# 同时把用户消息保存到短期
await self.memory.add_short_term(session_id,"user", user_input)
# 执行原有 ReAct 循环(略)
# ...
# 最终回复后保存助手消息
await self.memory.add_short_term(session_id,"assistant", final_answer)
四、运行与测试
编写测试脚本验证短期滑动窗口、长期检索和自动压缩能否正常工作。
# test_memory.py
import asyncio
from memory_manager import MemoryManager
asyncdefmain():
mm = MemoryManager()
session ="test-session-1"
# 模拟几轮对话
await mm.add_short_term(session,"user","你好,我叫张三")
await mm.add_short_term(session,"assistant","你好张三,有什么可以帮你?")
await mm.add_short_term(session,"user","我喜欢深色模式的界面")
await mm.add_short_term(session,"assistant","好的,已记住你的偏好")
# 查看短期记忆
short =await mm.get_short_term(session, last_n=4)
print("短期记忆:")
for i in short:
print(f" {i['role']}: {i['content']}")
# 模拟压缩(添加超过限制的消息)
for i inrange(12):# 添加大量消息触发压缩
await mm.add_short_term(session,"user",f"消息{i}")
# 检查短期记忆长度是否被裁剪
new_short =await mm.get_short_term(session)
print(f"\n触发压缩后短期记忆条数:{len(new_short)}")
# 手动添加一条长期事实
await mm.add_long_term("张三偏好深色模式界面",{"user_id":"user_001","category":"preference"})
# 检索
results =await mm.retrieve_long_term("界面颜色主题")
print("\n长期记忆检索结果:")
for r in results:
print(f" - {r['text']} (距离: {r['distance']})")
if __name__ =="__main__":
asyncio.run(main())
预期输出显示短期记忆滑动、压缩触发后长度控制在合理范围内,且长期检索能返回用户偏好。
生产环境提示:
使用pgvector或Qdrant替代 ChromaDB 以获得更好的持久化和高可用。
摘要时务必使用异步 LLM 调用,并设置超时和重试。
对于多租户,长期记忆的元数据必须包含tenant_id,并在检索时过滤。
自动压缩的触发策略可以更智能,例如根据 token 计数而非消息条数。
五、与 OpenClaw 的对标思考
“我们做了什么” vs “OpenClaw 为什么这样做”?
存储后端:OpenClaw 采用文件系统 +MEMORY.md纯文本存储,简单可靠,适合个人单机。我们的方案引入 Redis + 向量数据库,更适合企业多实例、高并发场景,但也增加了运维组件。
摘要机制:OpenClaw 通过内置的 LLM 调用定期将对话压缩为摘要写入文件,与我们的compress_and_store异曲同工。我们将其封装为可配置的回调,便于替换不同的摘要模型。
检索粒度:OpenClaw 一般将摘要全量加载(因为 MEMORY.md 通常不大),我们则利用向量检索按需获取相关事实,有效控制 Token 消耗。这种按需注入对于知识密集型的企业 Agent 至关重要。
数据隔离:企业版必须考虑不同租户、不同用户的记忆严格隔离。我们的设计从接口层面就支持按session_id和元数据过滤,确保不会发生数据混淆。
通过继承 OpenClaw 双源记忆的思想,并注入企业级持久化与检索能力,我们让 Agent 实现了真正跨会话的知识积累。
六、总结与下一步
本文我们:
彻底理解了双源记忆模型的设计哲学——短期保持连贯,长期提供深度。
基于 Redis 和 ChromaDB 实现了完整的MemoryManager,包含滑动窗口、向量检索与自动摘要。
将记忆系统无缝集成到 Agent 运行时,并通过测试验证了效果。
下一篇文章预告:《第7篇:工具系统与执行层》——我们将构建一个安全、可扩展的企业 Tool 生态,支持沙箱执行、超时控制和动态注册,让 Agent 的双手更加有力。