LangChain 消息处理全解析:缓存、过滤、合并与流式输出实战
文章目录

一、消息内存缓存
核心概念
通过 InMemoryChatMessageHistory 将对话历史存储在内存中,使模型能"记住"之前的对话内容。
关键组件
| 组件 | 作用 |
|---|---|
InMemoryChatMessageHistory | 内存中的聊天记录存储器 |
RunnableWithMessageHistory | 将模型包装为支持历史记录的可运行对象 |
memory_store(字典) | 以 session_id 为 key 管理多个会话的历史 |
代码流程
# 1. 创建内存存储字典 memory_store ={}# 2. 定义获取会话历史的函数(按 session_id 区分会话)defget_session_history(session_id:str):if session_id notin memory_store: memory_store[session_id]= InMemoryChatMessageHistory()return memory_store[session_id]# 3. 用 RunnableWithMessageHistory 包装模型 message_model = RunnableWithMessageHistory(model, get_session_history)# 4. 通过 config 指定会话 ID config ={"configurable":{"session_id":"123"}}# 5. 多轮对话,模型自动记住上下文 response1 = message_model.invoke({"input":"你好,我是小明"}, config=config) response2 = message_model.invoke({"input":"我叫什么名字?"}, config=config)# → 模型能回答出"小明",因为历史被缓存了运行效果
- 第一轮:用户说"我是小明",AI 正常打招呼
- 第二轮:用户问"我叫什么名字",AI 能从历史中回忆出"小明"
从LangChain的v0.3版本开始,官⽅建议LangChain⽤⼾不要使⽤
RunnableWithMessageHistory ,⽽是利⽤ LangGraph 持久性 来完成
二、消息过滤
核心概念
使用 filter_messages 函数对消息列表进行筛选,按类型或ID过滤消息。
关键函数
from langchain_core.messages import filter_messages 过滤参数
| 参数 | 作用 | 示例 |
|---|---|---|
include_types | 只保留指定类型的消息 | ["ai"] → 只保留 AI 消息 |
exclude_ids | 排除指定 ID 的消息 | ["4"] → 排除 id 为 “4” 的消息 |
代码示例
messages =[ HumanMessage(content="你好,我是小明",id="1"), AIMessage(content="你好,小明!很高兴认识你!",id="2"), HumanMessage(content="我想知道我之前的名字",id="3"), AIMessage(content="你之前的名字是小绿!",id="4"),]# 过滤:只保留 AI 消息,且排除 的消息 filtered_messages = filter_messages( messages, include_types=["ai"], exclude_ids=["4"],)# → 结果只剩 的 AIMessage: "你好,小明!很高兴认识你!"过滤逻辑
原始消息 → include_types=["ai"] 筛掉 Human 消息 → exclude_ids=["4"] 再排除 id=4 → 最终结果
原始: [Human#1, AI#2, Human#3, AI#4] ↓ include_types=["ai"] 中间: [AI#2, AI#4] ↓ exclude_ids=["4"] 结果: [AI#2] 三、消息合并
核心概念
使用 merge_message_runs 将连续的同类型消息合并为一条,避免多条连续 Human 或 AI 消息导致模型报错或行为异常。
关键函数
from langchain_core.messages import merge_message_runs 代码示例
messages =[ HumanMessage(content="你好",id="1"), HumanMessage(content="我是小明",id="2"),# 连续两条 Human AIMessage(content="你好,小明!",id="3"), AIMessage(content="很高兴认识你!",id="4"),# 连续两条 AI] merged_messages = merge_message_runs(messages)合并效果
合并前(4条): human: 你好 human: 我是小明 ai: 你好,小明! ai: 很高兴认识你! 合并后(2条): human: 你好\n我是小明 ai: 你好,小明!\n很高兴认识你! 两种使用方式
# 方式一:直接调用函数合并后传给模型 merged_messages = merge_message_runs(messages) model.invoke(merged_messages)# 方式二:通过管道(pipe)操作,合并与模型调用串联 chain = merge_message_runs | model response = chain.invoke(messages)管道方式更简洁,适合在 LangChain 链式调用中使用。
四、流式输出
什么是流式输出
流式输出(Streaming) 是指 AI 模型逐字返回内容,而不是等待全部生成完毕后一次性返回。就像 ChatGPT 那样,文字一个个"打"出来,而不是突然全部出现。
为什么需要?
AI 生成长文本可能需要几秒甚至更长时间。传统方式用户需要等待整个响应完成才能看到内容,体验很差。流式输出实时展示生成过程,让用户感觉响应更快,交互更自然。
| 特性 | 非流式 | 流式 |
|---|---|---|
| 用户体验 | 需要等待 | 实时看到 |
| 适用场景 | 短文本 | 聊天对话、长文本 |
| 内存占用 | 一次性加载 | 逐块处理 |
| 可控性 | 无法中断 | 可随时停止 |
典型应用
- 聊天机器人:像 ChatGPT 一样逐字显示
- 文章生成:实时展示生成过程
- 代码生成:逐行显示代码
- 实时翻译:边翻译边显示
五、同步 vs 异步流式
LangChain 提供两种流式方式:同步(stream)和异步(astream)。
核心区别
| 特性 | 同步 stream | 异步 astream |
|---|---|---|
| 调用 | chain.stream() | chain.astream() |
| 循环 | for chunk in | async for chunk in |
| 阻塞 | 阻塞线程 | 不阻塞,可并发 |
| 场景 | 单个请求 | 多个并发请求 |
| 性能 | 一般 | 更高 |
工作原理
同步流式: 阻塞当前线程,处理一个请求时无法处理其他请求。就像排队买咖啡,必须等前一个人买完。
异步流式: 使用协程机制,等待 AI 响应时可以切换到其他任务。就像服务员可以同时为多桌客人点单。
何时使用异步?
推荐:
- 多用户 Web 应用
- 高并发聊天机器人
- 与其他异步操作结合
不需要:
- 简单的单次调用
- 学习测试阶段
六、流式输出基础用法
同步流式
from langchain_deepseek import ChatDeepSeek from langchain_core.output_parsers import StrOutputParser model = ChatDeepSeek(model="deepseek-chat", streaming=True) parser = StrOutputParser() chain = model | parser for chunk in chain.stream("写一个关于程序员的笑话"):print(chunk, end="|", flush=True)关键点:
streaming=True:必须设置flush=True:立即刷新输出
异步流式
import asyncio asyncdefmain(): chain = model | parser asyncfor chunk in chain.astream("写一个关于程序员的笑话"):print(chunk, end="|", flush=True)if __name__ =="__main__": asyncio.run(main())关键点:
async def:定义异步函数async for:异步迭代asyncio.run():运行入口
七、输出解析器
StrOutputParser 是最常用的解析器,将模型输出转换为纯文本。
作用:
- 提取文本内容
- 去除多余格式
- 统一输出格式
自定义解析器:
defcustom_parser(output:str)->str:return output.strip().replace("。","!") chain = model | parser | custom_parser 应用场景:
- 格式转换(Markdown → HTML)
- 内容过滤审核
- 特殊字符处理
八、流式输出实际应用
1. 聊天机器人
用户发送消息后,AI 回复逐字显示,像真人打字。使用异步流式提高响应速度。
2. 多用户并发
Web 应用中多个用户同时请求,异步流式可以并发处理。
性能对比:
- 同步:3个请求需要 15 秒(串行)
- 异步:3个请求只需 5 秒(并发)
3. FastAPI 集成
from fastapi import FastAPI from fastapi.responses import StreamingResponse @app.get("/chat")asyncdefchat_stream(question:str):asyncdefgenerate():asyncfor chunk in chain.astream(question):yield chunk return StreamingResponse(generate(), media_type="text/plain")九、常见问题
1. 没有流式效果?
原因: 忘记 streaming=True 或 flush=True
2. async for 报错?
原因: 使用了 ainvoke() 而不是 astream()
ainvoke() 返回完整结果,astream() 返回流式迭代器。
3. 性能对比
- 单个请求:同步和异步相近
- 多个并发:异步快 3 倍
十、总结对比
| 功能 | 函数/类 | 用途 |
|---|---|---|
| 内存缓存 | InMemoryChatMessageHistory + RunnableWithMessageHistory | 让模型记住多轮对话上下文 |
| 消息过滤 | filter_messages | 按类型/ID 筛选消息 |
| 消息合并 | merge_message_runs | 合并连续同类型消息 |
| 流式输出 | stream / astream | 实时逐字返回,提升体验 |
| 输出解析 | StrOutputParser | 将模型输出转为纯文本 |
典型应用场景
- 内存缓存:多轮对话场景,用户问"我之前说了什么"时模型能回答
- 消息过滤:只提取 AI 回复做摘要、排除某些敏感消息
- 消息合并:用户连续发了多条消息时,合并后再发给模型,避免格式错误
- 流式输出:聊天机器人逐字显示、长文本生成、FastAPI 接口集成
流式输出要点
- 流式输出 = 实时返回,提升体验
- 同步 = 简单,适合学习
- 异步 = 高性能,适合生产
- 必须设置
streaming=True和flush=True