前言
当你的 Python 程序需要同时处理大量 I/O 操作(网络请求、文件读写、数据库查询)时,传统的同步代码会让程序大部分时间都在'等待'中度过。异步编程就是解决这个问题的利器。
本文将从 asyncio 的基础概念讲起,逐步深入到实际的并发爬虫项目,帮你真正掌握 Python 异步编程。
同步 vs 异步:一个直观的对比
先看一个简单的例子,假设我们要请求 3 个网页,每个请求需要 1 秒:
Python 异步编程通过 asyncio 库解决高并发 I/O 密集型任务的性能问题。本文介绍了协程、事件循环等核心概念,对比了同步与异步的执行效率;通过构建基础版、进阶版及生产级异步爬虫示例,展示了并发控制、错误处理、重试机制及代理配置的实现方法;同时总结了忘记 await、阻塞操作及异常处理不当等常见陷阱,并通过性能测试验证了异步方案在大量网络请求场景下的显著优势。
当你的 Python 程序需要同时处理大量 I/O 操作(网络请求、文件读写、数据库查询)时,传统的同步代码会让程序大部分时间都在'等待'中度过。异步编程就是解决这个问题的利器。
本文将从 asyncio 的基础概念讲起,逐步深入到实际的并发爬虫项目,帮你真正掌握 Python 异步编程。
先看一个简单的例子,假设我们要请求 3 个网页,每个请求需要 1 秒:
| 模式 | 总耗时 | 执行逻辑 |
|---|---|---|
| 同步方式 | 约 3 秒 | 依次请求 A -> B -> C |
| 异步方式 | 约 1 秒 | 并发请求 A, B, C |
用代码来验证这个差异:
import time
import asyncio
import aiohttp

# Synchronous version: the three requests run strictly one after another.
def sync_fetch():
    import requests
    urls = ['https://httpbin.org/delay/1'] * 3
    start = time.time()
    for url in urls:
        requests.get(url)
    print(f"同步耗时:{time.time() - start:.2f}秒")

# Asynchronous version: all three requests are in flight at the same time.
async def async_fetch():
    urls = ['https://httpbin.org/delay/1'] * 3
    start = time.time()

    async def fetch(session, url):
        # BUGFIX: use `async with` so each response is properly released
        # (the original left all responses open), and read the body so the
        # timing is comparable to requests.get, which downloads the body.
        async with session.get(url) as resp:
            await resp.read()

    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(fetch(session, url) for url in urls))
    print(f"异步耗时:{time.time() - start:.2f}秒")

# Run the comparison.
sync_fetch()                # prints: 同步耗时:3.xx 秒
asyncio.run(async_fetch())  # prints: 异步耗时:1.xx 秒
协程是异步编程的基本单位,用 async def 定义:
import asyncio


async def say_hello(name, delay):
    """Sleep *delay* seconds (simulated I/O), greet *name*, return a marker."""
    await asyncio.sleep(delay)  # stand-in for a real I/O wait
    print(f"Hello, {name}!")
    return f"Done: {name}"


async def main():
    """Show the two ways of driving coroutines."""
    # Way 1: await directly — runs to completion before moving on.
    print(await say_hello("Alice", 1))

    # Way 2: wrap coroutines in tasks so they run concurrently.
    pending = [
        asyncio.create_task(say_hello("Bob", 2)),
        asyncio.create_task(say_hello("Charlie", 1)),
    ]
    # Wait for every task to finish and collect their return values in order.
    print(await asyncio.gather(*pending))


asyncio.run(main())
事件循环是 asyncio 的核心,负责调度和执行协程。其工作流程如下:
协程执行到 await 处时会挂起并让出控制权,事件循环随即切换到其他就绪任务,待 I/O 完成后再恢复执行。下面的示例演示了这一调度过程:

import asyncio
async def task_a():
    """Starts first but finishes last: waits 2 seconds."""
    print("Task A: 开始")
    await asyncio.sleep(2)
    print("Task A: 完成")


async def task_b():
    """Starts second yet finishes first: waits only 1 second."""
    print("Task B: 开始")
    await asyncio.sleep(1)
    print("Task B: 完成")


async def main():
    """Run both tasks concurrently — total time is max(2, 1), not 2 + 1."""
    jobs = [asyncio.create_task(task_a()), asyncio.create_task(task_b())]
    await asyncio.gather(*jobs)


asyncio.run(main())

