Python 异步爬虫实战:FindQC 商品数据爬取系统完整教程
本文详细介绍如何使用 Python 异步编程技术构建一个高性能的商品数据爬虫系统,包括 API 调用、数据库存储、消息队列集成等核心功能。
📋 目录
- 一、项目概述
- 二、技术栈
- 三、项目架构
- 四、核心功能
- 五、环境配置
- 六、代码详解
- 七、使用示例
- 八、性能优化
- 九、常见问题
- 十、总结
一、项目概述
1.1 项目简介
service_spider 是一个基于 Python 异步编程的商品数据爬虫服务,主要功能包括:
- ✅ 目录遍历:自动遍历所有需要爬取的目录
- ✅ 分页处理:智能分页获取商品列表,直到最后一页
- ✅ 商品详情获取:获取商品基本信息、图集(QC图、视频等)
- ✅ 数据存储:保存商品数据到 MySQL 数据库
- ✅ 消息队列:发送商品新增消息到 RabbitMQ,通知下游服务
- ✅ 断点续传:支持中断后从断点继续爬取
- ✅ 优雅中断:支持 Ctrl+C 优雅退出,确保数据安全
1.2 项目结构
service_spider/ ├── __init__.py # 包初始化 ├── main.py # 主程序入口(一次性执行) ├── scheduler.py # 定时任务入口(使用 APScheduler) ├── spider.py # 爬虫核心逻辑 ├── api_client.py # FindQC API 客户端封装 ├── db_service.py # 数据库操作服务 ├── crawl_from_json.py # 从 JSON 文件爬取(辅助功能) ├── README.md # 项目文档 ├── TEST.md # 测试指南 └── ZEEKLOG博客教程.md # 本文档
二、技术栈
2.1 核心技术
| 技术 | 版本 | 用途 |
|---|---|---|
| Python | 3.8+ | 编程语言 |
| asyncio | 内置 | 异步编程框架 |
| Playwright | 最新 | 浏览器自动化,绕过 Cloudflare |
| SQLAlchemy | 2.0+ | ORM 数据库操作 |
| aiomysql | 最新 | 异步 MySQL 驱动 |
| RabbitMQ | 最新 | 消息队列 |
| APScheduler | 最新 | 定时任务调度 |
| loguru | 最新 | 日志记录 |
2.2 设计模式
- 生产者-消费者模式:目录并发爬取,商品并发处理
- 严格漏桶限流:极致平滑的 API 调用控制
- 优雅中断机制:支持 Ctrl+C 安全退出
- 断点续传:自动从上次中断位置继续
三、项目架构
3.1 架构图
┌─────────────────────────────────────────────────────────┐ │ service_spider │ ├─────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────┐ ┌──────────────┐ │ │ │ main.py │───▶│ spider.py │ │ │ │ (入口) │ │ (核心逻辑) │ │ │ └──────────────┘ └──────────────┘ │ │ │ │ │ │ │ ▼ │ │ │ ┌──────────────┐ │ │ │ │ api_client.py │ │ │ │ │ (API调用) │ │ │ │ └──────────────┘ │ │ │ │ │ │ │ ▼ │ │ │ ┌──────────────┐ │ │ └───────────▶│ db_service.py│ │ │ │ (数据库) │ │ │ └──────────────┘ │ │ │ └─────────────────────────────────────────────────────────┘ │ │ │ ▼ ▼ ▼ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ FindQC │ │ MySQL │ │RabbitMQ │ │ API │ │Database │ │ Queue │ └─────────┘ └─────────┘ └─────────┘
3.2 数据流
FindQC API ↓ 商品列表 (getCategoryProducts) ↓ 商品详情 (get_product_detail) ↓ 商品图集 (get_product_atlas) ↓ 数据整理 (prepare_product_data) ↓ MySQL (t_products, t_tasks_products) ↓ RabbitMQ (product.new 消息)
3.3 核心流程
开始 ↓ 获取目录列表 ↓ for 每个目录(并发): ↓ 分页获取商品列表 (while True) ↓ for 每个商品(并发): ↓ 获取商品详情 ↓ 获取商品图集(分页) ↓ 整理图片结构(QC图、主图、SKU图) ↓ 保存到数据库 ↓ 创建任务记录 ↓ 发送消息到 RabbitMQ ↓ end ↓ 判断是否最后一页 ↓ 否 → 翻页继续 ↓ 是 → 下一个目录 ↓ end ↓ 完成
四、核心功能
4.1 目录遍历
爬虫会自动遍历配置的目录列表,支持并发处理多个目录,提高爬取效率。
关键代码:
async def get_target_categories(self) -> List[Dict[str, Any]]: """获取目标目录列表""" # 从配置或数据库读取目录列表 categories = [ {"catalogueId": 4113, "name": "测试分类"}, # ... 更多目录 ] return categories
4.2 分页处理
智能分页获取商品列表,直到 hasMore=False 表示最后一页。
关键代码:
page = 1 while True: response = await self.api_client.get_category_products( catalogue_id=category_id, page=page, size=self.page_size ) if not response.get("hasMore", False): break # 最后一页 products = response.get("data", []) # 处理商品... page += 1
4.3 商品详情获取
获取商品的完整信息,包括:
- 基本信息(名称、价格、描述等)
- 主图列表
- SKU 信息
- QC 图集(分页获取)
关键代码:
# 获取商品详情 detail = await self.api_client.get_product_detail( item_id=item_id, mall_type=mall_type ) # 获取商品图集(分页) atlas_page = 1 while True: atlas_response = await self.api_client.get_product_atlas( goods_id=goods_id, item_id=item_id, mall_type=mall_type, page=atlas_page, size=50 ) if not atlas_response.get("hasMore", False): break atlas_page += 1
4.4 数据存储
将商品数据保存到 MySQL 数据库,包括:
t_products表:商品基本信息t_tasks_products表:任务记录
关键代码:
# 检查商品是否已存在 existing_product, operation = await ProductDBService.check_and_update_existing_product( session=session, findqc_id=findqc_id, update_task_id=update_task_id, last_qc_time=last_qc_time, qc_count_3days=qc_count_3days, # ... 其他参数 ) if operation == "not_exists": # 新商品:保存完整数据 product = await ProductDBService.save_or_update_product( session=session, product_data=product_data, update_task_id=update_task_id ) # 创建任务记录 task = await ProductDBService.create_task( session=session, product_id=product.id, update_task_id=update_task_id )
4.5 消息队列
发送商品新增消息到 RabbitMQ,通知下游 AI 处理管道。
消息格式:
{ "task_id": 2024052001, "findqc_id": 12345, "product_id": 1001, "itemId": "ext_999", "mallType": "taobao", "action": "product.new", "timestamp": "2024-05-20T10:00:00Z" }
4.6 断点续传
支持中断后从断点继续爬取,避免重复工作。
实现原理:
- 查询数据库中今天(
update_task_id = 当天日期)已爬取的商品 - 找出其中最小的
catalogueId(目录ID) - 从该目录重新开始爬取
关键代码:
# 检查断点续传 resume_category_id = await ProductDBService.get_resume_category_id( session=session, today_task_id=update_task_id, ) if resume_category_id is not None: start_cat_id = resume_category_id logger.info(f"启用断点续传:将从目录ID {start_cat_id} 重新开始爬取")
4.7 优雅中断
支持 Ctrl+C 优雅退出,确保数据安全和资源清理。
实现原理:
- 注册信号处理器(SIGINT、SIGTERM)
- 设置全局中断标志
- 所有循环定期检查中断标志
- 中断时等待当前任务完成,生成统计信息,关闭资源
关键代码:
class GracefulShutdown: """优雅中断管理器""" def __init__(self): self.shutdown_requested = False self.shutdown_event = asyncio.Event() def request_shutdown(self): """请求关闭""" self.shutdown_requested = True self.shutdown_event.set() logger.warning("🛑 收到中断信号,正在优雅退出...") def setup_signal_handlers(self): """设置信号处理器""" def signal_handler(signum, frame): if self.shutdown_requested: sys.exit(1) # 强制退出 else: self.request_shutdown() signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler)
五、环境配置
5.1 安装依赖
# 创建虚拟环境 python3 -m venv venv source venv/bin/activate # Linux/Mac # 或 venv\Scripts\activate # Windows # 安装依赖 pip install sqlalchemy aiomysql loguru httpx pydantic-settings pip install playwright apscheduler pika # 安装 Playwright 浏览器 playwright install chromium
5.2 环境变量配置
创建 .env 文件:
# 数据库配置 DB_HOST=localhost DB_PORT=3306 DB_USER=root DB_PASSWORD=your_password DB_NAME=findqc_db # FindQC API 配置 FINDQC_API_BASE_URL=https://api.findqc.com FINDQC_API_KEY=your_api_key # 可选 # RabbitMQ 配置 RABBITMQ_HOST=localhost RABBITMQ_PORT=5672 RABBITMQ_USER=guest RABBITMQ_PASSWORD=guest RABBITMQ_VHOST=/ # 日志级别 LOG_LEVEL=INFO # 测试模式:只爬取 N 个商品(0 或不设置表示全量模式) MAX_PRODUCTS=0 # 并发配置 MAX_CONCURRENT_CATEGORIES=5 # 最大并发目录数 MAX_CONCURRENT_PRODUCTS_PER_CATALOGUE=10 # 每个目录内商品并发数 # QPS 限流配置 CATEGORY_PRODUCTS_MAX_QPS=10 # 目录商品列表 API QPS PRODUCT_DETAIL_MAX_QPS=15 # 商品详情 API QPS PRODUCT_ATLAS_MAX_QPS=20 # 商品图集 API QPS
5.3 数据库初始化
-- 创建数据库 CREATE DATABASE findqc_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; -- 表结构会自动创建(通过 db.init_db())
六、代码详解
6.1 API 客户端(api_client.py)
6.1.1 严格漏桶限流器
核心原理:强制每个请求之间间隔固定时间,实现极致平滑的 API 调用。
class StrictRateLimiter: """严格的漏桶限流器(Leaky Bucket)""" def __init__(self, max_qps: float, window_size: int = 30): self.max_qps = max_qps self.interval = 1.0 / max_qps if max_qps > 0 else 0.0 # 请求间隔 self.next_allow_time = 0.0 # 下一次允许请求的时间 self.lock = asyncio.Lock() async def acquire(self): """获取执行许可,包含平滑控制""" async with self.lock: now = time.time() if self.next_allow_time < now: self.next_allow_time = now # 我的执行时间就是 next_allow_time my_schedule_time = self.next_allow_time # 将下一次允许时间向后推一个间隔 self.next_allow_time += self.interval # 计算需要等待的时间 wait_time = my_schedule_time - time.time() if wait_time > 0: await asyncio.sleep(wait_time)
使用示例:
# 初始化限流器(QPS=10,即每秒最多10个请求) limiter = StrictRateLimiter(max_qps=10) # 在 API 调用前获取许可 await limiter.acquire() response = await session.get(url)
6.1.2 Playwright 浏览器自动化
核心功能:使用 Playwright 绕过 Cloudflare 反爬虫保护。
class FindQCAPIClient: """FindQC API 客户端""" async def _init_browser(self): """初始化浏览器""" if not HAS_PLAYWRIGHT: raise ImportError("Playwright 未安装") self.playwright = await async_playwright().start() self.browser = await self.playwright.chromium.launch( headless=True, args=['--disable-blink-features=AutomationControlled'] ) self.context = await self.browser.new_context( user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', viewport={'width': 1920, 'height': 1080} ) self.page = await self.context.new_page() async def get_category_products(self, catalogue_id: int, page: int, size: int = 20): """获取目录商品列表""" # 使用限流器控制请求频率 await self.category_products_strict_limiter.acquire() # 使用 Playwright 访问页面,绕过 Cloudflare url = f"{self.base_url}/goods/getCategoryProducts" await self.page.goto(url) # 监听网络请求,捕获 API 响应 response_data = await self._wait_for_api_response(url_pattern) return response_data
6.2 爬虫核心逻辑(spider.py)
6.2.1 商品处理逻辑
核心流程:
async def process_product(self, product_info: Dict[str, Any], category_id: int): """处理单个商品""" try: # 1. 获取商品详情 detail = await self.api_client.get_product_detail(...) # 2. 获取商品图集(分页) all_qc_images = [] atlas_page = 1 while True: atlas_response = await self.api_client.get_product_atlas(...) all_qc_images.extend(atlas_response.get("data", [])) if not atlas_response.get("hasMore", False): break atlas_page += 1 # 3. 整理商品数据 product_data = self.prepare_product_data(detail, all_qc_images) # 4. 保存到数据库 async with self.db.async_session_maker() as session: # 检查商品是否已存在 existing_product, operation = await ProductDBService.check_and_update_existing_product(...) if operation == "not_exists": # 新商品:保存完整数据 product = await ProductDBService.save_or_update_product(...) task = await ProductDBService.create_task(...) # 发送消息到 RabbitMQ await self.send_product_message(...) elif operation == "exists_updated": # 已存在:只更新 QC 相关字段 await session.commit() elif operation == "exists_deleted": # 已存在:软删除 await session.commit() except Exception as e: logger.error(f"处理商品失败: {e}")
6.2.2 并发控制
目录并发:
async def spider_main_process(self, update_task_id: int, max_products: Optional[int] = None): """爬虫主流程""" categories = await self.get_target_categories() # 使用信号量控制并发目录数 semaphore = asyncio.Semaphore(self.max_concurrent_categories) async def process_category(category): async with semaphore: await self.process_category(category, update_task_id, max_products) # 并发处理所有目录 tasks = [process_category(cat) for cat in categories] await asyncio.gather(*tasks, return_exceptions=True)
商品并发:
async def process_category(self, category: Dict[str, Any], update_task_id: int, max_products: Optional[int] = None): """处理单个目录""" page = 1 products_processed = 0 while True: # 获取商品列表 response = await self.api_client.get_category_products(...) products = response.get("data", []) # 使用信号量控制并发商品数 semaphore = asyncio.Semaphore(self.max_concurrent_products_per_catalogue) async def process_product_safe(product_info): async with semaphore: await self.process_product(product_info, category_id) # 并发处理商品 tasks = [process_product_safe(p) for p in products] await asyncio.gather(*tasks, return_exceptions=True) if not response.get("hasMore", False): break page += 1
6.3 数据库服务(db_service.py)
6.3.1 商品存在性检查
核心逻辑:根据最新爬取的 QC 图时间判断商品状态。
@staticmethod async def check_and_update_existing_product( session: AsyncSession, findqc_id: int, update_task_id: int, last_qc_time: Optional[datetime], qc_count_360days: int, # ... 其他参数 ) -> Tuple[Optional[Product], str]: """检查现有商品并决定更新策略""" # 查询商品是否已存在 product = await session.execute( select(Product).where(Product.findqc_id == findqc_id) ).scalar_one_or_none() if not product: return None, "not_exists" # 判断最新爬取的QC图是否在360天内 thirty_days_ago = datetime.utcnow() - timedelta(days=360) if last_qc_time is None or last_qc_time < thirty_days_ago: # QC图不在360天内,软删除 product.status = 1 product.qc_count_3days = 0 product.qc_count_7days = 0 # ... 其他字段设为0 return product, "exists_deleted" else: # QC图在360天内,更新QC相关字段并重新拾取 product.last_qc_time = last_qc_time product.qc_count_3days = qc_count_3days product.qc_count_7days = qc_count_7days # ... 更新其他QC字段 product.status = 0 # 重新拾取 return product, "exists_updated"
6.3.2 新商品保存
@staticmethod async def save_or_update_product( session: AsyncSession, product_data: Dict[str, Any], update_task_id: int, ) -> Product: """保存新商品数据""" # 创建新商品对象 product = Product( findqc_id=product_data["findqc_id"], item_id=product_data["item_id"], mall_type=product_data["mall_type"], name=product_data["name"], price=product_data["price"], main_img=product_data["main_img"], # ... 其他字段 update_task_id=update_task_id, status=0, ) session.add(product) await session.flush() # 获取 product.id return product
七、使用示例
7.1 一次性执行
# 激活虚拟环境 source venv/bin/activate # 运行爬虫服务(执行一次后退出) python -m service_spider.main
7.2 定时任务模式
# 激活虚拟环境 source venv/bin/activate # 运行定时任务服务(持续运行,按配置的时间执行爬虫任务) python -m service_spider.scheduler
定时任务配置(通过环境变量):
# Cron 模式:每天 02:00 执行 SPIDER_SCHEDULE_TYPE=cron SPIDER_CRON_HOUR=2 SPIDER_CRON_MINUTE=0 # Interval 模式:每 24 小时执行一次 # SPIDER_SCHEDULE_TYPE=interval # SPIDER_INTERVAL_HOURS=24
7.3 测试模式
# 设置环境变量:只爬取 10 个商品 export MAX_PRODUCTS=10 # 运行爬虫 python -m service_spider.main
7.4 查看日志
# 控制台输出(实时) # 日志会自动输出到控制台 # 文件日志(按天轮转) tail -f logs/spider_2024-05-20.log
八、性能优化
8.1 并发控制
- 目录并发:使用
asyncio.Semaphore控制并发目录数(默认 5) - 商品并发:使用
asyncio.Semaphore控制每个目录内商品并发数(默认 10)
配置建议:
# 根据 API 限流和服务器性能调整 MAX_CONCURRENT_CATEGORIES=5 # 目录并发数 MAX_CONCURRENT_PRODUCTS_PER_CATALOGUE=10 # 商品并发数
8.2 QPS 限流
使用严格漏桶限流器实现极致平滑的 API 调用:
# 不同 API 使用不同的 QPS 限制 CATEGORY_PRODUCTS_MAX_QPS=10 # 目录商品列表 API PRODUCT_DETAIL_MAX_QPS=15 # 商品详情 API PRODUCT_ATLAS_MAX_QPS=20 # 商品图集 API
8.3 数据库优化
- 批量提交:每个商品独立事务,失败不影响其他商品
- 索引优化:在
findqc_id字段上创建唯一索引 - 连接池:使用 SQLAlchemy 连接池管理数据库连接
8.4 内存管理
- 分批处理:使用分页获取商品列表,避免一次性加载过多数据
- 及时释放:处理完的商品数据及时释放,避免内存泄漏
九、常见问题
9.1 ModuleNotFoundError
问题:ModuleNotFoundError: No module named 'sqlalchemy'
解决:安装依赖
pip install -r requirements.txt
9.2 数据库连接失败
问题:Can't connect to MySQL server
解决:
- 检查 MySQL 服务是否启动
- 检查
.env文件中的数据库配置是否正确 - 确认数据库用户有创建表的权限
9.3 API 请求失败(429 限流)
问题:HTTPStatusError: 429 Too Many Requests
解决:
- 增加请求延迟时间(修改
delay_between_requests参数) - 降低 QPS 限制(修改
CATEGORY_PRODUCTS_MAX_QPS等配置) - 减少并发数(修改
MAX_CONCURRENT_CATEGORIES等配置)
9.4 Playwright 浏览器启动失败
问题:Playwright 未安装
解决:
# 安装 Playwright pip install playwright # 安装浏览器 playwright install chromium
9.5 断点续传不生效
问题:断点续传没有从上次中断位置继续
解决:
- 检查数据库中是否有今天(
update_task_id = 当天日期)的数据 - 确认
get_resume_category_id方法能正确查询到最小catalogueId
9.6 消息队列发送失败
问题:RabbitMQ 消息发送失败
解决:
- 检查 RabbitMQ 服务是否启动
- 检查
.env文件中的 RabbitMQ 配置是否正确 - 如果未配置 RabbitMQ,爬虫仍然可以运行(只是不会发送消息)
十、总结
10.1 项目亮点
- 高性能:使用异步编程和并发控制,大幅提升爬取效率
- 稳定性:严格漏桶限流、优雅中断、断点续传等机制确保系统稳定运行
- 可扩展:模块化设计,易于扩展和维护
- 易用性:完善的日志记录、错误处理、配置管理
10.2 适用场景
- 商品数据爬取
- 电商平台数据采集
- API 数据同步
- 定时数据更新
10.3 后续优化方向
- 支持更多数据源
- 添加监控和统计功能
- 优化图片数据处理性能
- 支持分布式爬取
📚 参考资料
作者:MadPrinter
最后更新:2025-12-26
项目地址:GitHub
💡 提示:如果本文对你有帮助,欢迎点赞、收藏、转发!如有问题,欢迎在评论区留言讨论。