第3节:LlamaIndex工作流:从入门到实战

目录
一、前言
在构建大语言模型(LLM)应用时,我们经常会遇到这样的场景:需要调用多次模型、查询多个数据源、根据不同条件执行不同分支逻辑,甚至需要人工介入审核。传统的顺序代码逻辑在这种复杂场景下往往显得力不从心——代码耦合度高、难以调试、无法灵活扩展。
LlamaIndex Workflows 正是为解决这些问题而生的轻量级事件驱动框架。它于 2025 年 6 月发布了 1.0 正式版,成为一个独立的 Python/TypeScript 包,其核心理念是:将复杂的 AI 应用拆解为多个独立的“步骤”(Step),步骤之间通过“事件”(Event)通信,由框架负责调度执行。
这种设计带来的好处是显而易见的:代码模块化、易于测试、支持并行执行、内置可视化调试工具。无论你是想构建一个 RAG 机器人、一个多智能体协作系统,还是一个需要人工审批的内容生成流水线,LlamaIndex Workflows 都能成为你的得力助手。
本文将带你从零开始,通过大量的代码示例和实战案例,全面掌握这一强大的编排框架。
二、LlamaIndex 工作流
2.1 核心组件
在开始编写工作流之前,我们需要理解几个核心概念。如果把工作流比作一个智能工厂,那么事件就是工厂里流转的“物料”,步骤就是处理物料的“工位”,而上下文则是工位之间共享的“信息白板”。
2.1.1 定义工作流事件
事件是工作流中最基本的通信单元。在 LlamaIndex 中,所有事件都继承自 Event 类,它本质上是一个 Pydantic 模型,可以携带结构化数据。
from llama_index.core.workflow import Event from typing import List, Optional # 定义一个简单的事件,携带一条消息 class MessageEvent(Event): content: str # 定义一个更复杂的事件,携带多个字段 class AnalysisEvent(Event): topic: str keywords: List[str] confidence: float除了自定义事件,框架还提供了两个特殊的内置事件:
StartEvent:工作流的入口事件,当你调用workflow.run()时,传入的参数会自动封装成它StopEvent:工作流的出口事件,当某个步骤返回它时,工作流会立即终止,并返回其中的result字段
from llama_index.core.workflow import StartEvent, StopEvent # StartEvent 可以携带任意字段 # StopEvent 需要传入 result 参数2.1.2 设置工作流类
定义好事件后,我们需要创建一个继承自 Workflow 的类,并在其中定义各个步骤。
from llama_index.core.workflow import Workflow, step from llama_index.llms.openai import OpenAI class MyWorkflow(Workflow): # 可以初始化一些共享资源,如 LLM 实例 llm = OpenAI(model="gpt-4o-mini") # 步骤将在后续定义...2.1.3 工作流入口点
使用 @step 装饰器标记的方法是工作流的“步骤”。框架会根据方法的参数类型注解和返回值类型注解,自动判断该步骤接收什么事件、产出什么事件。
from llama_index.core.workflow import step, StartEvent, StopEvent class JokeWorkflow(Workflow): llm = OpenAI(model="gpt-4o-mini") # 入口点:接收 StartEvent,产出一个我们自定义的 JokeEvent @step async def generate_joke(self, ev: StartEvent) -> JokeEvent: topic = ev.topic # 从 StartEvent 中获取参数 prompt = f"讲一个关于{topic}的笑话" response = await self.llm.acomplete(prompt) return JokeEvent(joke=str(response)) # 出口点:接收 JokeEvent,产出 StopEvent @step async def critique_joke(self, ev: JokeEvent) -> StopEvent: prompt = f"分析这个笑话的笑点:{ev.joke}" response = await self.llm.acomplete(prompt) return StopEvent(result=str(response))2.1.4 工作流程退出
当任何一个步骤返回 StopEvent 时,整个工作流会立即停止,并返回 StopEvent.result 的值。这意味着工作流可以有多个潜在的出口点——你可以根据不同的条件,在不同的步骤中决定终止流程。
class ConditionalWorkflow(Workflow): @step async def step_one(self, ev: StartEvent) -> StopEvent | NextEvent: if ev.input == "exit": # 条件满足,直接退出 return StopEvent(result="流程提前结束") # 否则继续 return NextEvent(data=ev.input)2.1.5 绘制工作流程
LlamaIndex 提供了强大的可视化工具,帮助你理解工作流的结构和执行路径。你需要先安装额外的包
pip install llama-index-utils-workflow然后就可以绘制两种图:
from llama_index.utils.workflow import ( draw_all_possible_flows, draw_most_recent_execution, ) # 1. 绘制静态流程图(基于代码分析) draw_all_possible_flows(JokeWorkflow, filename="workflow_structure.html") # 2. 绘制最近一次执行的动态轨迹(用于调试) workflow = JokeWorkflow() await workflow.run(topic="程序员") draw_most_recent_execution(workflow, filename="recent_execution.html")动态执行图特别有用——你可以清楚地看到哪些分支被实际执行了,哪一步耗时最长。
2.1.6 全局上下文/状态协同
当工作流变得复杂时,单纯靠事件传递数据会非常繁琐。例如,步骤 A 和步骤 D 都需要访问用户原始问题,如果只能通过事件传递,那么 A → B → C → D 链条上的每个中间步骤都得负责转发这个字段。
Context 对象就是来解决这个问题的。它就像一个全局的“数据黑板”,任何步骤都可以随时读取或写入数据。
from llama_index.core.workflow import Context class StatefulWorkflow(Workflow): @step async def first_step(self, ctx: Context, ev: StartEvent) -> NextEvent: # 写入数据到上下文 await ctx.set("user_query", ev.query) await ctx.set("start_time", datetime.now()) return NextEvent() @step async def second_step(self, ctx: Context, ev: NextEvent) -> StopEvent: # 从上下文读取数据,无需经过事件传递 query = await ctx.get("user_query") start_time = await ctx.get("start_time") print(f"处理查询: {query}, 耗时: {datetime.now() - start_time}") return StopEvent(result="完成")注意:ctx.get() 和 ctx.set() 是异步方法,需要使用 await。
2.1.7 多事件等待
有时候,一个步骤需要等待多个事件全部到达后才能执行。例如,你需要同时获取“用户画像”和“商品推荐”两个异步计算的结果,然后才能生成个性化推送。Context 提供了 collect_events() 方法来实现这一点。
class UserProfileEvent(Event): profile: str class RecommendationEvent(Event): items: List[str] class AggregationWorkflow(Workflow): @step async def fetch_profile(self, ctx: Context, ev: StartEvent) -> UserProfileEvent: # 模拟异步获取用户画像 await asyncio.sleep(1) return UserProfileEvent(profile="科技爱好者") @step async def fetch_recommendations(self, ctx: Context, ev: StartEvent) -> RecommendationEvent: await asyncio.sleep(1) return RecommendationEvent(items=["GPU", "机械键盘"]) @step async def aggregate(self, ctx: Context, ev: StartEvent) -> StopEvent: # 等待两个事件都到达 events = await ctx.collect_events( ev, [UserProfileEvent, RecommendationEvent] ) if events is None: # 还未收集齐,返回 None 表示等待 return None profile_event, rec_event = events result = f"为用户 {profile_event.profile} 推荐 {rec_event.items}" return StopEvent(result=result)2.1.8 手动事件触发
在默认情况下,工作流会自动将步骤的返回值作为事件发送。但如果你想在同一个步骤中发送多个事件,或者动态决定发送时机,可以使用 ctx.send_event()。
class FanOutWorkflow(Workflow): @step async def broadcast(self, ctx: Context, ev: StartEvent) -> None: # 不返回值,而是手动发送多个事件 for i in range(5): ctx.send_event(ProcessEvent(index=i)) # 发送完成信号 ctx.send_event(CompletionEvent()) @step async def process(self, ctx: Context, ev: ProcessEvent) -> None: print(f"处理任务 {ev.index}") @step async def on_complete(self, ctx: Context, ev: CompletionEvent) -> StopEvent: return StopEvent(result="所有任务完成")2.1.9 人机协同
人机协同(Human-in-the-loop)是 AI 应用中的常见需求——当模型遇到不确定情况时,可以暂停执行,等待人工介入。LlamaIndex Workflows 通过 stepwise=True 模式或配合服务端框架来实现这一点。
# 分步执行模式 handler = workflow.run(stepwise=True) async for event in handler.stream_events(): if isinstance(event, HumanInputRequiredEvent): # 暂停,等待人工输入 user_input = await get_user_input(event.question) # 将输入作为事件发送回去 handler.ctx.send_event(HumanResponseEvent(response=user_input)) await handler.run_step() result = await handler如果你使用的是 TypeScript 版本,还可以将工作流集成到 Hono/Express 等框架中,通过 API 端点实现人机交互。
2.1.10 逐步执行
逐步执行是调试复杂工作流的利器。开启 stepwise 模式后,工作流不会自动运行到底,而是每执行一个步骤就暂停,等待你手动推进。
async def debug_workflow(): workflow = MyComplexWorkflow() handler = workflow.run(stepwise=True) # 执行第一步 events = await handler.run_step() print(f"第一步产出的事件: {events}") # 执行第二步 events = await handler.run_step() print(f"第二步产出的事件: {events}") # 继续直到完成 final_result = await handler print(f"最终结果: {final_result}")2.1.11 检查点工作流程
对于长时间运行的工作流(如文档处理流水线),你肯定不希望因为一次意外崩溃而从头开始。检查点(Checkpoint)机制允许你保存工作流的完整状态,并在以后恢复执行。
class CheckpointWorkflow(Workflow): @step async def critical_step(self, ctx: Context, ev: StartEvent) -> NextEvent: # 执行一些重要操作... result = await self.do_something() # 手动保存检查点 await self.save_checkpoint(ctx, "after_critical_step") return NextEvent(data=result) # 使用检查点 workflow = CheckpointWorkflow() handler = workflow.run() try: result = await handler except Exception as e: # 发生错误,从最近的检查点恢复 last_checkpoint = workflow.get_last_checkpoint() new_handler = workflow.run_from(checkpoint=last_checkpoint) result = await new_handler2.1.12 部署工作流
工作流可以轻松地部署为 Web 服务。TypeScript 版本提供了与 Hono 框架的集成:
// TypeScript 示例 import { Hono } from "hono"; import { createHonoHandler } from "@llamaindex/workflow-core/interrupter/hono"; const app = new Hono(); app.post("/api/run", createHonoHandler( myWorkflow, async (ctx) => startEvent(await ctx.req.json()), stopEvent )); serve(app);Python 版本也可以使用 FastAPI 等框架进行类似封装。
2.2 Workflow 管道
掌握了核心组件后,让我们来看看如何使用它们构建各种实际的工作流模式。
2.2.1 基本工作流
最简单的线性工作流:StartEvent → Step1 → Step2 → StopEvent。
from llama_index.core.workflow import Workflow, step, StartEvent, StopEvent, Event class Step1Event(Event): data: str class LinearWorkflow(Workflow): @step async def step_one(self, ev: StartEvent) -> Step1Event: print(f"步骤1处理: {ev.input}") return Step1Event(data=ev.input.upper()) @step async def step_two(self, ev: Step1Event) -> StopEvent: print(f"步骤2处理: {ev.data}") return StopEvent(result=f"最终结果: {ev.data}") async def main(): w = LinearWorkflow() result = await w.run(input="hello world") print(result) # 输出: 最终结果: HELLO WORLD2.2.2 工作流分支与循环
工作流支持条件分支和循环,只需要让步骤返回不同类型的事件即可。
class RouteEvent(Event): value: int class BranchAEvent(Event): result: str class BranchBEvent(Event): result: str class LoopEvent(Event): counter: int class BranchLoopWorkflow(Workflow): @step async def router(self, ev: StartEvent) -> RouteEvent: return RouteEvent(value=ev.number) @step async def handle_positive(self, ev: RouteEvent) -> BranchAEvent | LoopEvent: if ev.value > 0: return BranchAEvent(result=f"正数: {ev.value}") elif ev.value < 0: return BranchBEvent(result=f"负数: {ev.value}") else: # 为零时,触发循环重试 return LoopEvent(counter=1) @step async def retry_zero(self, ev: LoopEvent) -> StartEvent: if ev.counter < 3: print(f"遇到0,第{ev.counter}次重试...") return StartEvent(number=1) # 修改输入后重新开始 return BranchAEvent(result="重试次数用尽,强制视为正数") @step async def finalize(self, ev: BranchAEvent | BranchBEvent) -> StopEvent: return StopEvent(result=ev.result)2.2.3 状态维护
前面提到的 Context 是维护状态的首选方式。需要注意的是,Context 是可序列化的——你可以将整个工作流状态保存到磁盘或 Redis,之后再恢复。
class StateMaintenanceWorkflow(Workflow): @step async def accumulate(self, ctx: Context, ev: StartEvent) -> StopEvent: # 初始化或获取历史记录 history = await ctx.get("history", default=[]) history.append(ev.message) await ctx.set("history", history) if len(history) >= 5: return StopEvent(result=f"收集完毕: {history}") return None # 返回 None 表示等待更多输入2.2.4 流媒体事件
对于 LLM 生成这种长耗时操作,实时反馈进度能极大提升用户体验。ctx.write_event_to_stream() 允许你向事件流中写入中间进度,而主流程可以异步监听。
class ProgressEvent(Event): msg: str class StreamingWorkflow(Workflow): llm = OpenAI(model="gpt-4o-mini") @step async def generate(self, ctx: Context, ev: StartEvent) -> StopEvent: ctx.write_event_to_stream(ProgressEvent(msg="开始生成...")) async for chunk in self.llm.astream_complete(ev.prompt): full_response += chunk.delta # 每个 chunk 都推送进度 ctx.write_event_to_stream(ProgressEvent(msg=chunk.delta)) ctx.write_event_to_stream(ProgressEvent(msg="生成完成!")) return StopEvent(result=full_response) async def main(): w = StreamingWorkflow() handler = w.run(prompt="写一首关于 AI 的诗") # 实时监听进度 async for ev in handler.stream_events(): if isinstance(ev, ProgressEvent): print(f"进度: {ev.msg}",, flush=True) final = await handler print(f"\n\n最终结果:\n{final}")2.2.5 并发执行
工作流可以轻松实现并发。当同一个事件有多个步骤都“感兴趣”时,它们会被并发执行。此外,你也可以在单个步骤中手动发送多个事件来触发并行处理。
class ParallelWorkflow(Workflow): @step async def kickoff(self, ev: StartEvent) -> ProcessEvent: # 触发 3 个并行任务 for i in range(3): self.send_event(ProcessEvent(task_id=i, data=ev.data)) @step async def worker(self, ev: ProcessEvent) -> ResultEvent: # 这些 worker 会并发执行 await asyncio.sleep(1) # 模拟耗时操作 return ResultEvent(task_id=ev.task_id, result=f"任务{ev.task_id}完成") @step async def collector(self, ctx: Context, ev: ResultEvent) -> StopEvent | None: # 收集所有结果 results = await ctx.get("results", default=[]) results.append(ev.result) await ctx.set("results", results) if len(results) == 3: return StopEvent(result=f"全部完成: {results}") return None # 继续等待2.2.6 子类化工作流
你可以通过子类化一个已有的工作流来扩展其行为。这是实现代码复用和定制化的好方法。
class BaseWorkflow(Workflow): @step async def common_step(self, ev: StartEvent) -> IntermediateEvent: # 通用处理逻辑 return IntermediateEvent(data=ev.input.strip()) @step async def final_step(self, ev: IntermediateEvent) -> StopEvent: # 默认实现 return StopEvent(result=ev.data) class CustomWorkflow(BaseWorkflow): # 重写 final_step,提供不同的行为 @step async def final_step(self, ev: IntermediateEvent) -> StopEvent: return StopEvent(result=f"自定义结果: {ev.data.upper()}")2.2.7 嵌套工作流
工作流可以嵌套调用——一个工作流的步骤中可以运行另一个工作流。这使得你可以构建模块化的、可组合的系统。
# 定义一个子工作流 class ReflectionWorkflow(Workflow): @step async def reflect(self, ev: StartEvent) -> StopEvent: improved = f"改进后的查询: {ev.query} (经过反思优化)" return StopEvent(result=improved) # 主工作流 class MainWorkflow(Workflow): @step async def start(self, ctx: Context, ev: StartEvent, reflection_wf: Workflow) -> ProcessEvent: print("执行反思步骤...") # 运行嵌套工作流 improved = await reflection_wf.run(query=ev.query) return ProcessEvent(query=improved) @step async def process(self, ev: ProcessEvent) -> StopEvent: return StopEvent(result=f"最终处理: {ev.query}") # 使用 main = MainWorkflow() # 注入子工作流实例 main.add_workflows(reflection_wf=ReflectionWorkflow()) result = await main.run(query="初始查询") print(result) # 输出: 最终处理: 改进后的查询: 初始查询 (经过反思优化)2.2.8 可视化工作流
我们已经在 2.1.5 中介绍了可视化工具。除了生成静态图片,它还可以输出 HTML 交互式图表,非常适合在 Jupyter Notebook 中展示和调试。
# 在 Jupyter 中直接显示 from IPython.display import HTML draw_all_possible_flows(MyWorkflow, filename="temp.html") with open("temp.html", "r") as f: display(HTML(f.read()))三、本章练习题及其答案
3.1 选择题
1. 在 LlamaIndex Workflow 中,哪个事件是工作流的默认出口点?
A. StartEvent
B. StopEvent
C. Context
D. ProgressEvent
答案:B
2. 以下哪个方法用于在工作流步骤之间共享全局数据?
A. send_event()
B. collect_events()
C. ctx.set() / ctx.get()
D. write_event_to_stream()
答案:C
3. 想要实时获取 LLM 生成过程中的中间结果,应该使用什么机制?
A. 检查点 (Checkpoint)
B. 流式事件 (Streaming Events)
C. 嵌套工作流
D. 手动事件触发
答案:B
3.2 填空题
4. 使用 @step 装饰器标记的方法,其返回值的类型注解决定了该步骤产出什么 ________。
答案:事件 (Event)
5. 要从检查点恢复工作流执行,应使用 ________ 方法。
答案:run_from()
3.3 简答题
6. 简述 Context 对象和事件传递在数据共享上的区别,以及各自的适用场景。
参考答案:
- 事件传递适用于步骤间的“一次性的、需要触发下游逻辑”的数据传递,是工作流的驱动机制。
Context适用于“多个步骤需要访问的、全局性的”数据,如配置参数、数据库连接、用户会话信息等。Context避免了数据在长链条中逐层传递的繁琐,使代码更清晰。
7. 什么情况下需要使用嵌套工作流而不是简单的步骤组合?
参考答案:
嵌套工作流适用于以下场景:
- 某个子功能逻辑复杂,本身就是一个完整的工作流,希望独立开发和测试
- 需要在运行时动态替换子流程的实现(多态)
- 子工作流可能被多个不同的父工作流复用
- 希望保持主工作流的简洁性,将细节封装在子工作流中
3.4 实操题
8. 实现一个“智能客服”工作流,要求:
- 接收用户问题
- 第一步:判断问题类型(售后/售前/投诉)
- 第二步:根据类型调用不同的处理流程
- 第三步:生成最终回复
- 要求使用
Context在整个流程中记录日志
参考答案:
from llama_index.core.workflow import Workflow, step, StartEvent, StopEvent, Event, Context from enum import Enum from datetime import datetime class QuestionType(Enum): AFTER_SALES = "售后" PRE_SALES = "售前" COMPLAINT = "投诉" class ClassifyEvent(Event): qtype: QuestionType question: str class AfterSalesEvent(Event): question: str class PreSalesEvent(Event): question: str class ComplaintEvent(Event): question: str class CustomerServiceWorkflow(Workflow): @step async def classify(self, ctx: Context, ev: StartEvent) -> ClassifyEvent: # 记录日志 logs = await ctx.get("logs", default=[]) logs.append(f"[{datetime.now()}] 收到问题: {ev.question}") await ctx.set("logs", logs) # 模拟分类逻辑 question = ev.question.lower() if "退货" in question or "维修" in question: qtype = QuestionType.AFTER_SALES elif "多少钱" in question or "价格" in question: qtype = QuestionType.PRE_SALES else: qtype = QuestionType.COMPLAINT return ClassifyEvent(qtype=qtype, question=ev.question) @step async def handle_after_sales(self, ctx: Context, ev: ClassifyEvent) -> AfterSalesEvent: if ev.qtype != QuestionType.AFTER_SALES: return None # 不处理 logs = await ctx.get("logs") logs.append(f"[{datetime.now()}] 进入售后流程") await ctx.set("logs", logs) return AfterSalesEvent(question=ev.question) @step async def handle_pre_sales(self, ctx: Context, ev: ClassifyEvent) -> PreSalesEvent: if ev.qtype != QuestionType.PRE_SALES: return None logs = await ctx.get("logs") logs.append(f"[{datetime.now()}] 进入售前流程") await ctx.set("logs", logs) return PreSalesEvent(question=ev.question) @step async def handle_complaint(self, ctx: Context, ev: ClassifyEvent) -> ComplaintEvent: if ev.qtype != QuestionType.COMPLAINT: return None logs = await ctx.get("logs") logs.append(f"[{datetime.now()}] 进入投诉流程(升级处理)") await ctx.set("logs", logs) return ComplaintEvent(question=ev.question) @step async def generate_response(self, ctx: Context, ev: AfterSalesEvent | PreSalesEvent | ComplaintEvent) -> StopEvent: if isinstance(ev, AfterSalesEvent): response = f"【售后】关于「{ev.question}」,请提供订单号,我们将为您安排退货/维修。" elif isinstance(ev, PreSalesEvent): response = f"【售前】关于「{ev.question}」,我们的产品售价为 299 元,当前有优惠活动。" else: response = f"【投诉】非常抱歉给您带来不便,关于「{ev.question}」,已转接人工客服,请稍候。" logs = await ctx.get("logs") logs.append(f"[{datetime.now()}] 生成回复: {response}") await ctx.set("logs", logs) # 打印完整日志 print("\n=== 处理日志 ===") for log in logs: print(log) print("===============\n") return StopEvent(result=response) async def test(): w = CustomerServiceWorkflow() result = await w.run(question="我的产品坏了,想退货") print(f"最终回复: {result}")四、总结
LlamaIndex Workflows 为我们提供了一个优雅的框架来编排复杂的 AI 应用。通过事件驱动的架构,我们可以将业务逻辑拆解为独立、可测试的步骤,并通过 Context 实现跨步骤的状态共享。可视化工具和检查点机制则为调试和生产环境下的稳定性提供了有力保障。
从简单的线性 RAG 流水线,到复杂的多智能体协作系统,再到需要人工介入的审批流程,Workflows 都能胜任。掌握这一工具,意味着你在构建 LLM 应用时,不再需要在“灵活性”和“可控性”之间做取舍——两者可以兼得。
希望本文能帮助你快速上手并深入理解 LlamaIndex Workflows。接下来,不妨动手尝试实现你自己的第一个工作流吧!
🌟 感谢您耐心阅读到这里!
🚀 技术成长没有捷径,但每一次的阅读、思考和实践,都在默默缩短您与成功的距离。
💡 如果本文对您有所启发,欢迎点赞👍、收藏📌、分享📤给更多需要的伙伴!
🗣️ 期待在评论区看到您的想法、疑问或建议,我会认真回复,让我们共同探讨、一起进步~
🔔 关注我,持续获取更多干货内容!
🤗 我们下篇文章见!