1. 引言
你有没有遇到过这样的场景——用户跟 AI 助手聊了三天,第四天打开对话,AI 完全不记得之前说过什么。用户说”我上次提到的那个项目”,AI 只能回一句”请问您指的是哪个项目?”。
这就是无状态对话的核心痛点:每次对话都是一张白纸。
市面上有不少记忆框架(比如 mem0)试图解决这个问题,但在实际落地时我们发现几个绕不开的限制:
- 黑盒提取逻辑:无法精确控制”记住什么、忘记什么”
- 存储耦合:通常绑定特定向量数据库,难以与已有基础设施整合
- 生命周期缺失:记忆只增不减,随着对话积累会变成噪声源
所以我们选择自己实现一套完整的记忆管理系统,核心闭环只有五步:
提取 → 向量化存储 → 召回 → 注入 → 清理
本文将以一个实际的 FastAPI + PostgreSQL 项目为背景,完整拆解这套方案的架构与实现,记忆提取/更新/删除,向量化存储,召回,记忆应用,过期记忆清理全流程都可以在本文中进行参考。
提示一下,本文中虽然是使用 DIFY 工作流提取记忆,使用 RAGFLOW 作为对话服务,实现方案实际上是通用的,对这些工具做了弱化处理。
2. 整体架构
全景流程图
用户发送消息
│
▼
┌──────────────────────────────────────────────────────────┐
│ 对话引擎 │
│ │
│ ① 向量检索记忆 ──→ ② 格式化记忆文本 ──→ ③ 注入提示词 │
│ │
└──────────────────────────┬───────────────────────────────┘
│
▼
LLM 生成回复
│
▼
用户收到回答
│
┌──────────────────┴──────────────────┐
│ 后台定时任务(每5分钟) │
│ │
│ ④ 扫描新对话 │
│ │ │
│ ▼ │
│ ⑤ LLM 提取记忆(add/update/delete) │
│ │ │
│ ▼ │
│ ⑥ 写入 PostgreSQL + pgvector │
└─────────────────────────────────────┘
│
┌──────────────────┴──────────────────┐
│ 后台定时任务(每天凌晨) │
│ │
│ ⑦ 清理过期的知识类记忆 │
└─────────────────────────────────────┘
核心组件
| 组件 | 职责 |
|---|---|
| PostgreSQL + pgvector | 存储记忆文本和向量,支持余弦距离检索 |
| Embedding 模型 | 将文本转为向量(bge-m3,1024 维) |
| LLM 工作流 | 分析对话历史,决定 add/update/delete 操作 |
| 定时任务 | 增量提取记忆、清理过期记忆 |
两类记忆
我们将用户记忆分为两类,它们在召回策略和生命周期上完全不同:
| 类别 | 说明 | 召回策略 | 生命周期 |
|---|---|---|---|
| persona(用户画像) | 用户的身份、偏好、习惯等 | 全量召回 | 永久保留 |
| knowledge(专业知识) | 对话中涉及的领域知识、事实 | 向量相似度检索 | 定时过期清理 |
为什么这样分?用户画像数量少且始终相关(”用户是化工行业的技术总监”),全量加载不会产生噪声。知识类记忆数量多且时效性强,必须按相关性检索,并定期淘汰过时信息。
3. 数据模型
Memory 表结构
CREATE TABLE memory (
id VARCHAR(36) PRIMARY KEY, -- UUID
user_id INTEGER NOT NULL, -- 用户ID
fact VARCHAR(2000) NOT NULL, -- 记忆内容
embedding VECTOR(1024), -- pgvector 向量字段
category VARCHAR(20) NOT NULL, -- persona | knowledge
created_at TIMESTAMP,
updated_at TIMESTAMP
);
CREATE INDEX ix_memory_user_id ON memory (user_id);
SQLModel + pgvector 的字段定义技巧
在 SQLModel(SQLAlchemy 2.0+ 的上层封装)中定义 pgvector 向量字段时,不能直接使用 Python 类型注解,需要通过 sa_column 绕过类型推断:
from pgvector.sqlalchemy import Vector
from sqlalchemy import Column
from sqlmodel import Field
class Memory(BaseModel, TimestampMixin, table=True):
id: str = Field(
default_factory=lambda: str(uuid.uuid4()),
max_length=36,
primary_key=True,
)
user_id: int = Field(index=True, nullable=False)
fact: str = Field(max_length=2000, nullable=False)
# 关键:用 sa_column 绕过 SQLModel 的类型推断
embedding: Optional[Any] = Field(
default=None,
sa_column=Column(Vector(1024), nullable=True),
)
category: str = Field(max_length=20, nullable=False)
这里有两个细节:
- 类型声明为
Optional[Any]:SQLModel 会尝试将 Python 类型映射到 SQL 类型,但它不认识Vector。声明为Any可以跳过这个推断,实际的列类型由sa_column决定。 -
sa_column=Column(Vector(1024)):直接使用 SQLAlchemy 的Column来定义,pgvector 提供的Vector(1024)指定了向量维度。
会话表扩展
在会话表中添加两个字段来支持增量记忆提取:
class Conversation(table=True):
# ... 其他字段 ...
# 上次记忆提取时最后一个用户消息的 ID
memory_extracted_message_id: str | None = Field(
None, max_length=64, nullable=True,
)
# 上次记忆提取的时间戳
memory_extracted_at: datetime | None = Field(
None, nullable=True,
)
memory_extracted_message_id:标记上次提取到哪条消息,下次只处理它之后的新消息memory_extracted_at:与last_conversation_at对比,判断是否有新对话需要提取
4. 记忆提取
记忆提取是整个系统最复杂的环节。我们通过定时任务异步完成,不影响对话的实时响应。
4.1 定时扫描策略
每 5 分钟扫描一次会话表,筛选条件:
cutoff = get_now_utc() - timedelta(minutes=5)
conversations = await db.exec(
select(Conversation).where(
Conversation.is_deleted.is_(False),
Conversation.last_conversation_at.isnot(None),
# 最后对话超过 5 分钟(冷却期,避免提取正在进行的对话)
Conversation.last_conversation_at < cutoff,
# 有未提取的新对话
(Conversation.memory_extracted_at.is_(None))
| (Conversation.last_conversation_at > Conversation.memory_extracted_at),
)
)
5 分钟冷却期的设计意图:用户连续提问时不触发提取,等对话暂停后再处理,避免对未完成的上下文做出片面判断。
4.2 消息过滤与增量定位
从对话引擎拉取完整消息列表后,只提取上次提取点之后的新消息:
def _filter_new_messages(all_messages, last_extracted_id):
valid_messages = [m for m in all_messages if m.get("id")]
if last_extracted_id:
# 找到上次提取的 message_id 最后一次出现的位置
last_match_index = None
for i, msg in enumerate(valid_messages):
if msg.get("id") == last_extracted_id:
last_match_index = i # 不 break,遍历到底取最后一个匹配
if last_match_index is not None:
new_raw = valid_messages[last_match_index + 1:]
else:
new_raw = valid_messages # 找不到则从头提取
else:
new_raw = valid_messages # 从未提取,处理全部
# 清理 AI 回复中的思考过程
filtered = []
for msg in new_raw:
content = msg.get("content", "")
if msg.get("role") == "assistant":
content = re.sub(r"<think>.*?</think>", "", content, flags=re.DOTALL).strip()
filtered.append({"content": content, "role": msg.get("role", "")})
return filtered, last_user_message_id
这里有一个重要细节——为什么要 “遍历到底取最后一个匹配” 而不是找到第一个就 break?见后文踩坑记录。
4.3 LLM 工作流设计
我们使用独立的 LLM 工作流(实际上这个项目用的是dify)来决策记忆操作。输入是两部分信息:
| 输入 | 内容 |
|---|---|
history |
本次新增的对话消息(已过滤、已清洗) |
memory |
用户当前所有已有记忆(id + fact + type) |
工作流的输出是一个操作列表,每个操作包含:
[
{"action": "add", "fact": "用户是化工行业的安全工程师", "type": "persona"},
{"action": "update", "update_id": "xxx", "fact": "用户关注的项目已从A改为B", "type": "knowledge"},
{"action": "delete", "update_id": "yyy"}
]
三种操作:
– add:发现新的记忆点
– update:已有记忆需要修正或补充(通过 update_id 定位)
– delete:已有记忆不再准确(通过 update_id 定位)
将现有记忆作为输入的关键作用:让 LLM 做去重和冲突解决。如果用户之前说 “我在上海”,现在说 “我最近搬到北京了”,LLM 会输出一个 update 操作而非 add,避免两条矛盾的记忆共存。
LLM 记忆提取提示词参考: