LangChain 消息处理全解析:缓存、过滤、合并与流式输出实战

LangChain 消息处理全解析:缓存、过滤、合并与流式输出实战

文章目录

在这里插入图片描述


一、消息内存缓存

核心概念

通过 InMemoryChatMessageHistory 将对话历史存储在内存中,使模型能"记住"之前的对话内容。

关键组件

组件作用
InMemoryChatMessageHistory内存中的聊天记录存储器
RunnableWithMessageHistory将模型包装为支持历史记录的可运行对象
memory_store(字典)session_id 为 key 管理多个会话的历史

代码流程

# 1. 创建内存存储字典 memory_store ={}# 2. 定义获取会话历史的函数(按 session_id 区分会话)defget_session_history(session_id:str):if session_id notin memory_store: memory_store[session_id]= InMemoryChatMessageHistory()return memory_store[session_id]# 3. 用 RunnableWithMessageHistory 包装模型 message_model = RunnableWithMessageHistory(model, get_session_history)# 4. 通过 config 指定会话 ID config ={"configurable":{"session_id":"123"}}# 5. 多轮对话,模型自动记住上下文 response1 = message_model.invoke({"input":"你好,我是小明"}, config=config) response2 = message_model.invoke({"input":"我叫什么名字?"}, config=config)# → 模型能回答出"小明",因为历史被缓存了

运行效果

  • 第一轮:用户说"我是小明",AI 正常打招呼
  • 第二轮:用户问"我叫什么名字",AI 能从历史中回忆出"小明"

从LangChain的v0.3版本开始,官⽅建议LangChain⽤⼾不要使⽤
RunnableWithMessageHistory ,⽽是利⽤ LangGraph 持久性 来完成


二、消息过滤

核心概念

使用 filter_messages 函数对消息列表进行筛选,按类型ID过滤消息。

关键函数

from langchain_core.messages import filter_messages 

过滤参数

参数作用示例
include_types只保留指定类型的消息["ai"] → 只保留 AI 消息
exclude_ids排除指定 ID 的消息["4"] → 排除 id 为 “4” 的消息

代码示例

messages =[ HumanMessage(content="你好,我是小明",id="1"), AIMessage(content="你好,小明!很高兴认识你!",id="2"), HumanMessage(content="我想知道我之前的名字",id="3"), AIMessage(content="你之前的名字是小绿!",id="4"),]# 过滤:只保留 AI 消息,且排除 的消息 filtered_messages = filter_messages( messages, include_types=["ai"], exclude_ids=["4"],)# → 结果只剩 的 AIMessage: "你好,小明!很高兴认识你!"

过滤逻辑

原始消息 → include_types=["ai"] 筛掉 Human 消息 → exclude_ids=["4"] 再排除 id=4 → 最终结果