# Output order:
# Task A: 开始
# Task B: 开始
# Task B: 完成(1 秒后)
# Task A: 完成(2 秒后)
| API | 用途 | 示例 |
|---|---|---|
| asyncio.run() | 运行主协程 | asyncio.run(main()) |
| await | 等待协程完成 | await some_coroutine() |
| asyncio.create_task() | 创建并发任务 | task = asyncio.create_task(coro()) |
| asyncio.gather() | 并发执行多个协程 | await asyncio.gather(coro1(), coro2()) |
| asyncio.wait() | 等待任务集合 | done, pending = await asyncio.wait(tasks) |
| asyncio.sleep() | 异步睡眠 | await asyncio.sleep(1) |
| asyncio.wait_for() | 带超时的等待 | await asyncio.wait_for(coro(), timeout=5) |
| asyncio.Semaphore | 限制并发数 | sem = asyncio.Semaphore(10) |
下面我们来实现一个完整的异步爬虫,包含并发控制、错误处理、进度显示等功能。
import asyncio
import aiohttp
from typing import List, Dict
import time
async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
    """Fetch one URL and summarize the outcome as a dict.

    Success: {'url', 'status', 'length', 'success': True}.
    Failure (timeout or any other error): {'url', 'error', 'success': False}.
    """
    try:
        limit = aiohttp.ClientTimeout(total=10)
        async with session.get(url, timeout=limit) as response:
            body = await response.text()
            summary = {
                'url': url,
                'status': response.status,
                'length': len(body),
                'success': True,
            }
        return summary
    except asyncio.TimeoutError:
        return {'url': url, 'error': 'Timeout', 'success': False}
    except Exception as exc:
        return {'url': url, 'error': str(exc), 'success': False}
async def crawl_urls(urls: List[str]) -> List[Dict]:
    """Fetch every URL in *urls* concurrently; return one result dict per URL."""
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(
            *(fetch_url(session, url) for url in urls)
        )
# Demo run: crawl five sites and report each outcome.
urls = [
    'https://www.python.org',
    'https://docs.python.org',
    'https://pypi.org',
    'https://github.com',
    'https://stackoverflow.com'
]
t0 = time.time()
results = asyncio.run(crawl_urls(urls))
print(f"爬取 {len(urls)} 个 URL,耗时:{time.time() - t0:.2f}秒")
for item in results:
    if not item['success']:
        print(f"✗ {item['url']} - {item['error']}")
    else:
        print(f"✓ {item['url']} - {item['status']} - {item['length']} bytes")
当 URL 数量很大时,需要限制并发数,避免被封 IP 或耗尽系统资源:
import asyncio
import aiohttp
from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime
@dataclass
class CrawlResult:
    """Outcome of fetching a single URL.

    A fetch counts as successful exactly when `error` is None.
    """

    url: str
    status: Optional[int] = None   # HTTP status code; None when the request failed
    content_length: int = 0        # size of the downloaded body, in bytes
    error: Optional[str] = None    # error description, or None on success
    elapsed: float = 0.0           # wall-clock seconds the request took

    @property
    def success(self) -> bool:
        """True when the fetch completed without an error."""
        return self.error is None
