The Complete Workflow for Introducing and Testing Open-Source AI Models in Python
A complete workflow for introducing and testing open-source AI models in Python, covering environment configuration, BERT model loading, IMDB dataset preprocessing, fine-tuning, evaluation metrics, Pytest unit and integration tests, a FastAPI service, Docker containerized deployment, and monitoring/logging configuration. Complete, runnable code examples and engineering best practices help developers build production-grade AI application systems.

This article walks through the complete technical workflow for introducing an open-source AI model into a Python environment and testing it thoroughly. Using the BERT model from the Hugging Face Transformers library as the running example, it provides an executable end-to-end plan spanning environment configuration, model loading, data processing, training and fine-tuning, performance evaluation, and deployment testing, with detailed explanations of the underlying principles, code, and commands to help developers master best practices for open-model integration.
Open-source AI models have become a core building block of modern AI applications. From Google's BERT to OpenAI's GPT series to Meta's Llama, open models have driven the democratization of AI. Hugging Face, currently the most popular open-model community, hosts more than 100,000 pretrained models and 10,000 datasets.
Building on that stack, we implement a complete BERT text-classification onboarding workflow, from environment configuration through deployment. We start by checking the base environment:
# Check the system environment
python --version  # Python 3.8+
nvidia-smi        # GPU support (optional but recommended)

# Create the project directory
mkdir openai-introduction && cd openai-introduction

# Create a virtual environment
python -m venv venv

# Activate the virtual environment
# Linux/Mac
source venv/bin/activate
# Windows
venv\Scripts\activate
Create requirements.txt:
# Core AI libraries
torch>=2.0.0
transformers>=4.30.0
datasets>=2.12.0
accelerate>=0.20.0

# Data processing
numpy>=1.24.0
pandas>=2.0.0
scikit-learn>=1.3.0

# Experiment tracking
wandb>=0.15.0
tensorboard>=2.13.0

# API service
fastapi>=0.100.0
uvicorn[standard]>=0.23.0
pydantic>=2.0.0

# Testing tools
pytest>=7.4.0
hypothesis>=6.82.0
pytest-benchmark>=4.0.0

# Development tools
black>=23.0.0
flake8>=6.0.0
mypy>=1.5.0
pre-commit>=3.3.0

# Model optimization
optimum>=1.12.0
onnxruntime>=1.15.0

# Misc
jupyter>=1.0.0
ipython>=8.14.0
matplotlib>=3.7.0
seaborn>=0.12.0
# Install the base dependencies
pip install -r requirements.txt

# Install the CUDA-enabled PyTorch build (if using a GPU)
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
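Before going further, it is worth verifying that the GPU build of PyTorch is actually active. A minimal sanity check (assuming the install above succeeded) looks like this:

import torch

print(torch.__version__)          # should report a +cu118 build if the CUDA wheel was installed
print(torch.cuda.is_available())  # True when a usable GPU and driver are present
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))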
Create the project directory structure:
openai-introduction/
├── src/
│   ├── __init__.py
│   ├── data/
│   │   ├── __init__.py
│   │   ├── processor.py
│   │   └── dataset.py
│   ├── models/
│   │   ├── __init__.py
│   │   ├── bert_classifier.py
│   │   └── model_utils.py
│   ├── training/
│   │   ├── __init__.py
│   │   ├── trainer.py
│   │   └── optimizer.py
│   ├── evaluation/
│   │   ├── __init__.py
│   │   ├── metrics.py
│   │   └── visualization.py
│   └── api/
│       ├── __init__.py
│       ├── app.py
│       └── schemas.py
├── tests/
│   ├── __init__.py
│   ├── test_data.py
│   ├── test_model.py
│   ├── test_training.py
│   └── test_api.py
├── notebooks/
│   ├── 01_exploratory_analysis.ipynb
│   └── 02_model_experiments.ipynb
├── configs/
│   ├── base_config.yaml
│   └── train_config.yaml
├── scripts/
│   ├── train.py
│   ├── evaluate.py
│   └── deploy.py
├── .pre-commit-config.yaml
├── Dockerfile
├── docker-compose.yml
├── pyproject.toml
├── README.md
└── requirements.txt
BERT (Bidirectional Encoder Representations from Transformers) is a pretrained language model built on the Transformer encoder. Its core innovation is bidirectional context understanding: it is pretrained with the Masked Language Model (MLM) and Next Sentence Prediction (NSP) objectives.
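The MLM objective is easy to see in action. As a quick illustration (not part of the project code), the fill-mask pipeline from Transformers lets a pretrained BERT predict a masked token using both left and right context:

from transformers import pipeline

# bert-base-uncased uses [MASK] as its mask token
fill_mask = pipeline("fill-mask", model="bert-base-uncased")
for candidate in fill_mask("The movie was absolutely [MASK]."):
    print(candidate["token_str"], round(candidate["score"], 3))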
import math
from typing import Optional, Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F


class MultiHeadAttention(nn.Module):
    """Multi-head attention implementation"""

    def __init__(self, embed_dim: int, num_heads: int, dropout: float = 0.1):
        super().__init__()
        assert embed_dim % num_heads == 0
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads

        self.q_proj = nn.Linear(embed_dim, embed_dim)
        self.k_proj = nn.Linear(embed_dim, embed_dim)
        self.v_proj = nn.Linear(embed_dim, embed_dim)
        self.out_proj = nn.Linear(embed_dim, embed_dim)

        self.dropout = nn.Dropout(dropout)
        self.scaling = self.head_dim ** -0.5

    def forward(
        self,
        query: torch.Tensor,
        key: torch.Tensor,
        value: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        batch_size = query.size(0)

        # Project and reshape into (batch, heads, seq_len, head_dim)
        q = self.q_proj(query).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        k = self.k_proj(key).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        v = self.v_proj(value).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)

        # Scaled dot-product attention scores
        attn_scores = torch.matmul(q, k.transpose(-2, -1)) * self.scaling

        if attention_mask is not None:
            attn_scores = attn_scores.masked_fill(attention_mask == 0, float("-inf"))

        attn_probs = F.softmax(attn_scores, dim=-1)
        attn_probs = self.dropout(attn_probs)

        attn_output = torch.matmul(attn_probs, v)
        attn_output = attn_output.transpose(1, 2).contiguous().view(
            batch_size, -1, self.embed_dim
        )
        attn_output = self.out_proj(attn_output)
        return attn_output, attn_probs


class TransformerEncoderLayer(nn.Module):
    """Encoder layer: self-attention and a feed-forward block, each with residual connection and LayerNorm"""

    def __init__(self, embed_dim: int, num_heads: int, ff_dim: int, dropout: float = 0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(embed_dim, num_heads, dropout)
        self.attn_norm = nn.LayerNorm(embed_dim)
        self.ffn = nn.Sequential(
            nn.Linear(embed_dim, ff_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(ff_dim, embed_dim),
            nn.Dropout(dropout)
        )
        self.ffn_norm = nn.LayerNorm(embed_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor, attention_mask: Optional[torch.Tensor] = None):
        # Self-attention sub-layer
        residual = x
        attn_output, attn_weights = self.self_attn(x, x, x, attention_mask)
        x = self.attn_norm(residual + self.dropout(attn_output))
        # Feed-forward sub-layer
        residual = x
        x = self.ffn_norm(residual + self.ffn(x))
        return x, attn_weights
The Hugging Face Transformers library exposes a unified API across many pretrained models; its core design pattern is built around the PreTrainedModel and PreTrainedTokenizer base classes.
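Thanks to that unified design, swapping checkpoints is a one-line change. A minimal sketch (the checkpoint name here is just an example) of the standard from_pretrained loading pattern:

from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Any Hub checkpoint with a sequence-classification head can be substituted here
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)

inputs = tokenizer("A unified API for every model.", return_tensors="pt")
outputs = model(**inputs)
print(outputs.logits.shape)  # torch.Size([1, 2])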
from typing import Optional

import torch
import torch.nn as nn
from transformers import (
    BertConfig,
    BertModel,
    BertTokenizer,
    PreTrainedModel,
    PretrainedConfig
)
from transformers.modeling_outputs import SequenceClassifierOutput


class BertForSequenceClassification(PreTrainedModel):
    """BERT-based sequence classification model"""

    def __init__(self, config: BertConfig):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.config = config

        # BERT backbone
        self.bert = BertModel(config)

        # Classification head
        self.classifier = nn.Sequential(
            nn.Dropout(config.hidden_dropout_prob),
            nn.Linear(config.hidden_size, config.hidden_size),
            nn.GELU(),
            nn.Dropout(config.hidden_dropout_prob),
            nn.Linear(config.hidden_size, config.num_labels)
        )

        # Initialize weights
        self.post_init()

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        **kwargs
    ) -> SequenceClassifierOutput:
        # BERT forward pass
        outputs = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            **kwargs
        )

        # Use the pooled [CLS] representation
        pooled_output = outputs.pooler_output

        # Classify
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions
        )
We use the IMDB movie-review dataset for the sentiment classification task.
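Before building the full processing pipeline, it helps to peek at the raw data. A quick look (assuming the datasets library can reach the Hub) at what an IMDB sample contains:

from datasets import load_dataset

dataset = load_dataset("imdb")
print(dataset)               # train/test splits of 25,000 reviews each
sample = dataset["train"][0]
print(sample["label"])       # 0 = negative, 1 = positive
print(sample["text"][:200])  # first 200 characters of the review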
from datasets import load_dataset, DatasetDict
import pandas as pd
import torch
from torch.utils.data import DataLoader
from transformers import BertTokenizer


class DataProcessor:
    """Data processor for the IMDB sentiment classification task"""

    def __init__(self, model_name: str = "bert-base-uncased", max_length: int = 512):
        self.tokenizer = BertTokenizer.from_pretrained(model_name)
        self.max_length = max_length

    def load_imdb_dataset(self, cache_dir: str = "./data"):
        """Load the IMDB dataset"""
        # Load from the Hugging Face Hub
        dataset = load_dataset("imdb", cache_dir=cache_dir)
        # Carve a validation split out of the training set
        split = dataset["train"].train_test_split(test_size=0.1, seed=42)
        dataset_dict = DatasetDict({
            "train": split["train"],
            "validation": split["test"],
            "test": dataset["test"]
        })
        return dataset_dict

    def preprocess_function(self, examples):
        """Tokenize a batch of examples"""
        tokenized_inputs = self.tokenizer(
            examples["text"],
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors="np"
        )
        return {
            "input_ids": tokenized_inputs["input_ids"].tolist(),
            "attention_mask": tokenized_inputs["attention_mask"].tolist(),
            "labels": examples["label"]
        }

    def prepare_dataset(self, dataset_dict: DatasetDict, batch_size: int = 32):
        """Tokenize all splits and wrap them in DataLoaders"""
        tokenized_datasets = dataset_dict.map(
            self.preprocess_function,
            batched=True,
            remove_columns=["text", "label"]
        )
        tokenized_datasets.set_format(type="torch", columns=["input_ids", "attention_mask", "labels"])

        train_dataloader = DataLoader(
            tokenized_datasets["train"],
            shuffle=True,
            batch_size=batch_size,
            collate_fn=self.collate_fn
        )
        val_dataloader = DataLoader(
            tokenized_datasets["validation"],
            batch_size=batch_size,
            collate_fn=self.collate_fn
        )
        test_dataloader = DataLoader(
            tokenized_datasets["test"],
            batch_size=batch_size,
            collate_fn=self.collate_fn
        )
        return train_dataloader, val_dataloader, test_dataloader

    def collate_fn(self, batch):
        """Stack individual samples into batched tensors"""
        input_ids = torch.stack([item["input_ids"] for item in batch])
        attention_mask = torch.stack([item["attention_mask"] for item in batch])
        labels = torch.tensor([item["labels"] for item in batch])
        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels
        }

    def analyze_dataset(self, dataset_dict: DatasetDict) -> pd.DataFrame:
        """Collect basic statistics for each split"""
        stats = {}
        for split in ["train", "validation", "test"]:
            dataset = dataset_dict[split]
            stats[split] = {
                "num_samples": len(dataset),
                "num_positive": sum(dataset["label"]),
                "num_negative": len(dataset) - sum(dataset["label"]),
                "avg_word_count": sum(len(text.split()) for text in dataset["text"]) / len(dataset)
            }
        return pd.DataFrame(stats).T
import nlpaug.augmenter.word as naw
from typing import List


class DataAugmenter:
    """Text data augmenter"""

    def __init__(self, aug_method: str = "synonym"):
        self.aug_method = aug_method
        if aug_method == "synonym":
            self.augmenter = naw.SynonymAug(aug_src="wordnet")
        elif aug_method == "contextual":
            self.augmenter = naw.ContextualWordEmbsAug(
                model_path='bert-base-uncased',
                action="substitute"
            )
        elif aug_method == "back_translation":
            self.augmenter = naw.BackTranslationAug(
                from_model_name='facebook/wmt19-en-de',
                to_model_name='facebook/wmt19-de-en'
            )
        else:
            raise ValueError(f"Unsupported augmentation method: {aug_method}")

    def augment_text(self, text: str, num_aug: int = 3) -> List[str]:
        """Generate augmented variants of a single text"""
        augmented_texts = []
        for _ in range(num_aug):
            augmented_text = self.augmenter.augment(text)
            augmented_texts.append(augmented_text)
        return augmented_texts

    def augment_dataset(self, dataset, num_aug_per_sample: int = 1):
        """Augment a dataset, keeping the originals and appending augmented copies"""
        augmented_texts = []
        augmented_labels = []
        for text, label in zip(dataset["text"], dataset["label"]):
            augmented_texts.append(text)
            augmented_labels.append(label)
            for _ in range(num_aug_per_sample):
                augmented_text = self.augmenter.augment(text)
                augmented_texts.append(augmented_text)
                augmented_labels.append(label)
        return {"text": augmented_texts, "label": augmented_labels}
from dataclasses import dataclass
from typing import Optional, Dict, Any
import yaml
import torch
from transformers import TrainingArguments


@dataclass
class TrainingConfig:
    """Training configuration"""
    # Model
    model_name: str = "bert-base-uncased"
    num_labels: int = 2
    dropout_rate: float = 0.1
    # Training
    batch_size: int = 32
    gradient_accumulation_steps: int = 1
    num_epochs: int = 3
    learning_rate: float = 2e-5
    weight_decay: float = 0.01
    warmup_steps: int = 500
    # Optimizer
    optimizer: str = "adamw"
    scheduler: str = "linear"
    # Experiment tracking
    logging_steps: int = 100
    eval_steps: int = 500
    save_steps: int = 1000
    # Hardware
    fp16: bool = True
    device: str = "cuda" if torch.cuda.is_available() else "cpu"

    @classmethod
    def from_yaml(cls, yaml_path: str) -> "TrainingConfig":
        """Load a config from a YAML file"""
        with open(yaml_path, "r") as f:
            config_dict = yaml.safe_load(f)
        return cls(**config_dict)

    def to_training_arguments(self, output_dir: str = "./outputs") -> TrainingArguments:
        """Convert to Hugging Face TrainingArguments"""
        return TrainingArguments(
            output_dir=output_dir,
            overwrite_output_dir=True,
            num_train_epochs=self.num_epochs,
            per_device_train_batch_size=self.batch_size,
            per_device_eval_batch_size=self.batch_size,
            gradient_accumulation_steps=self.gradient_accumulation_steps,
            learning_rate=self.learning_rate,
            weight_decay=self.weight_decay,
            warmup_steps=self.warmup_steps,
            logging_dir=f"{output_dir}/logs",
            logging_steps=self.logging_steps,
            eval_steps=self.eval_steps,
            save_steps=self.save_steps,
            evaluation_strategy="steps",
            save_strategy="steps",
            load_best_model_at_end=True,
            metric_for_best_model="f1",
            greater_is_better=True,
            fp16=self.fp16,
            report_to=["wandb"],
            run_name="bert-imdb"
        )
import os
import torch
from torch.optim import AdamW
from torch.utils.data import DataLoader
from transformers import get_linear_schedule_with_warmup
from typing import Dict, List, Optional, Tuple
import numpy as np
from tqdm.auto import tqdm
from sklearn.metrics import accuracy_score, precision_recall_fscore_support


class CustomTrainer:
    """Custom training loop"""

    def __init__(
        self,
        model,
        train_config: TrainingConfig,
        train_dataloader: DataLoader,
        val_dataloader: DataLoader,
        test_dataloader: Optional[DataLoader] = None
    ):
        self.model = model
        self.config = train_config
        self.train_dataloader = train_dataloader
        self.val_dataloader = val_dataloader
        self.test_dataloader = test_dataloader

        # Device setup
        self.device = torch.device(train_config.device)
        self.model.to(self.device)

        # Optimizer and LR scheduler
        self.optimizer = self._create_optimizer()
        self.scheduler = self._create_scheduler()

        # Training state
        self.global_step = 0
        self.best_metric = 0.0
        self.history = {"train_loss": [], "val_loss": [], "val_f1": [], "learning_rate": []}

    def _create_optimizer(self) -> torch.optim.Optimizer:
        """AdamW with weight decay disabled for biases and LayerNorm weights"""
        no_decay = ["bias", "LayerNorm.weight"]
        optimizer_grouped_parameters = [
            {
                "params": [
                    p for n, p in self.model.named_parameters()
                    if not any(nd in n for nd in no_decay)
                ],
                "weight_decay": self.config.weight_decay
            },
            {
                "params": [
                    p for n, p in self.model.named_parameters()
                    if any(nd in n for nd in no_decay)
                ],
                "weight_decay": 0.0
            }
        ]
        if self.config.optimizer == "adamw":
            return AdamW(
                optimizer_grouped_parameters,
                lr=self.config.learning_rate,
                eps=1e-8
            )
        else:
            raise ValueError(f"Unsupported optimizer: {self.config.optimizer}")

    def _create_scheduler(self):
        """Linear warmup followed by linear decay"""
        total_steps = len(self.train_dataloader) * self.config.num_epochs
        if self.config.scheduler == "linear":
            return get_linear_schedule_with_warmup(
                self.optimizer,
                num_warmup_steps=self.config.warmup_steps,
                num_training_steps=total_steps
            )
        else:
            raise ValueError(f"Unsupported scheduler: {self.config.scheduler}")

    def train_epoch(self, epoch: int) -> Dict[str, float]:
        """Run one training epoch"""
        self.model.train()
        total_loss = 0.0
        progress_bar = tqdm(
            self.train_dataloader,
            desc=f"Epoch {epoch}",
            leave=False
        )
        for batch in progress_bar:
            batch = {k: v.to(self.device) for k, v in batch.items()}
            outputs = self.model(**batch)
            loss = outputs.loss
            loss.backward()

            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(
                self.model.parameters(),
                max_norm=1.0
            )

            self.optimizer.step()
            self.scheduler.step()
            self.optimizer.zero_grad()

            total_loss += loss.item()
            self.global_step += 1

            progress_bar.set_postfix({
                "loss": loss.item(),
                "lr": self.scheduler.get_last_lr()[0]
            })
            self.history["learning_rate"].append(
                self.scheduler.get_last_lr()[0]
            )

            # Periodic evaluation and best-checkpoint saving
            if self.global_step % self.config.eval_steps == 0:
                val_metrics = self.evaluate()
                self.history["val_loss"].append(val_metrics["loss"])
                self.history["val_f1"].append(val_metrics["f1"])
                if val_metrics["f1"] > self.best_metric:
                    self.best_metric = val_metrics["f1"]
                    self.save_model("./checkpoints/best")
                self.model.train()

        avg_loss = total_loss / len(self.train_dataloader)
        self.history["train_loss"].append(avg_loss)
        return {"loss": avg_loss}

    def evaluate(self, dataloader: Optional[DataLoader] = None) -> Dict[str, float]:
        """Evaluate the model (validation set by default)"""
        if dataloader is None:
            dataloader = self.val_dataloader
        self.model.eval()
        total_loss = 0.0
        all_preds = []
        all_labels = []
        with torch.no_grad():
            for batch in tqdm(dataloader, desc="Evaluating", leave=False):
                batch = {k: v.to(self.device) for k, v in batch.items()}
                outputs = self.model(**batch)
                total_loss += outputs.loss.item()
                preds = torch.argmax(outputs.logits, dim=-1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(batch["labels"].cpu().numpy())

        accuracy = accuracy_score(all_labels, all_preds)
        precision, recall, f1, _ = precision_recall_fscore_support(
            all_labels, all_preds,
            average="binary"
        )
        avg_loss = total_loss / len(dataloader)
        return {
            "loss": avg_loss,
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "f1": f1
        }

    def train(self) -> Dict[str, List[float]]:
        """Full training loop over all epochs"""
        print("Starting training...")
        for epoch in range(self.config.num_epochs):
            print(f"\nEpoch {epoch + 1}/{self.config.num_epochs}")
            train_metrics = self.train_epoch(epoch)
            val_metrics = self.evaluate()
            print(f"Train loss: {train_metrics['loss']:.4f}")
            print(f"Val loss: {val_metrics['loss']:.4f}, F1: {val_metrics['f1']:.4f}")

        if self.test_dataloader is not None:
            test_metrics = self.evaluate(self.test_dataloader)
            print(f"\nTest metrics: {test_metrics}")
        return self.history

    def save_model(self, save_path: str = "./checkpoints/model"):
        """Save the full training state plus a Hugging Face-format model"""
        os.makedirs(save_path, exist_ok=True)
        torch.save({
            "model_state_dict": self.model.state_dict(),
            "optimizer_state_dict": self.optimizer.state_dict(),
            "scheduler_state_dict": self.scheduler.state_dict(),
            "config": self.config,
            "history": self.history,
            "global_step": self.global_step,
            "best_metric": self.best_metric
        }, os.path.join(save_path, "checkpoint.pt"))
        self.model.save_pretrained(save_path)

    def load_model(self, load_path: str):
        """Restore a saved training state"""
        checkpoint = torch.load(load_path, map_location=self.device)
        self.model.load_state_dict(checkpoint["model_state_dict"])
        self.optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
        self.scheduler.load_state_dict(checkpoint["scheduler_state_dict"])
        self.history = checkpoint["history"]
        self.global_step = checkpoint["global_step"]
        self.best_metric = checkpoint["best_metric"]
#!/usr/bin/env python3
"""Training script"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import torch
import wandb
from transformers import BertForSequenceClassification

from src.data.processor import DataProcessor
from src.training.trainer import CustomTrainer, TrainingConfig
from src.models.model_utils import set_seed


def main():
    # Fix random seeds for reproducibility
    set_seed(42)

    # Initialize WandB
    wandb.init(
        project="bert-imdb-classification",
        config={
            "model": "bert-base-uncased",
            "dataset": "imdb",
            "epochs": 3,
            "batch_size": 32,
            "learning_rate": 2e-5
        }
    )

    # Load the config
    config = TrainingConfig()

    # Data preparation
    print("Loading and preprocessing data...")
    processor = DataProcessor(config.model_name)
    dataset_dict = processor.load_imdb_dataset()

    # Dataset analysis
    stats_df = processor.analyze_dataset(dataset_dict)
    print("\nDataset Statistics:")
    print(stats_df)

    # Build dataloaders
    train_dataloader, val_dataloader, test_dataloader = processor.prepare_dataset(
        dataset_dict,
        batch_size=config.batch_size
    )

    # Load the model
    print(f"\nLoading model: {config.model_name}")
    model = BertForSequenceClassification.from_pretrained(
        config.model_name,
        num_labels=config.num_labels,
        hidden_dropout_prob=config.dropout_rate,
        attention_probs_dropout_prob=config.dropout_rate
    )

    trainer = CustomTrainer(
        model=model,
        train_config=config,
        train_dataloader=train_dataloader,
        val_dataloader=val_dataloader,
        test_dataloader=test_dataloader
    )

    history = trainer.train()
    trainer.save_model("./checkpoints/final")

    wandb.log({"best_f1": trainer.best_metric})
    wandb.finish()

    print("Training complete.")
    return history


if __name__ == "__main__":
    history = main()
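The script imports set_seed from src.models.model_utils, which is not shown above. A minimal sketch of what that helper is assumed to look like, seeding Python, NumPy, and PyTorch in one place:

# src/models/model_utils.py (assumed implementation, not shown in the original listing)
import random
import numpy as np
import torch


def set_seed(seed: int = 42):
    """Seed every RNG the training loop touches, for reproducibility"""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)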
import numpy as np
import torch
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    roc_auc_score,
    confusion_matrix,
    classification_report
)
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Tuple, Any


class ModelEvaluator:
    """Model evaluator"""

    def __init__(self, model, tokenizer, device: str = "cuda"):
        self.model = model
        self.tokenizer = tokenizer
        self.device = device
        self.model.to(device)
        self.model.eval()

    def predict(self, texts: List[str], batch_size: int = 32) -> Tuple[np.ndarray, np.ndarray]:
        """Batched prediction returning logits and class probabilities"""
        all_logits = []
        all_probs = []
        for i in range(0, len(texts), batch_size):
            batch_texts = texts[i:i+batch_size]
            # Encode
            inputs = self.tokenizer(
                batch_texts,
                truncation=True,
                padding=True,
                max_length=512,
                return_tensors="pt"
            )
            inputs = {k: v.to(self.device) for k, v in inputs.items()}
            with torch.no_grad():
                outputs = self.model(**inputs)
                logits = outputs.logits
                probs = torch.softmax(logits, dim=-1)
            all_logits.append(logits.cpu().numpy())
            all_probs.append(probs.cpu().numpy())
        logits_array = np.vstack(all_logits)
        probs_array = np.vstack(all_probs)
        return logits_array, probs_array

    def evaluate_classification(self, texts: List[str], labels: List[int]) -> Dict[str, Any]:
        """Compute classification metrics against gold labels"""
        logits, probs = self.predict(texts)
        preds = np.argmax(probs, axis=1)
        metrics = {
            "accuracy": accuracy_score(labels, preds),
            "precision": precision_score(labels, preds, average="binary"),
            "recall": recall_score(labels, preds, average="binary"),
            "f1": f1_score(labels, preds, average="binary"),
            "roc_auc": roc_auc_score(labels, probs[:, 1])
        }
        cm = confusion_matrix(labels, preds)
        report = classification_report(
            labels, preds,
            target_names=["negative", "positive"],
            output_dict=True
        )
        confidence_scores = np.max(probs, axis=1)
        return {
            "metrics": metrics,
            "confusion_matrix": cm,
            "classification_report": report,
            "predictions": preds,
            "probabilities": probs,
            "confidence_scores": confidence_scores
        }

    def analyze_errors(self, texts: List[str], labels: List[int], preds: np.ndarray) -> List[Dict[str, Any]]:
        """Collect misclassified samples for inspection"""
        errors = []
        for i, (text, label, pred) in enumerate(zip(texts, labels, preds)):
            if label != pred:
                errors.append({
                    "text": text[:200] + "..." if len(text) > 200 else text,
                    "true_label": "positive" if label == 1 else "negative",
                    "predicted_label": "positive" if pred == 1 else "negative",
                    "word_count": len(text.split())
                })
        return errors

    def plot_confusion_matrix(self, cm: np.ndarray, save_path: str = None):
        """Plot the confusion matrix as a heatmap"""
        plt.figure(figsize=(8, 6))
        sns.heatmap(
            cm,
            annot=True,
            fmt="d",
            cmap="Blues",
            xticklabels=["negative", "positive"],
            yticklabels=["negative", "positive"]
        )
        plt.xlabel("Predicted")
        plt.ylabel("True")
        plt.title("Confusion Matrix")
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches="tight")
        plt.show()

    def plot_roc_curve(self, labels: List[int], probs: np.ndarray, save_path: str = None):
        """Plot the ROC curve"""
        from sklearn.metrics import roc_curve
        fpr, tpr, thresholds = roc_curve(labels, probs[:, 1])
        plt.figure(figsize=(8, 6))
        plt.plot(
            fpr, tpr,
            label=f"ROC (AUC = {roc_auc_score(labels, probs[:, 1]):.3f})"
        )
        plt.plot([0, 1], [0, 1], "k--", label="Random")
        plt.xlabel("False Positive Rate")
        plt.ylabel("True Positive Rate")
        plt.title("ROC Curve")
        plt.legend()
        plt.grid(True)
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches="tight")
        plt.show()

    def analyze_calibration(self, labels: np.ndarray, probs: np.ndarray, n_bins: int = 10) -> Dict[str, Any]:
        """Plot a reliability diagram and compute the expected calibration error (ECE)"""
        from sklearn.calibration import calibration_curve
        prob_true, prob_pred = calibration_curve(
            labels, probs[:, 1], n_bins=n_bins
        )
        plt.figure(figsize=(8, 6))
        plt.plot(prob_pred, prob_true, "s-", label="Model")
        plt.plot([0, 1], [0, 1], "k--", label="Perfectly calibrated")
        plt.xlabel("Mean predicted probability")
        plt.ylabel("Fraction of positives")
        plt.title("Calibration Curve")
        plt.legend()
        plt.grid(True)
        plt.show()

        # Expected calibration error: confidence/accuracy gap weighted by bin size
        bin_edges = np.linspace(0, 1, n_bins + 1)
        bin_indices = np.digitize(probs[:, 1], bin_edges) - 1
        ece = 0.0
        for i in range(n_bins):
            mask = bin_indices == i
            if np.sum(mask) > 0:
                bin_prob_mean = np.mean(probs[mask, 1])
                bin_accuracy = np.mean(labels[mask] == 1)
                ece += np.abs(bin_prob_mean - bin_accuracy) * np.sum(mask)
        ece /= len(labels)
        return {"ece": ece, "calibration_curve": (prob_true, prob_pred)}
import time
import threading
from typing import Dict, List
import psutil
import GPUtil
from memory_profiler import memory_usage
import numpy as np
import torch


class PerformanceBenchmark:
    """Performance benchmarking"""

    def __init__(self, model, tokenizer, device: str = "cuda"):
        self.model = model
        self.tokenizer = tokenizer
        self.device = device

    def measure_inference_time(
        self,
        texts: List[str],
        batch_sizes: List[int] = [1, 4, 8, 16, 32, 64]
    ) -> Dict[int, Dict[str, float]]:
        """Measure inference latency and throughput across batch sizes"""
        results = {}
        for batch_size in batch_sizes:
            print(f"\nTesting batch size: {batch_size}")
            # Warm up
            warmup_texts = ["This is a warmup sentence."] * batch_size
            self.predict_batch(warmup_texts)
            # Timed runs
            times = []
            for i in range(0, len(texts), batch_size):
                batch_texts = texts[i:i+batch_size]
                start_time = time.perf_counter()
                self.predict_batch(batch_texts)
                end_time = time.perf_counter()
                times.append(end_time - start_time)
            avg_time = np.mean(times)
            throughput = len(texts) / np.sum(times)
            results[batch_size] = {
                "avg_batch_time": avg_time,
                "std_batch_time": np.std(times),
                "throughput": throughput
            }
            print(f"  Avg batch time: {avg_time:.4f}s")
            print(f"  Throughput: {throughput:.1f} samples/s")
        return results

    def predict_batch(self, texts: List[str]):
        """Run a single forward pass over a batch of texts"""
        inputs = self.tokenizer(
            texts,
            truncation=True,
            padding=True,
            max_length=512,
            return_tensors="pt"
        )
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        with torch.no_grad():
            outputs = self.model(**inputs)
        return outputs

    def measure_memory_usage(self, text_lengths: List[int] = [64, 128, 256, 512]) -> Dict[int, Dict[str, float]]:
        """Measure peak and mean host memory while predicting texts of increasing length"""
        memory_results = {}
        for length in text_lengths:
            test_text = " ".join(["word"] * length)
            texts = [test_text] * 8
            mem_usage = memory_usage((self.predict_batch, (texts,)), interval=0.1)
            memory_results[length] = {
                "peak_mb": max(mem_usage),
                "mean_mb": np.mean(mem_usage),
                "text_length": length
            }
        return memory_results

    def monitor_gpu_during_inference(self, texts: List[str], duration: float = 30.0) -> List[Dict]:
        """Sample GPU stats in a background thread while running inference"""
        gpu_stats = []
        stop_monitor = False

        def monitor_gpu():
            while not stop_monitor:
                gpus = GPUtil.getGPUs()
                for gpu in gpus:
                    gpu_stats.append({
                        "timestamp": time.time(),
                        "memory_used_mb": gpu.memoryUsed,
                        "memory_total_mb": gpu.memoryTotal,
                        "load_percent": gpu.load * 100,
                        "temperature": gpu.temperature
                    })
                time.sleep(0.5)

        monitor_thread = threading.Thread(target=monitor_gpu)
        monitor_thread.start()

        start_time = time.time()
        batch_size = 16
        while time.time() - start_time < duration:
            for i in range(0, min(len(texts), 256), batch_size):
                batch_texts = texts[i:i+batch_size]
                self.predict_batch(batch_texts)

        stop_monitor = True
        monitor_thread.join()
        return gpu_stats

    def generate_report(self, benchmark_results: Dict) -> str:
        """Render benchmark results as a plain-text report"""
        report = []
        report.append("=" * 60)
        report.append("Performance Benchmark Report")
        report.append("=" * 60)
        report.append("\nInference time:")
        report.append("-" * 40)
        for batch_size, metrics in benchmark_results["inference_time"].items():
            report.append(
                f"batch={batch_size}: {metrics['avg_batch_time']:.4f}s/batch, "
                f"{metrics['throughput']:.1f} samples/s"
            )
        report.append("\nMemory usage:")
        report.append("-" * 40)
        for length, metrics in benchmark_results["memory_usage"].items():
            report.append(
                f"length={length}: peak {metrics['peak_mb']:.1f} MB"
            )
        report.append("\nSystem:")
        report.append("-" * 40)
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        report.append(f"CPU usage: {cpu_percent:.1f}%")
        report.append(f"RAM: {memory.used / 1e9:.1f} / {memory.total / 1e9:.1f} GB")
        return "\n".join(report)
import pytest
from hypothesis import given, strategies as st
from hypothesis.extra.numpy import arrays
import numpy as np
import torch
from torch.utils.data import DataLoader
from transformers import BertForSequenceClassification

from src.data.processor import DataProcessor
from src.training.trainer import CustomTrainer, TrainingConfig


class TestDataProcessor:
    """Data processor tests"""

    def setup_method(self):
        self.processor = DataProcessor("bert-base-uncased")

    def test_tokenization(self):
        """Tokenizer produces fixed-length encodings"""
        text = "This is a test sentence."
        tokenized = self.processor.tokenizer(
            text,
            truncation=True,
            padding="max_length",
            max_length=128
        )
        assert "input_ids" in tokenized
        assert "attention_mask" in tokenized
        assert len(tokenized["input_ids"]) == 128

    @given(
        st.text(min_size=1, max_size=1000),
        st.integers(min_value=0, max_value=1)
    )
    def test_preprocess_function(self, text, label):
        """Property-based test: preprocessing preserves labels and batch size"""
        examples = {"text": [text], "label": [label]}
        result = self.processor.preprocess_function(examples)
        assert "input_ids" in result
        assert "attention_mask" in result
        assert "labels" in result
        assert len(result["input_ids"]) == 1
        assert result["labels"][0] == label


class TestModel:
    """Model tests"""

    def setup_method(self):
        self.model = BertForSequenceClassification.from_pretrained(
            "bert-base-uncased",
            num_labels=2
        )
        self.model.eval()

    def test_forward_pass(self):
        """Forward pass produces logits of the expected shape"""
        batch_size = 4
        seq_length = 128
        input_ids = torch.randint(0, 30522, (batch_size, seq_length))
        attention_mask = torch.ones((batch_size, seq_length))
        with torch.no_grad():
            outputs = self.model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )
        assert outputs.logits.shape == (batch_size, 2)
        assert outputs.logits.requires_grad == False

    def test_save_and_load(self, tmp_path):
        """Saving and reloading preserves all parameters"""
        save_path = tmp_path / "model"
        self.model.save_pretrained(save_path)
        loaded_model = BertForSequenceClassification.from_pretrained(save_path)
        for (name1, param1), (name2, param2) in zip(
            self.model.named_parameters(),
            loaded_model.named_parameters()
        ):
            assert name1 == name2
            assert torch.allclose(param1, param2)

    @given(
        arrays(dtype=np.int64, shape=(2, 16), elements=st.integers(min_value=0, max_value=30521))
    )
    def test_forward_with_arbitrary_ids(self, input_ids):
        """Property-based test: logits batch dim matches input batch dim"""
        attention_mask = np.ones_like(input_ids)
        input_ids = torch.from_numpy(input_ids)
        attention_mask = torch.from_numpy(attention_mask)
        with torch.no_grad():
            outputs = self.model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )
        assert outputs.logits.shape[0] == input_ids.shape[0]


class TestTraining:
    """Training tests"""

    def test_training_reduces_loss(self):
        """One epoch on a tiny random dataset should not increase the loss much"""
        config = TrainingConfig(
            batch_size=4,
            num_epochs=1,
            learning_rate=1e-4,
            device="cpu",
            fp16=False,
            warmup_steps=0,
            eval_steps=10000
        )
        # The trainer feeds dict batches into model(**batch), so build dict samples
        input_ids = torch.randint(0, 30522, (16, 32))
        attention_mask = torch.ones((16, 32), dtype=torch.long)
        labels = torch.randint(0, 2, (16,))
        data = [
            {"input_ids": input_ids[i], "attention_mask": attention_mask[i], "labels": labels[i]}
            for i in range(16)
        ]

        def collate(batch):
            return {k: torch.stack([item[k] for item in batch]) for k in batch[0]}

        train_dataloader = DataLoader(data, batch_size=config.batch_size, collate_fn=collate)
        val_dataloader = DataLoader(data, batch_size=config.batch_size, collate_fn=collate)

        model = BertForSequenceClassification.from_pretrained(
            "bert-base-uncased",
            num_labels=2
        )
        trainer = CustomTrainer(
            model=model,
            train_config=config,
            train_dataloader=train_dataloader,
            val_dataloader=val_dataloader
        )
        initial_loss = trainer.evaluate()["loss"]
        trainer.train_epoch(0)
        final_loss = trainer.evaluate()["loss"]
        assert final_loss < initial_loss or torch.isclose(
            torch.tensor(final_loss),
            torch.tensor(initial_loss),
            rtol=0.1
        )

    def test_gradient_accumulation_config(self):
        """Config stores gradient accumulation steps"""
        config = TrainingConfig(
            batch_size=8,
            gradient_accumulation_steps=4,
            learning_rate=2e-5
        )
        assert config.gradient_accumulation_steps == 4


if __name__ == "__main__":
    pytest.main([__file__, "-v", "--tb=short"])
import asyncio
from fastapi.testclient import TestClient
import json


class TestAPI:
    """API tests"""

    def setup_method(self):
        from src.api.app import app
        self.client = TestClient(app)

    def test_health_endpoint(self):
        """Health check endpoint"""
        response = self.client.get("/health")
        assert response.status_code == 200
        assert response.json() == {"status": "healthy"}

    def test_predict_endpoint(self):
        """Single prediction endpoint"""
        test_data = {
            "text": "This movie was absolutely fantastic!",
            "model_version": "latest"
        }
        response = self.client.post("/predict", json=test_data)
        assert response.status_code == 200
        result = response.json()
        assert "prediction" in result
        assert "confidence" in result
        assert "label" in result
        assert result["confidence"] >= 0 and result["confidence"] <= 1

    def test_batch_predict_endpoint(self):
        """Batch prediction endpoint"""
        test_data = {
            "texts": [
                "Great movie, loved it!",
                "Terrible film, a waste of time.",
                "It was okay, nothing special."
            ]
        }
        response = self.client.post("/predict/batch", json=test_data)
        assert response.status_code == 200
        results = response.json()
        assert len(results["predictions"]) == 3
        for result in results["predictions"]:
            assert "prediction" in result
            assert "confidence" in result

    def test_empty_text_rejected(self):
        """Empty input should fail validation"""
        test_data = {"text": "   "}
        response = self.client.post("/predict", json=test_data)
        assert response.status_code == 422

    def test_concurrent_requests(self):
        """Concurrent requests all succeed"""
        async def make_request():
            test_data = {
                "text": "A solid, well-paced thriller.",
                "model_version": "latest"
            }
            response = self.client.post("/predict", json=test_data)
            return response.status_code

        async def run():
            tasks = [make_request() for _ in range(10)]
            return await asyncio.gather(*tasks)

        results = asyncio.run(run())
        assert all(status == 200 for status in results)

    def test_models_endpoint(self):
        """Model listing endpoint"""
        response = self.client.get("/models")
        assert response.status_code == 200
        models = response.json()
        assert "available_models" in models
        assert "active_model" in models

    def test_metrics_endpoint(self):
        """Metrics endpoint"""
        response = self.client.get("/metrics")
        assert response.status_code == 200
        metrics = response.json()
        assert "total_predictions" in metrics
        assert "uptime_seconds" in metrics
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, validator
from typing import List, Optional, Dict, Any
import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging
from datetime import datetime
import hashlib
import torch
from transformers import BertForSequenceClassification, BertTokenizer

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Request/response schemas
class PredictionRequest(BaseModel):
    """Single prediction request"""
    text: str = Field(..., min_length=1, max_length=5000)
    model_version: Optional[str] = "latest"

    @validator('text')
    def text_not_empty(cls, v):
        if not v.strip():
            raise ValueError('Text cannot be empty')
        return v.strip()


class BatchPredictionRequest(BaseModel):
    """Batch prediction request"""
    texts: List[str] = Field(..., min_items=1, max_items=100)
    model_version: Optional[str] = "latest"

    @validator('texts')
    def texts_not_empty(cls, v):
        if not all(text.strip() for text in v):
            raise ValueError('Texts cannot be empty')
        return [text.strip() for text in v]


class PredictionResponse(BaseModel):
    """Single prediction response"""
    prediction: int
    label: str
    confidence: float
    model_version: str
    request_id: str
    processing_time: float


class BatchPredictionResponse(BaseModel):
    """Batch prediction response"""
    predictions: List[Dict[str, Any]]
    batch_id: str
    total_processed: int
    processing_time: float


class ModelManager:
    """Loads models from disk and serves predictions through a thread pool"""

    def __init__(self, model_path: str = "./models"):
        self.model_path = model_path
        self.models = {}
        self.active_model = None
        self.model_versions = []
        self.executor = ThreadPoolExecutor(max_workers=4)
        self.load_models()

    def load_models(self):
        """Scan the model directory and load every saved model"""
        import os
        import glob
        model_dirs = glob.glob(os.path.join(self.model_path, "*"))
        for model_dir in model_dirs:
            if os.path.isdir(model_dir):
                try:
                    model_name = os.path.basename(model_dir)
                    model = BertForSequenceClassification.from_pretrained(model_dir)
                    tokenizer = BertTokenizer.from_pretrained(model_dir)
                    self.models[model_name] = {
                        "model": model,
                        "tokenizer": tokenizer,
                        "loaded_at": datetime.now(),
                        "stats": {
                            "predictions": 0,
                            "avg_processing_time": 0.0
                        }
                    }
                    self.model_versions.append(model_name)
                    if self.active_model is None:
                        self.active_model = model_name
                    logger.info(f"Loaded model: {model_name}")
                except Exception as e:
                    logger.error(f"Failed to load model from {model_dir}: {e}")
        if not self.models:
            logger.warning("No local models found, loading default model")
            self.load_default_model()

    def load_default_model(self):
        """Fall back to the base pretrained checkpoint"""
        try:
            model_name = "bert-base-uncased"
            model = BertForSequenceClassification.from_pretrained(
                model_name,
                num_labels=2
            )
            tokenizer = BertTokenizer.from_pretrained(model_name)
            self.models[model_name] = {
                "model": model,
                "tokenizer": tokenizer,
                "loaded_at": datetime.now(),
                "stats": {
                    "predictions": 0,
                    "avg_processing_time": 0.0
                }
            }
            self.active_model = model_name
            self.model_versions.append(model_name)
            logger.info(f"Loaded default model: {model_name}")
        except Exception as e:
            logger.error(f"Failed to load default model: {e}")

    async def predict_async(self, text: str, model_version: str = "latest") -> Dict[str, Any]:
        """Run a prediction on the thread pool without blocking the event loop"""
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(
            self.executor,
            self.predict_sync,
            text,
            model_version
        )
        return result

    def predict_sync(self, text: str, model_version: str = "latest") -> Dict[str, Any]:
        """Synchronous prediction used by the executor"""
        start_time = datetime.now()
        model_key = model_version if model_version != "latest" else self.active_model
        if model_key not in self.models:
            raise ValueError(f"Unknown model version: {model_key}")

        model_info = self.models[model_key]
        model = model_info["model"]
        tokenizer = model_info["tokenizer"]

        inputs = tokenizer(
            text,
            truncation=True,
            padding=True,
            max_length=512,
            return_tensors="pt"
        )
        with torch.no_grad():
            outputs = model(**inputs)
            logits = outputs.logits
            probs = torch.softmax(logits, dim=-1)
        prediction = torch.argmax(probs, dim=-1).item()
        confidence = probs[0][prediction].item()

        end_time = datetime.now()
        processing_time = (end_time - start_time).total_seconds()

        # Update running statistics
        model_info["stats"]["predictions"] += 1
        current_avg = model_info["stats"]["avg_processing_time"]
        total_preds = model_info["stats"]["predictions"]
        model_info["stats"]["avg_processing_time"] = (
            (current_avg * (total_preds - 1) + processing_time) / total_preds
        )

        return {
            "prediction": prediction,
            "label": "positive" if prediction == 1 else "negative",
            "confidence": confidence,
            "model_version": model_key,
            "processing_time": processing_time
        }

    async def predict_batch_async(self, texts: List[str], model_version: str = "latest") -> List[Dict[str, Any]]:
        """Fan a batch of texts out across the thread pool"""
        loop = asyncio.get_event_loop()
        tasks = [
            loop.run_in_executor(
                self.executor,
                self.predict_sync,
                text,
                model_version
            )
            for text in texts
        ]
        results = await asyncio.gather(*tasks)
        return results

    def get_model_stats(self) -> Dict[str, Any]:
        """Summarize per-model statistics"""
        stats = {}
        for model_name, model_info in self.models.items():
            stats[model_name] = {
                "loaded_at": model_info["loaded_at"].isoformat(),
                "predictions": model_info["stats"]["predictions"],
                "avg_processing_time": model_info["stats"]["avg_processing_time"],
                "is_active": model_name == self.active_model
            }
        return stats

    def switch_active_model(self, model_version: str) -> bool:
        """Switch the active model version"""
        if model_version in self.models:
            self.active_model = model_version
            logger.info(f"Switched active model to: {model_version}")
            return True
        return False


app = FastAPI(
    title="BERT Sentiment Classification API",
    description="Sentiment classification service backed by fine-tuned BERT models",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

model_manager = None
request_counter = 0
cache = {}
startup_time = datetime.now()


def generate_request_id(text: str) -> str:
    """Build a request id from a timestamp, a text hash, and a counter"""
    global request_counter
    request_counter += 1
    text_hash = hashlib.md5(text.encode()).hexdigest()[:8]
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    return f"{timestamp}-{text_hash}-{request_counter}"


def cache_prediction(func):
    """Simple in-memory cache decorator keyed by model version and text hash.
    It can be applied to ModelManager.predict_sync to avoid recomputing
    identical requests within a one-hour window."""
    def wrapper(text, model_version, *args, **kwargs):
        cache_key = f"{model_version}:{hashlib.md5(text.encode()).hexdigest()}"
        if cache_key in cache:
            cached_time, result = cache[cache_key]
            if (datetime.now() - cached_time).total_seconds() < 3600:
                logger.info("Cache hit")
                return result
        result = func(text, model_version, *args, **kwargs)
        cache[cache_key] = (datetime.now(), result)
        # Evict the oldest entry when the cache grows too large
        if len(cache) > 10000:
            oldest_key = min(cache.keys(), key=lambda k: cache[k][0])
            del cache[oldest_key]
        return result
    return wrapper


@app.on_event("startup")
async def startup_event():
    global model_manager
    logger.info("Starting up, loading models...")
    model_manager = ModelManager()
    logger.info(f"Loaded {len(model_manager.models)} model(s)")
    logger.info(f"Active model: {model_manager.active_model}")


@app.on_event("shutdown")
async def shutdown_event():
    logger.info("Shutting down")
    if model_manager:
        model_manager.executor.shutdown()


@app.get("/")
async def root():
    return {
        "service": "BERT Sentiment Classification API",
        "version": "1.0.0",
        "docs": "/docs",
        "health": "/health",
        "predict": "/predict"
    }


@app.get("/health")
async def health_check():
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "models_loaded": len(model_manager.models) if model_manager else 0,
        "active_model": model_manager.active_model if model_manager else None
    }


@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest, background_tasks: BackgroundTasks):
    try:
        start_time = datetime.now()
        request_id = generate_request_id(request.text)
        result = await model_manager.predict_async(
            request.text,
            request.model_version
        )
        processing_time = (datetime.now() - start_time).total_seconds()
        background_tasks.add_task(
            log_prediction,
            request_id=request_id,
            text_length=len(request.text),
            prediction=result["prediction"],
            confidence=result["confidence"],
            processing_time=processing_time
        )
        return PredictionResponse(
            prediction=result["prediction"],
            label=result["label"],
            confidence=result["confidence"],
            model_version=result["model_version"],
            request_id=request_id,
            processing_time=processing_time
        )
    except Exception as e:
        logger.error(f"Prediction failed: {e}")
        raise HTTPException(
            status_code=500,
            detail=str(e)
        )


@app.post("/predict/batch", response_model=BatchPredictionResponse)
async def predict_batch(request: BatchPredictionRequest):
    try:
        start_time = datetime.now()
        batch_id = hashlib.md5("".join(request.texts).encode()).hexdigest()[:12]
        results = await model_manager.predict_batch_async(
            request.texts,
            request.model_version
        )
        processing_time = (datetime.now() - start_time).total_seconds()
        predictions = []
        for i, (text, result) in enumerate(zip(request.texts, results)):
            request_id = generate_request_id(text)
            predictions.append({
                "text": text[:100] + "..." if len(text) > 100 else text,
                "prediction": result["prediction"],
                "label": result["label"],
                "confidence": result["confidence"],
                "request_id": request_id
            })
        return BatchPredictionResponse(
            predictions=predictions,
            batch_id=batch_id,
            total_processed=len(results),
            processing_time=processing_time
        )
    except Exception as e:
        logger.error(f"Batch prediction failed: {e}")
        raise HTTPException(
            status_code=500,
            detail=str(e)
        )


@app.get("/models")
async def list_models():
    if not model_manager:
        raise HTTPException(status_code=503, detail="Models not loaded yet")
    stats = model_manager.get_model_stats()
    return {
        "available_models": list(model_manager.models.keys()),
        "active_model": model_manager.active_model,
        "stats": stats
    }


@app.post("/models/{model_version}/activate")
async def activate_model(model_version: str):
    if not model_manager:
        raise HTTPException(status_code=503, detail="Models not loaded yet")
    success = model_manager.switch_active_model(model_version)
    if success:
        return {
            "message": f"Switched to model {model_version}",
            "active_model": model_manager.active_model
        }
    else:
        raise HTTPException(
            status_code=404,
            detail=f"Model version not found: {model_version}"
        )


@app.get("/metrics")
async def metrics():
    total_predictions = sum(
        model_info["stats"]["predictions"]
        for model_info in model_manager.models.values()
    )
    avg_response_times = [
        model_info["stats"]["avg_processing_time"]
        for model_info in model_manager.models.values()
    ]
    avg_response_time = (
        sum(avg_response_times) / len(avg_response_times)
        if avg_response_times else 0.0
    )
    return {
        "total_predictions": total_predictions,
        "total_requests": request_counter,
        "cache_size": len(cache),
        "avg_response_time": avg_response_time,
        "models_loaded": len(model_manager.models),
        "uptime_seconds": (datetime.now() - startup_time).total_seconds()
    }


@app.get("/history")
async def prediction_history(limit: int = 100, model_version: Optional[str] = None):
    # Placeholder: a production deployment would read from a persistent log store
    return {
        "history": [],
        "limit": limit,
        "model_version": model_version
    }


def log_prediction(request_id: str, text_length: int, prediction: int, confidence: float, processing_time: float):
    """Background task: write a structured log entry for each prediction"""
    log_entry = {
        "timestamp": datetime.now().isoformat(),
        "request_id": request_id,
        "text_length": text_length,
        "prediction": prediction,
        "confidence": confidence,
        "processing_time": processing_time
    }
    logger.info(f"prediction: {log_entry}")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        log_level="info",
        reload=False
    )
# Dockerfile
FROM python:3.9-slim

# Working directory
WORKDIR /app

# System dependencies
RUN apt-get update && apt-get install -y \
    build-essential \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency manifest
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Create the model directory
RUN mkdir -p models

# Download the default model (optional)
# RUN python -c "from transformers import BertForSequenceClassification, BertTokenizer; \
#     model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=2); \
#     model.save_pretrained('./models/bert-base-uncased'); \
#     tokenizer = BertTokenizer.from_pretrained('bert-base-uncased'); \
#     tokenizer.save_pretrained('./models/bert-base-uncased')"

# Expose the port
EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Startup command
CMD ["uvicorn", "src.api.app:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./models:/app/models
      - ./logs:/app/logs
    environment:
      - CUDA_VISIBLE_DEVICES=0  # if using a GPU
      - MODEL_PATH=/app/models
      - LOG_LEVEL=INFO
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

  # Optional: monitoring service
  prometheus:
    image: prom/prometheus:latest
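Once the stack is up (docker compose up -d), the service can be smoke-tested from any machine that can reach port 8000. A small example using the requests library (the URL assumes a local deployment):

import requests

resp = requests.post(
    "http://localhost:8000/predict",
    json={"text": "This movie was absolutely fantastic!"},
    timeout=30
)
resp.raise_for_status()
print(resp.json())  # e.g. {"prediction": 1, "label": "positive", "confidence": 0.98, ...}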
# logging_config.py
import logging
import logging.config
import sys
from datetime import datetime
from pythonjsonlogger import jsonlogger


class CustomJsonFormatter(jsonlogger.JsonFormatter):
    """Custom JSON log formatter"""

    def add_fields(self, log_record, record, message_dict):
        super().add_fields(log_record, record, message_dict)
        if not log_record.get('timestamp'):
            log_record['timestamp'] = datetime.utcnow().isoformat()
        if log_record.get('level'):
            log_record['level'] = log_record['level'].upper()
        else:
            log_record['level'] = record.levelname
        # Extra structured fields
        log_record['service'] = 'bert-classification-api'
        log_record['module'] = record.module
        log_record['function'] = record.funcName
        log_record['line'] = record.lineno


LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'json': {
            '()': CustomJsonFormatter,
            'format': '%(timestamp)s %(level)s %(name)s %(message)s'
        },
        'simple': {
            'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'simple',
            'stream': 'ext://sys.stdout'
        },
        'file': {
            'class': 'logging.handlers.RotatingFileHandler',
            'formatter': 'json',
            'filename': 'logs/app.log',
            'maxBytes': 10485760,
            'backupCount': 5,
            'level': 'INFO'
        },
        'error_file': {
            'class': 'logging.handlers.RotatingFileHandler',
            'formatter': 'json',
            'filename': 'logs/error.log',
            'maxBytes': 10485760,
            'backupCount': 5,
            'level': 'ERROR'
        }
    },
    'loggers': {
        '': {
            'handlers': ['console', 'file', 'error_file'],
            'level': 'INFO',
            'propagate': False
        },
        'uvicorn': {
            'handlers': ['console', 'file'],
            'level': 'INFO',
            'propagate': False
        },
        'src.api': {
            'handlers': ['file'],
            'level': 'DEBUG',
            'propagate': True
        }
    }
}


def setup_logging():
    """Apply the logging configuration"""
    logging.config.dictConfig(LOGGING_CONFIG)


def handle_exception(exc_type, exc_value, exc_traceback):
    """Route uncaught exceptions through the logging system"""
    if issubclass(exc_type, KeyboardInterrupt):
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
        return
    logger = logging.getLogger(__name__)
    logger.critical("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))


sys.excepthook = handle_exception
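Wiring this into the service is a matter of calling setup_logging() once at process start, before any loggers are created. A minimal usage sketch (the import path assumes logging_config.py sits on the Python path, and the logs/ directory exists):

import logging
from logging_config import setup_logging

setup_logging()
logger = logging.getLogger("src.api")
logger.info("structured logging initialised")  # emitted as JSON to logs/app.log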
# monitoring.py
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from prometheus_client.core import CollectorRegistry
import time
from functools import wraps
from fastapi.responses import Response

# Metric registry
registry = CollectorRegistry()

# Metric definitions
PREDICTION_REQUESTS = Counter(
    'prediction_requests_total',
    'Total number of prediction requests',
    ['model_version', 'endpoint'],
    registry=registry
)

PREDICTION_LATENCY = Histogram(
    'prediction_latency_seconds',
    'Prediction latency in seconds',
    ['model_version', 'endpoint'],
    buckets=(0.01, 0.05, 0.1, 0.5, 1.0, 5.0),
    registry=registry
)

ACTIVE_MODELS = Gauge(
    'active_models_total',
    'Number of active models',
    registry=registry
)

MODEL_LOAD_TIME = Histogram(
    'model_load_time_seconds',
    'Model loading time in seconds',
    ['model_name'],
    registry=registry
)

CACHE_HITS = Counter(
    'cache_hits_total',
    'Total number of cache hits',
    registry=registry
)

CACHE_MISSES = Counter(
    'cache_misses_total',
    'Total number of cache misses',
    registry=registry
)

ERROR_COUNT = Counter(
    'prediction_errors_total',
    'Total number of prediction errors',
    ['error_type', 'model_version'],
    registry=registry
)


def track_prediction_metrics(func):
    """Decorator that records request count, latency, and error metrics for a prediction function"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        model_version = kwargs.get("model_version", "latest")
        endpoint = func.__name__
        PREDICTION_REQUESTS.labels(
            model_version=model_version,
            endpoint=endpoint
        ).inc()
        try:
            result = func(*args, **kwargs)
            latency = time.time() - start_time
            PREDICTION_LATENCY.labels(
                model_version=model_version,
                endpoint=endpoint
            ).observe(latency)
            return result
        except Exception as e:
            ERROR_COUNT.labels(
                error_type=type(e).__name__,
                model_version=model_version
            ).inc()
            raise
    return wrapper


def update_model_metrics(model_manager):
    """Refresh gauges that describe the currently loaded models"""
    ACTIVE_MODELS.set(len(model_manager.models))


def prometheus_metrics(model_manager=None):
    """Render all metrics in the Prometheus exposition format"""
    if model_manager:
        update_model_metrics(model_manager)
    return Response(
        generate_latest(registry),
        media_type="text/plain; version=0.0.4"
    )
#!/usr/bin/env python3
"""End-to-end test script"""
import sys
import os
import time
import json
import traceback
from typing import Dict, List, Any

import numpy as np
import torch

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from src.data.processor import DataProcessor
from src.training.trainer import CustomTrainer, TrainingConfig
from src.evaluation.metrics import ModelEvaluator
from src.api.app import app
from fastapi.testclient import TestClient


class EndToEndTest:
    """End-to-end tests"""

    def __init__(self, api_url: str = "http://localhost:8000"):
        self.api_url = api_url
        self.client = TestClient(app)  # In-process test client
        self.results = {}

    def run_all_tests(self) -> bool:
        """Run every test and produce a report"""
        print("=" * 60)
        print("End-to-end tests starting")
        print("=" * 60)
        tests = [
            self.test_environment,
            self.test_data_pipeline,
            self.test_model_training,
            self.test_model_evaluation,
            self.test_api_endpoints,
            self.test_performance,
            self.test_error_handling,
            self.test_concurrent_requests
        ]
        for test in tests:
            test_name = test.__name__
            try:
                print(f"\nRunning {test_name}")
                print("-" * 40)
                result = test()
                self.results[test_name] = {"status": "passed", "details": result}
                print(f"{test_name}: PASSED")
            except Exception as e:
                self.results[test_name] = {"status": "failed", "details": str(e)}
                print(f"{test_name}: FAILED ({e})")
        return self.generate_report()

    def test_environment(self) -> Dict[str, Any]:
        """Check the Python version, key libraries, and GPU availability"""
        python_version = sys.version_info
        assert python_version.major == 3 and python_version.minor >= 8
        import torch
        import transformers
        return {
            "python": f"{python_version.major}.{python_version.minor}",
            "torch": torch.__version__,
            "transformers": transformers.__version__,
            "cuda_available": torch.cuda.is_available()
        }

    def test_data_pipeline(self) -> Dict[str, Any]:
        """Check tokenization and batching"""
        processor = DataProcessor()
        test_text = "This is a test sentence for the data pipeline."
        tokenized = processor.tokenizer(
            test_text,
            truncation=True,
            padding="max_length",
            max_length=128
        )
        assert "input_ids" in tokenized
        assert len(tokenized["input_ids"]) == 128

        batch_labels = [0, 1, 0]
        batch = processor.collate_fn([
            {"input_ids": torch.tensor([1] * 128),
             "attention_mask": torch.tensor([1] * 128),
             "labels": label}
            for label in batch_labels
        ])
        assert batch["input_ids"].shape[0] == 3
        assert batch["labels"].shape[0] == 3
        return {
            "tokenization": "ok",
            "collation": "ok"
        }

    def test_model_training(self) -> Dict[str, Any]:
        """One tiny training epoch should not blow up the loss"""
        from transformers import BertForSequenceClassification
        from torch.utils.data import DataLoader

        model = BertForSequenceClassification.from_pretrained(
            "bert-base-uncased",
            num_labels=2
        )
        # Build dict-shaped samples, since the trainer calls model(**batch)
        input_ids = torch.randint(0, 30522, (16, 32))
        attention_mask = torch.ones((16, 32), dtype=torch.long)
        labels = torch.randint(0, 2, (16,))
        data = [
            {"input_ids": input_ids[i], "attention_mask": attention_mask[i], "labels": labels[i]}
            for i in range(16)
        ]

        def collate(batch):
            return {k: torch.stack([item[k] for item in batch]) for k in batch[0]}

        train_dataloader = DataLoader(data, batch_size=4, collate_fn=collate)
        val_dataloader = DataLoader(data, batch_size=4, collate_fn=collate)

        config = TrainingConfig(
            batch_size=4,
            num_epochs=1,
            learning_rate=1e-4,
            device="cpu",
            fp16=False,
            warmup_steps=0,
            eval_steps=10000
        )
        trainer = CustomTrainer(
            model=model,
            train_config=config,
            train_dataloader=train_dataloader,
            val_dataloader=val_dataloader
        )
        initial_metrics = trainer.evaluate()
        trainer.train_epoch(0)
        final_metrics = trainer.evaluate()
        assert final_metrics["loss"] <= initial_metrics["loss"] * 1.5
        return {
            "initial_loss": initial_metrics["loss"],
            "final_loss": final_metrics["loss"],
            "training": "ok"
        }

    def test_model_evaluation(self) -> Dict[str, Any]:
        """Evaluator produces metrics on a small sample"""
        from transformers import BertForSequenceClassification, BertTokenizer
        model = BertForSequenceClassification.from_pretrained(
            "bert-base-uncased",
            num_labels=2
        )
        tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
        evaluator = ModelEvaluator(model, tokenizer, device="cpu")
        test_texts = [
            "An absolutely wonderful film.",
            "A dreadful waste of two hours.",
            "It was fine, I suppose."
        ]
        test_labels = [1, 0, 1]
        results = evaluator.evaluate_classification(test_texts, test_labels)
        assert "accuracy" in results["metrics"]
        assert "confusion_matrix" in results
        return {
            "metrics": results["metrics"],
            "num_errors": len(evaluator.analyze_errors(test_texts, test_labels, results["predictions"]))
        }

    def test_api_endpoints(self) -> Dict[str, Any]:
        """Exercise the main HTTP endpoints"""
        response = self.client.get("/health")
        assert response.status_code == 200
        health_data = response.json()
        assert health_data["status"] == "healthy"

        test_data = {
            "text": "This movie was absolutely fantastic!",
            "model_version": "latest"
        }
        response = self.client.post("/predict", json=test_data)
        assert response.status_code == 200
        prediction_data = response.json()
        assert "prediction" in prediction_data
        assert "confidence" in prediction_data
        assert prediction_data["confidence"] >= 0

        batch_data = {
            "texts": [
                "Great movie!",
                "Terrible movie.",
                "Just average."
            ]
        }
        response = self.client.post("/predict/batch", json=batch_data)
        assert response.status_code == 200
        batch_result = response.json()
        assert len(batch_result["predictions"]) == 3

        response = self.client.get("/models")
        assert response.status_code == 200
        models_data = response.json()
        assert "available_models" in models_data
        return {
            "health": "ok",
            "predict": "ok",
            "batch_predict": "ok",
            "models": "ok"
        }

    def test_performance(self) -> Dict[str, Any]:
        """Latency bounds for single and batch requests"""
        import time
        test_cases = [
            "Short review.",
            "A medium length review. " * 10,
            "A long, rambling review. " * 50
        ]
        latencies = []
        for text in test_cases:
            start_time = time.perf_counter()
            response = self.client.post("/predict", json={"text": text})
            end_time = time.perf_counter()
            latency = end_time - start_time
            latencies.append(latency)
            assert response.status_code == 200
            assert latency < 5.0

        batch_texts = [f"Test review number {i}." for i in range(10)]
        start_time = time.perf_counter()
        response = self.client.post("/predict/batch", json={"texts": batch_texts})
        end_time = time.perf_counter()
        batch_latency = end_time - start_time
        assert response.status_code == 200
        assert batch_latency < 30.0
        avg_latency_per_request = batch_latency / len(batch_texts)
        return {
            "single_latencies": latencies,
            "batch_latency": batch_latency,
            "avg_latency_per_request": avg_latency_per_request,
            "throughput": len(batch_texts) / batch_latency
        }

    def test_error_handling(self) -> Dict[str, Any]:
        """Malformed inputs should be rejected"""
        response = self.client.post("/predict", json={"text": ""})
        assert response.status_code == 422

        response = self.client.post("/predict", json={
            "text": "valid text",
            "model_version": "no-such-model"
        })
        assert response.status_code in [404, 500]

        response = self.client.post("/predict/batch", json={"texts": []})
        assert response.status_code == 422
        return {
            "empty_text": "rejected",
            "unknown_model": "rejected",
            "empty_batch": "rejected"
        }

    def test_concurrent_requests(self) -> Dict[str, Any]:
        """Concurrent requests all succeed within a latency budget"""
        import concurrent.futures
        import time

        test_texts = [f"Concurrent request number {i}." for i in range(20)]

        def make_request(text):
            start_time = time.perf_counter()
            response = self.client.post("/predict", json={"text": text})
            end_time = time.perf_counter()
            return {
                "status": response.status_code,
                "latency": end_time - start_time
            }

        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            futures = [
                executor.submit(make_request, text)
                for text in test_texts
            ]
            results = [
                future.result()
                for future in concurrent.futures.as_completed(futures)
            ]

        status_codes = [r["status"] for r in results]
        latencies = [r["latency"] for r in results]
        assert all(code == 200 for code in status_codes)
        avg_latency = np.mean(latencies)
        assert avg_latency < 5.0
        return {
            "total_requests": len(results),
            "success_rate": sum(1 for code in status_codes if code == 200) / len(results),
            "avg_latency": avg_latency,
            "max_latency": max(latencies)
        }

    def generate_report(self) -> bool:
        """Print a summary and write test_report.json"""
        print("\n" + "=" * 60)
        print("Test Report")
        print("=" * 60)
        total_tests = len(self.results)
        passed_tests = sum(1 for result in self.results.values() if result["status"] == "passed")
        print(f"Total:  {total_tests}")
        print(f"Passed: {passed_tests}")
        print(f"Failed: {total_tests - passed_tests}")
        if total_tests - passed_tests > 0:
            print("\nFailed tests:")
            for test_name, result in self.results.items():
                if result["status"] == "failed":
                    print(f"  {test_name}: {result['details']}")
        report_data = {
            "summary": {
                "total": total_tests,
                "passed": passed_tests,
                "failed": total_tests - passed_tests,
                "pass_rate": passed_tests / total_tests if total_tests > 0 else 0
            },
            "results": self.results,
            "generated_at": time.strftime("%Y-%m-%d %H:%M:%S")
        }
        with open("test_report.json", "w") as f:
            json.dump(report_data, f, indent=2, default=str)
        print("\nReport written to test_report.json")
        return passed_tests == total_tests


def main():
    tester = EndToEndTest()
    try:
        success = tester.run_all_tests()
        if success:
            print("\n" + "=" * 60)
            print("All end-to-end tests passed")
            print("=" * 60)
            sys.exit(0)
        else:
            print("\n" + "=" * 60)
            print("Some end-to-end tests failed")
            print("=" * 60)
            sys.exit(1)
    except Exception as e:
        print(f"Unexpected failure: {e}")
        traceback.print_exc()
        sys.exit(2)


if __name__ == "__main__":
    main()
# .github/workflows/test.yml
name: CI/CD Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        # Quote the versions: an unquoted 3.10 is parsed by YAML as the number 3.1
        python-version: ["3.8", "3.9", "3.10"]
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install pytest pytest-cov flake8 mypy
      - name: Lint with flake8
        run: |
          flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
          flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
from optimum.onnxruntime import ORTModelForSequenceClassification
from optimum.onnxruntime import ORTQuantizer
from optimum.onnxruntime.configuration import AutoQuantizationConfig
import onnxruntime as ort
import time
from typing import Dict, Any


class ModelOptimizer:
    """Model optimization: ONNX export, quantization, and tuned inference sessions"""

    def __init__(self, model_path: str):
        self.model_path = model_path

    def convert_to_onnx(self, output_path: str = "./models/onnx"):
        """Export the model to ONNX format"""
        # Load the model and export it to ONNX
        model = ORTModelForSequenceClassification.from_pretrained(
            self.model_path,
            export=True
        )
        # Save the ONNX model
        model.save_pretrained(output_path)
        return output_path

    def quantize_model(
        self,
        model_path: str,
        output_path: str = "./models/onnx-quantized"
    ):
        """Apply dynamic INT8 quantization to an ONNX model"""
        quantizer = ORTQuantizer.from_pretrained(model_path)
        qconfig = AutoQuantizationConfig.avx512_vnni(is_static=False, per_channel=False)
        quantizer.quantize(
            save_dir=output_path,
            quantization_config=qconfig
        )
        return output_path

    def create_optimized_session(self, onnx_model_path: str) -> ort.InferenceSession:
        """Create an ONNX Runtime session with tuned options"""
        sess_options = ort.SessionOptions()
        sess_options.intra_op_num_threads = 4
        sess_options.inter_op_num_threads = 4
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        sess_options.enable_cpu_mem_arena = True
        sess_options.enable_mem_pattern = True
        session = ort.InferenceSession(
            onnx_model_path,
            sess_options=sess_options,
            providers=["CPUExecutionProvider"]
        )
        return session

    def benchmark_variants(self, model_variants: Dict[str, Any], test_data, predict_fn) -> Dict[str, Dict[str, float]]:
        """Time each model variant over the same test data.
        predict_fn(model, text) is supplied by the caller, since PyTorch and
        ONNX variants expose different inference interfaces."""
        results = {}
        for variant_name, model in model_variants.items():
            start_time = time.time()
            for text in test_data:
                predict_fn(model, text)
            end_time = time.time()
            results[variant_name] = {
                "total_time": end_time - start_time,
                "time_per_sample": (end_time - start_time) / len(test_data),
                "throughput": len(test_data) / (end_time - start_time)
            }
        return results
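A typical optimization pass chains these steps together. A hedged usage sketch (paths are examples; the quantized file name follows optimum's default suffix, which should be verified against your optimum version):

optimizer = ModelOptimizer("./checkpoints/final")
onnx_path = optimizer.convert_to_onnx("./models/onnx")
quantized_path = optimizer.quantize_model(onnx_path, "./models/onnx-quantized")
session = optimizer.create_optimized_session(f"{quantized_path}/model_quantized.onnx")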
import hashlib
from datetime import datetime, timedelta
from typing import Dict, Any, Optional


class PredictionCache:
    """In-memory prediction cache with TTL and LRU-style eviction"""

    def __init__(self, max_size: int = 10000, ttl: int = 3600):
        self.max_size = max_size
        self.ttl = timedelta(seconds=ttl)
        self.cache = {}
        self.access_times = {}
        self.hits = 0
        self.misses = 0

    def _generate_key(self, text: str, model_version: str) -> str:
        """Build a cache key from the model version and text"""
        content = f"{model_version}:{text}"
        return hashlib.md5(content.encode()).hexdigest()

    def get(self, text: str, model_version: str) -> Optional[Any]:
        """Return a cached value if present and not expired"""
        key = self._generate_key(text, model_version)
        if key in self.cache:
            cached_time, value = self.cache[key]
            if datetime.now() - cached_time < self.ttl:
                self.access_times[key] = datetime.now()
                self.hits += 1
                return value
        self.misses += 1
        return None

    def set(self, text: str, model_version: str, value: Any):
        """Insert a value, evicting the least recently used entry when full"""
        key = self._generate_key(text, model_version)
        if len(self.cache) >= self.max_size:
            oldest_key = min(
                self.access_times.keys(),
                key=lambda k: self.access_times[k]
            )
            del self.cache[oldest_key]
            del self.access_times[oldest_key]
        self.cache[key] = (datetime.now(), value)
        self.access_times[key] = datetime.now()

    def cleanup_expired(self):
        """Drop all expired entries"""
        now = datetime.now()
        expired_keys = [
            key
            for key, (cached_time, _) in self.cache.items()
            if now - cached_time > self.ttl
        ]
        for key in expired_keys:
            del self.cache[key]
            del self.access_times[key]

    def get_stats(self) -> Dict[str, Any]:
        """Cache statistics including the hit rate"""
        total = self.hits + self.misses
        hit_rate = self.hits / total if total > 0 else 0.0
        return {
            "size": len(self.cache),
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": hit_rate,
            "max_size": self.max_size,
            "ttl_seconds": self.ttl.total_seconds()
        }
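The cache slots in front of ModelManager.predict_sync: check the cache first, compute on a miss, then store the answer. A brief usage sketch (model_manager is assumed to be an initialized ModelManager instance):

cache = PredictionCache(max_size=5000, ttl=600)

result = cache.get("Great movie!", "latest")
if result is None:
    # Cache miss: run the real model, then remember the answer
    result = model_manager.predict_sync("Great movie!", "latest")
    cache.set("Great movie!", "latest", result)
print(cache.get_stats())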
Working through the project end to end yields several key takeaways about model integration and engineering practice, and the codebase is deliberately structured so it can be extended to new tasks, models, and deployment targets. Before promoting the system to a production environment, run through a deployment readiness checklist.
This article has presented the complete technical workflow for an open-source AI model, from introduction through testing. Beyond learning how to integrate and use a state-of-the-art model, the more important outcome of this hands-on project is a systematic engineering method for building production-grade AI applications; the methodology and code framework apply to a wide range of AI projects and provide a solid foundation for AI application development.
A successful AI project is measured not only by model accuracy but by the combination of systems engineering, maintainability, scalability, and reliability. We hope this article serves as a useful reference on the road to AI engineering.
