AIGC 异步回调系统架构设计与实现
本文介绍了一套用于处理 AIGC 长耗时任务(如视频、图片生成)的通用异步回调架构。系统通过接收请求立即返回 task_id,利用数据库追踪状态,并在任务完成后自动回调后端。核心设计包括分层架构(API、业务、数据、集成层)、处理器分发器模式、双回调地址机制(供应商回调与后端通知)以及完整的时间戳链路追踪。文档涵盖项目结构、核心代码逻辑、配置说明、开发指南及运维监控指标,支持多业务类型扩展与可观测性管理。

本文介绍了一套用于处理 AIGC 长耗时任务(如视频、图片生成)的通用异步回调架构。系统通过接收请求立即返回 task_id,利用数据库追踪状态,并在任务完成后自动回调后端。核心设计包括分层架构(API、业务、数据、集成层)、处理器分发器模式、双回调地址机制(供应商回调与后端通知)以及完整的时间戳链路追踪。文档涵盖项目结构、核心代码逻辑、配置说明、开发指南及运维监控指标,支持多业务类型扩展与可观测性管理。

本系统实现了一套通用的异步回调架构,用于处理 AIGC 服务(视频生成、图片生成等)的长耗时任务。
核心目标:
| 回调地址 | 说明 | 示例 | 配置方式 |
|---|---|---|---|
| algorithm_callback_url | AIGC 供应商回调我们的地址 | https://algorithm.com/api/callback | 方式 1:提交时发送 方式 2:供应商平台配置 |
| backend_callback_url | 我们完成后端通知的地址 | https://backend.com/api/notify | 后端通过请求参数传递 |
关键理解:
后端 → 算法端 → AIGC 供应商
发起 → 提交任务 → 异步处理
(1-5 分钟)
↓ ↓ ↓
←─────────收到回调─┘
↓ ↓ ↓
下载 + 上传 OSS
──────→通知后端
异步回调系统通常采用分层架构,典型结构如下:
project/
├── api/ # API 层 - HTTP 接口定义
│ ├── routes/ # 路由模块
│ │ └── callback.py # 回调统一入口
│ └── schemas/ # 请求/响应模型
├── core/ # 核心业务层
│ ├── dispatcher.py # 处理器分发器
│ ├── handlers/ # 回调处理器
│ │ ├── base.py # 处理器基类
│ │ └── video.py # 视频处理器示例
│ └── services/ # 业务逻辑
│ ├── models/ # 数据层
│ │ ├── task.py # 任务模型
│ │ └── database.py # 数据库操作
│ ├── integrations/ # 外部服务集成
│ │ ├── factory.py # 模型工厂
│ │ └── providers/ # 各厂商模型封装对象
├── config/ # 配置
│ └── settings.py # 配置管理
├── utils/ # 工具模块
│ ├── storage.py # 存储/OSS 工具
│ └── logger/ # 日志系统
架构分层:
┌─────────────────────────────────────────────┐
│ API 层 接收请求、参数验证 │
├─────────────────────────────────────────────┤
│ 业务层 回调处理、业务编排 │
├─────────────────────────────────────────────┤
│ 数据层 模型定义、数据持久化 │
├─────────────────────────────────────────────┤
│ 集成层 外部 API 调用 │
├─────────────────────────────────────────────┤
│ 工具层 通用工具函数 │
└─────────────────────────────────────────────┘
职责:统一回调入口,接收所有供应商的回调请求
核心功能:
核心接口:
@router.post("/callback")
async def handle_callback(request: Request) -> JSONResponse:
pass
支持的回调格式:
Event.TaskId(事件嵌套格式)TaskId(扁平格式)req_idtask_id关键代码片段:
# 提取 TaskId(支持多种格式)
model_task_id = (
data.get("TaskId") or
data.get("task_id") or
data.get("req_id") or
data.get("Event", {}).get("TaskId")
)
# 查询数据库
db = get_db()
task = await db.get_by_model_task_id(model_task_id)
# 路由到处理器
handler = Dispatcher.get_handler(task.business_type)
await handler.process_callback(task, data)
依赖关系:
callback.py → models/database.py (数据库查询) → core/dispatcher.py (处理器路由) → core/handlers/*.py (具体处理器)
职责:业务接口的路由定义
核心接口:
POST /api/v1/video/async - 视频异步生成GET /api/v1/video/status/{task_id} - 查询任务状态职责范围:
职责:处理器分发器,根据 business_type 路由到对应处理器
核心功能:
register() 方法注册处理器get_handler() 方法获取处理器实例设计模式:策略模式 + 工厂模式
关键代码:
class CallbackDispatcher:
"""回调处理器分发器"""
_handlers: dict[str, Type] = {}
@classmethod
def register(cls, business_type: str, handler_class: Type):
"""注册处理器"""
cls._handlers[business_type] = handler_class
logger.info(f"注册处理器:{business_type} -> {handler_class.__name__}")
@classmethod
def get_handler(cls, business_type: str):
"""获取对应的处理器实例"""
handler_class = cls._handlers.get(business_type)
if not handler_class:
raise ValueError(f"未知的业务类型:{business_type}")
return handler_class(business_type)
已注册的处理器示例:
video_generation → VideoGenerationCallbackHandlerimage_generation → ImageGenerationCallbackHandler职责:回调处理器基类,定义标准处理流程
核心功能:
process_callback() 抽象方法(子类必须实现)download_and_upload_storage()check_and_notify()extract_result_url()标准处理流程:
async def process_callback(task, callback_data):
# 1. 提取结果 URL
result_url = self.extract_result_url(callback_data)
# 2. 下载并上传存储
storage_url = await self.download_and_upload_storage(result_url, ...)
# 3. 更新数据库状态
await db.update_storage_uploaded(task.model_task_id, storage_url)
# 4. 检查是否所有子任务完成
await self.check_and_notify(task.task_id)
关键方法:
# 抽象方法(子类必须实现)
@abstractmethod
async def process_callback(self, task: Task, callback_data: dict) -> None:
"""处理回调的核心逻辑"""
pass
# 通用方法
async def download_and_upload_storage(self, url: str, task_id: str, ...) -> str:
"""从 URL 流式上传到存储(不落地)"""
pass
async def check_and_notify(self, task_id: str, results: list = None) -> None:
"""检查是否所有子任务完成,通知后端"""
pass
def extract_result_url(self, callback_data: dict) -> str:
"""从回调数据提取结果 URL(支持多种格式)"""
pass
继承关系:
BaseCallbackHandler (抽象基类)
├── VideoGenerationCallbackHandler
├── ImageGenerationCallbackHandler
└── ... (其他业务处理器)
职责:自动注册所有回调处理器
实现方式:模块导入时自动注册
# core/handlers/__init__.py
from .video_handler import VideoGenerationCallbackHandler
from .image_handler import ImageGenerationCallbackHandler
from core.dispatcher import CallbackDispatcher
# 注册处理器
CallbackDispatcher.register("video_generation", VideoGenerationCallbackHandler)
CallbackDispatcher.register("image_generation", ImageGenerationCallbackHandler)
注意:
职责:视频生成回调处理器(示例实现)
核心功能:
process_callback() 方法关键代码:
class VideoGenerationCallbackHandler(BaseCallbackHandler):
"""视频生成回调处理器"""
async def process_callback(
self, task: Task, callback_data: Dict[str, Any]
) -> None:
# 1. 提取视频 URL
video_url = self.extract_result_url(callback_data)
# 2. 流式上传到存储
storage_url = await self.download_and_upload_storage(
url=video_url, task_id=task.task_id, item_index=task.item_index
)
# 3. 更新数据库状态
await db.update_storage_uploaded(task.model_task_id, storage_url)
# 4. 检查是否所有子任务完成并通知后端
await self.check_and_notify(task.task_id)
def _build_result_item(self, task: Task) -> Dict[str, Any]:
"""构建单个结果项(业务格式)"""
request_data = task.get_request_data()
return {
"index": task.item_index,
"model_task_id": task.model_task_id,
"custom_field": request_data.get("custom_field", ""),
"storage_url": task.storage_url,
"status": task.status.value,
}
职责:视频生成业务逻辑
核心功能:
关键方法:
async def submit_video_generation_task(request: Dict[str, Any]) -> Dict[str, Any]:
"""提交视频生成任务"""
# 1. 获取算法端回调地址(配置)
algorithm_callback_url = _get_callback_url()
# 2. 获取后端回调地址(请求参数)
backend_callback_url = request.get("notify_url")
# 3. 循环提交任务
for idx, item in enumerate(request.get("items", [])):
result = await client.create_task(...)
model_task_id = result["TaskId"]
# 4. 保存到数据库
task_record = Task(
task_id=task_id,
model_task_id=model_task_id,
business_type="video_generation",
backend_callback_url=backend_callback_url,
algorithm_callback_url=algorithm_callback_url,
...
)
await db.create_task(task_record)
# 5. 返回响应
return {"status": "accepted", "task_id": task_id}
依赖关系:
video_service.py → integrations/factory.py (获取 Client) → models/database.py (数据库操作) → models/task.py (任务模型)
职责:任务数据模型定义
核心字段:
@dataclass
class Task:
# 核心关联字段
task_id: str # 业务 ID(多个子任务共享)
model_task_id: str # 外部服务返回的 TaskId(唯一)
# 业务类型(用于路由到对应处理器)
business_type: str # 如:video_generation, image_generation
provider: str # 如:provider_a, provider_b
# 任务信息
status: TaskStatus = TaskStatus.PENDING
item_index: int = 0 # 序号(方便排序和聚合)
# 请求数据
request_payload: Optional[str] = None
prompt: Optional[str] = None
# 结果数据
result_url: Optional[str] = None
storage_url: Optional[str] = None
callback_payload: Optional[str] = None
processed_data: Optional[str] = None
# 错误信息
error_message: Optional[str] = None
# 回调地址(重要!区分两个)
backend_callback_url: Optional[str] = None
algorithm_callback_url: Optional[str] = None
# 后端通知
backend_notified: bool = False
# 时间戳(链路追踪)
request_time: Optional[datetime] = None
created_at: Optional[datetime] = None
callback_received_at: Optional[datetime] = None
task_completed_time: Optional[datetime] = None
storage_uploaded_at: Optional[datetime] = None
processed_at: Optional[datetime] = None
backend_notified_at: Optional[datetime] = None
状态枚举:
class TaskStatus(str, Enum):
PENDING = "pending" # 待处理(已提交任务)
PROCESSING = "processing"# 处理中(收到回调)
COMPLETED = "completed" # 已完成
FAILED = "failed" # 失败
职责:数据库管理,提供 CRUD 操作
核心方法:
class DatabaseManager:
async def create_task(self, task: Task) -> int:
"""创建任务记录"""
pass
async def get_by_model_task_id(self, model_task_id: str) -> Optional[Task]:
"""根据 model_task_id 查询任务"""
pass
async def get_by_task_id(self, task_id: str) -> List[Task]:
"""根据 task_id 查询所有子任务"""
pass
async def update_status(
self, model_task_id: str, status: TaskStatus, error_message: str = None
) -> bool:
"""更新任务状态"""
pass
async def update_callback_received(
self, model_task_id: str, callback_payload: str, result_url: str, task_completed_time: datetime
) -> bool:
"""更新回调接收状态"""
pass
async def update_storage_uploaded(self, model_task_id: str, storage_url: str) -> bool:
"""更新存储上传状态"""
pass
async def check_all_completed(self, task_id: str) -> bool:
"""检查是否所有子任务完成"""
pass
async def update_backend_notified(self, task_id: str) -> bool:
"""更新后端通知状态"""
pass
数据库表结构:
CREATE TABLE tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL, -- 业务 ID(多个子任务共享)
model_task_id TEXT NOT NULL UNIQUE, -- 外部服务返回的 TaskId
business_type TEXT NOT NULL, -- "video_generation", "image_generation", ...
provider TEXT NOT NULL, -- "provider_a", "provider_b"
status TEXT DEFAULT 'pending', -- pending, processing, completed, failed
item_index INTEGER DEFAULT 0, -- 序号(方便排序和聚合)
request_payload TEXT, -- 完整的请求数据(JSON 字符串)
prompt TEXT, -- 提取的 prompt(方便查询)
result_url TEXT, -- 外部服务返回的 URL
storage_url TEXT, -- 上传到存储的 URL
callback_payload TEXT, -- 完整回调数据(JSON 字符串)
processed_data TEXT, -- 业务处理后的数据(JSON 字符串)
error_message TEXT, -- 错误信息
backend_callback_url TEXT, -- 后端的回调地址(后端传来)
algorithm_callback_url TEXT, -- 算法端的回调地址(配置,提交给外部服务)
backend_notified BOOLEAN DEFAULT 0, -- 是否已通知后端
request_time TIMESTAMP, -- HTTP 请求到达时间(API 层记录)
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
callback_received_at TIMESTAMP, -- 收到外部服务回调时间
task_completed_time TIMESTAMP, -- 外部服务任务完成时间
storage_uploaded_at TIMESTAMP, -- 存储上传完成时间
processed_at TIMESTAMP, -- 业务处理完成时间
backend_notified_at TIMESTAMP, -- 后端通知时间
INDEX idx_task_id (task_id),
INDEX idx_model_task (model_task_id),
INDEX idx_business_type (business_type),
INDEX idx_status (status)
);
职责:外部服务 API 调用封装
核心方法:
class ExternalServiceClient(BaseClient):
async def create_video_task(
self, model: str, prompt: str = "", ...
) -> Dict[str, Any]:
"""创建视频生成任务"""
pass
async def query_task(self, task_id: str) -> Dict[str, Any]:
"""查询任务状态"""
pass
说明:
职责:统一配置管理
核心配置:
CALLBACK_CONFIG = {
# 算法端回调地址基础路径(推荐通过环境变量注入)
"base_url": os.getenv("CALLBACK_BASE_URL", "https://your-domain.com"),
"callback_path": "/api/callback",
}
def _get_providers():
return {
"provider_a": {"api_key": "...", "secret_id": "...", ...},
"provider_b": {"api_key": "...", ...},
}
CONFIG = {
"env": os.getenv("ENV", "dev"),
"debug": os.getenv("ENV", "dev") == "dev",
"providers": _get_providers(),
"timeout": TIMEOUT_CONFIG,
"retry": RETRY_CONFIG,
"callback": CALLBACK_CONFIG,
}
环境变量:
# 开发环境 (.env)
CALLBACK_BASE_URL=http://localhost:8000
ENV=dev
# 生产环境
CALLBACK_BASE_URL=https://your-domain.com
ENV=prod
request_time,验证参数。algorithm_callback_url(从 config/settings.py 读取)。task_id, model_task_id, business_type, backend_callback_url, algorithm_callback_url, request_time。POST CreateTask,获取 TaskId。202 Accepted 及 task_id。POST /api/callback。callback_received_atresult_urlbusiness_type 路由到对应处理器backend_callback_urlPOST backend_callback_url。backend_notified 状态。┌─────────────────────────────────────────────────────────────┐
│ 时间线 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 10:00:00 ─ request_time ← API 层记录 │
│ ↓ │
│ 10:00:01 ─ created_at ← 数据库记录创建 │
│ ↓ │
│ ─ 提交到外部服务,获得 TaskId │
│ ↓ │
│ ─ 外部服务异步处理中 (1-5 分钟) │
│ ↓ │
│ 10:05:00 ─ callback_received_at ← 收到外部服务回调 │
│ 10:05:00 ─ task_completed_time ← 外部服务任务完成 │
│ ↓ │
│ 10:05:10 ─ storage_uploaded_at ← 存储上传完成 │
│ 10:05:11 ─ processed_at ← 业务处理完成 │
│ ↓ │
│ 10:05:13 ─ backend_notified_at ← 后端通知完成 │
│ │
└─────────────────────────────────────────────────────────────┘
作用:外部服务商回调我们的地址
配置位置:config/settings.py
CALLBACK_CONFIG = {
"base_url": os.getenv("CALLBACK_BASE_URL", "https://your-domain.com"),
"callback_path": "/api/callback",
}
获取方式:
def _get_callback_url() -> str:
"""获取算法端回调地址"""
callback_config = CONFIG.get("callback", {})
base_url = callback_config.get("base_url", "")
return f"{base_url}/api/callback"
服务商配置方式对比:
| 服务商 | 配置方式 | 是否必须 | 优先级 |
|---|---|---|---|
| 服务商 A | 平台配置 / 提交时发送 | 否 | 平台配置优先 |
| 服务商 B | 提交时发送 | 是 | 必须发送 |
| 服务商 C | 平台配置 | 否 | 平台配置 |
服务商 A 平台配置示例:
https://your-domain.com/api/callback作用:我们完成后端通知的地址
来源:后端通过请求参数传递
示例:
POST /api/v1/video/async
{
"notify_url": "https://backend-service.com/api/notify"
}
使用位置:
backend_callback_url 字段)# 开发环境 (.env)
CALLBACK_BASE_URL=http://localhost:8000
ENV=dev
# 生产环境
CALLBACK_BASE_URL=https://algorithm-domain.com
ENV=prod
# 服务商 A
PROVIDER_A_SECRET_ID=your_secret_id
PROVIDER_A_SECRET_KEY=your_secret_key
# 服务商 B
PROVIDER_B_API_KEY=your_api_key
# core/handlers/my_business_handler.py
from typing import Dict, Any
from .base import BaseCallbackHandler
from models.task import Task, TaskStatus
from models.database import get_db
class MyBusinessCallbackHandler(BaseCallbackHandler):
"""我的业务回调处理器"""
def __init__(self, business_type: str = "my_business"):
super().__init__(business_type=business_type)
async def process_callback(
self, task: Task, callback_data: Dict[str, Any]
) -> None:
"""处理回调"""
# 1. 提取结果 URL
result_url = self.extract_result_url(callback_data)
if not result_url:
await get_db().update_status(
task.model_task_id, TaskStatus.FAILED, "无法提取结果 URL"
)
return
# 2. 上传到存储
try:
storage_url = await self.download_and_upload_storage(
url=result_url, task_id=task.task_id, item_index=task.item_index
)
except Exception as e:
await get_db().update_status(
task.model_task_id, TaskStatus.FAILED, f"存储上传失败:{str(e)}"
)
return
# 3. 更新数据库
await get_db().update_storage_uploaded(task.model_task_id, storage_url)
# 4. 检查是否所有子任务完成并通知后端
await self.check_and_notify(task.task_id)
def _build_result_item(self, task: Task) -> Dict[str, Any]:
"""构建单个结果项(自定义格式)"""
request_data = task.get_request_data()
return {
"index": task.item_index,
"model_task_id": task.model_task_id,
"custom_field": request_data.get("custom_field", ""),
"storage_url": task.storage_url,
"status": task.status.value,
}
# core/handlers/__init__.py
from .my_business_handler import MyBusinessCallbackHandler
from core.dispatcher import CallbackDispatcher
# 注册处理器
CallbackDispatcher.register("my_business", MyBusinessCallbackHandler)
# core/services/my_business.py
async def submit_my_business_task(request: Dict[str, Any]) -> Dict[str, Any]:
"""提交我的业务任务"""
# 1. 获取算法端回调地址
from config.settings import CONFIG
callback_config = CONFIG.get("callback", {})
base_url = callback_config.get("base_url", "")
algorithm_callback_url = f"{base_url}/api/callback"
# 2. 获取后端回调地址
backend_callback_url = request.get("notify_url")
# 3. 提交任务
for idx, item in enumerate(request.get("items", [])):
result = await client.create_task(...)
model_task_id = result["TaskId"]
# 4. 保存到数据库
task_record = Task(
task_id=task_id,
model_task_id=model_task_id,
business_type="my_business", # 必须与注册时一致
provider="xxx",
backend_callback_url=backend_callback_url,
algorithm_callback_url=algorithm_callback_url,
...
)
await db.create_task(task_record)
return {"status": "accepted", "task_id": task_id}
from utils.logger.config import get_business_logger
logger = get_business_logger("my_business")
logger.info("任务提交", extra={
"task_id": task_id,
"model_task_id": model_task_id,
"request": request # 完整请求
})
-- 查询所有未完成的任务
SELECT * FROM tasks WHERE status != 'completed';
-- 查询某个业务的所有子任务
SELECT * FROM tasks WHERE task_id = 'task_001';
-- 查询某个 model_task_id 的任务
SELECT * FROM tasks WHERE model_task_id = 'provider_abc123';
-- 统计各状态的任务数量
SELECT status, COUNT(*) FROM tasks GROUP BY status;
# test/callback/test_manual_callback.py
import httpx
import json
async def test_manual_callback():
"""手动测试回调"""
url = "http://localhost:8000/api/callback"
# 模拟外部服务商回调
callback_data = {
"Event": {
"TaskId": "provider_abc123", # 替换为真实的 TaskId
"Status": "FINISH",
"Output": {
"FileUrl": "https://test.com/video.mp4"
}
}
}
async with httpx.AsyncClient() as client:
response = await client.post(url, json=callback_data)
print(response.status_code)
print(response.json())
# 检查环境变量是否配置
echo $CALLBACK_BASE_URL
-- 检查表是否存在
SELECT name FROM sqlite_master WHERE type='table' AND name='tasks';
-- 检查索引是否存在
SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='tasks';
# 测试算法端回调地址是否可访问
curl -X POST https://your-domain.com/api/callback \
-H "Content-Type: application/json" \
-d '{"test": true}'
# 预期返回(TaskId 找不到是正常的):
# {"status": "error", "message": "Task not found"}
| 指标 | 说明 | 目标值 |
|---|---|---|
| 任务提交延迟 | 从收到请求到返回 202 | < 3s |
| 回调处理延迟 | 从收到回调到处理完成 | < 15s |
| 存储上传延迟 | 从下载到上传完成 | < 10s |
| 后端通知延迟 | 从全部完成到通知后端 | < 2s |
| 错误类型 | 监控方式 | 告警阈值 |
|---|---|---|
| TaskId 找不到 | 日志统计 | > 10 次/小时 |
| 存储上传失败 | 数据库 status=failed | > 5% |
| 后端回调失败 | 日志统计 | > 5 次/小时 |
| 任务超时 | callback_received_at 为空且 > 30min | 手动处理 |
可能原因:
排查步骤:
# 1. 检查配置
grep CALLBACK_BASE_URL .env
# 2. 检查回调地址是否可访问
curl https://your-domain.com/api/callback
# 3. 检查服务商平台配置
# 查看控制台回调配置
# 4. 检查数据库中是否有记录
sqlite3 database.db "SELECT * FROM tasks WHERE status='pending' LIMIT 10;"
可能原因:
排查步骤:
# 1. 查询任务的 backend_callback_url
sqlite3 database.db "SELECT task_id, backend_callback_url FROM tasks WHERE task_id='xxx';"
# 2. 手动测试后端回调地址
curl -X POST https://backend-service.com/api/notify \
-H "Content-Type: application/json" \
-d '{"task_id": "test", "status": "completed"}'
# 3. 查看日志中的错误信息
grep "后端回调失败" logs/*.log
可能原因:
排查步骤:
# 1. 查看失败任务
sqlite3 database.db "SELECT model_task_id, error_message FROM tasks WHERE status='failed';"
# 2. 手动测试上传
python -c "from utils.storage import upload_from_url; print(upload_from_url('https://test.com/video.mp4', 'test_task'))"
# 3. 检查外部 URL 是否可访问
curl -I https://provider.com/xxx.mp4
-- 清理 30 天前已完成的通知任务
DELETE FROM tasks
WHERE status='completed'
AND backend_notified = 1
AND backend_notified_at < datetime('now','-30 days');
-- 任务状态统计
SELECT status, COUNT(*) as count, ROUND(COUNT(*)*100.0/(SELECT COUNT(*) FROM tasks),2) as percentage
FROM tasks GROUP BY status;
-- 业务类型统计
SELECT business_type, COUNT(*) as count,
COUNT(CASE WHEN status='completed' THEN 1 END) as success_count,
COUNT(CASE WHEN status='failed' THEN 1 END) as failed_count
FROM tasks GROUP BY business_type;
-- 平均处理时间
SELECT business_type, AVG((julianday(storage_uploaded_at)- julianday(callback_received_at))*86400) as avg_processing_seconds
FROM tasks WHERE status='completed' AND storage_uploaded_at IS NOT NULL GROUP BY business_type;
| 功能 | 文件路径 | 核心类/方法 | 说明 |
|---|---|---|---|
| API 层 | |||
| 回调入口 | api/routes/callback.py | handle_callback() | 统一回调接收 |
| 业务接口 | api/routes/video.py | POST /video/async | 视频接口 |
| 业务层 | |||
| 处理器分发 | core/dispatcher.py | CallbackDispatcher | 处理器注册和路由 |
| 处理器基类 | core/handlers/base.py | BaseCallbackHandler | 标准处理流程 |
| 处理器注册 | core/handlers/__init__.py | - | 自动注册处理器 |
| 视频处理器 | core/handlers/video.py | VideoCallbackHandler | 视频处理器 |
| 业务逻辑 | core/services/video.py | submit_video_task() | 任务提交逻辑 |
| 数据层 | |||
| 任务模型 | models/task.py | Task | 任务数据模型 |
| 数据库管理 | models/database.py | DatabaseManager | 数据库 CRUD |
| 集成层 | |||
| 外部服务 | integrations/providers/provider_a.py | ProviderAClient | 外部 API |
| 配置 | |||
| 统一配置 | config/settings.py | CALLBACK_CONFIG | 回调地址配置 |
| 工具 | |||
| 存储工具 | utils/storage.py | upload_from_url() | 存储上传 |
| 日志工具 | utils/logger/config.py | get_business_logger() | 业务日志 |
| 维度 | algorithm_callback_url (算法端回调地址) | backend_callback_url (后端回调地址) |
|---|---|---|
| 方向 | 外部服务 → 我们 | 我们 → 后端 |
| 配置方式 | 平台配置 / 提交时发送 | 请求参数传递 |
| 示例 | https://algorithm.com/api/callback | https://backend.com/api/notify |
| 谁提供 | 算法端配置 | 后端请求参数 |
| 谁回调谁 | 外部服务回调算法端 | 算法端回调后端 |
| 配置位置 | 外部服务商控制台 / API 参数 | 后端请求的 notify_url |
| 安全性 | 平台配置更安全 | 需要后端验证 |
| 存储位置 | 数据库 algorithm_callback_url 字段 | 数据库 backend_callback_url 字段 |
| 使用时机 | 提交任务时发送给外部服务 | 所有子任务完成后回调后端 |
| 字段 | 类型 | 说明 | 示例 |
|---|---|---|---|
task_id | TEXT | 业务 ID(多个子任务共享) | task_001 |
model_task_id | TEXT | 外部服务 TaskId(唯一) | provider_abc123 |
business_type | TEXT | 业务类型(路由用) | video_generation |
provider | TEXT | 服务商名称 | provider_a/provider_b |
status | TEXT | 任务状态 | pending/processing/completed/failed |
item_index | INTEGER | 子任务序号 | 0, 1, 2 |
result_url | TEXT | 外部服务返回的 URL | https://provider.com/xxx.mp4 |
storage_url | TEXT | 上传到存储的 URL | https://storage.com/xxx.mp4 |
backend_callback_url | TEXT | 后端的回调地址 | https://backend.com/notify |
algorithm_callback_url | TEXT | 算法端的回调地址 | https://algorithm.com/callback |
backend_notified | BOOLEAN | 是否已通知后端 | 0/1 |
request_time | TIMESTAMP | HTTP 请求到达时间 | 2025-01-21 10:00:00 |
created_at | TIMESTAMP | 数据库记录创建时间 | 2025-01-21 10:00:01 |
callback_received_at | TIMESTAMP | 收到外部服务回调时间 | 2025-01-21 10:05:00 |
task_completed_time | TIMESTAMP | 外部服务任务完成时间 | 2025-01-21 10:05:00 |
storage_uploaded_at | TIMESTAMP | 存储上传完成时间 | 2025-01-21 10:05:10 |
processed_at | TIMESTAMP | 业务处理完成时间 | 2025-01-21 10:05:11 |
backend_notified_at | TIMESTAMP | 后端通知时间 | 2025-01-21 10:05:13 |

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online