AI对话页的流式处理架构:基于Web Streams+Fetch API的实践

AI对话页的流式处理架构:基于Web Streams+Fetch API的实践

引言

        当前AI浪潮下,基于各大agent平台,我们可以在几分钟内就搭建出一个具备页面交互的智能体,从问答输出到页面交互,这个过程中的数据流转、UI实现被统一封装以降低模型搭建复杂度。为了探索这个过程的底层实现,我们采用“生产者-消费者模式”的流式处理架构,将网络IO、数据解码、文本解析与UI渲染解耦,实现实时流式响应、UI增量渲染。

使用框架如下:

  • 前端框架:Vue 3 + TypeScript + Vite
  • UI组件库:Ant Design Vue、Ant Design X Vue
  • 流处理:Web Streams API + Fetch API

        从请求发送到UI渲染,流程如下:

流式响应处理

请求管理

  1. 采用 AbortControllerReadableStreamDefaultReader 实现“上游网络请求中止”和“下游字节流读取控制”,共同实现一次会话的可取消、可停止的流式处理。
  2. AbortController:管理当前请求的“上游网络中止”句柄,用于在开始新的提问前中止上一轮未完成的请求,或用户点击取消时终止本次请求。
  3. ReadableStreamDefaultReader:管理“下游传输层字节流”的读取器句柄,用来驱动上游生产者向管道入队字节块,以及在用户点击取消时终止字节读取。
// 流控制相关句柄 let abortController: AbortController | null = null; let currentReader: ReadableStreamDefaultReader<Uint8Array> | null = null;

流处理管道

        建立 Web Streams 流式解析管线:生产者 → 解码 → 按行拆分 → 消费者。整体处理流程如下所示:

生产者流

        负责把上游 reader 的chunk字节块统一按流的背压节奏入队,供下游统一消费,实现“读-推送”连续循环。并将下游或外部触发的取消信号正确传播到上游,终止读取链路。当外部状态指示停止或上游耗尽时,关闭控制器并复位响应状态,保证资源释放与状态一致性。