原始: [Human#1, AI#2, Human#3, AI#4] ↓ include_types=["ai"] 中间: [AI#2, AI#4] ↓ exclude_ids=["4"] 结果: [AI#2] 

三、消息合并

核心概念

使用 merge_message_runs连续的同类型消息合并为一条,避免多条连续 Human 或 AI 消息导致模型报错或行为异常。

关键函数

from langchain_core.messages import merge_message_runs 

代码示例

messages =[ HumanMessage(content="你好",id="1"), HumanMessage(content="我是小明",id="2"),# 连续两条 Human AIMessage(content="你好,小明!",id="3"), AIMessage(content="很高兴认识你!",id="4"),# 连续两条 AI] merged_messages = merge_message_runs(messages)

合并效果

合并前(4条): human: 你好 human: 我是小明 ai: 你好,小明! ai: 很高兴认识你! 合并后(2条): human: 你好\n我是小明 ai: 你好,小明!\n很高兴认识你! 

两种使用方式

# 方式一:直接调用函数合并后传给模型 merged_messages = merge_message_runs(messages) model.invoke(merged_messages)# 方式二:通过管道(pipe)操作,合并与模型调用串联 chain = merge_message_runs | model response = chain.invoke(messages)

管道方式更简洁,适合在 LangChain 链式调用中使用。


四、流式输出

什么是流式输出

流式输出(Streaming) 是指 AI 模型逐字返回内容,而不是等待全部生成完毕后一次性返回。就像 ChatGPT 那样,文字一个个"打"出来,而不是突然全部出现。

为什么需要?

AI 生成长文本可能需要几秒甚至更长时间。传统方式用户需要等待整个响应完成才能看到内容,体验很差。流式输出实时展示生成过程,让用户感觉响应更快,交互更自然。

特性非流式流式
用户体验需要等待实时看到
适用场景短文本聊天对话、长文本
内存占用一次性加载逐块处理
可控性无法中断可随时停止

典型应用

  1. 聊天机器人:像 ChatGPT 一样逐字显示
  2. 文章生成:实时展示生成过程
  3. 代码生成:逐行显示代码
  4. 实时翻译:边翻译边显示

五、同步 vs 异步流式

LangChain 提供两种流式方式:同步(stream)和异步(astream)。

核心区别

特性同步 stream异步 astream
调用chain.stream()chain.astream()
循环for chunk inasync for chunk in
阻塞阻塞线程不阻塞,可并发
场景单个请求多个并发请求
性能一般更高

工作原理

同步流式: 阻塞当前线程,处理一个请求时无法处理其他请求。就像排队买咖啡,必须等前一个人买完。

异步流式: 使用协程机制,等待 AI 响应时可以切换到其他任务。就像服务员可以同时为多桌客人点单。

何时使用异步?

推荐:

  • 多用户 Web 应用
  • 高并发聊天机器人
  • 与其他异步操作结合

不需要:

  • 简单的单次调用
  • 学习测试阶段

六、流式输出基础用法

同步流式

from langchain_deepseek import ChatDeepSeek from langchain_core.output_parsers import StrOutputParser model = ChatDeepSeek(model="deepseek-chat", streaming=True) parser = StrOutputParser() chain = model | parser for chunk in chain.stream("写一个关于程序员的笑话"):print(chunk, end="|", flush=True)

关键点:

  • streaming=True:必须设置
  • flush=True:立即刷新输出

异步流式

import asyncio asyncdefmain(): chain = model | parser asyncfor chunk in chain.astream("写一个关于程序员的笑话"):print(chunk, end="|", flush=True)if __name__ =="__main__": asyncio.run(main())

关键点:

  • async def:定义异步函数
  • async for:异步迭代
  • asyncio.run():运行入口

七、输出解析器

StrOutputParser 是最常用的解析器,将模型输出转换为纯文本。

作用:

  • 提取文本内容
  • 去除多余格式
  • 统一输出格式

自定义解析器:

defcustom_parser(output:str)->str:return output.strip().replace("。","!") chain = model | parser | custom_parser 

应用场景:

  • 格式转换(Markdown → HTML)
  • 内容过滤审核
  • 特殊字符处理

八、流式输出实际应用

1. 聊天机器人

用户发送消息后,AI 回复逐字显示,像真人打字。使用异步流式提高响应速度。

2. 多用户并发

Web 应用中多个用户同时请求,异步流式可以并发处理。

性能对比:

  • 同步:3个请求需要 15 秒(串行)
  • 异步:3个请求只需 5 秒(并发)

3. FastAPI 集成

from fastapi import FastAPI from fastapi.responses import StreamingResponse @app.get("/chat")asyncdefchat_stream(question:str):asyncdefgenerate():asyncfor chunk in chain.astream(question):yield chunk return StreamingResponse(generate(), media_type="text/plain")

九、常见问题

1. 没有流式效果?

原因: 忘记 streaming=Trueflush=True

2. async for 报错?

原因: 使用了 ainvoke() 而不是 astream()

ainvoke() 返回完整结果,astream() 返回流式迭代器。

3. 性能对比

  • 单个请求:同步和异步相近
  • 多个并发:异步快 3 倍

十、总结对比

功能函数/类用途
内存缓存InMemoryChatMessageHistory + RunnableWithMessageHistory让模型记住多轮对话上下文
消息过滤filter_messages按类型/ID 筛选消息
消息合并merge_message_runs合并连续同类型消息
流式输出stream / astream实时逐字返回,提升体验
输出解析StrOutputParser将模型输出转为纯文本

典型应用场景

  • 内存缓存:多轮对话场景,用户问"我之前说了什么"时模型能回答
  • 消息过滤:只提取 AI 回复做摘要、排除某些敏感消息
  • 消息合并:用户连续发了多条消息时,合并后再发给模型,避免格式错误
  • 流式输出:聊天机器人逐字显示、长文本生成、FastAPI 接口集成

流式输出要点

  1. 流式输出 = 实时返回,提升体验
  2. 同步 = 简单,适合学习
  3. 异步 = 高性能,适合生产
  4. 必须设置 streaming=Trueflush=True

Read more

污泥清淤机器人实践复盘分享

污泥清淤机器人实践复盘分享

污泥清淤机器人实践复盘:从行业痛点看智能化解决方案 在化工、市政、河道治理等众多领域,清淤作业长期面临着安全风险高、效率低下、环境影响大等严峻挑战。传统人工作业方式在有毒有害、密闭缺氧的环境中难以为继,行业对安全、高效、智能的清淤解决方案需求迫切。近年来,以清淤机器人为代表的特种作业装备快速发展,为行业带来了革命性的变化。本文旨在通过实践复盘,深入探讨清淤机器人的应用价值、技术要点与发展趋势。 一、行业痛点催生技术变革 清淤作业,尤其是工业场景下的清淤,绝非简单的体力劳动。行业报告显示,在化工厂、钢铁冶金、污水处理厂等场所,作业环境往往伴随着高浓度有毒有害化学物质、污泥厌氧分解产生的易燃易爆气体,以及密闭空间氧气不足导致的窒息风险。人工清淤事故频发,使得安全规范日益严格,传统作业模式已触及瓶颈。 与此同时,市政管网、水库涵洞、港口航道等受限空间的清淤需求巨大,但空间狭小、环境复杂,人员与大型设备均难以进入。这些痛点共同构成了对“人不能近、人不能及、人不能为”作业场景的精准描述,也成为了推动水下清淤机器人等智能装备从研发走向广泛应用的核心驱动力。 二、清淤机器人的核心技术与

破局新农业数智化困境:低代码不是“捷径”,而是重构生产逻辑的技术密钥

破局新农业数智化困境:低代码不是“捷径”,而是重构生产逻辑的技术密钥

作为IT互联网产品技术从业者,笔者长期关注各行业数智化转型落地场景,发现一个极具反差感的现象:一边是国家政策持续加码智慧农业,2025年中央一号文件明确支持拓展人工智能、数据等技术在农业领域的应用场景,资本也在不断涌入农业数智化赛道;另一边是绝大多数农业主体(尤其是中小农户、乡镇合作社)仍被困在“不会用、用不起、用不好”的死循环里,所谓的“数智化解决方案”,要么是脱离实际场景的“空中楼阁”,要么是成本高企、运维复杂的“沉重包袱”。        很多人将新农业数智化的瓶颈归结为“农业从业者数字化素养低”“农村基础设施落后”,但笔者始终认为,核心矛盾不在于“人”和“硬件”,而在于“技术供给与农业场景需求的严重错配”。传统IT开发模式下,农业数智化系统开发周期长、定制化难度大、运维成本高,根本无法适配农业场景碎片化、需求多样化、预算有限化的核心特征——你不可能让一个种植合作社花几十万、等半年,去开发一套仅用于蔬菜大棚环境监测的系统;也不可能让一个农户,去操作一套需要专业IT技能才能运维的复杂平台。        就在行业陷入“两难”之际,低代码平台的崛起,似乎为新农业数智

多模态学习(五):基于可变形注意力的无人机可见光-红外图像配准算法解析

1. 引言:当无人机“双眼”看到的世界不一样 大家好,我是老张,一个在AI和无人机视觉领域摸爬滚打了十来年的工程师。今天想和大家聊聊一个听起来有点专业,但实际上非常“接地气”的问题:怎么让无人机上的“两只眼睛”看到同一个东西? 想象一下,你操控的无人机上装了两台相机:一台是我们日常用的可见光相机,能拍出色彩斑斓的画面;另一台是红外热成像相机,能在黑夜或雾霾中“看见”物体散发的热量。这本来是件好事,相当于给无人机开了“天眼”。但现实很骨感,由于这两台相机安装位置、镜头视角不可能完全一致,它们拍下的同一场景,在图像上往往是错位的。这就好比你的左眼和右眼看到的画面对不上,不仅看着头晕,更严重的是,当你用这些错位的图像去做目标检测、跟踪或者融合时,结果会一塌糊涂。 这就是“可见光-红外图像配准”要解决的核心问题。简单说,就是通过算法计算,把红外图像“掰正”,让它和可见光图像在空间上严丝合缝地对齐。过去,学术界很多研究都默认这两幅图是已经对齐好的,直接拿来做后续分析。但实际飞过无人机的朋友都知道,这纯属理想情况。

2026 无人机 AI 算法全景图:7 大场景 50+ 算法详解

2026 无人机 AI 算法全景图:7 大场景 50+ 算法详解 一张图看懂无人机 AI 算法全貌 前言 很多人问我:共达地到底有哪些算法? 今天把我们的算法家底全部公开,7 大场景、50+ 算法,建议收藏备用。 一、飞行辅助类算法 让无人机飞得更稳、更安全。 1. 自动避障算法 功能: 实时检测前方障碍物,自动规划绕行路径 技术: * 深度相机 + 激光雷达融合 * 3D 点云分割 * 动态路径规划 性能: * 检测距离:0.5-50 米 * 响应时间:<100ms * 支持静态 + 动态障碍物 2. 精准定位算法 功能: 无 GPS