AI Agent 实战:核心架构与生产级落地指南
AI Agent 的核心架构,涵盖 LLM、记忆、规划与工具四大模块。通过手写生产级框架,实现基于 Redis 和向量数据库的混合记忆管理,集成 ReAct 推理范式与动态工具调用系统。提供智能客服实战案例,对比传统客服性能,并给出成本优化策略如缓存与模型混合使用。适合希望深入理解 Agent 开发及工程化落地的开发者。

AI Agent 的核心架构,涵盖 LLM、记忆、规划与工具四大模块。通过手写生产级框架,实现基于 Redis 和向量数据库的混合记忆管理,集成 ReAct 推理范式与动态工具调用系统。提供智能客服实战案例,对比传统客服性能,并给出成本优化策略如缓存与模型混合使用。适合希望深入理解 Agent 开发及工程化落地的开发者。

简单来说,AI Agent = LLM + 记忆 + 规划 + 工具。
| 技术层级 | 主流框架/工具 | 推荐指数 | 适用场景 |
|---|---|---|---|
| 编排框架 | LangChain / LangGraph | ⭐⭐⭐⭐⭐ | 复杂工作流编排 |
| 运行时 | AutoGen / AgentScope | ⭐⭐⭐⭐ | 多 Agent 协作 |
| 向量数据库 | Milvus / Chroma | ⭐⭐⭐⭐⭐ | RAG 知识库 |
| 工具生态 | OpenAI Function Calling | ⭐⭐⭐⭐⭐ | 结构化工具调用 |
| 记忆管理 | MemGPT | ⭐⭐⭐⭐ | 长对话场景 |
| 评估框架 | Ragas / TruLens | ⭐⭐⭐⭐ | 生产环境监控 |
agent-framework/
├── core/
│ ├── agent.py # Agent 核心类
│ ├── memory.py # 记忆管理模块
│ ├── planner.py # 任务规划器
│ └── tools.py # 工具注册器
├── memory/
│ ├── short_term.py # 短期记忆(Redis)
│ ├── long_term.py # 长期记忆(向量 DB)
│ └── semantic.py # 语义记忆检索
├── tools/
│ ├── base.py # 工具基类
│ ├── registry.py # 工具注册中心
│ └── builtin/ # 内置工具
├── evaluators/
│ ├── cost.py # 成本评估
│ └── performance.py# 性能评估
└── utils/
├── logger.py # 日志系统
└── retry.py # 重试机制
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
import asyncio
import json
class AgentState(Enum):
"""Agent 状态枚举"""
IDLE = "idle" # 空闲
THINKING = "thinking" # 思考中
ACTING = "acting" # 执行中
WAITING = "waiting" # 等待外部输入
ERROR = "error" # 错误状态
@dataclass
class Message:
"""消息数据结构"""
role: str # user / assistant / system / tool
content: str # 消息内容
tool_calls: Optional[List[Dict]] = None
timestamp: float = None
metadata: Dict[str, Any] = None
class BaseAgent:
"""生产级 Agent 基类"""
def __init__(self, llm_client: Any,
memory_manager: Any = None,
tool_registry: = ,
max_iterations: = ,
verbose: = ):
.llm = llm_client
.memory = memory_manager
.tools = tool_registry
.max_iterations = max_iterations
.verbose = verbose
.state = AgentState.IDLE
.conversation_history: [Message] = []
() -> :
.conversation_history.append(Message(role=, content=user_input))
.state = AgentState.THINKING
iteration (.max_iterations):
._log()
context = ._retrieve_context(user_input)
prompt = ._build_prompt(context)
response = ._llm_inference(prompt)
response.tool_calls:
.state = AgentState.ACTING
tool_results = ._execute_tools(response.tool_calls)
result tool_results:
.conversation_history.append(
Message(role=, content=result[], tool_name=result[])
)
:
.state = AgentState.IDLE
.conversation_history.append(Message(role=, content=response.content))
response.content
() -> :
.memory:
.memory.search(query, top_k=)
() -> :
system_prompt =
system_prompt
() -> :
messages = [
{: , : prompt},
*[{: m.role, : m.content} m .conversation_history]
]
response = .llm.chat.completions.create(
model=,
messages=messages,
tools=.tools.get_tool_schemas() .tools ,
temperature=
)
response.choices[].message
() -> []:
results = []
call tool_calls:
tool_name = call[][]
arguments = json.loads(call[][])
._log()
:
tool = .tools.get_tool(tool_name)
result = tool.execute(**arguments)
results.append({: tool_name, : json.dumps(result, ensure_ascii=)})
Exception e:
results.append({: tool_name, : json.dumps({: (e)})})
results
():
.verbose:
()
from abc import ABC, abstractmethod
from typing import List, Dict, Any
import redis
import numpy as np
from datetime import datetime, timedelta
class MemoryBackend(ABC):
@abstractmethod
async def add(self, content: str, metadata: Dict = None) -> str:
pass
@abstractmethod
async def search(self, query: str, top_k: int = 5) -> List[Dict]:
pass
class ShortTermMemory(MemoryBackend):
def __init__(self, redis_url: str = "redis://localhost:6379", ttl: int = 3600):
self.client = redis.from_url(redis_url)
self.ttl = ttl
async def add(self, content: str, metadata: Dict = None) -> :
memory_id =
memory_data = {: content, : metadata {}, : datetime.now().isoformat()}
.client.setex(memory_id, .ttl, json.dumps(memory_data, ensure_ascii=))
memory_id
() -> []:
keys = .client.keys()
memories = []
key keys[-top_k:]:
data = json.loads(.client.get(key))
memories.append(data)
memories
():
():
.embedding_model = embedding_model
.vector_db = vector_db
() -> :
embedding = .embedding_model.embed(content)
memory_id = .vector_db.insert(vector=embedding, payload={: content, : metadata {}})
memory_id
() -> []:
query_embedding = .embedding_model.embed(query)
results = .vector_db.search(vector=query_embedding, top_k=top_k, score_threshold=)
results
:
():
.short_term = short_term
.long_term = long_term
():
.short_term.add(content, metadata)
importance > :
.long_term.add(content, metadata)
() -> []:
short_results = .short_term.search(query, top_k // )
long_results = .long_term.search(query, top_k // )
all_results = short_results + long_results
all_results[:top_k]
ReAct(Reasoning + Acting)是目前 Agent 最主流的推理范式。
class ReActAgent(BaseAgent):
def _build_react_prompt(self, question: str) -> str:
return f"""使用以下格式回答问题:
Question: {question}
Thought: 你应该思考做什么
Action: 要采取的操作,应该是 [{self.tools.get_tool_names()}] 中的一个
Observation: 操作的结果
...
Thought: 我现在知道最终答案了
Answer: 对原始问题的最终答案
开始!
Question: {question}
Thought:"""
async def run(self, user_input: str) -> str:
prompt = self._build_react_prompt(user_input)
for _ in range(self.max_iterations):
response = await self.llm.generate(prompt)
thought, action, action_input = self._parse_react_response(response)
if not action:
return thought
observation = await self._execute_action(action, action_input)
prompt += f"\n{response}\nObservation: {observation}\nThought:"
return "无法在指定迭代次数内完成"
def _parse_react_response(self, response: str) -> tuple:
lines = response.strip().split('\n')
thought = ""
action = None
action_input =
line lines:
line.startswith(): thought = line.replace(, ).strip()
line.startswith(): action = line.replace(, ).strip()
line.startswith(): action_input = line.replace(, ).strip()
thought, action, action_input
from typing import Callable, Dict, Any, List
import inspect
from pydantic import BaseModel, Field
class Tool(BaseModel):
name: str = Field(description="工具名称")
description: str = Field(description="工具功能描述")
parameters: Dict[str, Any] = Field(default_factory=dict, description="参数 schema")
function: Callable = Field(description="工具执行函数")
class Config:
arbitrary_types_allowed = True
async def execute(self, **kwargs) -> Any:
return await self.function(**kwargs)
def to_openai_schema(self) -> Dict:
return {"type": "function", "function": {"name": self.name, "description": self.description, "parameters": self.parameters}}
def tool(name: str = None, description: = ):
() -> Tool:
sig = inspect.signature(func)
parameters = {}
param_name, param sig.parameters.items():
param_type = param.annotation param.annotation != inspect.Parameter.empty
parameters[param_name] = {: param_type.__name__ (param_type, ) , : }
Tool(
name=name func.__name__,
description=description func.__doc__ ,
parameters={: , : parameters, : [p p sig.parameters p.default == inspect.Parameter.empty]},
function=func
)
decorator
():
:
():
._tools: [, Tool] = {}
():
._tools[tool.name] = tool
() -> Tool:
._tools.get(name)
() -> []:
(._tools.keys())
() -> :
descriptions = []
tool ._tools.values():
descriptions.append()
.join(descriptions)
() -> []:
[tool.to_openai_schema() tool ._tools.values()]
class TaskPlanner:
def __init__(self, llm_client: Any):
self.llm = llm_client
async def plan(self, goal: str) -> List[Dict]:
prompt = f"""将以下目标分解为具体的、可执行的子任务列表。
目标:{goal}
请按以下格式输出:
1. [任务描述]
2. [任务描述]
... 要求:每个任务应该独立且可执行,任务之间应该有逻辑顺序。"""
response = await self.llm.generate(prompt)
tasks = []
for line in response.strip().split('\n'):
if line.strip():
task_desc = line.split('.', 1)[1].strip() if '.' in line else line.strip()
tasks.append({"task": task_desc, "order": len(tasks) + 1, "status": "pending"})
return tasks
async def execute_plan(self, agent: BaseAgent, tasks: List[Dict]) -> Dict:
completed = []
failed = []
for task in tasks:
print(f"\n执行任务 {task['order']}: {task[]}")
:
result = agent.run(task[])
task[] =
task[] = result
completed.append(task)
Exception e:
task[] =
task[] = (e)
failed.append(task)
{
: (failed) == ,
: completed,
: failed,
: completed[-][] completed
}
智能客服是 AI Agent 最典型的应用场景。我们来实现一个政务大厅智能客服,具备政策问答、办事流程引导、工单生成及人工转接能力。
import asyncio
from typing import Optional
class CustomerServiceAgent(ReActAgent):
def __init__(self, knowledge_base, ticket_system, *args, **kwargs):
super().__init__(*args, **kwargs)
self.knowledge_base = knowledge_base
self.ticket_system = ticket_system
self._register_customer_service_tools()
def _register_customer_service_tools(self):
@self.tools.register
@tool(name="search_policy", description="搜索政策信息")
async def search_policy(query: str):
results = await self.knowledge_base.search(query, top_k=3)
return "\n".join([r['content'] for r in results])
@self.tools.register
@tool(name="get_process_guide", description="获取办事流程")
async def get_process_guide(service_type: str):
guide = await self.knowledge_base.get_guide(service_type)
return guide
@self.tools.register
():
ticket_id = .ticket_system.create(category=category, description=description, priority=priority)
():
queue_number = .ticket_system.human_transfer(reason)
() -> :
intent = ._detect_intent(user_input)
system_prompt = ._get_system_prompt(intent)
.run(user_input)
() -> :
intent_prompt =
response = .llm.generate(intent_prompt)
response.strip()
() -> :
prompts = {
: ,
: ,
: ,
:
}
prompts.get(intent, )
| 指标 | 传统规则客服 | 基础 Chatbot | 智能 Agent |
|---|---|---|---|
| 问题解决率 | 35% | 60% | 85% |
| 平均响应时间 | 5 分钟 | 2 秒 | 3 秒 |
| 多轮对话能力 | ❌ | ⚠️ | ✅ |
| 工具调用能力 | ❌ | ❌ | ✅ |
| 学习进化能力 | ❌ | ⚠️ | ✅ |
| 运营成本 | 高 | 低 | 中 |
AI Agent 的主要成本来源包括 LLM Token 消耗、向量数据库存储、缓存服务及 API 调用费用。
| 优化项 | 策略 | 预期节省 |
|---|---|---|
| Prompt 优化 | 精简系统提示词 | 20-30% |
| 模型选择 | 混合使用 GPT-4/GPT-3.5 | 40-50% |
| 缓存策略 | 重复问题命中缓存 | 30-40% |
| Token 限制 | 动态裁剪上下文 | 15-20% |
| 批量处理 | 合并多个请求 | 10-15% |
import hashlib
from functools import wraps
import time
def smart_cache(ttl: int = 3600):
cache = {}
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
key = hashlib.md5(f"{func.__name__}{args}{kwargs}".encode()).hexdigest()
if key in cache:
cache_data = cache[key]
if time.time() - cache_data['timestamp'] < ttl:
return cache_data['result']
result = await func(*args, **kwargs)
cache[key] = {'result': result, 'timestamp': time.time()}
return result
return wrapper
return decorator
class OptimizedAgent(BaseAgent):
@smart_cache(ttl=1800)
async def _llm_inference(self, prompt: str):
return await super()._llm_inference(prompt)
AI Agent 正在从实验性应用向生产力工具转变。掌握 Agent 开发已成为 AI 工程师的核心竞争力。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online