实现原理可参考文档:https://developer.mozilla.org/zh-CN/docs/Web/API/ReadableStream

 const producerStream = new ReadableStream<Uint8Array>({ start(controller) { function pump() { if (!isResponding.value) { controller.close(); return; } currentReader?.read().then(({ done, value }) => { if (done) { isResponding.value = false; controller.close(); return; } controller.enqueue(value); // 推送字节块 pump(); }); } pump(); }, });

转换流

        当上游将生产的字节块入队后,我们构建一条流式处理管道,兼容粘包/半包:先将上游的二进制字节流解码为字符串流,再定义转换流(TransformStream类型)按行拆分并过滤空行,确保下游以“完整且非空的文本行”为单位消费数据。并在流结束的 flush() 钩子中,再次冲刷缓冲区,以处理可能残留的最后一行,避免丢失收尾数据,最后得到的是由一对可读流和可写流组成的TransformStream

const textStream = producerStream.pipeThrough(new TextDecoderStream() as unknown as TransformStream<Uint8Array, string>); let; const lineSplitter = new TransformStream<string, string>({ transform(chunk, controller) { buffer += chunk; const lines = buffer.split('\n'); buffer = lines.pop() || ''; for (const line of lines) { const trimmed = line.trim(); if (trimmed) controller.enqueue(trimmed); } }, flush(controller) { const trimmed = buffer.trim(); if (trimmed) controller.enqueue(trimmed); }, });

SSE解析流

        构建SSE 消费者读取器并驱动 UI 增量渲染,读取时若 done = true,表示“传输层的流”已经真正结束(数据源已关闭,后续不会再有字节到来),这时调用 releaseLock 释放读取器的独占锁;若读取到 message === '[DONE]',表示应用层的结束,业务上不再需要继续读取,但此时连接/流不一定已被对端关闭,后续仍可能存在空闲或遗留的字节,主动调用 cancel 终止读取,cancel() 方法返回一个 Promise,这个Promise 在流被取消时兑现,消费者在流中调用该方法发出取消流的信号。

const sseStream = textStream.pipeThrough(lineSplitter); // ReadableStream const downstreamReader = sseStream.getReader(); // ReadableStreamDefaultReader function consume() { downstreamReader.read().then(({ done, value }) => { if (done) { isResponding.value = false; downstreamReader.releaseLock(); return; } const message = value.replace(/^data:\s*/, ''); if (message === '[DONE]') { …… } try { const parsed = JSON.parse(message); const content = parsed?.choices?.[0]?.delta?.content; if (content) { streamReply += content; // 消息增量渲染 if (chatList.value.length < index + 1) { const newReply: ChatItem = { key: index, role: 'assistant', content: streamReply, }; chatList.value.push(newReply); } else { chatList.value[index]!.content = streamReply; } } } catch (e) { console.warn('SSE parse error:', e); } consume(); }); } consume();

        整体数据流转如下所示:

API通信层

发送请求

        以qwen-plus模型为例,其接入方式如下:

async function fetchReply(list: ChatItem[], signal?: AbortSignal) { const messages = list.map(({ role, content }) => ({ role, content })); return fetch('https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions', { method: 'POST', headers: { Authorization: `Bearer ${import.meta.env.VITE_ALIYUN_API_KEY}`, 'Content-Type': 'application/json', }, body: JSON.stringify({ model: 'qwen-plus', messages, stream: true, }), signal, }); }

中止请求

  1. 重置 UI 响应状态:将会话状态标记为非响应中,避免界面继续显示“正在响应”等状态提示
  2. 终止流式读取:如果存在当前的流读取器,则主动取消读取,防止继续从流中消费数据,并清理引用
  3. 终止网络请求:如果存在未完成的流,则触发终止,并清理引用
const handleCancel = () => { isResponding.value = false; // 终止读取 if (currentReader) { try { currentReader.cancel('user canceled'); } catch (e) { console.error(e); } currentReader = null; } // 终止请求 if (abortController) { try { abortController.abort('user cancellation'); } catch (e) { console.error(e); } abortController = null; } message.error('已取消发送'); };

基础UI交互组件封装

        UI设计主要基于Ant Design VueAnt Design X Vue组件库,其中 Ant Design X Vue 专注于Vue生态的先进AI组件库,旨在简化对话式AI应用的开发,同时支持tstsx

消息展示

  1. 功能:展示对话历史,支持多角色渲染,支持Markdown渲染。
  2. 处理 markdown 输出渲染:
import { h } from 'vue'; import { type BubbleProps } from 'ant-design-x-vue'; import { Typography } from 'ant-design-vue'; import markdownit from 'markdown-it'; const md = new markdownit({ html: false, breaks: true, linkify: true, typographer: true }); const renderMarkdown: BubbleProps['messageRender'] = (content) => h(Typography, null, { default: () => h('div', { innerHTML: md.render(content) }), });

3. 角色映射配置:

import { UserOutlined } from '@ant-design/icons-vue'; import { h } from 'vue'; const rolesAsObject = { assistant: { placement: 'start', avatar: { icon: h(UserOutlined), style: { background: '#fde3cf' } }, typing: { step: 5, interval: 20 }, styles: { maxWidth: '600px', }, messageRender: renderMarkdown, }, user: { placement: 'end', avatar: { icon: h(UserOutlined), style: { background: '#87d068' } }, }, system: { placement: 'start', avatar: { icon: h(UserOutlined), style: { background: '#d9d9d9' } }, styles: { maxWidth: '600px', }, messageRender: renderMarkdown, }, } as const;

4.  适配 BubbleList 组件的 roles 类型处理

const bubbleListRoles = rolesAsObject as NonNullable<BubbleListProps['roles']>; const bubbleItems = computed(() => props.chatList.map((m, idx) => { type RoleKey = keyof typeof rolesAsObject; const roleKey = (m.role in rolesAsObject ? m.role : 'assistant') as RoleKey; return { key: m.key ?? idx, role: m.role, // 绑定role,对应rolesAsObject中的配置项 placement: rolesAsObject[roleKey].placement, avatar: rolesAsObject[roleKey].avatar, content: m.content, }; }) );

5. 最后将 bubbleItems 和 bubbleListRoles 传入 BubbleList 组件
 

// ChatBubble组件 <template> <BubbleList :items="bubbleItems" :roles="bubbleListRoles" /> </template> // 使用方法 <ChatBubble :chat-list="chatList" :md-render="false" />

消息发送

  1. 功能:支持消息输入、发送控制、状态展示
<template> <Sender :value="props.inputText" @update:value="onUpdateValue" :loading="props.isResponding" :auto-size="{ minRows: 2, maxRows: 6 }" :onSubmit="handleSubmit" :onCancel="handleCancel" /> </template> <script setup lang="ts"> import { Sender } from 'ant-design-x-vue'; const props = defineProps({ inputText: String, isResponding: Boolean, }); const emit = defineEmits(['submit', 'cancel', 'update:inputText']); const onUpdateValue = (val: string) => { emit('update:inputText', val); }; const handleSubmit = () => { emit('submit', props.inputText); }; const handleCancel = () => { emit('cancel'); }; </script>

        以上是AI对话页中最基础也是必不可少的部分,基于业务背景和用户体验提升,我们还可以添加更多的交互配置,比如还可以使用 vue-clipboard3 库中的 toClipboard 方法实现一键复制功能,等等。

Read more

基于FPGA的千兆以太网源代码实现与设计实战

本文还有配套的精品资源,点击获取 简介:本设计基于FPGA平台,实现千兆以太网的数据传输功能,适用于高速网络通信场景,如视频信号的高效传输。通过Verilog等硬件描述语言,构建包括以太网物理层(PHY)、MAC控制器、Wishbone总线接口等核心模块,并提供完整的测试平台与行为模型用于仿真验证。配套的使用说明指导开发者在特定FPGA平台上配置和部署该系统,具有较强的工程实用性。该方案广泛应用于嵌入式系统、工业控制和高性能数据传输领域,是掌握FPGA网络接口开发的重要实践项目。 1. FPGA千兆以太网设计概述 随着高速通信需求的不断增长,基于FPGA实现千兆以太网接口已成为嵌入式系统、工业控制和视频传输等领域的重要技术手段。本章从系统架构出发,阐述FPGA在千兆以太网设计中的核心优势——强大的并行处理能力、灵活的可重构性以及极低的数据处理延迟。重点介绍关键功能模块的划分与协作机制,包括PHY层接口、MAC控制器、Wishbone总线桥接及数据包处理引擎,并结合IEEE 802.3标准解析千兆以太网帧结构与物理层规范。同时,明确顶层模块( eth_top )的数据流向与控制

OpenClaw实战系列01:OpenClaw接入飞书机器人全接入指南 + Ollama本地大模型

文章目录 * 引言 * 第一步:环境准备与核心思想 * 第二步:部署Ollama——把大模型“养”在本地 * 1. 安装 Ollama * 2. 拉取并运行模型 * 3. 确认API可用性 * 第三步:安装OpenClaw——AI大脑的“躯干” * 1. 安装Node.js * 2. 一键安装 OpenClaw * 3. 验证安装 * 第四步:打通飞书——创建并配置机器人 * 1. 创建飞书应用 * 2. 配置机器人能力 * 3. 发布应用 * 第五步:OpenClaw与飞书“握手” * 方法一:使用 onboard 向导重新配置(推荐最新版) * 方法二:手动添加渠道 * 批准配对 * 第六步:实战测试与玩法拓展

3步实现Stable Diffusion本地部署与性能优化指南

3步实现Stable Diffusion本地部署与性能优化指南 【免费下载链接】stable-diffusion-webui-reForge 项目地址: https://gitcode.com/gh_mirrors/st/stable-diffusion-webui-reForge Stable Diffusion WebUI Forge/reForge是一款基于Gradio(开源Web界面框架)构建的AI绘画工具,通过模块化架构设计和推理加速技术,帮助用户在本地高效部署专业级图像生成系统。本文将从核心价值解析、环境准备、多场景启动方案到进阶优化技巧,全面指导您完成从部署到调优的全流程。 核心价值解析:为何选择reForge架构? ⚡️ 推理引擎深度优化 采用自研的K-Diffusion采样算法优化实现,相比传统扩散模型推理速度提升40%,在保持图像质量的同时将生成时间从平均60秒压缩至35秒以内。通过动态阈值调整和混合精度计算,在消费级GPU上也能流畅运行512x512分辨率图像生成。 🔧 模块化插件生态 创新的插件架构支持ControlNet、LoRA等扩展功能即

Llama-3.2V-11B-cot一文详解:bf16显存优化与流式输出实现原理

Llama-3.2V-11B-cot一文详解:bf16显存优化与流式输出实现原理 1. 项目概述 Llama-3.2V-11B-cot是基于Meta Llama-3.2V-11B-cot多模态大模型开发的高性能视觉推理工具。该工具针对双卡RTX 4090环境进行了深度优化,解决了视觉权重加载等关键问题,支持Chain of Thought(CoT)逻辑推演和流式输出功能。 1.1 核心特性 * 新手友好设计:提供开箱即用的优化配置,无需复杂设置 * 双卡自动分配:智能拆分模型到两张显卡,充分利用硬件资源 * bf16显存优化:采用半精度计算大幅降低显存占用 * 流式推理展示:实时显示模型思考过程,提升交互体验 * 现代化界面:基于Streamlit构建直观易用的聊天式界面 2. bf16显存优化原理 2.1 半精度计算的优势 传统深度学习模型通常使用fp32(单精度浮点数)进行计算,但这会带来较大的显存开销。bf16(Brain Floating Point)是一种16位浮点数格式,相比fp32可以: * 减少50%的显存占用