class AsyncCrawler:
    """Concurrent URL fetcher with a semaphore-based concurrency cap.

    Usage: results = await AsyncCrawler(max_concurrent=5).crawl(urls)
    """

    def __init__(self, max_concurrent: int = 10, timeout: int = 30):
        self.max_concurrent = max_concurrent
        self.timeout = timeout
        # Created lazily in crawl() so it binds to the running event loop.
        self.semaphore: Optional[asyncio.Semaphore] = None
        self.results: List[CrawlResult] = []
        self.completed = 0
        self.total = 0

    async def fetch_one(self, session: aiohttp.ClientSession, url: str) -> CrawlResult:
        """Fetch one URL under the concurrency limit and return a CrawlResult."""
        async with self.semaphore:
            start = datetime.now()
            try:
                async with session.get(
                    url,
                    timeout=aiohttp.ClientTimeout(total=self.timeout),
                    headers={'User-Agent': 'Mozilla/5.0 (compatible; AsyncCrawler/1.0)'}
                ) as response:
                    content = await response.text()
                result = CrawlResult(
                    url=url,
                    status=response.status,
                    content_length=len(content),
                    elapsed=(datetime.now() - start).total_seconds()
                )
            except asyncio.TimeoutError:
                result = CrawlResult(url=url, error='Timeout')
            except aiohttp.ClientError as e:
                result = CrawlResult(url=url, error=f'ClientError: {e}')
            except Exception as e:
                result = CrawlResult(url=url, error=str(e))
            # BUGFIX: count every finished request. The original incremented
            # `completed` only on the success path, so the progress bar
            # stalled (never reached 100%) whenever any fetch failed.
            self.completed += 1
            self._print_progress()
            return result

    def _print_progress(self):
        """Render an in-place progress line (carriage return, no newline)."""
        percent = (self.completed / self.total) * 100
        print(f"\r进度:{self.completed}/{self.total} ({percent:.1f}%)", end='', flush=True)

    async def crawl(self, urls: List[str]) -> List[CrawlResult]:
        """Fetch all *urls* concurrently; results come back in input order."""
        self.total = len(urls)
        self.completed = 0
        self.semaphore = asyncio.Semaphore(self.max_concurrent)
        connector = aiohttp.TCPConnector(limit=self.max_concurrent, limit_per_host=5)
        async with aiohttp.ClientSession(connector=connector) as session:
            tasks = [self.fetch_one(session, url) for url in urls]
            self.results = await asyncio.gather(*tasks)
        print()  # terminate the \r progress line
        return self.results

    def print_summary(self):
        """Print aggregate statistics for the most recent crawl."""
        success = sum(1 for r in self.results if r.success)
        failed = len(self.results) - success
        total_bytes = sum(r.content_length for r in self.results)
        # max(success, 1) guards against division by zero when nothing succeeded
        avg_time = sum(r.elapsed for r in self.results if r.success) / max(success, 1)
        print(f"\n{'='*50}")
        print(f"爬取完成统计:")
        print(f" 总数:{len(self.results)}")
        print(f" 成功:{success}")
        print(f" 失败:{failed}")
        print(f" 总数据量:{total_bytes / 1024:.2f} KB")
        print(f" 平均响应时间:{avg_time:.2f}秒")
        print(f"{'='*50}")
# 使用示例
async def main():
    """Crawl a demo URL list, then print total time, summary, and failures."""
    import time  # BUGFIX: `time` is used below but was never imported in this snippet

    urls = [
        'https://www.python.org',
        'https://docs.python.org/3/',
        'https://pypi.org',
        'https://github.com',
        'https://stackoverflow.com',
        'https://www.google.com',
        'https://www.baidu.com',
        'https://httpbin.org/get',
        'https://httpbin.org/headers',
        'https://httpbin.org/ip'
    ]
    crawler = AsyncCrawler(max_concurrent=5, timeout=15)
    start = time.time()
    results = await crawler.crawl(urls)
    total_time = time.time() - start
    print(f"\n总耗时:{total_time:.2f}秒")
    crawler.print_summary()
    failed = [r for r in results if not r.success]
    if failed:
        print("\n失败列表:")
        for r in failed:
            print(f" {r.url}: {r.error}")

if __name__ == '__main__':
    asyncio.run(main())
import asyncio
import aiohttp
import random
from typing import List, Dict, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
import logging
# One-time logging setup for the example; module-level logger per convention.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class CrawlConfig:
    """Tunable knobs for RetryableCrawler."""

    max_concurrent: int = 10   # simultaneous in-flight requests
    timeout: int = 30          # per-request total timeout, in seconds
    max_retries: int = 3       # attempts per URL before giving up
    retry_delay: float = 1.0   # base back-off; grows linearly per attempt
    # Pool of User-Agent strings rotated randomly per request.
    user_agents: List[str] = field(default_factory=lambda: [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
    ])
    # Optional proxy URLs, also chosen at random per request.
    proxies: List[str] = field(default_factory=list)
class RetryableCrawler:
    """Crawler with retry/back-off, rotating User-Agents and optional proxies."""

    def __init__(self, config: CrawlConfig = None):
        self.config = config or CrawlConfig()
        # Created in crawl() so it binds to the running event loop.
        self.semaphore: Optional[asyncio.Semaphore] = None
        self.stats = {'success': 0, 'failed': 0, 'retried': 0}

    def _get_random_headers(self) -> Dict:
        """Build request headers with a randomly chosen User-Agent."""
        return {
            'User-Agent': random.choice(self.config.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
        }

    def _get_random_proxy(self) -> Optional[str]:
        """Pick a proxy at random, or None when no proxies are configured."""
        if self.config.proxies:
            return random.choice(self.config.proxies)
        return None

    async def fetch_with_retry(
        self, session: aiohttp.ClientSession, url: str, callback: Optional[Callable] = None
    ) -> Dict:
        """Fetch *url*, retrying retryable failures with linear back-off.

        Retryable: TimeoutError / aiohttp.ClientError (HTTP >= 400 is converted
        into a ClientResponseError so it is retried too). Any other exception
        aborts immediately. `callback`, if given, is awaited with each
        successful result.
        """
        last_error = None
        for attempt in range(self.config.max_retries):
            # Set only when this attempt failed retryably AND another remains.
            delay = None
            async with self.semaphore:
                try:
                    proxy = self._get_random_proxy()
                    async with session.get(
                        url,
                        timeout=aiohttp.ClientTimeout(total=self.config.timeout),
                        headers=self._get_random_headers(),
                        proxy=proxy
                    ) as response:
                        content = await response.text()
                        if response.status >= 400:
                            # Promote HTTP errors to exceptions so they share
                            # the retry path below.
                            raise aiohttp.ClientResponseError(
                                response.request_info, response.history, status=response.status
                            )
                        self.stats['success'] += 1
                        result = {
                            'url': url,
                            'status': response.status,
                            'content': content,
                            'length': len(content),
                            'success': True
                        }
                        if callback:
                            await callback(result)
                        return result
                except (asyncio.TimeoutError, aiohttp.ClientError) as e:
                    last_error = str(e)
                    if attempt < self.config.max_retries - 1:
                        # BUGFIX: only count attempts that are actually
                        # retried (the original also bumped the counter on
                        # the final, un-retried attempt).
                        self.stats['retried'] += 1
                        delay = self.config.retry_delay * (attempt + 1)
                except Exception as e:
                    last_error = str(e)
                    break  # non-retryable: give up on this URL
            if delay is None:
                break  # attempts exhausted (or non-retryable error above)
            # BUGFIX: back off *outside* the semaphore — the original slept
            # while still holding it, wasting a concurrency slot for the
            # whole retry delay.
            logger.warning(f"重试 {url} (第{attempt + 1}次), 等待{delay}秒...")
            await asyncio.sleep(delay)
        self.stats['failed'] += 1
        return {'url': url, 'error': last_error, 'success': False}

    async def crawl(
        self, urls: List[str], callback: Optional[Callable] = None
    ) -> List[Dict]:
        """Crawl all *urls* concurrently; returns one result dict per URL."""
        self.semaphore = asyncio.Semaphore(self.config.max_concurrent)
        self.stats = {'success': 0, 'failed': 0, 'retried': 0}
        connector = aiohttp.TCPConnector(
            limit=self.config.max_concurrent,
            limit_per_host=5,
            enable_cleanup_closed=True
        )
        async with aiohttp.ClientSession(connector=connector) as session:
            tasks = [self.fetch_with_retry(session, url, callback) for url in urls]
            results = await asyncio.gather(*tasks)
        logger.info(f"爬取完成:成功={self.stats['success']}, 失败={self.stats['failed']}, 重试={self.stats['retried']}")
        return results
# 使用示例
async def process_result(result: Dict):
    """Per-fetch callback: log the URL and body size of successful results."""
    if not result['success']:
        return
    logger.info(f"处理:{result['url']} ({result['length']} bytes)")
async def main():
    """Demo: crawl a few httpbin endpoints with retries and print a report."""
    crawler = RetryableCrawler(CrawlConfig(
        max_concurrent=5,
        timeout=15,
        max_retries=3,
        retry_delay=1.0
    ))
    targets = [
        'https://httpbin.org/get',
        'https://httpbin.org/headers',
        'https://httpbin.org/ip',
        'https://httpbin.org/user-agent',
        'https://httpbin.org/status/500'
    ]
    for r in await crawler.crawl(targets, callback=process_result):
        if r['success']:
            print(f"✓{r['url']}: {r.get('status','N/A')}")
        else:
            print(f"✗{r['url']}: {r.get('error','Unknown')}")

if __name__ == '__main__':
    asyncio.run(main())
# ❌ Wrong: without `await` the coroutine object is created but never run
async def wrong():
    asyncio.sleep(1)  # does nothing (and triggers a "never awaited" warning)
    print("done")

# ✓ Correct: await drives the coroutine to completion
async def correct():
    await asyncio.sleep(1)
    print("done")
import time

# ❌ Wrong: time.sleep blocks the whole event loop — no other task can run
async def wrong():
    time.sleep(1)  # blocking!

# ✓ Correct: yields control to the loop while waiting
async def correct():
    await asyncio.sleep(1)  # non-blocking
# If you must call a blocking synchronous function, hand it to a thread
# pool with run_in_executor so the event loop stays responsive.
async def call_blocking(seconds: float = 1.0):
    """Run time.sleep(*seconds*) in the default executor without blocking the loop.

    *seconds* defaults to 1.0, matching the original hard-coded example.
    """
    # get_running_loop() is the modern API inside a coroutine;
    # get_event_loop() is deprecated in this context since Python 3.10.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, time.sleep, seconds)
# ❌ A task's exception can go unnoticed
async def may_fail():
    raise ValueError("出错了")

async def wrong():
    asyncio.create_task(may_fail())  # the exception is never retrieved
    await asyncio.sleep(1)

# ✓ Correct: await the task so its exception can be caught
async def correct():
    task = asyncio.create_task(may_fail())
    try:
        await task
    except ValueError as e:
        print(f"捕获异常:{e}")
async def task2():
    """Fails with ValueError after 0.5 s."""
    await asyncio.sleep(0.5)
    raise ValueError("task2 failed")


async def task1():
    """Succeeds after 1 s."""
    await asyncio.sleep(1)
    return "task1"


async def task3():
    """Succeeds after 0.8 s."""
    await asyncio.sleep(0.8)
    return "task3"
# gather: by default, one failing coroutine fails the whole batch
async def use_gather():
    """Show gather's all-or-nothing default: task2's error aborts everything."""
    try:
        # Fixed: the original bound the return value to an unused local.
        await asyncio.gather(task1(), task2(), task3())
    except ValueError:
        print("有任务失败")
# gather + return_exceptions: collect every outcome, exceptions included
async def use_gather_safe():
    """Gather all results without aborting; exceptions come back as values."""
    outcomes = await asyncio.gather(
        task1(), task2(), task3(), return_exceptions=True
    )
    for outcome in outcomes:
        if isinstance(outcome, Exception):
            print(f"异常:{outcome}")
        else:
            print(f"结果:{outcome}")
# wait: finer-grained control over completion
async def use_wait():
    """Return as soon as the first task finishes; report done vs pending."""
    running = [asyncio.create_task(coro) for coro in (task1(), task2(), task3())]
    done, pending = await asyncio.wait(running, return_when=asyncio.FIRST_COMPLETED)
    print(f"完成:{len(done)}, 待处理:{len(pending)}")
import asyncio
import aiohttp
import requests
import time
from concurrent.futures import ThreadPoolExecutor
# Benchmark parameters: each request hits an endpoint that takes ~1 second.
URL = 'https://httpbin.org/delay/1'
COUNT = 10
def sync_requests():
    """Issue COUNT requests strictly one after another; return elapsed seconds."""
    t0 = time.time()
    remaining = COUNT
    while remaining:
        requests.get(URL)
        remaining -= 1
    return time.time() - t0
def threaded_requests():
    """Fan COUNT requests out over a thread pool; return elapsed seconds."""
    t0 = time.time()
    with ThreadPoolExecutor(max_workers=COUNT) as pool:
        futures = [pool.submit(requests.get, URL) for _ in range(COUNT)]
        for fut in futures:
            fut.result()  # wait for (and propagate errors from) each request
    return time.time() - t0
async def async_requests():
    """Run COUNT requests concurrently with aiohttp; return elapsed seconds."""
    start = time.time()

    async def one(session):
        # BUGFIX: close each response via `async with` (the original left all
        # COUNT responses open) and read the body so the timing is comparable
        # to requests.get, which downloads the body.
        async with session.get(URL) as resp:
            await resp.read()

    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(one(session) for _ in range(COUNT)))
    return time.time() - start
# Run the three variants and print their timings in order.
sync_time = sync_requests()
print(f"同步:{sync_time:.2f}秒")
threaded_time = threaded_requests()
print(f"多线程:{threaded_time:.2f}秒")
async_time = asyncio.run(async_requests())
print(f"异步:{async_time:.2f}秒")
预期输出:
同步:10.xx 秒 多线程:1.xx 秒 异步:1.xx 秒
| 请求数 | 同步 | 多线程 | 异步 |
|---|---|---|---|
| 10 | ~10s | ~1s | ~1s |
异步编程的核心要点:协程必须通过 await 或事件循环驱动才会真正执行;用 asyncio.create_task / gather 实现并发;用 Semaphore 控制并发规模;切勿在协程中直接调用阻塞函数(如 time.sleep、requests)。
任务类型判断流程:I/O 密集型任务(网络、文件、数据库)优先考虑 asyncio 或多线程;CPU 密集型任务应使用多进程,异步对其没有帮助。
异步编程有一定学习曲线,但一旦掌握,在处理高并发 I/O 场景时会非常高效。建议从简单的例子开始,逐步增加复杂度。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online